From fc8c5fdce62fb229202659408798a7b6c98f6e8b Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Fri, 28 Apr 2023 01:36:55 +0800 Subject: --- cli/internal/cache/async_cache.go | 82 ----- cli/internal/cache/cache.go | 317 ----------------- cli/internal/cache/cache_fs.go | 174 ---------- cli/internal/cache/cache_fs_test.go | 253 -------------- cli/internal/cache/cache_http.go | 375 --------------------- cli/internal/cache/cache_http_test.go | 245 -------------- cli/internal/cache/cache_noop.go | 23 -- .../cache/cache_signature_authentication.go | 88 ----- .../cache/cache_signature_authentication_test.go | 195 ----------- cli/internal/cache/cache_test.go | 318 ----------------- 10 files changed, 2070 deletions(-) delete mode 100644 cli/internal/cache/async_cache.go delete mode 100644 cli/internal/cache/cache.go delete mode 100644 cli/internal/cache/cache_fs.go delete mode 100644 cli/internal/cache/cache_fs_test.go delete mode 100644 cli/internal/cache/cache_http.go delete mode 100644 cli/internal/cache/cache_http_test.go delete mode 100644 cli/internal/cache/cache_noop.go delete mode 100644 cli/internal/cache/cache_signature_authentication.go delete mode 100644 cli/internal/cache/cache_signature_authentication_test.go delete mode 100644 cli/internal/cache/cache_test.go (limited to 'cli/internal/cache') diff --git a/cli/internal/cache/async_cache.go b/cli/internal/cache/async_cache.go deleted file mode 100644 index 0a8f467..0000000 --- a/cli/internal/cache/async_cache.go +++ /dev/null @@ -1,82 +0,0 @@ -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "sync" - - "github.com/vercel/turbo/cli/internal/turbopath" -) - -// An asyncCache is a wrapper around a Cache interface that handles incoming -// store requests asynchronously and attempts to return immediately. -// The requests are handled on an internal queue, if that fills up then -// incoming requests will start to block again until it empties. -// Retrieval requests are still handled synchronously. -type asyncCache struct { - requests chan cacheRequest - realCache Cache - wg sync.WaitGroup -} - -// A cacheRequest models an incoming cache request on our queue. -type cacheRequest struct { - anchor turbopath.AbsoluteSystemPath - key string - duration int - files []turbopath.AnchoredSystemPath -} - -func newAsyncCache(realCache Cache, opts Opts) Cache { - c := &asyncCache{ - requests: make(chan cacheRequest), - realCache: realCache, - } - c.wg.Add(opts.Workers) - for i := 0; i < opts.Workers; i++ { - go c.run() - } - return c -} - -func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { - c.requests <- cacheRequest{ - anchor: anchor, - key: key, - files: files, - duration: duration, - } - return nil -} - -func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - return c.realCache.Fetch(anchor, key, files) -} - -func (c *asyncCache) Exists(key string) ItemStatus { - return c.realCache.Exists(key) -} - -func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) { - c.realCache.Clean(anchor) -} - -func (c *asyncCache) CleanAll() { - c.realCache.CleanAll() -} - -func (c *asyncCache) Shutdown() { - // fmt.Println("Shutting down cache workers...") - close(c.requests) - c.wg.Wait() - // fmt.Println("Shut down all cache workers") -} - -// run implements the actual async logic. -func (c *asyncCache) run() { - for r := range c.requests { - _ = c.realCache.Put(r.anchor, r.key, r.duration, r.files) - } - c.wg.Done() -} diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go deleted file mode 100644 index 8b74272..0000000 --- a/cli/internal/cache/cache.go +++ /dev/null @@ -1,317 +0,0 @@ -// Package cache abstracts storing and fetching previously run tasks -// -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "errors" - "sync" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/util" - "golang.org/x/sync/errgroup" -) - -// Cache is abstracted way to cache/fetch previously run tasks -type Cache interface { - // Fetch returns true if there is a cache it. It is expected to move files - // into their correct position as a side effect - Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) - Exists(hash string) ItemStatus - // Put caches files for a given hash - Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error - Clean(anchor turbopath.AbsoluteSystemPath) - CleanAll() - Shutdown() -} - -// ItemStatus holds whether artifacts exists for a given hash on local -// and/or remote caching server -type ItemStatus struct { - Local bool `json:"local"` - Remote bool `json:"remote"` -} - -const ( - // CacheSourceFS is a constant to indicate local cache hit - CacheSourceFS = "LOCAL" - // CacheSourceRemote is a constant to indicate remote cache hit - CacheSourceRemote = "REMOTE" - // CacheEventHit is a constant to indicate a cache hit - CacheEventHit = "HIT" - // CacheEventMiss is a constant to indicate a cache miss - CacheEventMiss = "MISS" -) - -type CacheEvent struct { - Source string `mapstructure:"source"` - Event string `mapstructure:"event"` - Hash string `mapstructure:"hash"` - Duration int `mapstructure:"duration"` -} - -// DefaultLocation returns the default filesystem cache location, given a repo root -func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - return repoRoot.UntypedJoin("node_modules", ".cache", "turbo") -} - -// OnCacheRemoved defines a callback that the cache system calls if a particular cache -// needs to be removed. In practice, this happens when Remote Caching has been disabled -// the but CLI continues to try to use it. -type OnCacheRemoved = func(cache Cache, err error) - -// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable -var ErrNoCachesEnabled = errors.New("no caches are enabled") - -// Opts holds configuration options for the cache -// TODO(gsoltis): further refactor this into fs cache opts and http cache opts -type Opts struct { - OverrideDir string - SkipRemote bool - SkipFilesystem bool - Workers int - RemoteCacheOpts fs.RemoteCacheOptions -} - -// resolveCacheDir calculates the location turbo should use to cache artifacts, -// based on the options supplied by the user. -func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - if o.OverrideDir != "" { - return fs.ResolveUnknownPath(repoRoot, o.OverrideDir) - } - return DefaultLocation(repoRoot) -} - -var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only -allow reading and caching artifacts using the remote cache.` - -// New creates a new cache -func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { - c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved) - if err != nil && !errors.Is(err, ErrNoCachesEnabled) { - return nil, err - } - if opts.Workers > 0 { - return newAsyncCache(c, opts), err - } - return c, err -} - -// newSyncCache can return an error with a usable noopCache. -func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { - // Check to see if the user has turned off particular cache implementations. - useFsCache := !opts.SkipFilesystem - useHTTPCache := !opts.SkipRemote - - // Since the above two flags are not mutually exclusive it is possible to configure - // yourself out of having a cache. We should tell you about it but we shouldn't fail - // your build for that reason. - // - // Further, since the httpCache can be removed at runtime, we need to insert a noopCache - // as a backup if you are configured to have *just* an httpCache. - // - // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache & useHTTPCache) - useNoopCache := !useFsCache - - // Build up an array of cache implementations, we can only ever have 1 or 2. - cacheImplementations := make([]Cache, 0, 2) - - if useFsCache { - implementation, err := newFsCache(opts, recorder, repoRoot) - if err != nil { - return nil, err - } - cacheImplementations = append(cacheImplementations, implementation) - } - - if useHTTPCache { - implementation := newHTTPCache(opts, client, recorder) - cacheImplementations = append(cacheImplementations, implementation) - } - - if useNoopCache { - implementation := newNoopCache() - cacheImplementations = append(cacheImplementations, implementation) - } - - // Precisely two cache implementations: - // fsCache and httpCache OR httpCache and noopCache - useMultiplexer := len(cacheImplementations) > 1 - if useMultiplexer { - // We have early-returned any possible errors for this scenario. - return &cacheMultiplexer{ - onCacheRemoved: onCacheRemoved, - opts: opts, - caches: cacheImplementations, - }, nil - } - - // Precisely one cache implementation: fsCache OR noopCache - implementation := cacheImplementations[0] - _, isNoopCache := implementation.(*noopCache) - - // We want to let the user know something is wonky, but we don't want - // to trigger their build to fail. - if isNoopCache { - return implementation, ErrNoCachesEnabled - } - return implementation, nil -} - -// A cacheMultiplexer multiplexes several caches into one. -// Used when we have several active (eg. http, dir). -type cacheMultiplexer struct { - caches []Cache - opts Opts - mu sync.RWMutex - onCacheRemoved OnCacheRemoved -} - -func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { - return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches)) -} - -type cacheRemoval struct { - cache Cache - err *util.CacheDisabledError -} - -// storeUntil stores artifacts into higher priority caches than the given one. -// Used after artifact retrieval to ensure we have them in eg. the directory cache after -// downloading from the RPC cache. -func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error { - // Attempt to store on all caches simultaneously. - toRemove := make([]*cacheRemoval, stopAt) - g := &errgroup.Group{} - mplex.mu.RLock() - for i, cache := range mplex.caches { - if i == stopAt { - break - } - c := cache - i := i - g.Go(func() error { - err := c.Put(anchor, key, duration, files) - if err != nil { - cd := &util.CacheDisabledError{} - if errors.As(err, &cd) { - toRemove[i] = &cacheRemoval{ - cache: c, - err: cd, - } - // we don't want this to cancel other cache actions - return nil - } - return err - } - return nil - }) - } - mplex.mu.RUnlock() - - if err := g.Wait(); err != nil { - return err - } - - for _, removal := range toRemove { - if removal != nil { - mplex.removeCache(removal) - } - } - return nil -} - -// removeCache takes a requested removal and tries to actually remove it. However, -// multiple requests could result in concurrent requests to remove the same cache. -// Let one of them win and propagate the error, the rest will no-op. -func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) { - mplex.mu.Lock() - defer mplex.mu.Unlock() - for i, cache := range mplex.caches { - if cache == removal.cache { - mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...) - mplex.onCacheRemoved(cache, removal.err) - break - } - } -} - -func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - // Make a shallow copy of the caches, since storeUntil can call removeCache - mplex.mu.RLock() - caches := make([]Cache, len(mplex.caches)) - copy(caches, mplex.caches) - mplex.mu.RUnlock() - - // We need to return a composite cache status from multiple caches - // Initialize the empty struct so we can assign values to it. This is similar - // to how the Exists() method works. - combinedCacheState := ItemStatus{} - - // Retrieve from caches sequentially; if we did them simultaneously we could - // easily write the same file from two goroutines at once. - for i, cache := range caches { - itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files) - ok := itemStatus.Local || itemStatus.Remote - - if err != nil { - cd := &util.CacheDisabledError{} - if errors.As(err, &cd) { - mplex.removeCache(&cacheRemoval{ - cache: cache, - err: cd, - }) - } - // We're ignoring the error in the else case, since with this cache - // abstraction, we want to check lower priority caches rather than fail - // the operation. Future work that plumbs UI / Logging into the cache system - // should probably log this at least. - } - if ok { - // Store this into other caches. We can ignore errors here because we know - // we have previously successfully stored in a higher-priority cache, and so the overall - // result is a success at fetching. Storing in lower-priority caches is an optimization. - _ = mplex.storeUntil(anchor, key, duration, actualFiles, i) - - // If another cache had already set this to true, we don't need to set it again from this cache - combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local - combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote - return combinedCacheState, actualFiles, duration, err - } - } - - return ItemStatus{Local: false, Remote: false}, nil, 0, nil -} - -func (mplex *cacheMultiplexer) Exists(target string) ItemStatus { - syncCacheState := ItemStatus{} - for _, cache := range mplex.caches { - itemStatus := cache.Exists(target) - syncCacheState.Local = syncCacheState.Local || itemStatus.Local - syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote - } - - return syncCacheState -} - -func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) { - for _, cache := range mplex.caches { - cache.Clean(anchor) - } -} - -func (mplex *cacheMultiplexer) CleanAll() { - for _, cache := range mplex.caches { - cache.CleanAll() - } -} - -func (mplex *cacheMultiplexer) Shutdown() { - for _, cache := range mplex.caches { - cache.Shutdown() - } -} diff --git a/cli/internal/cache/cache_fs.go b/cli/internal/cache/cache_fs.go deleted file mode 100644 index fb15a02..0000000 --- a/cli/internal/cache/cache_fs.go +++ /dev/null @@ -1,174 +0,0 @@ -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -// Package cache implements our cache abstraction. -package cache - -import ( - "encoding/json" - "fmt" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/cacheitem" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -// fsCache is a local filesystem cache -type fsCache struct { - cacheDirectory turbopath.AbsoluteSystemPath - recorder analytics.Recorder -} - -// newFsCache creates a new filesystem cache -func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.AbsoluteSystemPath) (*fsCache, error) { - cacheDir := opts.resolveCacheDir(repoRoot) - if err := cacheDir.MkdirAll(0775); err != nil { - return nil, err - } - return &fsCache{ - cacheDirectory: cacheDir, - recorder: recorder, - }, nil -} - -// Fetch returns true if items are cached. It moves them into position as a side effect. -func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar") - compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") - - var actualCachePath turbopath.AbsoluteSystemPath - if uncompressedCachePath.FileExists() { - actualCachePath = uncompressedCachePath - } else if compressedCachePath.FileExists() { - actualCachePath = compressedCachePath - } else { - // It's not in the cache, bail now - f.logFetch(false, hash, 0) - return ItemStatus{Local: false}, nil, 0, nil - } - - cacheItem, openErr := cacheitem.Open(actualCachePath) - if openErr != nil { - return ItemStatus{Local: false}, nil, 0, openErr - } - - restoredFiles, restoreErr := cacheItem.Restore(anchor) - if restoreErr != nil { - _ = cacheItem.Close() - return ItemStatus{Local: false}, nil, 0, restoreErr - } - - meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json")) - if err != nil { - _ = cacheItem.Close() - return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err) - } - f.logFetch(true, hash, meta.Duration) - - // Wait to see what happens with close. - closeErr := cacheItem.Close() - if closeErr != nil { - return ItemStatus{Local: false}, restoredFiles, 0, closeErr - } - return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil -} - -func (f *fsCache) Exists(hash string) ItemStatus { - uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar") - compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") - - if compressedCachePath.FileExists() || uncompressedCachePath.FileExists() { - return ItemStatus{Local: true} - } - - return ItemStatus{Local: false} -} - -func (f *fsCache) logFetch(hit bool, hash string, duration int) { - var event string - if hit { - event = CacheEventHit - } else { - event = CacheEventMiss - } - payload := &CacheEvent{ - Source: CacheSourceFS, - Event: event, - Hash: hash, - Duration: duration, - } - f.recorder.LogEvent(payload) -} - -func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { - cachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") - cacheItem, err := cacheitem.Create(cachePath) - if err != nil { - return err - } - - for _, file := range files { - err := cacheItem.AddFile(anchor, file) - if err != nil { - _ = cacheItem.Close() - return err - } - } - - writeErr := WriteCacheMetaFile(f.cacheDirectory.UntypedJoin(hash+"-meta.json"), &CacheMetadata{ - Duration: duration, - Hash: hash, - }) - - if writeErr != nil { - _ = cacheItem.Close() - return writeErr - } - - return cacheItem.Close() -} - -func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) { - fmt.Println("Not implemented yet") -} - -func (f *fsCache) CleanAll() { - fmt.Println("Not implemented yet") -} - -func (f *fsCache) Shutdown() {} - -// CacheMetadata stores duration and hash information for a cache entry so that aggregate Time Saved calculations -// can be made from artifacts from various caches -type CacheMetadata struct { - Hash string `json:"hash"` - Duration int `json:"duration"` -} - -// WriteCacheMetaFile writes cache metadata file at a path -func WriteCacheMetaFile(path turbopath.AbsoluteSystemPath, config *CacheMetadata) error { - jsonBytes, marshalErr := json.Marshal(config) - if marshalErr != nil { - return marshalErr - } - writeFilErr := path.WriteFile(jsonBytes, 0644) - if writeFilErr != nil { - return writeFilErr - } - return nil -} - -// ReadCacheMetaFile reads cache metadata file at a path -func ReadCacheMetaFile(path turbopath.AbsoluteSystemPath) (*CacheMetadata, error) { - jsonBytes, readFileErr := path.ReadFile() - if readFileErr != nil { - return nil, readFileErr - } - var config CacheMetadata - marshalErr := json.Unmarshal(jsonBytes, &config) - if marshalErr != nil { - return nil, marshalErr - } - return &config, nil -} diff --git a/cli/internal/cache/cache_fs_test.go b/cli/internal/cache/cache_fs_test.go deleted file mode 100644 index 614ad86..0000000 --- a/cli/internal/cache/cache_fs_test.go +++ /dev/null @@ -1,253 +0,0 @@ -package cache - -import ( - "path/filepath" - "testing" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/cacheitem" - "github.com/vercel/turbo/cli/internal/turbopath" - "gotest.tools/v3/assert" -) - -type dummyRecorder struct{} - -func (dr *dummyRecorder) LogEvent(payload analytics.EventPayload) {} - -func TestPut(t *testing.T) { - // Set up a test source and cache directory - // The "source" directory simulates a package - // - // / - // b - // child/ - // a - // link -> ../b - // broken -> missing - // - // Ensure we end up with a matching directory under a - // "cache" directory: - // - // /the-hash//... - - src := turbopath.AbsoluteSystemPath(t.TempDir()) - childDir := src.UntypedJoin("child") - err := childDir.MkdirAll(0775) - assert.NilError(t, err, "Mkdir") - aPath := childDir.UntypedJoin("a") - aFile, err := aPath.Create() - assert.NilError(t, err, "Create") - _, err = aFile.WriteString("hello") - assert.NilError(t, err, "WriteString") - assert.NilError(t, aFile.Close(), "Close") - - bPath := src.UntypedJoin("b") - bFile, err := bPath.Create() - assert.NilError(t, err, "Create") - _, err = bFile.WriteString("bFile") - assert.NilError(t, err, "WriteString") - assert.NilError(t, bFile.Close(), "Close") - - srcLinkPath := childDir.UntypedJoin("link") - linkTarget := filepath.FromSlash("../b") - assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink") - - srcBrokenLinkPath := childDir.Join("broken") - assert.NilError(t, srcBrokenLinkPath.Symlink("missing"), "Symlink") - circlePath := childDir.Join("circle") - assert.NilError(t, circlePath.Symlink(filepath.FromSlash("../child")), "Symlink") - - files := []turbopath.AnchoredSystemPath{ - turbopath.AnchoredUnixPath("child/").ToSystemPath(), // childDir - turbopath.AnchoredUnixPath("child/a").ToSystemPath(), // aPath, - turbopath.AnchoredUnixPath("b").ToSystemPath(), // bPath, - turbopath.AnchoredUnixPath("child/link").ToSystemPath(), // srcLinkPath, - turbopath.AnchoredUnixPath("child/broken").ToSystemPath(), // srcBrokenLinkPath, - turbopath.AnchoredUnixPath("child/circle").ToSystemPath(), // circlePath - } - - dst := turbopath.AbsoluteSystemPath(t.TempDir()) - dr := &dummyRecorder{} - - cache := &fsCache{ - cacheDirectory: dst, - recorder: dr, - } - - hash := "the-hash" - duration := 0 - putErr := cache.Put(src, hash, duration, files) - assert.NilError(t, putErr, "Put") - - // Verify that we got the files that we're expecting - dstCachePath := dst.UntypedJoin(hash) - - // This test checks outputs, so we go ahead and pull things back out. - // Attempting to satisfy our beliefs that the change is viable with - // as few changes to the tests as possible. - cacheItem, openErr := cacheitem.Open(dst.UntypedJoin(hash + ".tar.zst")) - assert.NilError(t, openErr, "Open") - - _, restoreErr := cacheItem.Restore(dstCachePath) - assert.NilError(t, restoreErr, "Restore") - - dstAPath := dstCachePath.UntypedJoin("child", "a") - assertFileMatches(t, aPath, dstAPath) - - dstBPath := dstCachePath.UntypedJoin("b") - assertFileMatches(t, bPath, dstBPath) - - dstLinkPath := dstCachePath.UntypedJoin("child", "link") - target, err := dstLinkPath.Readlink() - assert.NilError(t, err, "Readlink") - if target != linkTarget { - t.Errorf("Readlink got %v, want %v", target, linkTarget) - } - - dstBrokenLinkPath := dstCachePath.UntypedJoin("child", "broken") - target, err = dstBrokenLinkPath.Readlink() - assert.NilError(t, err, "Readlink") - if target != "missing" { - t.Errorf("Readlink got %v, want missing", target) - } - - dstCirclePath := dstCachePath.UntypedJoin("child", "circle") - circleLinkDest, err := dstCirclePath.Readlink() - assert.NilError(t, err, "Readlink") - expectedCircleLinkDest := filepath.FromSlash("../child") - if circleLinkDest != expectedCircleLinkDest { - t.Errorf("Cache link got %v, want %v", circleLinkDest, expectedCircleLinkDest) - } - - assert.NilError(t, cacheItem.Close(), "Close") -} - -func assertFileMatches(t *testing.T, orig turbopath.AbsoluteSystemPath, copy turbopath.AbsoluteSystemPath) { - t.Helper() - origBytes, err := orig.ReadFile() - assert.NilError(t, err, "ReadFile") - copyBytes, err := copy.ReadFile() - assert.NilError(t, err, "ReadFile") - assert.DeepEqual(t, origBytes, copyBytes) - origStat, err := orig.Lstat() - assert.NilError(t, err, "Lstat") - copyStat, err := copy.Lstat() - assert.NilError(t, err, "Lstat") - assert.Equal(t, origStat.Mode(), copyStat.Mode()) -} - -func TestFetch(t *testing.T) { - // Set up a test cache directory and target output directory - // The "cacheDir" directory simulates a cached package - // - // / - // the-hash-meta.json - // the-hash/ - // some-package/ - // b - // child/ - // a - // link -> ../b - // broken -> missing - // circle -> ../child - // - // Ensure we end up with a matching directory under a - // "some-package" directory: - // - // "some-package"/... - - cacheDir := turbopath.AbsoluteSystemPath(t.TempDir()) - hash := "the-hash" - src := cacheDir.UntypedJoin(hash, "some-package") - err := src.MkdirAll(0775) - assert.NilError(t, err, "mkdirAll") - - childDir := src.UntypedJoin("child") - err = childDir.MkdirAll(0775) - assert.NilError(t, err, "Mkdir") - aPath := childDir.UntypedJoin("a") - aFile, err := aPath.Create() - assert.NilError(t, err, "Create") - _, err = aFile.WriteString("hello") - assert.NilError(t, err, "WriteString") - assert.NilError(t, aFile.Close(), "Close") - - bPath := src.UntypedJoin("b") - bFile, err := bPath.Create() - assert.NilError(t, err, "Create") - _, err = bFile.WriteString("bFile") - assert.NilError(t, err, "WriteString") - assert.NilError(t, bFile.Close(), "Close") - - srcLinkPath := childDir.UntypedJoin("link") - linkTarget := filepath.FromSlash("../b") - assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink") - - srcBrokenLinkPath := childDir.UntypedJoin("broken") - srcBrokenLinkTarget := turbopath.AnchoredUnixPath("missing").ToSystemPath() - assert.NilError(t, srcBrokenLinkPath.Symlink(srcBrokenLinkTarget.ToString()), "Symlink") - - circlePath := childDir.Join("circle") - srcCircleLinkTarget := turbopath.AnchoredUnixPath("../child").ToSystemPath() - assert.NilError(t, circlePath.Symlink(srcCircleLinkTarget.ToString()), "Symlink") - - metadataPath := cacheDir.UntypedJoin("the-hash-meta.json") - err = metadataPath.WriteFile([]byte(`{"hash":"the-hash","duration":0}`), 0777) - assert.NilError(t, err, "WriteFile") - - dr := &dummyRecorder{} - - cache := &fsCache{ - cacheDirectory: cacheDir, - recorder: dr, - } - - inputFiles := []turbopath.AnchoredSystemPath{ - turbopath.AnchoredUnixPath("some-package/child/").ToSystemPath(), // childDir - turbopath.AnchoredUnixPath("some-package/child/a").ToSystemPath(), // aPath, - turbopath.AnchoredUnixPath("some-package/b").ToSystemPath(), // bPath, - turbopath.AnchoredUnixPath("some-package/child/link").ToSystemPath(), // srcLinkPath, - turbopath.AnchoredUnixPath("some-package/child/broken").ToSystemPath(), // srcBrokenLinkPath, - turbopath.AnchoredUnixPath("some-package/child/circle").ToSystemPath(), // circlePath - } - - putErr := cache.Put(cacheDir.UntypedJoin(hash), hash, 0, inputFiles) - assert.NilError(t, putErr, "Put") - - outputDir := turbopath.AbsoluteSystemPath(t.TempDir()) - dstOutputPath := "some-package" - cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{}) - assert.NilError(t, err, "Fetch") - hit := cacheStatus.Local || cacheStatus.Remote - if !hit { - t.Error("Fetch got false, want true") - } - if len(files) != len(inputFiles) { - t.Errorf("len(files) got %v, want %v", len(files), len(inputFiles)) - } - - dstAPath := outputDir.UntypedJoin(dstOutputPath, "child", "a") - assertFileMatches(t, aPath, dstAPath) - - dstBPath := outputDir.UntypedJoin(dstOutputPath, "b") - assertFileMatches(t, bPath, dstBPath) - - dstLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "link") - target, err := dstLinkPath.Readlink() - assert.NilError(t, err, "Readlink") - if target != linkTarget { - t.Errorf("Readlink got %v, want %v", target, linkTarget) - } - - // Assert that we restore broken symlinks correctly - dstBrokenLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "broken") - target, readlinkErr := dstBrokenLinkPath.Readlink() - assert.NilError(t, readlinkErr, "Readlink") - assert.Equal(t, target, srcBrokenLinkTarget.ToString()) - - // Assert that we restore symlinks to directories correctly - dstCirclePath := outputDir.UntypedJoin(dstOutputPath, "child", "circle") - circleTarget, circleReadlinkErr := dstCirclePath.Readlink() - assert.NilError(t, circleReadlinkErr, "Circle Readlink") - assert.Equal(t, circleTarget, srcCircleLinkTarget.ToString()) -} diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go deleted file mode 100644 index 1d345bf..0000000 --- a/cli/internal/cache/cache_http.go +++ /dev/null @@ -1,375 +0,0 @@ -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "archive/tar" - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - log "log" - "net/http" - "os" - "path/filepath" - "strconv" - "time" - - "github.com/DataDog/zstd" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/tarpatch" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -type client interface { - PutArtifact(hash string, body []byte, duration int, tag string) error - FetchArtifact(hash string) (*http.Response, error) - ArtifactExists(hash string) (*http.Response, error) - GetTeamID() string -} - -type httpCache struct { - writable bool - client client - requestLimiter limiter - recorder analytics.Recorder - signerVerifier *ArtifactSignatureAuthentication - repoRoot turbopath.AbsoluteSystemPath -} - -type limiter chan struct{} - -func (l limiter) acquire() { - l <- struct{}{} -} - -func (l limiter) release() { - <-l -} - -// mtime is the time we attach for the modification time of all files. -var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - -// nobody is the usual uid / gid of the 'nobody' user. -const nobody = 65534 - -func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { - // if cache.writable { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - - r, w := io.Pipe() - go cache.write(w, hash, files) - - // Read the entire artifact tar into memory so we can easily compute the signature. - // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no - // additional overhead by doing the ioutil.ReadAll here instead. - artifactBody, err := ioutil.ReadAll(r) - if err != nil { - return fmt.Errorf("failed to store files in HTTP cache: %w", err) - } - tag := "" - if cache.signerVerifier.isEnabled() { - tag, err = cache.signerVerifier.generateTag(hash, artifactBody) - if err != nil { - return fmt.Errorf("failed to store files in HTTP cache: %w", err) - } - } - return cache.client.PutArtifact(hash, artifactBody, duration, tag) -} - -// write writes a series of files into the given Writer. -func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) { - defer w.Close() - defer func() { _ = w.Close() }() - zw := zstd.NewWriter(w) - defer func() { _ = zw.Close() }() - tw := tar.NewWriter(zw) - defer func() { _ = tw.Close() }() - for _, file := range files { - // log.Printf("caching file %v", file) - if err := cache.storeFile(tw, file); err != nil { - log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err) - // TODO(jaredpalmer): How can we cancel the request at this point? - } - } -} - -func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error { - absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot) - info, err := absoluteFilePath.Lstat() - if err != nil { - return err - } - target := "" - if info.Mode()&os.ModeSymlink != 0 { - target, err = absoluteFilePath.Readlink() - if err != nil { - return err - } - } - hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target)) - if err != nil { - return err - } - // Ensure posix path for filename written in header. - hdr.Name = repoRelativePath.ToUnixPath().ToString() - // Zero out all timestamps. - hdr.ModTime = mtime - hdr.AccessTime = mtime - hdr.ChangeTime = mtime - // Strip user/group ids. - hdr.Uid = nobody - hdr.Gid = nobody - hdr.Uname = "nobody" - hdr.Gname = "nobody" - if err := tw.WriteHeader(hdr); err != nil { - return err - } else if info.IsDir() || target != "" { - return nil // nothing to write - } - f, err := absoluteFilePath.Open() - if err != nil { - return err - } - defer func() { _ = f.Close() }() - _, err = io.Copy(tw, f) - if errors.Is(err, tar.ErrWriteTooLong) { - log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular()) - } - return err -} - -func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - hit, files, duration, err := cache.retrieve(key) - if err != nil { - // TODO: analytics event? - return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) - } - cache.logFetch(hit, key, duration) - return ItemStatus{Remote: hit}, files, duration, err -} - -func (cache *httpCache) Exists(key string) ItemStatus { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - hit, err := cache.exists(key) - if err != nil { - return ItemStatus{Remote: false} - } - return ItemStatus{Remote: hit} -} - -func (cache *httpCache) logFetch(hit bool, hash string, duration int) { - var event string - if hit { - event = CacheEventHit - } else { - event = CacheEventMiss - } - payload := &CacheEvent{ - Source: CacheSourceRemote, - Event: event, - Hash: hash, - Duration: duration, - } - cache.recorder.LogEvent(payload) -} - -func (cache *httpCache) exists(hash string) (bool, error) { - resp, err := cache.client.ArtifactExists(hash) - if err != nil { - return false, nil - } - - defer func() { err = resp.Body.Close() }() - - if resp.StatusCode == http.StatusNotFound { - return false, nil - } else if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode)) - } - return true, err -} - -func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) { - resp, err := cache.client.FetchArtifact(hash) - if err != nil { - return false, nil, 0, err - } - defer resp.Body.Close() - if resp.StatusCode == http.StatusNotFound { - return false, nil, 0, nil // doesn't exist - not an error - } else if resp.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - return false, nil, 0, fmt.Errorf("%s", string(b)) - } - // If present, extract the duration from the response. - duration := 0 - if resp.Header.Get("x-artifact-duration") != "" { - intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration")) - if err != nil { - return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err) - } - duration = intVar - } - var tarReader io.Reader - - defer func() { _ = resp.Body.Close() }() - if cache.signerVerifier.isEnabled() { - expectedTag := resp.Header.Get("x-artifact-tag") - if expectedTag == "" { - // If the verifier is enabled all incoming artifact downloads must have a signature - return false, nil, 0, errors.New("artifact verification failed: Downloaded artifact is missing required x-artifact-tag header") - } - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) - } - isValid, err := cache.signerVerifier.validate(hash, b, expectedTag) - if err != nil { - return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) - } - if !isValid { - err = fmt.Errorf("artifact verification failed: artifact tag does not match expected tag %s", expectedTag) - return false, nil, 0, err - } - // The artifact has been verified and the body can be read and untarred - tarReader = bytes.NewReader(b) - } else { - tarReader = resp.Body - } - files, err := restoreTar(cache.repoRoot, tarReader) - if err != nil { - return false, nil, 0, err - } - return true, files, duration, nil -} - -// restoreTar returns posix-style repo-relative paths of the files it -// restored. In the future, these should likely be repo-relative system paths -// so that they are suitable for being fed into cache.Put for other caches. -// For now, I think this is working because windows also accepts /-delimited paths. -func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) { - files := []turbopath.AnchoredSystemPath{} - missingLinks := []*tar.Header{} - zr := zstd.NewReader(reader) - var closeError error - defer func() { closeError = zr.Close() }() - tr := tar.NewReader(zr) - for { - hdr, err := tr.Next() - if err != nil { - if err == io.EOF { - for _, link := range missingLinks { - err := restoreSymlink(root, link, true) - if err != nil { - return nil, err - } - } - - return files, closeError - } - return nil, err - } - // hdr.Name is always a posix-style path - // FIXME: THIS IS A BUG. - restoredName := turbopath.AnchoredUnixPath(hdr.Name) - files = append(files, restoredName.ToSystemPath()) - filename := restoredName.ToSystemPath().RestoreAnchor(root) - if isChild, err := root.ContainsPath(filename); err != nil { - return nil, err - } else if !isChild { - return nil, fmt.Errorf("cannot untar file to %v", filename) - } - switch hdr.Typeflag { - case tar.TypeDir: - if err := filename.MkdirAll(0775); err != nil { - return nil, err - } - case tar.TypeReg: - if dir := filename.Dir(); dir != "." { - if err := dir.MkdirAll(0775); err != nil { - return nil, err - } - } - if f, err := filename.OpenFile(os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.FileMode(hdr.Mode)); err != nil { - return nil, err - } else if _, err := io.Copy(f, tr); err != nil { - return nil, err - } else if err := f.Close(); err != nil { - return nil, err - } - case tar.TypeSymlink: - if err := restoreSymlink(root, hdr, false); errors.Is(err, errNonexistentLinkTarget) { - missingLinks = append(missingLinks, hdr) - } else if err != nil { - return nil, err - } - default: - log.Printf("Unhandled file type %d for %s", hdr.Typeflag, hdr.Name) - } - } -} - -var errNonexistentLinkTarget = errors.New("the link target does not exist") - -func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNonexistentTargets bool) error { - // Note that hdr.Linkname is really the link target - relativeLinkTarget := filepath.FromSlash(hdr.Linkname) - linkFilename := root.UntypedJoin(hdr.Name) - if err := linkFilename.EnsureDir(); err != nil { - return err - } - - // TODO: check if this is an absolute path, or if we even care - linkTarget := linkFilename.Dir().UntypedJoin(relativeLinkTarget) - if _, err := linkTarget.Lstat(); err != nil { - if os.IsNotExist(err) { - if !allowNonexistentTargets { - return errNonexistentLinkTarget - } - // if we're allowing nonexistent link targets, proceed to creating the link - } else { - return err - } - } - // Ensure that the link we're about to create doesn't already exist - if err := linkFilename.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { - return err - } - if err := linkFilename.Symlink(relativeLinkTarget); err != nil { - return err - } - return nil -} - -func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) { - // Not possible; this implementation can only clean for a hash. -} - -func (cache *httpCache) CleanAll() { - // Also not possible. -} - -func (cache *httpCache) Shutdown() {} - -func newHTTPCache(opts Opts, client client, recorder analytics.Recorder) *httpCache { - return &httpCache{ - writable: true, - client: client, - requestLimiter: make(limiter, 20), - recorder: recorder, - signerVerifier: &ArtifactSignatureAuthentication{ - // TODO(Gaspar): this should use RemoteCacheOptions.TeamId once we start - // enforcing team restrictions for repositories. - teamId: client.GetTeamID(), - enabled: opts.RemoteCacheOpts.Signature, - }, - } -} diff --git a/cli/internal/cache/cache_http_test.go b/cli/internal/cache/cache_http_test.go deleted file mode 100644 index d187931..0000000 --- a/cli/internal/cache/cache_http_test.go +++ /dev/null @@ -1,245 +0,0 @@ -package cache - -import ( - "archive/tar" - "bytes" - "errors" - "net/http" - "testing" - - "github.com/DataDog/zstd" - - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/util" - "gotest.tools/v3/assert" -) - -type errorResp struct { - err error -} - -func (sr *errorResp) PutArtifact(hash string, body []byte, duration int, tag string) error { - return sr.err -} - -func (sr *errorResp) FetchArtifact(hash string) (*http.Response, error) { - return nil, sr.err -} - -func (sr *errorResp) ArtifactExists(hash string) (*http.Response, error) { - return nil, sr.err -} - -func (sr *errorResp) GetTeamID() string { - return "" -} - -func TestRemoteCachingDisabled(t *testing.T) { - clientErr := &util.CacheDisabledError{ - Status: util.CachingStatusDisabled, - Message: "Remote Caching has been disabled for this team. A team owner can enable it here: $URL", - } - client := &errorResp{err: clientErr} - cache := &httpCache{ - client: client, - requestLimiter: make(limiter, 20), - } - cd := &util.CacheDisabledError{} - _, _, _, err := cache.Fetch("unused-target", "some-hash", []string{"unused", "outputs"}) - if !errors.As(err, &cd) { - t.Errorf("cache.Fetch err got %v, want a CacheDisabled error", err) - } - if cd.Status != util.CachingStatusDisabled { - t.Errorf("CacheDisabled.Status got %v, want %v", cd.Status, util.CachingStatusDisabled) - } -} - -func makeValidTar(t *testing.T) *bytes.Buffer { - // - // my-pkg/ - // some-file - // link-to-extra-file -> ../extra-file - // broken-link -> ../../global-dep - // extra-file - - t.Helper() - buf := &bytes.Buffer{} - zw := zstd.NewWriter(buf) - defer func() { - if err := zw.Close(); err != nil { - t.Fatalf("failed to close gzip: %v", err) - } - }() - tw := tar.NewWriter(zw) - defer func() { - if err := tw.Close(); err != nil { - t.Fatalf("failed to close tar: %v", err) - } - }() - - // my-pkg - h := &tar.Header{ - Name: "my-pkg/", - Mode: int64(0644), - Typeflag: tar.TypeDir, - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - // my-pkg/some-file - contents := []byte("some-file-contents") - h = &tar.Header{ - Name: "my-pkg/some-file", - Mode: int64(0644), - Typeflag: tar.TypeReg, - Size: int64(len(contents)), - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - if _, err := tw.Write(contents); err != nil { - t.Fatalf("failed to write file: %v", err) - } - // my-pkg/link-to-extra-file - h = &tar.Header{ - Name: "my-pkg/link-to-extra-file", - Mode: int64(0644), - Typeflag: tar.TypeSymlink, - Linkname: "../extra-file", - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - // my-pkg/broken-link - h = &tar.Header{ - Name: "my-pkg/broken-link", - Mode: int64(0644), - Typeflag: tar.TypeSymlink, - Linkname: "../../global-dep", - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - // extra-file - contents = []byte("extra-file-contents") - h = &tar.Header{ - Name: "extra-file", - Mode: int64(0644), - Typeflag: tar.TypeReg, - Size: int64(len(contents)), - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - if _, err := tw.Write(contents); err != nil { - t.Fatalf("failed to write file: %v", err) - } - - return buf -} - -func makeInvalidTar(t *testing.T) *bytes.Buffer { - // contains a single file that traverses out - // ../some-file - - t.Helper() - buf := &bytes.Buffer{} - zw := zstd.NewWriter(buf) - defer func() { - if err := zw.Close(); err != nil { - t.Fatalf("failed to close gzip: %v", err) - } - }() - tw := tar.NewWriter(zw) - defer func() { - if err := tw.Close(); err != nil { - t.Fatalf("failed to close tar: %v", err) - } - }() - - // my-pkg/some-file - contents := []byte("some-file-contents") - h := &tar.Header{ - Name: "../some-file", - Mode: int64(0644), - Typeflag: tar.TypeReg, - Size: int64(len(contents)), - } - if err := tw.WriteHeader(h); err != nil { - t.Fatalf("failed to write header: %v", err) - } - if _, err := tw.Write(contents); err != nil { - t.Fatalf("failed to write file: %v", err) - } - return buf -} - -func TestRestoreTar(t *testing.T) { - root := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - tar := makeValidTar(t) - - expectedFiles := []turbopath.AnchoredSystemPath{ - turbopath.AnchoredUnixPath("extra-file").ToSystemPath(), - turbopath.AnchoredUnixPath("my-pkg/").ToSystemPath(), - turbopath.AnchoredUnixPath("my-pkg/some-file").ToSystemPath(), - turbopath.AnchoredUnixPath("my-pkg/link-to-extra-file").ToSystemPath(), - turbopath.AnchoredUnixPath("my-pkg/broken-link").ToSystemPath(), - } - files, err := restoreTar(root, tar) - assert.NilError(t, err, "readTar") - - expectedSet := make(util.Set) - for _, file := range expectedFiles { - expectedSet.Add(file.ToString()) - } - gotSet := make(util.Set) - for _, file := range files { - gotSet.Add(file.ToString()) - } - extraFiles := gotSet.Difference(expectedSet) - if extraFiles.Len() > 0 { - t.Errorf("got extra files: %v", extraFiles.UnsafeListOfStrings()) - } - missingFiles := expectedSet.Difference(gotSet) - if missingFiles.Len() > 0 { - t.Errorf("missing expected files: %v", missingFiles.UnsafeListOfStrings()) - } - - // Verify file contents - extraFile := root.UntypedJoin("extra-file") - contents, err := extraFile.ReadFile() - assert.NilError(t, err, "ReadFile") - assert.DeepEqual(t, contents, []byte("extra-file-contents")) - - someFile := root.UntypedJoin("my-pkg", "some-file") - contents, err = someFile.ReadFile() - assert.NilError(t, err, "ReadFile") - assert.DeepEqual(t, contents, []byte("some-file-contents")) -} - -func TestRestoreInvalidTar(t *testing.T) { - root := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - expectedContents := []byte("important-data") - someFile := root.UntypedJoin("some-file") - err := someFile.WriteFile(expectedContents, 0644) - assert.NilError(t, err, "WriteFile") - - tar := makeInvalidTar(t) - // use a child directory so that blindly untarring will squash the file - // that we just wrote above. - repoRoot := root.UntypedJoin("repo") - _, err = restoreTar(repoRoot, tar) - if err == nil { - t.Error("expected error untarring invalid tar") - } - - contents, err := someFile.ReadFile() - assert.NilError(t, err, "ReadFile") - assert.Equal(t, string(contents), string(expectedContents), "expected to not overwrite file") -} - -// Note that testing Put will require mocking the filesystem and is not currently the most -// interesting test. The current implementation directly returns the error from PutArtifact. -// We should still add the test once feasible to avoid future breakage. diff --git a/cli/internal/cache/cache_noop.go b/cli/internal/cache/cache_noop.go deleted file mode 100644 index 80a3c23..0000000 --- a/cli/internal/cache/cache_noop.go +++ /dev/null @@ -1,23 +0,0 @@ -package cache - -import "github.com/vercel/turbo/cli/internal/turbopath" - -type noopCache struct{} - -func newNoopCache() *noopCache { - return &noopCache{} -} - -func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error { - return nil -} -func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - return ItemStatus{Local: false, Remote: false}, nil, 0, nil -} -func (c *noopCache) Exists(_ string) ItemStatus { - return ItemStatus{} -} - -func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {} -func (c *noopCache) CleanAll() {} -func (c *noopCache) Shutdown() {} diff --git a/cli/internal/cache/cache_signature_authentication.go b/cli/internal/cache/cache_signature_authentication.go deleted file mode 100644 index f9fe4c0..0000000 --- a/cli/internal/cache/cache_signature_authentication.go +++ /dev/null @@ -1,88 +0,0 @@ -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "errors" - "fmt" - "hash" - "os" -) - -type ArtifactSignatureAuthentication struct { - teamId string - enabled bool -} - -func (asa *ArtifactSignatureAuthentication) isEnabled() bool { - return asa.enabled -} - -// If the secret key is not found or the secret key length is 0, an error is returned -// Preference is given to the environment specified secret key. -func (asa *ArtifactSignatureAuthentication) secretKey() ([]byte, error) { - secret := os.Getenv("TURBO_REMOTE_CACHE_SIGNATURE_KEY") - if len(secret) == 0 { - return nil, errors.New("signature secret key not found. You must specify a secret key in the TURBO_REMOTE_CACHE_SIGNATURE_KEY environment variable") - } - return []byte(secret), nil -} - -func (asa *ArtifactSignatureAuthentication) generateTag(hash string, artifactBody []byte) (string, error) { - tag, err := asa.getTagGenerator(hash) - if err != nil { - return "", err - } - tag.Write(artifactBody) - return base64.StdEncoding.EncodeToString(tag.Sum(nil)), nil -} - -func (asa *ArtifactSignatureAuthentication) getTagGenerator(hash string) (hash.Hash, error) { - teamId := asa.teamId - secret, err := asa.secretKey() - if err != nil { - return nil, err - } - artifactMetadata := &struct { - Hash string `json:"hash"` - TeamId string `json:"teamId"` - }{ - Hash: hash, - TeamId: teamId, - } - metadata, err := json.Marshal(artifactMetadata) - if err != nil { - return nil, err - } - - // TODO(Gaspar) Support additional signing algorithms here - h := hmac.New(sha256.New, secret) - h.Write(metadata) - return h, nil -} - -func (asa *ArtifactSignatureAuthentication) validate(hash string, artifactBody []byte, expectedTag string) (bool, error) { - computedTag, err := asa.generateTag(hash, artifactBody) - if err != nil { - return false, fmt.Errorf("failed to verify artifact tag: %w", err) - } - return hmac.Equal([]byte(computedTag), []byte(expectedTag)), nil -} - -type StreamValidator struct { - currentHash hash.Hash -} - -func (sv *StreamValidator) Validate(expectedTag string) bool { - computedTag := base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil)) - return hmac.Equal([]byte(computedTag), []byte(expectedTag)) -} - -func (sv *StreamValidator) CurrentValue() string { - return base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil)) -} diff --git a/cli/internal/cache/cache_signature_authentication_test.go b/cli/internal/cache/cache_signature_authentication_test.go deleted file mode 100644 index 7f3f865..0000000 --- a/cli/internal/cache/cache_signature_authentication_test.go +++ /dev/null @@ -1,195 +0,0 @@ -// Adapted from ghttps://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" -) - -func Test_SecretKeySuccess(t *testing.T) { - teamId := "team_someid" - secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY" - secretKeyEnvValue := "my-secret-key-env" - t.Setenv(secretKeyEnvName, secretKeyEnvValue) - - cases := []struct { - name string - asa *ArtifactSignatureAuthentication - expectedSecretKey string - expectedSecretKeyError bool - }{ - { - name: "Accepts secret key", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedSecretKey: secretKeyEnvValue, - expectedSecretKeyError: false, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - secretKey, err := tc.asa.secretKey() - if tc.expectedSecretKeyError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, tc.expectedSecretKey, string(secretKey)) - } - }) - } -} - -func Test_SecretKeyErrors(t *testing.T) { - teamId := "team_someid" - - // Env secret key TURBO_REMOTE_CACHE_SIGNATURE_KEY is not set - - cases := []struct { - name string - asa *ArtifactSignatureAuthentication - expectedSecretKey string - expectedSecretKeyError bool - }{ - { - name: "Secret key not defined errors", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedSecretKey: "", - expectedSecretKeyError: true, - }, - { - name: "Secret key is empty errors", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedSecretKey: "", - expectedSecretKeyError: true, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - secretKey, err := tc.asa.secretKey() - if tc.expectedSecretKeyError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, tc.expectedSecretKey, string(secretKey)) - } - }) - } -} - -func Test_GenerateTagAndValidate(t *testing.T) { - teamId := "team_someid" - hash := "the-artifact-hash" - artifactBody := []byte("the artifact body as bytes") - secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY" - secretKeyEnvValue := "my-secret-key-env" - t.Setenv(secretKeyEnvName, secretKeyEnvValue) - - cases := []struct { - name string - asa *ArtifactSignatureAuthentication - expectedTagMatches string - expectedTagDoesNotMatch string - }{ - { - name: "Uses hash to generate tag", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), - expectedTagDoesNotMatch: testUtilGetHMACTag("wrong-hash", teamId, artifactBody, secretKeyEnvValue), - }, - { - name: "Uses teamId to generate tag", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), - expectedTagDoesNotMatch: testUtilGetHMACTag(hash, "wrong-teamId", artifactBody, secretKeyEnvValue), - }, - { - name: "Uses artifactBody to generate tag", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), - expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, []byte("wrong-artifact-body"), secretKeyEnvValue), - }, - { - name: "Uses secret to generate tag", - asa: &ArtifactSignatureAuthentication{ - teamId: teamId, - enabled: true, - }, - expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), - expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, artifactBody, "wrong-secret"), - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - tag, err := tc.asa.generateTag(hash, artifactBody) - assert.NoError(t, err) - - // validates the tag - assert.Equal(t, tc.expectedTagMatches, tag) - isValid, err := tc.asa.validate(hash, artifactBody, tc.expectedTagMatches) - assert.NoError(t, err) - assert.True(t, isValid) - - // does not validate the tag - assert.NotEqual(t, tc.expectedTagDoesNotMatch, tag) - isValid, err = tc.asa.validate(hash, artifactBody, tc.expectedTagDoesNotMatch) - assert.NoError(t, err) - assert.False(t, isValid) - - }) - } -} - -// Test utils - -// Return the Base64 encoded HMAC given the artifact metadata and artifact body -func testUtilGetHMACTag(hash string, teamId string, artifactBody []byte, secret string) string { - artifactMetadata := &struct { - Hash string `json:"hash"` - TeamId string `json:"teamId"` - }{ - Hash: hash, - TeamId: teamId, - } - metadata, _ := json.Marshal(artifactMetadata) - h := hmac.New(sha256.New, []byte(secret)) - h.Write(metadata) - h.Write(artifactBody) - return base64.StdEncoding.EncodeToString(h.Sum(nil)) -} - -func Test_Utils(t *testing.T) { - teamId := "team_someid" - secret := "my-secret" - hash := "the-artifact-hash" - artifactBody := []byte("the artifact body as bytes") - testTag := testUtilGetHMACTag(hash, teamId, artifactBody, secret) - expectedTag := "9Fu8YniPZ2dEBolTPQoNlFWG0LNMW8EXrBsRmf/fEHk=" - assert.True(t, hmac.Equal([]byte(testTag), []byte(expectedTag))) -} diff --git a/cli/internal/cache/cache_test.go b/cli/internal/cache/cache_test.go deleted file mode 100644 index 3f17877..0000000 --- a/cli/internal/cache/cache_test.go +++ /dev/null @@ -1,318 +0,0 @@ -package cache - -import ( - "net/http" - "reflect" - "sync/atomic" - "testing" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/util" -) - -type testCache struct { - disabledErr *util.CacheDisabledError - entries map[string][]turbopath.AnchoredSystemPath -} - -func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - if tc.disabledErr != nil { - return ItemStatus{}, nil, 0, tc.disabledErr - } - foundFiles, ok := tc.entries[hash] - if ok { - duration := 5 - return ItemStatus{Local: true}, foundFiles, duration, nil - } - return ItemStatus{}, nil, 0, nil -} - -func (tc *testCache) Exists(hash string) ItemStatus { - if tc.disabledErr != nil { - return ItemStatus{} - } - _, ok := tc.entries[hash] - if ok { - return ItemStatus{Local: true} - } - return ItemStatus{} -} - -func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error { - if tc.disabledErr != nil { - return tc.disabledErr - } - tc.entries[hash] = files - return nil -} - -func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {} -func (tc *testCache) CleanAll() {} -func (tc *testCache) Shutdown() {} - -func newEnabledCache() *testCache { - return &testCache{ - entries: make(map[string][]turbopath.AnchoredSystemPath), - } -} - -func newDisabledCache() *testCache { - return &testCache{ - disabledErr: &util.CacheDisabledError{ - Status: util.CachingStatusDisabled, - Message: "remote caching is disabled", - }, - } -} - -func TestPutCachingDisabled(t *testing.T) { - disabledCache := newDisabledCache() - caches := []Cache{ - newEnabledCache(), - disabledCache, - newEnabledCache(), - newEnabledCache(), - } - var removeCalled uint64 - mplex := &cacheMultiplexer{ - caches: caches, - onCacheRemoved: func(cache Cache, err error) { - atomic.AddUint64(&removeCalled, 1) - }, - } - - err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"}) - if err != nil { - // don't leak the cache removal - t.Errorf("Put got error %v, want ", err) - } - - removes := atomic.LoadUint64(&removeCalled) - if removes != 1 { - t.Errorf("removes count: %v, want 1", removes) - } - - mplex.mu.RLock() - if len(mplex.caches) != 3 { - t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches)) - } - for _, cache := range mplex.caches { - if cache == disabledCache { - t.Error("found disabled cache, expected it to be removed") - } - } - mplex.mu.RUnlock() - - // subsequent Fetch should still work - cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) - if err != nil { - t.Errorf("got error fetching files: %v", err) - } - hit := cacheStatus.Local || cacheStatus.Remote - if !hit { - t.Error("failed to find previously stored files") - } - - removes = atomic.LoadUint64(&removeCalled) - if removes != 1 { - t.Errorf("removes count: %v, want 1", removes) - } -} - -func TestExists(t *testing.T) { - caches := []Cache{ - newEnabledCache(), - } - - mplex := &cacheMultiplexer{ - caches: caches, - } - - itemStatus := mplex.Exists("some-hash") - if itemStatus.Local { - t.Error("did not expect file to exist") - } - - err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"}) - if err != nil { - // don't leak the cache removal - t.Errorf("Put got error %v, want ", err) - } - - itemStatus = mplex.Exists("some-hash") - if !itemStatus.Local { - t.Error("failed to find previously stored files") - } -} - -type fakeClient struct{} - -// FetchArtifact implements client -func (*fakeClient) FetchArtifact(hash string) (*http.Response, error) { - panic("unimplemented") -} - -func (*fakeClient) ArtifactExists(hash string) (*http.Response, error) { - panic("unimplemented") -} - -// GetTeamID implements client -func (*fakeClient) GetTeamID() string { - return "fake-team-id" -} - -// PutArtifact implements client -func (*fakeClient) PutArtifact(hash string, body []byte, duration int, tag string) error { - panic("unimplemented") -} - -var _ client = &fakeClient{} - -func TestFetchCachingDisabled(t *testing.T) { - disabledCache := newDisabledCache() - caches := []Cache{ - newEnabledCache(), - disabledCache, - newEnabledCache(), - newEnabledCache(), - } - var removeCalled uint64 - mplex := &cacheMultiplexer{ - caches: caches, - onCacheRemoved: func(cache Cache, err error) { - atomic.AddUint64(&removeCalled, 1) - }, - } - - cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) - if err != nil { - // don't leak the cache removal - t.Errorf("Fetch got error %v, want ", err) - } - hit := cacheStatus.Local || cacheStatus.Remote - if hit { - t.Error("hit on empty cache, expected miss") - } - - removes := atomic.LoadUint64(&removeCalled) - if removes != 1 { - t.Errorf("removes count: %v, want 1", removes) - } - - mplex.mu.RLock() - if len(mplex.caches) != 3 { - t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches)) - } - for _, cache := range mplex.caches { - if cache == disabledCache { - t.Error("found disabled cache, expected it to be removed") - } - } - mplex.mu.RUnlock() -} - -type nullRecorder struct{} - -func (nullRecorder) LogEvent(analytics.EventPayload) {} - -func TestNew(t *testing.T) { - // Test will bomb if this fails, no need to specially handle the error - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - type args struct { - opts Opts - recorder analytics.Recorder - onCacheRemoved OnCacheRemoved - client fakeClient - } - tests := []struct { - name string - args args - want Cache - wantErr bool - }{ - { - name: "With no caches configured, new returns a noopCache and an error", - args: args{ - opts: Opts{ - SkipFilesystem: true, - SkipRemote: true, - }, - recorder: &nullRecorder{}, - onCacheRemoved: func(Cache, error) {}, - }, - want: &noopCache{}, - wantErr: true, - }, - { - name: "With just httpCache configured, new returns an httpCache and a noopCache", - args: args{ - opts: Opts{ - SkipFilesystem: true, - RemoteCacheOpts: fs.RemoteCacheOptions{ - Signature: true, - }, - }, - recorder: &nullRecorder{}, - onCacheRemoved: func(Cache, error) {}, - }, - want: &cacheMultiplexer{ - caches: []Cache{&httpCache{}, &noopCache{}}, - }, - wantErr: false, - }, - { - name: "With just fsCache configured, new returns only an fsCache", - args: args{ - opts: Opts{ - SkipRemote: true, - }, - recorder: &nullRecorder{}, - onCacheRemoved: func(Cache, error) {}, - }, - want: &fsCache{}, - }, - { - name: "With both configured, new returns an fsCache and httpCache", - args: args{ - opts: Opts{ - RemoteCacheOpts: fs.RemoteCacheOptions{ - Signature: true, - }, - }, - recorder: &nullRecorder{}, - onCacheRemoved: func(Cache, error) {}, - }, - want: &cacheMultiplexer{ - caches: []Cache{&fsCache{}, &httpCache{}}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := New(tt.args.opts, repoRoot, &tt.args.client, tt.args.recorder, tt.args.onCacheRemoved) - if (err != nil) != tt.wantErr { - t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) - return - } - switch multiplexer := got.(type) { - case *cacheMultiplexer: - want := tt.want.(*cacheMultiplexer) - for i := range multiplexer.caches { - if reflect.TypeOf(multiplexer.caches[i]) != reflect.TypeOf(want.caches[i]) { - t.Errorf("New() = %v, want %v", reflect.TypeOf(multiplexer.caches[i]), reflect.TypeOf(want.caches[i])) - } - } - case *fsCache: - if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { - t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want)) - } - case *noopCache: - if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { - t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want)) - } - } - }) - } -} -- cgit v1.2.3-70-g09d2