Compare commits

...

2 Commits

Author SHA1 Message Date
Irbe Krumina
11532daad9 WIP: Connector multi-replica
Signed-off-by: Irbe Krumina <irbe@tailscale.com>
2024-11-06 07:22:02 +00:00
Irbe Krumina
7de684e71a client/tailscale,ipn/{ipnlocal,localapi}: add a pre-shutdown localAPI endpoint that terminates control connections.
Adds a /lameduck local API endpoint that just shuts down the control client.
This can be run before shutting down an HA subnet router/app connector replica - it ensures
that all connections to control are dropped, so control considers this node inactive and tells
peers to switch over to another replica. Meanwhile the existing connections keep working (assuming
that the replica is given some graceful shutdown period).

Updates tailscale/tailscale#14020

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
2024-11-06 06:52:25 +00:00
17 changed files with 380 additions and 194 deletions

View File

@@ -1327,6 +1327,17 @@ func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConf
return nil
}
// LameDuck shuts down all connections to control, thus making control consider this node inactive. This can be run on
// HA subnet router or app connector replicas before shutting them down to ensure peers get told to switch over to
// another replica whilst there is still some grace period for the existing connections to terminate.
func (lc *LocalClient) LameDuck(ctx context.Context) error {
_, _, err := lc.sendWithHeaders(ctx, "POST", "/localapi/v0/lameduck", 200, nil, nil)
if err != nil {
return fmt.Errorf("error enabling lameduck mode: %w", err)
}
return nil
}
// NetworkLockDisable shuts down network-lock across the tailnet.
func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error {
if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil {

View File

@@ -0,0 +1,52 @@
# HA failover
This is an experimental prototype of fast failover for subnet routers via the Kubernetes operator.
Problem: how can we ensure that, if multiple subnet router replicas are run and a replica is about to be deleted (e.g. during a StatefulSet upgrade), peers that currently route via this subnet router switch to another subnet router instance _before_ the first one is deleted?
This code change:
- adds a lameduck local API endpoint that can be called to shut down the control client and thus force control to consider this node inactive (a curl sketch follows below)
- adds a preStop hook definition to the Connector StatefulSet that calls the /terminate endpoint
- bumps terminationGracePeriodSeconds on the Connector Pod spec from the default 30s to 120s to ensure that the /terminate endpoint gets a chance to finish
This change also includes WIP work to run the Connector in multi-replica mode.
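The lameduck endpoint can also be exercised directly against tailscaled's local API socket. A minimal sketch, assuming the default Linux socket path and a caller with permission to use write endpoints (typically root):
```
$ sudo curl --unix-socket /var/run/tailscale/tailscaled.sock \
    -X POST http://local-tailscaled.sock/localapi/v0/lameduck
```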
### How to try it:
```
$ helm upgrade --install operator tailscale-dev/tailscale-operator -n tailscale --create-namespace --set operatorConfig.image.repo=gcr.io/csi-test-290908/operator --set operatorConfig.image.tag=0.12connmultir --set proxyConfig.image.repo=gcr.io/csi-test-290908/proxy --set proxyConfig.image.tag=v0.0.15connmultir --set oauth.clientId=<id> --set oauth.clientSecret=<>
```
```
$ kubectl delete crd connectors.tailscale.com  # need to re-apply the CRD from this branch
```
(from this branch)
```
$ kubectl apply -f cmd/k8s-operator/deploy/crds/tailscale.com_connectors.yaml
```
Apply a multi-replica Connector with some route:
```
apiVersion: tailscale.com/v1alpha1
kind: Connector
metadata:
  name: prod
spec:
  tags:
    - "tag:prod"
  hostname: ts-prod
  subnetRouter:
    advertiseRoutes:
      - <route>
  replicas: 3
```
Test failover during deletion, e.g. curl the backend in a tight-ish loop and delete the primary Pod; you should be able to observe that within about a minute traffic switches over to the second Pod, while the connection keeps working without an obvious hitch.
(I was curl-ing at a 1s interval and saw an RST, then it switched over.)
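A rough sketch of that test loop; the backend address is any address behind the advertised route and the Pod name is a placeholder (the actual name depends on the operator-generated StatefulSet name):
```
# terminal 1: hit a backend behind the advertised route once a second
$ while true; do curl -s -o /dev/null -w '%{http_code}\n' --max-time 2 http://<backend-ip>; sleep 1; done

# terminal 2: delete the Pod that currently serves the route
$ kubectl delete pod <primary-connector-pod> -n tailscale
```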

View File

@@ -6,10 +6,14 @@
package main
import (
"context"
"log"
"net"
"net/http"
"sync"
"time"
"tailscale.com/client/tailscale"
)
// healthz is a simple health check server, if enabled it returns 200 OK if
@@ -33,14 +37,20 @@ func (h *healthz) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// run starts a simple HTTP server listening on the provided address. It serves a /healthz
// health endpoint (a containerized tailscale instance is considered healthy if it has at
// least one tailnet IP address) and a /terminate pre-stop endpoint.
func runHealthz(addr string, h *healthz) {
func run(addr string, h *healthz, lc *tailscale.LocalClient) {
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("error listening on the provided health endpoint address %q: %v", addr, err)
}
mux := http.NewServeMux()
mux.Handle("/healthz", h)
log.Printf("Running healthcheck endpoint at %s/healthz", addr)
t := terminator{lc: lc}
// /terminate is an endpoint that can be called from a preStop hook of this containerboot instance, see
// https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/#container-hooks. It drops all
// connections to and from the Tailscale control plane. This can be used for containerboot instances that are HA
// subnet routers. The control plane will consider an instance that is not responding 'inactive' and prompt
// peers to switch to another subnet router. Whilst this happens the existing connections remain functional.
mux.Handle("/terminate", t)
hs := &http.Server{Handler: mux}
go func() {
@@ -49,3 +59,38 @@ func runHealthz(addr string, h *healthz) {
}
}()
}
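// terminator handles the /terminate endpoint: it puts the node into lameduck mode and then sleeps to give
// control time to tell peers to switch over to another replica before this instance shuts down.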
type terminator struct {
// nfr linuxfw.NetfilterRunner
lc *tailscale.LocalClient
}
func (t terminator) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("prestopBlockNetmapUpdates triggered")
if err := t.lc.LameDuck(context.Background()); err != nil {
log.Fatalf("error enabling lameduck: %v", err)
}
// tailscaleIPs, err := resolveDNS(context.Background(), "controlplane.tailscale.com")
// if err != nil {
// log.Printf("prestopBlockNetmapUpdates errored: %v", err)
// return
// }
// var (
// addrs []netip.Addr
// )
// for _, ip := range tailscaleIPs {
// if ip.To4() != nil {
// addrs = append(addrs, netip.AddrFrom4([4]byte(ip.To4())))
// }
// // just v4 for this prototype
// }
// for _, addr := range addrs {
// log.Printf("dropping traffic to %v", addr)
// if err := t.nfr.AddDropRule(addr); err != nil {
// log.Printf("error adding drop rule for %v: %v", addr, err)
// }
// }
log.Printf("sleeping to give control plane a chance to update...")
time.Sleep(time.Second * 100)
log.Printf("finished sleeping")
}

View File

@@ -307,6 +307,12 @@ authLoop:
if err != nil {
log.Fatalf("rewatching tailscaled for updates after auth: %v", err)
}
var nfr linuxfw.NetfilterRunner
// for this prototype
nfr, err = newNetfilterRunner(log.Printf)
if err != nil {
log.Fatalf("error creating new netfilter runner: %v", err)
}
var (
startupTasksDone = false
@@ -323,19 +329,11 @@ authLoop:
certDomainChanged = make(chan bool, 1)
h = &healthz{} // http server for the healthz endpoint
healthzRunner = sync.OnceFunc(func() { runHealthz(cfg.HealthCheckAddrPort, h) })
healthzRunner = sync.OnceFunc(func() { run(cfg.HealthCheckAddrPort, h, client) })
)
if cfg.ServeConfigPath != "" {
go watchServeConfigChanges(ctx, cfg.ServeConfigPath, certDomainChanged, certDomain, client)
}
var nfr linuxfw.NetfilterRunner
if isL3Proxy(cfg) {
nfr, err = newNetfilterRunner(log.Printf)
if err != nil {
log.Fatalf("error creating new netfilter runner: %v", err)
}
}
// Setup for proxies that are configured to proxy to a target specified
// by a DNS name (TS_EXPERIMENTAL_DEST_DNS_NAME).
const defaultCheckPeriod = time.Minute * 10 // how often to check what IPs the DNS name resolves to
@@ -744,3 +742,7 @@ func tailscaledConfigFilePath() string {
log.Printf("Using tailscaled config file %q for capability version %q", maxCompatVer, tailcfg.CurrentCapabilityVersion)
return path.Join(dir, kubeutils.TailscaledConfigFileName(maxCompatVer))
}
func preStopBlockNetmapUpdates(ctx context.Context, nfr linuxfw.NetfilterRunner) {
// figure out if we are a subnet router in HA mode
}

View File

@@ -170,11 +170,6 @@ func (s *settings) validate() error {
if s.EnableForwardingOptimizations && s.UserspaceMode {
return errors.New("TS_EXPERIMENTAL_ENABLE_FORWARDING_OPTIMIZATIONS is not supported in userspace mode")
}
if s.HealthCheckAddrPort != "" {
if _, err := netip.ParseAddrPort(s.HealthCheckAddrPort); err != nil {
return fmt.Errorf("error parsing TS_HEALTH_CHECK_ADDR_PORT value %q: %w", s.HealthCheckAddrPort, err)
}
}
return nil
}

View File

@@ -183,6 +183,10 @@ func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logge
isExitNode: cn.Spec.ExitNode,
},
ProxyClassName: proxyClass,
Replicas: 1,
}
if cn.Spec.Replicas != nil {
sts.Replicas = int32(*cn.Spec.Replicas)
}
if cn.Spec.SubnetRouter != nil && len(cn.Spec.SubnetRouter.AdvertiseRoutes) > 0 {
@@ -213,21 +217,21 @@ func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logge
return err
}
_, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl)
if err != nil {
return err
}
// _, tsHost, ips, err := a.ssr.DeviceInfo(ctx, crl)
// if err != nil {
// return err
// }
if tsHost == "" {
logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth")
// No hostname yet. Wait for the connector pod to auth.
cn.Status.TailnetIPs = nil
cn.Status.Hostname = ""
return nil
}
// if tsHost == "" {
// logger.Debugf("no Tailscale hostname known yet, waiting for connector pod to finish auth")
// // No hostname yet. Wait for the connector pod to auth.
// cn.Status.TailnetIPs = nil
// cn.Status.Hostname = ""
// return nil
// }
cn.Status.TailnetIPs = ips
cn.Status.Hostname = tsHost
// cn.Status.TailnetIPs = ips
// cn.Status.Hostname = tsHost
return nil
}

View File

@@ -65,6 +65,8 @@ spec:
More info:
https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
type: object
required:
- replicas
properties:
exitNode:
description: |-
@@ -88,6 +90,8 @@ spec:
resources created for this Connector. If unset, the operator will
create resources with the default configuration.
type: string
replicas:
type: integer
subnetRouter:
description: |-
SubnetRouter defines subnet routes that the Connector node should

View File

@@ -113,6 +113,8 @@ spec:
resources created for this Connector. If unset, the operator will
create resources with the default configuration.
type: string
replicas:
type: integer
subnetRouter:
description: |-
SubnetRouter defines subnet routes that the Connector node should
@@ -149,6 +151,8 @@ spec:
pattern: ^tag:[a-zA-Z][a-zA-Z0-9-]*$
type: string
type: array
required:
- replicas
type: object
x-kubernetes-validations:
- message: A Connector needs to be either an exit node or a subnet router, or both.

View File

@@ -9,6 +9,7 @@ spec:
metadata:
deletionGracePeriodSeconds: 10
spec:
terminationGracePeriodSeconds: 120
serviceAccountName: proxies
initContainers:
- name: sysctler
@@ -22,6 +23,11 @@ spec:
memory: 1Mi
containers:
- name: tailscale
lifecycle:
preStop:
httpGet:
path: /terminate
port: 8081
imagePullPolicy: Always
env:
- name: TS_USERSPACE

View File

@@ -129,6 +129,7 @@ type tailscaleSTSConfig struct {
ProxyClassName string // name of ProxyClass if one needs to be applied to the proxy
ProxyClass *tsapi.ProxyClass // ProxyClass that needs to be applied to the proxy (if there is one)
Replicas int32
}
type connector struct {
@@ -186,11 +187,11 @@ func (a *tailscaleSTSReconciler) Provision(ctx context.Context, logger *zap.Suga
}
sts.ProxyClass = proxyClass
secretName, tsConfigHash, configs, err := a.createOrGetSecret(ctx, logger, sts, hsvc)
tsConfigHash, configs, err := a.createOrGetSecrets(ctx, logger, sts, hsvc)
if err != nil {
return nil, fmt.Errorf("failed to create or get API key secret: %w", err)
}
_, err = a.reconcileSTS(ctx, logger, sts, hsvc, secretName, tsConfigHash, configs)
_, err = a.reconcileSTS(ctx, logger, sts, hsvc, tsConfigHash, configs)
if err != nil {
return nil, fmt.Errorf("failed to reconcile statefulset: %w", err)
}
@@ -226,22 +227,27 @@ func (a *tailscaleSTSReconciler) Cleanup(ctx context.Context, logger *zap.Sugare
logger.Debugf("started deletion of statefulset %s/%s", sts.GetNamespace(), sts.GetName())
return false, nil
}
id, _, _, err := a.DeviceInfo(ctx, labels)
if err != nil {
return false, fmt.Errorf("getting device info: %w", err)
stateSecrets := &corev1.SecretList{}
if err := a.List(ctx, stateSecrets); err != nil {
return false, err
}
if id != "" {
logger.Debugf("deleting device %s from control", string(id))
if err := a.tsClient.DeleteDevice(ctx, string(id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
for _, sec := range stateSecrets.Items {
id, _, _, err := deviceInfo(&sec)
if err != nil {
return false, fmt.Errorf("error cleaning up state: %v", err)
}
if id != "" {
logger.Debugf("deleting device %s from control", string(id))
if err := a.tsClient.DeleteDevice(ctx, string(id)); err != nil {
errResp := &tailscale.ErrResponse{}
if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
} else {
return false, fmt.Errorf("deleting device: %w", err)
}
} else {
return false, fmt.Errorf("deleting device: %w", err)
logger.Debugf("device %s deleted from control", string(id))
}
} else {
logger.Debugf("device %s deleted from control", string(id))
}
}
@@ -304,96 +310,96 @@ func (a *tailscaleSTSReconciler) reconcileHeadlessService(ctx context.Context, l
return createOrUpdate(ctx, a.Client, a.operatorNamespace, hsvc, func(svc *corev1.Service) { svc.Spec = hsvc.Spec })
}
func (a *tailscaleSTSReconciler) createOrGetSecret(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) (secretName, hash string, configs tailscaledConfigs, _ error) {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
// Hardcode a -0 suffix so that in future, if we support
// multiple StatefulSet replicas, we can provision -N for
// those.
Name: hsvc.Name + "-0",
Namespace: a.operatorNamespace,
Labels: stsC.ChildResourceLabels,
},
}
var orig *corev1.Secret // unmodified copy of secret
if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
orig = secret.DeepCopy()
} else if !apierrors.IsNotFound(err) {
return "", "", nil, err
// tailscaled config secrets
func (a *tailscaleSTSReconciler) createOrGetSecrets(ctx context.Context, logger *zap.SugaredLogger, stsC *tailscaleSTSConfig, hsvc *corev1.Service) (hash string, configs tailscaledConfigs, _ error) {
var allConfigs []tailscaledConfigs
// TODO: deal with pre-existing secrets so we don't recreate _all_ auth keys on upgrade to this version.
for i := range stsC.Replicas {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d-config", hsvc.Name, i),
Namespace: a.operatorNamespace,
Labels: stsC.ChildResourceLabels,
},
}
var orig *corev1.Secret // unmodified copy of secret
if err := a.Get(ctx, client.ObjectKeyFromObject(secret), secret); err == nil {
logger.Debugf("secret %s/%s already exists", secret.GetNamespace(), secret.GetName())
orig = secret.DeepCopy()
} else if !apierrors.IsNotFound(err) {
return "", nil, err
}
var authKey string
if orig == nil {
sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels)
if err != nil {
return "", nil, err
}
if sts != nil {
// StatefulSet exists, so we have already created the secret.
// If the secret is missing, they should delete the StatefulSet.
logger.Errorf("Tailscale proxy secret doesn't exist, but the corresponding StatefulSet %s/%s already does. Something is wrong, please delete the StatefulSet.", sts.GetNamespace(), sts.GetName())
return "", nil, nil
}
// Create auth key Secret which is going to be used by the Statefulset to authenticate with Tailscale.
logger.Debugf("creating authkey for new tailscale proxy")
tags := stsC.Tags
if len(tags) == 0 {
tags = a.defaultTags
}
authKey, err = newAuthKey(ctx, a.tsClient, tags)
if err != nil {
return "", nil, err
}
}
configs, err := tailscaledConfig(stsC, authKey, orig, i)
if err != nil {
return "", nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
allConfigs = append(allConfigs, configs)
latest := tailcfg.CapabilityVersion(-1)
var latestConfig ipn.ConfigVAlpha
for key, val := range configs {
fn := tsoperator.TailscaledConfigFileName(key)
b, err := json.Marshal(val)
if err != nil {
return "", nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
}
mak.Set(&secret.StringData, fn, string(b))
if key > latest {
latest = key
latestConfig = val
}
}
if stsC.ServeConfig != nil {
j, err := json.Marshal(stsC.ServeConfig)
if err != nil {
return "", nil, err
}
mak.Set(&secret.StringData, "serve-config", string(j))
}
if orig != nil {
logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
return "", nil, err
}
} else {
logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Create(ctx, secret); err != nil {
return "", nil, err
}
}
}
var authKey string
if orig == nil {
// Initially it contains only tailscaled config, but when the
// proxy starts, it will also store there the state, certs and
// ACME account key.
sts, err := getSingleObject[appsv1.StatefulSet](ctx, a.Client, a.operatorNamespace, stsC.ChildResourceLabels)
if err != nil {
return "", "", nil, err
}
if sts != nil {
// StatefulSet exists, so we have already created the secret.
// If the secret is missing, they should delete the StatefulSet.
logger.Errorf("Tailscale proxy secret doesn't exist, but the corresponding StatefulSet %s/%s already does. Something is wrong, please delete the StatefulSet.", sts.GetNamespace(), sts.GetName())
return "", "", nil, nil
}
// Create API Key secret which is going to be used by the statefulset
// to authenticate with Tailscale.
logger.Debugf("creating authkey for new tailscale proxy")
tags := stsC.Tags
if len(tags) == 0 {
tags = a.defaultTags
}
authKey, err = newAuthKey(ctx, a.tsClient, tags)
if err != nil {
return "", "", nil, err
}
}
configs, err := tailscaledConfig(stsC, authKey, orig)
hash, err := tailscaledConfigHash(allConfigs)
if err != nil {
return "", "", nil, fmt.Errorf("error creating tailscaled config: %w", err)
}
hash, err = tailscaledConfigHash(configs)
if err != nil {
return "", "", nil, fmt.Errorf("error calculating hash of tailscaled configs: %w", err)
return "", nil, fmt.Errorf("error calculating hash of tailscaled configs: %w", err)
}
latest := tailcfg.CapabilityVersion(-1)
var latestConfig ipn.ConfigVAlpha
for key, val := range configs {
fn := tsoperator.TailscaledConfigFileName(key)
b, err := json.Marshal(val)
if err != nil {
return "", "", nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
}
mak.Set(&secret.StringData, fn, string(b))
if key > latest {
latest = key
latestConfig = val
}
}
if stsC.ServeConfig != nil {
j, err := json.Marshal(stsC.ServeConfig)
if err != nil {
return "", "", nil, err
}
mak.Set(&secret.StringData, "serve-config", string(j))
}
if orig != nil {
logger.Debugf("patching the existing proxy Secret with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Patch(ctx, secret, client.MergeFrom(orig)); err != nil {
return "", "", nil, err
}
} else {
logger.Debugf("creating a new Secret for the proxy with tailscaled config %s", sanitizeConfigBytes(latestConfig))
if err := a.Create(ctx, secret); err != nil {
return "", "", nil, err
}
}
return secret.Name, hash, configs, nil
return hash, configs, nil
}
// sanitizeConfigBytes returns ipn.ConfigVAlpha in string form with redacted
@@ -473,7 +479,7 @@ var proxyYaml []byte
//go:embed deploy/manifests/userspace-proxy.yaml
var userspaceProxyYaml []byte
func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, proxySecret, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) {
func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.SugaredLogger, sts *tailscaleSTSConfig, headlessSvc *corev1.Service, tsConfigHash string, configs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha) (*appsv1.StatefulSet, error) {
ss := new(appsv1.StatefulSet)
if sts.ServeConfig != nil && sts.ForwardClusterTrafficViaL7IngressProxy != true { // If forwarding cluster traffic via is required we need non-userspace + NET_ADMIN + forwarding
if err := yaml.Unmarshal(userspaceProxyYaml, &ss); err != nil {
@@ -507,6 +513,7 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
"app": sts.ParentResourceUID,
},
}
ss.Spec.Replicas = &sts.Replicas
mak.Set(&pod.Labels, "app", sts.ParentResourceUID)
for key, val := range sts.ChildResourceLabels {
pod.Labels[key] = val // sync StatefulSet labels to Pod to make it easier for users to select the Pod
@@ -514,21 +521,34 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
// Generic containerboot configuration options.
container.Env = append(container.Env,
corev1.EnvVar{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
// Secret is named after the pod.
FieldPath: "metadata.name",
},
},
},
corev1.EnvVar{
Name: "TS_KUBE_SECRET",
Value: proxySecret,
Value: "$(POD_NAME)",
},
corev1.EnvVar{
// Old tailscaled config key is still used for backwards compatibility.
Name: "EXPERIMENTAL_TS_CONFIGFILE_PATH",
Value: "/etc/tsconfig/tailscaled",
Name: "TS_STATE",
Value: "kube:$(POD_NAME)",
},
corev1.EnvVar{
// New style is in the form of cap-<capability-version>.hujson.
Name: "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR",
Value: "/etc/tsconfig",
Value: "/etc/tsconfig/$(POD_NAME)",
},
)
if sts.ServeConfig != nil {
container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_SERVE_CONFIG",
Value: "/etc/tsconfig/$(POD_NAME)/serve-config",
})
}
if sts.ForwardClusterTrafficViaL7IngressProxy {
container.Env = append(container.Env, corev1.EnvVar{
Name: "EXPERIMENTAL_ALLOW_PROXYING_CLUSTER_TRAFFIC_VIA_INGRESS",
@@ -538,27 +558,31 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
// Configure containerboot to run tailscaled with a configfile read from the state Secret.
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetConfigFileHash, tsConfigHash)
configVolume := corev1.Volume{
Name: "tailscaledconfig",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: proxySecret,
for i := range sts.Replicas {
ss.Spec.Template.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
Name: fmt.Sprintf("tailscaledconfig-%d", i),
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: fmt.Sprintf("%s-%d-config", ss.Name, i),
},
},
},
}
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, configVolume)
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "tailscaledconfig",
ReadOnly: true,
MountPath: "/etc/tsconfig",
})
if a.tsFirewallMode != "" {
container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_DEBUG_FIREWALL_MODE",
Value: a.tsFirewallMode,
})
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: fmt.Sprintf("tailscaledconfig-%d", i),
ReadOnly: true,
MountPath: fmt.Sprintf("/etc/tsconfig/%s-%d", ss.Name, i),
})
}
// for this prototype
container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_DEBUG_FIREWALL_MODE",
Value: "iptables",
},
corev1.EnvVar{Name: "TS_HEALTHCHECK_ADDR_PORT",
Value: ":8081",
},
)
pod.Spec.PriorityClassName = a.proxyPriorityClassName
// Ingress/egress proxy configuration options.
@@ -586,25 +610,6 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
Value: sts.TailnetTargetFQDN,
})
mak.Set(&ss.Spec.Template.Annotations, podAnnotationLastSetTailnetTargetFQDN, sts.TailnetTargetFQDN)
} else if sts.ServeConfig != nil {
container.Env = append(container.Env, corev1.EnvVar{
Name: "TS_SERVE_CONFIG",
Value: "/etc/tailscaled/serve-config",
})
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: "serve-config",
ReadOnly: true,
MountPath: "/etc/tailscaled",
})
pod.Spec.Volumes = append(ss.Spec.Template.Spec.Volumes, corev1.Volume{
Name: "serve-config",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: proxySecret,
Items: []corev1.KeyToPath{{Key: "serve-config", Path: "serve-config"}},
},
},
})
}
app, err := appInfoForProxy(sts)
if err != nil {
@@ -618,7 +623,7 @@ func (a *tailscaleSTSReconciler) reconcileSTS(ctx context.Context, logger *zap.S
Value: app,
})
}
logger.Debugf("reconciling statefulset %s/%s", ss.GetNamespace(), ss.GetName())
logger.Debugf("reconciling Statefulset %s/%s", ss.GetNamespace(), ss.GetName())
if sts.ProxyClassName != "" {
logger.Debugf("configuring proxy resources with ProxyClass %s", sts.ProxyClassName)
ss = applyProxyClassToStatefulSet(sts.ProxyClass, ss, sts, logger)
@@ -786,6 +791,14 @@ func readAuthKey(secret *corev1.Secret, key string) (*string, error) {
return origConf.AuthKey, nil
}
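// hostNameForReplica returns the hostname to assign to the replica at the given index: the bare
// hostname prefix for index 0 and <prefix>-<idx> for all other replicas.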
func hostNameForReplica(hostNamePrefix string, idx int32) *string {
if idx == 0 {
return &hostNamePrefix
}
s := fmt.Sprintf("%s-%d", hostNamePrefix, idx)
return &s
}
// tailscaledConfig takes a proxy config, a newly generated auth key if
// generated and a Secret with the previous proxy state and auth key and
// returns tailscaled configuration and a hash of that configuration.
@@ -795,13 +808,13 @@ func readAuthKey(secret *corev1.Secret, key string) (*string, error) {
// TODO (irbekrm): remove the legacy config once we no longer need to support
// versions older than cap94,
// https://tailscale.com/kb/1236/kubernetes-operator#operator-and-proxies
func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
func tailscaledConfig(stsC *tailscaleSTSConfig, newAuthkey string, oldSecret *corev1.Secret, idx int32) (tailscaledConfigs, error) {
conf := &ipn.ConfigVAlpha{
Version: "alpha0",
AcceptDNS: "false",
AcceptRoutes: "false", // AcceptRoutes defaults to true
Locked: "false",
Hostname: &stsC.Hostname,
Hostname: hostNameForReplica(stsC.Hostname, idx),
NoStatefulFiltering: "false",
}
@@ -896,7 +909,7 @@ type tailscaledConfigs map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha
// thing that changed is operator version (the hash is also exposed to users via
// an annotation and might be confusing if it changes without the config having
// changed).
func tailscaledConfigHash(c tailscaledConfigs) (string, error) {
func tailscaledConfigHash(c []tailscaledConfigs) (string, error) {
b, err := json.Marshal(c)
if err != nil {
return "", fmt.Errorf("error marshalling tailscaled configs: %w", err)

View File

@@ -780,6 +780,19 @@ func (b *LocalBackend) pauseOrResumeControlClientLocked() {
b.cc.SetPaused((b.state == ipn.Stopped && b.netMap != nil) || (!networkUp && !testenv.InTest() && !assumeNetworkUpdateForTest()))
}
// LameDuck shuts down the control client. This can be run before node shutdown to force control to consider this node
// inactive. It can be used to ensure that when HA subnet router or app connector replicas are shutting down,
// clients switch over to other replicas whilst the existing connections are kept alive for some period of time.
func (b *LocalBackend) LameDuck() {
b.mu.Lock()
defer b.mu.Unlock()
cc := b.resetControlClientLocked()
if cc == nil {
return
}
cc.Shutdown()
}
// captivePortalDetectionInterval is the duration to wait in an unhealthy state with connectivity broken
// before running captive portal detection.
const captivePortalDetectionInterval = 2 * time.Second

View File

@@ -108,6 +108,7 @@ var handler = map[string]localAPIHandler{
"goroutines": (*Handler).serveGoroutines,
"handle-push-message": (*Handler).serveHandlePushMessage,
"id-token": (*Handler).serveIDToken,
"lameduck": (*Handler).lameDuck,
"login-interactive": (*Handler).serveLoginInteractive,
"logout": (*Handler).serveLogout,
"logtap": (*Handler).serveLogTap,
@@ -952,6 +953,22 @@ func (h *Handler) servePprof(w http.ResponseWriter, r *http.Request) {
servePprofFunc(w, r)
}
// lameDuck is the handler for the local API /lameduck endpoint that shuts down the control client, so that the node no longer
// communicates with control. Doing this makes control consider this node inactive. This can be used before shutting
// down a replica of HA subnet router or app connector deployments to ensure that control tells the peers to switch
// over to another replica whilst still maintaining the existing peer connections.
func (h *Handler) lameDuck(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "access denied", http.StatusForbidden)
return
}
if r.Method != httpm.POST {
http.Error(w, "use POST", http.StatusMethodNotAllowed)
return
}
h.b.LameDuck()
}
func (h *Handler) reloadConfig(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "access denied", http.StatusForbidden)

View File

@@ -83,11 +83,12 @@ _Appears in:_
| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `subnetRouter` _[SubnetRouter](#subnetrouter)_ | SubnetRouter defines subnet routes that the Connector node should<br />expose to tailnet. If unset, none are exposed.<br />https://tailscale.com/kb/1019/subnets/ | | |
| `exitNode` _boolean_ | ExitNode defines whether the Connector node should act as a<br />Tailscale exit node. Defaults to false.<br />https://tailscale.com/kb/1103/exit-nodes | | |
| `replicas` _integer_ | | | |
| `tags` _[Tags](#tags)_ | Tags that the Tailscale node will be tagged with.<br />Defaults to [tag:k8s].<br />To autoapprove the subnet routes or exit node defined by a Connector,<br />you can configure Tailscale ACLs to give these tags the necessary<br />permissions.<br />See https://tailscale.com/kb/1337/acl-syntax#autoapprovers.<br />If you specify custom tags here, you must also make the operator an owner of these tags.<br />See https://tailscale.com/kb/1236/kubernetes-operator/#setting-up-the-kubernetes-operator.<br />Tags cannot be changed once a Connector node has been created.<br />Tag values must be in form ^tag:[a-zA-Z][a-zA-Z0-9-]*$. | | Pattern: `^tag:[a-zA-Z][a-zA-Z0-9-]*$` <br />Type: string <br /> |
| `hostname` _[Hostname](#hostname)_ | Hostname is the tailnet hostname that should be assigned to the<br />Connector node. If unset, hostname defaults to <connector<br />name>-connector. Hostname can contain lower case letters, numbers and<br />dashes, it must not start or end with a dash and must be between 2<br />and 63 characters long. | | Pattern: `^[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$` <br />Type: string <br /> |
| `proxyClass` _string_ | ProxyClass is the name of the ProxyClass custom resource that<br />contains configuration options that should be applied to the<br />resources created for this Connector. If unset, the operator will<br />create resources with the default configuration. | | |
| `subnetRouter` _[SubnetRouter](#subnetrouter)_ | SubnetRouter defines subnet routes that the Connector node should<br />expose to tailnet. If unset, none are exposed.<br />https://tailscale.com/kb/1019/subnets/ | | |
| `exitNode` _boolean_ | ExitNode defines whether the Connector node should act as a<br />Tailscale exit node. Defaults to false.<br />https://tailscale.com/kb/1103/exit-nodes | | |
#### ConnectorStatus

View File

@@ -57,6 +57,17 @@ type ConnectorList struct {
// ConnectorSpec describes a Tailscale node to be deployed in the cluster.
// +kubebuilder:validation:XValidation:rule="has(self.subnetRouter) || self.exitNode == true",message="A Connector needs to be either an exit node or a subnet router, or both."
type ConnectorSpec struct {
// SubnetRouter defines subnet routes that the Connector node should
// expose to tailnet. If unset, none are exposed.
// https://tailscale.com/kb/1019/subnets/
// +optional
SubnetRouter *SubnetRouter `json:"subnetRouter"`
// ExitNode defines whether the Connector node should act as a
// Tailscale exit node. Defaults to false.
// https://tailscale.com/kb/1103/exit-nodes
// +optional
ExitNode bool `json:"exitNode"`
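// Replicas is the number of Connector replicas to run (WIP multi-replica mode). If unset, defaults to 1.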
Replicas *int `json:"replicas"`
// Tags that the Tailscale node will be tagged with.
// Defaults to [tag:k8s].
// To autoapprove the subnet routes or exit node defined by a Connector,
@@ -82,16 +93,6 @@ type ConnectorSpec struct {
// create resources with the default configuration.
// +optional
ProxyClass string `json:"proxyClass,omitempty"`
// SubnetRouter defines subnet routes that the Connector node should
// expose to tailnet. If unset, none are exposed.
// https://tailscale.com/kb/1019/subnets/
// +optional
SubnetRouter *SubnetRouter `json:"subnetRouter"`
// ExitNode defines whether the Connector node should act as a
// Tailscale exit node. Defaults to false.
// https://tailscale.com/kb/1103/exit-nodes
// +optional
ExitNode bool `json:"exitNode"`
}
// SubnetRouter defines subnet routes that should be exposed to tailnet via a

View File

@@ -75,16 +75,21 @@ func (in *ConnectorList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ConnectorSpec) DeepCopyInto(out *ConnectorSpec) {
*out = *in
if in.Tags != nil {
in, out := &in.Tags, &out.Tags
*out = make(Tags, len(*in))
copy(*out, *in)
}
if in.SubnetRouter != nil {
in, out := &in.SubnetRouter, &out.SubnetRouter
*out = new(SubnetRouter)
(*in).DeepCopyInto(*out)
}
if in.Replicas != nil {
in, out := &in.Replicas, &out.Replicas
*out = new(int)
**out = **in
}
if in.Tags != nil {
in, out := &in.Tags, &out.Tags
*out = make(Tags, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectorSpec.

View File

@@ -372,6 +372,14 @@ func (i *iptablesRunner) AddDNATRule(origDst, dst netip.Addr) error {
return table.Insert("nat", "PREROUTING", 1, "--destination", origDst.String(), "-j", "DNAT", "--to-destination", dst.String())
}
func (i *iptablesRunner) AddDropRule(dst netip.Addr) error {
table := i.getIPTByAddr(dst)
if err := table.Insert("filter", "OUTPUT", 1, "--destination", dst.String(), "-j", "DROP"); err != nil {
return err
}
return table.Insert("filter", "INPUT", 1, "--source", dst.String(), "-j", "DROP")
}
// EnsureSNATForDst sets up firewall to ensure that all traffic aimed for dst, has its source ip set to src:
// - creates a SNAT rule if not already present
// - ensures that any no longer valid SNAT rules for the same dst are removed

View File

@@ -570,6 +570,8 @@ type NetfilterRunner interface {
// DelMagicsockPortRule removes the rule created by AddMagicsockPortRule,
// if it exists.
DelMagicsockPortRule(port uint16, network string) error
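// AddDropRule adds firewall rules to drop all traffic to and from dst.
// Prototype only; currently implemented only for the iptables runner.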
AddDropRule(dst netip.Addr) error
}
// New creates a NetfilterRunner, auto-detecting whether to use
@@ -692,6 +694,9 @@ func (n *nftablesRunner) HasIPV6NAT() bool {
func (n *nftablesRunner) HasIPV6Filter() bool {
return n.v6Available
}
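// AddDropRule is a no-op for nftables; only the iptables runner implements it in this prototype.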
func (n *nftablesRunner) AddDropRule(addr netip.Addr) error {
return nil
}
// findRule iterates through the rules to find the rule with matching expressions.
func findRule(conn *nftables.Conn, rule *nftables.Rule) (*nftables.Rule, error) {