Compare commits
2 Commits
irbekrm/de
...
rec_in_use
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
152707960e | ||
|
|
7423c72f84 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,6 +5,7 @@
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
*.swp
|
||||
|
||||
cmd/tailscale/tailscale
|
||||
cmd/tailscaled/tailscaled
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"os"
|
||||
@@ -41,6 +42,9 @@ import (
|
||||
|
||||
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
|
||||
|
||||
// How long a flow is considered to be active for.
|
||||
var DerpFlowLogTime = 3 * time.Minute
|
||||
|
||||
// verboseDropKeys is the set of destination public keys that should
|
||||
// verbosely log whenever DERP drops a packet.
|
||||
var verboseDropKeys = map[key.Public]bool{}
|
||||
@@ -121,6 +125,10 @@ type Server struct {
|
||||
multiForwarderDeleted expvar.Int
|
||||
removePktForwardOther expvar.Int
|
||||
|
||||
flow_mu sync.Mutex
|
||||
activeFlows map[*sclient]flow
|
||||
flowLogs metrics.LabelMap
|
||||
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
netConns map[Conn]chan struct{} // chan is closed when conn closes
|
||||
@@ -140,6 +148,17 @@ type Server struct {
|
||||
sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
|
||||
}
|
||||
|
||||
// Flow retains metrics about packets sent in serial.
|
||||
//
|
||||
type flow struct {
|
||||
packetKinds struct {
|
||||
disco expvar.Int
|
||||
other expvar.Int
|
||||
}
|
||||
createdAt time.Time
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// PacketForwarder is something that can forward packets.
|
||||
//
|
||||
// It's mostly an inteface for circular dependency reasons; the
|
||||
@@ -182,6 +201,9 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
|
||||
memSys0: ms.Sys,
|
||||
watchers: map[*sclient]bool{},
|
||||
sentTo: map[key.Public]map[key.Public]int64{},
|
||||
|
||||
activeFlows: map[*sclient]flow{},
|
||||
flowLogs: metrics.LabelMap{Label: "minutes"},
|
||||
}
|
||||
s.initMetacert()
|
||||
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
|
||||
@@ -636,6 +658,8 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
|
||||
}
|
||||
/// Do not need to block on updating metrics
|
||||
go s.updateFlow(c, disco.LooksLikeDiscoWrapper(contents), nil)
|
||||
|
||||
var fwd PacketForwarder
|
||||
s.mu.Lock()
|
||||
@@ -854,6 +878,46 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi
|
||||
return dstKey, contents, nil
|
||||
}
|
||||
|
||||
// Updates an active flow for a given client given that we
|
||||
// saw a packet from them
|
||||
func (s *Server) updateFlow(c *sclient, isDiscoPacket bool, done chan<- bool) {
|
||||
s.flow_mu.Lock()
|
||||
defer s.flow_mu.Unlock()
|
||||
|
||||
flow, exists := s.activeFlows[c]
|
||||
if isDiscoPacket {
|
||||
flow.packetKinds.disco.Add(1)
|
||||
} else {
|
||||
flow.packetKinds.other.Add(1)
|
||||
}
|
||||
|
||||
if !exists {
|
||||
flow.createdAt = time.Now()
|
||||
s.activeFlows[c] = flow
|
||||
}
|
||||
|
||||
if flow.timer == nil {
|
||||
flow.timer = time.AfterFunc(DerpFlowLogTime, func() {
|
||||
s.flow_mu.Lock()
|
||||
defer s.flow_mu.Unlock()
|
||||
running_time := time.Since(flow.createdAt)
|
||||
// report how many flows were alive for how many minutes
|
||||
s.flowLogs.Get(fmt.Sprint(math.Ceil(running_time.Minutes()))).Add(1)
|
||||
delete(s.activeFlows, c)
|
||||
if done != nil {
|
||||
done <- true
|
||||
}
|
||||
})
|
||||
} else {
|
||||
if !flow.timer.Reset(DerpFlowLogTime) {
|
||||
// If the previous timer already ran, just stop timer and exit since it either removed it
|
||||
// from the map or is waiting on the lock.
|
||||
flow.timer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// zpub is the key.Public zero value.
|
||||
var zpub key.Public
|
||||
|
||||
@@ -1290,6 +1354,7 @@ func (s *Server) ExpVar() expvar.Var {
|
||||
m.Set("multiforwarder_created", &s.multiForwarderCreated)
|
||||
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
|
||||
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
|
||||
m.Set("live_flow_durations", &s.flowLogs)
|
||||
var expvarVersion expvar.String
|
||||
expvarVersion.Set(version.Long)
|
||||
m.Set("version", &expvarVersion)
|
||||
|
||||
@@ -28,6 +28,10 @@ import (
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
func init() {
|
||||
DerpFlowLogTime = time.Nanosecond
|
||||
}
|
||||
|
||||
func newPrivateKey(tb testing.TB) (k key.Private) {
|
||||
tb.Helper()
|
||||
if _, err := crand.Read(k[:]); err != nil {
|
||||
@@ -774,6 +778,27 @@ func TestForwarderRegistration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestDerpFlowLogging(t *testing.T) {
|
||||
ts := newTestServer(t)
|
||||
defer ts.close(t)
|
||||
wantCounter := func(c *expvar.Int, want int) {
|
||||
t.Helper()
|
||||
if got := c.Value(); got != int64(want) {
|
||||
t.Errorf("counter = %v; want %v", got, want)
|
||||
}
|
||||
}
|
||||
wantCounter(ts.s.flowLogs.Get("1"), 0)
|
||||
tc0 := newRegularClient(t, ts, "c0")
|
||||
defer tc0.close(t)
|
||||
time.Sleep(10 * time.Microsecond)
|
||||
for _, sc := range ts.s.clients {
|
||||
done := make(chan bool, 1)
|
||||
ts.s.updateFlow(sc, false, done)
|
||||
<-done
|
||||
}
|
||||
wantCounter(ts.s.flowLogs.Get("1"), 1)
|
||||
}
|
||||
|
||||
func TestMetaCert(t *testing.T) {
|
||||
priv := newPrivateKey(t)
|
||||
pub := priv.Public()
|
||||
|
||||
Reference in New Issue
Block a user