Compare commits

...

7 Commits

Author SHA1 Message Date
Tom DNetto
6462876f04 tka: implement filesystem chonk garbage collection
Signed-off-by: Tom DNetto <tom@tailscale.com>
2023-03-24 12:14:10 -07:00
Anton Tolchanov
6f9aed1656 scripts: use pkg server to determine supported deb/rpm distros
Fixes https://github.com/tailscale/corp/issues/8952

Signed-off-by: Anton Tolchanov <anton@tailscale.com>
2023-03-24 17:36:43 +00:00
Andrew Dunham
4cb1bfee44 net/netcheck: improve determinism in hairpinning test
If multiple Go channels have a value (or are closed), receiving from
them all in a select will nondeterministically return one of the two
arms. In this case, it's possible that the hairpin check timer has
already expired by the time we check the result, even though the
hairpin packet has in fact been received. In such cases, we'd
nondeterministically set report.HairPinning.

Instead, check if we have a value in our results channel first, then
select on the value and timeout channel after. Also, add a test that
catches this particular failure.

Fixes #1795

Change-Id: I842ab0bd38d66fabc6cabf2c2c1bb9bd32febf35
Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
2023-03-24 12:01:23 -04:00
Maisem Ali
4a89642f7f log/sockstatlog: make shutdown close idle connections
Updates tailscale/corp#10030

Signed-off-by: Maisem Ali <maisem@tailscale.com>
2023-03-23 19:15:30 -07:00
Maisem Ali
9e81db50f6 ipn/ipnlocal: use atomicfile.WriteFile in certFileStore
Signed-off-by: Maisem Ali <maisem@tailscale.com>
2023-03-23 17:35:44 -07:00
Maisem Ali
8a11f76a0d ipn/ipnlocal: fix cert storage in Kubernetes
We were checking against the wrong directory; instead, if we
have a custom store configured, we just use that.

Fixes #7588
Fixes #7665

Signed-off-by: Maisem Ali <maisem@tailscale.com>
2023-03-23 17:35:44 -07:00
Maisem Ali
ec90522a53 ipn/ipnlocal: also store ACME keys in the certStore
We were not storing the ACME keys in the state store, they would always
be stored on disk.

Updates #7588

Signed-off-by: Maisem Ali <maisem@tailscale.com>
2023-03-23 17:35:44 -07:00
9 changed files with 418 additions and 132 deletions

View File

@@ -212,7 +212,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
tailscale.com/ipn/ipnstate from tailscale.com/control/controlclient+
tailscale.com/ipn/localapi from tailscale.com/ipn/ipnserver
tailscale.com/ipn/policy from tailscale.com/ipn/ipnlocal
tailscale.com/ipn/store from tailscale.com/cmd/tailscaled
tailscale.com/ipn/store from tailscale.com/cmd/tailscaled+
L tailscale.com/ipn/store/awsstore from tailscale.com/ipn/store
L tailscale.com/ipn/store/kubestore from tailscale.com/ipn/store
tailscale.com/ipn/store/mem from tailscale.com/ipn/store+

View File

@@ -31,10 +31,13 @@ import (
"time"
"golang.org/x/crypto/acme"
"tailscale.com/atomicfile"
"tailscale.com/envknob"
"tailscale.com/hostinfo"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/ipn/store"
"tailscale.com/ipn/store/mem"
"tailscale.com/types/logger"
"tailscale.com/version"
"tailscale.com/version/distro"
@@ -82,11 +85,6 @@ func (b *LocalBackend) GetCertPEM(ctx context.Context, domain string) (*TLSCertK
return nil, errors.New("invalid domain")
}
logf := logger.WithPrefix(b.logf, fmt.Sprintf("cert(%q): ", domain))
dir, err := b.certDir()
if err != nil {
logf("failed to get certDir: %v", err)
return nil, err
}
now := time.Now()
traceACME := func(v any) {
if !acmeDebug() {
@@ -96,17 +94,22 @@ func (b *LocalBackend) GetCertPEM(ctx context.Context, domain string) (*TLSCertK
log.Printf("acme %T: %s", v, j)
}
if pair, err := b.getCertPEMCached(dir, domain, now); err == nil {
cs, err := b.getCertStore()
if err != nil {
return nil, err
}
if pair, err := getCertPEMCached(cs, domain, now); err == nil {
future := now.AddDate(0, 0, 14)
if b.shouldStartDomainRenewal(dir, domain, future) {
if b.shouldStartDomainRenewal(cs, domain, future) {
logf("starting async renewal")
// Start renewal in the background.
go b.getCertPEM(context.Background(), logf, traceACME, dir, domain, future)
go b.getCertPEM(context.Background(), cs, logf, traceACME, domain, future)
}
return pair, nil
}
pair, err := b.getCertPEM(ctx, logf, traceACME, dir, domain, now)
pair, err := b.getCertPEM(ctx, cs, logf, traceACME, domain, now)
if err != nil {
logf("getCertPEM: %v", err)
return nil, err
@@ -114,7 +117,7 @@ func (b *LocalBackend) GetCertPEM(ctx context.Context, domain string) (*TLSCertK
return pair, nil
}
func (b *LocalBackend) shouldStartDomainRenewal(dir, domain string, future time.Time) bool {
func (b *LocalBackend) shouldStartDomainRenewal(cs certStore, domain string, future time.Time) bool {
renewMu.Lock()
defer renewMu.Unlock()
now := time.Now()
@@ -124,7 +127,7 @@ func (b *LocalBackend) shouldStartDomainRenewal(dir, domain string, future time.
return false
}
lastRenewCheck[domain] = now
_, err := b.getCertPEMCached(dir, domain, future)
_, err := getCertPEMCached(cs, domain, future)
return errors.Is(err, errCertExpired)
}
@@ -140,15 +143,32 @@ type certStore interface {
WriteCert(domain string, cert []byte) error
// WriteKey writes the key for domain.
WriteKey(domain string, key []byte) error
// ACMEKey returns the value previously stored via WriteACMEKey.
// It is a PEM encoded ECDSA key.
ACMEKey() ([]byte, error)
// WriteACMEKey stores the provided PEM encoded ECDSA key.
WriteACMEKey([]byte) error
}
var errCertExpired = errors.New("cert expired")
func (b *LocalBackend) getCertStore(dir string) certStore {
if hostinfo.GetEnvType() == hostinfo.Kubernetes && dir == "/tmp" {
return certStateStore{StateStore: b.store}
func (b *LocalBackend) getCertStore() (certStore, error) {
switch b.store.(type) {
case *store.FileStore:
case *mem.Store:
default:
if hostinfo.GetEnvType() == hostinfo.Kubernetes {
// We're running in Kubernetes with a custom StateStore,
// use that instead of the cert directory.
// TODO(maisem): expand this to other environments?
return certStateStore{StateStore: b.store}, nil
}
}
return certFileStore{dir: dir}
dir, err := b.certDir()
if err != nil {
return nil, err
}
return certFileStore{dir: dir}, nil
}
// certFileStore implements certStore by storing the cert & key files in the named directory.
@@ -160,6 +180,25 @@ type certFileStore struct {
testRoots *x509.CertPool
}
const acmePEMName = "acme-account.key.pem"
// ACMEKey returns the PEM-encoded ACME account key stored in the
// cert directory, or ipn.ErrStateNotExist if no key has been
// written yet.
func (f certFileStore) ACMEKey() ([]byte, error) {
	pemName := filepath.Join(f.dir, acmePEMName)
	v, err := os.ReadFile(pemName)
	if err != nil {
		// Use errors.Is rather than the legacy os.IsNotExist, matching
		// how errors are matched elsewhere in this file.
		if errors.Is(err, os.ErrNotExist) {
			return nil, ipn.ErrStateNotExist
		}
		return nil, err
	}
	return v, nil
}
// WriteACMEKey atomically persists the PEM-encoded ACME account key
// into the cert directory.
func (f certFileStore) WriteACMEKey(key []byte) error {
	return atomicfile.WriteFile(filepath.Join(f.dir, acmePEMName), key, 0600)
}
func (f certFileStore) Read(domain string, now time.Time) (*TLSCertKeyPair, error) {
certPEM, err := os.ReadFile(certFile(f.dir, domain))
if err != nil {
@@ -182,11 +221,11 @@ func (f certFileStore) Read(domain string, now time.Time) (*TLSCertKeyPair, erro
}
func (f certFileStore) WriteCert(domain string, cert []byte) error {
return os.WriteFile(certFile(f.dir, domain), cert, 0644)
return atomicfile.WriteFile(certFile(f.dir, domain), cert, 0644)
}
func (f certFileStore) WriteKey(domain string, key []byte) error {
return os.WriteFile(keyFile(f.dir, domain), key, 0600)
return atomicfile.WriteFile(keyFile(f.dir, domain), key, 0600)
}
// certStateStore implements certStore by storing the cert & key files in an ipn.StateStore.
@@ -221,6 +260,14 @@ func (s certStateStore) WriteKey(domain string, key []byte) error {
return s.WriteState(ipn.StateKey(domain+".key"), key)
}
// ACMEKey returns the PEM-encoded ACME account key previously stored
// via WriteACMEKey, read from the underlying ipn.StateStore.
func (s certStateStore) ACMEKey() ([]byte, error) {
	return s.ReadState(ipn.StateKey(acmePEMName))
}
// WriteACMEKey stores the provided PEM-encoded ACME account key in
// the underlying ipn.StateStore.
func (s certStateStore) WriteACMEKey(key []byte) error {
	return s.WriteState(ipn.StateKey(acmePEMName), key)
}
// TLSCertKeyPair is a TLS public and private key, and whether they were obtained
// from cache or freshly obtained.
type TLSCertKeyPair struct {
@@ -236,26 +283,26 @@ func certFile(dir, domain string) string { return filepath.Join(dir, domain+".cr
// domain exists on disk in dir that is valid at the provided now time.
// If the keypair is expired, it returns errCertExpired.
// If the keypair doesn't exist, it returns ipn.ErrStateNotExist.
func (b *LocalBackend) getCertPEMCached(dir, domain string, now time.Time) (p *TLSCertKeyPair, err error) {
func getCertPEMCached(cs certStore, domain string, now time.Time) (p *TLSCertKeyPair, err error) {
if !validLookingCertDomain(domain) {
// Before we read files from disk using it, validate it's halfway
// reasonable looking.
return nil, fmt.Errorf("invalid domain %q", domain)
}
return b.getCertStore(dir).Read(domain, now)
return cs.Read(domain, now)
}
func (b *LocalBackend) getCertPEM(ctx context.Context, logf logger.Logf, traceACME func(any), dir, domain string, now time.Time) (*TLSCertKeyPair, error) {
func (b *LocalBackend) getCertPEM(ctx context.Context, cs certStore, logf logger.Logf, traceACME func(any), domain string, now time.Time) (*TLSCertKeyPair, error) {
acmeMu.Lock()
defer acmeMu.Unlock()
if p, err := b.getCertPEMCached(dir, domain, now); err == nil {
if p, err := getCertPEMCached(cs, domain, now); err == nil {
return p, nil
} else if !errors.Is(err, ipn.ErrStateNotExist) && !errors.Is(err, errCertExpired) {
return nil, err
}
key, err := acmeKey(dir)
key, err := acmeKey(cs)
if err != nil {
return nil, fmt.Errorf("acmeKey: %w", err)
}
@@ -366,8 +413,7 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, logf logger.Logf, traceAC
if err := encodeECDSAKey(&privPEM, certPrivKey); err != nil {
return nil, err
}
certStore := b.getCertStore(dir)
if err := certStore.WriteKey(domain, privPEM.Bytes()); err != nil {
if err := cs.WriteKey(domain, privPEM.Bytes()); err != nil {
return nil, err
}
@@ -390,7 +436,7 @@ func (b *LocalBackend) getCertPEM(ctx context.Context, logf logger.Logf, traceAC
return nil, err
}
}
if err := certStore.WriteCert(domain, certPEM.Bytes()); err != nil {
if err := cs.WriteCert(domain, certPEM.Bytes()); err != nil {
return nil, err
}
@@ -444,14 +490,15 @@ func parsePrivateKey(der []byte) (crypto.Signer, error) {
return nil, errors.New("acme/autocert: failed to parse private key")
}
func acmeKey(dir string) (crypto.Signer, error) {
pemName := filepath.Join(dir, "acme-account.key.pem")
if v, err := os.ReadFile(pemName); err == nil {
func acmeKey(cs certStore) (crypto.Signer, error) {
if v, err := cs.ACMEKey(); err == nil {
priv, _ := pem.Decode(v)
if priv == nil || !strings.Contains(priv.Type, "PRIVATE") {
return nil, errors.New("acme/autocert: invalid account key found in cache")
}
return parsePrivateKey(priv.Bytes)
} else if err != nil && !errors.Is(err, ipn.ErrStateNotExist) {
return nil, err
}
privKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
@@ -462,7 +509,7 @@ func acmeKey(dir string) (crypto.Signer, error) {
if err := encodeECDSAKey(&pemBuf, privKey); err != nil {
return nil, err
}
if err := os.WriteFile(pemName, pemBuf.Bytes(), 0600); err != nil {
if err := cs.WriteACMEKey(pemBuf.Bytes()); err != nil {
return nil, err
}
return privKey, nil

View File

@@ -299,7 +299,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, store ipn.StateStor
statsLogf: logger.LogOnChange(logf, 5*time.Minute, time.Now),
e: e,
pm: pm,
store: pm.Store(),
store: store,
dialer: dialer,
backendLogID: logID,
state: ipn.NoState,
@@ -4800,6 +4800,10 @@ func (b *LocalBackend) initTKALocked() error {
if err != nil {
return fmt.Errorf("opening tailchonk: %v", err)
}
// Actually delete data which has been purged for 7 days:
if err := storage.CollectGarbage(7 * 24 * time.Hour); err != nil {
b.logf("tka garbage collection failed: %v", err)
}
authority, err := tka.Open(storage)
if err != nil {
return fmt.Errorf("initializing tka: %v", err)

View File

@@ -36,6 +36,8 @@ type Logger struct {
logf logger.Logf
logger *logtail.Logger
filch *filch.Filch
tr *http.Transport
}
// deltaStat represents the bytes transferred during a time period.
@@ -88,6 +90,8 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID) (*Logger,
cancelFn: cancel,
ticker: time.NewTicker(pollPeriod),
logf: logf,
filch: filch,
tr: logpolicy.NewLogtailTransport(logtail.DefaultHost),
}
logger.logger = logtail.NewLogger(logtail.Config{
BaseURL: logpolicy.LogURL(),
@@ -107,7 +111,7 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID) (*Logger,
},
Stderr: io.Discard, // don't log to stderr
HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)},
HTTPC: &http.Client{Transport: logger.tr},
}, logf)
go logger.poll()
@@ -166,8 +170,10 @@ func (l *Logger) Flush() {
func (l *Logger) Shutdown() {
l.ticker.Stop()
l.logger.Shutdown(context.Background())
l.logger.Shutdown(l.ctx)
l.cancelFn()
l.filch.Close()
l.tr.CloseIdleConnections()
}
// delta calculates the delta stats between two SockStats snapshots.

View File

@@ -638,20 +638,23 @@ func (rs *reportState) waitHairCheck(ctx context.Context) {
return
}
// First, check whether we have a value before we check for timeouts.
select {
case <-rs.gotHairSTUN:
ret.HairPinning.Set(true)
return
default:
}
// Now, wait for a response or a timeout.
select {
case <-rs.gotHairSTUN:
ret.HairPinning.Set(true)
case <-rs.hairTimeout:
rs.c.vlogf("hairCheck timeout")
ret.HairPinning.Set(false)
default:
select {
case <-rs.gotHairSTUN:
ret.HairPinning.Set(true)
case <-rs.hairTimeout:
ret.HairPinning.Set(false)
case <-ctx.Done():
}
case <-ctx.Done():
rs.c.vlogf("hairCheck context timeout")
}
}

View File

@@ -47,6 +47,113 @@ func TestHairpinSTUN(t *testing.T) {
}
}
// TestHairpinWait exercises reportState.waitHairCheck, verifying the
// hairpin result is deterministic even when the STUN response and the
// check timeout race each other (see #1795).
func TestHairpinWait(t *testing.T) {
	// makeClient returns a Client and reportState wired with a local
	// UDP socket to act as the hairpin endpoint.
	makeClient := func(t *testing.T) (*Client, *reportState) {
		tx := stun.NewTxID()
		c := &Client{}
		req := stun.Request(tx)
		if !stun.Is(req) {
			t.Fatal("expected STUN message")
		}

		var err error
		rs := &reportState{
			c:           c,
			hairTX:      tx,
			gotHairSTUN: make(chan netip.AddrPort, 1),
			hairTimeout: make(chan struct{}),
			report:      newReport(),
		}
		rs.pc4Hair, err = net.ListenUDP("udp4", &net.UDPAddr{
			IP:   net.ParseIP("127.0.0.1"),
			Port: 0,
		})
		if err != nil {
			t.Fatal(err)
		}
		// Close the hairpin socket when the (sub)test finishes so we
		// don't leak it.
		t.Cleanup(func() { rs.pc4Hair.Close() })

		c.curState = rs
		return c, rs
	}

	ll, err := net.ListenPacket("udp", "localhost:0")
	if err != nil {
		t.Fatal(err)
	}
	defer ll.Close()
	dstAddr := netip.MustParseAddrPort(ll.LocalAddr().String())

	t.Run("Success", func(t *testing.T) {
		c, rs := makeClient(t)
		req := stun.Request(rs.hairTX)

		// Start a hairpin check to ourselves.
		rs.startHairCheckLocked(dstAddr)

		// Fake receiving the stun check from ourselves after some period of time.
		src := netip.MustParseAddrPort(rs.pc4Hair.LocalAddr().String())
		c.handleHairSTUNLocked(req, src)

		rs.waitHairCheck(context.Background())

		// Verify that we set HairPinning.
		if got := rs.report.HairPinning; !got.EqualBool(true) {
			t.Errorf("wanted HairPinning=true, got %v", got)
		}
	})

	t.Run("LateReply", func(t *testing.T) {
		c, rs := makeClient(t)
		req := stun.Request(rs.hairTX)

		// Start a hairpin check to ourselves.
		rs.startHairCheckLocked(dstAddr)

		// Wait until we've timed out, to mimic the race in #1795.
		<-rs.hairTimeout

		// Fake receiving the stun check from ourselves after some period of time.
		src := netip.MustParseAddrPort(rs.pc4Hair.LocalAddr().String())
		c.handleHairSTUNLocked(req, src)

		// Wait for a hairpin response.
		rs.waitHairCheck(context.Background())

		// Verify that we set HairPinning.
		if got := rs.report.HairPinning; !got.EqualBool(true) {
			t.Errorf("wanted HairPinning=true, got %v", got)
		}
	})

	t.Run("Timeout", func(t *testing.T) {
		_, rs := makeClient(t)

		// Start a hairpin check to ourselves.
		rs.startHairCheckLocked(dstAddr)

		ctx, cancel := context.WithTimeout(context.Background(), hairpinCheckTimeout*50)
		defer cancel()

		// Wait in the background.
		waitDone := make(chan struct{})
		go func() {
			rs.waitHairCheck(ctx)
			close(waitDone)
		}()

		// If we do nothing, then we time out; confirm that we set
		// HairPinning to false in this case.
		select {
		case <-waitDone:
			if got := rs.report.HairPinning; !got.EqualBool(false) {
				t.Errorf("wanted HairPinning=false, got %v", got)
			}
		case <-ctx.Done():
			t.Fatalf("timed out waiting for hairpin channel")
		}
	})
}
func TestBasic(t *testing.T) {
stunAddr, cleanup := stuntest.Serve(t)
defer cleanup()

View File

@@ -293,83 +293,28 @@ main() {
fi
fi
# Ideally we want to use curl, but on some installs we
# only have wget. Detect and use what's available.
CURL=
if type curl >/dev/null; then
CURL="curl -fsSL"
elif type wget >/dev/null; then
CURL="wget -q -O-"
fi
if [ -z "$CURL" ]; then
echo "The installer needs either curl or wget to download files."
echo "Please install either curl or wget to proceed."
exit 1
fi
# Step 2: having detected an OS we support, is it one of the
# versions we support?
OS_UNSUPPORTED=
case "$OS" in
ubuntu)
if [ "$VERSION" != "xenial" ] && \
[ "$VERSION" != "bionic" ] && \
[ "$VERSION" != "eoan" ] && \
[ "$VERSION" != "focal" ] && \
[ "$VERSION" != "groovy" ] && \
[ "$VERSION" != "hirsute" ] && \
[ "$VERSION" != "impish" ] && \
[ "$VERSION" != "jammy" ] && \
[ "$VERSION" != "kinetic" ] && \
[ "$VERSION" != "lunar" ]
then
OS_UNSUPPORTED=1
fi
;;
debian)
if [ "$VERSION" != "stretch" ] && \
[ "$VERSION" != "buster" ] && \
[ "$VERSION" != "bullseye" ] && \
[ "$VERSION" != "bookworm" ] && \
[ "$VERSION" != "sid" ]
then
OS_UNSUPPORTED=1
fi
;;
raspbian)
if [ "$VERSION" != "stretch" ] && \
[ "$VERSION" != "buster" ] && \
[ "$VERSION" != "bullseye" ]
then
OS_UNSUPPORTED=1
fi
;;
centos)
if [ "$VERSION" != "7" ] && \
[ "$VERSION" != "8" ] && \
[ "$VERSION" != "9" ]
then
OS_UNSUPPORTED=1
fi
;;
oracle)
if [ "$VERSION" != "7" ] && \
[ "$VERSION" != "8" ]
then
OS_UNSUPPORTED=1
fi
;;
rhel)
if [ "$VERSION" != "7" ] && \
[ "$VERSION" != "8" ] && \
[ "$VERSION" != "9" ]
then
OS_UNSUPPORTED=1
fi
;;
amazon-linux)
if [ "$VERSION" != "2" ] && \
[ "$VERSION" != "2022" ] && \
[ "$VERSION" != "2023" ]
then
OS_UNSUPPORTED=1
fi
;;
opensuse)
if [ "$VERSION" != "leap/15.1" ] && \
[ "$VERSION" != "leap/15.2" ] && \
[ "$VERSION" != "leap/15.3" ] && \
[ "$VERSION" != "leap/15.4" ] && \
[ "$VERSION" != "tumbleweed" ]
then
OS_UNSUPPORTED=1
fi
ubuntu|debian|raspbian|centos|oracle|rhel|amazon-linux|opensuse)
# Check with the package server whether a given version is supported.
URL="https://pkgs.tailscale.com/$TRACK/$OS/$VERSION/installer-supported"
$CURL "$URL" 2> /dev/null | grep -q OK || OS_UNSUPPORTED=1
;;
fedora)
# All versions supported, no version checking required.
@@ -474,19 +419,6 @@ main() {
echo "Installing Tailscale for $OS $VERSION, using method $PACKAGETYPE"
case "$PACKAGETYPE" in
apt)
# Ideally we want to use curl, but on some installs we
# only have wget. Detect and use what's available.
CURL=
if type curl >/dev/null; then
CURL="curl -fsSL"
elif type wget >/dev/null; then
CURL="wget -q -O-"
fi
if [ -z "$CURL" ]; then
echo "The installer needs either curl or wget to download files."
echo "Please install either curl or wget to proceed."
exit 1
fi
export DEBIAN_FRONTEND=noninteractive
if [ "$APT_KEY_TYPE" = "legacy" ] && ! type gpg >/dev/null; then
$SUDO apt-get update

View File

@@ -201,10 +201,6 @@ func ChonkDir(dir string) (*FS, error) {
return nil, fmt.Errorf("chonk directory %q is a file", dir)
}
// TODO(tom): *FS marks AUMs as deleted but does not actually
// delete them, to avoid data loss in the event of a bug.
// Implement deletion after we are fairly sure in the implementation.
return &FS{base: dir}, nil
}
@@ -218,6 +214,9 @@ func ChonkDir(dir string) (*FS, error) {
// much smaller than JSON for AUMs. The 'keyasint' thing isn't essential
// but again it saves a bunch of bytes.
type fsHashInfo struct {
// diskHash specifies the AUMHash this structure describes.
diskHash AUMHash
Children []AUMHash `cbor:"1,keyasint"`
AUM *AUM `cbor:"2,keyasint"`
CreatedUnix int64 `cbor:"3,keyasint,omitempty"`
@@ -344,6 +343,7 @@ func (c *FS) get(h AUMHash) (*fsHashInfo, error) {
if out.AUM != nil && out.AUM.Hash() != h {
return nil, fmt.Errorf("%s: AUM does not match file name hash %s", f.Name(), out.AUM.Hash())
}
out.diskHash = h
return &out, nil
}
@@ -380,6 +380,104 @@ func (c *FS) AllAUMs() ([]AUMHash, error) {
return out, err
}
// CollectGarbage frees up disk space by removing purged AUMs
// and files which contain no data. Only entries purged at least
// maxAge ago are eligible for removal.
func (c *FS) CollectGarbage(maxAge time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Collect the list of all stored hashes which are marked
	// for deletion & old enough to delete.
	var (
		deletionCandidates = make(map[AUMHash]*fsHashInfo)
		purgeBefore        = time.Now().Add(-maxAge)
	)
	err := c.scanHashes(func(info *fsHashInfo) {
		// Mark for deletion all hashes which are explicitly purged, or
		// hashes that store no data.
		purged := info.PurgedUnix > 0 && time.Unix(info.PurgedUnix, 0).Before(purgeBefore)
		if purged || (info.AUM == nil && len(info.Children) == 0) {
			deletionCandidates[info.diskHash] = info
		}
	})
	if err != nil {
		return err
	}

	// TODO: consistency check that no deletion candidate is the last active
	// ancestor nor a parent is the last active ancestor.

	for h, info := range deletionCandidates {
		// First, if we store the parent, remove the reference to this
		// hash as a child.
		if info.AUM != nil {
			if parent, haveParent := info.AUM.Parent(); haveParent {
				dir, base := c.aumDir(parent)
				_, err := os.Stat(filepath.Join(dir, base))
				parentExists := err == nil
				if parentExists {
					err := c.commit(parent, func(pi *fsHashInfo) {
						newChildren := make([]AUMHash, 0, len(pi.Children))
						for _, child := range pi.Children {
							if child != h {
								newChildren = append(newChildren, child)
							}
						}
						pi.Children = newChildren
					})
					if err != nil {
						return fmt.Errorf("mutating parent %x of %x: %w", parent, h, err)
					}
				}
			}
		}

		dir, base := c.aumDir(h)
		path := filepath.Join(dir, base)
		if len(info.Children) == 0 {
			// This hash has no dependencies.
			//
			// Technically, info.Children could be stale, because if this hash was
			// someone's parent then that someone would have removed their hash from
			// the list. Because that's only ever a deletion though, this is still
			// safe: staleness will result in us not deleting this file now, but it
			// will be deleted next time.
			if err := os.Remove(path); err != nil {
				return fmt.Errorf("removing dead entry: %w", err)
			}
			continue
		}

		// This hash has children it needs to keep track of, so it might
		// not be able to be deleted outright.
		var removeFile bool
		err := c.commit(h, func(fi *fsHashInfo) {
			fi.AUM = nil // in all cases this hash shouldn't store its own AUM info
			newChildren := make([]AUMHash, 0, len(fi.Children))
			for _, child := range fi.Children {
				if _, deleted := deletionCandidates[child]; !deleted {
					newChildren = append(newChildren, child)
				}
			}
			fi.Children = newChildren
			removeFile = len(newChildren) == 0
		})
		if err != nil {
			return fmt.Errorf("mutating entry %x: %w", h, err)
		}
		if removeFile {
			if err := os.Remove(path); err != nil {
				return fmt.Errorf("removing empty entry: %w", err)
			}
		}
	}
	return nil
}
func (c *FS) scanHashes(eachHashInfo func(*fsHashInfo)) error {
prefixDirs, err := os.ReadDir(c.base)
if err != nil {

View File

@@ -630,6 +630,7 @@ func TestCompact(t *testing.T) {
// OLD is deleted because it does not match retention criteria, and
// though it is a descendant of the new lastActiveAncestor (C), it is not a
// descendant of a retained AUM.
// O is deleted because it is orphaned.
// G, & H are retained as recent (MinChain=2) ancestors of HEAD.
// E & F are retained because they are between retained AUMs (G+) and
// their newest checkpoint ancestor.
@@ -648,6 +649,9 @@ func TestCompact(t *testing.T) {
| -> F1 -> F2 | -> G2
| -> OLD
// Orphaned AUM
O
// make {A,B,C,D} compaction candidates
A.template = checkpoint
B.template = checkpoint
@@ -658,13 +662,14 @@ func TestCompact(t *testing.T) {
F1.hashSeed = 1
OLD.hashSeed = 2
G2.hashSeed = 3
O.hashSeed = 4
`, optTemplate("checkpoint", AUM{MessageKind: AUMCheckpoint, State: fakeState}))
storage := &compactingChonkFake{
Mem: (*c.Chonk().(*Mem)),
aumAge: map[AUMHash]time.Time{(c.AUMHashes["F1"]): time.Now()},
t: t,
wantDelete: []AUMHash{c.AUMHashes["A"], c.AUMHashes["B"], c.AUMHashes["OLD"]},
wantDelete: []AUMHash{c.AUMHashes["A"], c.AUMHashes["B"], c.AUMHashes["O"], c.AUMHashes["OLD"]},
}
lastActiveAncestor, err := Compact(storage, c.AUMHashes["H"], CompactionOptions{MinChain: 2, MinAge: time.Hour})
@@ -681,3 +686,87 @@ func TestCompact(t *testing.T) {
}
}
}
// TestCollectGarbage verifies that (*FS).CollectGarbage removes the
// files backing AUMs that were purged by a previous Compact run, and
// that an Authority can still be opened from the resulting state.
func TestCollectGarbage(t *testing.T) {
	fakeState := &State{
		Keys:               []Key{{Kind: Key25519, Votes: 1}},
		DisablementSecrets: [][]byte{bytes.Repeat([]byte{1}, 32)},
	}
	c := newTestchain(t, `
A -> B -> C -> C2 -> D -> E -> F -> G -> H
| -> OLD | -> G2
// make {A,B,C,D} compaction candidates
A.template = checkpoint
B.template = checkpoint
C.template = checkpoint
D.template = checkpoint
// tweak seeds of forks so hashes arent identical
OLD.hashSeed = 2
G2.hashSeed = 3
`, optTemplate("checkpoint", AUM{MessageKind: AUMCheckpoint, State: fakeState}))

	// Populate a *FS chonk with every AUM in the chain.
	storage, err := ChonkDir(t.TempDir())
	if err != nil {
		t.Fatal(err)
	}
	for _, update := range c.AUMs {
		if err := storage.CommitVerifiedAUMs([]AUM{update}); err != nil {
			t.Fatal(err)
		}
	}
	if err := storage.SetLastActiveAncestor(c.AUMHashes["A"]); err != nil {
		t.Fatal(err)
	}

	// Run compaction, which purges (but does not physically delete) old AUMs.
	lastActiveAncestor, err := Compact(storage, c.AUMHashes["H"], CompactionOptions{MinChain: 2, MinAge: 1})
	if err != nil {
		t.Errorf("Compact() failed: %v", err)
	}
	if lastActiveAncestor != c.AUMHashes["D"] {
		// NOTE: the diagnostic previously printed AUMHashes["C"] while
		// comparing against "D"; it now reports the value actually wanted.
		t.Errorf("last active ancestor = %v, want %v", lastActiveAncestor, c.AUMHashes["D"])
	}

	deletedAUMs := []AUMHash{c.AUMHashes["A"], c.AUMHashes["B"], c.AUMHashes["C"], c.AUMHashes["C2"], c.AUMHashes["OLD"]}

	// Make sure deleted AUMs are unreadable.
	for _, h := range deletedAUMs {
		if _, err := storage.AUM(h); err != os.ErrNotExist {
			t.Errorf("storage.AUM(%v).err = %v, want ErrNotExist", h, err)
		}
	}

	if err := storage.CollectGarbage(0); err != nil {
		t.Fatal(err)
	}

	// Make sure files for deleted AUMs are gone.
	for _, h := range deletedAUMs {
		dir, base := storage.aumDir(h)
		path := filepath.Join(dir, base)
		// C2 is excluded, because its child D exists and the file
		// stores the parent->child relationship.
		if _, err := os.Stat(path); err == nil && h != c.AUMHashes["C2"] {
			t.Errorf("file for deleted AUM %v exists", h)
		}
	}
	if t.Failed() {
		for name, hash := range c.AUMHashes {
			t.Logf("AUM[%q] = %v", name, hash)
		}
	}

	// Lastly, let's make sure an authority can start from the garbage-collected state.
	a, err := Open(storage)
	if err != nil {
		t.Fatal(err)
	}
	if a.Head() != c.AUMHashes["H"] {
		t.Errorf("head = %v, want %v", a.Head(), c.AUMHashes["H"])
	}
}