Compare commits

...

1 Commits

Author SHA1 Message Date
Brad Fitzpatrick
2396a87871 wgengine/filter: do stateful TCP connection tracking
WIP

Fixes #859

Change-Id: I34c077825248dcebf4283d63081e5bc152b7a58b
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2022-05-06 11:07:33 -07:00
5 changed files with 181 additions and 9 deletions

View File

@@ -93,6 +93,11 @@ var debugCmd = &ffcli.Command{
Exec: localAPIAction("rebind"),
ShortHelp: "force a magicsock rebind",
},
{
Name: "kick-all-tcp-in",
Exec: localAPIAction("kick-all-tcp-in"),
ShortHelp: "test TCP flow kick [incoming]",
},
{
Name: "prefs",
Exec: runPrefs,

View File

@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
@@ -3283,6 +3284,17 @@ func (b *LocalBackend) DebugReSTUN() error {
return nil
}
func (b *LocalBackend) DebugKickAllTCPIn() error {
filt, ok := b.filterAtomic.Load().(*filter.Filter)
if !ok {
return errors.New("no filter")
}
for _, flow := range filt.OpenTCPFlows() {
log.Printf("XXX: flow open: %+v", flow)
}
return nil
}
func (b *LocalBackend) magicConn() (*magicsock.Conn, error) {
ig, ok := b.e.(wgengine.InternalsGetter)
if !ok {

View File

@@ -278,6 +278,8 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) {
err = h.b.DebugRebind()
case "restun":
err = h.b.DebugReSTUN()
case "kick-all-tcp-in":
err = h.b.DebugKickAllTCPIn()
case "":
err = fmt.Errorf("missing parameter 'action'")
default:

View File

@@ -390,6 +390,16 @@ func (q *Parsed) IsTCPSyn() bool {
return (q.TCPFlags & TCPSynAck) == TCPSyn
}
// IsTCPRst reports whether q is a TCP RST packet.
func (q *Parsed) IsTCPRst() bool {
return (q.TCPFlags & TCPRst) != 0
}
// IsTCPFin reports whether q is a TCP FIN packet.
func (q *Parsed) IsTCPFin() bool {
return (q.TCPFlags & TCPFin) != 0
}
// IsError reports whether q is an ICMP "Error" packet.
func (q *Parsed) IsError() bool {
switch q.IPProto {

View File

@@ -7,6 +7,7 @@ package filter
import (
"fmt"
"log"
"sync"
"time"
@@ -17,6 +18,7 @@ import (
"tailscale.com/tstime/rate"
"tailscale.com/types/ipproto"
"tailscale.com/types/logger"
"tailscale.com/util/mak"
)
// Filter is a stateful packet filter.
@@ -56,8 +58,106 @@ type Filter struct {
// filterState is a state cache of past seen packets.
type filterState struct {
mu sync.Mutex
mu sync.Mutex // guards following
// lru is the flow track cached used by UDP & SCTP.
lru *flowtrack.Cache // from flowtrack.Tuple -> nil
// tcpFlows tracks open TCP flows, both inbound and outbound. Regardless of
// which direction initiated it, the Tuple's Src is always the remote side
// and Dst is the local side.
tcpFlows map[flowtrack.Tuple]*TCPFlow
}
// OpenTCPFlows returns the set of open TCP flows in an unsorted order.
func (f *Filter) OpenTCPFlows() []*TCPFlow {
st := f.state
st.mu.Lock()
defer st.mu.Unlock()
ret := make([]*TCPFlow, 0, len(st.tcpFlows))
for _, f := range st.tcpFlows {
ret = append(ret, f)
}
return ret
}
type TCPFlow struct {
flowtrack.Tuple
Out bool // was an outbound connection (from local tailscale)
Created time.Time
// TODO(bradfitz): lastActivity mono.Time, once we can do it fast enough
// to update on all packets.
mu sync.Mutex // guards finIn/finOut; lock order: filterState.mu, then TCPFlow.mu
// finOut and finIn record whether we've seen a FIN packet in or out
// for this flow.
finOut bool
finIn bool
}
func (s *filterState) addOutgoingTCPFlow(t flowtrack.Tuple) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("XXX adding out flow %v", t)
mak.Set(&s.tcpFlows, t, &TCPFlow{
Tuple: t,
Out: true,
Created: time.Now(),
})
}
func (s *filterState) addIncomingTCPFlow(t flowtrack.Tuple) {
s.mu.Lock()
defer s.mu.Unlock()
log.Printf("XXX adding in flow %v", t)
mak.Set(&s.tcpFlows, t, &TCPFlow{
Tuple: t,
Out: false,
Created: time.Now(),
})
}
func (s *filterState) removeTCPFlow(t flowtrack.Tuple) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.tcpFlows, t)
log.Printf("XXX removing flow %v", t)
}
func (s *filterState) setFinOut(t flowtrack.Tuple) {
s.mu.Lock()
defer s.mu.Unlock()
if f, ok := s.tcpFlows[t]; ok {
f.mu.Lock()
f.finOut = true
done := f.finOut && f.finIn
f.mu.Unlock()
log.Printf("XXX FIN out for flow %v", t)
if done {
delete(s.tcpFlows, t)
log.Printf("XXX due to FINs, removing flow %v", t)
}
} else {
log.Printf("XXX FIN out for unknown flow %v", t)
}
}
func (s *filterState) setFinIn(t flowtrack.Tuple) {
s.mu.Lock()
defer s.mu.Unlock()
if f, ok := s.tcpFlows[t]; ok {
f.mu.Lock()
f.finIn = true
done := f.finOut && f.finIn
f.mu.Unlock()
log.Printf("XXX FIN in for flow %v", t)
if done {
delete(s.tcpFlows, t)
log.Printf("XXX due to FINs, removing flow %v", t)
}
} else {
log.Printf("XXX FIN in for unknown flow %v", t)
}
}
// lruMax is the size of the LRU cache in filterState.
@@ -416,12 +516,25 @@ func (f *Filter) runIn4(q *packet.Parsed) (r Response, why string) {
// can't be initiated without first sending a SYN.
// It happens to also be much faster.
// TODO(apenwarr): Skip the rest of decoding in this path?
if !q.IsTCPSyn() {
return Accept, "tcp non-syn"
if q.IsTCPSyn() {
if f.matches4.match(q) {
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
f.state.addIncomingTCPFlow(t)
return Accept, "tcp ok"
}
return Drop, "no rules matched"
}
if f.matches4.match(q) {
return Accept, "tcp ok"
isFin := q.IsTCPFin()
isRst := q.IsTCPRst()
if isFin || isRst {
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
if isFin {
f.state.setFinIn(t)
} else if isRst {
f.state.removeTCPFlow(t)
}
}
return Accept, "tcp non-syn"
case ipproto.UDP, ipproto.SCTP:
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
@@ -476,12 +589,25 @@ func (f *Filter) runIn6(q *packet.Parsed) (r Response, why string) {
// can't be initiated without first sending a SYN.
// It happens to also be much faster.
// TODO(apenwarr): Skip the rest of decoding in this path?
if q.IPProto == ipproto.TCP && !q.IsTCPSyn() {
return Accept, "tcp non-syn"
if q.IsTCPSyn() {
if f.matches6.match(q) {
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
f.state.addIncomingTCPFlow(t)
return Accept, "tcp ok"
}
return Drop, "no rules matched"
}
if f.matches6.match(q) {
return Accept, "tcp ok"
isFin := q.IsTCPFin()
isRst := q.IsTCPRst()
if isFin || isRst {
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
if isFin {
f.state.setFinIn(t)
} else if isRst {
f.state.removeTCPFlow(t)
}
}
return Accept, "tcp non-syn"
case ipproto.UDP, ipproto.SCTP:
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
@@ -517,6 +643,23 @@ func (f *Filter) runOut(q *packet.Parsed) (r Response, why string) {
f.state.mu.Lock()
f.state.lru.Add(tuple, nil)
f.state.mu.Unlock()
case ipproto.TCP:
isSyn := q.IsTCPSyn()
isRst := q.IsTCPRst()
isFin := q.IsTCPFin()
if isSyn || isRst || isFin {
tuple := flowtrack.Tuple{
Proto: q.IPProto,
Src: q.Dst, Dst: q.Src, // src/dst reversed
}
if isSyn {
f.state.addOutgoingTCPFlow(tuple)
} else if isRst {
f.state.removeTCPFlow(tuple)
} else if isFin {
f.state.setFinOut(tuple)
}
}
}
return Accept, "ok out"
}