Compare commits

...

5 Commits

Author SHA1 Message Date
Simeng He
ccde87bee5 Logs
Signed-off-by: Simeng He <simeng@tailscale.com>
2021-07-07 13:35:30 -04:00
Simeng He
fcbabd35b2 Many logs 2021-07-05 15:40:42 -04:00
Simeng He
e4f414135f removed comments 2021-07-05 15:39:42 -04:00
Simeng He
5244fb42ff check status
Signed-off-by: Simeng He <simeng@tailscale.com>
2021-07-05 15:39:42 -04:00
Simeng He
c0ca24aa3b tstest/integration: add TCP listener two node
Signed-off-by: Simeng He <simeng@tailscale.com>
2021-07-05 15:39:42 -04:00
8 changed files with 250 additions and 5 deletions

2
go.mod
View File

@@ -47,3 +47,5 @@ require (
inet.af/wf v0.0.0-20210516214145-a5343001b756
rsc.io/goversion v1.2.0
)
replace golang.zx2c4.com/wireguard => /home/simenghe/gohack/golang.zx2c4.com/wireguard

View File

@@ -160,7 +160,7 @@ func (c *Conn) handleRequest() error {
}
c.request = req
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
srv, err := c.srv.dial(
ctx,

View File

@@ -6,6 +6,7 @@ package tstun
import (
"io"
"log"
"os"
"golang.zx2c4.com/wireguard/tun"
@@ -35,11 +36,13 @@ func (t *fakeTUN) Close() error {
}
func (t *fakeTUN) Read(out []byte, offset int) (int, error) {
log.Println("TSTUN : FAKE")
<-t.closechan
return 0, io.EOF
}
func (t *fakeTUN) Write(b []byte, n int) (int, error) {
log.Println("FAKE : Write Called")
select {
case <-t.closechan:
return 0, ErrClosed

View File

@@ -8,8 +8,11 @@ package tstun
import (
"errors"
"fmt"
"io"
"log"
"os"
"runtime/debug"
"sync"
"sync/atomic"
"time"
@@ -140,6 +143,8 @@ type tunReadResult struct {
}
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
fmt.Printf("Tunnel Type %T", tdev)
debug.PrintStack()
tun := &Wrapper{
logf: logger.WithPrefix(logf, "tstun: "),
tdev: tdev,
@@ -276,6 +281,7 @@ func allowSendOnClosedChannel() {
// so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly.
func (t *Wrapper) poll() {
defer allowSendOnClosedChannel() // for send to t.outbound
log.Println("TSTUN : POLL Started with len ", len(t.bufferConsumed))
for range t.bufferConsumed {
var n int
var err error
@@ -287,14 +293,20 @@ func (t *Wrapper) poll() {
// We don't need this loop for correctness,
// but wireguard-go will skip an empty read,
// so we might as well avoid the send through t.outbound.
log.Println("TSTUN : BEFORE READ")
for n == 0 && err == nil {
log.Println("TSTUN : BEFORE READ IN FOR")
if t.isClosed() {
log.Println("TSTUN : BEFORE T CLOSED")
return
}
n, err = t.tdev.Read(t.buffer[:], PacketStartOffset)
}
log.Println("TSTUN : BEFORE OUTBOUND")
t.outbound <- tunReadResult{data: t.buffer[PacketStartOffset : PacketStartOffset+n], err: err}
log.Println("TSTUN : sent to outbound")
}
log.Println("TSTUN : POLL FINISHED")
}
var magicDNSIPPort = netaddr.MustParseIPPort("100.100.100.100:0")
@@ -349,12 +361,16 @@ func (t *Wrapper) IdleDuration() time.Duration {
}
func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
now := time.Now()
res, ok := <-t.outbound
log.Println("TSTUN : outbound wait for channel read time, ", time.Since(now))
if !ok {
// Wrapper is closed.
log.Println("TSTUN : EOF")
return 0, io.EOF
}
if res.err != nil {
log.Println("TSTUN : err: ", res.err)
return 0, res.err
}
defer allowSendOnClosedChannel() // for send to t.bufferConsumed
@@ -389,10 +405,13 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
}
t.noteActivity()
log.Printf("TSTUN : Read Completed in %v\n", time.Since(now).Seconds())
return n, nil
}
func (t *Wrapper) filterIn(buf []byte) filter.Response {
log.Println("TSTUN : Filter In called")
now := time.Now()
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf)
@@ -408,12 +427,14 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
}
}
log.Println("TSTUN : Filter In After TSMP")
if t.PreFilterIn != nil {
if res := t.PreFilterIn(p, t); res.IsDrop() {
return res
}
}
log.Println("TSTUN : Filter In After PreFilter")
filt, _ := t.filter.Load().(*filter.Filter)
@@ -422,6 +443,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
outcome := filt.RunIn(p, t.filterFlags)
log.Println("TSTUN : Filter In After Outcome")
// Let peerapi through the filter; its ACLs are handled at L7,
// not at the packet level.
@@ -433,14 +455,17 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
outcome = filter.Accept
}
}
log.Println("TSTUN : Filter In After Outcome check2 type : ", outcome.String())
if outcome != filter.Accept {
log.Println("TSTUN : Filter In After Outcome check3")
// Tell them, via TSMP, we're dropping them due to the ACL.
// Their host networking stack can translate this into ICMP
// or whatnot as required. But notably, their GUI or tailscale CLI
// can show them a rejection history with reasons.
if p.IPVersion == 4 && p.IPProto == ipproto.TCP && p.TCPFlags&packet.TCPSyn != 0 && !t.disableTSMPRejected {
log.Println("TSTUN : Filter In After Outcome check4")
rj := packet.TailscaleRejectedHeader{
IPSrc: p.Dst.IP(),
IPDst: p.Src.IP(),
@@ -454,18 +479,21 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
pkt := packet.Generate(rj, nil)
t.InjectOutbound(pkt)
log.Println("TSTUN : FilterIn Inject took ,", time.Since(now))
// TODO(bradfitz): also send a TCP RST, after the TSMP message.
}
return filter.Drop
}
log.Println("TSTUN : Filter In After Outcome check5")
if t.PostFilterIn != nil {
if res := t.PostFilterIn(p, t); res.IsDrop() {
return res
}
}
log.Println("TSTUN : Filter In After Outcome check6")
return filter.Accept
}
@@ -473,6 +501,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
// Write accepts an incoming packet. The packet begins at buf[offset:],
// like wireguard-go/tun.Device.Write.
func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
now := time.Now()
if !t.disableFilter {
if t.filterIn(buf[offset:]) != filter.Accept {
// If we're not accepting the packet, lie to wireguard-go and pretend
@@ -488,11 +517,16 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
// device/receive.go: _, err = device.tun.device.Write(....)
//
// TODO(bradfitz): fix upstream interface docs, implementation.
log.Println("TSTUN : Write completed early in ", time.Since(now))
return len(buf), nil
}
}
// Causes a data race ???
// defer log.Println("TSTUN : Write completed in ", time.Since(now))
// debug.PrintStack()
t.noteActivity()
// log.Println("TSTUN : Write completed in ", time.Since(now))
return t.tdev.Write(buf, offset)
}
@@ -549,6 +583,7 @@ func (t *Wrapper) InjectInboundCopy(packet []byte) error {
}
func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingRequest) {
now := time.Now()
pong := packet.TSMPPongReply{
Data: req.Data,
}
@@ -569,6 +604,7 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
}
t.InjectOutbound(packet.Generate(pong, nil))
log.Println("TSTUN : Inject outbound pong took ", time.Since(now))
}
// InjectOutbound makes the Wrapper device behave as if a packet
@@ -577,6 +613,8 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
// The injected packet will not pass through outbound filters.
// Injecting an empty packet is a no-op.
func (t *Wrapper) InjectOutbound(packet []byte) error {
log.Println("TSTUN: Inject Outbound")
now := time.Now()
if len(packet) > MaxPacketSize {
return errPacketTooBig
}
@@ -585,6 +623,7 @@ func (t *Wrapper) InjectOutbound(packet []byte) error {
}
defer allowSendOnClosedChannel() // for send to t.outbound
t.outbound <- tunReadResult{data: packet}
log.Println("TSTUN : Inject took ", time.Since(now))
return nil
}

View File

@@ -14,6 +14,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
@@ -21,6 +22,7 @@ import (
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -28,6 +30,7 @@ import (
"time"
"go4.org/mem"
"golang.org/x/net/proxy"
"tailscale.com/ipn/ipnstate"
"tailscale.com/safesocket"
"tailscale.com/smallzstd"
@@ -286,6 +289,132 @@ func TestAddPingRequest(t *testing.T) {
t.Error("all ping attempts failed")
}
func TestTwoNodeConnectivity(t *testing.T) {
t.Parallel()
bins := BuildTestBinaries(t)
env := newTestEnv(t, bins)
defer env.Close()
// Create two nodes and hope that logs come out correctly
n1 := newTestNode(t, env)
n1SocksAddrCh := n1.socks5AddrChan()
d1 := n1.StartDaemonPrefix(t, "Node1 ")
defer d1.Kill()
n2 := newTestNode(t, env)
n2SocksAddrCh := n2.socks5AddrChan()
d2 := n2.StartDaemonPrefix(t, "Node2 ")
defer d2.Kill()
n1Socks := n1.AwaitSocksAddr(t, n1SocksAddrCh)
n2Socks := n2.AwaitSocksAddr(t, n2SocksAddrCh)
t.Logf("node1 SOCKS5 addr: %v", n1Socks)
t.Logf("node2 SOCKS5 addr: %v", n2Socks)
n1.AwaitListening(t)
n2.AwaitListening(t)
n1.MustUp()
n2.MustUp()
n1.AwaitRunning(t)
n2.AwaitRunning(t)
n2IP := n2.AwaitIP(t)
defer func() {
d1.MustCleanShutdown(t)
d2.MustCleanShutdown(t)
d1.Kill()
d2.Kill()
}()
if err := tstest.WaitFor(20*time.Second, func() error {
now := time.Now()
// Wait for status of node #1
st1 := n1.MustStatus(t)
if len(st1.Peer) == 0 {
return errors.New("no peers")
}
if len(st1.Peer) > 1 {
return fmt.Errorf("got %d peers; want 1", len(st1.Peer))
}
peer1 := st1.Peer[st1.Peers()[0]]
if peer1.ID == st1.Self.ID {
return errors.New("peer1 is self")
}
// Wait for status of node #2
st2 := n2.MustStatus(t)
if len(st2.Peer) == 0 {
return errors.New("no peers")
}
if len(st2.Peer) > 1 {
return fmt.Errorf("got %d peers; want 1", len(st2.Peer))
}
peer2 := st2.Peer[st2.Peers()[0]]
if peer2.ID == st2.Self.ID {
return errors.New("peer2 is self")
}
// start listener for test
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return err
}
// Dial this conn.addr
go func() {
conn, err := l.Accept()
if err != nil {
t.Error(err)
}
defer conn.Close()
_, err = conn.Write([]byte("TestString"))
if err != nil {
t.Error(err)
}
}()
dialer, err := proxy.SOCKS5("tcp", n1Socks, nil, proxy.Direct)
if err != nil {
return err
}
t.Log(dialer)
port := l.Addr().(*net.TCPAddr)
t.Log(port)
testIP := strings.ReplaceAll(net.JoinHostPort(n2IP, strconv.Itoa(port.Port)), "\n", "")
t.Log("Dialing : ", testIP)
dialerTimer := time.Now()
dialerConn, err := dialer.Dial("tcp", testIP)
if err != nil {
return err
}
dialerDuration := time.Since(dialerTimer).Seconds()
t.Logf("dialing took %v seconds", dialerDuration)
defer dialerConn.Close()
t.Logf("Dialer Connection Established at %v", dialerConn.LocalAddr())
_, err = dialerConn.Write([]byte("TestTest"))
if err != nil {
return err
}
// Read the bytes in
p := make([]byte, 1024)
_, err = dialerConn.Read(p)
if err != nil {
return err
}
t.Logf("Time taken for this run : %vs", time.Since(now).Seconds())
return nil
}); err != nil {
t.Fatal(err)
}
}
// testEnv contains the test environment (set of servers) used by one
// or more nodes.
type testEnv struct {
@@ -480,9 +609,29 @@ func (d *Daemon) MustCleanShutdown(t testing.TB) {
}
}
type PrefixedWriter struct {
prefix string
w io.Writer
}
func (p *PrefixedWriter) Write(b []byte) (int, error) {
var buf []byte
buf = append(buf, p.prefix...)
buf = append(buf, b...)
_, err := p.w.Write(buf)
if err != nil {
return 0, err
}
return len(b), err
}
func (n *testNode) StartDaemon(t testing.TB) *Daemon {
return n.StartDaemonPrefix(t, "")
}
// StartDaemon starts the node's tailscaled, failing if it fails to
// start.
func (n *testNode) StartDaemon(t testing.TB) *Daemon {
func (n *testNode) StartDaemonPrefix(t testing.TB, prefix string) *Daemon {
cmd := exec.Command(n.env.Binaries.Daemon,
"--tun=userspace-networking",
"--state="+n.stateFile,
@@ -496,8 +645,8 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon {
)
cmd.Stderr = &nodeOutputParser{n: n}
if *verboseTailscaled {
cmd.Stdout = os.Stdout
cmd.Stderr = io.MultiWriter(cmd.Stderr, os.Stderr)
cmd.Stdout = &PrefixedWriter{prefix: prefix, w: os.Stdout}
cmd.Stderr = io.MultiWriter(cmd.Stderr, &PrefixedWriter{prefix: prefix, w: os.Stderr})
}
if err := cmd.Start(); err != nil {
t.Fatalf("starting tailscaled: %v", err)

View File

@@ -11,6 +11,7 @@ import (
"encoding/binary"
"errors"
"hash"
"log"
"net"
"strings"
"sync"
@@ -144,6 +145,7 @@ func (c *Conn) resetAddrSetStatesLocked() {
}
func (c *Conn) sendAddrSet(b []byte, as *addrSet) error {
log.Println("sendAddrSet")
if c.disableLegacy {
return errDisabled
}

View File

@@ -15,6 +15,7 @@ import (
"errors"
"fmt"
"hash/fnv"
"log"
"math"
"math/rand"
"net"
@@ -1113,13 +1114,16 @@ func (c *Conn) Send(b []byte, ep conn.Endpoint) error {
if c.networkDown() {
return errNetworkDown
}
log.Println("MS: SENDER")
switch v := ep.(type) {
default:
panic(fmt.Sprintf("[unexpected] Endpoint type %T", v))
case *discoEndpoint:
log.Println("SEND CALLED DISCO", ep)
return v.send(b)
case *addrSet:
log.Println("SEND CALLED addrSet", ep)
return c.sendAddrSet(b, v)
}
}
@@ -1135,6 +1139,7 @@ var udpAddrPool = &sync.Pool{
// sendUDP sends UDP packet b to ipp.
// See sendAddr's docs on the return value meanings.
func (c *Conn) sendUDP(ipp netaddr.IPPort, b []byte) (sent bool, err error) {
log.Println("SENDUDP")
ua := udpAddrPool.Get().(*net.UDPAddr)
defer udpAddrPool.Put(ua)
return c.sendUDPStd(ipp.UDPAddrAt(ua), b)
@@ -1146,6 +1151,7 @@ func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (sent bool, err error) {
switch {
case addr.IP.To4() != nil:
_, err = c.pconn4.WriteTo(b, addr)
log.Println("IPV4UDP ", err)
if err != nil && c.noV4.Get() {
return false, nil
}
@@ -1155,6 +1161,7 @@ func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (sent bool, err error) {
return false, nil
}
_, err = c.pconn6.WriteTo(b, addr)
log.Println("IPV6UDP ", err)
if err != nil && c.noV6.Get() {
return false, nil
}
@@ -1175,6 +1182,7 @@ func (c *Conn) sendUDPStd(addr *net.UDPAddr, b []byte) (sent bool, err error) {
// IPv6 address when the local machine doesn't have IPv6 support
// returns (false, nil); it's not an error, but nothing was sent.
func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) (sent bool, err error) {
log.Println("SENDADDR")
if addr.IP() != derpMagicIPAddr {
return c.sendUDP(addr, b)
}
@@ -1183,6 +1191,7 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) (sent
if ch == nil {
return false, nil
}
log.Println("CHANNEL", ch)
// TODO(bradfitz): this makes garbage for now; we could use a
// buffer pool later. Previously we passed ownership of this
@@ -1194,11 +1203,14 @@ func (c *Conn) sendAddr(addr netaddr.IPPort, pubKey key.Public, b []byte) (sent
select {
case <-c.donec:
log.Println("Connection Closed!")
return false, errConnClosed
case ch <- derpWriteRequest{addr, pubKey, pkt}:
log.Println("DERPWRITEREQUEST")
return true, nil
default:
// Too many writes queued. Drop packet.
log.Println("DROPPED")
return false, errDropDerpPacket
}
}
@@ -1217,6 +1229,7 @@ const bufferedDerpWritesBeforeDrop = 32
// If peer is non-zero, it can be used to find an active reverse
// path, without using addr.
func (c *Conn) derpWriteChanOfAddr(addr netaddr.IPPort, peer key.Public) chan<- derpWriteRequest {
log.Println("derpWriteChanOfAddr")
if addr.IP() != derpMagicIPAddr {
return nil
}
@@ -1526,6 +1539,7 @@ type derpWriteRequest struct {
// runDerpWriter runs in a goroutine for the life of a DERP
// connection, handling received packets.
func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
log.Println("RUNDERPWRITER")
defer wg.Decr()
select {
case <-startGate:
@@ -1538,6 +1552,7 @@ func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan
case <-ctx.Done():
return
case wr := <-ch:
log.Println("DERPWRITER")
err := dc.Send(wr.pubKey, wr.b)
if err != nil {
c.logf("magicsock: derp.Send(%v): %v", wr.addr, err)
@@ -1617,6 +1632,7 @@ func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
// ok is whether this read should be reported up to wireguard-go (our
// caller).
func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache) (ep conn.Endpoint, ok bool) {
log.Println("MS : Received IP")
if stun.Is(b) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp)
return nil, false
@@ -1624,6 +1640,7 @@ func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache)
if c.handleDiscoMessage(b, ipp) {
return nil, false
}
log.Println("MS : ReceivedIP called HandleDiscoMessage")
if !c.havePrivateKey.Get() {
// If we have no private key, we're logged out or
// stopped. Don't try to pass these wireguard packets
@@ -1670,6 +1687,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
}
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep conn.Endpoint) {
log.Println("MS : Process DERP READ RESULT")
if dm.copyBuf == nil {
return 0, nil
}
@@ -1684,8 +1702,10 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con
ipp := netaddr.IPPortFrom(derpMagicIPAddr, uint16(regionID))
if c.handleDiscoMessage(b[:n], ipp) {
log.Println("MS : c.HandleDiscoMessage")
return 0, nil
}
log.Println("MS : ProcessDerp HandleDiscoMessage")
var (
didNoteRecvActivity bool
@@ -1749,6 +1769,7 @@ const (
)
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey tailcfg.NodeKey, dstDisco tailcfg.DiscoKey, m disco.Message, logLevel discoLogLevel) (sent bool, err error) {
log.Println("MS: SENDDISCOMESSAGE")
c.mu.Lock()
if c.closed {
c.mu.Unlock()
@@ -1921,8 +1942,10 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
switch dm := dm.(type) {
case *disco.Ping:
log.Println("Pinger", de)
c.handlePingLocked(dm, de, src, sender, peerNode)
case *disco.Pong:
log.Println("Ponger", de)
if de == nil {
return
}
@@ -1941,10 +1964,12 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
go de.handleCallMeMaybe(dm)
}
}
log.Println("DISCO MESSAGE COMPLETE")
return
}
func (c *Conn) handlePingLocked(dm *disco.Ping, de *discoEndpoint, src netaddr.IPPort, sender tailcfg.DiscoKey, peerNode *tailcfg.Node) {
now := time.Now()
if peerNode == nil {
c.logf("magicsock: disco: [unexpected] ignoring ping from unknown peer Node")
return
@@ -1966,6 +1991,7 @@ func (c *Conn) handlePingLocked(dm *disco.Ping, de *discoEndpoint, src netaddr.I
TxID: dm.TxID,
Src: src,
}, discoVerboseLog)
log.Printf("HandlePingLocked took %v seconds\n", time.Since(now).Seconds())
}
// enqueueCallMeMaybe schedules a send of disco.CallMeMaybe to de via derpAddr
@@ -2637,6 +2663,7 @@ func (c *Conn) listenPacket(network string, host netaddr.IP, port uint16) (net.P
// If curPortFate is set to dropCurrentPort, no attempt is made to reuse
// the current port.
func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate currentPortFate) error {
log.Println("MS: BINDSOCKET")
var host netaddr.IP
if inTest() && !c.simulatedNetwork {
switch network {
@@ -2824,6 +2851,7 @@ func (c *RebindingUDPConn) currentConn() net.PacketConn {
// ReadFrom reads a packet from c into b.
// It returns the number of bytes copied and the source address.
func (c *RebindingUDPConn) ReadFrom(b []byte) (int, net.Addr, error) {
log.Println("MS : ReadFrom ", c.pconn.LocalAddr())
for {
pconn := c.currentConn()
n, addr, err := pconn.ReadFrom(b)
@@ -2909,6 +2937,9 @@ func (c *RebindingUDPConn) closeLocked() error {
func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
for {
log.Printf("MS : WRITETO %v", c.pconn.LocalAddr())
log.Println(string(b))
log.Println(b)
c.mu.Lock()
pconn := c.pconn
c.mu.Unlock()
@@ -2923,6 +2954,7 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
continue
}
}
log.Println("MS : WRITETO complete ", n, err)
return n, err
}
}
@@ -2950,6 +2982,7 @@ func (c *blockForeverConn) ReadFrom(p []byte) (n int, addr net.Addr, err error)
}
func (c *blockForeverConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
log.Println("DROP WRITE")
// Silently drop writes.
return len(p), nil
}
@@ -3163,7 +3196,7 @@ const (
// heartbeatInterval is how often pings to the best UDP address
// are sent.
heartbeatInterval = 2 * time.Second
heartbeatInterval = 100 * time.Second
// discoPingInterval is the minimum time between pings
// to an endpoint. (Except in the case of CallMeMaybe frames
@@ -3320,6 +3353,7 @@ func (de *discoEndpoint) addrForSendLocked(now time.Time) (udpAddr, derpAddr net
// heartbeat is called every heartbeatInterval to keep the best UDP path alive,
// or kick off discovery of other paths.
func (de *discoEndpoint) heartbeat() {
log.Println("BEATING")
de.mu.Lock()
defer de.mu.Unlock()
@@ -3415,19 +3449,26 @@ func (de *discoEndpoint) send(b []byte) error {
de.noteActiveLocked()
de.mu.Unlock()
log.Println("TESTUDP", udpAddr.IsZero(), derpAddr.IsZero())
log.Printf("UDP : %v, DERP : %v\n", udpAddr, derpAddr)
if udpAddr.IsZero() && derpAddr.IsZero() {
log.Println("HANDSHAKEFAIL")
return errors.New("no UDP or DERP addr")
}
log.Println("nohandshakeerr")
var err error
if !udpAddr.IsZero() {
log.Println("UDPADDR FINE")
_, err = de.c.sendAddr(udpAddr, key.Public(de.publicKey), b)
}
if !derpAddr.IsZero() {
log.Println("DERPADDR FINE")
if ok, _ := de.c.sendAddr(derpAddr, key.Public(de.publicKey), b); ok && err != nil {
// UDP failed but DERP worked, so good enough:
return nil
}
}
log.Println("NO SENDADDR ERR")
return err
}
@@ -3644,10 +3685,13 @@ func (de *discoEndpoint) noteConnectivityChange() {
// handlePongConnLocked handles a Pong message (a reply to an earlier ping).
// It should be called with the Conn.mu held.
func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort) {
// log.Println("HandlePongConnLocked")
now2 := time.Now()
de.mu.Lock()
defer de.mu.Unlock()
isDerp := src.IP() == derpMagicIPAddr
// log.Println("ISDERP : ", isDerp)
sp, ok := de.sentPing[m.TxID]
if !ok {
@@ -3660,6 +3704,7 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort)
latency := now.Sub(sp.at)
if !isDerp {
log.Println("NOT DERP")
st, ok := de.endpointState[sp.to]
if !ok {
// This is no longer an endpoint we care about.
@@ -3677,6 +3722,7 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort)
}
if sp.purpose != pingHeartbeat {
log.Println("SPECIAL")
de.c.logf("[v1] magicsock: disco: %v<-%v (%v, %v) got pong tx=%x latency=%v pong.src=%v%v", de.c.discoShort, de.discoShort, de.publicKey.ShortString(), src, m.TxID[:6], latency.Round(time.Millisecond), m.Src, logger.ArgWriter(func(bw *bufio.Writer) {
if sp.to != src {
fmt.Fprintf(bw, " ping.to=%v", sp.to)
@@ -3704,6 +3750,8 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort)
de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
}
}
log.Println("HANDLEPONGCOMPLETE")
log.Printf("HandlePongLocked took %v seconds\n", time.Since(now2).Seconds())
}
// addrLatency is an IPPort with an associated latency.

View File

@@ -399,6 +399,7 @@ func (ns *Impl) DialContextUDP(ctx context.Context, addr string) (*gonet.UDPConn
}
func (ns *Impl) injectOutbound() {
log.Println("NETSTACK Inject outbound")
for {
packetInfo, ok := ns.linkEP.ReadContext(context.Background())
if !ok {
@@ -431,6 +432,7 @@ func (ns *Impl) isLocalIP(ip netaddr.IP) bool {
}
func (ns *Impl) injectInbound(p *packet.Parsed, t *tstun.Wrapper) filter.Response {
log.Println("NETSTACK: injectInbound")
if ns.onlySubnets && ns.isLocalIP(p.Dst.IP()) {
// In hybrid ("only subnets") mode, bail out early if
// the traffic is destined for an actual Tailscale