Compare commits

...

3 Commits

Author SHA1 Message Date
Simeng He
ab3eaf69ce bad news 2021-06-28 09:41:59 -04:00
Simeng He
a291147a24 garbage clocking 2021-06-28 09:41:59 -04:00
Simeng He
7938353776 net/isoping: add isoping package
Signed-off-by: Simeng He <simeng@tailscale.com>
2021-06-28 09:41:59 -04:00
3 changed files with 604 additions and 0 deletions

24
net/isoping/constants.go Normal file
View File

@@ -0,0 +1,24 @@
// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package isoping
const (
MAGIC = 0x424c4950
SERVER_PORT = ":4948"
DEFAULT_PACKETS_PER_SEC float64 = 10.0
USEC_PER_CYCLE = (10 * 1000 * 1000)
)
// DIV takes two int64 divides the two and returns a float64
func DIV(x int64, y int64) float64 {
if y == 0 {
return 0
}
return float64(x) / float64(y)
}
// DIFF takes the difference between two uint32s and returns int32
func DIFF(x uint32, y uint32) int32 {
return int32(uint32(x) - uint32(y))
}

475
net/isoping/isoping.go Normal file
View File

@@ -0,0 +1,475 @@
// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// package isoping implements isoping in Go.
package isoping
import (
"bytes"
"encoding/binary"
"log"
"math"
"net"
"reflect"
"time"
)
type Packet struct {
Magic uint32 // Magic number to reject bogus packets
Id uint32 // Id is a sequential packet id number
Txtime uint32 // Txtime is the transmitter's monotonic time when pkt was sent
Clockdiff uint32 // Clockdiff is an estimate of (transmitter's clk) - (receiver's clk)
Usec_per_pkt uint32 // Usec_per_pkt microseconds of delay between packets
Num_lost uint32 // Num_lost is the number of pkts transmitter expected to get but didn't
First_ack uint32 // First_ack is the starting index in acks[] circular buffer
Acks [64]struct {
// txtime==0 for empty elements in this array.
Id uint32 // Id field from a received packet
Rxtime uint32 // Rxtime is a receiver's monotonic time when pkt arrived
}
}
type Isoping struct {
ClockStartTime time.Time // ClockStartTime is the time the program starts
IsServer bool // IsServer distinguishes if we are a server or client
Conn *net.UDPConn // Conn is either the server or client's connection
Tx Packet // Tx is a Packet that will be sent
Rx Packet // Rx is a Packet that will be sent
LastAckInfo string // LastAckInfo human readable format of latest ack
ListenAddr *net.UDPAddr // ListenAddr is the address of the listener
RemoteAddr *net.UDPAddr // RemtoteAddr remote UDP address we send to.
RxAddr *net.UDPAddr // RxAddr keeps track of what address we are sending to
LastRxAddr *net.UDPAddr // LastRxAddr keeps track of what we last used
Quiet bool
printsPerSec float64
packetsPerSec float64
usecPerPkt int32
usecPerPrint int32
nextTxId uint32
nextRxId uint32
nextRxackId uint32
startRtxtime uint32 // remote's txtime at startup
startRxtime uint32 // local rxtime at startup
lastRxtime uint32 // local rxtime of last received packet
minCycleRxdiff int32 // smallest packet delay seen this cycle
nextCycle uint32 // time when next cycle begins
now uint32 // current time
nextSend uint32 // time when we'll send next pkt
numLost uint32 // number of rx packets not received
nextTxackIndex int // next array item to fill in tx.acks
lastPrint uint32 // time of last packet printout
latTx int64
latTxMin int64
latTxMax int64
latTxCount int64
latTxSum int64
latTxVarSum int64
latRx int64
latRxMin int64
latRxMax int64
latRxCount int64
latRxSum int64
latRxVarSum int64
}
// Incremental standard deviation calculation, without needing to know the
// mean in advance. See:
// http://mathcentral.uregina.ca/QQ/database/QQ.09.02/carlos1.html
func onepass_stddev(sumsq int64, sum int64, count int64) float64 {
numer := (count * sumsq) - (sum * sum)
denom := count * (count - 1)
return math.Sqrt(DIV(numer, denom))
}
// ustimenow subtracts the time since the program started and returns it
func (srv *Isoping) ustimenow() uint64 {
tn := time.Since(srv.ClockStartTime)
return uint64(tn.Microseconds())
}
// Ustime casts the result of ustimenow to uint32 and returns it
func (srv *Isoping) Ustime() uint32 {
return uint32(srv.ustimenow())
}
// initClock keeps track of when the server/client starts.
// keeps the exact time and we can subtract from the time
// to get monotonicClock values
func (srv *Isoping) initClock() {
srv.ClockStartTime = time.Now()
}
// initClient sets the Isoping.Conn, to the address string otherwise
// uses [::]:4948 as the default
func (srv *Isoping) initClient(addressString string) {
srv.initClock()
srv.IsServer = false
udpaddr, err := net.ResolveUDPAddr("udp6", addressString)
if err != nil {
log.Println(err)
addr := "[::]" + SERVER_PORT
udpaddr, err = net.ResolveUDPAddr("udp6", addr)
if err != nil {
log.Println(err)
return
}
log.Printf("Address %v failed to resolve, using %v instead\n", addressString, udpaddr)
}
conn, err := net.DialUDP("udp6", nil, udpaddr)
if err != nil {
log.Println(err)
return
}
srv.RemoteAddr = udpaddr
srv.Conn = conn
}
// initServer sets the Conn field of Isoping, for the listener side.
func (srv *Isoping) initServer() {
srv.initClock()
srv.IsServer = true
addr, err := net.ResolveUDPAddr("udp6", SERVER_PORT)
if err != nil {
log.Println(err)
return
}
srv.ListenAddr = addr
srv.Conn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Printf("%v\n", err)
return
}
}
// initVars initializes a lot of the necessary variables for the calculation
func (srv *Isoping) initVars() {
srv.packetsPerSec = DEFAULT_PACKETS_PER_SEC
srv.printsPerSec = -1
srv.usecPerPkt = int32(1e6 / srv.packetsPerSec)
srv.usecPerPrint = 0
if srv.usecPerPrint > 0 {
srv.usecPerPrint = int32(1e6 / srv.printsPerSec)
}
log.Println("UsecPerPkt : ", srv.usecPerPkt)
log.Println("UsecPerPrint : ", srv.usecPerPrint)
srv.nextTxId = 1
srv.nextRxId = 0
srv.nextRxackId = 0
srv.startRtxtime = 0
srv.startRxtime = 0
srv.lastRxtime = 0
srv.minCycleRxdiff = 0
srv.nextCycle = 0
srv.now = srv.Ustime()
srv.nextSend = srv.now + uint32(srv.usecPerPkt)
srv.numLost = 0
srv.nextTxackIndex = 0
srv.Tx = Packet{}
srv.Rx = Packet{}
srv.LastAckInfo = ""
srv.lastPrint = srv.now - uint32(srv.usecPerPkt)
srv.latTx, srv.latTxMin, srv.latTxMax = 0, 0x7fffffff, 0
srv.latTxCount, srv.latTxMin, srv.latTxVarSum = 0, 0, 0
srv.latRx, srv.latRxMin, srv.latRxMax = 0, 0x7fffffff, 0
srv.latRxCount, srv.latRxMin, srv.latRxVarSum = 0, 0, 0
}
// generateInitialPacket generates the inital packet Tx
func (srv *Isoping) generateInitialPacket() (*bytes.Buffer, error) {
srv.Tx.Magic = MAGIC
srv.Tx.Id = srv.nextTxId
srv.nextTxId++
srv.Tx.Txtime = srv.nextSend
srv.Tx.Usec_per_pkt = uint32(srv.usecPerPkt)
srv.Tx.Clockdiff = 0
if srv.startRtxtime > 0 {
srv.Tx.Clockdiff = srv.startRtxtime - srv.startRxtime
log.Println("SETCLOCKDIFF to", srv.Rx.Clockdiff)
}
srv.Tx.Num_lost = srv.numLost
srv.Tx.First_ack = uint32(srv.nextTxackIndex)
// Setup the Tx to be sent from either server of client
buf := new(bytes.Buffer)
return buf, binary.Write(buf, binary.BigEndian, srv.Tx)
}
// Start starts the Isoping instance, if an address is passed a client is started
// with that address, otherwise a server will start.
func (srv *Isoping) Start(address ...string) {
if len(address) > 0 {
srv.initClient(address[0])
} else {
srv.initServer()
}
srv.initVars()
}
func (srv *Isoping) MainTest() {
for {
srv.initTimer()
}
}
func (srv *Isoping) initTimer() {
tv := time.Duration(0)
if DIFF(srv.nextSend, srv.now) < 0 {
log.Printf("Set to %v us\n", tv.Microseconds())
} else {
tv = time.Microsecond * time.Duration(DIFF(srv.nextSend, srv.now))
log.Printf("Set to %v us\n", tv.Microseconds())
}
log.Println(tv)
// emulate the select with timeout of tv.
if srv.RemoteAddr != nil {
deadline := time.Now().Add(time.Duration(tv.Microseconds()) * time.Microsecond)
log.Println("TIMEOUT DURATION : ", deadline)
err := srv.Conn.SetReadDeadline(deadline)
if err != nil {
log.Println(err)
return
}
} else {
// Reset the timeout, we will have no timeout in this case.
err := srv.Conn.SetReadDeadline(time.Time{})
if err != nil {
log.Println(err)
return
}
}
srv.now = srv.Ustime()
}
func (srv *Isoping) MainLoop() {
for {
srv.initTimer()
log.Printf("%d - %d = %d\n", srv.now, srv.nextSend, DIFF(srv.now, srv.nextSend))
if srv.RemoteAddr != nil && DIFF(srv.now, srv.nextSend) >= 0 {
// Check if it is a server or not
buf, err := srv.generateInitialPacket()
if err != nil {
log.Println(err)
continue
}
if srv.IsServer {
log.Println("Isoping Server Found.")
// Attempt to send the srv.Tx to the client
n, err := srv.Conn.WriteToUDP(buf.Bytes(), srv.RemoteAddr)
if err != nil {
log.Println(err)
}
log.Printf("Sent %v bytes to %v\n", n, srv.RemoteAddr)
} else {
log.Println("Isoping Client Found.")
n, err := srv.Conn.Write(buf.Bytes())
if err != nil {
log.Println(err)
}
log.Printf("Sent %v bytes to %v\n", n, srv.RemoteAddr)
}
if srv.IsServer && DIFF(srv.now, srv.lastRxtime) > 60*1000*1000 {
log.Println("client disconnected")
srv.RemoteAddr = nil
}
srv.nextSend += uint32(srv.usecPerPkt)
}
// handle incoming packet
log.Println("BEFORE READ")
p := make([]byte, binary.Size(srv.Rx))
got, rxaddr, err := srv.Conn.ReadFromUDP(p)
if err != nil {
// log.Println("READ error : ", err)
continue
} else {
log.Println("SUCC")
}
srv.RxAddr = rxaddr
log.Println("AFTER READ")
// Copy the data received into the Rx struct.
buffer := bytes.NewBuffer(p)
err = binary.Read(buffer, binary.BigEndian, &srv.Rx)
if err != nil {
log.Println("BINARY READ err: ", err)
continue
}
if got != binary.Size(srv.Rx) || srv.Rx.Magic != MAGIC {
log.Println("received Rx is not proper")
continue
}
log.Printf("%+v\n", srv.Rx)
if srv.IsServer {
if srv.RemoteAddr == nil || !reflect.DeepEqual(srv.RxAddr, srv.LastRxAddr) {
log.Printf("new client connected %v\n", srv.RxAddr)
}
srv.LastRxAddr = srv.RxAddr
log.Println(srv.LastRxAddr, srv.RxAddr)
srv.RemoteAddr = srv.LastRxAddr
srv.nextSend = srv.now + 10*1000
srv.nextTxId = 1
srv.nextRxId, srv.nextRxackId = 0, 0
srv.startRtxtime, srv.startRxtime = 0, 0
srv.numLost = 0
srv.nextTxackIndex = 0
srv.usecPerPkt = int32(srv.Rx.Usec_per_pkt)
// memset Tx to reset it.
srv.Tx = Packet{}
log.Println("Remote address for server : ", srv.RemoteAddr)
}
txtime := srv.Rx.Txtime
rxtime := srv.now
id := srv.Rx.Id
if srv.nextRxId == 0 {
log.Println(txtime, id)
srv.startRtxtime = txtime - id*uint32(srv.usecPerPkt)
srv.startRxtime = rxtime - id*uint32(srv.usecPerPkt)
srv.minCycleRxdiff = 0
srv.nextRxId = id
srv.nextCycle = srv.now + USEC_PER_CYCLE
}
log.Println("tmpdiff break")
tmpdiff := DIFF(id, srv.nextRxId)
if tmpdiff > 0 {
log.Printf("lost %v expected=%v got=%v\n",
int64(tmpdiff), int64(srv.nextRxId), int64(id))
srv.numLost += uint32(tmpdiff)
srv.nextRxId += uint32(tmpdiff) + 1
} else if tmpdiff == 0 {
log.Println("tmpdiff == 0 -> nextRxId ++")
srv.nextRxId++
} else if tmpdiff < 0 {
log.Println("out-of-order packets? %ld\n", int64(tmpdiff))
}
tmpdiff = DIFF(rxtime, srv.startRxtime+id*uint32(srv.usecPerPkt))
if tmpdiff < -20 {
log.Printf("time paradox: backsliding start by %v usec\n", tmpdiff)
srv.startRxtime = rxtime - id*uint32(srv.usecPerPkt)
}
rxdiff := DIFF(rxtime, srv.startRxtime+id*uint32(srv.usecPerPkt))
clockdiff := DIFF(srv.startRxtime, srv.startRtxtime)
rtt := clockdiff + int32(srv.Rx.Clockdiff)
// Casting issue here may exist
offset := DIFF(uint32(clockdiff), uint32(rtt/2))
if srv.Rx.Clockdiff == 0 {
srv.lastPrint = srv.now - uint32(srv.usecPerPrint) + 1
} else {
srv.latRxCount++
srv.latRx = int64(rxdiff) + int64(rtt/2)
// Take the mins and maxes.
if srv.latRxMin > srv.latRx {
srv.latRxMin = srv.latRx
}
if srv.latRxMax < srv.latRx {
srv.latRxMax = srv.latRx
}
srv.latRxSum += srv.latRx
srv.latRxVarSum += srv.latRx * srv.latRx
}
okToPrint := !srv.Quiet && DIFF(srv.now, srv.lastPrint) >= srv.usecPerPrint
log.Printf("OKTOPRINT : %v, QUIET : %v\n", okToPrint, srv.Quiet)
log.Printf("%v - %v = %v\n", srv.now, srv.lastPrint, DIFF(srv.now, srv.lastPrint))
log.Printf("usecPerPrint : %v", srv.usecPerPrint)
// Print the information.
if okToPrint {
ackinfo := srv.LastAckInfo
msRx := float64((rxdiff + rtt/2) / 1000.0)
min := float64((rtt / 2) / 1000.0)
rxNumLost := int64(srv.Rx.Num_lost)
nextTxId := srv.nextTxId - 1
numLost := int64(srv.numLost)
nextRxId := int64(srv.nextRxId - 1)
log.Printf("%12s %6.1f ms rx (min=%.1f) loss: %v/%v tx %v/%v rx\n",
ackinfo, msRx, min, rxNumLost, nextTxId, numLost, nextRxId)
srv.lastPrint = srv.now
}
if rxdiff < srv.minCycleRxdiff {
srv.minCycleRxdiff = rxdiff
}
if DIFF(srv.now, srv.nextCycle) >= 0 {
if srv.minCycleRxdiff > 0 {
log.Printf("clock skew: sliding start by %v usec\n", srv.minCycleRxdiff)
srv.startRxtime += uint32(srv.minCycleRxdiff)
}
srv.minCycleRxdiff = 0x7fffffff
srv.nextCycle += uint32(srv.minCycleRxdiff)
}
log.Println("SCHEDULING")
srv.Tx.Acks[srv.nextTxackIndex].Id = id
srv.Tx.Acks[srv.nextTxackIndex].Rxtime = rxtime
srv.nextTxackIndex = (srv.nextTxackIndex + 1) % len(srv.Tx.Acks)
first_ack := uint32(srv.Rx.First_ack)
log.Println("FOR START")
for i := uint32(0); i < uint32(len(srv.Rx.Acks)); i++ {
var acki uint32 = (first_ack + i) % uint32(len(srv.Rx.Acks))
var ackid uint32 = srv.Rx.Acks[acki].Id
if ackid == 0 {
continue
}
if DIFF(ackid, srv.nextRxackId) >= 0 {
log.Println("EXPECTED AN ACK")
startTxTime := srv.nextSend - srv.nextTxId*uint32(srv.usecPerPkt)
txtime := startTxTime + ackid*uint32(srv.usecPerPkt)
rrxtime := srv.Rx.Acks[acki].Rxtime
rxtime := rrxtime + uint32(offset)
txdiff := DIFF(rxtime, txtime)
if srv.usecPerPkt <= 0 && len(srv.LastAckInfo) > 0 {
log.Printf("%12s\n", srv.LastAckInfo)
}
if len(srv.LastAckInfo) == 0 {
//populate it
}
srv.nextRxackId = ackid + 1
srv.latTxCount++
srv.latTx = int64(txdiff)
if srv.latTxMin > srv.latTx {
srv.latTxMin = srv.latTx
}
if srv.latTxMax < srv.latTx {
srv.latTxMax = srv.latTx
}
srv.latTxSum += srv.latTx
srv.latTxVarSum += srv.latTx * srv.latTx
}
}
srv.lastRxtime = rxtime
}
}
func (srv *Isoping) printResult() {
log.Printf("\n---\n")
}

105
net/isoping/isoping_test.go Normal file
View File

@@ -0,0 +1,105 @@
// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package isoping
import (
"bytes"
"encoding/binary"
"math"
"testing"
)
// Tests if our stddev calculation is within reason
// Must do some rounding to a certain significant digit
// Currently only need 6 digits for the testing.
func sigDigs(x float64, digs int) float64 {
return math.Round(x*math.Pow10(digs)) / math.Pow10(digs)
}
// TestOnepass_stddev tests if the function receives the same answer as in
// the C implementation of this function.
func TestOnepass_stddev(t *testing.T) {
t.Parallel()
answer := sigDigs(onepass_stddev(12, 2, 3), 6)
expected := 2.309401
answer2 := sigDigs(onepass_stddev(12023232232, 212, 321), 6)
expected2 := 6129.649279
if answer != expected {
t.Errorf("got %v, expected %v", answer, expected)
}
if answer2 != expected2 {
t.Errorf("got %v, expected %v", answer2, expected2)
}
}
// TestUstimeCast tests if casting was correct
// sanity check, probably will be removed for redundancy
func TestUstimeCast(t *testing.T) {
t.Parallel()
var num uint64 = 11471851221
var expected uint32 = 2881916629
if uint32(num) != expected {
t.Errorf("expected %v, got : %v", expected, uint32(num))
}
}
// TestValidInitialPacket will send a packet via UDP, and check if it matches
// The size and the Magic number field that needs to be equal.
// This mocks the initial packet sent in Isoping.
func TestValidInitialPacket(t *testing.T) {
client := Isoping{IsServer: false}
client.Start("[::]:4948")
server := Isoping{IsServer: true}
server.Start()
defer server.Conn.Close()
buf, err := client.generateInitialPacket()
if err != nil {
t.Error(err)
}
// Client writes to the server, server tries to read it.
p := make([]byte, binary.Size(server.Rx))
if _, err := client.Conn.Write(buf.Bytes()); err != nil {
t.Error(err)
}
got, rxaddr, err := server.Conn.ReadFromUDP(p)
if err != nil {
t.Error(err)
}
buffer := bytes.NewBuffer(p)
defer buffer.Reset()
err = binary.Read(buffer, binary.BigEndian, &server.Rx)
if err != nil {
t.Error(err)
}
if got != binary.Size(server.Rx) || server.Rx.Magic != MAGIC {
t.Error("received Rx is not proper")
}
t.Logf("Proper Packet received from %v\n", rxaddr)
}
func TestMainLoop(t *testing.T) {
server := Isoping{}
server.Start()
defer server.Conn.Close()
server.MainLoop()
// server.MainTest()
}
func TestStartClient(t *testing.T) {
client := Isoping{}
client.Start("[::]:4948")
defer client.Conn.Close()
client.MainLoop()
// client.MainTest()
}