Compare commits
3 Commits
docker_sta
...
tomhjp/con
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe3b3d5827 | ||
|
|
30a13ebbd8 | ||
|
|
1363be38ce |
@@ -8,15 +8,22 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
"tailscale.com/kube/kubetypes"
|
||||
"tailscale.com/logtail/backoff"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
||||
// kubeClient is a wrapper around Tailscale's internal kube client that knows how to talk to the kube API server. We use
|
||||
@@ -125,3 +132,62 @@ func (kc *kubeClient) storeCapVerUID(ctx context.Context, podUID string) error {
|
||||
}
|
||||
return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container")
|
||||
}
|
||||
|
||||
// waitForConsistentState waits for tailscaled to finish writing state if it
|
||||
// looks like it's started. It is designed to reduce the likelihood that
|
||||
// tailscaled gets shut down in the window between authenticating to control
|
||||
// and finishing writing state. However, it's not bullet proof because we can't
|
||||
// atomically authenticate and write state.
|
||||
func (kc *kubeClient) waitForConsistentState(ctx context.Context) error {
|
||||
var logged bool
|
||||
|
||||
bo := backoff.NewBackoff("", logger.Discard, 2*time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
secret, err := kc.GetSecret(ctx, kc.stateSecret)
|
||||
if ctx.Err() != nil || kubeclient.IsNotFoundErr(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting Secret %q: %v", kc.stateSecret, err)
|
||||
}
|
||||
|
||||
if hasConsistentState(secret.Data) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !logged {
|
||||
log.Printf("Waiting for tailscaled to finish writing state to Secret %q", kc.stateSecret)
|
||||
logged = true
|
||||
}
|
||||
bo.BackOff(ctx, errors.New("")) // Fake error to trigger actual sleep.
|
||||
}
|
||||
}
|
||||
|
||||
// hasConsistentState returns true is there is either no state or the full set
|
||||
// of expected keys are present.
|
||||
func hasConsistentState(d map[string][]byte) bool {
|
||||
var (
|
||||
_, hasCurrent = d[string(ipn.CurrentProfileStateKey)]
|
||||
_, hasKnown = d[string(ipn.KnownProfilesStateKey)]
|
||||
_, hasMachine = d[string(ipn.MachineKeyStateKey)]
|
||||
hasProfile bool
|
||||
)
|
||||
|
||||
for k := range d {
|
||||
if strings.HasPrefix(k, "profile-") {
|
||||
if hasProfile {
|
||||
return false // We only expect one profile.
|
||||
}
|
||||
hasProfile = true
|
||||
}
|
||||
}
|
||||
|
||||
// Approximate check, we don't want to reimplement all of profileManager.
|
||||
return (hasCurrent && hasKnown && hasMachine && hasProfile) ||
|
||||
(!hasCurrent && !hasKnown && !hasMachine && !hasProfile)
|
||||
}
|
||||
|
||||
@@ -9,8 +9,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/kube/kubeapi"
|
||||
"tailscale.com/kube/kubeclient"
|
||||
)
|
||||
@@ -205,3 +207,34 @@ func TestSetupKube(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitForConsistentState(t *testing.T) {
|
||||
data := map[string][]byte{
|
||||
// Missing _current-profile.
|
||||
string(ipn.KnownProfilesStateKey): []byte(""),
|
||||
string(ipn.MachineKeyStateKey): []byte(""),
|
||||
"profile-foo": []byte(""),
|
||||
}
|
||||
kc := &kubeClient{
|
||||
Client: &kubeclient.FakeClient{
|
||||
GetSecretImpl: func(context.Context, string) (*kubeapi.Secret, error) {
|
||||
return &kubeapi.Secret{
|
||||
Data: data,
|
||||
}, nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
if err := kc.waitForConsistentState(ctx); err != context.DeadlineExceeded {
|
||||
t.Fatalf("expected DeadlineExceeded, got %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
data[string(ipn.CurrentProfileStateKey)] = []byte("")
|
||||
if err := kc.waitForConsistentState(ctx); err != nil {
|
||||
t.Fatalf("expected nil, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,53 +137,83 @@ func newNetfilterRunner(logf logger.Logf) (linuxfw.NetfilterRunner, error) {
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := mainErr(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func mainErr() error {
|
||||
log.SetPrefix("boot: ")
|
||||
tailscale.I_Acknowledge_This_API_Is_Unstable = true
|
||||
|
||||
cfg, err := configFromEnv()
|
||||
if err != nil {
|
||||
log.Fatalf("invalid configuration: %v", err)
|
||||
return fmt.Errorf("invalid configuration: %w", err)
|
||||
}
|
||||
|
||||
if !cfg.UserspaceMode {
|
||||
if err := ensureTunFile(cfg.Root); err != nil {
|
||||
log.Fatalf("Unable to create tuntap device file: %v", err)
|
||||
return fmt.Errorf("unable to create tuntap device file: %w", err)
|
||||
}
|
||||
if cfg.ProxyTargetIP != "" || cfg.ProxyTargetDNSName != "" || cfg.Routes != nil || cfg.TailnetTargetIP != "" || cfg.TailnetTargetFQDN != "" {
|
||||
if err := ensureIPForwarding(cfg.Root, cfg.ProxyTargetIP, cfg.TailnetTargetIP, cfg.TailnetTargetFQDN, cfg.Routes); err != nil {
|
||||
log.Printf("Failed to enable IP forwarding: %v", err)
|
||||
log.Printf("To run tailscale as a proxy or router container, IP forwarding must be enabled.")
|
||||
if cfg.InKubernetes {
|
||||
log.Fatalf("You can either set the sysctls as a privileged initContainer, or run the tailscale container with privileged=true.")
|
||||
return fmt.Errorf("you can either set the sysctls as a privileged initContainer, or run the tailscale container with privileged=true.")
|
||||
} else {
|
||||
log.Fatalf("You can fix this by running the container with privileged=true, or the equivalent in your container runtime that permits access to sysctls.")
|
||||
return fmt.Errorf("you can fix this by running the container with privileged=true, or the equivalent in your container runtime that permits access to sysctls.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Context is used for all setup stuff until we're in steady
|
||||
// Root context for the whole containerboot process, used to make sure
|
||||
// shutdown signals are promptly and cleanly handled.
|
||||
ctx, cancel := contextWithExitSignalWatch()
|
||||
defer cancel()
|
||||
|
||||
// bootCtx is used for all setup stuff until we're in steady
|
||||
// state, so that if something is hanging we eventually time out
|
||||
// and crashloop the container.
|
||||
bootCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
bootCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var kc *kubeClient
|
||||
if cfg.InKubernetes {
|
||||
kc, err = newKubeClient(cfg.Root, cfg.KubeSecret)
|
||||
if err != nil {
|
||||
log.Fatalf("error initializing kube client: %v", err)
|
||||
return fmt.Errorf("error initializing kube client: %w", err)
|
||||
}
|
||||
if err := cfg.setupKube(bootCtx, kc); err != nil {
|
||||
log.Fatalf("error setting up for running on Kubernetes: %v", err)
|
||||
return fmt.Errorf("error setting up for running on Kubernetes: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
client, daemonProcess, err := startTailscaled(bootCtx, cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to bring up tailscale: %v", err)
|
||||
return fmt.Errorf("failed to bring up tailscale: %w", err)
|
||||
}
|
||||
killTailscaled := func() {
|
||||
if hasKubeStateStore(cfg) {
|
||||
// Check we're not shutting tailscaled down while it's still writing
|
||||
// state. If we authenticate and fail to write all the state, we'll
|
||||
// never recover automatically.
|
||||
//
|
||||
// The termination grace period for the pod is 30s. We wait 25s at
|
||||
// most so that we still reserve some of that budget for tailscaled
|
||||
// to receive and react to a SIGTERM before the SIGKILL that k8s
|
||||
// will send at the end of the grace period.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
|
||||
defer cancel()
|
||||
|
||||
log.Printf("Checking for consistent state")
|
||||
err := kc.waitForConsistentState(ctx)
|
||||
if err != nil {
|
||||
log.Printf("Error waiting for consistent state on shutdown: %v", err)
|
||||
}
|
||||
}
|
||||
log.Printf("Sending SIGTERM to tailscaled")
|
||||
if err := daemonProcess.Signal(unix.SIGTERM); err != nil {
|
||||
log.Fatalf("error shutting tailscaled down: %v", err)
|
||||
}
|
||||
@@ -226,7 +256,7 @@ func main() {
|
||||
|
||||
w, err := client.WatchIPNBus(bootCtx, ipn.NotifyInitialNetMap|ipn.NotifyInitialPrefs|ipn.NotifyInitialState)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to watch tailscaled for updates: %v", err)
|
||||
return fmt.Errorf("failed to watch tailscaled for updates: %w", err)
|
||||
}
|
||||
|
||||
// Now that we've started tailscaled, we can symlink the socket to the
|
||||
@@ -262,18 +292,18 @@ func main() {
|
||||
didLogin = true
|
||||
w.Close()
|
||||
if err := tailscaleUp(bootCtx, cfg); err != nil {
|
||||
return fmt.Errorf("failed to auth tailscale: %v", err)
|
||||
return fmt.Errorf("failed to auth tailscale: %w", err)
|
||||
}
|
||||
w, err = client.WatchIPNBus(bootCtx, ipn.NotifyInitialNetMap|ipn.NotifyInitialState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rewatching tailscaled for updates after auth: %v", err)
|
||||
return fmt.Errorf("rewatching tailscaled for updates after auth: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if isTwoStepConfigAlwaysAuth(cfg) {
|
||||
if err := authTailscale(); err != nil {
|
||||
log.Fatalf("failed to auth tailscale: %v", err)
|
||||
return fmt.Errorf("failed to auth tailscale: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,7 +311,7 @@ authLoop:
|
||||
for {
|
||||
n, err := w.Next()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read from tailscaled: %v", err)
|
||||
return fmt.Errorf("failed to read from tailscaled: %w", err)
|
||||
}
|
||||
|
||||
if n.State != nil {
|
||||
@@ -290,10 +320,10 @@ authLoop:
|
||||
if isOneStepConfig(cfg) {
|
||||
// This could happen if this is the first time tailscaled was run for this
|
||||
// device and the auth key was not passed via the configfile.
|
||||
log.Fatalf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.")
|
||||
return fmt.Errorf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.")
|
||||
}
|
||||
if err := authTailscale(); err != nil {
|
||||
log.Fatalf("failed to auth tailscale: %v", err)
|
||||
return fmt.Errorf("failed to auth tailscale: %w", err)
|
||||
}
|
||||
case ipn.NeedsMachineAuth:
|
||||
log.Printf("machine authorization required, please visit the admin panel")
|
||||
@@ -313,14 +343,11 @@ authLoop:
|
||||
|
||||
w.Close()
|
||||
|
||||
ctx, cancel := contextWithExitSignalWatch()
|
||||
defer cancel()
|
||||
|
||||
if isTwoStepConfigAuthOnce(cfg) {
|
||||
// Now that we are authenticated, we can set/reset any of the
|
||||
// settings that we need to.
|
||||
if err := tailscaleSet(ctx, cfg); err != nil {
|
||||
log.Fatalf("failed to auth tailscale: %v", err)
|
||||
return fmt.Errorf("failed to auth tailscale: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,10 +356,10 @@ authLoop:
|
||||
if cfg.ServeConfigPath != "" {
|
||||
log.Printf("serve proxy: unsetting previous config")
|
||||
if err := client.SetServeConfig(ctx, new(ipn.ServeConfig)); err != nil {
|
||||
log.Fatalf("failed to unset serve config: %v", err)
|
||||
return fmt.Errorf("failed to unset serve config: %w", err)
|
||||
}
|
||||
if err := kc.storeHTTPSEndpoint(ctx, ""); err != nil {
|
||||
log.Fatalf("failed to update HTTPS endpoint in tailscale state: %v", err)
|
||||
return fmt.Errorf("failed to update HTTPS endpoint in tailscale state: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,19 +369,19 @@ authLoop:
|
||||
// wipe it, but it's good hygiene.
|
||||
log.Printf("Deleting authkey from kube secret")
|
||||
if err := kc.deleteAuthKey(ctx); err != nil {
|
||||
log.Fatalf("deleting authkey from kube secret: %v", err)
|
||||
return fmt.Errorf("deleting authkey from kube secret: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if hasKubeStateStore(cfg) {
|
||||
if err := kc.storeCapVerUID(ctx, cfg.PodUID); err != nil {
|
||||
log.Fatalf("storing capability version and UID: %v", err)
|
||||
return fmt.Errorf("storing capability version and UID: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
w, err = client.WatchIPNBus(ctx, ipn.NotifyInitialNetMap|ipn.NotifyInitialState)
|
||||
if err != nil {
|
||||
log.Fatalf("rewatching tailscaled for updates after auth: %v", err)
|
||||
return fmt.Errorf("rewatching tailscaled for updates after auth: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -378,7 +405,7 @@ authLoop:
|
||||
if isL3Proxy(cfg) {
|
||||
nfr, err = newNetfilterRunner(log.Printf)
|
||||
if err != nil {
|
||||
log.Fatalf("error creating new netfilter runner: %v", err)
|
||||
return fmt.Errorf("error creating new netfilter runner: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -449,7 +476,7 @@ runLoop:
|
||||
killTailscaled()
|
||||
break runLoop
|
||||
case err := <-errChan:
|
||||
log.Fatalf("failed to read from tailscaled: %v", err)
|
||||
return fmt.Errorf("failed to read from tailscaled: %w", err)
|
||||
case n := <-notifyChan:
|
||||
if n.State != nil && *n.State != ipn.Running {
|
||||
// Something's gone wrong and we've left the authenticated state.
|
||||
@@ -457,7 +484,7 @@ runLoop:
|
||||
// control flow required to make it work now is hard. So, just crash
|
||||
// the container and rely on the container runtime to restart us,
|
||||
// whereupon we'll go through initial auth again.
|
||||
log.Fatalf("tailscaled left running state (now in state %q), exiting", *n.State)
|
||||
return fmt.Errorf("tailscaled left running state (now in state %q), exiting", *n.State)
|
||||
}
|
||||
if n.NetMap != nil {
|
||||
addrs = n.NetMap.SelfNode.Addresses().AsSlice()
|
||||
@@ -475,7 +502,7 @@ runLoop:
|
||||
deviceID := n.NetMap.SelfNode.StableID()
|
||||
if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceID, &deviceID) {
|
||||
if err := kc.storeDeviceID(ctx, n.NetMap.SelfNode.StableID()); err != nil {
|
||||
log.Fatalf("storing device ID in Kubernetes Secret: %v", err)
|
||||
return fmt.Errorf("storing device ID in Kubernetes Secret: %w", err)
|
||||
}
|
||||
}
|
||||
if cfg.TailnetTargetFQDN != "" {
|
||||
@@ -512,12 +539,12 @@ runLoop:
|
||||
rulesInstalled = true
|
||||
log.Printf("Installing forwarding rules for destination %v", ea.String())
|
||||
if err := installEgressForwardingRule(ctx, ea.String(), addrs, nfr); err != nil {
|
||||
log.Fatalf("installing egress proxy rules for destination %s: %v", ea.String(), err)
|
||||
return fmt.Errorf("installing egress proxy rules for destination %s: %v", ea.String(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !rulesInstalled {
|
||||
log.Fatalf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT())
|
||||
return fmt.Errorf("no forwarding rules for egress addresses %v, host supports IPv6: %v", egressAddrs, nfr.HasIPV6NAT())
|
||||
}
|
||||
}
|
||||
currentEgressIPs = newCurentEgressIPs
|
||||
@@ -525,7 +552,7 @@ runLoop:
|
||||
if cfg.ProxyTargetIP != "" && len(addrs) != 0 && ipsHaveChanged {
|
||||
log.Printf("Installing proxy rules")
|
||||
if err := installIngressForwardingRule(ctx, cfg.ProxyTargetIP, addrs, nfr); err != nil {
|
||||
log.Fatalf("installing ingress proxy rules: %v", err)
|
||||
return fmt.Errorf("installing ingress proxy rules: %w", err)
|
||||
}
|
||||
}
|
||||
if cfg.ProxyTargetDNSName != "" && len(addrs) != 0 && ipsHaveChanged {
|
||||
@@ -541,7 +568,7 @@ runLoop:
|
||||
if backendsHaveChanged {
|
||||
log.Printf("installing ingress proxy rules for backends %v", newBackendAddrs)
|
||||
if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil {
|
||||
log.Fatalf("error installing ingress proxy rules: %v", err)
|
||||
return fmt.Errorf("error installing ingress proxy rules: %w", err)
|
||||
}
|
||||
}
|
||||
resetTimer(false)
|
||||
@@ -563,7 +590,7 @@ runLoop:
|
||||
if cfg.TailnetTargetIP != "" && ipsHaveChanged && len(addrs) != 0 {
|
||||
log.Printf("Installing forwarding rules for destination %v", cfg.TailnetTargetIP)
|
||||
if err := installEgressForwardingRule(ctx, cfg.TailnetTargetIP, addrs, nfr); err != nil {
|
||||
log.Fatalf("installing egress proxy rules: %v", err)
|
||||
return fmt.Errorf("installing egress proxy rules: %w", err)
|
||||
}
|
||||
}
|
||||
// If this is a L7 cluster ingress proxy (set up
|
||||
@@ -575,7 +602,7 @@ runLoop:
|
||||
if cfg.AllowProxyingClusterTrafficViaIngress && cfg.ServeConfigPath != "" && ipsHaveChanged && len(addrs) != 0 {
|
||||
log.Printf("installing rules to forward traffic for %s to node's tailnet IP", cfg.PodIP)
|
||||
if err := installTSForwardingRuleForDestination(ctx, cfg.PodIP, addrs, nfr); err != nil {
|
||||
log.Fatalf("installing rules to forward traffic to node's tailnet IP: %v", err)
|
||||
return fmt.Errorf("installing rules to forward traffic to node's tailnet IP: %w", err)
|
||||
}
|
||||
}
|
||||
currentIPs = newCurrentIPs
|
||||
@@ -594,7 +621,7 @@ runLoop:
|
||||
deviceEndpoints := []any{n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses()}
|
||||
if hasKubeStateStore(cfg) && deephash.Update(¤tDeviceEndpoints, &deviceEndpoints) {
|
||||
if err := kc.storeDeviceEndpoints(ctx, n.NetMap.SelfNode.Name(), n.NetMap.SelfNode.Addresses().AsSlice()); err != nil {
|
||||
log.Fatalf("storing device IPs and FQDN in Kubernetes Secret: %v", err)
|
||||
return fmt.Errorf("storing device IPs and FQDN in Kubernetes Secret: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -684,16 +711,18 @@ runLoop:
|
||||
if backendsHaveChanged && len(addrs) != 0 {
|
||||
log.Printf("Backend address change detected, installing proxy rules for backends %v", newBackendAddrs)
|
||||
if err := installIngressForwardingRuleForDNSTarget(ctx, newBackendAddrs, addrs, nfr); err != nil {
|
||||
log.Fatalf("installing ingress proxy rules for DNS target %s: %v", cfg.ProxyTargetDNSName, err)
|
||||
return fmt.Errorf("installing ingress proxy rules for DNS target %s: %v", cfg.ProxyTargetDNSName, err)
|
||||
}
|
||||
}
|
||||
backendAddrs = newBackendAddrs
|
||||
resetTimer(false)
|
||||
case e := <-egressSvcsErrorChan:
|
||||
log.Fatalf("egress proxy failed: %v", e)
|
||||
return fmt.Errorf("egress proxy failed: %v", e)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ensureTunFile checks that /dev/net/tun exists, creating it if
|
||||
@@ -722,13 +751,13 @@ func resolveDNS(ctx context.Context, name string) ([]net.IP, error) {
|
||||
ip4s, err := net.DefaultResolver.LookupIP(ctx, "ip4", name)
|
||||
if err != nil {
|
||||
if e, ok := err.(*net.DNSError); !(ok && e.IsNotFound) {
|
||||
return nil, fmt.Errorf("error looking up IPv4 addresses: %v", err)
|
||||
return nil, fmt.Errorf("error looking up IPv4 addresses: %w", err)
|
||||
}
|
||||
}
|
||||
ip6s, err := net.DefaultResolver.LookupIP(ctx, "ip6", name)
|
||||
if err != nil {
|
||||
if e, ok := err.(*net.DNSError); !(ok && e.IsNotFound) {
|
||||
return nil, fmt.Errorf("error looking up IPv6 addresses: %v", err)
|
||||
return nil, fmt.Errorf("error looking up IPv6 addresses: %w", err)
|
||||
}
|
||||
}
|
||||
if len(ip4s) == 0 && len(ip6s) == 0 {
|
||||
@@ -741,7 +770,7 @@ func resolveDNS(ctx context.Context, name string) ([]net.IP, error) {
|
||||
// context that gets cancelled when a signal is received and a cancel function
|
||||
// that can be called to free the resources when the watch should be stopped.
|
||||
func contextWithExitSignalWatch() (context.Context, func()) {
|
||||
closeChan := make(chan string)
|
||||
closeChan := make(chan struct{})
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -753,8 +782,11 @@ func contextWithExitSignalWatch() (context.Context, func()) {
|
||||
return
|
||||
}
|
||||
}()
|
||||
closeOnce := sync.Once{}
|
||||
f := func() {
|
||||
closeChan <- "goodbye"
|
||||
closeOnce.Do(func() {
|
||||
close(closeChan)
|
||||
})
|
||||
}
|
||||
return ctx, f
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -47,9 +48,7 @@ func TestContainerBoot(t *testing.T) {
|
||||
defer lapi.Close()
|
||||
|
||||
kube := kubeServer{FSRoot: d}
|
||||
if err := kube.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
kube.Start(t, "127.0.0.1")
|
||||
defer kube.Close()
|
||||
|
||||
tailscaledConf := &ipn.ConfigVAlpha{AuthKey: ptr.To("foo"), Version: "alpha0"}
|
||||
@@ -130,15 +129,29 @@ func TestContainerBoot(t *testing.T) {
|
||||
|
||||
// WantCmds is the commands that containerboot should run in this phase.
|
||||
WantCmds []string
|
||||
|
||||
// WantKubeSecret is the secret keys/values that should exist in the
|
||||
// kube secret.
|
||||
WantKubeSecret map[string]string
|
||||
|
||||
// Update the kube secret with these keys/values at the beginning of the
|
||||
// phase (simulates our fake tailscaled doing it).
|
||||
UpdateKubeSecret map[string]string
|
||||
|
||||
// WantFiles files that should exist in the container and their
|
||||
// contents.
|
||||
WantFiles map[string]string
|
||||
// WantFatalLog is the fatal log message we expect from containerboot.
|
||||
// If set for a phase, the test will finish on that phase.
|
||||
WantFatalLog string
|
||||
|
||||
// WantLog is a log message we expect from containerboot.
|
||||
WantLog string
|
||||
|
||||
// If set for a phase, the test will expect containerboot to exit with
|
||||
// this error code, and the test will finish on that phase without
|
||||
// waiting for the successful startup log message.
|
||||
WantExitCode *int
|
||||
|
||||
// The signal to send to containerboot at the start of the phase.
|
||||
Signal *syscall.Signal
|
||||
|
||||
EndpointStatuses map[string]int
|
||||
}
|
||||
@@ -426,7 +439,8 @@ func TestContainerBoot(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
WantFatalLog: "no forwarding rules for egress addresses [::1/128], host supports IPv6: false",
|
||||
WantLog: "no forwarding rules for egress addresses [::1/128], host supports IPv6: false",
|
||||
WantExitCode: ptr.To(1),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -829,6 +843,60 @@ func TestContainerBoot(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "kube_shutdown_during_state_write",
|
||||
Env: map[string]string{
|
||||
"KUBERNETES_SERVICE_HOST": kube.Host,
|
||||
"KUBERNETES_SERVICE_PORT_HTTPS": kube.Port,
|
||||
},
|
||||
KubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
},
|
||||
Phases: []phase{
|
||||
{
|
||||
// Normal startup.
|
||||
WantCmds: []string{
|
||||
"/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking",
|
||||
"/usr/bin/tailscale --socket=/tmp/tailscaled.sock up --accept-dns=false --authkey=tskey-key",
|
||||
},
|
||||
WantKubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
},
|
||||
},
|
||||
{
|
||||
// SIGTERM before state is finished writing, should wait for
|
||||
// consistent state before propagating SIGTERM to tailscaled.
|
||||
Signal: ptr.To(unix.SIGTERM),
|
||||
UpdateKubeSecret: map[string]string{
|
||||
"_machinekey": "foo",
|
||||
"_profiles": "foo",
|
||||
"profile-baff": "foo",
|
||||
// Missing "_current-profile" key.
|
||||
},
|
||||
WantKubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
"_machinekey": "foo",
|
||||
"_profiles": "foo",
|
||||
"profile-baff": "foo",
|
||||
},
|
||||
WantLog: "Waiting for tailscaled to finish writing state to Secret \"tailscale\"",
|
||||
},
|
||||
{
|
||||
// tailscaled has finished writing state, should propagate SIGTERM.
|
||||
UpdateKubeSecret: map[string]string{
|
||||
"_current-profile": "foo",
|
||||
},
|
||||
WantKubeSecret: map[string]string{
|
||||
"authkey": "tskey-key",
|
||||
"_machinekey": "foo",
|
||||
"_profiles": "foo",
|
||||
"profile-baff": "foo",
|
||||
"_current-profile": "foo",
|
||||
},
|
||||
WantExitCode: ptr.To(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -873,26 +941,36 @@ func TestContainerBoot(t *testing.T) {
|
||||
|
||||
var wantCmds []string
|
||||
for i, p := range test.Phases {
|
||||
for k, v := range p.UpdateKubeSecret {
|
||||
kube.SetSecret(k, v)
|
||||
}
|
||||
lapi.Notify(p.Notify)
|
||||
if p.WantFatalLog != "" {
|
||||
if p.Signal != nil {
|
||||
cmd.Process.Signal(*p.Signal)
|
||||
}
|
||||
if p.WantLog != "" {
|
||||
err := tstest.WaitFor(2*time.Second, func() error {
|
||||
state, err := cmd.Process.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if state.ExitCode() != 1 {
|
||||
return fmt.Errorf("process exited with code %d but wanted %d", state.ExitCode(), 1)
|
||||
}
|
||||
waitLogLine(t, time.Second, cbOut, p.WantFatalLog)
|
||||
waitLogLine(t, time.Second, cbOut, p.WantLog)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
if p.WantExitCode != nil {
|
||||
state, err := cmd.Process.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if state.ExitCode() != *p.WantExitCode {
|
||||
t.Fatalf("phase %d: want exit code %d, got %d", i, *p.WantExitCode, state.ExitCode())
|
||||
}
|
||||
|
||||
// Early test return, we don't expect the successful startup log message.
|
||||
return
|
||||
}
|
||||
|
||||
wantCmds = append(wantCmds, p.WantCmds...)
|
||||
waitArgs(t, 2*time.Second, d, argFile, strings.Join(wantCmds, "\n"))
|
||||
err := tstest.WaitFor(2*time.Second, func() error {
|
||||
@@ -948,6 +1026,9 @@ func TestContainerBoot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
waitLogLine(t, 2*time.Second, cbOut, "Startup complete, waiting for shutdown signal")
|
||||
if cmd.ProcessState != nil {
|
||||
t.Fatalf("containerboot should be running but exited with exit code %d", cmd.ProcessState.ExitCode())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1179,18 +1260,18 @@ func (k *kubeServer) Reset() {
|
||||
k.secret = map[string]string{}
|
||||
}
|
||||
|
||||
func (k *kubeServer) Start() error {
|
||||
func (k *kubeServer) Start(t *testing.T, ip string) {
|
||||
root := filepath.Join(k.FSRoot, "var/run/secrets/kubernetes.io/serviceaccount")
|
||||
|
||||
if err := os.MkdirAll(root, 0700); err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(filepath.Join(root, "namespace"), []byte("default"), 0600); err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(root, "token"), []byte("bearer_token"), 0600); err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
k.srv = httptest.NewTLSServer(k)
|
||||
@@ -1199,13 +1280,11 @@ func (k *kubeServer) Start() error {
|
||||
|
||||
var cert bytes.Buffer
|
||||
if err := pem.Encode(&cert, &pem.Block{Type: "CERTIFICATE", Bytes: k.srv.Certificate().Raw}); err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(root, "ca.crt"), cert.Bytes(), 0600); err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *kubeServer) Close() {
|
||||
@@ -1254,6 +1333,7 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, fmt.Sprintf("reading request body: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
@@ -1286,13 +1366,16 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) {
|
||||
panic(fmt.Sprintf("json decode failed: %v. Body:\n\n%s", err, string(bs)))
|
||||
}
|
||||
for _, op := range req {
|
||||
if op.Op != "remove" {
|
||||
switch op.Op {
|
||||
case "remove":
|
||||
if !strings.HasPrefix(op.Path, "/data/") {
|
||||
panic(fmt.Sprintf("unsupported json-patch path %q", op.Path))
|
||||
}
|
||||
delete(k.secret, strings.TrimPrefix(op.Path, "/data/"))
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported json-patch op %q", op.Op))
|
||||
}
|
||||
if !strings.HasPrefix(op.Path, "/data/") {
|
||||
panic(fmt.Sprintf("unsupported json-patch path %q", op.Path))
|
||||
}
|
||||
delete(k.secret, strings.TrimPrefix(op.Path, "/data/"))
|
||||
|
||||
}
|
||||
case "application/strategic-merge-patch+json":
|
||||
req := struct {
|
||||
@@ -1308,6 +1391,6 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) {
|
||||
panic(fmt.Sprintf("unknown content type %q", r.Header.Get("Content-Type")))
|
||||
}
|
||||
default:
|
||||
panic(fmt.Sprintf("unhandled HTTP method %q", r.Method))
|
||||
panic(fmt.Sprintf("unhandled HTTP request %s %s", r.Method, r.URL))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user