Compare commits
62 Commits
dsnet/admi
...
josh/io_ur
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f93fe2c43b | ||
|
|
8ae85389e7 | ||
|
|
07374071d0 | ||
|
|
c0deb1c65e | ||
|
|
fff4cd99ae | ||
|
|
ce52a5631b | ||
|
|
e33d6a049b | ||
|
|
abafdc6292 | ||
|
|
934709fd7a | ||
|
|
ba6c48c9e9 | ||
|
|
72e8ab8781 | ||
|
|
f355e685ef | ||
|
|
385851de40 | ||
|
|
61cde40000 | ||
|
|
0132d52e9a | ||
|
|
ed0f50f924 | ||
|
|
dd94b37ef3 | ||
|
|
c7cd8a7bae | ||
|
|
5023b6b0c3 | ||
|
|
152038cabd | ||
|
|
8a7b42a557 | ||
|
|
8478d34cca | ||
|
|
f329d69fb4 | ||
|
|
a1a2fb9181 | ||
|
|
4847a89ecf | ||
|
|
58c556ad15 | ||
|
|
334c09ab19 | ||
|
|
3e6e5a2eee | ||
|
|
6ec3378f7b | ||
|
|
385f86e85f | ||
|
|
52ccff8835 | ||
|
|
2118b821cd | ||
|
|
1e3e5fd8e7 | ||
|
|
e0d8dcf3eb | ||
|
|
c1bc58defc | ||
|
|
7cb1369b19 | ||
|
|
f274a0cfab | ||
|
|
f71ff18c11 | ||
|
|
961a23b9df | ||
|
|
f27a61502d | ||
|
|
a98ed81f2e | ||
|
|
275cb37031 | ||
|
|
9559752cb5 | ||
|
|
78dbd02718 | ||
|
|
4d58223422 | ||
|
|
b05f305eaf | ||
|
|
e337ed2033 | ||
|
|
0fb656794c | ||
|
|
283614d5e9 | ||
|
|
11036e23a1 | ||
|
|
522fa9306e | ||
|
|
7fd5e31070 | ||
|
|
a03ee93e21 | ||
|
|
56ba714e10 | ||
|
|
bb78cf81b6 | ||
|
|
38a872d2c1 | ||
|
|
6ef301e787 | ||
|
|
48e338130e | ||
|
|
56ece41326 | ||
|
|
53117d9761 | ||
|
|
f75d32151b | ||
|
|
fcdc9086a2 |
3
.github/workflows/linux.yml
vendored
3
.github/workflows/linux.yml
vendored
@@ -25,6 +25,9 @@ jobs:
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v1
|
||||
|
||||
- name: Install liburing
|
||||
run: sudo apt install liburing-dev
|
||||
|
||||
- name: Basic build
|
||||
run: go build ./cmd/...
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"golang.zx2c4.com/wireguard/tun"
|
||||
"inet.af/netaddr"
|
||||
"tailscale.com/net/packet"
|
||||
"tailscale.com/net/uring"
|
||||
"tailscale.com/types/ipproto"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/wgengine/filter"
|
||||
@@ -160,6 +161,17 @@ func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
|
||||
filterFlags: filter.LogAccepts | filter.LogDrops,
|
||||
}
|
||||
|
||||
if uring.Available() {
|
||||
uringTun, err := uring.NewTUN(tdev)
|
||||
name, _ := tdev.Name()
|
||||
if err != nil {
|
||||
logf("not using io_uring for TUN %v: %v", name, err)
|
||||
} else {
|
||||
logf("using uring for TUN %v", name)
|
||||
tdev = uringTun
|
||||
}
|
||||
}
|
||||
|
||||
go tun.poll()
|
||||
go tun.pumpEvents()
|
||||
// The buffer starts out consumed.
|
||||
|
||||
12
net/uring/all.go
Normal file
12
net/uring/all.go
Normal file
@@ -0,0 +1,12 @@
|
||||
// Package uring provides a net.PacketConn and tun.Device that use io_uring for I/O.
|
||||
package uring
|
||||
|
||||
import "runtime"
|
||||
|
||||
// This file contains code shared across all platforms.
|
||||
|
||||
// Available reports whether io_uring is available on this machine.
|
||||
// If Available reports false, no other package uring APIs should be called.
|
||||
func Available() bool {
|
||||
return runtime.GOOS == "linux"
|
||||
}
|
||||
26
net/uring/capability_linux.go
Normal file
26
net/uring/capability_linux.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package uring
|
||||
|
||||
// #cgo CFLAGS: -I${SRCDIR}/liburing/src/include
|
||||
// #cgo LDFLAGS: -L${SRCDIR}/liburing/src/ -luring
|
||||
// #include "io_uring_linux.c"
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// hasUring reports whether it is possible to use io_uring syscalls on the system.
|
||||
func uringSupported() bool {
|
||||
probe, err := C.io_uring_get_probe()
|
||||
if err == nil && probe != nil {
|
||||
C.free(unsafe.Pointer(probe))
|
||||
}
|
||||
return err != syscall.ENOSYS
|
||||
}
|
||||
|
||||
// If/when we want to probe for specific io_uring capabilities,
|
||||
// rather than just the presence of the syscalls,
|
||||
// this code by Julian Knodt might be handy:
|
||||
// https://gist.github.com/JulianKnodt/e7030739d163f5251eb47f8ac1d67b62
|
||||
// (See discussion in https://github.com/tailscale/tailscale/pull/2371.)
|
||||
202
net/uring/file_linux.go
Normal file
202
net/uring/file_linux.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package uring
|
||||
|
||||
// #cgo LDFLAGS: -luring
|
||||
// #include "io_uring_linux.c"
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"tailscale.com/syncs"
|
||||
)
|
||||
|
||||
// A file is a file handle that uses io_uring for reads and writes.
|
||||
// It is intended for use with TUN fds, and thus only supports
|
||||
// reading from and writing to file offset 0.
|
||||
type file struct {
|
||||
// We have two urings so that we don't have to demux completion events.
|
||||
|
||||
// writeRing is the uring for pwritev calls.
|
||||
writeRing writeRing
|
||||
// readRing is the uring for preadv calls.
|
||||
readRing *C.go_uring
|
||||
|
||||
// close ensures that file closes occur exactly once.
|
||||
close sync.Once
|
||||
// closed indicates whether the file has been closed.
|
||||
closed syncs.AtomicBool
|
||||
// shutdown is a sequence of funcs to be called when the UDPConn closes.
|
||||
shutdown []func()
|
||||
|
||||
// file is the os file underlying this file.
|
||||
file *os.File
|
||||
|
||||
// readReqs is an array of re-usable file preadv requests.
|
||||
// We attempt to keep them all queued up for the kernel to fulfill.
|
||||
// The array length is tied to the size of the uring.
|
||||
readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device!
|
||||
|
||||
// refcount counts the number of outstanding read/write requests.
|
||||
// See the length comment for UDPConn.refcount for details.
|
||||
refcount syncs.AtomicInt32
|
||||
}
|
||||
|
||||
func newFile(f *os.File) (*file, error) {
|
||||
u := &file{
|
||||
readRing: new(C.go_uring),
|
||||
file: f,
|
||||
}
|
||||
u.writeRing.ring = new(C.go_uring)
|
||||
|
||||
fd := f.Fd()
|
||||
if ret := C.initialize(u.readRing, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("readRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.readRing)
|
||||
})
|
||||
|
||||
if ret := C.initialize(u.writeRing.ring, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("writeRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.writeRing.ring)
|
||||
})
|
||||
|
||||
// Initialize buffers
|
||||
for i := range &u.readReqs {
|
||||
u.readReqs[i] = C.initializeReq(bufferSize, C.size_t(i), 0) // 0: not used for IP addresses
|
||||
}
|
||||
u.writeRing.initReqs(0) // 0: not used for IP addresses
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
for _, r := range u.readReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
u.writeRing.freeReqs()
|
||||
})
|
||||
|
||||
// Initialize read half.
|
||||
for i := range u.readReqs {
|
||||
if err := u.submitReadvRequest(i); err != nil {
|
||||
u.doShutdown()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Initialization succeeded.
|
||||
// Take ownership of the file.
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
u.file.Close()
|
||||
})
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (u *file) submitReadvRequest(idx int) error {
|
||||
errno := C.submit_readv_request(u.readRing, u.readReqs[idx])
|
||||
if errno < 0 {
|
||||
return fmt.Errorf("uring.submitReadvRequest failed: %w", syscall.Errno(-errno))
|
||||
}
|
||||
atomic.AddInt32(u.readReqInKernel(idx), 1) // TODO: CAS?
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *file) readReqInKernel(idx int) *int32 {
|
||||
return (*int32)(unsafe.Pointer(&u.readReqs[idx].in_kernel))
|
||||
}
|
||||
|
||||
// Read data into buf.
|
||||
func (u *file) Read(buf []byte) (n int, err error) {
|
||||
// The docs for the u.refcount field document this prologue.
|
||||
u.refcount.Add(1)
|
||||
defer u.refcount.Add(-1)
|
||||
if u.closed.Get() {
|
||||
return 0, os.ErrClosed
|
||||
}
|
||||
|
||||
n, idx, err := waitCompletion(u.readRing)
|
||||
if errors.Is(err, syscall.ECANCELED) {
|
||||
atomic.AddInt32(u.readReqInKernel(idx), -1)
|
||||
return 0, os.ErrClosed
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("Read: io_uring failed to issue syscall: %w", err)
|
||||
}
|
||||
atomic.AddInt32(u.readReqInKernel(idx), -1)
|
||||
if n < 0 {
|
||||
// io_uring ran our syscall, which failed.
|
||||
// Best effort attempt not to leak idx.
|
||||
u.submitReadvRequest(int(idx))
|
||||
return 0, fmt.Errorf("Read: syscall failed: %w", syscall.Errno(-n))
|
||||
}
|
||||
// Success.
|
||||
r := u.readReqs[idx]
|
||||
rbuf := sliceOf(r.buf, n)
|
||||
copy(buf, rbuf)
|
||||
// Queue up a new request.
|
||||
if err := u.submitReadvRequest(int(idx)); err != nil {
|
||||
// Aggressively return this error.
|
||||
return 0, err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (u *file) Write(buf []byte) (int, error) {
|
||||
// The docs for the u.refcount field document this prologue.
|
||||
u.refcount.Add(1)
|
||||
defer u.refcount.Add(-1)
|
||||
if u.closed.Get() {
|
||||
return 0, os.ErrClosed
|
||||
}
|
||||
|
||||
// Get a req, blocking as needed.
|
||||
r, err := u.writeRing.getReq()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Do the write.
|
||||
rbuf := sliceOf(r.buf, len(buf))
|
||||
copy(rbuf, buf)
|
||||
C.submit_writev_request(u.writeRing.ring, r, C.int(len(buf)))
|
||||
// Get an extra buffer, if available.
|
||||
u.writeRing.prefetch()
|
||||
return len(buf), nil
|
||||
}
|
||||
|
||||
func (u *file) Close() error {
|
||||
u.close.Do(func() {
|
||||
// Announce to readers and writers that we are closing down.
|
||||
// Busy loop until all reads and writes are unblocked.
|
||||
// See the docs for u.refcount.
|
||||
u.closed.Set(true)
|
||||
for {
|
||||
// Request that the kernel cancel all submitted reads. (Writes don't block indefinitely.)
|
||||
for idx := range u.readReqs {
|
||||
if atomic.LoadInt32(u.readReqInKernel(idx)) != 0 {
|
||||
C.submit_cancel_request(u.readRing, C.size_t(idx))
|
||||
}
|
||||
}
|
||||
if u.refcount.Get() == 0 {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
// Do the rest of the shutdown.
|
||||
u.doShutdown()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *file) doShutdown() {
|
||||
for _, fn := range u.shutdown {
|
||||
fn()
|
||||
}
|
||||
}
|
||||
33
net/uring/file_test_linux.go
Normal file
33
net/uring/file_test_linux.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package uring
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
qt "github.com/frankban/quicktest"
|
||||
)
|
||||
|
||||
func TestFileRead(t *testing.T) {
|
||||
if !Available() {
|
||||
t.Skip("io_uring not available")
|
||||
}
|
||||
c := qt.New(t)
|
||||
|
||||
const path = "testdata/voltaire.txt"
|
||||
want, err := os.ReadFile(path)
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
f, err := os.Open(path)
|
||||
c.Assert(err, qt.IsNil)
|
||||
t.Cleanup(func() { f.Close() })
|
||||
|
||||
uf, err := newFile(f)
|
||||
if err != nil {
|
||||
t.Skipf("io_uring not available: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { uf.Close() })
|
||||
buf := make([]byte, len(want)+128)
|
||||
n, err := uf.Read(buf)
|
||||
c.Assert(err, qt.IsNil)
|
||||
c.Assert(buf[:n], qt.DeepEquals, want)
|
||||
}
|
||||
157
net/uring/io_uring_linux.c
Normal file
157
net/uring/io_uring_linux.c
Normal file
@@ -0,0 +1,157 @@
|
||||
#include <arpa/inet.h> // debugging
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <liburing.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/udp.h>
|
||||
|
||||
// TODO: use fixed buffers? https://unixism.net/loti/tutorial/fixed_buffers.html
|
||||
|
||||
typedef struct io_uring go_uring;
|
||||
typedef struct msghdr go_msghdr;
|
||||
typedef struct iovec go_iovec;
|
||||
typedef struct sockaddr_in go_sockaddr_in;
|
||||
typedef struct io_uring_params go_io_uring_params;
|
||||
|
||||
static int initialize(struct io_uring *ring, int fd) {
|
||||
int ret = io_uring_queue_init(16, ring, 0); // 16: size of ring
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
}
|
||||
ret = io_uring_register_files(ring, &fd, 1);
|
||||
// TODO: Do we need to unregister files on close, or is Closing the uring enough?
|
||||
if (ret < 0) {
|
||||
perror("io_uring_queue_init");
|
||||
return ret;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct req {
|
||||
struct msghdr hdr;
|
||||
struct iovec iov;
|
||||
struct sockaddr_in sa;
|
||||
struct sockaddr_in6 sa6;
|
||||
// in_kernel indicates (by being non-zero) whether this request is sitting in the kernel
|
||||
// It is accessed atomically.
|
||||
int32_t in_kernel;
|
||||
char *buf;
|
||||
size_t idx;
|
||||
};
|
||||
|
||||
typedef struct req goreq;
|
||||
|
||||
static struct req *initializeReq(size_t sz, size_t idx, int ipLen) {
|
||||
struct req *r = malloc(sizeof(struct req));
|
||||
memset(r, 0, sizeof(*r));
|
||||
r->buf = malloc(sz);
|
||||
memset(r->buf, 0, sz);
|
||||
r->iov.iov_base = r->buf;
|
||||
r->iov.iov_len = sz;
|
||||
r->hdr.msg_iov = &r->iov;
|
||||
r->hdr.msg_iovlen = 1;
|
||||
r->idx = idx;
|
||||
switch(ipLen) {
|
||||
case 4:
|
||||
r->hdr.msg_name = &r->sa;
|
||||
r->hdr.msg_namelen = sizeof(r->sa);
|
||||
break;
|
||||
case 16:
|
||||
r->hdr.msg_name = &r->sa6;
|
||||
r->hdr.msg_namelen = sizeof(r->sa6);
|
||||
break;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
static void freeReq(struct req *r) {
|
||||
free(r->buf);
|
||||
free(r);
|
||||
}
|
||||
|
||||
// submit a recvmsg request via liburing
|
||||
// TODO: What recvfrom support arrives, maybe use that instead?
|
||||
static int submit_recvmsg_request(struct io_uring *ring, struct req *r) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_recvmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// submit a recvmsg request via liburing
|
||||
// TODO: What recvfrom support arrives, maybe use that instead?
|
||||
static int submit_sendmsg_request(struct io_uring *ring, struct req *r, int buflen) {
|
||||
r->iov.iov_len = buflen;
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_sendmsg(sqe, 0, &r->hdr, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void submit_cancel_request(struct io_uring *ring, size_t idx) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_cancel(sqe, (void *)(idx), 0);
|
||||
io_uring_submit(ring);
|
||||
}
|
||||
|
||||
// submit a writev request via liburing
|
||||
static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen) {
|
||||
r->iov.iov_len = buflen;
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_writev(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
int submitted = io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// submit a readv request via liburing
|
||||
static int submit_readv_request(struct io_uring *ring, struct req *r) {
|
||||
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
|
||||
io_uring_prep_readv(sqe, 0, &r->iov, 1, 0); // use the 0th file in the list of registered fds
|
||||
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
|
||||
io_uring_sqe_set_data(sqe, (void *)(r->idx));
|
||||
int submitted = io_uring_submit(ring);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
struct completion_result {
|
||||
int err;
|
||||
int n;
|
||||
size_t idx;
|
||||
};
|
||||
|
||||
typedef struct completion_result go_completion_result;
|
||||
|
||||
static go_completion_result completion(struct io_uring *ring, int block) {
|
||||
struct io_uring_cqe *cqe;
|
||||
struct completion_result res;
|
||||
res.err = 0;
|
||||
res.n = 0;
|
||||
res.idx = 0;
|
||||
if (block) {
|
||||
res.err = io_uring_wait_cqe(ring, &cqe);
|
||||
} else {
|
||||
res.err = io_uring_peek_cqe(ring, &cqe);
|
||||
}
|
||||
if (res.err < 0) {
|
||||
return res;
|
||||
}
|
||||
res.idx = (size_t)io_uring_cqe_get_data(cqe);
|
||||
res.n = cqe->res;
|
||||
io_uring_cqe_seen(ring, cqe);
|
||||
return res;
|
||||
}
|
||||
118
net/uring/io_uring_linux.go
Normal file
118
net/uring/io_uring_linux.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package uring
|
||||
|
||||
// #cgo LDFLAGS: -luring
|
||||
// #include "io_uring_linux.c"
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// A writeRing is an io_uring usable for sendmsg or pwritev calls.
|
||||
// It manages an array of re-usable buffers.
|
||||
type writeRing struct {
|
||||
ring *C.go_uring
|
||||
// reqs is an array of re-usable write requests.
|
||||
// We dispatch them to the kernel as writes are requested.
|
||||
// The array length is tied to the size of the uring.
|
||||
reqs [8]*C.goreq
|
||||
// reqC is a channel containing indices into reqs
|
||||
// that are free to use (that is, not in the kernel).
|
||||
reqC chan int
|
||||
}
|
||||
|
||||
// initReqs initializes r's reqs so that they can be used for writes/sends.
|
||||
func (r *writeRing) initReqs(ipLen int) {
|
||||
for i := range &r.reqs {
|
||||
r.reqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(ipLen))
|
||||
}
|
||||
r.reqC = make(chan int, len(r.reqs))
|
||||
for i := range r.reqs {
|
||||
r.reqC <- i
|
||||
}
|
||||
}
|
||||
|
||||
// getReq gets a req usable for a write/send.
|
||||
// It blocks until such a req is available.
|
||||
func (r *writeRing) getReq() (req *C.goreq, err error) {
|
||||
var idx int
|
||||
select {
|
||||
case idx = <-r.reqC:
|
||||
default:
|
||||
// No request available. Get one from the kernel.
|
||||
n, idx, err := waitCompletion(r.ring)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Write io_uring call failed: %w", err)
|
||||
}
|
||||
if n < 0 {
|
||||
// Past syscall failed.
|
||||
r.reqC <- idx // don't leak idx
|
||||
return nil, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
|
||||
}
|
||||
}
|
||||
return r.reqs[idx], nil
|
||||
}
|
||||
|
||||
// prefetch attempts to fetch a req for use by future writes.
|
||||
// It does not block.
|
||||
// TODO: does this actually buy us anything?
|
||||
// TODO: return errors encountered here, rather than delaying them until later?
|
||||
func (r *writeRing) prefetch() {
|
||||
idx, ok := peekCompletion(r.ring)
|
||||
if ok {
|
||||
// Put the request buffer back in the usable queue.
|
||||
// Should never block, by construction.
|
||||
r.reqC <- idx
|
||||
}
|
||||
}
|
||||
|
||||
// freeReqs frees the reqs allocated by initReqs.
|
||||
func (r *writeRing) freeReqs() {
|
||||
for _, req := range r.reqs {
|
||||
C.freeReq(req)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
noBlockForCompletion = 0
|
||||
blockForCompletion = 1
|
||||
)
|
||||
|
||||
// waitCompletion blocks until a completion on ring succeeds, or until *fd == 0.
|
||||
// If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed.
|
||||
// Reads of *fd are atomic.
|
||||
func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
|
||||
for {
|
||||
r := C.completion(ring, blockForCompletion)
|
||||
if syscall.Errno(-r.err) == syscall.EAGAIN || syscall.Errno(-r.err) == syscall.EINTR {
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
if r.err < 0 {
|
||||
err = syscall.Errno(-r.err)
|
||||
}
|
||||
return int(r.n), int(r.idx), err
|
||||
}
|
||||
}
|
||||
|
||||
func peekCompletion(ring *C.go_uring) (idx int, ok bool) {
|
||||
r := C.completion(ring, noBlockForCompletion)
|
||||
if r.err < 0 {
|
||||
return 0, false
|
||||
}
|
||||
return int(r.idx), true
|
||||
}
|
||||
|
||||
// sliceOf returns ptr[:n] as a byte slice.
|
||||
// TODO: replace with unsafe.Slice once we are using Go 1.17.
|
||||
func sliceOf(ptr *C.char, n int) []byte {
|
||||
var b []byte
|
||||
h := (*reflect.SliceHeader)(unsafe.Pointer(&b))
|
||||
h.Data = uintptr(unsafe.Pointer(ptr))
|
||||
h.Len = n
|
||||
h.Cap = n
|
||||
return b
|
||||
}
|
||||
11
net/uring/io_uring_test.go
Normal file
11
net/uring/io_uring_test.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// +build linux
|
||||
|
||||
package uring
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUringAvailable(t *testing.T) {
|
||||
uringSupported()
|
||||
}
|
||||
27
net/uring/stubs.go
Normal file
27
net/uring/stubs.go
Normal file
@@ -0,0 +1,27 @@
|
||||
// +build !linux
|
||||
|
||||
package uring
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"golang.zx2c4.com/wireguard/tun"
|
||||
"inet.af/netaddr"
|
||||
)
|
||||
|
||||
// This file contains stubs for platforms that are known at compile time not to support io_uring.
|
||||
|
||||
type UDPConn struct{}
|
||||
|
||||
func NewUDPConn(net.PacketConn) (*UDPConn, error) { panic("io_uring unavailable") }
|
||||
func (u *UDPConn) ReadFromNetaddr([]byte) (int, netaddr.IPPort, error) { panic("io_uring unavailable") }
|
||||
func (u *UDPConn) Close() error { panic("io_uring unavailable") }
|
||||
func (c *UDPConn) ReadFrom([]byte) (int, net.Addr, error) { panic("io_uring unavailable") }
|
||||
func (u *UDPConn) WriteTo([]byte, net.Addr) (int, error) { panic("io_uring unavailable") }
|
||||
func (c *UDPConn) LocalAddr() net.Addr { panic("io_uring unavailable") }
|
||||
func (c *UDPConn) SetDeadline(time.Time) error { panic("io_uring unavailable") }
|
||||
func (c *UDPConn) SetReadDeadline(time.Time) error { panic("io_uring unavailable") }
|
||||
func (c *UDPConn) SetWriteDeadline(time.Time) error { panic("io_uring unavailable") }
|
||||
|
||||
func NewTUN(tun.Device) (tun.Device, error) { panic("io_uring unavailable") }
|
||||
1
net/uring/testdata/voltaire.txt
vendored
Normal file
1
net/uring/testdata/voltaire.txt
vendored
Normal file
@@ -0,0 +1 @@
|
||||
If io_uring did not exist, it would be necessary to invent it.
|
||||
104
net/uring/tun_linux.go
Normal file
104
net/uring/tun_linux.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package uring
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/net/ipv6"
|
||||
"golang.zx2c4.com/wireguard/tun"
|
||||
)
|
||||
|
||||
// Wrap files into TUN devices.
|
||||
|
||||
func NewTUN(d tun.Device) (tun.Device, error) {
|
||||
nt, ok := d.(*tun.NativeTun)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("NewTUN only wraps *tun.NativeTun, got %T", d)
|
||||
}
|
||||
f, err := newFile(nt.File())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v := reflect.ValueOf(nt)
|
||||
field, ok := v.Elem().Type().FieldByName("errors")
|
||||
if !ok {
|
||||
return nil, errors.New("could not find internal tun.NativeTun errors field")
|
||||
}
|
||||
ptr := unsafe.Pointer(nt)
|
||||
ptr = unsafe.Pointer(uintptr(ptr) + field.Offset) // TODO: switch to unsafe.Add with Go 1.17...as if that's the worst thing in this line
|
||||
c := *(*chan error)(ptr)
|
||||
return &TUN{d: nt, f: f, errors: c}, nil
|
||||
}
|
||||
|
||||
// No nopi
|
||||
type TUN struct {
|
||||
d *tun.NativeTun
|
||||
f *file
|
||||
errors chan error
|
||||
}
|
||||
|
||||
func (t *TUN) File() *os.File {
|
||||
return t.f.file
|
||||
}
|
||||
|
||||
func (t *TUN) Read(buf []byte, offset int) (int, error) {
|
||||
select {
|
||||
case err := <-t.errors:
|
||||
return 0, err
|
||||
default:
|
||||
}
|
||||
// TODO: upstream has graceful shutdown error handling here.
|
||||
buff := buf[offset-4:]
|
||||
n, err := t.f.Read(buff[:])
|
||||
if errors.Is(err, syscall.EBADFD) {
|
||||
err = os.ErrClosed
|
||||
}
|
||||
if n < 4 {
|
||||
n = 0
|
||||
} else {
|
||||
n -= 4
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (t *TUN) Write(buf []byte, offset int) (int, error) {
|
||||
// below copied from wireguard-go NativeTun.Write
|
||||
|
||||
// reserve space for header
|
||||
buf = buf[offset-4:]
|
||||
|
||||
// add packet information header
|
||||
buf[0] = 0x00
|
||||
buf[1] = 0x00
|
||||
if buf[4]>>4 == ipv6.Version {
|
||||
buf[2] = 0x86
|
||||
buf[3] = 0xdd
|
||||
} else {
|
||||
buf[2] = 0x08
|
||||
buf[3] = 0x00
|
||||
}
|
||||
|
||||
n, err := t.f.Write(buf)
|
||||
if errors.Is(err, syscall.EBADFD) {
|
||||
err = os.ErrClosed
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (t *TUN) Flush() error { return t.d.Flush() }
|
||||
func (t *TUN) MTU() (int, error) { return t.d.MTU() }
|
||||
func (t *TUN) Name() (string, error) { return t.d.Name() }
|
||||
func (t *TUN) Events() chan tun.Event { return t.d.Events() }
|
||||
|
||||
func (t *TUN) Close() error {
|
||||
err1 := t.f.Close()
|
||||
err2 := t.d.Close()
|
||||
if err1 != nil {
|
||||
return err1
|
||||
}
|
||||
return err2
|
||||
}
|
||||
306
net/uring/udp_linux.go
Normal file
306
net/uring/udp_linux.go
Normal file
@@ -0,0 +1,306 @@
|
||||
package uring
|
||||
|
||||
// #cgo LDFLAGS: -luring
|
||||
// #include "io_uring_linux.c"
|
||||
import "C"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"golang.zx2c4.com/wireguard/device"
|
||||
"inet.af/netaddr"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/util/endian"
|
||||
)
|
||||
|
||||
const bufferSize = device.MaxSegmentSize
|
||||
|
||||
// A UDPConn is a UDP connection that uses io_uring to send and receive packets.
|
||||
type UDPConn struct {
|
||||
// We have two urings so that we don't have to demux completion events.
|
||||
|
||||
// recvRing is the uring for recvmsg calls.
|
||||
recvRing *C.go_uring
|
||||
// sendRing is the uring for sendmsg calls.
|
||||
sendRing writeRing
|
||||
|
||||
// close ensures that connection closes occur exactly once.
|
||||
close sync.Once
|
||||
// closed indicates whether the connection has been closed.
|
||||
closed syncs.AtomicBool
|
||||
// shutdown is a sequence of funcs to be called when the UDPConn closes.
|
||||
shutdown []func()
|
||||
|
||||
// file is the os file underlying this connection.
|
||||
file *os.File
|
||||
// local is the local address of this UDPConn.
|
||||
local net.Addr
|
||||
// is4 indicates whether the conn is an IPv4 connection.
|
||||
is4 bool
|
||||
|
||||
// recvReqs is an array of re-usable UDP recvmsg requests.
|
||||
// We attempt to keep them all queued up for the kernel to fulfill.
|
||||
// The array length is tied to the size of the uring.
|
||||
recvReqs [8]*C.goreq
|
||||
// sendReqs is an array of re-usable UDP sendmsg requests.
|
||||
// We dispatch them to the kernel as writes are requested.
|
||||
// The array length is tied to the size of the uring.
|
||||
sendReqs [8]*C.goreq
|
||||
// sendReqC is a channel containing indices into sendReqs
|
||||
// that are free to use (that is, not in the kernel).
|
||||
sendReqC chan int
|
||||
|
||||
// refcount counts the number of outstanding read/write requests.
|
||||
// refcount is used for graceful shutdown.
|
||||
// The pattern (very roughly) is:
|
||||
//
|
||||
// func readOrWrite() {
|
||||
// refcount++
|
||||
// defer refcount--
|
||||
// if closed {
|
||||
// return
|
||||
// }
|
||||
// // ...
|
||||
// }
|
||||
//
|
||||
// Close sets closed to true and polls until refcount hits zero.
|
||||
// Once refcount hits zero, there are no ongoing reads or writes.
|
||||
// Any future reads or writes will exit immediately (because closed is true),
|
||||
// so resources used by reads and writes may be freed.
|
||||
// The polling is unfortunate, but it occurs only during Close, is fast,
|
||||
// and avoids ugly sequencing issues around canceling outstanding io_uring submissions.
|
||||
//
|
||||
// (The obvious alternative is to use a sync.RWMutex, but that has a chicken-and-egg problem.
|
||||
// Reads/writes must take an rlock, but Close cannot take a wlock under all the rlocks are released,
|
||||
// but Close cannot issue cancellations to release the rlocks without first taking a wlock.)
|
||||
refcount syncs.AtomicInt32
|
||||
}
|
||||
|
||||
func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
|
||||
conn, ok := pconn.(*net.UDPConn)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cannot use io_uring with conn of type %T", pconn)
|
||||
}
|
||||
local := conn.LocalAddr()
|
||||
udpAddr, ok := local.(*net.UDPAddr)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cannot use io_uring with conn.LocalAddr of type %T", local)
|
||||
}
|
||||
|
||||
// TODO: probe for system capabilities: https://unixism.net/loti/tutorial/probe_liburing.html
|
||||
|
||||
file, err := conn.File()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// conn.File dup'd the conn's fd. We no longer need the original conn.
|
||||
conn.Close()
|
||||
|
||||
u := &UDPConn{
|
||||
recvRing: new(C.go_uring),
|
||||
file: file,
|
||||
local: local,
|
||||
is4: len(udpAddr.IP) == 4,
|
||||
}
|
||||
u.sendRing.ring = new(C.go_uring)
|
||||
|
||||
fd := file.Fd()
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
file.Close()
|
||||
})
|
||||
|
||||
if ret := C.initialize(u.recvRing, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("recvRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.recvRing)
|
||||
})
|
||||
|
||||
if ret := C.initialize(u.sendRing.ring, C.int(fd)); ret < 0 {
|
||||
u.doShutdown()
|
||||
return nil, fmt.Errorf("sendRing initialization failed: %w", syscall.Errno(-ret))
|
||||
}
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
C.io_uring_queue_exit(u.sendRing.ring)
|
||||
})
|
||||
|
||||
// Initialize buffers
|
||||
for i := range u.recvReqs {
|
||||
u.recvReqs[i] = C.initializeReq(bufferSize, C.size_t(i), C.int(len(udpAddr.IP)))
|
||||
}
|
||||
u.sendRing.initReqs(len(udpAddr.IP))
|
||||
u.shutdown = append(u.shutdown, func() {
|
||||
for _, r := range u.recvReqs {
|
||||
C.freeReq(r)
|
||||
}
|
||||
u.sendRing.freeReqs()
|
||||
})
|
||||
|
||||
// Initialize recv half.
|
||||
for i := range u.recvReqs {
|
||||
if err := u.submitRecvRequest(i); err != nil {
|
||||
u.doShutdown()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (u *UDPConn) submitRecvRequest(idx int) error {
|
||||
errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx])
|
||||
if errno < 0 {
|
||||
return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno))
|
||||
}
|
||||
atomic.AddInt32(u.recvReqInKernel(idx), 1) // TODO: CAS?
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *UDPConn) recvReqInKernel(idx int) *int32 {
|
||||
return (*int32)(unsafe.Pointer(&u.recvReqs[idx].in_kernel))
|
||||
}
|
||||
|
||||
// ReadFromNetaddr reads a single UDP packet into buf and returns the
// number of bytes read along with the sender's address as a
// netaddr.IPPort (avoiding the *net.UDPAddr allocation of ReadFrom).
// It blocks until an io_uring completion is available, then immediately
// re-submits the receive request so the ring stays full.
// It returns net.ErrClosed once the connection is closing.
func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
	// The docs for the u.refcount field document this prologue.
	u.refcount.Add(1)
	defer u.refcount.Add(-1)
	if u.closed.Get() {
		return 0, netaddr.IPPort{}, net.ErrClosed
	}

	// Block until one of the outstanding receive requests completes.
	n, idx, err := waitCompletion(u.recvRing)
	if errors.Is(err, syscall.ECANCELED) {
		// Close canceled this request; it is no longer in the kernel.
		atomic.AddInt32(u.recvReqInKernel(idx), -1)
		return 0, netaddr.IPPort{}, net.ErrClosed
	}
	if err != nil {
		// io_uring failed to run our syscall.
		return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err)
	}
	// The request completed, so it is out of the kernel either way.
	atomic.AddInt32(u.recvReqInKernel(idx), -1)
	if n < 0 {
		// io_uring ran our syscall, which failed.
		// Best effort attempt not to leak idx.
		u.submitRecvRequest(int(idx))
		return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr syscall failed: %w", syscall.Errno(-n))
	}
	r := u.recvReqs[idx]
	var ip netaddr.IP
	var port uint16
	// Decode the source address from the C sockaddr stored in the request.
	// u.is4 selects which of the overlapping sockaddr views is valid.
	if u.is4 {
		ip = netaddr.IPFrom4(*(*[4]byte)((unsafe.Pointer)((&r.sa.sin_addr.s_addr))))
		port = endian.Ntoh16(uint16(r.sa.sin_port))
	} else {
		ip = netaddr.IPFrom16(*(*[16]byte)((unsafe.Pointer)((&r.sa6.sin6_addr))))
		port = endian.Ntoh16(uint16(r.sa6.sin6_port))
	}
	ipp := netaddr.IPPortFrom(ip, port)
	// Copy the data to the buffer provided by wireguard-go.
	// Maybe some sparkling day this copy will be the slowest thing in our stack.
	// It's not even on the radar now.
	rbuf := sliceOf(r.buf, n)
	copy(buf, rbuf)
	// Queue up a new request.
	if err := u.submitRecvRequest(int(idx)); err != nil {
		// Aggressively return this error.
		// The error will bubble up and cause the entire conn to be closed down,
		// so it doesn't matter that we lost a packet here.
		return 0, netaddr.IPPort{}, err
	}
	return n, ipp, nil
}
|
||||
|
||||
func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
n, ipp, err := c.ReadFromNetaddr(p)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return n, ipp.UDPAddr(), err
|
||||
}
|
||||
|
||||
// WriteTo implements net.PacketConn. It copies p into a send request's
// buffer, fills in the destination sockaddr, and submits a sendmsg
// request to the kernel via io_uring. It returns len(p) as soon as the
// request is submitted; it does not wait for the send to complete.
// addr must be a *net.UDPAddr.
func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
	// The docs for the u.refcount field document this prologue.
	u.refcount.Add(1)
	defer u.refcount.Add(-1)
	if u.closed.Get() {
		return 0, net.ErrClosed
	}

	udpAddr, ok := addr.(*net.UDPAddr)
	if !ok {
		return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr)
	}

	// Get a req, blocking as needed.
	r, err := u.sendRing.getReq()
	if err != nil {
		return 0, err
	}
	// Do the write.
	rbuf := sliceOf(r.buf, len(p))
	copy(rbuf, p)

	// Fill in the destination sockaddr in the C request.
	// NOTE(review): the is4 branch copies 4 bytes from &udpAddr.IP[0],
	// which assumes the IP is stored in 4-byte form; a 16-byte
	// IPv4-in-IPv6 representation would copy the wrong octets — confirm
	// callers normalize the address.
	if u.is4 {
		dst := (*[4]byte)((unsafe.Pointer)(&r.sa.sin_addr.s_addr))
		src := (*[4]byte)((unsafe.Pointer)(&udpAddr.IP[0]))
		*dst = *src
		r.sa.sin_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port)))
		r.sa.sin_family = C.AF_INET
	} else {
		dst := (*[16]byte)((unsafe.Pointer)(&r.sa6.sin6_addr))
		src := (*[16]byte)((unsafe.Pointer)(&udpAddr.IP[0]))
		*dst = *src
		r.sa6.sin6_port = C.uint16_t(endian.Hton16(uint16(udpAddr.Port)))
		r.sa6.sin6_family = C.AF_INET6
	}
	// NOTE(review): the return value of submit_sendmsg_request is
	// discarded, so a failed submission is silently dropped — confirm
	// whether submission errors should be surfaced to the caller.
	C.submit_sendmsg_request(u.sendRing.ring, r, C.int(len(p)))
	// Get an extra buffer, if available.
	u.sendRing.prefetch()
	return len(p), nil
}
|
||||
|
||||
// Close implements net.PacketConn. It marks the connection as closed,
// asks the kernel to cancel all outstanding receive requests, busy-waits
// (re-issuing cancellations every millisecond) until every in-flight
// ReadFromNetaddr/WriteTo call has drained, and then runs the registered
// shutdown hooks. Close is idempotent and always returns nil.
func (u *UDPConn) Close() error {
	u.close.Do(func() {
		// Announce to readers and writers that we are closing down.
		// Busy loop until all reads and writes are unblocked.
		// See the docs for u.refcount.
		u.closed.Set(true)
		for {
			// Request that the kernel cancel all submitted reads. (Writes don't block indefinitely.)
			for idx := range u.recvReqs {
				if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 {
					C.submit_cancel_request(u.recvRing, C.size_t(idx))
				}
			}
			// Once no reader or writer holds a reference, it is safe to
			// tear down the rings and free the C-allocated requests.
			if u.refcount.Get() == 0 {
				break
			}
			time.Sleep(time.Millisecond)
		}
		// Do the rest of the shutdown.
		u.doShutdown()
	})
	return nil
}
|
||||
|
||||
func (u *UDPConn) doShutdown() {
|
||||
for _, fn := range u.shutdown {
|
||||
fn()
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that UDPConn implements net.PacketConn.
var _ net.PacketConn = (*UDPConn)(nil)

// LocalAddr returns the local address the connection is bound to.
func (c *UDPConn) LocalAddr() net.Addr { return c.local }

// Deadlines are not supported by this io_uring-backed connection;
// the Set*Deadline methods panic if called.
func (c *UDPConn) SetDeadline(t time.Time) error      { panic("not implemented") }
func (c *UDPConn) SetReadDeadline(t time.Time) error  { panic("not implemented") }
func (c *UDPConn) SetWriteDeadline(t time.Time) error { panic("not implemented") }
|
||||
45
net/uring/udp_test_linux.go
Normal file
45
net/uring/udp_test_linux.go
Normal file
@@ -0,0 +1,45 @@
|
||||
package uring
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
qt "github.com/frankban/quicktest"
|
||||
)
|
||||
|
||||
func TestUDPSendRecv(t *testing.T) {
|
||||
if !Available() {
|
||||
t.Skip("io_uring not available")
|
||||
}
|
||||
c := qt.New(t)
|
||||
|
||||
listen, err := net.ListenUDP("udp4", &net.UDPAddr{Port: 9999})
|
||||
t.Cleanup(func() { listen.Close() })
|
||||
c.Assert(err, qt.IsNil)
|
||||
|
||||
conn, err := NewUDPConn(listen)
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
if err != nil {
|
||||
t.Skipf("io_uring not available: %v", err)
|
||||
}
|
||||
addr := listen.LocalAddr()
|
||||
sendBuf := make([]byte, 200)
|
||||
for i := range sendBuf {
|
||||
sendBuf[i] = byte(i)
|
||||
}
|
||||
recvBuf := make([]byte, 200)
|
||||
|
||||
// Write one direction.
|
||||
_, err = conn.WriteTo(sendBuf, addr)
|
||||
c.Assert(err, qt.IsNil)
|
||||
n, ipp, err := conn.ReadFromNetaddr(recvBuf)
|
||||
c.Assert(err, qt.IsNil)
|
||||
c.Assert(recvBuf[:n], qt.DeepEquals, sendBuf)
|
||||
|
||||
// Write the other direction, to check that ipp is correct.
|
||||
_, err = conn.WriteTo(sendBuf, ipp.UDPAddr())
|
||||
c.Assert(err, qt.IsNil)
|
||||
n, _, err = conn.ReadFromNetaddr(recvBuf)
|
||||
c.Assert(err, qt.IsNil)
|
||||
c.Assert(recvBuf[:n], qt.DeepEquals, sendBuf)
|
||||
}
|
||||
@@ -94,6 +94,21 @@ func (b *AtomicUint32) Get() uint32 {
|
||||
return atomic.LoadUint32((*uint32)(b))
|
||||
}
|
||||
|
||||
// AtomicInt32 is an atomic int32.
//
// Its zero value is 0 and ready to use. It must not be copied after
// first use.
type AtomicInt32 int32

// Set atomically stores v into b.
func (b *AtomicInt32) Set(v int32) {
	atomic.StoreInt32((*int32)(b), v)
}

// Get atomically loads and returns the current value of b.
func (b *AtomicInt32) Get() int32 {
	return atomic.LoadInt32((*int32)(b))
}

// Add atomically adds v (which may be negative) to b.
func (b *AtomicInt32) Add(v int32) {
	atomic.AddInt32((*int32)(b), v)
}
|
||||
|
||||
// Semaphore is a counting semaphore.
|
||||
//
|
||||
// Use NewSemaphore to create one.
|
||||
|
||||
@@ -13,3 +13,12 @@ const Big = true
|
||||
|
||||
// Native is the platform's native byte order.
|
||||
var Native = binary.BigEndian
|
||||
|
||||
// Ntoh16 converts network order into native/host order.
// On this big-endian build (Big == true) network order is already the
// native order, so the conversion is the identity.
func Ntoh16(v uint16) uint16 { return v }

// Hton32 converts native/host uint32 order into network order
// (the identity on a big-endian platform).
func Hton32(v uint32) uint32 { return v }

// Hton16 converts native/host uint16 order into network order
// (the identity on a big-endian platform).
func Hton16(v uint16) uint16 { return v }
|
||||
|
||||
48
util/endian/encoding_test.go
Normal file
48
util/endian/encoding_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package endian
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
func TestNtoh16(t *testing.T) {
|
||||
raw := uint16(0xABCD)
|
||||
rawBytes := toNativeBytes16(raw)
|
||||
big := binary.BigEndian.Uint16(rawBytes[:])
|
||||
if raw != Ntoh16(big) {
|
||||
t.Errorf("ntohs failed, want %v, got %v", raw, Ntoh16(big))
|
||||
}
|
||||
}
|
||||
|
||||
// toNativeBytes32 reinterprets v as its in-memory 4-byte representation,
// i.e. the platform's native byte order.
func toNativeBytes32(v uint32) [4]byte {
	return *(*[4]byte)(unsafe.Pointer(&v))
}
|
||||
|
||||
// TestHton32 checks that Hton32 converts a native/host-order uint32 into
// network (big-endian) order by decoding the converted value's native
// bytes as big-endian and comparing against the original.
func TestHton32(t *testing.T) {
	raw := uint32(0xDEADBEEF)

	networkOrder := Hton32(raw)
	bytes := toNativeBytes32(networkOrder)
	fromBig := binary.BigEndian.Uint32(bytes[:])

	if fromBig != raw {
		t.Errorf("htonl failed, want %v, got %v", raw, fromBig)
	}
}
|
||||
|
||||
// toNativeBytes16 reinterprets v as its in-memory 2-byte representation,
// i.e. the platform's native byte order.
func toNativeBytes16(v uint16) [2]byte {
	return *(*[2]byte)(unsafe.Pointer(&v))
}
|
||||
|
||||
func TestHton16(t *testing.T) {
|
||||
raw := uint16(0xBEEF)
|
||||
|
||||
networkOrder := Hton16(raw)
|
||||
bytes := toNativeBytes16(networkOrder)
|
||||
fromBig := binary.BigEndian.Uint16(bytes[:])
|
||||
|
||||
if fromBig != raw {
|
||||
t.Errorf("htonl failed, want %v, got %v", raw, fromBig)
|
||||
}
|
||||
}
|
||||
@@ -6,10 +6,22 @@
|
||||
|
||||
package endian
|
||||
|
||||
import "encoding/binary"
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math/bits"
|
||||
)
|
||||
|
||||
// Big is whether the current platform is big endian.
|
||||
const Big = false
|
||||
|
||||
// Native is the platform's native byte order.
|
||||
var Native = binary.LittleEndian
|
||||
|
||||
// Ntoh16 converts a network-order (big-endian) uint16 into native/host
// order by swapping its two bytes.
func Ntoh16(v uint16) uint16 { return v>>8 | v<<8 }

// Hton32 converts a native/host-order uint32 into network order by
// reversing its four bytes.
func Hton32(v uint32) uint32 { return bits.ReverseBytes32(v) }

// Hton16 converts a native/host-order uint16 into network order by
// swapping its two bytes.
func Hton16(v uint16) uint16 { return v>>8 | v<<8 }
|
||||
|
||||
@@ -44,6 +44,7 @@ import (
|
||||
"tailscale.com/net/netns"
|
||||
"tailscale.com/net/portmapper"
|
||||
"tailscale.com/net/stun"
|
||||
"tailscale.com/net/uring"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstime"
|
||||
@@ -2696,6 +2697,15 @@ func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate
|
||||
}
|
||||
// Success.
|
||||
ruc.pconn = pconn
|
||||
if uring.Available() {
|
||||
uringConn, err := uring.NewUDPConn(pconn)
|
||||
if err != nil {
|
||||
c.logf("not using io_uring for UDP %v: %v", pconn.LocalAddr(), err)
|
||||
} else {
|
||||
c.logf("using uring for UDP %v", pconn.LocalAddr())
|
||||
ruc.pconn = uringConn
|
||||
}
|
||||
}
|
||||
if network == "udp4" {
|
||||
health.SetUDP4Unbound(false)
|
||||
}
|
||||
@@ -2851,17 +2861,22 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
|
||||
for {
|
||||
pconn := c.currentConn()
|
||||
|
||||
// Optimization: Treat *net.UDPConn specially.
|
||||
// ReadFromUDP gets partially inlined, avoiding allocating a *net.UDPAddr,
|
||||
// Optimization: Treat a few pconn types specially.
|
||||
// For *net.UDPConn, ReadFromUDP gets partially inlined, avoiding allocating a *net.UDPAddr,
|
||||
// as long as pAddr itself doesn't escape.
|
||||
// The non-*net.UDPConn case works, but it allocates.
|
||||
// *uring.UDPConn can return netaddr.IPPorts directly.
|
||||
// The default case works, but it allocates.
|
||||
var pAddr *net.UDPAddr
|
||||
if udpConn, ok := pconn.(*net.UDPConn); ok {
|
||||
n, pAddr, err = udpConn.ReadFromUDP(b)
|
||||
} else {
|
||||
switch pconn := pconn.(type) {
|
||||
case *net.UDPConn:
|
||||
n, pAddr, err = pconn.ReadFromUDP(b)
|
||||
case *uring.UDPConn:
|
||||
n, ipp, err = pconn.ReadFromNetaddr(b)
|
||||
default:
|
||||
var addr net.Addr
|
||||
n, addr, err = pconn.ReadFrom(b)
|
||||
if addr != nil {
|
||||
var ok bool
|
||||
pAddr, ok = addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
return 0, netaddr.IPPort{}, fmt.Errorf("RebindingUDPConn.ReadFromNetaddr: underlying connection returned address of type %T, want *netaddr.UDPAddr", addr)
|
||||
@@ -2873,7 +2888,7 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
|
||||
if pconn != c.currentConn() {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
} else if pAddr != nil {
|
||||
// Convert pAddr to a netaddr.IPPort.
|
||||
// This prevents pAddr from escaping.
|
||||
var ok bool
|
||||
|
||||
@@ -35,6 +35,7 @@ import (
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/net/stun/stuntest"
|
||||
"tailscale.com/net/tstun"
|
||||
"tailscale.com/net/uring"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/tstest"
|
||||
"tailscale.com/tstest/natlab"
|
||||
@@ -1509,6 +1510,7 @@ func TestSetNetworkMapChangingNodeKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRebindStress(t *testing.T) {
|
||||
t.Parallel()
|
||||
conn := newNonLegacyTestConn(t)
|
||||
|
||||
var logBuf bytes.Buffer
|
||||
@@ -1540,17 +1542,24 @@ func TestRebindStress(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
iters := 2000
|
||||
if uring.Available() {
|
||||
// io_uring connection close is slow.
|
||||
// Keep the test short when using it.
|
||||
iters = 20
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 2000; i++ {
|
||||
for i := 0; i < iters; i++ {
|
||||
conn.Rebind()
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 2000; i++ {
|
||||
for i := 0; i < iters; i++ {
|
||||
conn.Rebind()
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -11,15 +11,18 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"tailscale.com/net/uring"
|
||||
)
|
||||
|
||||
func TestWatchdog(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var maxWaitMultiple time.Duration = 1
|
||||
if runtime.GOOS == "darwin" {
|
||||
if runtime.GOOS == "darwin" || uring.Available() {
|
||||
// Work around slow close syscalls on Big Sur with content filter Network Extensions installed.
|
||||
// See https://github.com/tailscale/tailscale/issues/1598.
|
||||
// uring shutdown is also a bit too slow.
|
||||
maxWaitMultiple = 15
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user