path: root/cli/internal/cache
author     简律纯 <hsiangnianian@outlook.com>    2023-04-28 01:36:44 +0800
committer  简律纯 <hsiangnianian@outlook.com>    2023-04-28 01:36:44 +0800
commit     dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree       b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/cache
parent     0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
download   HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
           HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/cache')
-rw-r--r--  cli/internal/cache/async_cache.go   82
-rw-r--r--  cli/internal/cache/cache.go  317
-rw-r--r--  cli/internal/cache/cache_fs.go  174
-rw-r--r--  cli/internal/cache/cache_fs_test.go  253
-rw-r--r--  cli/internal/cache/cache_http.go  375
-rw-r--r--  cli/internal/cache/cache_http_test.go  245
-rw-r--r--  cli/internal/cache/cache_noop.go   23
-rw-r--r--  cli/internal/cache/cache_signature_authentication.go   88
-rw-r--r--  cli/internal/cache/cache_signature_authentication_test.go  195
-rw-r--r--  cli/internal/cache/cache_test.go  318
10 files changed, 2070 insertions, 0 deletions
diff --git a/cli/internal/cache/async_cache.go b/cli/internal/cache/async_cache.go
new file mode 100644
index 0000000..0a8f467
--- /dev/null
+++ b/cli/internal/cache/async_cache.go
@@ -0,0 +1,82 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "sync"
+
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// An asyncCache is a wrapper around a Cache interface that handles incoming
+// store requests asynchronously and attempts to return immediately.
+// The requests are handled on an internal queue; if that fills up then
+// incoming requests will start to block again until it empties.
+// Retrieval requests are still handled synchronously.
+type asyncCache struct {
+ requests chan cacheRequest
+ realCache Cache
+ wg sync.WaitGroup
+}
+
+// A cacheRequest models an incoming cache request on our queue.
+type cacheRequest struct {
+ anchor turbopath.AbsoluteSystemPath
+ key string
+ duration int
+ files []turbopath.AnchoredSystemPath
+}
+
+func newAsyncCache(realCache Cache, opts Opts) Cache {
+ c := &asyncCache{
+ requests: make(chan cacheRequest),
+ realCache: realCache,
+ }
+ c.wg.Add(opts.Workers)
+ for i := 0; i < opts.Workers; i++ {
+ go c.run()
+ }
+ return c
+}
+
+func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
+ c.requests <- cacheRequest{
+ anchor: anchor,
+ key: key,
+ files: files,
+ duration: duration,
+ }
+ return nil
+}
+
+func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ return c.realCache.Fetch(anchor, key, files)
+}
+
+func (c *asyncCache) Exists(key string) ItemStatus {
+ return c.realCache.Exists(key)
+}
+
+func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) {
+ c.realCache.Clean(anchor)
+}
+
+func (c *asyncCache) CleanAll() {
+ c.realCache.CleanAll()
+}
+
+func (c *asyncCache) Shutdown() {
+ // fmt.Println("Shutting down cache workers...")
+ close(c.requests)
+ c.wg.Wait()
+ // fmt.Println("Shut down all cache workers")
+}
+
+// run implements the actual async logic.
+func (c *asyncCache) run() {
+ for r := range c.requests {
+ _ = c.realCache.Put(r.anchor, r.key, r.duration, r.files)
+ }
+ c.wg.Done()
+}
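
The asyncCache added above is a small worker-queue wrapper: Put hands a cacheRequest to an unbuffered channel and returns, a fixed pool of goroutines drains the channel and calls the real cache's Put, and Shutdown closes the channel and waits on the WaitGroup. The standalone sketch below shows the same pattern with made-up names (putRequest, asyncPutter, store) instead of the internal Cache and turbopath types, so it can run on its own; it illustrates the technique and is not the package's API.

package main

import (
	"fmt"
	"sync"
)

// putRequest stands in for cacheRequest; the single field is illustrative.
type putRequest struct {
	key string
}

// asyncPutter mirrors the shape of asyncCache: a request channel drained by a
// fixed pool of workers, plus a WaitGroup used for shutdown.
type asyncPutter struct {
	requests chan putRequest
	wg       sync.WaitGroup
}

func newAsyncPutter(workers int, store func(putRequest)) *asyncPutter {
	p := &asyncPutter{requests: make(chan putRequest)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for r := range p.requests {
				store(r) // the synchronous work happens off the caller's goroutine
			}
		}()
	}
	return p
}

// Put blocks only while no worker is ready to receive, which matches the
// "block until the queue empties" behaviour described in the doc comment.
func (p *asyncPutter) Put(key string) { p.requests <- putRequest{key: key} }

// Shutdown closes the queue and waits for in-flight stores to finish.
func (p *asyncPutter) Shutdown() {
	close(p.requests)
	p.wg.Wait()
}

func main() {
	p := newAsyncPutter(2, func(r putRequest) { fmt.Println("stored", r.key) })
	p.Put("hash-a")
	p.Put("hash-b")
	p.Shutdown()
}
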
diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go
new file mode 100644
index 0000000..8b74272
--- /dev/null
+++ b/cli/internal/cache/cache.go
@@ -0,0 +1,317 @@
+// Package cache abstracts storing and fetching previously run tasks
+//
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+ "golang.org/x/sync/errgroup"
+)
+
+// Cache is abstracted way to cache/fetch previously run tasks
+type Cache interface {
+// Fetch returns true if there is a cache hit. It is expected to move files
+ // into their correct position as a side effect
+ Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
+ Exists(hash string) ItemStatus
+ // Put caches files for a given hash
+ Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
+ Clean(anchor turbopath.AbsoluteSystemPath)
+ CleanAll()
+ Shutdown()
+}
+
+// ItemStatus holds whether artifacts exists for a given hash on local
+// and/or remote caching server
+type ItemStatus struct {
+ Local bool `json:"local"`
+ Remote bool `json:"remote"`
+}
+
+const (
+ // CacheSourceFS is a constant to indicate local cache hit
+ CacheSourceFS = "LOCAL"
+ // CacheSourceRemote is a constant to indicate remote cache hit
+ CacheSourceRemote = "REMOTE"
+ // CacheEventHit is a constant to indicate a cache hit
+ CacheEventHit = "HIT"
+ // CacheEventMiss is a constant to indicate a cache miss
+ CacheEventMiss = "MISS"
+)
+
+type CacheEvent struct {
+ Source string `mapstructure:"source"`
+ Event string `mapstructure:"event"`
+ Hash string `mapstructure:"hash"`
+ Duration int `mapstructure:"duration"`
+}
+
+// DefaultLocation returns the default filesystem cache location, given a repo root
+func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ return repoRoot.UntypedJoin("node_modules", ".cache", "turbo")
+}
+
+// OnCacheRemoved defines a callback that the cache system calls if a particular cache
+// needs to be removed. In practice, this happens when Remote Caching has been disabled
+// but the CLI continues to try to use it.
+type OnCacheRemoved = func(cache Cache, err error)
+
+// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable
+var ErrNoCachesEnabled = errors.New("no caches are enabled")
+
+// Opts holds configuration options for the cache
+// TODO(gsoltis): further refactor this into fs cache opts and http cache opts
+type Opts struct {
+ OverrideDir string
+ SkipRemote bool
+ SkipFilesystem bool
+ Workers int
+ RemoteCacheOpts fs.RemoteCacheOptions
+}
+
+// resolveCacheDir calculates the location turbo should use to cache artifacts,
+// based on the options supplied by the user.
+func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ if o.OverrideDir != "" {
+ return fs.ResolveUnknownPath(repoRoot, o.OverrideDir)
+ }
+ return DefaultLocation(repoRoot)
+}
+
+var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only
+allow reading and caching artifacts using the remote cache.`
+
+// New creates a new cache
+func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
+ c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved)
+ if err != nil && !errors.Is(err, ErrNoCachesEnabled) {
+ return nil, err
+ }
+ if opts.Workers > 0 {
+ return newAsyncCache(c, opts), err
+ }
+ return c, err
+}
+
+// newSyncCache can return an error with a usable noopCache.
+func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
+ // Check to see if the user has turned off particular cache implementations.
+ useFsCache := !opts.SkipFilesystem
+ useHTTPCache := !opts.SkipRemote
+
+ // Since the above two flags are not mutually exclusive it is possible to configure
+ // yourself out of having a cache. We should tell you about it but we shouldn't fail
+ // your build for that reason.
+ //
+ // Further, since the httpCache can be removed at runtime, we need to insert a noopCache
+ // as a backup if you are configured to have *just* an httpCache.
+ //
+ // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache && useHTTPCache)
+ useNoopCache := !useFsCache
+
+ // Build up an array of cache implementations, we can only ever have 1 or 2.
+ cacheImplementations := make([]Cache, 0, 2)
+
+ if useFsCache {
+ implementation, err := newFsCache(opts, recorder, repoRoot)
+ if err != nil {
+ return nil, err
+ }
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ if useHTTPCache {
+ implementation := newHTTPCache(opts, client, recorder)
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ if useNoopCache {
+ implementation := newNoopCache()
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ // Precisely two cache implementations:
+ // fsCache and httpCache OR httpCache and noopCache
+ useMultiplexer := len(cacheImplementations) > 1
+ if useMultiplexer {
+ // We have early-returned any possible errors for this scenario.
+ return &cacheMultiplexer{
+ onCacheRemoved: onCacheRemoved,
+ opts: opts,
+ caches: cacheImplementations,
+ }, nil
+ }
+
+ // Precisely one cache implementation: fsCache OR noopCache
+ implementation := cacheImplementations[0]
+ _, isNoopCache := implementation.(*noopCache)
+
+ // We want to let the user know something is wonky, but we don't want
+ // to trigger their build to fail.
+ if isNoopCache {
+ return implementation, ErrNoCachesEnabled
+ }
+ return implementation, nil
+}
+
+// A cacheMultiplexer multiplexes several caches into one.
+// Used when we have several active (eg. http, dir).
+type cacheMultiplexer struct {
+ caches []Cache
+ opts Opts
+ mu sync.RWMutex
+ onCacheRemoved OnCacheRemoved
+}
+
+func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
+ return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches))
+}
+
+type cacheRemoval struct {
+ cache Cache
+ err *util.CacheDisabledError
+}
+
+// storeUntil stores artifacts into higher priority caches than the given one.
+// Used after artifact retrieval to ensure we have them in eg. the directory cache after
+// downloading from the RPC cache.
+func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error {
+ // Attempt to store on all caches simultaneously.
+ toRemove := make([]*cacheRemoval, stopAt)
+ g := &errgroup.Group{}
+ mplex.mu.RLock()
+ for i, cache := range mplex.caches {
+ if i == stopAt {
+ break
+ }
+ c := cache
+ i := i
+ g.Go(func() error {
+ err := c.Put(anchor, key, duration, files)
+ if err != nil {
+ cd := &util.CacheDisabledError{}
+ if errors.As(err, &cd) {
+ toRemove[i] = &cacheRemoval{
+ cache: c,
+ err: cd,
+ }
+ // we don't want this to cancel other cache actions
+ return nil
+ }
+ return err
+ }
+ return nil
+ })
+ }
+ mplex.mu.RUnlock()
+
+ if err := g.Wait(); err != nil {
+ return err
+ }
+
+ for _, removal := range toRemove {
+ if removal != nil {
+ mplex.removeCache(removal)
+ }
+ }
+ return nil
+}
+
+// removeCache takes a requested removal and tries to actually remove it. However,
+// multiple requests could result in concurrent requests to remove the same cache.
+// Let one of them win and propagate the error, the rest will no-op.
+func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
+ mplex.mu.Lock()
+ defer mplex.mu.Unlock()
+ for i, cache := range mplex.caches {
+ if cache == removal.cache {
+ mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...)
+ mplex.onCacheRemoved(cache, removal.err)
+ break
+ }
+ }
+}
+
+func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ // Make a shallow copy of the caches, since storeUntil can call removeCache
+ mplex.mu.RLock()
+ caches := make([]Cache, len(mplex.caches))
+ copy(caches, mplex.caches)
+ mplex.mu.RUnlock()
+
+ // We need to return a composite cache status from multiple caches
+ // Initialize the empty struct so we can assign values to it. This is similar
+ // to how the Exists() method works.
+ combinedCacheState := ItemStatus{}
+
+ // Retrieve from caches sequentially; if we did them simultaneously we could
+ // easily write the same file from two goroutines at once.
+ for i, cache := range caches {
+ itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
+ ok := itemStatus.Local || itemStatus.Remote
+
+ if err != nil {
+ cd := &util.CacheDisabledError{}
+ if errors.As(err, &cd) {
+ mplex.removeCache(&cacheRemoval{
+ cache: cache,
+ err: cd,
+ })
+ }
+ // We're ignoring the error in the else case, since with this cache
+ // abstraction, we want to check lower priority caches rather than fail
+ // the operation. Future work that plumbs UI / Logging into the cache system
+ // should probably log this at least.
+ }
+ if ok {
+ // Store this into other caches. We can ignore errors here because we know
+ // we have previously successfully stored in a higher-priority cache, and so the overall
+ // result is a success at fetching. Storing in lower-priority caches is an optimization.
+ _ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
+
+ // If another cache had already set this to true, we don't need to set it again from this cache
+ combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
+ combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
+ return combinedCacheState, actualFiles, duration, err
+ }
+ }
+
+ return ItemStatus{Local: false, Remote: false}, nil, 0, nil
+}
+
+func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
+ syncCacheState := ItemStatus{}
+ for _, cache := range mplex.caches {
+ itemStatus := cache.Exists(target)
+ syncCacheState.Local = syncCacheState.Local || itemStatus.Local
+ syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote
+ }
+
+ return syncCacheState
+}
+
+func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
+ for _, cache := range mplex.caches {
+ cache.Clean(anchor)
+ }
+}
+
+func (mplex *cacheMultiplexer) CleanAll() {
+ for _, cache := range mplex.caches {
+ cache.CleanAll()
+ }
+}
+
+func (mplex *cacheMultiplexer) Shutdown() {
+ for _, cache := range mplex.caches {
+ cache.Shutdown()
+ }
+}
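
storeUntil above fans a Put out to every configured cache with an errgroup, and a cache that reports a CacheDisabledError is only marked for later removal rather than failing the build. Below is a reduced sketch of that control flow using a plain sentinel error and stand-in store functions instead of the real Cache interface and util.CacheDisabledError; the names are invented for the example.

package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// errDisabled stands in for util.CacheDisabledError.
var errDisabled = errors.New("cache disabled")

// store is a stand-in for Cache.Put on one backend.
type store func(key string) error

func main() {
	backends := []store{
		func(key string) error { fmt.Println("fs put", key); return nil },
		func(key string) error { return errDisabled }, // remote caching turned off
	}

	// Fan the Put out to every backend. A disabled backend is recorded for
	// removal instead of failing the group; any other error is returned.
	toRemove := make([]bool, len(backends))
	g := &errgroup.Group{}
	for i, b := range backends {
		i, b := i, b
		g.Go(func() error {
			if err := b("some-hash"); errors.Is(err, errDisabled) {
				toRemove[i] = true
				return nil
			} else if err != nil {
				return err
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("put failed:", err)
		return
	}
	for i, remove := range toRemove {
		if remove {
			fmt.Println("dropping backend", i) // where removeCache would run
		}
	}
}
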
diff --git a/cli/internal/cache/cache_fs.go b/cli/internal/cache/cache_fs.go
new file mode 100644
index 0000000..fb15a02
--- /dev/null
+++ b/cli/internal/cache/cache_fs.go
@@ -0,0 +1,174 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+// Package cache implements our cache abstraction.
+package cache
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/cacheitem"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// fsCache is a local filesystem cache
+type fsCache struct {
+ cacheDirectory turbopath.AbsoluteSystemPath
+ recorder analytics.Recorder
+}
+
+// newFsCache creates a new filesystem cache
+func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.AbsoluteSystemPath) (*fsCache, error) {
+ cacheDir := opts.resolveCacheDir(repoRoot)
+ if err := cacheDir.MkdirAll(0775); err != nil {
+ return nil, err
+ }
+ return &fsCache{
+ cacheDirectory: cacheDir,
+ recorder: recorder,
+ }, nil
+}
+
+// Fetch returns true if items are cached. It moves them into position as a side effect.
+func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
+ compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")
+
+ var actualCachePath turbopath.AbsoluteSystemPath
+ if uncompressedCachePath.FileExists() {
+ actualCachePath = uncompressedCachePath
+ } else if compressedCachePath.FileExists() {
+ actualCachePath = compressedCachePath
+ } else {
+ // It's not in the cache, bail now
+ f.logFetch(false, hash, 0)
+ return ItemStatus{Local: false}, nil, 0, nil
+ }
+
+ cacheItem, openErr := cacheitem.Open(actualCachePath)
+ if openErr != nil {
+ return ItemStatus{Local: false}, nil, 0, openErr
+ }
+
+ restoredFiles, restoreErr := cacheItem.Restore(anchor)
+ if restoreErr != nil {
+ _ = cacheItem.Close()
+ return ItemStatus{Local: false}, nil, 0, restoreErr
+ }
+
+ meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json"))
+ if err != nil {
+ _ = cacheItem.Close()
+ return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err)
+ }
+ f.logFetch(true, hash, meta.Duration)
+
+ // Wait to see what happens with close.
+ closeErr := cacheItem.Close()
+ if closeErr != nil {
+ return ItemStatus{Local: false}, restoredFiles, 0, closeErr
+ }
+ return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil
+}
+
+func (f *fsCache) Exists(hash string) ItemStatus {
+ uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar")
+ compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")
+
+ if compressedCachePath.FileExists() || uncompressedCachePath.FileExists() {
+ return ItemStatus{Local: true}
+ }
+
+ return ItemStatus{Local: false}
+}
+
+func (f *fsCache) logFetch(hit bool, hash string, duration int) {
+ var event string
+ if hit {
+ event = CacheEventHit
+ } else {
+ event = CacheEventMiss
+ }
+ payload := &CacheEvent{
+ Source: CacheSourceFS,
+ Event: event,
+ Hash: hash,
+ Duration: duration,
+ }
+ f.recorder.LogEvent(payload)
+}
+
+func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
+ cachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst")
+ cacheItem, err := cacheitem.Create(cachePath)
+ if err != nil {
+ return err
+ }
+
+ for _, file := range files {
+ err := cacheItem.AddFile(anchor, file)
+ if err != nil {
+ _ = cacheItem.Close()
+ return err
+ }
+ }
+
+ writeErr := WriteCacheMetaFile(f.cacheDirectory.UntypedJoin(hash+"-meta.json"), &CacheMetadata{
+ Duration: duration,
+ Hash: hash,
+ })
+
+ if writeErr != nil {
+ _ = cacheItem.Close()
+ return writeErr
+ }
+
+ return cacheItem.Close()
+}
+
+func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) {
+ fmt.Println("Not implemented yet")
+}
+
+func (f *fsCache) CleanAll() {
+ fmt.Println("Not implemented yet")
+}
+
+func (f *fsCache) Shutdown() {}
+
+// CacheMetadata stores duration and hash information for a cache entry so that aggregate Time Saved calculations
+// can be made from artifacts from various caches
+type CacheMetadata struct {
+ Hash string `json:"hash"`
+ Duration int `json:"duration"`
+}
+
+// WriteCacheMetaFile writes cache metadata file at a path
+func WriteCacheMetaFile(path turbopath.AbsoluteSystemPath, config *CacheMetadata) error {
+ jsonBytes, marshalErr := json.Marshal(config)
+ if marshalErr != nil {
+ return marshalErr
+ }
+ writeFilErr := path.WriteFile(jsonBytes, 0644)
+ if writeFilErr != nil {
+ return writeFilErr
+ }
+ return nil
+}
+
+// ReadCacheMetaFile reads cache metadata file at a path
+func ReadCacheMetaFile(path turbopath.AbsoluteSystemPath) (*CacheMetadata, error) {
+ jsonBytes, readFileErr := path.ReadFile()
+ if readFileErr != nil {
+ return nil, readFileErr
+ }
+ var config CacheMetadata
+ marshalErr := json.Unmarshal(jsonBytes, &config)
+ if marshalErr != nil {
+ return nil, marshalErr
+ }
+ return &config, nil
+}
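
CacheMetadata above is a two-field JSON document written next to the cached tarball as <hash>-meta.json, so a later fetch can report how long the original task took. A minimal sketch of the same round trip using only the standard library (plain os paths instead of turbopath; the hash and duration values are made up):

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// cacheMetadata mirrors the shape serialized by WriteCacheMetaFile.
type cacheMetadata struct {
	Hash     string `json:"hash"`
	Duration int    `json:"duration"`
}

func main() {
	dir, err := os.MkdirTemp("", "cache-meta-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "the-hash-meta.json")

	// Write side: marshal and persist, as WriteCacheMetaFile does.
	out, err := json.Marshal(&cacheMetadata{Hash: "the-hash", Duration: 42})
	if err != nil {
		panic(err)
	}
	if err := os.WriteFile(path, out, 0644); err != nil {
		panic(err)
	}

	// Read side: load and unmarshal, as ReadCacheMetaFile does; Duration
	// feeds the aggregate "time saved" calculation mentioned above.
	in, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	var meta cacheMetadata
	if err := json.Unmarshal(in, &meta); err != nil {
		panic(err)
	}
	fmt.Printf("restored %s, recorded duration %d\n", meta.Hash, meta.Duration)
}
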
diff --git a/cli/internal/cache/cache_fs_test.go b/cli/internal/cache/cache_fs_test.go
new file mode 100644
index 0000000..614ad86
--- /dev/null
+++ b/cli/internal/cache/cache_fs_test.go
@@ -0,0 +1,253 @@
+package cache
+
+import (
+ "path/filepath"
+ "testing"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/cacheitem"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "gotest.tools/v3/assert"
+)
+
+type dummyRecorder struct{}
+
+func (dr *dummyRecorder) LogEvent(payload analytics.EventPayload) {}
+
+func TestPut(t *testing.T) {
+ // Set up a test source and cache directory
+ // The "source" directory simulates a package
+ //
+ // <src>/
+ // b
+ // child/
+ // a
+ // link -> ../b
+ // broken -> missing
+ //
+ // Ensure we end up with a matching directory under a
+ // "cache" directory:
+ //
+ // <dst>/the-hash/<src>/...
+
+ src := turbopath.AbsoluteSystemPath(t.TempDir())
+ childDir := src.UntypedJoin("child")
+ err := childDir.MkdirAll(0775)
+ assert.NilError(t, err, "Mkdir")
+ aPath := childDir.UntypedJoin("a")
+ aFile, err := aPath.Create()
+ assert.NilError(t, err, "Create")
+ _, err = aFile.WriteString("hello")
+ assert.NilError(t, err, "WriteString")
+ assert.NilError(t, aFile.Close(), "Close")
+
+ bPath := src.UntypedJoin("b")
+ bFile, err := bPath.Create()
+ assert.NilError(t, err, "Create")
+ _, err = bFile.WriteString("bFile")
+ assert.NilError(t, err, "WriteString")
+ assert.NilError(t, bFile.Close(), "Close")
+
+ srcLinkPath := childDir.UntypedJoin("link")
+ linkTarget := filepath.FromSlash("../b")
+ assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink")
+
+ srcBrokenLinkPath := childDir.Join("broken")
+ assert.NilError(t, srcBrokenLinkPath.Symlink("missing"), "Symlink")
+ circlePath := childDir.Join("circle")
+ assert.NilError(t, circlePath.Symlink(filepath.FromSlash("../child")), "Symlink")
+
+ files := []turbopath.AnchoredSystemPath{
+ turbopath.AnchoredUnixPath("child/").ToSystemPath(), // childDir
+ turbopath.AnchoredUnixPath("child/a").ToSystemPath(), // aPath,
+ turbopath.AnchoredUnixPath("b").ToSystemPath(), // bPath,
+ turbopath.AnchoredUnixPath("child/link").ToSystemPath(), // srcLinkPath,
+ turbopath.AnchoredUnixPath("child/broken").ToSystemPath(), // srcBrokenLinkPath,
+ turbopath.AnchoredUnixPath("child/circle").ToSystemPath(), // circlePath
+ }
+
+ dst := turbopath.AbsoluteSystemPath(t.TempDir())
+ dr := &dummyRecorder{}
+
+ cache := &fsCache{
+ cacheDirectory: dst,
+ recorder: dr,
+ }
+
+ hash := "the-hash"
+ duration := 0
+ putErr := cache.Put(src, hash, duration, files)
+ assert.NilError(t, putErr, "Put")
+
+ // Verify that we got the files that we're expecting
+ dstCachePath := dst.UntypedJoin(hash)
+
+ // This test checks outputs, so we go ahead and pull things back out.
+ // Attempting to satisfy our beliefs that the change is viable with
+ // as few changes to the tests as possible.
+ cacheItem, openErr := cacheitem.Open(dst.UntypedJoin(hash + ".tar.zst"))
+ assert.NilError(t, openErr, "Open")
+
+ _, restoreErr := cacheItem.Restore(dstCachePath)
+ assert.NilError(t, restoreErr, "Restore")
+
+ dstAPath := dstCachePath.UntypedJoin("child", "a")
+ assertFileMatches(t, aPath, dstAPath)
+
+ dstBPath := dstCachePath.UntypedJoin("b")
+ assertFileMatches(t, bPath, dstBPath)
+
+ dstLinkPath := dstCachePath.UntypedJoin("child", "link")
+ target, err := dstLinkPath.Readlink()
+ assert.NilError(t, err, "Readlink")
+ if target != linkTarget {
+ t.Errorf("Readlink got %v, want %v", target, linkTarget)
+ }
+
+ dstBrokenLinkPath := dstCachePath.UntypedJoin("child", "broken")
+ target, err = dstBrokenLinkPath.Readlink()
+ assert.NilError(t, err, "Readlink")
+ if target != "missing" {
+ t.Errorf("Readlink got %v, want missing", target)
+ }
+
+ dstCirclePath := dstCachePath.UntypedJoin("child", "circle")
+ circleLinkDest, err := dstCirclePath.Readlink()
+ assert.NilError(t, err, "Readlink")
+ expectedCircleLinkDest := filepath.FromSlash("../child")
+ if circleLinkDest != expectedCircleLinkDest {
+ t.Errorf("Cache link got %v, want %v", circleLinkDest, expectedCircleLinkDest)
+ }
+
+ assert.NilError(t, cacheItem.Close(), "Close")
+}
+
+func assertFileMatches(t *testing.T, orig turbopath.AbsoluteSystemPath, copy turbopath.AbsoluteSystemPath) {
+ t.Helper()
+ origBytes, err := orig.ReadFile()
+ assert.NilError(t, err, "ReadFile")
+ copyBytes, err := copy.ReadFile()
+ assert.NilError(t, err, "ReadFile")
+ assert.DeepEqual(t, origBytes, copyBytes)
+ origStat, err := orig.Lstat()
+ assert.NilError(t, err, "Lstat")
+ copyStat, err := copy.Lstat()
+ assert.NilError(t, err, "Lstat")
+ assert.Equal(t, origStat.Mode(), copyStat.Mode())
+}
+
+func TestFetch(t *testing.T) {
+ // Set up a test cache directory and target output directory
+ // The "cacheDir" directory simulates a cached package
+ //
+ // <cacheDir>/
+ // the-hash-meta.json
+ // the-hash/
+ // some-package/
+ // b
+ // child/
+ // a
+ // link -> ../b
+ // broken -> missing
+ // circle -> ../child
+ //
+ // Ensure we end up with a matching directory under a
+ // "some-package" directory:
+ //
+ // "some-package"/...
+
+ cacheDir := turbopath.AbsoluteSystemPath(t.TempDir())
+ hash := "the-hash"
+ src := cacheDir.UntypedJoin(hash, "some-package")
+ err := src.MkdirAll(0775)
+ assert.NilError(t, err, "mkdirAll")
+
+ childDir := src.UntypedJoin("child")
+ err = childDir.MkdirAll(0775)
+ assert.NilError(t, err, "Mkdir")
+ aPath := childDir.UntypedJoin("a")
+ aFile, err := aPath.Create()
+ assert.NilError(t, err, "Create")
+ _, err = aFile.WriteString("hello")
+ assert.NilError(t, err, "WriteString")
+ assert.NilError(t, aFile.Close(), "Close")
+
+ bPath := src.UntypedJoin("b")
+ bFile, err := bPath.Create()
+ assert.NilError(t, err, "Create")
+ _, err = bFile.WriteString("bFile")
+ assert.NilError(t, err, "WriteString")
+ assert.NilError(t, bFile.Close(), "Close")
+
+ srcLinkPath := childDir.UntypedJoin("link")
+ linkTarget := filepath.FromSlash("../b")
+ assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink")
+
+ srcBrokenLinkPath := childDir.UntypedJoin("broken")
+ srcBrokenLinkTarget := turbopath.AnchoredUnixPath("missing").ToSystemPath()
+ assert.NilError(t, srcBrokenLinkPath.Symlink(srcBrokenLinkTarget.ToString()), "Symlink")
+
+ circlePath := childDir.Join("circle")
+ srcCircleLinkTarget := turbopath.AnchoredUnixPath("../child").ToSystemPath()
+ assert.NilError(t, circlePath.Symlink(srcCircleLinkTarget.ToString()), "Symlink")
+
+ metadataPath := cacheDir.UntypedJoin("the-hash-meta.json")
+ err = metadataPath.WriteFile([]byte(`{"hash":"the-hash","duration":0}`), 0777)
+ assert.NilError(t, err, "WriteFile")
+
+ dr := &dummyRecorder{}
+
+ cache := &fsCache{
+ cacheDirectory: cacheDir,
+ recorder: dr,
+ }
+
+ inputFiles := []turbopath.AnchoredSystemPath{
+ turbopath.AnchoredUnixPath("some-package/child/").ToSystemPath(), // childDir
+ turbopath.AnchoredUnixPath("some-package/child/a").ToSystemPath(), // aPath,
+ turbopath.AnchoredUnixPath("some-package/b").ToSystemPath(), // bPath,
+ turbopath.AnchoredUnixPath("some-package/child/link").ToSystemPath(), // srcLinkPath,
+ turbopath.AnchoredUnixPath("some-package/child/broken").ToSystemPath(), // srcBrokenLinkPath,
+ turbopath.AnchoredUnixPath("some-package/child/circle").ToSystemPath(), // circlePath
+ }
+
+ putErr := cache.Put(cacheDir.UntypedJoin(hash), hash, 0, inputFiles)
+ assert.NilError(t, putErr, "Put")
+
+ outputDir := turbopath.AbsoluteSystemPath(t.TempDir())
+ dstOutputPath := "some-package"
+ cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{})
+ assert.NilError(t, err, "Fetch")
+ hit := cacheStatus.Local || cacheStatus.Remote
+ if !hit {
+ t.Error("Fetch got false, want true")
+ }
+ if len(files) != len(inputFiles) {
+ t.Errorf("len(files) got %v, want %v", len(files), len(inputFiles))
+ }
+
+ dstAPath := outputDir.UntypedJoin(dstOutputPath, "child", "a")
+ assertFileMatches(t, aPath, dstAPath)
+
+ dstBPath := outputDir.UntypedJoin(dstOutputPath, "b")
+ assertFileMatches(t, bPath, dstBPath)
+
+ dstLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "link")
+ target, err := dstLinkPath.Readlink()
+ assert.NilError(t, err, "Readlink")
+ if target != linkTarget {
+ t.Errorf("Readlink got %v, want %v", target, linkTarget)
+ }
+
+ // Assert that we restore broken symlinks correctly
+ dstBrokenLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "broken")
+ target, readlinkErr := dstBrokenLinkPath.Readlink()
+ assert.NilError(t, readlinkErr, "Readlink")
+ assert.Equal(t, target, srcBrokenLinkTarget.ToString())
+
+ // Assert that we restore symlinks to directories correctly
+ dstCirclePath := outputDir.UntypedJoin(dstOutputPath, "child", "circle")
+ circleTarget, circleReadlinkErr := dstCirclePath.Readlink()
+ assert.NilError(t, circleReadlinkErr, "Circle Readlink")
+ assert.Equal(t, circleTarget, srcCircleLinkTarget.ToString())
+}
diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go
new file mode 100644
index 0000000..1d345bf
--- /dev/null
+++ b/cli/internal/cache/cache_http.go
@@ -0,0 +1,375 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "archive/tar"
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ log "log"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strconv"
+ "time"
+
+ "github.com/DataDog/zstd"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/tarpatch"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+type client interface {
+ PutArtifact(hash string, body []byte, duration int, tag string) error
+ FetchArtifact(hash string) (*http.Response, error)
+ ArtifactExists(hash string) (*http.Response, error)
+ GetTeamID() string
+}
+
+type httpCache struct {
+ writable bool
+ client client
+ requestLimiter limiter
+ recorder analytics.Recorder
+ signerVerifier *ArtifactSignatureAuthentication
+ repoRoot turbopath.AbsoluteSystemPath
+}
+
+type limiter chan struct{}
+
+func (l limiter) acquire() {
+ l <- struct{}{}
+}
+
+func (l limiter) release() {
+ <-l
+}
+
+// mtime is the time we attach for the modification time of all files.
+var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
+
+// nobody is the usual uid / gid of the 'nobody' user.
+const nobody = 65534
+
+func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
+ // if cache.writable {
+ cache.requestLimiter.acquire()
+ defer cache.requestLimiter.release()
+
+ r, w := io.Pipe()
+ go cache.write(w, hash, files)
+
+ // Read the entire artifact tar into memory so we can easily compute the signature.
+ // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no
+ // additional overhead by doing the ioutil.ReadAll here instead.
+ artifactBody, err := ioutil.ReadAll(r)
+ if err != nil {
+ return fmt.Errorf("failed to store files in HTTP cache: %w", err)
+ }
+ tag := ""
+ if cache.signerVerifier.isEnabled() {
+ tag, err = cache.signerVerifier.generateTag(hash, artifactBody)
+ if err != nil {
+ return fmt.Errorf("failed to store files in HTTP cache: %w", err)
+ }
+ }
+ return cache.client.PutArtifact(hash, artifactBody, duration, tag)
+}
+
+// write writes a series of files into the given Writer.
+func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) {
+ defer w.Close()
+ defer func() { _ = w.Close() }()
+ zw := zstd.NewWriter(w)
+ defer func() { _ = zw.Close() }()
+ tw := tar.NewWriter(zw)
+ defer func() { _ = tw.Close() }()
+ for _, file := range files {
+ // log.Printf("caching file %v", file)
+ if err := cache.storeFile(tw, file); err != nil {
+ log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err)
+ // TODO(jaredpalmer): How can we cancel the request at this point?
+ }
+ }
+}
+
+func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error {
+ absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot)
+ info, err := absoluteFilePath.Lstat()
+ if err != nil {
+ return err
+ }
+ target := ""
+ if info.Mode()&os.ModeSymlink != 0 {
+ target, err = absoluteFilePath.Readlink()
+ if err != nil {
+ return err
+ }
+ }
+ hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target))
+ if err != nil {
+ return err
+ }
+ // Ensure posix path for filename written in header.
+ hdr.Name = repoRelativePath.ToUnixPath().ToString()
+ // Zero out all timestamps.
+ hdr.ModTime = mtime
+ hdr.AccessTime = mtime
+ hdr.ChangeTime = mtime
+ // Strip user/group ids.
+ hdr.Uid = nobody
+ hdr.Gid = nobody
+ hdr.Uname = "nobody"
+ hdr.Gname = "nobody"
+ if err := tw.WriteHeader(hdr); err != nil {
+ return err
+ } else if info.IsDir() || target != "" {
+ return nil // nothing to write
+ }
+ f, err := absoluteFilePath.Open()
+ if err != nil {
+ return err
+ }
+ defer func() { _ = f.Close() }()
+ _, err = io.Copy(tw, f)
+ if errors.Is(err, tar.ErrWriteTooLong) {
+ log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular())
+ }
+ return err
+}
+
+func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ cache.requestLimiter.acquire()
+ defer cache.requestLimiter.release()
+ hit, files, duration, err := cache.retrieve(key)
+ if err != nil {
+ // TODO: analytics event?
+ return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
+ }
+ cache.logFetch(hit, key, duration)
+ return ItemStatus{Remote: hit}, files, duration, err
+}
+
+func (cache *httpCache) Exists(key string) ItemStatus {
+ cache.requestLimiter.acquire()
+ defer cache.requestLimiter.release()
+ hit, err := cache.exists(key)
+ if err != nil {
+ return ItemStatus{Remote: false}
+ }
+ return ItemStatus{Remote: hit}
+}
+
+func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
+ var event string
+ if hit {
+ event = CacheEventHit
+ } else {
+ event = CacheEventMiss
+ }
+ payload := &CacheEvent{
+ Source: CacheSourceRemote,
+ Event: event,
+ Hash: hash,
+ Duration: duration,
+ }
+ cache.recorder.LogEvent(payload)
+}
+
+func (cache *httpCache) exists(hash string) (bool, error) {
+ resp, err := cache.client.ArtifactExists(hash)
+ if err != nil {
+ return false, nil
+ }
+
+ defer func() { err = resp.Body.Close() }()
+
+ if resp.StatusCode == http.StatusNotFound {
+ return false, nil
+ } else if resp.StatusCode != http.StatusOK {
+ return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
+ }
+ return true, err
+}
+
+func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) {
+ resp, err := cache.client.FetchArtifact(hash)
+ if err != nil {
+ return false, nil, 0, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusNotFound {
+ return false, nil, 0, nil // doesn't exist - not an error
+ } else if resp.StatusCode != http.StatusOK {
+ b, _ := ioutil.ReadAll(resp.Body)
+ return false, nil, 0, fmt.Errorf("%s", string(b))
+ }
+ // If present, extract the duration from the response.
+ duration := 0
+ if resp.Header.Get("x-artifact-duration") != "" {
+ intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration"))
+ if err != nil {
+ return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err)
+ }
+ duration = intVar
+ }
+ var tarReader io.Reader
+
+ defer func() { _ = resp.Body.Close() }()
+ if cache.signerVerifier.isEnabled() {
+ expectedTag := resp.Header.Get("x-artifact-tag")
+ if expectedTag == "" {
+ // If the verifier is enabled all incoming artifact downloads must have a signature
+ return false, nil, 0, errors.New("artifact verification failed: Downloaded artifact is missing required x-artifact-tag header")
+ }
+ b, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err)
+ }
+ isValid, err := cache.signerVerifier.validate(hash, b, expectedTag)
+ if err != nil {
+ return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err)
+ }
+ if !isValid {
+ err = fmt.Errorf("artifact verification failed: artifact tag does not match expected tag %s", expectedTag)
+ return false, nil, 0, err
+ }
+ // The artifact has been verified and the body can be read and untarred
+ tarReader = bytes.NewReader(b)
+ } else {
+ tarReader = resp.Body
+ }
+ files, err := restoreTar(cache.repoRoot, tarReader)
+ if err != nil {
+ return false, nil, 0, err
+ }
+ return true, files, duration, nil
+}
+
+// restoreTar returns posix-style repo-relative paths of the files it
+// restored. In the future, these should likely be repo-relative system paths
+// so that they are suitable for being fed into cache.Put for other caches.
+// For now, I think this is working because windows also accepts /-delimited paths.
+func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) {
+ files := []turbopath.AnchoredSystemPath{}
+ missingLinks := []*tar.Header{}
+ zr := zstd.NewReader(reader)
+ var closeError error
+ defer func() { closeError = zr.Close() }()
+ tr := tar.NewReader(zr)
+ for {
+ hdr, err := tr.Next()
+ if err != nil {
+ if err == io.EOF {
+ for _, link := range missingLinks {
+ err := restoreSymlink(root, link, true)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return files, closeError
+ }
+ return nil, err
+ }
+ // hdr.Name is always a posix-style path
+ // FIXME: THIS IS A BUG.
+ restoredName := turbopath.AnchoredUnixPath(hdr.Name)
+ files = append(files, restoredName.ToSystemPath())
+ filename := restoredName.ToSystemPath().RestoreAnchor(root)
+ if isChild, err := root.ContainsPath(filename); err != nil {
+ return nil, err
+ } else if !isChild {
+ return nil, fmt.Errorf("cannot untar file to %v", filename)
+ }
+ switch hdr.Typeflag {
+ case tar.TypeDir:
+ if err := filename.MkdirAll(0775); err != nil {
+ return nil, err
+ }
+ case tar.TypeReg:
+ if dir := filename.Dir(); dir != "." {
+ if err := dir.MkdirAll(0775); err != nil {
+ return nil, err
+ }
+ }
+ if f, err := filename.OpenFile(os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.FileMode(hdr.Mode)); err != nil {
+ return nil, err
+ } else if _, err := io.Copy(f, tr); err != nil {
+ return nil, err
+ } else if err := f.Close(); err != nil {
+ return nil, err
+ }
+ case tar.TypeSymlink:
+ if err := restoreSymlink(root, hdr, false); errors.Is(err, errNonexistentLinkTarget) {
+ missingLinks = append(missingLinks, hdr)
+ } else if err != nil {
+ return nil, err
+ }
+ default:
+ log.Printf("Unhandled file type %d for %s", hdr.Typeflag, hdr.Name)
+ }
+ }
+}
+
+var errNonexistentLinkTarget = errors.New("the link target does not exist")
+
+func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNonexistentTargets bool) error {
+ // Note that hdr.Linkname is really the link target
+ relativeLinkTarget := filepath.FromSlash(hdr.Linkname)
+ linkFilename := root.UntypedJoin(hdr.Name)
+ if err := linkFilename.EnsureDir(); err != nil {
+ return err
+ }
+
+ // TODO: check if this is an absolute path, or if we even care
+ linkTarget := linkFilename.Dir().UntypedJoin(relativeLinkTarget)
+ if _, err := linkTarget.Lstat(); err != nil {
+ if os.IsNotExist(err) {
+ if !allowNonexistentTargets {
+ return errNonexistentLinkTarget
+ }
+ // if we're allowing nonexistent link targets, proceed to creating the link
+ } else {
+ return err
+ }
+ }
+ // Ensure that the link we're about to create doesn't already exist
+ if err := linkFilename.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ if err := linkFilename.Symlink(relativeLinkTarget); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) {
+ // Not possible; this implementation can only clean for a hash.
+}
+
+func (cache *httpCache) CleanAll() {
+ // Also not possible.
+}
+
+func (cache *httpCache) Shutdown() {}
+
+func newHTTPCache(opts Opts, client client, recorder analytics.Recorder) *httpCache {
+ return &httpCache{
+ writable: true,
+ client: client,
+ requestLimiter: make(limiter, 20),
+ recorder: recorder,
+ signerVerifier: &ArtifactSignatureAuthentication{
+ // TODO(Gaspar): this should use RemoteCacheOptions.TeamId once we start
+ // enforcing team restrictions for repositories.
+ teamId: client.GetTeamID(),
+ enabled: opts.RemoteCacheOpts.Signature,
+ },
+ }
+}
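
Every remote operation above brackets itself with requestLimiter.acquire() and release(); limiter is a buffered channel used as a counting semaphore, so at most cap(channel) uploads or downloads are in flight at once (newHTTPCache uses a capacity of 20). A standalone sketch of that pattern, with a smaller capacity and a sleep standing in for the HTTP round trip:

package main

import (
	"fmt"
	"sync"
	"time"
)

// limiter is the same idea as httpCache.requestLimiter: channel capacity
// bounds the number of concurrent requests.
type limiter chan struct{}

func (l limiter) acquire() { l <- struct{}{} }
func (l limiter) release() { <-l }

func main() {
	lim := make(limiter, 2) // at most two requests in flight

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			lim.acquire()
			defer lim.release()
			fmt.Println("request", i, "in flight")
			time.Sleep(50 * time.Millisecond) // stand-in for the network call
		}(i)
	}
	wg.Wait()
}
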
diff --git a/cli/internal/cache/cache_http_test.go b/cli/internal/cache/cache_http_test.go
new file mode 100644
index 0000000..d187931
--- /dev/null
+++ b/cli/internal/cache/cache_http_test.go
@@ -0,0 +1,245 @@
+package cache
+
+import (
+ "archive/tar"
+ "bytes"
+ "errors"
+ "net/http"
+ "testing"
+
+ "github.com/DataDog/zstd"
+
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+ "gotest.tools/v3/assert"
+)
+
+type errorResp struct {
+ err error
+}
+
+func (sr *errorResp) PutArtifact(hash string, body []byte, duration int, tag string) error {
+ return sr.err
+}
+
+func (sr *errorResp) FetchArtifact(hash string) (*http.Response, error) {
+ return nil, sr.err
+}
+
+func (sr *errorResp) ArtifactExists(hash string) (*http.Response, error) {
+ return nil, sr.err
+}
+
+func (sr *errorResp) GetTeamID() string {
+ return ""
+}
+
+func TestRemoteCachingDisabled(t *testing.T) {
+ clientErr := &util.CacheDisabledError{
+ Status: util.CachingStatusDisabled,
+ Message: "Remote Caching has been disabled for this team. A team owner can enable it here: $URL",
+ }
+ client := &errorResp{err: clientErr}
+ cache := &httpCache{
+ client: client,
+ requestLimiter: make(limiter, 20),
+ }
+ cd := &util.CacheDisabledError{}
+ _, _, _, err := cache.Fetch("unused-target", "some-hash", []string{"unused", "outputs"})
+ if !errors.As(err, &cd) {
+ t.Errorf("cache.Fetch err got %v, want a CacheDisabled error", err)
+ }
+ if cd.Status != util.CachingStatusDisabled {
+ t.Errorf("CacheDisabled.Status got %v, want %v", cd.Status, util.CachingStatusDisabled)
+ }
+}
+
+func makeValidTar(t *testing.T) *bytes.Buffer {
+ // <repoRoot>
+ // my-pkg/
+ // some-file
+ // link-to-extra-file -> ../extra-file
+ // broken-link -> ../../global-dep
+ // extra-file
+
+ t.Helper()
+ buf := &bytes.Buffer{}
+ zw := zstd.NewWriter(buf)
+ defer func() {
+ if err := zw.Close(); err != nil {
+ t.Fatalf("failed to close gzip: %v", err)
+ }
+ }()
+ tw := tar.NewWriter(zw)
+ defer func() {
+ if err := tw.Close(); err != nil {
+ t.Fatalf("failed to close tar: %v", err)
+ }
+ }()
+
+ // my-pkg
+ h := &tar.Header{
+ Name: "my-pkg/",
+ Mode: int64(0644),
+ Typeflag: tar.TypeDir,
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ // my-pkg/some-file
+ contents := []byte("some-file-contents")
+ h = &tar.Header{
+ Name: "my-pkg/some-file",
+ Mode: int64(0644),
+ Typeflag: tar.TypeReg,
+ Size: int64(len(contents)),
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ if _, err := tw.Write(contents); err != nil {
+ t.Fatalf("failed to write file: %v", err)
+ }
+ // my-pkg/link-to-extra-file
+ h = &tar.Header{
+ Name: "my-pkg/link-to-extra-file",
+ Mode: int64(0644),
+ Typeflag: tar.TypeSymlink,
+ Linkname: "../extra-file",
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ // my-pkg/broken-link
+ h = &tar.Header{
+ Name: "my-pkg/broken-link",
+ Mode: int64(0644),
+ Typeflag: tar.TypeSymlink,
+ Linkname: "../../global-dep",
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ // extra-file
+ contents = []byte("extra-file-contents")
+ h = &tar.Header{
+ Name: "extra-file",
+ Mode: int64(0644),
+ Typeflag: tar.TypeReg,
+ Size: int64(len(contents)),
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ if _, err := tw.Write(contents); err != nil {
+ t.Fatalf("failed to write file: %v", err)
+ }
+
+ return buf
+}
+
+func makeInvalidTar(t *testing.T) *bytes.Buffer {
+ // contains a single file that traverses out
+ // ../some-file
+
+ t.Helper()
+ buf := &bytes.Buffer{}
+ zw := zstd.NewWriter(buf)
+ defer func() {
+ if err := zw.Close(); err != nil {
+ t.Fatalf("failed to close gzip: %v", err)
+ }
+ }()
+ tw := tar.NewWriter(zw)
+ defer func() {
+ if err := tw.Close(); err != nil {
+ t.Fatalf("failed to close tar: %v", err)
+ }
+ }()
+
+ // my-pkg/some-file
+ contents := []byte("some-file-contents")
+ h := &tar.Header{
+ Name: "../some-file",
+ Mode: int64(0644),
+ Typeflag: tar.TypeReg,
+ Size: int64(len(contents)),
+ }
+ if err := tw.WriteHeader(h); err != nil {
+ t.Fatalf("failed to write header: %v", err)
+ }
+ if _, err := tw.Write(contents); err != nil {
+ t.Fatalf("failed to write file: %v", err)
+ }
+ return buf
+}
+
+func TestRestoreTar(t *testing.T) {
+ root := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ tar := makeValidTar(t)
+
+ expectedFiles := []turbopath.AnchoredSystemPath{
+ turbopath.AnchoredUnixPath("extra-file").ToSystemPath(),
+ turbopath.AnchoredUnixPath("my-pkg/").ToSystemPath(),
+ turbopath.AnchoredUnixPath("my-pkg/some-file").ToSystemPath(),
+ turbopath.AnchoredUnixPath("my-pkg/link-to-extra-file").ToSystemPath(),
+ turbopath.AnchoredUnixPath("my-pkg/broken-link").ToSystemPath(),
+ }
+ files, err := restoreTar(root, tar)
+ assert.NilError(t, err, "readTar")
+
+ expectedSet := make(util.Set)
+ for _, file := range expectedFiles {
+ expectedSet.Add(file.ToString())
+ }
+ gotSet := make(util.Set)
+ for _, file := range files {
+ gotSet.Add(file.ToString())
+ }
+ extraFiles := gotSet.Difference(expectedSet)
+ if extraFiles.Len() > 0 {
+ t.Errorf("got extra files: %v", extraFiles.UnsafeListOfStrings())
+ }
+ missingFiles := expectedSet.Difference(gotSet)
+ if missingFiles.Len() > 0 {
+ t.Errorf("missing expected files: %v", missingFiles.UnsafeListOfStrings())
+ }
+
+ // Verify file contents
+ extraFile := root.UntypedJoin("extra-file")
+ contents, err := extraFile.ReadFile()
+ assert.NilError(t, err, "ReadFile")
+ assert.DeepEqual(t, contents, []byte("extra-file-contents"))
+
+ someFile := root.UntypedJoin("my-pkg", "some-file")
+ contents, err = someFile.ReadFile()
+ assert.NilError(t, err, "ReadFile")
+ assert.DeepEqual(t, contents, []byte("some-file-contents"))
+}
+
+func TestRestoreInvalidTar(t *testing.T) {
+ root := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ expectedContents := []byte("important-data")
+ someFile := root.UntypedJoin("some-file")
+ err := someFile.WriteFile(expectedContents, 0644)
+ assert.NilError(t, err, "WriteFile")
+
+ tar := makeInvalidTar(t)
+ // use a child directory so that blindly untarring will squash the file
+ // that we just wrote above.
+ repoRoot := root.UntypedJoin("repo")
+ _, err = restoreTar(repoRoot, tar)
+ if err == nil {
+ t.Error("expected error untarring invalid tar")
+ }
+
+ contents, err := someFile.ReadFile()
+ assert.NilError(t, err, "ReadFile")
+ assert.Equal(t, string(contents), string(expectedContents), "expected to not overwrite file")
+}
+
+// Note that testing Put will require mocking the filesystem and is not currently the most
+// interesting test. The current implementation directly returns the error from PutArtifact.
+// We should still add the test once feasible to avoid future breakage.
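
TestRestoreInvalidTar above verifies that an entry named ../some-file cannot escape the extraction root; in restoreTar that protection is the root.ContainsPath(filename) check. The following standalone sketch shows the same idea with only the standard library; safeJoin and the example paths are invented for illustration, not the helper the package actually uses.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// safeJoin refuses any archive entry whose cleaned name would resolve
// outside the extraction root.
func safeJoin(root, name string) (string, error) {
	cleaned := filepath.Clean(filepath.FromSlash(name))
	if filepath.IsAbs(cleaned) || cleaned == ".." ||
		strings.HasPrefix(cleaned, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("cannot untar file to %v", name)
	}
	return filepath.Join(root, cleaned), nil
}

func main() {
	for _, name := range []string{"my-pkg/some-file", "../some-file"} {
		if p, err := safeJoin("/repo", name); err != nil {
			fmt.Println("rejected:", err)
		} else {
			fmt.Println("accepted:", p)
		}
	}
}
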
diff --git a/cli/internal/cache/cache_noop.go b/cli/internal/cache/cache_noop.go
new file mode 100644
index 0000000..80a3c23
--- /dev/null
+++ b/cli/internal/cache/cache_noop.go
@@ -0,0 +1,23 @@
+package cache
+
+import "github.com/vercel/turbo/cli/internal/turbopath"
+
+type noopCache struct{}
+
+func newNoopCache() *noopCache {
+ return &noopCache{}
+}
+
+func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error {
+ return nil
+}
+func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ return ItemStatus{Local: false, Remote: false}, nil, 0, nil
+}
+func (c *noopCache) Exists(_ string) ItemStatus {
+ return ItemStatus{}
+}
+
+func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {}
+func (c *noopCache) CleanAll() {}
+func (c *noopCache) Shutdown() {}
diff --git a/cli/internal/cache/cache_signature_authentication.go b/cli/internal/cache/cache_signature_authentication.go
new file mode 100644
index 0000000..f9fe4c0
--- /dev/null
+++ b/cli/internal/cache/cache_signature_authentication.go
@@ -0,0 +1,88 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "crypto/hmac"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "hash"
+ "os"
+)
+
+type ArtifactSignatureAuthentication struct {
+ teamId string
+ enabled bool
+}
+
+func (asa *ArtifactSignatureAuthentication) isEnabled() bool {
+ return asa.enabled
+}
+
+// If the secret key is not found or the secret key length is 0, an error is returned.
+// Preference is given to the environment specified secret key.
+func (asa *ArtifactSignatureAuthentication) secretKey() ([]byte, error) {
+ secret := os.Getenv("TURBO_REMOTE_CACHE_SIGNATURE_KEY")
+ if len(secret) == 0 {
+ return nil, errors.New("signature secret key not found. You must specify a secret key in the TURBO_REMOTE_CACHE_SIGNATURE_KEY environment variable")
+ }
+ return []byte(secret), nil
+}
+
+func (asa *ArtifactSignatureAuthentication) generateTag(hash string, artifactBody []byte) (string, error) {
+ tag, err := asa.getTagGenerator(hash)
+ if err != nil {
+ return "", err
+ }
+ tag.Write(artifactBody)
+ return base64.StdEncoding.EncodeToString(tag.Sum(nil)), nil
+}
+
+func (asa *ArtifactSignatureAuthentication) getTagGenerator(hash string) (hash.Hash, error) {
+ teamId := asa.teamId
+ secret, err := asa.secretKey()
+ if err != nil {
+ return nil, err
+ }
+ artifactMetadata := &struct {
+ Hash string `json:"hash"`
+ TeamId string `json:"teamId"`
+ }{
+ Hash: hash,
+ TeamId: teamId,
+ }
+ metadata, err := json.Marshal(artifactMetadata)
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO(Gaspar) Support additional signing algorithms here
+ h := hmac.New(sha256.New, secret)
+ h.Write(metadata)
+ return h, nil
+}
+
+func (asa *ArtifactSignatureAuthentication) validate(hash string, artifactBody []byte, expectedTag string) (bool, error) {
+ computedTag, err := asa.generateTag(hash, artifactBody)
+ if err != nil {
+ return false, fmt.Errorf("failed to verify artifact tag: %w", err)
+ }
+ return hmac.Equal([]byte(computedTag), []byte(expectedTag)), nil
+}
+
+type StreamValidator struct {
+ currentHash hash.Hash
+}
+
+func (sv *StreamValidator) Validate(expectedTag string) bool {
+ computedTag := base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil))
+ return hmac.Equal([]byte(computedTag), []byte(expectedTag))
+}
+
+func (sv *StreamValidator) CurrentValue() string {
+ return base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil))
+}
diff --git a/cli/internal/cache/cache_signature_authentication_test.go b/cli/internal/cache/cache_signature_authentication_test.go
new file mode 100644
index 0000000..7f3f865
--- /dev/null
+++ b/cli/internal/cache/cache_signature_authentication_test.go
@@ -0,0 +1,195 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "crypto/hmac"
+ "crypto/sha256"
+ "encoding/base64"
+ "encoding/json"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_SecretKeySuccess(t *testing.T) {
+ teamId := "team_someid"
+ secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY"
+ secretKeyEnvValue := "my-secret-key-env"
+ t.Setenv(secretKeyEnvName, secretKeyEnvValue)
+
+ cases := []struct {
+ name string
+ asa *ArtifactSignatureAuthentication
+ expectedSecretKey string
+ expectedSecretKeyError bool
+ }{
+ {
+ name: "Accepts secret key",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedSecretKey: secretKeyEnvValue,
+ expectedSecretKeyError: false,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ secretKey, err := tc.asa.secretKey()
+ if tc.expectedSecretKeyError {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tc.expectedSecretKey, string(secretKey))
+ }
+ })
+ }
+}
+
+func Test_SecretKeyErrors(t *testing.T) {
+ teamId := "team_someid"
+
+ // Env secret key TURBO_REMOTE_CACHE_SIGNATURE_KEY is not set
+
+ cases := []struct {
+ name string
+ asa *ArtifactSignatureAuthentication
+ expectedSecretKey string
+ expectedSecretKeyError bool
+ }{
+ {
+ name: "Secret key not defined errors",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedSecretKey: "",
+ expectedSecretKeyError: true,
+ },
+ {
+ name: "Secret key is empty errors",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedSecretKey: "",
+ expectedSecretKeyError: true,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ secretKey, err := tc.asa.secretKey()
+ if tc.expectedSecretKeyError {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ assert.Equal(t, tc.expectedSecretKey, string(secretKey))
+ }
+ })
+ }
+}
+
+func Test_GenerateTagAndValidate(t *testing.T) {
+ teamId := "team_someid"
+ hash := "the-artifact-hash"
+ artifactBody := []byte("the artifact body as bytes")
+ secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY"
+ secretKeyEnvValue := "my-secret-key-env"
+ t.Setenv(secretKeyEnvName, secretKeyEnvValue)
+
+ cases := []struct {
+ name string
+ asa *ArtifactSignatureAuthentication
+ expectedTagMatches string
+ expectedTagDoesNotMatch string
+ }{
+ {
+ name: "Uses hash to generate tag",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue),
+ expectedTagDoesNotMatch: testUtilGetHMACTag("wrong-hash", teamId, artifactBody, secretKeyEnvValue),
+ },
+ {
+ name: "Uses teamId to generate tag",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue),
+ expectedTagDoesNotMatch: testUtilGetHMACTag(hash, "wrong-teamId", artifactBody, secretKeyEnvValue),
+ },
+ {
+ name: "Uses artifactBody to generate tag",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue),
+ expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, []byte("wrong-artifact-body"), secretKeyEnvValue),
+ },
+ {
+ name: "Uses secret to generate tag",
+ asa: &ArtifactSignatureAuthentication{
+ teamId: teamId,
+ enabled: true,
+ },
+ expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue),
+ expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, artifactBody, "wrong-secret"),
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ tag, err := tc.asa.generateTag(hash, artifactBody)
+ assert.NoError(t, err)
+
+ // validates the tag
+ assert.Equal(t, tc.expectedTagMatches, tag)
+ isValid, err := tc.asa.validate(hash, artifactBody, tc.expectedTagMatches)
+ assert.NoError(t, err)
+ assert.True(t, isValid)
+
+ // does not validate the tag
+ assert.NotEqual(t, tc.expectedTagDoesNotMatch, tag)
+ isValid, err = tc.asa.validate(hash, artifactBody, tc.expectedTagDoesNotMatch)
+ assert.NoError(t, err)
+ assert.False(t, isValid)
+
+ })
+ }
+}
+
+// Test utils
+
+// Return the Base64 encoded HMAC given the artifact metadata and artifact body
+func testUtilGetHMACTag(hash string, teamId string, artifactBody []byte, secret string) string {
+ artifactMetadata := &struct {
+ Hash string `json:"hash"`
+ TeamId string `json:"teamId"`
+ }{
+ Hash: hash,
+ TeamId: teamId,
+ }
+ metadata, _ := json.Marshal(artifactMetadata)
+ h := hmac.New(sha256.New, []byte(secret))
+ h.Write(metadata)
+ h.Write(artifactBody)
+ return base64.StdEncoding.EncodeToString(h.Sum(nil))
+}
+
+func Test_Utils(t *testing.T) {
+ teamId := "team_someid"
+ secret := "my-secret"
+ hash := "the-artifact-hash"
+ artifactBody := []byte("the artifact body as bytes")
+ testTag := testUtilGetHMACTag(hash, teamId, artifactBody, secret)
+ expectedTag := "9Fu8YniPZ2dEBolTPQoNlFWG0LNMW8EXrBsRmf/fEHk="
+ assert.True(t, hmac.Equal([]byte(testTag), []byte(expectedTag)))
+}
diff --git a/cli/internal/cache/cache_test.go b/cli/internal/cache/cache_test.go
new file mode 100644
index 0000000..3f17877
--- /dev/null
+++ b/cli/internal/cache/cache_test.go
@@ -0,0 +1,318 @@
+package cache
+
+import (
+ "net/http"
+ "reflect"
+ "sync/atomic"
+ "testing"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
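+// testCache is a minimal in-memory Cache implementation; when disabledErr is set it
+// simulates a cache whose operations fail with a caching-disabled error.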
+type testCache struct {
+ disabledErr *util.CacheDisabledError
+ entries map[string][]turbopath.AnchoredSystemPath
+}
+
+func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ if tc.disabledErr != nil {
+ return ItemStatus{}, nil, 0, tc.disabledErr
+ }
+ foundFiles, ok := tc.entries[hash]
+ if ok {
+ duration := 5
+ return ItemStatus{Local: true}, foundFiles, duration, nil
+ }
+ return ItemStatus{}, nil, 0, nil
+}
+
+func (tc *testCache) Exists(hash string) ItemStatus {
+ if tc.disabledErr != nil {
+ return ItemStatus{}
+ }
+ _, ok := tc.entries[hash]
+ if ok {
+ return ItemStatus{Local: true}
+ }
+ return ItemStatus{}
+}
+
+func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error {
+ if tc.disabledErr != nil {
+ return tc.disabledErr
+ }
+ tc.entries[hash] = files
+ return nil
+}
+
+func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {}
+func (tc *testCache) CleanAll() {}
+func (tc *testCache) Shutdown() {}
+
+func newEnabledCache() *testCache {
+ return &testCache{
+ entries: make(map[string][]turbopath.AnchoredSystemPath),
+ }
+}
+
+func newDisabledCache() *testCache {
+ return &testCache{
+ disabledErr: &util.CacheDisabledError{
+ Status: util.CachingStatusDisabled,
+ Message: "remote caching is disabled",
+ },
+ }
+}
+
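+// TestPutCachingDisabled checks that a Put through the multiplexer removes a disabled
+// cache exactly once and that the remaining caches still serve the stored entry.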
+func TestPutCachingDisabled(t *testing.T) {
+ disabledCache := newDisabledCache()
+ caches := []Cache{
+ newEnabledCache(),
+ disabledCache,
+ newEnabledCache(),
+ newEnabledCache(),
+ }
+ var removeCalled uint64
+ mplex := &cacheMultiplexer{
+ caches: caches,
+ onCacheRemoved: func(cache Cache, err error) {
+ atomic.AddUint64(&removeCalled, 1)
+ },
+ }
+
+ err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"})
+ if err != nil {
+		// the disabled cache should be removed internally; its error must not leak to the caller
+ t.Errorf("Put got error %v, want <nil>", err)
+ }
+
+ removes := atomic.LoadUint64(&removeCalled)
+ if removes != 1 {
+ t.Errorf("removes count: %v, want 1", removes)
+ }
+
+ mplex.mu.RLock()
+ if len(mplex.caches) != 3 {
+ t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches))
+ }
+ for _, cache := range mplex.caches {
+ if cache == disabledCache {
+ t.Error("found disabled cache, expected it to be removed")
+ }
+ }
+ mplex.mu.RUnlock()
+
+ // subsequent Fetch should still work
+ cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
+ if err != nil {
+ t.Errorf("got error fetching files: %v", err)
+ }
+ hit := cacheStatus.Local || cacheStatus.Remote
+ if !hit {
+ t.Error("failed to find previously stored files")
+ }
+
+ removes = atomic.LoadUint64(&removeCalled)
+ if removes != 1 {
+ t.Errorf("removes count: %v, want 1", removes)
+ }
+}
+
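+// TestExists checks that Exists reports a miss before Put and a local hit afterwards.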
+func TestExists(t *testing.T) {
+ caches := []Cache{
+ newEnabledCache(),
+ }
+
+ mplex := &cacheMultiplexer{
+ caches: caches,
+ }
+
+ itemStatus := mplex.Exists("some-hash")
+ if itemStatus.Local {
+ t.Error("did not expect file to exist")
+ }
+
+ err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"})
+ if err != nil {
+		// Put should succeed so the entry exists for the next check
+ t.Errorf("Put got error %v, want <nil>", err)
+ }
+
+ itemStatus = mplex.Exists("some-hash")
+ if !itemStatus.Local {
+ t.Error("failed to find previously stored files")
+ }
+}
+
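+// fakeClient is a stub client for TestNew; methods that would reach the remote cache panic if called.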
+type fakeClient struct{}
+
+// FetchArtifact implements client
+func (*fakeClient) FetchArtifact(hash string) (*http.Response, error) {
+ panic("unimplemented")
+}
+
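+// ArtifactExists implements client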
+func (*fakeClient) ArtifactExists(hash string) (*http.Response, error) {
+ panic("unimplemented")
+}
+
+// GetTeamID implements client
+func (*fakeClient) GetTeamID() string {
+ return "fake-team-id"
+}
+
+// PutArtifact implements client
+func (*fakeClient) PutArtifact(hash string, body []byte, duration int, tag string) error {
+ panic("unimplemented")
+}
+
+var _ client = &fakeClient{}
+
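+// TestFetchCachingDisabled checks that a Fetch through the multiplexer removes a disabled
+// cache exactly once without surfacing an error or reporting a hit.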
+func TestFetchCachingDisabled(t *testing.T) {
+ disabledCache := newDisabledCache()
+ caches := []Cache{
+ newEnabledCache(),
+ disabledCache,
+ newEnabledCache(),
+ newEnabledCache(),
+ }
+ var removeCalled uint64
+ mplex := &cacheMultiplexer{
+ caches: caches,
+ onCacheRemoved: func(cache Cache, err error) {
+ atomic.AddUint64(&removeCalled, 1)
+ },
+ }
+
+ cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"})
+ if err != nil {
+		// the disabled cache should be removed internally; its error must not leak to the caller
+ t.Errorf("Fetch got error %v, want <nil>", err)
+ }
+ hit := cacheStatus.Local || cacheStatus.Remote
+ if hit {
+ t.Error("hit on empty cache, expected miss")
+ }
+
+ removes := atomic.LoadUint64(&removeCalled)
+ if removes != 1 {
+ t.Errorf("removes count: %v, want 1", removes)
+ }
+
+ mplex.mu.RLock()
+ if len(mplex.caches) != 3 {
+ t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches))
+ }
+ for _, cache := range mplex.caches {
+ if cache == disabledCache {
+ t.Error("found disabled cache, expected it to be removed")
+ }
+ }
+ mplex.mu.RUnlock()
+}
+
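+// nullRecorder is an analytics.Recorder that discards every event.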
+type nullRecorder struct{}
+
+func (nullRecorder) LogEvent(analytics.EventPayload) {}
+
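+// TestNew checks which cache implementations New wires together for each combination of Opts.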
+func TestNew(t *testing.T) {
+	// t.TempDir fails the test itself on error, so no extra error handling is needed here
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ type args struct {
+ opts Opts
+ recorder analytics.Recorder
+ onCacheRemoved OnCacheRemoved
+ client fakeClient
+ }
+ tests := []struct {
+ name string
+ args args
+ want Cache
+ wantErr bool
+ }{
+ {
+ name: "With no caches configured, new returns a noopCache and an error",
+ args: args{
+ opts: Opts{
+ SkipFilesystem: true,
+ SkipRemote: true,
+ },
+ recorder: &nullRecorder{},
+ onCacheRemoved: func(Cache, error) {},
+ },
+ want: &noopCache{},
+ wantErr: true,
+ },
+ {
+ name: "With just httpCache configured, new returns an httpCache and a noopCache",
+ args: args{
+ opts: Opts{
+ SkipFilesystem: true,
+ RemoteCacheOpts: fs.RemoteCacheOptions{
+ Signature: true,
+ },
+ },
+ recorder: &nullRecorder{},
+ onCacheRemoved: func(Cache, error) {},
+ },
+ want: &cacheMultiplexer{
+ caches: []Cache{&httpCache{}, &noopCache{}},
+ },
+ wantErr: false,
+ },
+ {
+ name: "With just fsCache configured, new returns only an fsCache",
+ args: args{
+ opts: Opts{
+ SkipRemote: true,
+ },
+ recorder: &nullRecorder{},
+ onCacheRemoved: func(Cache, error) {},
+ },
+ want: &fsCache{},
+ },
+ {
+ name: "With both configured, new returns an fsCache and httpCache",
+ args: args{
+ opts: Opts{
+ RemoteCacheOpts: fs.RemoteCacheOptions{
+ Signature: true,
+ },
+ },
+ recorder: &nullRecorder{},
+ onCacheRemoved: func(Cache, error) {},
+ },
+ want: &cacheMultiplexer{
+ caches: []Cache{&fsCache{}, &httpCache{}},
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := New(tt.args.opts, repoRoot, &tt.args.client, tt.args.recorder, tt.args.onCacheRemoved)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ switch multiplexer := got.(type) {
+ case *cacheMultiplexer:
+ want := tt.want.(*cacheMultiplexer)
+ for i := range multiplexer.caches {
+ if reflect.TypeOf(multiplexer.caches[i]) != reflect.TypeOf(want.caches[i]) {
+ t.Errorf("New() = %v, want %v", reflect.TypeOf(multiplexer.caches[i]), reflect.TypeOf(want.caches[i]))
+ }
+ }
+ case *fsCache:
+ if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want))
+ }
+ case *noopCache:
+ if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want))
+ }
+ }
+ })
+ }
+}