Compare commits
1 Commits
awly/cli-j
...
marwan/alt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d77d3e5a45 |
@@ -1094,29 +1094,6 @@ func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) er
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamServe returns an io.ReadCloser that streams serve/Funnel
|
||||
// connections made to the provided HostPort.
|
||||
//
|
||||
// If Serve and Funnel were not already enabled for the HostPort in the ServeConfig,
|
||||
// the backend enables it for the duration of the context's lifespan and
|
||||
// then turns it back off once the context is closed. If either are already enabled,
|
||||
// then they remain that way but logs are still streamed
|
||||
func (lc *LocalClient) StreamServe(ctx context.Context, hp ipn.ServeStreamRequest) (io.ReadCloser, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/stream-serve", jsonBody(hp))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := lc.doLocalRequestNiceError(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
res.Body.Close()
|
||||
return nil, errors.New(res.Status)
|
||||
}
|
||||
return res.Body, nil
|
||||
}
|
||||
|
||||
// GetServeConfig return the current serve config.
|
||||
//
|
||||
// If the serve config is empty, it returns (nil, nil).
|
||||
|
||||
@@ -149,7 +149,6 @@ type localServeClient interface {
|
||||
QueryFeature(ctx context.Context, feature string) (*tailcfg.QueryFeatureResponse, error)
|
||||
WatchIPNBus(ctx context.Context, mask ipn.NotifyWatchOpt) (*tailscale.IPNBusWatcher, error)
|
||||
IncrementCounter(ctx context.Context, name string, delta int) error
|
||||
StreamServe(ctx context.Context, req ipn.ServeStreamRequest) (io.ReadCloser, error) // TODO: testing :)
|
||||
}
|
||||
|
||||
// serveEnv is the environment the serve command runs within. All I/O should be
|
||||
|
||||
@@ -5,9 +5,10 @@ package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -30,14 +31,14 @@ var infoMap = map[string]commandInfo{
|
||||
ShortHelp: "Serve content and local servers on your tailnet",
|
||||
LongHelp: strings.Join([]string{
|
||||
"Serve lets you share a local server securely within your tailnet.",
|
||||
"To share a local server on the internet, use \"tailscale funnel\"",
|
||||
`To share a local server on the internet, use "tailscale funnel"`,
|
||||
}, "\n"),
|
||||
},
|
||||
"funnel": {
|
||||
ShortHelp: "Serve content and local servers on the internet",
|
||||
LongHelp: strings.Join([]string{
|
||||
"Funnel lets you share a local server on the internet using Tailscale.",
|
||||
"To share only within your tailnet, use \"tailscale serve\"",
|
||||
`To share only within your tailnet, use "tailscale serve"`,
|
||||
}, "\n"),
|
||||
},
|
||||
}
|
||||
@@ -134,14 +135,77 @@ func (e *serveEnv) runServeDev(funnel bool) execFunc {
|
||||
}
|
||||
|
||||
func (e *serveEnv) streamServe(ctx context.Context, req ipn.ServeStreamRequest) error {
|
||||
stream, err := e.lc.StreamServe(ctx, req)
|
||||
watcher, err := e.lc.WatchIPNBus(ctx, ipn.NotifyInitialState|ipn.NotifyServeRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer stream.Close()
|
||||
defer watcher.Close()
|
||||
n, err := watcher.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sessionID := n.SessionID
|
||||
if sessionID == "" {
|
||||
return errors.New("missing SessionID")
|
||||
}
|
||||
sc, err := e.lc.GetServeConfig(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting serve config: %w", err)
|
||||
}
|
||||
if sc == nil {
|
||||
sc = &ipn.ServeConfig{}
|
||||
}
|
||||
setHandler(sc, req, sessionID)
|
||||
err = e.lc.SetServeConfig(ctx, sc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error setting serve config: %w", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Serve started on \"https://%s\".\n", strings.TrimSuffix(string(req.HostPort), ":443"))
|
||||
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop.\n\n")
|
||||
_, err = io.Copy(os.Stdout, stream)
|
||||
return err
|
||||
fmt.Fprintf(os.Stderr, "Funnel started on \"https://%s\".\n", strings.TrimSuffix(string(req.HostPort), ":443"))
|
||||
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop Funnel.\n\n")
|
||||
|
||||
for {
|
||||
n, err := watcher.Next()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling next: %w", err)
|
||||
}
|
||||
if n.RequestAccessLog == nil || n.RequestAccessLog.SessionID != sessionID {
|
||||
continue
|
||||
}
|
||||
bts, _ := json.Marshal(n.RequestAccessLog)
|
||||
fmt.Printf("%s\n", bts)
|
||||
}
|
||||
}
|
||||
|
||||
func setHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest, sessionID string) {
|
||||
if sc.Foreground == nil {
|
||||
sc.Foreground = make(map[string]*ipn.ServeConfig)
|
||||
}
|
||||
if sc.Foreground[sessionID] == nil {
|
||||
sc.Foreground[sessionID] = &ipn.ServeConfig{}
|
||||
}
|
||||
if sc.Foreground[sessionID].TCP == nil {
|
||||
sc.Foreground[sessionID].TCP = make(map[uint16]*ipn.TCPPortHandler)
|
||||
}
|
||||
if _, ok := sc.Foreground[sessionID].TCP[443]; !ok {
|
||||
sc.Foreground[sessionID].TCP[443] = &ipn.TCPPortHandler{HTTPS: true}
|
||||
}
|
||||
if sc.Foreground[sessionID].Web == nil {
|
||||
sc.Foreground[sessionID].Web = make(map[ipn.HostPort]*ipn.WebServerConfig)
|
||||
}
|
||||
wsc, ok := sc.Foreground[sessionID].Web[req.HostPort]
|
||||
if !ok {
|
||||
wsc = &ipn.WebServerConfig{}
|
||||
sc.Foreground[sessionID].Web[req.HostPort] = wsc
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
wsc.Handlers = make(map[string]*ipn.HTTPHandler)
|
||||
}
|
||||
wsc.Handlers[req.MountPoint] = &ipn.HTTPHandler{
|
||||
Proxy: req.Source,
|
||||
}
|
||||
if sc.AllowFunnel == nil {
|
||||
sc.AllowFunnel = make(map[ipn.HostPort]bool)
|
||||
}
|
||||
sc.AllowFunnel[req.HostPort] = true
|
||||
}
|
||||
|
||||
@@ -66,6 +66,8 @@ const (
|
||||
NotifyInitialNetMap // if set, the first Notify message (sent immediately) will contain the current NetMap
|
||||
|
||||
NotifyNoPrivateKeys // if set, private keys that would normally be sent in updates are zeroed out
|
||||
|
||||
NotifyServeRequest // if set, RequestAccessLog messages will be sent to the watcher
|
||||
)
|
||||
|
||||
// Notify is a communication from a backend (e.g. tailscaled) to a frontend
|
||||
@@ -122,6 +124,10 @@ type Notify struct {
|
||||
ClientVersion *tailcfg.ClientVersion `json:",omitempty"`
|
||||
|
||||
// type is mirrored in xcode/Shared/IPN.swift
|
||||
|
||||
// RequestAccessLog is a notification that a request
|
||||
// has been sent via the serve config.
|
||||
RequestAccessLog *RequestAccessLog `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (n Notify) String() string {
|
||||
|
||||
@@ -246,9 +246,6 @@ type LocalBackend struct {
|
||||
|
||||
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
|
||||
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
|
||||
// serveStreamers is a map for those running Funnel in the foreground
|
||||
// and streaming incoming requests.
|
||||
serveStreamers map[uint16]map[uint32]func(ipn.FunnelRequestLog) // serve port => map of stream loggers (key is UUID)
|
||||
|
||||
// statusLock must be held before calling statusChanged.Wait() or
|
||||
// statusChanged.Broadcast().
|
||||
@@ -2014,7 +2011,18 @@ func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWa
|
||||
go b.pollRequestEngineStatus(ctx)
|
||||
}
|
||||
|
||||
defer b.DeleteForegroundSession(sessionID) // TODO(marwan-at-work): check err
|
||||
// TODO(marwan-at-work): check err
|
||||
// TODO(marwan-at-work): streaming background logs?
|
||||
defer b.DeleteForegroundSession(sessionID)
|
||||
|
||||
if mask&ipn.NotifyServeRequest == 0 {
|
||||
fn = func(roNotify *ipn.Notify) (keepGoing bool) {
|
||||
if roNotify.RequestAccessLog != nil {
|
||||
return true
|
||||
}
|
||||
return origFn(roNotify)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -2346,7 +2354,7 @@ func (b *LocalBackend) setAtomicValuesFromPrefsLocked(p ipn.PrefsView) {
|
||||
} else {
|
||||
filtered := tsaddr.FilterPrefixesCopy(p.AdvertiseRoutes(), tsaddr.IsViaPrefix)
|
||||
b.containsViaIPFuncAtomic.Store(tsaddr.NewContainsIPFunc(filtered))
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(p)
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(p, true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4048,7 +4056,7 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) {
|
||||
netns.SetBindToInterfaceByRoute(hasCapability(nm, tailcfg.CapabilityBindToInterfaceByRoute))
|
||||
netns.SetDisableBindConnToInterface(hasCapability(nm, tailcfg.CapabilityDebugDisableBindConnToInterface))
|
||||
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), true)
|
||||
if nm == nil {
|
||||
b.nodeByAddr = nil
|
||||
return
|
||||
@@ -4095,7 +4103,11 @@ func (b *LocalBackend) setDebugLogsByCapabilityLocked(nm *netmap.NetworkMap) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
// reloadServeConfigLocked reloads the serve config from the store or resets the
|
||||
// serve config to nil if not logged in. The "changed" parameter, when false, instructs
|
||||
// the method to only run the reset-logic and not reload the store from memory to ensure
|
||||
// foreground sessions are not removed if they are not saved on disk.
|
||||
func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView, changed bool) {
|
||||
if b.netMap == nil || !b.netMap.SelfNode.Valid() || !prefs.Valid() || b.pm.CurrentProfile().ID == "" {
|
||||
// We're not logged in, so we don't have a profile.
|
||||
// Don't try to load the serve config.
|
||||
@@ -4103,6 +4115,9 @@ func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
b.serveConfig = ipn.ServeConfigView{}
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
return
|
||||
}
|
||||
confKey := ipn.ServeConfigKey(b.pm.CurrentProfile().ID)
|
||||
// TODO(maisem,bradfitz): prevent reading the config from disk
|
||||
// if the profile has not changed.
|
||||
@@ -4129,20 +4144,27 @@ func (b *LocalBackend) reloadServeConfigLocked(prefs ipn.PrefsView) {
|
||||
// the ports that tailscaled should handle as a function of b.netMap and b.prefs.
|
||||
//
|
||||
// b.mu must be held.
|
||||
func (b *LocalBackend) setTCPPortsInterceptedFromNetmapAndPrefsLocked(prefs ipn.PrefsView) {
|
||||
func (b *LocalBackend) setTCPPortsInterceptedFromNetmapAndPrefsLocked(prefs ipn.PrefsView, changed bool) {
|
||||
handlePorts := make([]uint16, 0, 4)
|
||||
|
||||
if prefs.Valid() && prefs.RunSSH() && envknob.CanSSHD() {
|
||||
handlePorts = append(handlePorts, 22)
|
||||
}
|
||||
|
||||
b.reloadServeConfigLocked(prefs)
|
||||
b.reloadServeConfigLocked(prefs, changed)
|
||||
if b.serveConfig.Valid() {
|
||||
servePorts := make([]uint16, 0, 3)
|
||||
b.serveConfig.TCP().Range(func(port uint16, _ ipn.TCPPortHandlerView) bool {
|
||||
if port > 0 {
|
||||
servePorts = append(servePorts, uint16(port))
|
||||
}
|
||||
addServePorts := func(tcp views.MapFn[uint16, *ipn.TCPPortHandler, ipn.TCPPortHandlerView]) {
|
||||
tcp.Range(func(port uint16, _ ipn.TCPPortHandlerView) bool {
|
||||
if port > 0 {
|
||||
servePorts = append(servePorts, uint16(port))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
addServePorts(b.serveConfig.TCP())
|
||||
b.serveConfig.Foreground().Range(func(_ string, v ipn.ServeConfigView) (cont bool) {
|
||||
addServePorts(v.TCP())
|
||||
return true
|
||||
})
|
||||
handlePorts = append(handlePorts, servePorts...)
|
||||
@@ -4172,29 +4194,36 @@ func (b *LocalBackend) setServeProxyHandlersLocked() {
|
||||
return
|
||||
}
|
||||
var backends map[string]bool
|
||||
b.serveConfig.Web().Range(func(_ ipn.HostPort, conf ipn.WebServerConfigView) (cont bool) {
|
||||
conf.Handlers().Range(func(_ string, h ipn.HTTPHandlerView) (cont bool) {
|
||||
backend := h.Proxy()
|
||||
if backend == "" {
|
||||
// Only create proxy handlers for servers with a proxy backend.
|
||||
return true
|
||||
}
|
||||
mak.Set(&backends, backend, true)
|
||||
if _, ok := b.serveProxyHandlers.Load(backend); ok {
|
||||
return true
|
||||
}
|
||||
setBackends := func(webCfg views.MapFn[ipn.HostPort, *ipn.WebServerConfig, ipn.WebServerConfigView]) {
|
||||
webCfg.Range(func(_ ipn.HostPort, conf ipn.WebServerConfigView) (cont bool) {
|
||||
conf.Handlers().Range(func(_ string, h ipn.HTTPHandlerView) (cont bool) {
|
||||
backend := h.Proxy()
|
||||
if backend == "" {
|
||||
// Only create proxy handlers for servers with a proxy backend.
|
||||
return true
|
||||
}
|
||||
mak.Set(&backends, backend, true)
|
||||
if _, ok := b.serveProxyHandlers.Load(backend); ok {
|
||||
return true
|
||||
}
|
||||
|
||||
b.logf("serve: creating a new proxy handler for %s", backend)
|
||||
p, err := b.proxyHandlerForBackend(backend)
|
||||
if err != nil {
|
||||
// The backend endpoint (h.Proxy) should have been validated by expandProxyTarget
|
||||
// in the CLI, so just log the error here.
|
||||
b.logf("[unexpected] could not create proxy for %v: %s", backend, err)
|
||||
b.logf("serve: creating a new proxy handler for %s", backend)
|
||||
p, err := b.proxyHandlerForBackend(backend)
|
||||
if err != nil {
|
||||
// The backend endpoint (h.Proxy) should have been validated by expandProxyTarget
|
||||
// in the CLI, so just log the error here.
|
||||
b.logf("[unexpected] could not create proxy for %v: %s", backend, err)
|
||||
return true
|
||||
}
|
||||
b.serveProxyHandlers.Store(backend, p)
|
||||
return true
|
||||
}
|
||||
b.serveProxyHandlers.Store(backend, p)
|
||||
})
|
||||
return true
|
||||
})
|
||||
}
|
||||
setBackends(b.serveConfig.Web())
|
||||
b.serveConfig.Foreground().Range(func(_ string, v ipn.ServeConfigView) (cont bool) {
|
||||
setBackends(v.Web())
|
||||
return true
|
||||
})
|
||||
|
||||
@@ -4881,7 +4910,7 @@ func (b *LocalBackend) SetDevStateStore(key, value string) error {
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), true)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -23,13 +23,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go4.org/mem"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/logtail/backoff"
|
||||
"tailscale.com/net/netutil"
|
||||
"tailscale.com/syncs"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/types/views"
|
||||
"tailscale.com/util/mak"
|
||||
"tailscale.com/version"
|
||||
)
|
||||
@@ -239,17 +240,21 @@ func (b *LocalBackend) setServeConfigLocked(config *ipn.ServeConfig) error {
|
||||
|
||||
var bs []byte
|
||||
if config != nil {
|
||||
j, err := json.Marshal(config)
|
||||
// TODO(marwan): either Clone+StripForeground here (which means we need to double check lastServeConfJSON is unaffected)
|
||||
// OR: strip foreground on backend start ups.
|
||||
var err error
|
||||
bs, err = json.Marshal(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encoding serve config: %w", err)
|
||||
}
|
||||
bs = j
|
||||
b.serveConfig = config.View()
|
||||
b.lastServeConfJSON = mem.B(bs)
|
||||
}
|
||||
if err := b.store.WriteState(confKey, bs); err != nil {
|
||||
return fmt.Errorf("writing ServeConfig to StateStore: %w", err)
|
||||
}
|
||||
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs())
|
||||
b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs(), false)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -271,154 +276,67 @@ func (b *LocalBackend) DeleteForegroundSession(sessionID string) error {
|
||||
return nil
|
||||
}
|
||||
sc := b.serveConfig.AsStruct()
|
||||
if hp, ok := shouldDeleteFunnel(sc, sessionID); ok {
|
||||
delete(sc.AllowFunnel, hp)
|
||||
if len(sc.AllowFunnel) == 0 {
|
||||
sc.AllowFunnel = nil
|
||||
}
|
||||
}
|
||||
delete(sc.Foreground, sessionID)
|
||||
return b.setServeConfigLocked(sc)
|
||||
}
|
||||
|
||||
// StreamServe opens a stream to write any incoming connections made
|
||||
// to the given HostPort out to the listening io.Writer.
|
||||
//
|
||||
// If Serve and Funnel were not already enabled for the HostPort in the ServeConfig,
|
||||
// the backend enables it for the duration of the context's lifespan and
|
||||
// then turns it back off once the context is closed. If either are already enabled,
|
||||
// then they remain that way but logs are still streamed
|
||||
func (b *LocalBackend) StreamServe(ctx context.Context, w io.Writer, req ipn.ServeStreamRequest) (err error) {
|
||||
f, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
return errors.New("writer not a flusher")
|
||||
// shouldDeleteFunnel returns the port and true if:
|
||||
// 1. This foreground has a TCP port, that
|
||||
// 2. Funnel has it referenced and allowed, and
|
||||
// 3. No other foreground or background session has the port
|
||||
// Ambiguity: what if another background had funnel on?
|
||||
func shouldDeleteFunnel(sc *ipn.ServeConfig, sessionID string) (hp ipn.HostPort, ok bool) {
|
||||
fg := sc.Foreground[sessionID]
|
||||
if len(fg.TCP) == 0 {
|
||||
return "", false
|
||||
}
|
||||
f.Flush()
|
||||
|
||||
port, err := req.HostPort.Port()
|
||||
if err != nil {
|
||||
return err
|
||||
// we can't have multiple TCPs under a single foreground session
|
||||
var port uint16
|
||||
for key := range fg.TCP {
|
||||
port = key
|
||||
}
|
||||
|
||||
// Turn on Funnel for the given HostPort.
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
if sc == nil {
|
||||
sc = &ipn.ServeConfig{}
|
||||
}
|
||||
setHandler(sc, req)
|
||||
if err := b.SetServeConfig(sc); err != nil {
|
||||
return fmt.Errorf("errro setting serve config: %w", err)
|
||||
}
|
||||
// Defer turning off Funnel once stream ends.
|
||||
defer func() {
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
deleteHandler(sc, req, port)
|
||||
err = errors.Join(err, b.SetServeConfig(sc))
|
||||
}()
|
||||
|
||||
var writeErrs []error
|
||||
writeToStream := func(log ipn.FunnelRequestLog) {
|
||||
jsonLog, err := json.Marshal(log)
|
||||
if err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
var hasFunnel bool
|
||||
for key, b := range sc.AllowFunnel {
|
||||
givenPort, _ := key.Port()
|
||||
if givenPort == port {
|
||||
hasFunnel = b
|
||||
hp = key
|
||||
break
|
||||
}
|
||||
if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
}
|
||||
if !hasFunnel {
|
||||
return "", false
|
||||
}
|
||||
if _, ok := sc.TCP[port]; ok {
|
||||
return "", false
|
||||
}
|
||||
for key, givenFg := range sc.Foreground {
|
||||
if key == sessionID {
|
||||
continue
|
||||
}
|
||||
if _, ok := givenFg.TCP[port]; ok {
|
||||
return "", false
|
||||
}
|
||||
f.Flush()
|
||||
}
|
||||
|
||||
// Hook up connections stream.
|
||||
b.mu.Lock()
|
||||
mak.NonNilMapForJSON(&b.serveStreamers)
|
||||
if b.serveStreamers[port] == nil {
|
||||
b.serveStreamers[port] = make(map[uint32]func(ipn.FunnelRequestLog))
|
||||
}
|
||||
id := uuid.New().ID()
|
||||
b.serveStreamers[port][id] = writeToStream
|
||||
b.mu.Unlock()
|
||||
|
||||
// Clean up streamer when done.
|
||||
defer func() {
|
||||
b.mu.Lock()
|
||||
delete(b.serveStreamers[port], id)
|
||||
b.mu.Unlock()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Triggered by foreground `tailscale funnel` process
|
||||
// (the streamer) getting closed, or by turning off Tailscale.
|
||||
}
|
||||
|
||||
return errors.Join(writeErrs...)
|
||||
return hp, true
|
||||
}
|
||||
|
||||
func setHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest) {
|
||||
if sc.TCP == nil {
|
||||
sc.TCP = make(map[uint16]*ipn.TCPPortHandler)
|
||||
// maybeLogServeConnection creates a RequestAccessLog and sends it to any watchers
|
||||
// who are subscribed to ipn.NotifyServeRequest. SessionID, when not empty, indicates
|
||||
// that this log is intended for a specific foreground serve session, otherwise it may be
|
||||
// a background serve whose logs are being followed.
|
||||
func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.AddrPort, sessionID string) {
|
||||
log := &ipn.RequestAccessLog{
|
||||
SrcAddr: srcAddr,
|
||||
Time: b.clock.Now(),
|
||||
SessionID: sessionID,
|
||||
}
|
||||
if _, ok := sc.TCP[443]; !ok {
|
||||
sc.TCP[443] = &ipn.TCPPortHandler{
|
||||
HTTPS: true,
|
||||
}
|
||||
}
|
||||
if sc.Web == nil {
|
||||
sc.Web = make(map[ipn.HostPort]*ipn.WebServerConfig)
|
||||
}
|
||||
wsc, ok := sc.Web[req.HostPort]
|
||||
if !ok {
|
||||
wsc = &ipn.WebServerConfig{}
|
||||
sc.Web[req.HostPort] = wsc
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
wsc.Handlers = make(map[string]*ipn.HTTPHandler)
|
||||
}
|
||||
wsc.Handlers[req.MountPoint] = &ipn.HTTPHandler{
|
||||
Proxy: req.Source,
|
||||
}
|
||||
if req.Funnel {
|
||||
if sc.AllowFunnel == nil {
|
||||
sc.AllowFunnel = make(map[ipn.HostPort]bool)
|
||||
}
|
||||
sc.AllowFunnel[req.HostPort] = true
|
||||
}
|
||||
}
|
||||
|
||||
func deleteHandler(sc *ipn.ServeConfig, req ipn.ServeStreamRequest, port uint16) {
|
||||
delete(sc.AllowFunnel, req.HostPort)
|
||||
if sc.TCP != nil {
|
||||
delete(sc.TCP, port)
|
||||
}
|
||||
if sc.Web == nil {
|
||||
return
|
||||
}
|
||||
if sc.Web[req.HostPort] == nil {
|
||||
return
|
||||
}
|
||||
wsc, ok := sc.Web[req.HostPort]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if wsc.Handlers == nil {
|
||||
return
|
||||
}
|
||||
if _, ok := wsc.Handlers[req.MountPoint]; !ok {
|
||||
return
|
||||
}
|
||||
delete(wsc.Handlers, req.MountPoint)
|
||||
if len(wsc.Handlers) == 0 {
|
||||
delete(sc.Web, req.HostPort)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.AddrPort) {
|
||||
b.mu.Lock()
|
||||
streamers := b.serveStreamers[destPort]
|
||||
b.mu.Unlock()
|
||||
if len(streamers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var log ipn.FunnelRequestLog
|
||||
log.SrcAddr = srcAddr
|
||||
log.Time = b.clock.Now()
|
||||
|
||||
if node, user, ok := b.WhoIs(srcAddr); ok {
|
||||
log.NodeName = node.ComputedName()
|
||||
@@ -430,9 +348,7 @@ func (b *LocalBackend) maybeLogServeConnection(destPort uint16, srcAddr netip.Ad
|
||||
}
|
||||
}
|
||||
|
||||
for _, stream := range streamers {
|
||||
stream(log)
|
||||
}
|
||||
b.send(ipn.Notify{RequestAccessLog: log})
|
||||
}
|
||||
|
||||
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer tailcfg.NodeView, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) {
|
||||
@@ -504,83 +420,93 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort)
|
||||
return nil
|
||||
}
|
||||
|
||||
tcph, ok := sc.TCP().GetOk(dport)
|
||||
if !ok {
|
||||
b.logf("[unexpected] localbackend: got TCP conn without TCP config for port %v; from %v", dport, srcAddr)
|
||||
findHandler := func(tcpCfg views.MapFn[uint16, *ipn.TCPPortHandler, ipn.TCPPortHandlerView], sessionID string) (handler func(net.Conn) error) {
|
||||
tcph, ok := tcpCfg.GetOk(dport)
|
||||
if !ok {
|
||||
b.logf("[unexpected] localbackend: got TCP conn without TCP config for port %v; from %v", dport, srcAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if tcph.HTTPS() || tcph.HTTP() {
|
||||
hs := &http.Server{
|
||||
Handler: b.newServeWebHandlerForSession(sessionID),
|
||||
BaseContext: func(_ net.Listener) context.Context {
|
||||
return context.WithValue(context.Background(), serveHTTPContextKey{}, &serveHTTPContext{
|
||||
SrcAddr: srcAddr,
|
||||
DestPort: dport,
|
||||
})
|
||||
},
|
||||
}
|
||||
if tcph.HTTPS() {
|
||||
hs.TLSConfig = &tls.Config{
|
||||
GetCertificate: b.getTLSServeCertForPort(dport),
|
||||
}
|
||||
return func(c net.Conn) error {
|
||||
return hs.ServeTLS(netutil.NewOneConnListener(c, nil), "", "")
|
||||
}
|
||||
}
|
||||
|
||||
return func(c net.Conn) error {
|
||||
return hs.Serve(netutil.NewOneConnListener(c, nil))
|
||||
}
|
||||
}
|
||||
|
||||
if backDst := tcph.TCPForward(); backDst != "" {
|
||||
return func(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
b.maybeLogServeConnection(dport, srcAddr, sessionID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
|
||||
cancel()
|
||||
if err != nil {
|
||||
b.logf("localbackend: failed to TCP proxy port %v (from %v) to %s: %v", dport, srcAddr, backDst, err)
|
||||
return nil
|
||||
}
|
||||
defer backConn.Close()
|
||||
if sni := tcph.TerminateTLS(); sni != "" {
|
||||
conn = tls.Server(conn, &tls.Config{
|
||||
GetCertificate: func(hi *tls.ClientHelloInfo) (*tls.Certificate, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
pair, err := b.GetCertPEM(ctx, sni, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cert, err := tls.X509KeyPair(pair.CertPEM, pair.KeyPEM)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cert, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(bradfitz): do the RegisterIPPortIdentity and
|
||||
// UnregisterIPPortIdentity stuff that netstack does
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := io.Copy(backConn, conn)
|
||||
errc <- err
|
||||
}()
|
||||
go func() {
|
||||
_, err := io.Copy(conn, backConn)
|
||||
errc <- err
|
||||
}()
|
||||
return <-errc
|
||||
}
|
||||
}
|
||||
|
||||
b.logf("closing TCP conn to port %v (from %v) with actionless TCPPortHandler", dport, srcAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if tcph.HTTPS() || tcph.HTTP() {
|
||||
hs := &http.Server{
|
||||
Handler: http.HandlerFunc(b.serveWebHandler),
|
||||
BaseContext: func(_ net.Listener) context.Context {
|
||||
return context.WithValue(context.Background(), serveHTTPContextKey{}, &serveHTTPContext{
|
||||
SrcAddr: srcAddr,
|
||||
DestPort: dport,
|
||||
})
|
||||
},
|
||||
}
|
||||
if tcph.HTTPS() {
|
||||
hs.TLSConfig = &tls.Config{
|
||||
GetCertificate: b.getTLSServeCertForPort(dport),
|
||||
}
|
||||
return func(c net.Conn) error {
|
||||
return hs.ServeTLS(netutil.NewOneConnListener(c, nil), "", "")
|
||||
}
|
||||
}
|
||||
|
||||
return func(c net.Conn) error {
|
||||
return hs.Serve(netutil.NewOneConnListener(c, nil))
|
||||
}
|
||||
sc.Foreground().Range(func(k string, v ipn.ServeConfigView) (cont bool) {
|
||||
handler = findHandler(v.TCP(), k)
|
||||
return handler == nil
|
||||
})
|
||||
if handler != nil {
|
||||
return handler
|
||||
}
|
||||
|
||||
if backDst := tcph.TCPForward(); backDst != "" {
|
||||
return func(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
b.maybeLogServeConnection(dport, srcAddr)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
|
||||
cancel()
|
||||
if err != nil {
|
||||
b.logf("localbackend: failed to TCP proxy port %v (from %v) to %s: %v", dport, srcAddr, backDst, err)
|
||||
return nil
|
||||
}
|
||||
defer backConn.Close()
|
||||
if sni := tcph.TerminateTLS(); sni != "" {
|
||||
conn = tls.Server(conn, &tls.Config{
|
||||
GetCertificate: func(hi *tls.ClientHelloInfo) (*tls.Certificate, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
pair, err := b.GetCertPEM(ctx, sni, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cert, err := tls.X509KeyPair(pair.CertPEM, pair.KeyPEM)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cert, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(bradfitz): do the RegisterIPPortIdentity and
|
||||
// UnregisterIPPortIdentity stuff that netstack does
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := io.Copy(backConn, conn)
|
||||
errc <- err
|
||||
}()
|
||||
go func() {
|
||||
_, err := io.Copy(conn, backConn)
|
||||
errc <- err
|
||||
}()
|
||||
return <-errc
|
||||
}
|
||||
}
|
||||
|
||||
b.logf("closing TCP conn to port %v (from %v) with actionless TCPPortHandler", dport, srcAddr)
|
||||
return nil
|
||||
return findHandler(sc.TCP(), "")
|
||||
}
|
||||
|
||||
func getServeHTTPContext(r *http.Request) (c *serveHTTPContext, ok bool) {
|
||||
@@ -700,40 +626,45 @@ func (b *LocalBackend) addTailscaleIdentityHeaders(r *httputil.ProxyRequest) {
|
||||
r.Out.Header.Set("Tailscale-Headers-Info", "https://tailscale.com/s/serve-headers")
|
||||
}
|
||||
|
||||
func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) {
|
||||
h, mountPoint, ok := b.getServeHandler(r)
|
||||
if !ok {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if c, ok := getServeHTTPContext(r); ok {
|
||||
b.maybeLogServeConnection(c.DestPort, c.SrcAddr)
|
||||
}
|
||||
if s := h.Text(); s != "" {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
io.WriteString(w, s)
|
||||
return
|
||||
}
|
||||
if v := h.Path(); v != "" {
|
||||
b.serveFileOrDirectory(w, r, v, mountPoint)
|
||||
return
|
||||
}
|
||||
if v := h.Proxy(); v != "" {
|
||||
p, ok := b.serveProxyHandlers.Load(v)
|
||||
// newServeWebHandlerForSession returns an http.HandlerFunc that matches incoming
|
||||
// HTTP requests to its corresponding Web Handler in the ServeConfig. The handler
|
||||
// also logs requests to the IPN Bus which will include the given sessionID if not empty.
|
||||
func (b *LocalBackend) newServeWebHandlerForSession(sessionID string) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
h, mountPoint, ok := b.getServeHandler(r)
|
||||
if !ok {
|
||||
http.Error(w, "unknown proxy destination", http.StatusInternalServerError)
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
h := p.(http.Handler)
|
||||
// Trim the mount point from the URL path before proxying. (#6571)
|
||||
if r.URL.Path != "/" {
|
||||
h = http.StripPrefix(strings.TrimSuffix(mountPoint, "/"), h)
|
||||
if c, ok := getServeHTTPContext(r); ok {
|
||||
b.maybeLogServeConnection(c.DestPort, c.SrcAddr, sessionID)
|
||||
}
|
||||
if s := h.Text(); s != "" {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
io.WriteString(w, s)
|
||||
return
|
||||
}
|
||||
if v := h.Path(); v != "" {
|
||||
b.serveFileOrDirectory(w, r, v, mountPoint)
|
||||
return
|
||||
}
|
||||
if v := h.Proxy(); v != "" {
|
||||
p, ok := b.serveProxyHandlers.Load(v)
|
||||
if !ok {
|
||||
http.Error(w, "unknown proxy destination", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
h := p.(http.Handler)
|
||||
// Trim the mount point from the URL path before proxying. (#6571)
|
||||
if r.URL.Path != "/" {
|
||||
h = http.StripPrefix(strings.TrimSuffix(mountPoint, "/"), h)
|
||||
}
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, "empty handler", 500)
|
||||
http.Error(w, "empty handler", 500)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LocalBackend) serveFileOrDirectory(w http.ResponseWriter, r *http.Request, fileOrDir, mountPoint string) {
|
||||
@@ -844,6 +775,13 @@ func (b *LocalBackend) webServerConfig(hostname string, port uint16) (c ipn.WebS
|
||||
if !b.serveConfig.Valid() {
|
||||
return c, false
|
||||
}
|
||||
b.serveConfig.Foreground().Range(func(k string, v ipn.ServeConfigView) (cont bool) {
|
||||
c, ok = v.Web().GetOk(key)
|
||||
return !ok
|
||||
})
|
||||
if ok {
|
||||
return c, ok
|
||||
}
|
||||
return b.serveConfig.Web().GetOk(key)
|
||||
}
|
||||
|
||||
|
||||
@@ -296,7 +296,7 @@ func TestServeHTTPProxy(t *testing.T) {
|
||||
}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
b.serveWebHandler(w, req)
|
||||
b.newServeWebHandlerForSession("")(w, req)
|
||||
|
||||
// Verify the headers.
|
||||
h := w.Result().Header
|
||||
|
||||
@@ -97,7 +97,6 @@ var handler = map[string]localAPIHandler{
|
||||
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
|
||||
"start": (*Handler).serveStart,
|
||||
"status": (*Handler).serveStatus,
|
||||
"stream-serve": (*Handler).serveStreamServe,
|
||||
"tka/init": (*Handler).serveTKAInit,
|
||||
"tka/log": (*Handler).serveTKALog,
|
||||
"tka/modify": (*Handler).serveTKAModify,
|
||||
@@ -854,35 +853,6 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// serveStreamServe handles foreground serve and funnel streams. This is
|
||||
// currently in development per https://github.com/tailscale/tailscale/issues/8489
|
||||
func (h *Handler) serveStreamServe(w http.ResponseWriter, r *http.Request) {
|
||||
if !envknob.UseWIPCode() {
|
||||
http.Error(w, "stream serve not yet available", http.StatusNotImplemented)
|
||||
return
|
||||
}
|
||||
if !h.PermitWrite {
|
||||
// Write permission required because we modify the ServeConfig.
|
||||
http.Error(w, "serve stream denied", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, "POST required", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
var req ipn.ServeStreamRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("decoding HostPort: %w", err))
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := h.b.StreamServe(r.Context(), w, req); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("streaming serve: %w", err))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.PermitRead {
|
||||
http.Error(w, "IP forwarding check access denied", http.StatusForbidden)
|
||||
|
||||
14
ipn/serve.go
14
ipn/serve.go
@@ -38,7 +38,7 @@ type ServeConfig struct {
|
||||
// traffic is allowed, from trusted ingress peers.
|
||||
AllowFunnel map[HostPort]bool `json:",omitempty"`
|
||||
|
||||
// Foreground is a map of an IPN Bus session id to a
|
||||
// Foreground is a map of an IPN Bus session ID to a
|
||||
// foreground serve config. Note that only TCP and Web
|
||||
// are used inside the Foreground map.
|
||||
//
|
||||
@@ -107,16 +107,20 @@ type ServeStreamRequest struct {
|
||||
Funnel bool `json:",omitempty"`
|
||||
}
|
||||
|
||||
// FunnelRequestLog is the JSON type written out to io.Writers
|
||||
// watching funnel connections via ipnlocal.StreamServe.
|
||||
// RequestAccessLog is the JSON type written out to io.Writers
|
||||
// watching serve connections via ipnlocal.StreamServe.
|
||||
//
|
||||
// This structure is in development and subject to change.
|
||||
type FunnelRequestLog struct {
|
||||
type RequestAccessLog struct {
|
||||
Time time.Time `json:",omitempty"` // time of request forwarding
|
||||
|
||||
// SrcAddr is the address that initiated the Funnel request.
|
||||
// SrcAddr is the address that initiated the serve or funnel request.
|
||||
SrcAddr netip.AddrPort `json:",omitempty"`
|
||||
|
||||
// SessionID, if not empty, means this request was
|
||||
// meant for a specific WatchIPNBus session.
|
||||
SessionID string `json:",omitempty"`
|
||||
|
||||
// The following fields are only populated if the connection
|
||||
// initiated from another node on the client's tailnet.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user