Compare commits

...

49 Commits

Author SHA1 Message Date
Fran Bull
27b69ca97b wip 2025-02-27 13:43:56 -08:00
Fran Bull
ea684a5ed5 update for start signature change 2025-02-27 13:43:56 -08:00
Fran Bull
238fe26165 fix cmd/natc-consensus tests 2025-02-27 13:43:56 -08:00
Fran Bull
06347d7cc7 wip 2025-02-27 13:43:56 -08:00
Fran Bull
cd7d3ae4d2 add clarifying continue 2025-02-27 13:35:07 -08:00
Fran Bull
7dbcb388b8 shorten 2025-02-27 13:30:11 -08:00
Fran Bull
c4110ec886 Export functions in authorization file 2025-02-27 13:23:26 -08:00
Fran Bull
24ce3279f4 close the conn, don't leave it open 2025-02-27 13:01:05 -08:00
Fran Bull
ad7d1ee07a addr -> netip.AddrPortFrom().String() 2025-02-27 11:41:28 -08:00
Fran Bull
4175e2e21d don't try to http.Error after trying to encode to w 2025-02-27 10:18:30 -08:00
Fran Bull
af2fd8bd7e change comment style 2025-02-27 09:38:28 -08:00
Fran Bull
559643b034 command mux robustness 2025-02-27 08:47:24 -08:00
Fran Bull
b789daaf99 badrequest -> methodnotallowed 2025-02-27 08:01:23 -08:00
Fran Bull
ace8630d89 splithostport -> parseaddrport 2025-02-27 07:47:07 -08:00
Fran Bull
e8b2224932 401 -> 403 2025-02-27 07:39:16 -08:00
Fran Bull
69f76641ac auth getstatus max 1/s 2025-02-27 07:31:30 -08:00
Fran Bull
ae30f58b46 centralize cmd http auth 2025-02-27 05:15:49 -08:00
Fran Bull
5afa742b06 unnecessary export 2025-02-26 15:03:37 -08:00
Fran Bull
c35c3d1194 use a views.Slice 2025-02-26 14:45:44 -08:00
Fran Bull
66ecab9540 use a set for authorization, test it 2025-02-26 14:09:24 -08:00
Fran Bull
f63ce0066d comment use of mutex 2025-02-26 09:36:20 -08:00
Fran Bull
9d65e1fc22 use status code constants, and the right ones 2025-02-26 09:34:13 -08:00
Fran Bull
2d2b954006 no need to defer 2025-02-26 09:26:39 -08:00
Fran Bull
5e15c25937 no need for a blank return at the end 2025-02-26 09:25:36 -08:00
Fran Bull
f7ec770f03 use new encoder, not marshalindent 2025-02-26 09:10:10 -08:00
Fran Bull
3a35ac716d only serve debug monitor if asked for 2025-02-26 09:04:51 -08:00
Fran Bull
e13b8c271b log errors don't return to http client 2025-02-26 08:38:08 -08:00
Fran Bull
e0415e0221 use dnsname instead of strings split 2025-02-26 08:29:51 -08:00
Fran Bull
febe30ea68 error auth refresh if server not running 2025-02-26 08:29:13 -08:00
Fran Bull
5fa145674d make tests pass under -race 2025-02-25 14:57:50 -08:00
Fran Bull
8dfb749ea5 missed a direct events access 2025-02-24 12:01:15 -08:00
Fran Bull
773894638c lint says conn can't be nil 2025-02-24 11:55:45 -08:00
Fran Bull
0b971dffd3 configure logging 2025-02-24 11:41:53 -08:00
Fran Bull
f0223a9dba avoid race 2025-02-24 10:38:21 -08:00
Fran Bull
3ed0736ae9 protect nil pointer 2025-02-24 10:29:05 -08:00
Fran Bull
05277e020e move package doc to right place 2025-02-24 08:45:10 -08:00
Fran Bull
e623e1d2d9 go mod tidy 2025-02-24 08:30:45 -08:00
Fran Bull
aee5b38001 use httpm i guess? 2025-02-21 13:38:53 -08:00
Fran Bull
89af057be5 add copyright headers 2025-02-21 13:10:59 -08:00
Fran Bull
d944cd1778 do we need to wait longer? 2025-02-21 13:07:41 -08:00
Fran Bull
0354836398 don't actually need those bits for the interface 2025-02-21 13:06:07 -08:00
Fran Bull
4040e14cb8 use the error value 2025-02-21 13:02:05 -08:00
Fran Bull
82e6b2508a fix test 2025-02-21 12:34:53 -08:00
Fran Bull
a828917152 allow concurrent usage of authorization 2025-02-21 11:24:00 -08:00
Fran Bull
d593a85bae comments, whitespace 2025-02-21 11:23:22 -08:00
Fran Bull
7c539e3d2f to squash: restrict communication to tagged nodes 2025-02-20 15:14:22 -08:00
Fran Bull
6ebb0c749d tsconsensus: add a tsconsensus package
tsconsensus enables tsnet.Server instances to form a consensus.

tsconsensus wraps hashicorp/raft with
 * the ability to do discovery via tailscale tags
 * inter node communication over tailscale
 * routing of commands to the leader

Updates #14667

Signed-off-by: Fran Bull <fran@tailscale.com>
2025-02-20 11:12:17 -08:00
Erisa A
074372d6c5 scripts/installer.sh: add SparkyLinux as a Debian derivative (#15076)
Fixes #15075

Signed-off-by: Erisa A <erisa@tailscale.com>
2025-02-20 18:22:08 +00:00
Andrew Lytvynov
2c3338c46b client/tailscale: fix Client.BuildURL and Client.BuildTailnetURL (#15064)
This method uses `path.Join` to build the URL. Turns out with 1.24 this
started stripping consecutive "/" characters, so "http://..." in baseURL
becomes "http:/...".

Also, `c.Tailnet` is a function that returns `c.tailnet`. Using it as a
path element would encode as a pointer instead of the tailnet name.

Finally, provide a way to prevent escaping of path elements e.g. for `?`
in `acl?details=1`.

Updates #15015

Signed-off-by: Andrew Lytvynov <awly@tailscale.com>
2025-02-19 17:19:54 -08:00
16 changed files with 2876 additions and 19 deletions

View File

@@ -12,6 +12,7 @@ import (
"fmt"
"net/http"
"net/netip"
"net/url"
)
// ACLRow defines a rule that grants access by a set of users or groups to a set
@@ -126,7 +127,7 @@ func (c *Client) ACLHuJSON(ctx context.Context) (acl *ACLHuJSON, err error) {
}
}()
path := c.BuildTailnetURL("acl?details=1")
path := c.BuildTailnetURL("acl", url.Values{"details": {"1"}})
req, err := http.NewRequestWithContext(ctx, "GET", path, nil)
if err != nil {
return nil, err
@@ -146,7 +147,7 @@ func (c *Client) ACLHuJSON(ctx context.Context) (acl *ACLHuJSON, err error) {
Warnings []string `json:"warnings"`
}{}
if err := json.Unmarshal(b, &data); err != nil {
return nil, err
return nil, fmt.Errorf("json.Unmarshal %q: %w", b, err)
}
acl = &ACLHuJSON{
@@ -328,7 +329,7 @@ type ACLPreview struct {
}
func (c *Client) previewACLPostRequest(ctx context.Context, body []byte, previewType string, previewFor string) (res *ACLPreviewResponse, err error) {
path := c.BuildTailnetURL("acl/preview")
path := c.BuildTailnetURL("acl", "preview")
req, err := http.NewRequestWithContext(ctx, "POST", path, bytes.NewBuffer(body))
if err != nil {
return nil, err
@@ -488,7 +489,7 @@ func (c *Client) ValidateACLJSON(ctx context.Context, source, dest string) (test
return nil, err
}
path := c.BuildTailnetURL("acl/validate")
path := c.BuildTailnetURL("acl", "validate")
req, err := http.NewRequestWithContext(ctx, "POST", path, bytes.NewBuffer(postData))
if err != nil {
return nil, err

View File

@@ -66,31 +66,41 @@ func (c *Client) httpClient() *http.Client {
}
// BuildURL builds a url to http(s)://<apiserver>/api/v2/<slash-separated-pathElements>
// using the given pathElements. It url escapes each path element, so the caller
// doesn't need to worry about that.
// using the given pathElements. It url escapes each path element, so the
// caller doesn't need to worry about that. The last item of pathElements can
// be of type url.Values to add a query string to the URL.
//
// For example, BuildURL(devices, 5) with the default server URL would result in
// https://api.tailscale.com/api/v2/devices/5.
func (c *Client) BuildURL(pathElements ...any) string {
elem := make([]string, 2, len(pathElements)+1)
elem[0] = c.baseURL()
elem[1] = "/api/v2"
for _, pathElement := range pathElements {
elem = append(elem, url.PathEscape(fmt.Sprint(pathElement)))
elem := make([]string, 1, len(pathElements)+1)
elem[0] = "/api/v2"
var query string
for i, pathElement := range pathElements {
if uv, ok := pathElement.(url.Values); ok && i == len(pathElements)-1 {
query = uv.Encode()
} else {
elem = append(elem, url.PathEscape(fmt.Sprint(pathElement)))
}
}
return path.Join(elem...)
url := c.baseURL() + path.Join(elem...)
if query != "" {
url += "?" + query
}
return url
}
// BuildTailnetURL builds a url to http(s)://<apiserver>/api/v2/tailnet/<tailnet>/<slash-separated-pathElements>
// using the given pathElements. It url escapes each path element, so the
// caller doesn't need to worry about that.
// using the given pathElements. It url escapes each path element, so the
// caller doesn't need to worry about that. The last item of pathElements can
// be of type url.Values to add a query string to the URL.
//
// For example, BuildTailnetURL(policy, validate) with the default server URL and a tailnet of "example.com"
// would result in https://api.tailscale.com/api/v2/tailnet/example.com/policy/validate.
func (c *Client) BuildTailnetURL(pathElements ...any) string {
allElements := make([]any, 3, len(pathElements)+2)
allElements := make([]any, 2, len(pathElements)+2)
allElements[0] = "tailnet"
allElements[1] = c.Tailnet
allElements[1] = c.tailnet
allElements = append(allElements, pathElements...)
return c.BuildURL(allElements...)
}
@@ -189,7 +199,7 @@ func (e ErrResponse) Error() string {
func HandleErrorResponse(b []byte, resp *http.Response) error {
var errResp ErrResponse
if err := json.Unmarshal(b, &errResp); err != nil {
return err
return fmt.Errorf("json.Unmarshal %q: %w", b, err)
}
errResp.Status = resp.StatusCode
return errResp

View File

@@ -0,0 +1,86 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tailscale
import (
"net/url"
"testing"
)
func TestClientBuildURL(t *testing.T) {
c := Client{BaseURL: "http://127.0.0.1:1234"}
for _, tt := range []struct {
desc string
elements []any
want string
}{
{
desc: "single-element",
elements: []any{"devices"},
want: "http://127.0.0.1:1234/api/v2/devices",
},
{
desc: "multiple-elements",
elements: []any{"tailnet", "example.com"},
want: "http://127.0.0.1:1234/api/v2/tailnet/example.com",
},
{
desc: "escape-element",
elements: []any{"tailnet", "example dot com?foo=bar"},
want: `http://127.0.0.1:1234/api/v2/tailnet/example%20dot%20com%3Ffoo=bar`,
},
{
desc: "url.Values",
elements: []any{"tailnet", "example.com", "acl", url.Values{"details": {"1"}}},
want: `http://127.0.0.1:1234/api/v2/tailnet/example.com/acl?details=1`,
},
} {
t.Run(tt.desc, func(t *testing.T) {
got := c.BuildURL(tt.elements...)
if got != tt.want {
t.Errorf("got %q, want %q", got, tt.want)
}
})
}
}
func TestClientBuildTailnetURL(t *testing.T) {
c := Client{
BaseURL: "http://127.0.0.1:1234",
tailnet: "example.com",
}
for _, tt := range []struct {
desc string
elements []any
want string
}{
{
desc: "single-element",
elements: []any{"devices"},
want: "http://127.0.0.1:1234/api/v2/tailnet/example.com/devices",
},
{
desc: "multiple-elements",
elements: []any{"devices", 123},
want: "http://127.0.0.1:1234/api/v2/tailnet/example.com/devices/123",
},
{
desc: "escape-element",
elements: []any{"foo bar?baz=qux"},
want: `http://127.0.0.1:1234/api/v2/tailnet/example.com/foo%20bar%3Fbaz=qux`,
},
{
desc: "url.Values",
elements: []any{"acl", url.Values{"details": {"1"}}},
want: `http://127.0.0.1:1234/api/v2/tailnet/example.com/acl?details=1`,
},
} {
t.Run(tt.desc, func(t *testing.T) {
got := c.BuildTailnetURL(tt.elements...)
if got != tt.want {
t.Errorf("got %q, want %q", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,48 @@
package main
import (
"encoding/json"
"fmt"
"io"
"github.com/hashicorp/raft"
"tailscale.com/tsconsensus"
)
// fulfil the raft lib functional state machine interface
type fsm ipPool
type fsmSnapshot struct{}
func (f *fsm) Apply(l *raft.Log) interface{} {
var c tsconsensus.Command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error()))
}
switch c.Name {
case "checkoutAddr":
return f.executeCheckoutAddr(c.Args)
case "markLastUsed":
return f.executeMarkLastUsed(c.Args)
default:
panic(fmt.Sprintf("unrecognized command: %s", c.Name))
}
}
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
panic("Snapshot unexpectedly used")
return nil, nil
}
func (f *fsm) Restore(rc io.ReadCloser) error {
panic("Restore unexpectedly used")
return nil
}
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
panic("Persist unexpectedly used")
return nil
}
func (f *fsmSnapshot) Release() {
panic("Release unexpectedly used")
}

View File

@@ -0,0 +1,278 @@
package main
import (
"context"
"encoding/json"
"errors"
"log"
"net/netip"
"sync"
"time"
"github.com/gaissmai/bart"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tsconsensus"
"tailscale.com/tsnet"
"tailscale.com/util/mak"
)
/*
An ipPool is a group of one or more IPV4 ranges from which individual IPV4 addresses can be
checked out.
natc-consensus provides per domain router functionality for a tailnet.
- when a node does a dns lookup for a domain the natc-consensus handles, natc-consensus asks ipPool for an IP address
for that node and domain. When ipPool
- when a node sends traffic to the IP address it has for a domain, natc-consensus asks ipPool which domain that traffic
is for.
- when an IP address hasn't been used for a while ipPool forgets about that node-ip-domain mapping and may provide
that IP address to that node in response to a subsequent DNS request.
The pool is distributed across servers in a cluster, to provide high availability.
Each tailcfg.NodeID has the full range available. The same IPV4 address will be provided to different nodes.
ipPool will maintain the node-ip-domain mapping until it expires, and won't hand out the IP address to that node
again while it maintains the mapping.
Reading from the pool is fast, writing to the pool is slow. Because reads can be done in memory on the server that got
the traffic, but writes must be sent to the consensus peers.
To handle expiry we write on reads, to update the last-used-date, but we do that after we've returned a response.
ipPool.DomainForIP gets the domain associated with a previous IP checkout for a node
ipPool.IPForDomain gets an IP address for the node+domain. It will return an IP address from any existing mapping,
or it may create a mapping with a new unused IP address.
*/
type ipPool struct {
perPeerMap syncs.Map[tailcfg.NodeID, *perPeerState]
v4Ranges []netip.Prefix
dnsAddr netip.Addr
consensus *tsconsensus.Consensus
}
func (ipp *ipPool) DomainForIP(from tailcfg.NodeID, addr netip.Addr, updatedAt time.Time) string {
// TODO lock
pm, ok := ipp.perPeerMap.Load(from)
if !ok {
log.Printf("DomainForIP: peer state absent for: %d", from)
return ""
}
ww, ok := pm.AddrToDomain.Lookup(addr)
if !ok {
log.Printf("DomainForIP: peer state doesn't recognize domain")
return ""
}
go func() {
err := ipp.markLastUsed(from, addr, ww.Domain, updatedAt)
if err != nil {
panic(err)
}
}()
return ww.Domain
}
type markLastUsedArgs struct {
NodeID tailcfg.NodeID
Addr netip.Addr
Domain string
UpdatedAt time.Time
}
// called by raft
func (cd *fsm) executeMarkLastUsed(bs []byte) tsconsensus.CommandResult {
var args markLastUsedArgs
err := json.Unmarshal(bs, &args)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
err = cd.applyMarkLastUsed(args.NodeID, args.Addr, args.Domain, args.UpdatedAt)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
return tsconsensus.CommandResult{}
}
func (ipp *fsm) applyMarkLastUsed(from tailcfg.NodeID, addr netip.Addr, domain string, updatedAt time.Time) error {
// TODO lock
ps, ok := ipp.perPeerMap.Load(from)
if !ok {
// There's nothing to mark. But this is unexpected, because we mark last used after we do things with peer state.
log.Printf("applyMarkLastUsed: could not find peer state, nodeID: %s", from)
return nil
}
ww, ok := ps.AddrToDomain.Lookup(addr)
if !ok {
// The peer state didn't have an entry for the IP address (possibly it expired), so there's nothing to mark.
return nil
}
if ww.Domain != domain {
// The IP address expired and was reused for a new domain. Don't mark.
return nil
}
if ww.LastUsed.After(updatedAt) {
// This has been marked more recently. Don't mark.
return nil
}
ww.LastUsed = updatedAt
ps.AddrToDomain.Insert(netip.PrefixFrom(addr, addr.BitLen()), ww)
return nil
}
func (ipp *ipPool) StartConsensus(ctx context.Context, ts *tsnet.Server, clusterTag string) error {
cns, err := tsconsensus.Start(ctx, ts, (*fsm)(ipp), clusterTag, tsconsensus.DefaultConfig(), true)
if err != nil {
return err
}
ipp.consensus = cns
return nil
}
type whereWhen struct {
Domain string
LastUsed time.Time
}
type perPeerState struct {
DomainToAddr map[string]netip.Addr
AddrToDomain *bart.Table[whereWhen]
mu sync.Mutex // not jsonified
}
func (ps *perPeerState) unusedIPV4(ranges []netip.Prefix, exclude netip.Addr, reuseDeadline time.Time) (netip.Addr, bool, string, error) {
// TODO here we iterate through each ip within the ranges until we find one that's unused
// could be done more efficiently either by:
// 1) storing an index into ranges and an ip we had last used from that range in perPeerState
// (how would this work with checking ips back into the pool though?)
// 2) using a random approach like the natc does now, except the raft state machine needs to
// be deterministic so it can replay logs, so I think we would do something like generate a
// random ip each time, and then have a call into the state machine that says "give me whatever
// ip you have, and if you don't have one use this one". I think that would work.
for _, r := range ranges {
ip := r.Addr()
for r.Contains(ip) {
if ip != exclude {
ww, ok := ps.AddrToDomain.Lookup(ip)
if !ok {
return ip, false, "", nil
}
if ww.LastUsed.Before(reuseDeadline) {
return ip, true, ww.Domain, nil
}
}
ip = ip.Next()
}
}
return netip.Addr{}, false, "", errors.New("ip pool exhausted")
}
func (cd *ipPool) IpForDomain(nid tailcfg.NodeID, domain string) (netip.Addr, error) {
now := time.Now()
args := checkoutAddrArgs{
NodeID: nid,
Domain: domain,
ReuseDeadline: now.Add(-10 * time.Second), // TODO what time period? 48 hours?
UpdatedAt: now,
}
bs, err := json.Marshal(args)
if err != nil {
return netip.Addr{}, err
}
c := tsconsensus.Command{
Name: "checkoutAddr",
Args: bs,
}
result, err := cd.consensus.ExecuteCommand(c)
if err != nil {
log.Printf("IpForDomain: raft error executing command: %v", err)
return netip.Addr{}, err
}
if result.Err != nil {
log.Printf("IpForDomain: error returned from state machine: %v", err)
return netip.Addr{}, result.Err
}
var addr netip.Addr
err = json.Unmarshal(result.Result, &addr)
return addr, err
}
func (cd *ipPool) markLastUsed(nid tailcfg.NodeID, addr netip.Addr, domain string, lastUsed time.Time) error {
args := markLastUsedArgs{
NodeID: nid,
Addr: addr,
Domain: domain,
UpdatedAt: lastUsed,
}
bs, err := json.Marshal(args)
if err != nil {
return err
}
//c := command{
c := tsconsensus.Command{
Name: "markLastUsed",
Args: bs,
}
result, err := cd.consensus.ExecuteCommand(c)
if err != nil {
log.Printf("markLastUsed: raft error executing command: %v", err)
return err
}
if result.Err != nil {
log.Printf("markLastUsed: error returned from state machine: %v", err)
return result.Err
}
return nil
}
type checkoutAddrArgs struct {
NodeID tailcfg.NodeID
Domain string
ReuseDeadline time.Time
UpdatedAt time.Time
}
// called by raft
func (cd *fsm) executeCheckoutAddr(bs []byte) tsconsensus.CommandResult {
var args checkoutAddrArgs
err := json.Unmarshal(bs, &args)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
addr, err := cd.applyCheckoutAddr(args.NodeID, args.Domain, args.ReuseDeadline, args.UpdatedAt)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
resultBs, err := json.Marshal(addr)
if err != nil {
return tsconsensus.CommandResult{Err: err}
}
return tsconsensus.CommandResult{Result: resultBs}
}
func (cd *fsm) applyCheckoutAddr(nid tailcfg.NodeID, domain string, reuseDeadline, updatedAt time.Time) (netip.Addr, error) {
// TODO lock and unlock
pm, _ := cd.perPeerMap.LoadOrStore(nid, &perPeerState{
AddrToDomain: &bart.Table[whereWhen]{},
})
if existing, ok := pm.DomainToAddr[domain]; ok {
// TODO handle error case where this doesn't exist
ww, _ := pm.AddrToDomain.Lookup(existing)
ww.LastUsed = updatedAt
pm.AddrToDomain.Insert(netip.PrefixFrom(existing, existing.BitLen()), ww)
return existing, nil
}
addr, wasInUse, previousDomain, err := pm.unusedIPV4(cd.v4Ranges, cd.dnsAddr, reuseDeadline)
if err != nil {
return netip.Addr{}, err
}
mak.Set(&pm.DomainToAddr, domain, addr)
if wasInUse {
// remove it from domaintoaddr
delete(pm.DomainToAddr, previousDomain)
// don't need to remove it from addrtodomain, insert will do that
}
pm.AddrToDomain.Insert(netip.PrefixFrom(addr, addr.BitLen()), whereWhen{Domain: domain, LastUsed: updatedAt})
return addr, nil
}

View File

@@ -0,0 +1,100 @@
package main
import (
"net/netip"
"testing"
"time"
"tailscale.com/tailcfg"
)
func TestV6V4(t *testing.T) {
c := connector{
v6ULA: ula(uint16(1)),
}
tests := [][]string{
[]string{"100.64.0.0", "fd7a:115c:a1e0:a99c:1:0:6440:0"},
[]string{"0.0.0.0", "fd7a:115c:a1e0:a99c:1::"},
[]string{"255.255.255.255", "fd7a:115c:a1e0:a99c:1:0:ffff:ffff"},
}
for i, test := range tests {
// to v6
v6 := c.v6ForV4(netip.MustParseAddr(test[0]))
want := netip.MustParseAddr(test[1])
if v6 != want {
t.Fatalf("test %d: want: %v, got: %v", i, want, v6)
}
// to v4
v4 := v4ForV6(netip.MustParseAddr(test[1]))
want = netip.MustParseAddr(test[0])
if v4 != want {
t.Fatalf("test %d: want: %v, got: %v", i, want, v4)
}
}
}
func TestIPForDomain(t *testing.T) {
pfx := netip.MustParsePrefix("100.64.0.0/16")
ipp := fsm{
v4Ranges: []netip.Prefix{pfx},
dnsAddr: netip.MustParseAddr("100.64.0.0"),
}
now := time.Now()
deadline := now.Add(-2 * time.Hour)
a, err := ipp.applyCheckoutAddr(tailcfg.NodeID(1), "example.com", deadline, now)
if err != nil {
t.Fatal(err)
}
if !pfx.Contains(a) {
t.Fatalf("expected %v to be in the prefix %v", a, pfx)
}
b, err := ipp.applyCheckoutAddr(tailcfg.NodeID(1), "a.example.com", deadline, now)
if err != nil {
t.Fatal(err)
}
if !pfx.Contains(b) {
t.Fatalf("expected %v to be in the prefix %v", b, pfx)
}
if b == a {
t.Fatalf("same address issued twice %v, %v", a, b)
}
c, err := ipp.applyCheckoutAddr(tailcfg.NodeID(1), "example.com", deadline, now)
if err != nil {
t.Fatal(err)
}
if c != a {
t.Fatalf("expected %v to be remembered as the addr for example.com, but got %v", a, c)
}
}
func TestDomainForIP(t *testing.T) {
pfx := netip.MustParsePrefix("100.64.0.0/16")
sm := fsm{
v4Ranges: []netip.Prefix{pfx},
dnsAddr: netip.MustParseAddr("100.64.0.0"),
}
ipp := (*ipPool)(&sm)
nid := tailcfg.NodeID(1)
domain := "example.com"
now := time.Now()
deadline := now.Add(-2 * time.Hour)
d := ipp.DomainForIP(nid, netip.MustParseAddr("100.64.0.1"), now)
if d != "" {
t.Fatalf("expected an empty string if the addr is not found but got %s", d)
}
a, err := sm.applyCheckoutAddr(nid, domain, deadline, now)
if err != nil {
t.Fatal(err)
}
d2 := ipp.DomainForIP(nid, a, now)
if d2 != domain {
t.Fatalf("expected %s but got %s", domain, d2)
}
}

504
cmd/natc-consensus/natc.go Normal file
View File

@@ -0,0 +1,504 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// The natc command is a work-in-progress implementation of a NAT based
// connector for Tailscale. It is intended to be used to route traffic to a
// specific domain through a specific node.
package main
import (
"context"
"errors"
"flag"
"fmt"
"log"
"net"
"net/http"
"net/netip"
"os"
"strings"
"time"
"github.com/gaissmai/bart"
"github.com/inetaf/tcpproxy"
"github.com/peterbourgon/ff/v3"
"golang.org/x/net/dns/dnsmessage"
"tailscale.com/client/tailscale"
"tailscale.com/envknob"
"tailscale.com/hostinfo"
"tailscale.com/ipn"
"tailscale.com/net/netutil"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/tsweb"
)
func main() {
hostinfo.SetApp("natc")
if !envknob.UseWIPCode() {
log.Fatal("cmd/natc-consensus is a work in progress and has not been security reviewed;\nits use requires TAILSCALE_USE_WIP_CODE=1 be set in the environment for now.")
}
// Parse flags
fs := flag.NewFlagSet("natc", flag.ExitOnError)
var (
debugPort = fs.Int("debug-port", 8893, "Listening port for debug/metrics endpoint")
hostname = fs.String("hostname", "", "Hostname to register the service under")
siteID = fs.Uint("site-id", 1, "an integer site ID to use for the ULA prefix which allows for multiple proxies to act in a HA configuration")
v4PfxStr = fs.String("v4-pfx", "100.64.1.0/24", "comma-separated list of IPv4 prefixes to advertise")
verboseTSNet = fs.Bool("verbose-tsnet", false, "enable verbose logging in tsnet")
printULA = fs.Bool("print-ula", false, "print the ULA prefix and exit")
ignoreDstPfxStr = fs.String("ignore-destinations", "", "comma-separated list of prefixes to ignore")
wgPort = fs.Uint("wg-port", 0, "udp port for wireguard and peer to peer traffic")
clusterTag = fs.String("cluster-tag", "", "TODO")
)
ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
if *printULA {
fmt.Println(ula(uint16(*siteID)))
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if *siteID == 0 {
log.Fatalf("site-id must be set")
} else if *siteID > 0xffff {
log.Fatalf("site-id must be in the range [0, 65535]")
}
var ignoreDstTable *bart.Table[bool]
for _, s := range strings.Split(*ignoreDstPfxStr, ",") {
s := strings.TrimSpace(s)
if s == "" {
continue
}
if ignoreDstTable == nil {
ignoreDstTable = &bart.Table[bool]{}
}
pfx, err := netip.ParsePrefix(s)
if err != nil {
log.Fatalf("unable to parse prefix: %v", err)
}
if pfx.Masked() != pfx {
log.Fatalf("prefix %v is not normalized (bits are set outside the mask)", pfx)
}
ignoreDstTable.Insert(pfx, true)
}
var v4Prefixes []netip.Prefix
for _, s := range strings.Split(*v4PfxStr, ",") {
p := netip.MustParsePrefix(strings.TrimSpace(s))
if p.Masked() != p {
log.Fatalf("v4 prefix %v is not a masked prefix", p)
}
v4Prefixes = append(v4Prefixes, p)
}
if len(v4Prefixes) == 0 {
log.Fatalf("no v4 prefixes specified")
}
dnsAddr := v4Prefixes[0].Addr()
ts := &tsnet.Server{
Hostname: *hostname,
}
ts.ControlURL = "http://host.docker.internal:31544" // TODO
if *wgPort != 0 {
if *wgPort >= 1<<16 {
log.Fatalf("wg-port must be in the range [0, 65535]")
}
ts.Port = uint16(*wgPort)
}
defer ts.Close()
if *verboseTSNet {
ts.Logf = log.Printf
}
// Start special-purpose listeners: dns, http promotion, debug server
if *debugPort != 0 {
mux := http.NewServeMux()
tsweb.Debugger(mux)
dln, err := ts.Listen("tcp", fmt.Sprintf(":%d", *debugPort))
if err != nil {
log.Fatalf("failed listening on debug port: %v", err)
}
defer dln.Close()
go func() {
log.Fatalf("debug serve: %v", http.Serve(dln, mux))
}()
}
lc, err := ts.LocalClient()
if err != nil {
log.Fatalf("LocalClient() failed: %v", err)
}
if _, err := ts.Up(ctx); err != nil {
log.Fatalf("ts.Up: %v", err)
}
ipp := ipPool{
v4Ranges: v4Prefixes,
dnsAddr: dnsAddr,
}
err = ipp.StartConsensus(ctx, ts, *clusterTag)
if err != nil {
log.Fatalf("StartConsensus: %v", err)
}
defer ipp.consensus.Stop(ctx)
c := &connector{
ts: ts,
lc: lc,
dnsAddr: dnsAddr,
v4Ranges: v4Prefixes,
v6ULA: ula(uint16(*siteID)),
ignoreDsts: ignoreDstTable,
ipAddrs: &ipp,
}
c.run(ctx)
}
type connector struct {
// ts is the tsnet.Server used to host the connector.
ts *tsnet.Server
// lc is the LocalClient used to interact with the tsnet.Server hosting this
// connector.
lc *tailscale.LocalClient
// dnsAddr is the IPv4 address to listen on for DNS requests. It is used to
// prevent the app connector from assigning it to a domain.
dnsAddr netip.Addr
// v4Ranges is the list of IPv4 ranges to advertise and assign addresses from.
// These are masked prefixes.
v4Ranges []netip.Prefix
// v6ULA is the ULA prefix used by the app connector to assign IPv6 addresses.
v6ULA netip.Prefix
ipAddrs *ipPool
// ignoreDsts is initialized at start up with the contents of --ignore-destinations (if none it is nil)
// It is never mutated, only used for lookups.
// Users who want to natc a DNS wildcard but not every address record in that domain can supply the
// exceptions in --ignore-destinations. When we receive a dns request we will look up the fqdn
// and if any of the ip addresses in response to the lookup match any 'ignore destinations' prefix we will
// return a dns response that contains the ip addresses we discovered with the lookup (ie not the
// natc behavior, which would return a dummy ip address pointing at natc).
ignoreDsts *bart.Table[bool]
}
// v6ULA is the ULA prefix used by the app connector to assign IPv6 addresses.
// The 8th and 9th bytes are used to encode the site ID which allows for
// multiple proxies to act in a HA configuration.
// mnemonic: a99c = appc
var v6ULA = netip.MustParsePrefix("fd7a:115c:a1e0:a99c::/64")
func ula(siteID uint16) netip.Prefix {
as16 := v6ULA.Addr().As16()
as16[8] = byte(siteID >> 8)
as16[9] = byte(siteID)
return netip.PrefixFrom(netip.AddrFrom16(as16), 64+16)
}
// run runs the connector.
//
// The passed in context is only used for the initial setup. The connector runs
// forever.
func (c *connector) run(ctx context.Context) {
if _, err := c.lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseRoutesSet: true,
Prefs: ipn.Prefs{
AdvertiseRoutes: append(c.v4Ranges, c.v6ULA),
},
}); err != nil {
log.Fatalf("failed to advertise routes: %v", err)
}
c.ts.RegisterFallbackTCPHandler(c.handleTCPFlow)
c.serveDNS()
}
func (c *connector) serveDNS() {
pc, err := c.ts.ListenPacket("udp", net.JoinHostPort(c.dnsAddr.String(), "53"))
if err != nil {
log.Fatalf("failed listening on port 53: %v", err)
}
defer pc.Close()
log.Printf("Listening for DNS on %s", pc.LocalAddr().String())
for {
buf := make([]byte, 1500)
n, addr, err := pc.ReadFrom(buf)
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
log.Printf("serveDNS.ReadFrom failed: %v", err)
continue
}
go c.handleDNS(pc, buf[:n], addr.(*net.UDPAddr))
}
}
func lookupDestinationIP(domain string) ([]netip.Addr, error) {
netIPs, err := net.LookupIP(domain)
if err != nil {
var dnsError *net.DNSError
if errors.As(err, &dnsError) && dnsError.IsNotFound {
return nil, nil
} else {
return nil, err
}
}
var addrs []netip.Addr
for _, ip := range netIPs {
a, ok := netip.AddrFromSlice(ip)
if ok {
addrs = append(addrs, a)
}
}
return addrs, nil
}
// handleDNS handles a DNS request to the app connector.
// It generates a response based on the request and the node that sent it.
//
// Each node is assigned a unique pair of IP addresses for each domain it
// queries. This assignment is done lazily and is not persisted across restarts.
// A per-peer assignment allows the connector to reuse a limited number of IP
// addresses across multiple nodes and domains. It also allows for clear
// failover behavior when an app connector is restarted.
//
// This assignment later allows the connector to determine where to forward
// traffic based on the destination IP address.
func (c *connector) handleDNS(pc net.PacketConn, buf []byte, remoteAddr *net.UDPAddr) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
who, err := c.lc.WhoIs(ctx, remoteAddr.String())
if err != nil {
log.Printf("HandleDNS: WhoIs failed: %v\n", err)
return
}
var msg dnsmessage.Message
err = msg.Unpack(buf)
if err != nil {
log.Printf("HandleDNS: dnsmessage unpack failed: %v\n ", err)
return
}
// If there are destination ips that we don't want to route, we
// have to do a dns lookup here to find the destination ip.
if c.ignoreDsts != nil {
if len(msg.Questions) > 0 {
q := msg.Questions[0]
switch q.Type {
case dnsmessage.TypeAAAA, dnsmessage.TypeA:
dstAddrs, err := lookupDestinationIP(q.Name.String())
if err != nil {
log.Printf("HandleDNS: lookup destination failed: %v\n ", err)
return
}
if c.ignoreDestination(dstAddrs) {
bs, err := dnsResponse(&msg, dstAddrs)
// TODO (fran): treat as SERVFAIL
if err != nil {
log.Printf("HandleDNS: generate ignore response failed: %v\n", err)
return
}
_, err = pc.WriteTo(bs, remoteAddr)
if err != nil {
log.Printf("HandleDNS: write failed: %v\n", err)
}
return
}
}
}
}
// None of the destination IP addresses match an ignore destination prefix, do
// the natc thing.
resp, err := c.generateDNSResponse(&msg, who.Node.ID)
// TODO (fran): treat as SERVFAIL
if err != nil {
log.Printf("HandleDNS: connector handling failed: %v\n", err)
return
}
// TODO (fran): treat as NXDOMAIN
if len(resp) == 0 {
return
}
// This connector handled the DNS request
_, err = pc.WriteTo(resp, remoteAddr)
if err != nil {
log.Printf("HandleDNS: write failed: %v\n", err)
}
}
// tsMBox is the mailbox used in SOA records.
// The convention is to replace the @ symbol with a dot.
// So in this case, the mailbox is support.tailscale.com. with the trailing dot
// to indicate that it is a fully qualified domain name.
var tsMBox = dnsmessage.MustNewName("support.tailscale.com.")
// generateDNSResponse generates a DNS response for the given request. The from
// argument is the NodeID of the node that sent the request.
func (c *connector) generateDNSResponse(req *dnsmessage.Message, from tailcfg.NodeID) ([]byte, error) {
var addrs []netip.Addr
if len(req.Questions) > 0 {
switch req.Questions[0].Type {
case dnsmessage.TypeAAAA, dnsmessage.TypeA:
v4, err := c.ipAddrs.IpForDomain(from, req.Questions[0].Name.String())
if err != nil {
return nil, err
}
addrs = []netip.Addr{v4, c.v6ForV4(v4)}
}
}
return dnsResponse(req, addrs)
}
// dnsResponse makes a DNS response for the natc. If the dnsmessage is requesting TypeAAAA
// or TypeA the provided addrs of the requested type will be used.
func dnsResponse(req *dnsmessage.Message, addrs []netip.Addr) ([]byte, error) {
b := dnsmessage.NewBuilder(nil,
dnsmessage.Header{
ID: req.Header.ID,
Response: true,
Authoritative: true,
})
b.EnableCompression()
if len(req.Questions) == 0 {
return b.Finish()
}
q := req.Questions[0]
if err := b.StartQuestions(); err != nil {
return nil, err
}
if err := b.Question(q); err != nil {
return nil, err
}
if err := b.StartAnswers(); err != nil {
return nil, err
}
switch q.Type {
case dnsmessage.TypeAAAA, dnsmessage.TypeA:
want6 := q.Type == dnsmessage.TypeAAAA
for _, ip := range addrs {
if want6 != ip.Is6() {
continue
}
if want6 {
if err := b.AAAAResource(
dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 5},
dnsmessage.AAAAResource{AAAA: ip.As16()},
); err != nil {
return nil, err
}
} else {
if err := b.AResource(
dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 5},
dnsmessage.AResource{A: ip.As4()},
); err != nil {
return nil, err
}
}
}
case dnsmessage.TypeSOA:
if err := b.SOAResource(
dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120},
dnsmessage.SOAResource{NS: q.Name, MBox: tsMBox, Serial: 2023030600,
Refresh: 120, Retry: 120, Expire: 120, MinTTL: 60},
); err != nil {
return nil, err
}
case dnsmessage.TypeNS:
if err := b.NSResource(
dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120},
dnsmessage.NSResource{NS: tsMBox},
); err != nil {
return nil, err
}
}
return b.Finish()
}
// handleTCPFlow handles a TCP flow from the given source to the given
// destination. It uses the source address to determine the node that sent the
// request and the destination address to determine the domain that the request
// is for based on the IP address assigned to the destination in the DNS
// response.
func (c *connector) handleTCPFlow(src, dst netip.AddrPort) (handler func(net.Conn), intercept bool) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
who, err := c.lc.WhoIs(ctx, src.Addr().String())
cancel()
if err != nil {
log.Printf("HandleTCPFlow: WhoIs failed: %v\n", err)
return nil, false
}
from := who.Node.ID
dstAddr := dst.Addr()
if dstAddr.Is6() {
dstAddr = v4ForV6(dstAddr)
}
domain := c.ipAddrs.DomainForIP(from, dstAddr, time.Now())
if domain == "" {
log.Print("handleTCPFlow: found no domain")
return nil, false
}
return func(conn net.Conn) {
proxyTCPConn(conn, domain)
}, true
}
// ignoreDestination reports whether any of the provided dstAddrs match the prefixes configured
// in --ignore-destinations
func (c *connector) ignoreDestination(dstAddrs []netip.Addr) bool {
for _, a := range dstAddrs {
if _, ok := c.ignoreDsts.Lookup(a); ok {
return true
}
}
return false
}
func proxyTCPConn(c net.Conn, dest string) {
if c.RemoteAddr() == nil {
log.Printf("proxyTCPConn: nil RemoteAddr")
c.Close()
return
}
addrPortStr := c.LocalAddr().String()
_, port, err := net.SplitHostPort(addrPortStr)
if err != nil {
log.Printf("tcpRoundRobinHandler.Handle: bogus addrPort %q", addrPortStr)
c.Close()
return
}
p := &tcpproxy.Proxy{
ListenFunc: func(net, laddr string) (net.Listener, error) {
return netutil.NewOneConnListener(c, nil), nil
},
}
p.AddRoute(addrPortStr, &tcpproxy.DialProxy{
Addr: fmt.Sprintf("%s:%s", dest, port),
})
p.Start()
}
func (c *connector) v6ForV4(v4 netip.Addr) netip.Addr {
as16 := c.v6ULA.Addr().As16()
as4 := v4.As4()
copy(as16[12:], as4[:])
v6 := netip.AddrFrom16(as16)
return v6
}
func v4ForV6(v6 netip.Addr) netip.Addr {
as16 := v6.As16()
var as4 [4]byte
copy(as4[:], as16[12:])
v4 := netip.AddrFrom4(as4)
return v4
}

7
go.mod
View File

@@ -45,6 +45,7 @@ require (
github.com/google/nftables v0.2.1-0.20240414091927-5e242ec57806
github.com/google/uuid v1.6.0
github.com/goreleaser/nfpm/v2 v2.33.1
github.com/hashicorp/raft v1.7.2
github.com/hdevalence/ed25519consensus v0.2.0
github.com/illarion/gonotify/v3 v3.0.2
github.com/inetaf/tcpproxy v0.0.0-20250203165043-ded522cbd03f
@@ -127,6 +128,7 @@ require (
github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect
github.com/alecthomas/go-check-sumtype v0.1.4 // indirect
github.com/alexkohler/nakedret/v2 v2.0.4 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bombsimon/wsl/v4 v4.2.1 // indirect
github.com/butuzov/mirror v1.1.0 // indirect
github.com/catenacyber/perfsprint v0.7.1 // indirect
@@ -145,6 +147,11 @@ require (
github.com/golangci/plugin-module-register v0.1.1 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/hashicorp/go-hclog v1.6.2 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/jjti/go-spancheck v0.5.3 // indirect
github.com/karamaru-alpha/copyloopvar v1.0.8 // indirect
github.com/macabu/inamedparam v0.1.3 // indirect

44
go.sum
View File

@@ -61,8 +61,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c h1:pxW6RcqyfI9/kWtOwnv/G+AzdKuy2ZrqINhenH4HyNs=
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Djarvur/go-err113 v0.1.0 h1:uCRZZOdMQ0TZPHYTdYpoC0bLYJKPEHPUJ8MeAa51lNU=
github.com/Djarvur/go-err113 v0.1.0/go.mod h1:4UJr5HIiMZrwgkSPdsjy2uOQExX/WEILpIrO9UPGuXs=
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.2.0 h1:sATXp1x6/axKxz2Gjxv8MALP0bXaNRfQinEwyfMcx8c=
@@ -114,6 +115,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-proxyproto v0.0.0-20210323213023-7e956b284f0a/go.mod h1:QmP9hvJ91BbJmGVGSbutW19IC0Q9phDCLGaomwTJbgU=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
@@ -212,6 +215,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk=
github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/ckaznocha/intrange v0.1.0 h1:ZiGBhvrdsKpoEfzh9CjBfDSZof6QB0ORY5tXasUtiew=
github.com/ckaznocha/intrange v0.1.0/go.mod h1:Vwa9Ekex2BrEQMg6zlrWwbs/FtYw7eS5838Q7UjK7TQ=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -288,6 +293,7 @@ github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/evanw/esbuild v0.19.11 h1:mbPO1VJ/df//jjUd+p/nRLYCpizXxXb2w/zZMShxa2k=
github.com/evanw/esbuild v0.19.11/go.mod h1:D2vIQZqV/vIf/VRHtViaUtViZmG7o+kKmlBfVQuRi48=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
@@ -527,13 +533,30 @@ github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Rep
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I=
github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY=
github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI=
github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0=
github.com/hashicorp/go-msgpack/v2 v2.1.2/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/raft v1.7.2 h1:pyvxhfJ4R8VIAlHKvLoKQWElZspsCVT6YWuxVxsPAgc=
github.com/hashicorp/raft v1.7.2/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ=
github.com/hdevalence/ed25519consensus v0.2.0 h1:37ICyZqdyj0lAZ8P4D1d1id3HqbbG1N3iBb1Tb4rdcU=
github.com/hdevalence/ed25519consensus v0.2.0/go.mod h1:w3BHWjwJbFU29IRHL1Iqkw3sus+7FctEyM4RqDxYNzo=
github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
@@ -578,6 +601,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/jsimonetti/rtnetlink v1.4.0 h1:Z1BF0fRgcETPEa0Kt0MRk3yV5+kF1FWTni6KUFKrq2I=
github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4oskfOqvPteYS6E=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -650,8 +674,12 @@ github.com/matoous/godox v0.0.0-20230222163458-006bad1f9d26 h1:gWg6ZQ4JhDfJPqlo2
github.com/matoous/godox v0.0.0-20230222163458-006bad1f9d26/go.mod h1:1BELzlh859Sh1c6+90blK8lbYy0kwQf1bYlBhBysy1s=
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
@@ -733,6 +761,8 @@ github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJ
github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs=
github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo=
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeBAsoHo=
github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/peterbourgon/ff/v3 v3.4.0 h1:QBvM/rizZM1cB0p0lGMdmR7HxZeI/ZrBWB4DqLkMUBc=
@@ -761,8 +791,10 @@ github.com/prometheus-community/pro-bing v0.4.0 h1:YMbv+i08gQz97OZZBwLyvmmQEEzyf
github.com/prometheus-community/pro-bing v0.4.0/go.mod h1:b7wRYZtCcPmt4Sz319BykUU241rWLe1VFXyiyWK/dH4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
@@ -773,6 +805,7 @@ github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6T
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
@@ -780,6 +813,7 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
@@ -880,6 +914,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
@@ -946,6 +981,7 @@ github.com/tommy-muehle/go-mnd/v2 v2.5.1 h1:NowYhSdyE/1zwK9QCLeRb6USWdoif80Ie+v+
github.com/tommy-muehle/go-mnd/v2 v2.5.1/go.mod h1:WsUAkMJMYww6l/ufffCD3m+P7LEvr8TnZn9lwVDlgzw=
github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ=
github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/u-root/gobusybox/src v0.0.0-20231228173702-b69f654846aa h1:unMPGGK/CRzfg923allsikmvk2l7beBeFPUNC4RVX/8=
github.com/u-root/gobusybox/src v0.0.0-20231228173702-b69f654846aa/go.mod h1:Zj4Tt22fJVn/nz/y6Ergm1SahR9dio1Zm/D2/S0TmXM=
github.com/u-root/u-root v0.12.0 h1:K0AuBFriwr0w/PGS3HawiAw89e3+MU7ks80GpghAsNs=
@@ -1174,6 +1210,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1199,9 +1236,12 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211105183446-c75c47738b0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@@ -186,6 +186,12 @@ main() {
VERSION="$DEBIAN_CODENAME"
fi
;;
sparky)
OS="debian"
PACKAGETYPE="apt"
VERSION="$DEBIAN_CODENAME"
APT_KEY_TYPE="keyring"
;;
centos)
OS="$ID"
VERSION="$VERSION_ID"

View File

@@ -0,0 +1,145 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tsconsensus
import (
"context"
"errors"
"net/netip"
"slices"
"sync"
"time"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tsnet"
"tailscale.com/types/views"
"tailscale.com/util/set"
)
type statusGetter interface {
getStatus(context.Context) (*ipnstate.Status, error)
}
type tailscaleStatusGetter struct {
ts *tsnet.Server
mu sync.Mutex // protects the following
lastStatus *ipnstate.Status
lastStatusTime time.Time
}
func (sg *tailscaleStatusGetter) fetchStatus(ctx context.Context) (*ipnstate.Status, error) {
lc, err := sg.ts.LocalClient()
if err != nil {
return nil, err
}
return lc.Status(ctx)
}
func (sg *tailscaleStatusGetter) getStatus(ctx context.Context) (*ipnstate.Status, error) {
sg.mu.Lock()
defer sg.mu.Unlock()
if sg.lastStatus != nil && time.Since(sg.lastStatusTime) < 1*time.Second {
return sg.lastStatus, nil
}
status, err := sg.fetchStatus(ctx)
if err != nil {
return nil, err
}
sg.lastStatus = status
sg.lastStatusTime = time.Now()
return status, nil
}
type authorization struct {
sg statusGetter
tag string
mu sync.Mutex
peers *peers // protected by mu
}
func newAuthorization(ts *tsnet.Server, tag string) *authorization {
return &authorization{
sg: &tailscaleStatusGetter{
ts: ts,
},
tag: tag,
}
}
func (a *authorization) Refresh(ctx context.Context) error {
tStatus, err := a.sg.getStatus(ctx)
if err != nil {
return err
}
if tStatus == nil {
return errors.New("no status")
}
if tStatus.BackendState != ipn.Running.String() {
return errors.New("ts Server is not running")
}
a.mu.Lock()
defer a.mu.Unlock()
a.peers = newPeers(tStatus, a.tag)
return nil
}
func (a *authorization) AllowsHost(addr netip.Addr) bool {
if a.peers == nil {
return false
}
a.mu.Lock()
defer a.mu.Unlock()
return a.peers.peerExists(addr, a.tag)
}
func (a *authorization) SelfAllowed() bool {
if a.peers == nil {
return false
}
a.mu.Lock()
defer a.mu.Unlock()
return a.peers.status.Self.Tags != nil && slices.Contains(a.peers.status.Self.Tags.AsSlice(), a.tag)
}
func (a *authorization) AllowedPeers() views.Slice[*ipnstate.PeerStatus] {
if a.peers == nil {
return views.SliceOf([]*ipnstate.PeerStatus{})
}
a.mu.Lock()
defer a.mu.Unlock()
return views.SliceOf(a.peers.allowedPeers)
}
type peers struct {
status *ipnstate.Status
allowedRemoteAddrs set.Set[netip.Addr]
allowedPeers []*ipnstate.PeerStatus
}
func (ps *peers) peerExists(a netip.Addr, tag string) bool {
return ps.allowedRemoteAddrs.Contains(a)
}
func newPeers(status *ipnstate.Status, tag string) *peers {
ps := &peers{
status: status,
allowedRemoteAddrs: set.Set[netip.Addr]{},
}
for _, p := range status.Peer {
if p.Tags != nil && p.Tags.ContainsFunc(func(s string) bool {
return s == tag
}) {
ps.allowedPeers = append(ps.allowedPeers, p)
//for _, addr := range p.TailscaleIPs {
for _, pfx := range p.AllowedIPs.All() { // TODO not this! switch back
addr := pfx.Addr()
ps.allowedRemoteAddrs.Add(addr)
}
}
}
return ps
}

View File

@@ -0,0 +1,174 @@
package tsconsensus
import (
"context"
"net/netip"
"testing"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/types/key"
"tailscale.com/types/views"
)
type testStatusGetter struct {
status *ipnstate.Status
}
func (sg testStatusGetter) getStatus(ctx context.Context) (*ipnstate.Status, error) {
return sg.status, nil
}
const testTag string = "tag:clusterTag"
func authForStatus(s *ipnstate.Status) *authorization {
return &authorization{
sg: testStatusGetter{
status: s,
},
tag: testTag,
}
}
func addrsForIndex(i int) []netip.Addr {
return []netip.Addr{
netip.AddrFrom4([4]byte{100, 0, 0, byte(i)}),
netip.AddrFrom4([4]byte{100, 0, 1, byte(i)}),
}
}
func statusForTags(self []string, peers [][]string) *ipnstate.Status {
selfTags := views.SliceOf(self)
s := &ipnstate.Status{
BackendState: ipn.Running.String(),
Self: &ipnstate.PeerStatus{
Tags: &selfTags,
},
Peer: map[key.NodePublic]*ipnstate.PeerStatus{},
}
for i, tagStrings := range peers {
tags := views.SliceOf(tagStrings)
s.Peer[key.NewNode().Public()] = &ipnstate.PeerStatus{
Tags: &tags,
TailscaleIPs: addrsForIndex(i),
}
}
return s
}
func authForTags(self []string, peers [][]string) *authorization {
return authForStatus(statusForTags(self, peers))
}
func TestAuthRefreshErrorsNotRunning(t *testing.T) {
ctx := context.Background()
a := authForStatus(nil)
err := a.Refresh(ctx)
if err == nil {
t.Fatalf("expected err to be non-nil")
}
expected := "no status"
if err.Error() != expected {
t.Fatalf("expected: %s, got: %s", expected, err.Error())
}
a = authForStatus(&ipnstate.Status{
BackendState: "NeedsMachineAuth",
})
err = a.Refresh(ctx)
if err == nil {
t.Fatalf("expected err to be non-nil")
}
expected = "ts Server is not running"
if err.Error() != expected {
t.Fatalf("expected: %s, got: %s", expected, err.Error())
}
}
func TestAuthUnrefreshed(t *testing.T) {
a := authForStatus(nil)
if a.AllowsHost(netip.MustParseAddr("100.0.0.1")) {
t.Fatalf("never refreshed authorization, allowsHost: expected false, got true")
}
gotAllowedPeers := a.AllowedPeers()
if gotAllowedPeers.Len() != 0 {
t.Fatalf("never refreshed authorization, allowedPeers: expected [], got %v", gotAllowedPeers)
}
if a.SelfAllowed() != false {
t.Fatalf("never refreshed authorization, selfAllowed: expected false got true")
}
}
func TestAuthAllowsHost(t *testing.T) {
ctx := context.Background()
peerTags := [][]string{
[]string{"woo"},
nil,
[]string{"woo", testTag},
[]string{testTag},
}
expected := []bool{
false,
false,
true,
true,
}
a := authForTags(nil, peerTags)
err := a.Refresh(ctx)
if err != nil {
t.Fatal(err)
}
for i, tags := range peerTags {
for _, addr := range addrsForIndex(i) {
got := a.AllowsHost(addr)
if got != expected[i] {
t.Fatalf("allowed %v, expected: %t, got %t", tags, expected[i], got)
}
}
}
}
func TestAuthAllowedPeers(t *testing.T) {
ctx := context.Background()
a := authForTags(nil, [][]string{
[]string{"woo"},
nil,
[]string{"woo", testTag},
[]string{testTag},
})
err := a.Refresh(ctx)
if err != nil {
t.Fatal(err)
}
ps := a.AllowedPeers()
if ps.Len() != 2 {
t.Fatalf("expected: 2, got: %d", ps.Len())
}
}
func TestAuthSelfAllowed(t *testing.T) {
ctx := context.Background()
a := authForTags([]string{"woo"}, nil)
err := a.Refresh(ctx)
if err != nil {
t.Fatal(err)
}
got := a.SelfAllowed()
if got {
t.Fatalf("expected: false, got: %t", got)
}
a = authForTags([]string{"woo", testTag}, nil)
err = a.Refresh(ctx)
if err != nil {
t.Fatal(err)
}
got = a.SelfAllowed()
if !got {
t.Fatalf("expected: true, got: %t", got)
}
}

180
tsconsensus/http.go Normal file
View File

@@ -0,0 +1,180 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tsconsensus
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"time"
"tailscale.com/util/httpm"
)
type joinRequest struct {
RemoteHost string
RemoteID string
}
type commandClient struct {
port uint16
httpClient *http.Client
}
func (rac *commandClient) url(host string, path string) string {
return fmt.Sprintf("http://%s:%d%s", host, rac.port, path)
}
func (rac *commandClient) join(host string, jr joinRequest) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
rBs, err := json.Marshal(jr)
if err != nil {
return err
}
url := rac.url(host, "/join")
req, err := http.NewRequestWithContext(ctx, httpm.POST, url, bytes.NewReader(rBs))
if err != nil {
return err
}
resp, err := rac.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
respBs, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("remote responded %d: %s", resp.StatusCode, string(respBs))
}
return nil
}
func (rac *commandClient) executeCommand(host string, bs []byte) (CommandResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
url := rac.url(host, "/executeCommand")
req, err := http.NewRequestWithContext(ctx, httpm.POST, url, bytes.NewReader(bs))
if err != nil {
return CommandResult{}, err
}
resp, err := rac.httpClient.Do(req)
if err != nil {
return CommandResult{}, err
}
defer resp.Body.Close()
respBs, err := io.ReadAll(resp.Body)
if err != nil {
return CommandResult{}, err
}
if resp.StatusCode != 200 {
return CommandResult{}, fmt.Errorf("remote responded %d: %s", resp.StatusCode, string(respBs))
}
var cr CommandResult
if err = json.Unmarshal(respBs, &cr); err != nil {
return CommandResult{}, err
}
return cr, nil
}
type authedHandler struct {
auth *authorization
mux *http.ServeMux
}
func (h authedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := h.auth.Refresh(r.Context())
if err != nil {
log.Printf("error authedHandler ServeHTTP refresh auth: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
a, err := addrFromServerAddress(r.RemoteAddr)
if err != nil {
log.Printf("error authedHandler ServeHTTP refresh auth: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
allowed := h.auth.AllowsHost(a)
if !allowed {
http.Error(w, "peer not allowed", http.StatusForbidden)
return
}
h.mux.ServeHTTP(w, r)
}
func (c *Consensus) makeCommandMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/join", func(w http.ResponseWriter, r *http.Request) {
if r.Method != httpm.POST {
http.Error(w, "Method must be POST", http.StatusMethodNotAllowed)
return
}
defer r.Body.Close()
decoder := json.NewDecoder(http.MaxBytesReader(w, r.Body, 1024*1024))
var jr joinRequest
err := decoder.Decode(&jr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = decoder.Token()
if !errors.Is(err, io.EOF) {
http.Error(w, "Request body must only contain a single JSON object", http.StatusBadRequest)
return
}
if jr.RemoteHost == "" {
http.Error(w, "Required: remoteAddr", http.StatusBadRequest)
return
}
if jr.RemoteID == "" {
http.Error(w, "Required: remoteID", http.StatusBadRequest)
return
}
err = c.handleJoin(jr)
if err != nil {
log.Printf("join handler error: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
})
mux.HandleFunc("/executeCommand", func(w http.ResponseWriter, r *http.Request) {
if r.Method != httpm.POST {
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
defer r.Body.Close()
decoder := json.NewDecoder(r.Body)
var cmd Command
err := decoder.Decode(&cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result, err := c.executeCommandLocally(cmd)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(result); err != nil {
log.Printf("error encoding execute command result: %v", err)
return
}
})
return mux
}
func (c *Consensus) makeCommandHandler(auth *authorization) http.Handler {
return authedHandler{
mux: c.makeCommandMux(),
auth: auth,
}
}

152
tsconsensus/monitor.go Normal file
View File

@@ -0,0 +1,152 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tsconsensus
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"slices"
"strings"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tsnet"
"tailscale.com/util/dnsname"
)
type status struct {
Status *ipnstate.Status
RaftState string
}
type monitor struct {
ts *tsnet.Server
con *Consensus
}
func (m *monitor) getStatus(ctx context.Context) (status, error) {
lc, err := m.ts.LocalClient()
if err != nil {
return status{}, err
}
tStatus, err := lc.Status(ctx)
if err != nil {
return status{}, err
}
return status{Status: tStatus, RaftState: m.con.raft.State().String()}, nil
}
func serveMonitor(c *Consensus, ts *tsnet.Server, listenAddr string) (*http.Server, error) {
ln, err := ts.Listen("tcp", listenAddr)
if err != nil {
return nil, err
}
m := &monitor{con: c, ts: ts}
mux := http.NewServeMux()
mux.HandleFunc("/full", m.handleFullStatus)
mux.HandleFunc("/", m.handleSummaryStatus)
mux.HandleFunc("/netmap", m.handleNetmap)
mux.HandleFunc("/dial", m.handleDial)
srv := &http.Server{Handler: mux}
go func() {
err := srv.Serve(ln)
log.Printf("MonitorHTTP stopped serving with error: %v", err)
}()
return srv, nil
}
func (m *monitor) handleFullStatus(w http.ResponseWriter, r *http.Request) {
s, err := m.getStatus(r.Context())
if err != nil {
log.Printf("monitor: error getStatus: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
if err := json.NewEncoder(w).Encode(s); err != nil {
log.Printf("monitor: error encoding full status: %v", err)
return
}
}
func (m *monitor) handleSummaryStatus(w http.ResponseWriter, r *http.Request) {
s, err := m.getStatus(r.Context())
if err != nil {
log.Printf("monitor: error getStatus: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
lines := []string{}
for _, p := range s.Status.Peer {
if p.Online {
name := dnsname.FirstLabel(p.DNSName)
lines = append(lines, fmt.Sprintf("%s\t\t%d\t%d\t%t", name, p.RxBytes, p.TxBytes, p.Active))
}
}
slices.Sort(lines)
lines = append([]string{fmt.Sprintf("RaftState: %s", s.RaftState)}, lines...)
txt := strings.Join(lines, "\n") + "\n"
w.Write([]byte(txt))
}
func (m *monitor) handleNetmap(w http.ResponseWriter, r *http.Request) {
var mask ipn.NotifyWatchOpt = ipn.NotifyInitialNetMap
mask |= ipn.NotifyNoPrivateKeys
lc, err := m.ts.LocalClient()
if err != nil {
log.Printf("monitor: error LocalClient: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
watcher, err := lc.WatchIPNBus(r.Context(), mask)
if err != nil {
log.Printf("monitor: error WatchIPNBus: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
defer watcher.Close()
n, err := watcher.Next()
if err != nil {
log.Printf("monitor: error watcher.Next: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
encoder := json.NewEncoder(w)
encoder.SetIndent("", "\t")
if err := encoder.Encode(n); err != nil {
log.Printf("monitor: error encoding netmap: %v", err)
return
}
}
func (m *monitor) handleDial(w http.ResponseWriter, r *http.Request) {
var dialParams struct {
Addr string
}
defer r.Body.Close()
bs, err := io.ReadAll(r.Body)
if err != nil {
log.Printf("monitor: error reading body: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
err = json.Unmarshal(bs, &dialParams)
if err != nil {
log.Printf("monitor: error unmarshalling json: %v", err)
http.Error(w, "", http.StatusBadRequest)
return
}
c, err := m.ts.Dial(r.Context(), "tcp", dialParams.Addr)
if err != nil {
log.Printf("monitor: error dialing: %v", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
c.Close()
w.Write([]byte("ok\n"))
}

436
tsconsensus/tsconsensus.go Normal file
View File

@@ -0,0 +1,436 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package tsconsensus implements a consensus algorithm for a group of tsnet.Servers
//
// The Raft consensus algorithm relies on you implementing a state machine that will give the same
// result to a give command as long as the same logs have been applied in the same order.
//
// tsconsensus uses the hashicorp/raft library to implement leader elections and log application.
//
// tsconsensus provides:
// - cluster peer discovery based on tailscale tags
// - executing a command on the leader
// - communication between cluster peers over tailscale using tsnet
//
// Users implement a state machine that satisfies the raft.FSM interface, with the business logic they desire.
// When changes to state are needed any node may
// - create a Command instance with serialized Args.
// - call ExecuteCommand with the Command instance
// this will propagate the command to the leader,
// and then from the reader to every node via raft.
// - the state machine then can implement raft.Apply, and dispatch commands via the Command.Name
// returning a CommandResult with an Err or a serialized Result.
package tsconsensus
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"net/netip"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tsnet"
"tailscale.com/types/views"
)
func raftAddr(host netip.Addr, cfg Config) string {
return netip.AddrPortFrom(host, cfg.RaftPort).String()
}
func addrFromServerAddress(sa string) (netip.Addr, error) {
addrPort, err := netip.ParseAddrPort(sa)
if err != nil {
return netip.Addr{}, err
}
return addrPort.Addr(), nil
}
// A selfRaftNode is the info we need to talk to hashicorp/raft about our node.
// We specify the ID and Addr on Consensus Start, and then use it later for raft
// operations such as BootstrapCluster and AddVoter.
type selfRaftNode struct {
id string
hostAddr netip.Addr
}
// A Config holds configurable values such as ports and timeouts.
// Use DefaultConfig to get a useful Config.
type Config struct {
CommandPort uint16
RaftPort uint16
MonitorPort uint16
Raft *raft.Config
MaxConnPool int
ConnTimeout time.Duration
}
// DefaultConfig returns a Config populated with default values ready for use.
func DefaultConfig() Config {
return Config{
CommandPort: 6271,
RaftPort: 6270,
MonitorPort: 8081,
Raft: raft.DefaultConfig(),
MaxConnPool: 5,
ConnTimeout: 5 * time.Second,
}
}
// StreamLayer implements an interface asked for by raft.NetworkTransport.
// It does the raft interprocess communication via tailscale.
type StreamLayer struct {
net.Listener
s *tsnet.Server
auth *authorization
}
// Dial implements the raft.StreamLayer interface with the tsnet.Server's Dial.
func (sl StreamLayer) Dial(address raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := sl.auth.Refresh(ctx)
if err != nil {
return nil, err
}
addr, err := addrFromServerAddress(string(address))
if err != nil {
return nil, err
}
if !sl.auth.AllowsHost(addr) {
return nil, errors.New("peer is not allowed")
}
return sl.s.Dial(ctx, "tcp", string(address))
}
func (sl StreamLayer) connAuthorized(conn net.Conn) (bool, error) {
if conn.RemoteAddr() == nil {
return false, nil
}
addr, err := addrFromServerAddress(conn.RemoteAddr().String())
if err != nil {
// bad RemoteAddr is not authorized
return false, nil
}
ctx := context.Background() // TODO
err = sl.auth.Refresh(ctx)
if err != nil {
// might be authorized, we couldn't tell
return false, err
}
return sl.auth.AllowsHost(addr), nil
}
func (sl StreamLayer) Accept() (net.Conn, error) {
for {
conn, err := sl.Listener.Accept()
if err != nil || conn == nil {
return conn, err
}
authorized, err := sl.connAuthorized(conn)
if err != nil {
conn.Close()
return nil, err
}
if !authorized {
conn.Close()
continue
}
return conn, err
}
}
// Start returns a pointer to a running Consensus instance.
// Calling it with a *tsnet.Server will cause that server to join or start a consensus cluster
// with other nodes on the tailnet tagged with the clusterTag. The *tsnet.Server will run the state
// machine defined by the raft.FSM also provided, and keep it in sync with the other cluster members'
// state machines using Raft.
func Start(ctx context.Context, ts *tsnet.Server, fsm raft.FSM, clusterTag string, cfg Config, serveDebugMonitor bool) (*Consensus, error) {
if clusterTag == "" {
return nil, errors.New("cluster tag must be provided")
}
cc := commandClient{
port: cfg.CommandPort,
httpClient: ts.HTTPClient(),
}
v4, _ := ts.TailscaleIPs()
self := selfRaftNode{
id: v4.String(),
hostAddr: v4,
}
c := Consensus{
commandClient: &cc,
self: self,
config: cfg,
}
auth := newAuthorization(ts, clusterTag)
err := auth.Refresh(ctx)
if err != nil {
return nil, fmt.Errorf("auth refresh: %w", err)
}
if !auth.SelfAllowed() {
return nil, errors.New("this node is not tagged with the cluster tag")
}
// after startRaft it's possible some other raft node that has us in their configuration will get
// in contact, so by the time we do anything else we may already be a functioning member
// of a consensus
r, err := startRaft(ts, &fsm, c.self, auth, cfg)
if err != nil {
return nil, err
}
c.raft = r
srv, err := c.serveCmdHttp(ts, auth)
if err != nil {
return nil, err
}
c.cmdHttpServer = srv
c.bootstrap(auth.AllowedPeers())
if serveDebugMonitor {
srv, err = serveMonitor(&c, ts, netip.AddrPortFrom(c.self.hostAddr, cfg.MonitorPort).String())
if err != nil {
return nil, err
}
c.monitorHttpServer = srv
}
return &c, nil
}
func startRaft(ts *tsnet.Server, fsm *raft.FSM, self selfRaftNode, auth *authorization, cfg Config) (*raft.Raft, error) {
cfg.Raft.LocalID = raft.ServerID(self.id)
// no persistence (for now?)
logStore := raft.NewInmemStore()
stableStore := raft.NewInmemStore()
snapshots := raft.NewInmemSnapshotStore()
// opens the listener on the raft port, raft will close it when it thinks it's appropriate
ln, err := ts.Listen("tcp", raftAddr(self.hostAddr, cfg))
if err != nil {
return nil, err
}
logger := hclog.New(&hclog.LoggerOptions{
Name: "raft-net",
Output: cfg.Raft.LogOutput,
Level: hclog.LevelFromString(cfg.Raft.LogLevel),
})
transport := raft.NewNetworkTransportWithLogger(StreamLayer{
s: ts,
Listener: ln,
auth: auth,
},
cfg.MaxConnPool,
cfg.ConnTimeout,
logger)
return raft.NewRaft(cfg.Raft, *fsm, logStore, stableStore, snapshots, transport)
}
// A Consensus is the consensus algorithm for a tsnet.Server
// It wraps a raft.Raft instance and performs the peer discovery
// and command execution on the leader.
type Consensus struct {
raft *raft.Raft
commandClient *commandClient
self selfRaftNode
config Config
cmdHttpServer *http.Server
monitorHttpServer *http.Server
}
// bootstrap tries to join a raft cluster, or start one.
//
// We need to do the very first raft cluster configuration, but after that raft manages it.
// bootstrap is called at start up, and we are not currently aware of what the cluster config might be,
// our node may already be in it. Try to join the raft cluster of all the other nodes we know about, and
// if unsuccessful, assume we are the first and start our own.
//
// It's possible for bootstrap to return an error, or start a errant breakaway cluster.
//
// We have a list of expected cluster members already from control (the members of the tailnet with the tag)
// so we could do the initial configuration with all servers specified.
// Choose to start with just this machine in the raft configuration instead, as:
// - We want to handle machines joining after start anyway.
// - Not all tagged nodes tailscale believes are active are necessarily actually responsive right now,
// so let each node opt in when able.
func (c *Consensus) bootstrap(targets views.Slice[*ipnstate.PeerStatus]) error {
log.Printf("Trying to find cluster: num targets to try: %d", targets.Len())
for _, p := range targets.All() {
if !p.Online {
log.Printf("Trying to find cluster: tailscale reports not online: %s", p.TailscaleIPs[0])
} else {
log.Printf("Trying to find cluster: trying %s", p.TailscaleIPs[0])
err := c.commandClient.join(p.TailscaleIPs[0].String(), joinRequest{
RemoteHost: c.self.hostAddr.String(),
RemoteID: c.self.id,
})
if err != nil {
log.Printf("Trying to find cluster: could not join %s: %v", p.TailscaleIPs[0], err)
continue
} else {
log.Printf("Trying to find cluster: joined %s", p.TailscaleIPs[0])
return nil
}
}
}
log.Printf("Trying to find cluster: unsuccessful, starting as leader: %s", c.self.hostAddr.String())
f := c.raft.BootstrapCluster(
raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(c.self.id),
Address: raft.ServerAddress(c.raftAddr(c.self.hostAddr)),
},
},
})
return f.Error()
}
// ExecuteCommand propagates a Command to be executed on the leader. Which
// uses raft to Apply it to the followers.
func (c *Consensus) ExecuteCommand(cmd Command) (CommandResult, error) {
b, err := json.Marshal(cmd)
if err != nil {
return CommandResult{}, err
}
result, err := c.executeCommandLocally(cmd)
var leErr lookElsewhereError
for errors.As(err, &leErr) {
result, err = c.commandClient.executeCommand(leErr.where, b)
}
return result, err
}
// Stop attempts to gracefully shutdown various components.
func (c *Consensus) Stop(ctx context.Context) error {
fut := c.raft.Shutdown()
err := fut.Error()
if err != nil {
log.Printf("Stop: Error in Raft Shutdown: %v", err)
}
err = c.cmdHttpServer.Shutdown(ctx)
if err != nil {
log.Printf("Stop: Error in command HTTP Shutdown: %v", err)
}
if c.monitorHttpServer != nil {
err = c.monitorHttpServer.Shutdown(ctx)
if err != nil {
log.Printf("Stop: Error in monitor HTTP Shutdown: %v", err)
}
}
return nil
}
// A Command is a representation of a state machine action.
// The Name can be used to dispatch the command when received.
// The Args are serialized for transport.
type Command struct {
Name string
Args []byte
}
// A CommandResult is a representation of the result of a state
// machine action.
// Err is any error that occurred on the node that tried to execute the command,
// including any error from the underlying operation and deserialization problems etc.
// Result is serialized for transport.
type CommandResult struct {
Err error
Result []byte
}
type lookElsewhereError struct {
where string
}
func (e lookElsewhereError) Error() string {
return fmt.Sprintf("not the leader, try: %s", e.where)
}
var errLeaderUnknown = errors.New("Leader Unknown")
func (c *Consensus) serveCmdHttp(ts *tsnet.Server, auth *authorization) (*http.Server, error) {
ln, err := ts.Listen("tcp", c.commandAddr(c.self.hostAddr))
if err != nil {
return nil, err
}
srv := &http.Server{Handler: c.makeCommandHandler(auth)}
go func() {
err := srv.Serve(ln)
log.Printf("CmdHttp stopped serving with err: %v", err)
}()
return srv, nil
}
func (c *Consensus) getLeader() (string, error) {
raftLeaderAddr, _ := c.raft.LeaderWithID()
leaderAddr := (string)(raftLeaderAddr)
if leaderAddr == "" {
// Raft doesn't know who the leader is.
return "", errLeaderUnknown
}
// Raft gives us the address with the raft port, we don't always want that.
host, _, err := net.SplitHostPort(leaderAddr)
return host, err
}
func (c *Consensus) executeCommandLocally(cmd Command) (CommandResult, error) {
b, err := json.Marshal(cmd)
if err != nil {
return CommandResult{}, err
}
f := c.raft.Apply(b, 10*time.Second) // TODO hardcoded timeout
err = f.Error()
result := f.Response()
if errors.Is(err, raft.ErrNotLeader) {
leader, err := c.getLeader()
if err != nil {
// we know we're not leader but we were unable to give the address of the leader
return CommandResult{}, err
}
return CommandResult{}, lookElsewhereError{where: leader}
}
if result == nil {
result = CommandResult{}
}
return result.(CommandResult), err
}
func (c *Consensus) handleJoin(jr joinRequest) error {
addr, err := netip.ParseAddr(jr.RemoteHost)
if err != nil {
return err
}
remoteAddr := c.raftAddr(addr)
f := c.raft.AddVoter(raft.ServerID(jr.RemoteID), raft.ServerAddress(remoteAddr), 0, 0)
if f.Error() != nil {
return f.Error()
}
return nil
}
func (c *Consensus) raftAddr(host netip.Addr) string {
return raftAddr(host, c.config)
}
func (c *Consensus) commandAddr(host netip.Addr) string {
return netip.AddrPortFrom(host, c.config.CommandPort).String()
}

View File

@@ -0,0 +1,690 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package tsconsensus
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptest"
"net/netip"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
"golang.org/x/exp/rand"
"tailscale.com/client/tailscale"
"tailscale.com/ipn/store/mem"
"tailscale.com/net/netns"
"tailscale.com/tailcfg"
"tailscale.com/tsnet"
"tailscale.com/tstest/integration"
"tailscale.com/tstest/integration/testcontrol"
"tailscale.com/tstest/nettest"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/views"
)
type fsm struct {
events []map[string]any
count int
mu sync.Mutex
}
func (f *fsm) Apply(l *raft.Log) any {
f.mu.Lock()
defer f.mu.Unlock()
f.count++
f.events = append(f.events, map[string]any{
"type": "Apply",
"l": l,
})
return CommandResult{
Result: []byte{byte(f.count)},
}
}
func (f *fsm) numEvents() int {
f.mu.Lock()
defer f.mu.Unlock()
return len(f.events)
}
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
return nil, nil
}
func (f *fsm) Restore(rc io.ReadCloser) error {
return nil
}
var verboseDERP = false
var verboseNodes = false
var testContextTimeout = 60 * time.Second
func startControl(t *testing.T) (control *testcontrol.Server, controlURL string) {
// Corp#4520: don't use netns for tests.
netns.SetEnabled(false)
t.Cleanup(func() {
netns.SetEnabled(true)
})
derpLogf := logger.Discard
if verboseDERP {
derpLogf = t.Logf
}
derpMap := integration.RunDERPAndSTUN(t, derpLogf, "127.0.0.1")
control = &testcontrol.Server{
DERPMap: derpMap,
DNSConfig: &tailcfg.DNSConfig{
Proxied: true,
},
MagicDNSDomain: "tail-scale.ts.net",
}
control.HTTPTestServer = httptest.NewUnstartedServer(control)
control.HTTPTestServer.Start()
t.Cleanup(control.HTTPTestServer.Close)
controlURL = control.HTTPTestServer.URL
t.Logf("testcontrol listening on %s", controlURL)
return control, controlURL
}
func startNode(t *testing.T, ctx context.Context, controlURL, hostname string) (*tsnet.Server, key.NodePublic, netip.Addr) {
t.Helper()
tmp := filepath.Join(t.TempDir(), hostname)
os.MkdirAll(tmp, 0755)
s := &tsnet.Server{
Dir: tmp,
ControlURL: controlURL,
Hostname: hostname,
Store: new(mem.Store),
Ephemeral: true,
}
if verboseNodes {
s.Logf = log.Printf
}
t.Cleanup(func() { s.Close() })
status, err := s.Up(ctx)
if err != nil {
t.Fatal(err)
}
return s, status.Self.PublicKey, status.TailscaleIPs[0]
}
func waitForNodesToBeTaggedInStatus(t *testing.T, ctx context.Context, ts *tsnet.Server, nodeKeys []key.NodePublic, tag string) {
waitFor(t, "nodes tagged in status", func() bool {
lc, err := ts.LocalClient()
if err != nil {
t.Fatal(err)
}
status, err := lc.Status(ctx)
if err != nil {
t.Fatalf("error getting status: %v", err)
}
for _, k := range nodeKeys {
var tags *views.Slice[string]
if k == status.Self.PublicKey {
tags = status.Self.Tags
} else {
tags = status.Peer[k].Tags
}
if tag == "" {
if tags != nil && tags.Len() != 0 {
return false
}
} else {
if tags == nil {
return false
}
sliceTags := tags.AsSlice()
if len(sliceTags) != 1 || sliceTags[0] != tag {
return false
}
}
}
return true
}, 20, 2*time.Second)
}
func tagNodes(t *testing.T, control *testcontrol.Server, nodeKeys []key.NodePublic, tag string) {
t.Helper()
for _, key := range nodeKeys {
n := control.Node(key)
if tag == "" {
if len(n.Tags) != 1 {
t.Fatalf("expected tags to have one tag")
}
n.Tags = nil
} else {
if len(n.Tags) != 0 {
// if we want this to work with multiple tags we'll have to change the logic
// for checking if a tag got removed yet.
t.Fatalf("expected tags to be empty")
}
n.Tags = append(n.Tags, tag)
}
b := true
n.Online = &b
control.UpdateNode(n)
}
}
func addIDedLogger(id string, c Config) Config {
// logs that identify themselves
c.Raft.Logger = hclog.New(&hclog.LoggerOptions{
Name: fmt.Sprintf("raft: %s", id),
Output: c.Raft.LogOutput,
Level: hclog.LevelFromString(c.Raft.LogLevel),
})
return c
}
func warnLogConfig() Config {
c := DefaultConfig()
// fewer logs from raft
c.Raft.LogLevel = "WARN"
// timeouts long enough that we can form a cluster under -race
// TODO but if I set them to even longer then we have trouble with auth refresh: Get "http://local-tailscaled.sock/localapi/v0/status": context deadline exceeded
c.Raft.LeaderLeaseTimeout = 2 * time.Second
c.Raft.HeartbeatTimeout = 4 * time.Second
c.Raft.ElectionTimeout = 4 * time.Second
return c
}
func TestStart(t *testing.T) {
nettest.SkipIfNoNetwork(t)
control, controlURL := startControl(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
one, k, _ := startNode(t, ctx, controlURL, "one")
clusterTag := "tag:whatever"
// nodes must be tagged with the cluster tag, to find each other
tagNodes(t, control, []key.NodePublic{k}, clusterTag)
waitForNodesToBeTaggedInStatus(t, ctx, one, []key.NodePublic{k}, clusterTag)
sm := &fsm{}
r, err := Start(ctx, one, (*fsm)(sm), clusterTag, warnLogConfig(), false)
if err != nil {
t.Fatal(err)
}
defer r.Stop(ctx)
}
func waitFor(t *testing.T, msg string, condition func() bool, nTries int, waitBetweenTries time.Duration) {
for try := 0; try < nTries; try++ {
done := condition()
if done {
t.Logf("waitFor success: %s: after %d tries", msg, try)
return
}
time.Sleep(waitBetweenTries)
}
t.Fatalf("waitFor timed out: %s, after %d tries", msg, nTries)
}
type participant struct {
c *Consensus
sm *fsm
ts *tsnet.Server
key key.NodePublic
}
// starts and tags the *tsnet.Server nodes with the control, waits for the nodes to make successful
// LocalClient Status calls that show the first node as Online.
func startNodesAndWaitForPeerStatus(t *testing.T, ctx context.Context, clusterTag string, nNodes int) ([]*participant, *testcontrol.Server, string) {
ps := make([]*participant, nNodes)
keysToTag := make([]key.NodePublic, nNodes)
localClients := make([]*tailscale.LocalClient, nNodes)
control, controlURL := startControl(t)
for i := 0; i < nNodes; i++ {
ts, key, _ := startNode(t, ctx, controlURL, fmt.Sprintf("node: %d", i))
ps[i] = &participant{ts: ts, key: key}
keysToTag[i] = key
lc, err := ts.LocalClient()
if err != nil {
t.Fatalf("%d: error getting local client: %v", i, err)
}
localClients[i] = lc
}
tagNodes(t, control, keysToTag, clusterTag)
waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, keysToTag, clusterTag)
fxCameOnline := func() bool {
// all the _other_ nodes see the first as online
for i := 1; i < nNodes; i++ {
status, err := localClients[i].Status(ctx)
if err != nil {
t.Fatalf("%d: error getting status: %v", i, err)
}
if !status.Peer[ps[0].key].Online {
return false
}
}
return true
}
waitFor(t, "other nodes see node 1 online in ts status", fxCameOnline, 10, 2*time.Second)
return ps, control, controlURL
}
// populates participants with their consensus fields, waits for all nodes to show all nodes
// as part of the same consensus cluster. Starts the first participant first and waits for it to
// become leader before adding other nodes.
func createConsensusCluster(t *testing.T, ctx context.Context, clusterTag string, participants []*participant, cfg Config, serveDebugMonitor bool) {
participants[0].sm = &fsm{}
rand.Seed(uint64(time.Now().UnixNano()))
randomNumber := rand.Intn(8999) + 1000
myCfg := addIDedLogger(fmt.Sprintf("0(%d)", randomNumber), cfg)
first, err := Start(ctx, participants[0].ts, (*fsm)(participants[0].sm), clusterTag, myCfg, serveDebugMonitor)
if err != nil {
t.Fatal(err)
}
fxFirstIsLeader := func() bool {
return first.raft.State() == raft.Leader
}
waitFor(t, "node 0 is leader", fxFirstIsLeader, 20, 2*time.Second)
participants[0].c = first
for i := 1; i < len(participants); i++ {
participants[i].sm = &fsm{}
randomNumber := rand.Intn(8999) + 1000
myCfg := addIDedLogger(fmt.Sprintf("%d(%d)", i, randomNumber), cfg)
c, err := Start(ctx, participants[i].ts, (*fsm)(participants[i].sm), clusterTag, myCfg, false)
if err != nil {
t.Fatal(err)
}
participants[i].c = c
}
fxRaftConfigContainsAll := func() bool {
for i := 0; i < len(participants); i++ {
fut := participants[i].c.raft.GetConfiguration()
err = fut.Error()
if err != nil {
t.Fatalf("%d: Getting Configuration errored: %v", i, err)
}
if len(fut.Configuration().Servers) != len(participants) {
return false
}
}
return true
}
waitFor(t, "all raft machines have all servers in their config", fxRaftConfigContainsAll, 15, time.Second*2)
}
func TestApply(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 2)
cfg := warnLogConfig()
createConsensusCluster(t, ctx, clusterTag, ps, cfg, false)
for _, p := range ps {
defer p.c.Stop(ctx)
}
fut := ps[0].c.raft.Apply([]byte("woo"), 2*time.Second)
err := fut.Error()
if err != nil {
t.Fatalf("Raft Apply Error: %v", err)
}
fxBothMachinesHaveTheApply := func() bool {
return ps[0].sm.numEvents() == 1 && ps[1].sm.numEvents() == 1
}
waitFor(t, "the apply event made it into both state machines", fxBothMachinesHaveTheApply, 10, time.Second*1)
}
// calls ExecuteCommand on each participant and checks that all participants get all commands
func assertCommandsWorkOnAnyNode(t *testing.T, participants []*participant) {
for i, p := range participants {
res, err := p.c.ExecuteCommand(Command{Args: []byte{byte(i)}})
if err != nil {
t.Fatalf("%d: Error ExecuteCommand: %v", i, err)
}
if res.Err != nil {
t.Fatalf("%d: Result Error ExecuteCommand: %v", i, res.Err)
}
retVal := int(res.Result[0])
// the test implementation of the fsm returns the count of events that have been received
if retVal != i+1 {
t.Fatalf("Result, want %d, got %d", i+1, retVal)
}
expectedEventsLength := i + 1
fxEventsInAll := func() bool {
for _, pOther := range participants {
if pOther.sm.numEvents() != expectedEventsLength {
return false
}
}
return true
}
waitFor(t, "event makes it to all", fxEventsInAll, 10, time.Second*1)
}
}
func TestConfig(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
cfg := warnLogConfig()
// test all is well with non default ports
cfg.CommandPort = 12347
cfg.RaftPort = 11882
mp := uint16(8798)
cfg.MonitorPort = mp
createConsensusCluster(t, ctx, clusterTag, ps, cfg, true)
for _, p := range ps {
defer p.c.Stop(ctx)
}
assertCommandsWorkOnAnyNode(t, ps)
url := fmt.Sprintf("http://%s:%d/", ps[0].c.self.hostAddr.String(), mp)
httpClientOnTailnet := ps[1].ts.HTTPClient()
rsp, err := httpClientOnTailnet.Get(url)
if err != nil {
t.Fatal(err)
}
if rsp.StatusCode != 200 {
t.Fatalf("monitor status want %d, got %d", 200, rsp.StatusCode)
}
defer rsp.Body.Close()
body, err := io.ReadAll(rsp.Body)
if err != nil {
t.Fatal(err)
}
// Not a great assertion because it relies on the format of the response.
line1 := strings.Split(string(body), "\n")[0]
if line1[:10] != "RaftState:" {
t.Fatalf("getting monitor status, first line, want something that starts with 'RaftState:', got '%s'", line1)
}
}
func TestFollowerFailover(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, _, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
cfg := warnLogConfig()
createConsensusCluster(t, ctx, clusterTag, ps, cfg, false)
for _, p := range ps {
defer p.c.Stop(ctx)
}
smThree := ps[2].sm
fut := ps[0].c.raft.Apply([]byte("a"), 2*time.Second)
futTwo := ps[0].c.raft.Apply([]byte("b"), 2*time.Second)
err := fut.Error()
if err != nil {
t.Fatalf("Apply Raft error %v", err)
}
err = futTwo.Error()
if err != nil {
t.Fatalf("Apply Raft error %v", err)
}
fxAllMachinesHaveTheApplies := func() bool {
return ps[0].sm.numEvents() == 2 && ps[1].sm.numEvents() == 2 && smThree.numEvents() == 2
}
waitFor(t, "the apply events made it into all state machines", fxAllMachinesHaveTheApplies, 10, time.Second*1)
//a follower goes loses contact with the cluster
ps[2].c.Stop(ctx)
// applies still make it to one and two
futThree := ps[0].c.raft.Apply([]byte("c"), 2*time.Second)
futFour := ps[0].c.raft.Apply([]byte("d"), 2*time.Second)
err = futThree.Error()
if err != nil {
t.Fatalf("Apply Raft error %v", err)
}
err = futFour.Error()
if err != nil {
t.Fatalf("Apply Raft error %v", err)
}
fxAliveMachinesHaveTheApplies := func() bool {
return ps[0].sm.numEvents() == 4 && ps[1].sm.numEvents() == 4 && smThree.numEvents() == 2
}
waitFor(t, "the apply events made it into eligible state machines", fxAliveMachinesHaveTheApplies, 10, time.Second*1)
// follower comes back
smThreeAgain := &fsm{}
cfg = addIDedLogger("2 after restarting", warnLogConfig())
rThreeAgain, err := Start(ctx, ps[2].ts, (*fsm)(smThreeAgain), clusterTag, cfg, false)
if err != nil {
t.Fatal(err)
}
defer rThreeAgain.Stop(ctx)
fxThreeGetsCaughtUp := func() bool {
return smThreeAgain.numEvents() == 4
}
waitFor(t, "the apply events made it into the third node when it appeared with an empty state machine", fxThreeGetsCaughtUp, 20, time.Second*2)
if smThree.numEvents() != 2 {
t.Fatalf("Expected smThree to remain on 2 events: got %d", smThree.numEvents())
}
}
func TestRejoin(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, control, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
cfg := warnLogConfig()
createConsensusCluster(t, ctx, clusterTag, ps, cfg, false)
for _, p := range ps {
defer p.c.Stop(ctx)
}
// 1st node gets a redundant second join request from the second node
ps[0].c.handleJoin(joinRequest{
RemoteHost: ps[1].c.self.hostAddr.String(),
RemoteID: ps[1].c.self.id,
})
tsJoiner, keyJoiner, _ := startNode(t, ctx, controlURL, "node: joiner")
tagNodes(t, control, []key.NodePublic{keyJoiner}, clusterTag)
waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{keyJoiner}, clusterTag)
smJoiner := &fsm{}
cJoiner, err := Start(ctx, tsJoiner, (*fsm)(smJoiner), clusterTag, cfg, false)
if err != nil {
t.Fatal(err)
}
ps = append(ps, &participant{
sm: smJoiner,
c: cJoiner,
ts: tsJoiner,
key: keyJoiner,
})
assertCommandsWorkOnAnyNode(t, ps)
}
func TestOnlyTaggedPeersCanDialRaftPort(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, control, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
cfg := warnLogConfig()
createConsensusCluster(t, ctx, clusterTag, ps, cfg, false)
for _, p := range ps {
defer p.c.Stop(ctx)
}
assertCommandsWorkOnAnyNode(t, ps)
untaggedNode, _, _ := startNode(t, ctx, controlURL, "untagged node")
taggedNode, taggedKey, _ := startNode(t, ctx, controlURL, "untagged node")
tagNodes(t, control, []key.NodePublic{taggedKey}, clusterTag)
waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{taggedKey}, clusterTag)
// surface area: command http, peer tcp
//untagged
ipv4, _ := ps[0].ts.TailscaleIPs()
sAddr := fmt.Sprintf("%s:%d", ipv4, cfg.RaftPort)
getErrorFromTryingToSend := func(s *tsnet.Server) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := s.Dial(ctx, "tcp", sAddr)
if err != nil {
t.Fatalf("unexpected Dial err: %v", err)
}
fmt.Fprintf(conn, "hellllllloooooo")
status, err := bufio.NewReader(conn).ReadString('\n')
if status != "" {
t.Fatalf("node sending non-raft message should get empty response, got: '%s' for: %s", status, s.Hostname)
}
if err == nil {
t.Fatalf("node sending non-raft message should get an error but got nil err for: %s", s.Hostname)
}
return err
}
isNetErr := func(err error) bool {
var netErr net.Error
return errors.As(err, &netErr)
}
err := getErrorFromTryingToSend(untaggedNode)
if !isNetErr(err) {
t.Fatalf("untagged node trying to send should get a net.Error, got: %v", err)
}
// we still get an error trying to send but it's EOF the target node was happy to talk
// to us but couldn't understand what we said.
err = getErrorFromTryingToSend(taggedNode)
if isNetErr(err) {
t.Fatalf("tagged node trying to send should not get a net.Error, got: %v", err)
}
}
func TestOnlyTaggedPeersCanBeDialed(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, control, _ := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
// make a StreamLayer for ps[0]
ts := ps[0].ts
auth := newAuthorization(ts, clusterTag)
port := 19841
lns := make([]net.Listener, 3)
for i, p := range ps {
ln, err := p.ts.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
t.Fatal(err)
}
lns[i] = ln
}
sl := StreamLayer{
s: ts,
Listener: lns[0],
auth: auth,
}
ip1, _ := ps[1].ts.TailscaleIPs()
a1 := raft.ServerAddress(fmt.Sprintf("%s:%d", ip1, port))
ip2, _ := ps[2].ts.TailscaleIPs()
a2 := raft.ServerAddress(fmt.Sprintf("%s:%d", ip2, port))
// both can be dialed...
conn, err := sl.Dial(a1, 2*time.Second)
if err != nil {
t.Fatal(err)
}
conn.Close()
conn, err = sl.Dial(a2, 2*time.Second)
if err != nil {
t.Fatal(err)
}
conn.Close()
// untag ps[2]
tagNodes(t, control, []key.NodePublic{ps[2].key}, "")
waitForNodesToBeTaggedInStatus(t, ctx, ps[0].ts, []key.NodePublic{ps[2].key}, "")
// now only ps[1] can be dialed
conn, err = sl.Dial(a1, 2*time.Second)
if err != nil {
t.Fatal(err)
}
conn.Close()
_, err = sl.Dial(a2, 2*time.Second)
if err.Error() != "peer is not allowed" {
t.Fatalf("expected peer is not allowed, got: %v", err)
}
}
func TestOnlyTaggedPeersCanJoin(t *testing.T) {
nettest.SkipIfNoNetwork(t)
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
clusterTag := "tag:whatever"
ps, _, controlURL := startNodesAndWaitForPeerStatus(t, ctx, clusterTag, 3)
cfg := warnLogConfig()
createConsensusCluster(t, ctx, clusterTag, ps, cfg, false)
for _, p := range ps {
defer p.c.Stop(ctx)
}
tsJoiner, _, _ := startNode(t, ctx, controlURL, "joiner node")
ipv4, _ := tsJoiner.TailscaleIPs()
url := fmt.Sprintf("http://%s/join", ps[0].c.commandAddr(ps[0].c.self.hostAddr))
payload, err := json.Marshal(joinRequest{
RemoteHost: ipv4.String(),
RemoteID: "node joiner",
})
if err != nil {
t.Fatal(err)
}
body := bytes.NewBuffer(payload)
req, err := http.NewRequest("POST", url, body)
if err != nil {
t.Fatal(err)
}
resp, err := tsJoiner.HTTPClient().Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusForbidden {
t.Fatalf("join req when not tagged, expected status: %d, got: %d", http.StatusForbidden, resp.StatusCode)
}
rBody, _ := io.ReadAll(resp.Body)
sBody := strings.TrimSpace(string(rBody))
expected := "peer not allowed"
if sBody != expected {
t.Fatalf("join req when not tagged, expected body: %s, got: %s", expected, sBody)
}
}