From dd84b9d64fb98746a230cd24233ff50a562c39c9 Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Fri, 28 Apr 2023 01:36:44 +0800 Subject: --- cli/internal/cache/async_cache.go | 82 +++++ cli/internal/cache/cache.go | 317 +++++++++++++++++ cli/internal/cache/cache_fs.go | 174 ++++++++++ cli/internal/cache/cache_fs_test.go | 253 ++++++++++++++ cli/internal/cache/cache_http.go | 375 +++++++++++++++++++++ cli/internal/cache/cache_http_test.go | 245 ++++++++++++++ cli/internal/cache/cache_noop.go | 23 ++ .../cache/cache_signature_authentication.go | 88 +++++ .../cache/cache_signature_authentication_test.go | 195 +++++++++++ cli/internal/cache/cache_test.go | 318 +++++++++++++++++ 10 files changed, 2070 insertions(+) create mode 100644 cli/internal/cache/async_cache.go create mode 100644 cli/internal/cache/cache.go create mode 100644 cli/internal/cache/cache_fs.go create mode 100644 cli/internal/cache/cache_fs_test.go create mode 100644 cli/internal/cache/cache_http.go create mode 100644 cli/internal/cache/cache_http_test.go create mode 100644 cli/internal/cache/cache_noop.go create mode 100644 cli/internal/cache/cache_signature_authentication.go create mode 100644 cli/internal/cache/cache_signature_authentication_test.go create mode 100644 cli/internal/cache/cache_test.go (limited to 'cli/internal/cache') diff --git a/cli/internal/cache/async_cache.go b/cli/internal/cache/async_cache.go new file mode 100644 index 0000000..0a8f467 --- /dev/null +++ b/cli/internal/cache/async_cache.go @@ -0,0 +1,82 @@ +// Adapted from https://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package cache + +import ( + "sync" + + "github.com/vercel/turbo/cli/internal/turbopath" +) + +// An asyncCache is a wrapper around a Cache interface that handles incoming +// store requests asynchronously and attempts to return immediately. +// The requests are handled on an internal queue, if that fills up then +// incoming requests will start to block again until it empties. +// Retrieval requests are still handled synchronously. +type asyncCache struct { + requests chan cacheRequest + realCache Cache + wg sync.WaitGroup +} + +// A cacheRequest models an incoming cache request on our queue. +type cacheRequest struct { + anchor turbopath.AbsoluteSystemPath + key string + duration int + files []turbopath.AnchoredSystemPath +} + +func newAsyncCache(realCache Cache, opts Opts) Cache { + c := &asyncCache{ + requests: make(chan cacheRequest), + realCache: realCache, + } + c.wg.Add(opts.Workers) + for i := 0; i < opts.Workers; i++ { + go c.run() + } + return c +} + +func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { + c.requests <- cacheRequest{ + anchor: anchor, + key: key, + files: files, + duration: duration, + } + return nil +} + +func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + return c.realCache.Fetch(anchor, key, files) +} + +func (c *asyncCache) Exists(key string) ItemStatus { + return c.realCache.Exists(key) +} + +func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) { + c.realCache.Clean(anchor) +} + +func (c *asyncCache) CleanAll() { + c.realCache.CleanAll() +} + +func (c *asyncCache) Shutdown() { + // fmt.Println("Shutting down cache workers...") + close(c.requests) + c.wg.Wait() + // fmt.Println("Shut down all cache workers") +} + +// run implements the actual async logic. +func (c *asyncCache) run() { + for r := range c.requests { + _ = c.realCache.Put(r.anchor, r.key, r.duration, r.files) + } + c.wg.Done() +} diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go new file mode 100644 index 0000000..8b74272 --- /dev/null +++ b/cli/internal/cache/cache.go @@ -0,0 +1,317 @@ +// Package cache abstracts storing and fetching previously run tasks +// +// Adapted from https://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package cache + +import ( + "errors" + "sync" + + "github.com/vercel/turbo/cli/internal/analytics" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/util" + "golang.org/x/sync/errgroup" +) + +// Cache is abstracted way to cache/fetch previously run tasks +type Cache interface { + // Fetch returns true if there is a cache it. It is expected to move files + // into their correct position as a side effect + Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) + Exists(hash string) ItemStatus + // Put caches files for a given hash + Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error + Clean(anchor turbopath.AbsoluteSystemPath) + CleanAll() + Shutdown() +} + +// ItemStatus holds whether artifacts exists for a given hash on local +// and/or remote caching server +type ItemStatus struct { + Local bool `json:"local"` + Remote bool `json:"remote"` +} + +const ( + // CacheSourceFS is a constant to indicate local cache hit + CacheSourceFS = "LOCAL" + // CacheSourceRemote is a constant to indicate remote cache hit + CacheSourceRemote = "REMOTE" + // CacheEventHit is a constant to indicate a cache hit + CacheEventHit = "HIT" + // CacheEventMiss is a constant to indicate a cache miss + CacheEventMiss = "MISS" +) + +type CacheEvent struct { + Source string `mapstructure:"source"` + Event string `mapstructure:"event"` + Hash string `mapstructure:"hash"` + Duration int `mapstructure:"duration"` +} + +// DefaultLocation returns the default filesystem cache location, given a repo root +func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { + return repoRoot.UntypedJoin("node_modules", ".cache", "turbo") +} + +// OnCacheRemoved defines a callback that the cache system calls if a particular cache +// needs to be removed. In practice, this happens when Remote Caching has been disabled +// the but CLI continues to try to use it. +type OnCacheRemoved = func(cache Cache, err error) + +// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable +var ErrNoCachesEnabled = errors.New("no caches are enabled") + +// Opts holds configuration options for the cache +// TODO(gsoltis): further refactor this into fs cache opts and http cache opts +type Opts struct { + OverrideDir string + SkipRemote bool + SkipFilesystem bool + Workers int + RemoteCacheOpts fs.RemoteCacheOptions +} + +// resolveCacheDir calculates the location turbo should use to cache artifacts, +// based on the options supplied by the user. +func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { + if o.OverrideDir != "" { + return fs.ResolveUnknownPath(repoRoot, o.OverrideDir) + } + return DefaultLocation(repoRoot) +} + +var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only +allow reading and caching artifacts using the remote cache.` + +// New creates a new cache +func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { + c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved) + if err != nil && !errors.Is(err, ErrNoCachesEnabled) { + return nil, err + } + if opts.Workers > 0 { + return newAsyncCache(c, opts), err + } + return c, err +} + +// newSyncCache can return an error with a usable noopCache. +func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { + // Check to see if the user has turned off particular cache implementations. + useFsCache := !opts.SkipFilesystem + useHTTPCache := !opts.SkipRemote + + // Since the above two flags are not mutually exclusive it is possible to configure + // yourself out of having a cache. We should tell you about it but we shouldn't fail + // your build for that reason. + // + // Further, since the httpCache can be removed at runtime, we need to insert a noopCache + // as a backup if you are configured to have *just* an httpCache. + // + // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache & useHTTPCache) + useNoopCache := !useFsCache + + // Build up an array of cache implementations, we can only ever have 1 or 2. + cacheImplementations := make([]Cache, 0, 2) + + if useFsCache { + implementation, err := newFsCache(opts, recorder, repoRoot) + if err != nil { + return nil, err + } + cacheImplementations = append(cacheImplementations, implementation) + } + + if useHTTPCache { + implementation := newHTTPCache(opts, client, recorder) + cacheImplementations = append(cacheImplementations, implementation) + } + + if useNoopCache { + implementation := newNoopCache() + cacheImplementations = append(cacheImplementations, implementation) + } + + // Precisely two cache implementations: + // fsCache and httpCache OR httpCache and noopCache + useMultiplexer := len(cacheImplementations) > 1 + if useMultiplexer { + // We have early-returned any possible errors for this scenario. + return &cacheMultiplexer{ + onCacheRemoved: onCacheRemoved, + opts: opts, + caches: cacheImplementations, + }, nil + } + + // Precisely one cache implementation: fsCache OR noopCache + implementation := cacheImplementations[0] + _, isNoopCache := implementation.(*noopCache) + + // We want to let the user know something is wonky, but we don't want + // to trigger their build to fail. + if isNoopCache { + return implementation, ErrNoCachesEnabled + } + return implementation, nil +} + +// A cacheMultiplexer multiplexes several caches into one. +// Used when we have several active (eg. http, dir). +type cacheMultiplexer struct { + caches []Cache + opts Opts + mu sync.RWMutex + onCacheRemoved OnCacheRemoved +} + +func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { + return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches)) +} + +type cacheRemoval struct { + cache Cache + err *util.CacheDisabledError +} + +// storeUntil stores artifacts into higher priority caches than the given one. +// Used after artifact retrieval to ensure we have them in eg. the directory cache after +// downloading from the RPC cache. +func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error { + // Attempt to store on all caches simultaneously. + toRemove := make([]*cacheRemoval, stopAt) + g := &errgroup.Group{} + mplex.mu.RLock() + for i, cache := range mplex.caches { + if i == stopAt { + break + } + c := cache + i := i + g.Go(func() error { + err := c.Put(anchor, key, duration, files) + if err != nil { + cd := &util.CacheDisabledError{} + if errors.As(err, &cd) { + toRemove[i] = &cacheRemoval{ + cache: c, + err: cd, + } + // we don't want this to cancel other cache actions + return nil + } + return err + } + return nil + }) + } + mplex.mu.RUnlock() + + if err := g.Wait(); err != nil { + return err + } + + for _, removal := range toRemove { + if removal != nil { + mplex.removeCache(removal) + } + } + return nil +} + +// removeCache takes a requested removal and tries to actually remove it. However, +// multiple requests could result in concurrent requests to remove the same cache. +// Let one of them win and propagate the error, the rest will no-op. +func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) { + mplex.mu.Lock() + defer mplex.mu.Unlock() + for i, cache := range mplex.caches { + if cache == removal.cache { + mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...) + mplex.onCacheRemoved(cache, removal.err) + break + } + } +} + +func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + // Make a shallow copy of the caches, since storeUntil can call removeCache + mplex.mu.RLock() + caches := make([]Cache, len(mplex.caches)) + copy(caches, mplex.caches) + mplex.mu.RUnlock() + + // We need to return a composite cache status from multiple caches + // Initialize the empty struct so we can assign values to it. This is similar + // to how the Exists() method works. + combinedCacheState := ItemStatus{} + + // Retrieve from caches sequentially; if we did them simultaneously we could + // easily write the same file from two goroutines at once. + for i, cache := range caches { + itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files) + ok := itemStatus.Local || itemStatus.Remote + + if err != nil { + cd := &util.CacheDisabledError{} + if errors.As(err, &cd) { + mplex.removeCache(&cacheRemoval{ + cache: cache, + err: cd, + }) + } + // We're ignoring the error in the else case, since with this cache + // abstraction, we want to check lower priority caches rather than fail + // the operation. Future work that plumbs UI / Logging into the cache system + // should probably log this at least. + } + if ok { + // Store this into other caches. We can ignore errors here because we know + // we have previously successfully stored in a higher-priority cache, and so the overall + // result is a success at fetching. Storing in lower-priority caches is an optimization. + _ = mplex.storeUntil(anchor, key, duration, actualFiles, i) + + // If another cache had already set this to true, we don't need to set it again from this cache + combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local + combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote + return combinedCacheState, actualFiles, duration, err + } + } + + return ItemStatus{Local: false, Remote: false}, nil, 0, nil +} + +func (mplex *cacheMultiplexer) Exists(target string) ItemStatus { + syncCacheState := ItemStatus{} + for _, cache := range mplex.caches { + itemStatus := cache.Exists(target) + syncCacheState.Local = syncCacheState.Local || itemStatus.Local + syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote + } + + return syncCacheState +} + +func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) { + for _, cache := range mplex.caches { + cache.Clean(anchor) + } +} + +func (mplex *cacheMultiplexer) CleanAll() { + for _, cache := range mplex.caches { + cache.CleanAll() + } +} + +func (mplex *cacheMultiplexer) Shutdown() { + for _, cache := range mplex.caches { + cache.Shutdown() + } +} diff --git a/cli/internal/cache/cache_fs.go b/cli/internal/cache/cache_fs.go new file mode 100644 index 0000000..fb15a02 --- /dev/null +++ b/cli/internal/cache/cache_fs.go @@ -0,0 +1,174 @@ +// Adapted from https://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +// Package cache implements our cache abstraction. +package cache + +import ( + "encoding/json" + "fmt" + + "github.com/vercel/turbo/cli/internal/analytics" + "github.com/vercel/turbo/cli/internal/cacheitem" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +// fsCache is a local filesystem cache +type fsCache struct { + cacheDirectory turbopath.AbsoluteSystemPath + recorder analytics.Recorder +} + +// newFsCache creates a new filesystem cache +func newFsCache(opts Opts, recorder analytics.Recorder, repoRoot turbopath.AbsoluteSystemPath) (*fsCache, error) { + cacheDir := opts.resolveCacheDir(repoRoot) + if err := cacheDir.MkdirAll(0775); err != nil { + return nil, err + } + return &fsCache{ + cacheDirectory: cacheDir, + recorder: recorder, + }, nil +} + +// Fetch returns true if items are cached. It moves them into position as a side effect. +func (f *fsCache) Fetch(anchor turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar") + compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") + + var actualCachePath turbopath.AbsoluteSystemPath + if uncompressedCachePath.FileExists() { + actualCachePath = uncompressedCachePath + } else if compressedCachePath.FileExists() { + actualCachePath = compressedCachePath + } else { + // It's not in the cache, bail now + f.logFetch(false, hash, 0) + return ItemStatus{Local: false}, nil, 0, nil + } + + cacheItem, openErr := cacheitem.Open(actualCachePath) + if openErr != nil { + return ItemStatus{Local: false}, nil, 0, openErr + } + + restoredFiles, restoreErr := cacheItem.Restore(anchor) + if restoreErr != nil { + _ = cacheItem.Close() + return ItemStatus{Local: false}, nil, 0, restoreErr + } + + meta, err := ReadCacheMetaFile(f.cacheDirectory.UntypedJoin(hash + "-meta.json")) + if err != nil { + _ = cacheItem.Close() + return ItemStatus{Local: false}, nil, 0, fmt.Errorf("error reading cache metadata: %w", err) + } + f.logFetch(true, hash, meta.Duration) + + // Wait to see what happens with close. + closeErr := cacheItem.Close() + if closeErr != nil { + return ItemStatus{Local: false}, restoredFiles, 0, closeErr + } + return ItemStatus{Local: true}, restoredFiles, meta.Duration, nil +} + +func (f *fsCache) Exists(hash string) ItemStatus { + uncompressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar") + compressedCachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") + + if compressedCachePath.FileExists() || uncompressedCachePath.FileExists() { + return ItemStatus{Local: true} + } + + return ItemStatus{Local: false} +} + +func (f *fsCache) logFetch(hit bool, hash string, duration int) { + var event string + if hit { + event = CacheEventHit + } else { + event = CacheEventMiss + } + payload := &CacheEvent{ + Source: CacheSourceFS, + Event: event, + Hash: hash, + Duration: duration, + } + f.recorder.LogEvent(payload) +} + +func (f *fsCache) Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { + cachePath := f.cacheDirectory.UntypedJoin(hash + ".tar.zst") + cacheItem, err := cacheitem.Create(cachePath) + if err != nil { + return err + } + + for _, file := range files { + err := cacheItem.AddFile(anchor, file) + if err != nil { + _ = cacheItem.Close() + return err + } + } + + writeErr := WriteCacheMetaFile(f.cacheDirectory.UntypedJoin(hash+"-meta.json"), &CacheMetadata{ + Duration: duration, + Hash: hash, + }) + + if writeErr != nil { + _ = cacheItem.Close() + return writeErr + } + + return cacheItem.Close() +} + +func (f *fsCache) Clean(_ turbopath.AbsoluteSystemPath) { + fmt.Println("Not implemented yet") +} + +func (f *fsCache) CleanAll() { + fmt.Println("Not implemented yet") +} + +func (f *fsCache) Shutdown() {} + +// CacheMetadata stores duration and hash information for a cache entry so that aggregate Time Saved calculations +// can be made from artifacts from various caches +type CacheMetadata struct { + Hash string `json:"hash"` + Duration int `json:"duration"` +} + +// WriteCacheMetaFile writes cache metadata file at a path +func WriteCacheMetaFile(path turbopath.AbsoluteSystemPath, config *CacheMetadata) error { + jsonBytes, marshalErr := json.Marshal(config) + if marshalErr != nil { + return marshalErr + } + writeFilErr := path.WriteFile(jsonBytes, 0644) + if writeFilErr != nil { + return writeFilErr + } + return nil +} + +// ReadCacheMetaFile reads cache metadata file at a path +func ReadCacheMetaFile(path turbopath.AbsoluteSystemPath) (*CacheMetadata, error) { + jsonBytes, readFileErr := path.ReadFile() + if readFileErr != nil { + return nil, readFileErr + } + var config CacheMetadata + marshalErr := json.Unmarshal(jsonBytes, &config) + if marshalErr != nil { + return nil, marshalErr + } + return &config, nil +} diff --git a/cli/internal/cache/cache_fs_test.go b/cli/internal/cache/cache_fs_test.go new file mode 100644 index 0000000..614ad86 --- /dev/null +++ b/cli/internal/cache/cache_fs_test.go @@ -0,0 +1,253 @@ +package cache + +import ( + "path/filepath" + "testing" + + "github.com/vercel/turbo/cli/internal/analytics" + "github.com/vercel/turbo/cli/internal/cacheitem" + "github.com/vercel/turbo/cli/internal/turbopath" + "gotest.tools/v3/assert" +) + +type dummyRecorder struct{} + +func (dr *dummyRecorder) LogEvent(payload analytics.EventPayload) {} + +func TestPut(t *testing.T) { + // Set up a test source and cache directory + // The "source" directory simulates a package + // + // / + // b + // child/ + // a + // link -> ../b + // broken -> missing + // + // Ensure we end up with a matching directory under a + // "cache" directory: + // + // /the-hash//... + + src := turbopath.AbsoluteSystemPath(t.TempDir()) + childDir := src.UntypedJoin("child") + err := childDir.MkdirAll(0775) + assert.NilError(t, err, "Mkdir") + aPath := childDir.UntypedJoin("a") + aFile, err := aPath.Create() + assert.NilError(t, err, "Create") + _, err = aFile.WriteString("hello") + assert.NilError(t, err, "WriteString") + assert.NilError(t, aFile.Close(), "Close") + + bPath := src.UntypedJoin("b") + bFile, err := bPath.Create() + assert.NilError(t, err, "Create") + _, err = bFile.WriteString("bFile") + assert.NilError(t, err, "WriteString") + assert.NilError(t, bFile.Close(), "Close") + + srcLinkPath := childDir.UntypedJoin("link") + linkTarget := filepath.FromSlash("../b") + assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink") + + srcBrokenLinkPath := childDir.Join("broken") + assert.NilError(t, srcBrokenLinkPath.Symlink("missing"), "Symlink") + circlePath := childDir.Join("circle") + assert.NilError(t, circlePath.Symlink(filepath.FromSlash("../child")), "Symlink") + + files := []turbopath.AnchoredSystemPath{ + turbopath.AnchoredUnixPath("child/").ToSystemPath(), // childDir + turbopath.AnchoredUnixPath("child/a").ToSystemPath(), // aPath, + turbopath.AnchoredUnixPath("b").ToSystemPath(), // bPath, + turbopath.AnchoredUnixPath("child/link").ToSystemPath(), // srcLinkPath, + turbopath.AnchoredUnixPath("child/broken").ToSystemPath(), // srcBrokenLinkPath, + turbopath.AnchoredUnixPath("child/circle").ToSystemPath(), // circlePath + } + + dst := turbopath.AbsoluteSystemPath(t.TempDir()) + dr := &dummyRecorder{} + + cache := &fsCache{ + cacheDirectory: dst, + recorder: dr, + } + + hash := "the-hash" + duration := 0 + putErr := cache.Put(src, hash, duration, files) + assert.NilError(t, putErr, "Put") + + // Verify that we got the files that we're expecting + dstCachePath := dst.UntypedJoin(hash) + + // This test checks outputs, so we go ahead and pull things back out. + // Attempting to satisfy our beliefs that the change is viable with + // as few changes to the tests as possible. + cacheItem, openErr := cacheitem.Open(dst.UntypedJoin(hash + ".tar.zst")) + assert.NilError(t, openErr, "Open") + + _, restoreErr := cacheItem.Restore(dstCachePath) + assert.NilError(t, restoreErr, "Restore") + + dstAPath := dstCachePath.UntypedJoin("child", "a") + assertFileMatches(t, aPath, dstAPath) + + dstBPath := dstCachePath.UntypedJoin("b") + assertFileMatches(t, bPath, dstBPath) + + dstLinkPath := dstCachePath.UntypedJoin("child", "link") + target, err := dstLinkPath.Readlink() + assert.NilError(t, err, "Readlink") + if target != linkTarget { + t.Errorf("Readlink got %v, want %v", target, linkTarget) + } + + dstBrokenLinkPath := dstCachePath.UntypedJoin("child", "broken") + target, err = dstBrokenLinkPath.Readlink() + assert.NilError(t, err, "Readlink") + if target != "missing" { + t.Errorf("Readlink got %v, want missing", target) + } + + dstCirclePath := dstCachePath.UntypedJoin("child", "circle") + circleLinkDest, err := dstCirclePath.Readlink() + assert.NilError(t, err, "Readlink") + expectedCircleLinkDest := filepath.FromSlash("../child") + if circleLinkDest != expectedCircleLinkDest { + t.Errorf("Cache link got %v, want %v", circleLinkDest, expectedCircleLinkDest) + } + + assert.NilError(t, cacheItem.Close(), "Close") +} + +func assertFileMatches(t *testing.T, orig turbopath.AbsoluteSystemPath, copy turbopath.AbsoluteSystemPath) { + t.Helper() + origBytes, err := orig.ReadFile() + assert.NilError(t, err, "ReadFile") + copyBytes, err := copy.ReadFile() + assert.NilError(t, err, "ReadFile") + assert.DeepEqual(t, origBytes, copyBytes) + origStat, err := orig.Lstat() + assert.NilError(t, err, "Lstat") + copyStat, err := copy.Lstat() + assert.NilError(t, err, "Lstat") + assert.Equal(t, origStat.Mode(), copyStat.Mode()) +} + +func TestFetch(t *testing.T) { + // Set up a test cache directory and target output directory + // The "cacheDir" directory simulates a cached package + // + // / + // the-hash-meta.json + // the-hash/ + // some-package/ + // b + // child/ + // a + // link -> ../b + // broken -> missing + // circle -> ../child + // + // Ensure we end up with a matching directory under a + // "some-package" directory: + // + // "some-package"/... + + cacheDir := turbopath.AbsoluteSystemPath(t.TempDir()) + hash := "the-hash" + src := cacheDir.UntypedJoin(hash, "some-package") + err := src.MkdirAll(0775) + assert.NilError(t, err, "mkdirAll") + + childDir := src.UntypedJoin("child") + err = childDir.MkdirAll(0775) + assert.NilError(t, err, "Mkdir") + aPath := childDir.UntypedJoin("a") + aFile, err := aPath.Create() + assert.NilError(t, err, "Create") + _, err = aFile.WriteString("hello") + assert.NilError(t, err, "WriteString") + assert.NilError(t, aFile.Close(), "Close") + + bPath := src.UntypedJoin("b") + bFile, err := bPath.Create() + assert.NilError(t, err, "Create") + _, err = bFile.WriteString("bFile") + assert.NilError(t, err, "WriteString") + assert.NilError(t, bFile.Close(), "Close") + + srcLinkPath := childDir.UntypedJoin("link") + linkTarget := filepath.FromSlash("../b") + assert.NilError(t, srcLinkPath.Symlink(linkTarget), "Symlink") + + srcBrokenLinkPath := childDir.UntypedJoin("broken") + srcBrokenLinkTarget := turbopath.AnchoredUnixPath("missing").ToSystemPath() + assert.NilError(t, srcBrokenLinkPath.Symlink(srcBrokenLinkTarget.ToString()), "Symlink") + + circlePath := childDir.Join("circle") + srcCircleLinkTarget := turbopath.AnchoredUnixPath("../child").ToSystemPath() + assert.NilError(t, circlePath.Symlink(srcCircleLinkTarget.ToString()), "Symlink") + + metadataPath := cacheDir.UntypedJoin("the-hash-meta.json") + err = metadataPath.WriteFile([]byte(`{"hash":"the-hash","duration":0}`), 0777) + assert.NilError(t, err, "WriteFile") + + dr := &dummyRecorder{} + + cache := &fsCache{ + cacheDirectory: cacheDir, + recorder: dr, + } + + inputFiles := []turbopath.AnchoredSystemPath{ + turbopath.AnchoredUnixPath("some-package/child/").ToSystemPath(), // childDir + turbopath.AnchoredUnixPath("some-package/child/a").ToSystemPath(), // aPath, + turbopath.AnchoredUnixPath("some-package/b").ToSystemPath(), // bPath, + turbopath.AnchoredUnixPath("some-package/child/link").ToSystemPath(), // srcLinkPath, + turbopath.AnchoredUnixPath("some-package/child/broken").ToSystemPath(), // srcBrokenLinkPath, + turbopath.AnchoredUnixPath("some-package/child/circle").ToSystemPath(), // circlePath + } + + putErr := cache.Put(cacheDir.UntypedJoin(hash), hash, 0, inputFiles) + assert.NilError(t, putErr, "Put") + + outputDir := turbopath.AbsoluteSystemPath(t.TempDir()) + dstOutputPath := "some-package" + cacheStatus, files, _, err := cache.Fetch(outputDir, "the-hash", []string{}) + assert.NilError(t, err, "Fetch") + hit := cacheStatus.Local || cacheStatus.Remote + if !hit { + t.Error("Fetch got false, want true") + } + if len(files) != len(inputFiles) { + t.Errorf("len(files) got %v, want %v", len(files), len(inputFiles)) + } + + dstAPath := outputDir.UntypedJoin(dstOutputPath, "child", "a") + assertFileMatches(t, aPath, dstAPath) + + dstBPath := outputDir.UntypedJoin(dstOutputPath, "b") + assertFileMatches(t, bPath, dstBPath) + + dstLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "link") + target, err := dstLinkPath.Readlink() + assert.NilError(t, err, "Readlink") + if target != linkTarget { + t.Errorf("Readlink got %v, want %v", target, linkTarget) + } + + // Assert that we restore broken symlinks correctly + dstBrokenLinkPath := outputDir.UntypedJoin(dstOutputPath, "child", "broken") + target, readlinkErr := dstBrokenLinkPath.Readlink() + assert.NilError(t, readlinkErr, "Readlink") + assert.Equal(t, target, srcBrokenLinkTarget.ToString()) + + // Assert that we restore symlinks to directories correctly + dstCirclePath := outputDir.UntypedJoin(dstOutputPath, "child", "circle") + circleTarget, circleReadlinkErr := dstCirclePath.Readlink() + assert.NilError(t, circleReadlinkErr, "Circle Readlink") + assert.Equal(t, circleTarget, srcCircleLinkTarget.ToString()) +} diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go new file mode 100644 index 0000000..1d345bf --- /dev/null +++ b/cli/internal/cache/cache_http.go @@ -0,0 +1,375 @@ +// Adapted from https://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package cache + +import ( + "archive/tar" + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + log "log" + "net/http" + "os" + "path/filepath" + "strconv" + "time" + + "github.com/DataDog/zstd" + + "github.com/vercel/turbo/cli/internal/analytics" + "github.com/vercel/turbo/cli/internal/tarpatch" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +type client interface { + PutArtifact(hash string, body []byte, duration int, tag string) error + FetchArtifact(hash string) (*http.Response, error) + ArtifactExists(hash string) (*http.Response, error) + GetTeamID() string +} + +type httpCache struct { + writable bool + client client + requestLimiter limiter + recorder analytics.Recorder + signerVerifier *ArtifactSignatureAuthentication + repoRoot turbopath.AbsoluteSystemPath +} + +type limiter chan struct{} + +func (l limiter) acquire() { + l <- struct{}{} +} + +func (l limiter) release() { + <-l +} + +// mtime is the time we attach for the modification time of all files. +var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + +// nobody is the usual uid / gid of the 'nobody' user. +const nobody = 65534 + +func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { + // if cache.writable { + cache.requestLimiter.acquire() + defer cache.requestLimiter.release() + + r, w := io.Pipe() + go cache.write(w, hash, files) + + // Read the entire artifact tar into memory so we can easily compute the signature. + // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no + // additional overhead by doing the ioutil.ReadAll here instead. + artifactBody, err := ioutil.ReadAll(r) + if err != nil { + return fmt.Errorf("failed to store files in HTTP cache: %w", err) + } + tag := "" + if cache.signerVerifier.isEnabled() { + tag, err = cache.signerVerifier.generateTag(hash, artifactBody) + if err != nil { + return fmt.Errorf("failed to store files in HTTP cache: %w", err) + } + } + return cache.client.PutArtifact(hash, artifactBody, duration, tag) +} + +// write writes a series of files into the given Writer. +func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) { + defer w.Close() + defer func() { _ = w.Close() }() + zw := zstd.NewWriter(w) + defer func() { _ = zw.Close() }() + tw := tar.NewWriter(zw) + defer func() { _ = tw.Close() }() + for _, file := range files { + // log.Printf("caching file %v", file) + if err := cache.storeFile(tw, file); err != nil { + log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err) + // TODO(jaredpalmer): How can we cancel the request at this point? + } + } +} + +func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error { + absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot) + info, err := absoluteFilePath.Lstat() + if err != nil { + return err + } + target := "" + if info.Mode()&os.ModeSymlink != 0 { + target, err = absoluteFilePath.Readlink() + if err != nil { + return err + } + } + hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target)) + if err != nil { + return err + } + // Ensure posix path for filename written in header. + hdr.Name = repoRelativePath.ToUnixPath().ToString() + // Zero out all timestamps. + hdr.ModTime = mtime + hdr.AccessTime = mtime + hdr.ChangeTime = mtime + // Strip user/group ids. + hdr.Uid = nobody + hdr.Gid = nobody + hdr.Uname = "nobody" + hdr.Gname = "nobody" + if err := tw.WriteHeader(hdr); err != nil { + return err + } else if info.IsDir() || target != "" { + return nil // nothing to write + } + f, err := absoluteFilePath.Open() + if err != nil { + return err + } + defer func() { _ = f.Close() }() + _, err = io.Copy(tw, f) + if errors.Is(err, tar.ErrWriteTooLong) { + log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular()) + } + return err +} + +func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + cache.requestLimiter.acquire() + defer cache.requestLimiter.release() + hit, files, duration, err := cache.retrieve(key) + if err != nil { + // TODO: analytics event? + return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) + } + cache.logFetch(hit, key, duration) + return ItemStatus{Remote: hit}, files, duration, err +} + +func (cache *httpCache) Exists(key string) ItemStatus { + cache.requestLimiter.acquire() + defer cache.requestLimiter.release() + hit, err := cache.exists(key) + if err != nil { + return ItemStatus{Remote: false} + } + return ItemStatus{Remote: hit} +} + +func (cache *httpCache) logFetch(hit bool, hash string, duration int) { + var event string + if hit { + event = CacheEventHit + } else { + event = CacheEventMiss + } + payload := &CacheEvent{ + Source: CacheSourceRemote, + Event: event, + Hash: hash, + Duration: duration, + } + cache.recorder.LogEvent(payload) +} + +func (cache *httpCache) exists(hash string) (bool, error) { + resp, err := cache.client.ArtifactExists(hash) + if err != nil { + return false, nil + } + + defer func() { err = resp.Body.Close() }() + + if resp.StatusCode == http.StatusNotFound { + return false, nil + } else if resp.StatusCode != http.StatusOK { + return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode)) + } + return true, err +} + +func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) { + resp, err := cache.client.FetchArtifact(hash) + if err != nil { + return false, nil, 0, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return false, nil, 0, nil // doesn't exist - not an error + } else if resp.StatusCode != http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + return false, nil, 0, fmt.Errorf("%s", string(b)) + } + // If present, extract the duration from the response. + duration := 0 + if resp.Header.Get("x-artifact-duration") != "" { + intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration")) + if err != nil { + return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err) + } + duration = intVar + } + var tarReader io.Reader + + defer func() { _ = resp.Body.Close() }() + if cache.signerVerifier.isEnabled() { + expectedTag := resp.Header.Get("x-artifact-tag") + if expectedTag == "" { + // If the verifier is enabled all incoming artifact downloads must have a signature + return false, nil, 0, errors.New("artifact verification failed: Downloaded artifact is missing required x-artifact-tag header") + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) + } + isValid, err := cache.signerVerifier.validate(hash, b, expectedTag) + if err != nil { + return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) + } + if !isValid { + err = fmt.Errorf("artifact verification failed: artifact tag does not match expected tag %s", expectedTag) + return false, nil, 0, err + } + // The artifact has been verified and the body can be read and untarred + tarReader = bytes.NewReader(b) + } else { + tarReader = resp.Body + } + files, err := restoreTar(cache.repoRoot, tarReader) + if err != nil { + return false, nil, 0, err + } + return true, files, duration, nil +} + +// restoreTar returns posix-style repo-relative paths of the files it +// restored. In the future, these should likely be repo-relative system paths +// so that they are suitable for being fed into cache.Put for other caches. +// For now, I think this is working because windows also accepts /-delimited paths. +func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) { + files := []turbopath.AnchoredSystemPath{} + missingLinks := []*tar.Header{} + zr := zstd.NewReader(reader) + var closeError error + defer func() { closeError = zr.Close() }() + tr := tar.NewReader(zr) + for { + hdr, err := tr.Next() + if err != nil { + if err == io.EOF { + for _, link := range missingLinks { + err := restoreSymlink(root, link, true) + if err != nil { + return nil, err + } + } + + return files, closeError + } + return nil, err + } + // hdr.Name is always a posix-style path + // FIXME: THIS IS A BUG. + restoredName := turbopath.AnchoredUnixPath(hdr.Name) + files = append(files, restoredName.ToSystemPath()) + filename := restoredName.ToSystemPath().RestoreAnchor(root) + if isChild, err := root.ContainsPath(filename); err != nil { + return nil, err + } else if !isChild { + return nil, fmt.Errorf("cannot untar file to %v", filename) + } + switch hdr.Typeflag { + case tar.TypeDir: + if err := filename.MkdirAll(0775); err != nil { + return nil, err + } + case tar.TypeReg: + if dir := filename.Dir(); dir != "." { + if err := dir.MkdirAll(0775); err != nil { + return nil, err + } + } + if f, err := filename.OpenFile(os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.FileMode(hdr.Mode)); err != nil { + return nil, err + } else if _, err := io.Copy(f, tr); err != nil { + return nil, err + } else if err := f.Close(); err != nil { + return nil, err + } + case tar.TypeSymlink: + if err := restoreSymlink(root, hdr, false); errors.Is(err, errNonexistentLinkTarget) { + missingLinks = append(missingLinks, hdr) + } else if err != nil { + return nil, err + } + default: + log.Printf("Unhandled file type %d for %s", hdr.Typeflag, hdr.Name) + } + } +} + +var errNonexistentLinkTarget = errors.New("the link target does not exist") + +func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNonexistentTargets bool) error { + // Note that hdr.Linkname is really the link target + relativeLinkTarget := filepath.FromSlash(hdr.Linkname) + linkFilename := root.UntypedJoin(hdr.Name) + if err := linkFilename.EnsureDir(); err != nil { + return err + } + + // TODO: check if this is an absolute path, or if we even care + linkTarget := linkFilename.Dir().UntypedJoin(relativeLinkTarget) + if _, err := linkTarget.Lstat(); err != nil { + if os.IsNotExist(err) { + if !allowNonexistentTargets { + return errNonexistentLinkTarget + } + // if we're allowing nonexistent link targets, proceed to creating the link + } else { + return err + } + } + // Ensure that the link we're about to create doesn't already exist + if err := linkFilename.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + if err := linkFilename.Symlink(relativeLinkTarget); err != nil { + return err + } + return nil +} + +func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) { + // Not possible; this implementation can only clean for a hash. +} + +func (cache *httpCache) CleanAll() { + // Also not possible. +} + +func (cache *httpCache) Shutdown() {} + +func newHTTPCache(opts Opts, client client, recorder analytics.Recorder) *httpCache { + return &httpCache{ + writable: true, + client: client, + requestLimiter: make(limiter, 20), + recorder: recorder, + signerVerifier: &ArtifactSignatureAuthentication{ + // TODO(Gaspar): this should use RemoteCacheOptions.TeamId once we start + // enforcing team restrictions for repositories. + teamId: client.GetTeamID(), + enabled: opts.RemoteCacheOpts.Signature, + }, + } +} diff --git a/cli/internal/cache/cache_http_test.go b/cli/internal/cache/cache_http_test.go new file mode 100644 index 0000000..d187931 --- /dev/null +++ b/cli/internal/cache/cache_http_test.go @@ -0,0 +1,245 @@ +package cache + +import ( + "archive/tar" + "bytes" + "errors" + "net/http" + "testing" + + "github.com/DataDog/zstd" + + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/util" + "gotest.tools/v3/assert" +) + +type errorResp struct { + err error +} + +func (sr *errorResp) PutArtifact(hash string, body []byte, duration int, tag string) error { + return sr.err +} + +func (sr *errorResp) FetchArtifact(hash string) (*http.Response, error) { + return nil, sr.err +} + +func (sr *errorResp) ArtifactExists(hash string) (*http.Response, error) { + return nil, sr.err +} + +func (sr *errorResp) GetTeamID() string { + return "" +} + +func TestRemoteCachingDisabled(t *testing.T) { + clientErr := &util.CacheDisabledError{ + Status: util.CachingStatusDisabled, + Message: "Remote Caching has been disabled for this team. A team owner can enable it here: $URL", + } + client := &errorResp{err: clientErr} + cache := &httpCache{ + client: client, + requestLimiter: make(limiter, 20), + } + cd := &util.CacheDisabledError{} + _, _, _, err := cache.Fetch("unused-target", "some-hash", []string{"unused", "outputs"}) + if !errors.As(err, &cd) { + t.Errorf("cache.Fetch err got %v, want a CacheDisabled error", err) + } + if cd.Status != util.CachingStatusDisabled { + t.Errorf("CacheDisabled.Status got %v, want %v", cd.Status, util.CachingStatusDisabled) + } +} + +func makeValidTar(t *testing.T) *bytes.Buffer { + // + // my-pkg/ + // some-file + // link-to-extra-file -> ../extra-file + // broken-link -> ../../global-dep + // extra-file + + t.Helper() + buf := &bytes.Buffer{} + zw := zstd.NewWriter(buf) + defer func() { + if err := zw.Close(); err != nil { + t.Fatalf("failed to close gzip: %v", err) + } + }() + tw := tar.NewWriter(zw) + defer func() { + if err := tw.Close(); err != nil { + t.Fatalf("failed to close tar: %v", err) + } + }() + + // my-pkg + h := &tar.Header{ + Name: "my-pkg/", + Mode: int64(0644), + Typeflag: tar.TypeDir, + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + // my-pkg/some-file + contents := []byte("some-file-contents") + h = &tar.Header{ + Name: "my-pkg/some-file", + Mode: int64(0644), + Typeflag: tar.TypeReg, + Size: int64(len(contents)), + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("failed to write file: %v", err) + } + // my-pkg/link-to-extra-file + h = &tar.Header{ + Name: "my-pkg/link-to-extra-file", + Mode: int64(0644), + Typeflag: tar.TypeSymlink, + Linkname: "../extra-file", + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + // my-pkg/broken-link + h = &tar.Header{ + Name: "my-pkg/broken-link", + Mode: int64(0644), + Typeflag: tar.TypeSymlink, + Linkname: "../../global-dep", + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + // extra-file + contents = []byte("extra-file-contents") + h = &tar.Header{ + Name: "extra-file", + Mode: int64(0644), + Typeflag: tar.TypeReg, + Size: int64(len(contents)), + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("failed to write file: %v", err) + } + + return buf +} + +func makeInvalidTar(t *testing.T) *bytes.Buffer { + // contains a single file that traverses out + // ../some-file + + t.Helper() + buf := &bytes.Buffer{} + zw := zstd.NewWriter(buf) + defer func() { + if err := zw.Close(); err != nil { + t.Fatalf("failed to close gzip: %v", err) + } + }() + tw := tar.NewWriter(zw) + defer func() { + if err := tw.Close(); err != nil { + t.Fatalf("failed to close tar: %v", err) + } + }() + + // my-pkg/some-file + contents := []byte("some-file-contents") + h := &tar.Header{ + Name: "../some-file", + Mode: int64(0644), + Typeflag: tar.TypeReg, + Size: int64(len(contents)), + } + if err := tw.WriteHeader(h); err != nil { + t.Fatalf("failed to write header: %v", err) + } + if _, err := tw.Write(contents); err != nil { + t.Fatalf("failed to write file: %v", err) + } + return buf +} + +func TestRestoreTar(t *testing.T) { + root := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + + tar := makeValidTar(t) + + expectedFiles := []turbopath.AnchoredSystemPath{ + turbopath.AnchoredUnixPath("extra-file").ToSystemPath(), + turbopath.AnchoredUnixPath("my-pkg/").ToSystemPath(), + turbopath.AnchoredUnixPath("my-pkg/some-file").ToSystemPath(), + turbopath.AnchoredUnixPath("my-pkg/link-to-extra-file").ToSystemPath(), + turbopath.AnchoredUnixPath("my-pkg/broken-link").ToSystemPath(), + } + files, err := restoreTar(root, tar) + assert.NilError(t, err, "readTar") + + expectedSet := make(util.Set) + for _, file := range expectedFiles { + expectedSet.Add(file.ToString()) + } + gotSet := make(util.Set) + for _, file := range files { + gotSet.Add(file.ToString()) + } + extraFiles := gotSet.Difference(expectedSet) + if extraFiles.Len() > 0 { + t.Errorf("got extra files: %v", extraFiles.UnsafeListOfStrings()) + } + missingFiles := expectedSet.Difference(gotSet) + if missingFiles.Len() > 0 { + t.Errorf("missing expected files: %v", missingFiles.UnsafeListOfStrings()) + } + + // Verify file contents + extraFile := root.UntypedJoin("extra-file") + contents, err := extraFile.ReadFile() + assert.NilError(t, err, "ReadFile") + assert.DeepEqual(t, contents, []byte("extra-file-contents")) + + someFile := root.UntypedJoin("my-pkg", "some-file") + contents, err = someFile.ReadFile() + assert.NilError(t, err, "ReadFile") + assert.DeepEqual(t, contents, []byte("some-file-contents")) +} + +func TestRestoreInvalidTar(t *testing.T) { + root := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + expectedContents := []byte("important-data") + someFile := root.UntypedJoin("some-file") + err := someFile.WriteFile(expectedContents, 0644) + assert.NilError(t, err, "WriteFile") + + tar := makeInvalidTar(t) + // use a child directory so that blindly untarring will squash the file + // that we just wrote above. + repoRoot := root.UntypedJoin("repo") + _, err = restoreTar(repoRoot, tar) + if err == nil { + t.Error("expected error untarring invalid tar") + } + + contents, err := someFile.ReadFile() + assert.NilError(t, err, "ReadFile") + assert.Equal(t, string(contents), string(expectedContents), "expected to not overwrite file") +} + +// Note that testing Put will require mocking the filesystem and is not currently the most +// interesting test. The current implementation directly returns the error from PutArtifact. +// We should still add the test once feasible to avoid future breakage. diff --git a/cli/internal/cache/cache_noop.go b/cli/internal/cache/cache_noop.go new file mode 100644 index 0000000..80a3c23 --- /dev/null +++ b/cli/internal/cache/cache_noop.go @@ -0,0 +1,23 @@ +package cache + +import "github.com/vercel/turbo/cli/internal/turbopath" + +type noopCache struct{} + +func newNoopCache() *noopCache { + return &noopCache{} +} + +func (c *noopCache) Put(_ turbopath.AbsoluteSystemPath, _ string, _ int, _ []turbopath.AnchoredSystemPath) error { + return nil +} +func (c *noopCache) Fetch(_ turbopath.AbsoluteSystemPath, _ string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + return ItemStatus{Local: false, Remote: false}, nil, 0, nil +} +func (c *noopCache) Exists(_ string) ItemStatus { + return ItemStatus{} +} + +func (c *noopCache) Clean(_ turbopath.AbsoluteSystemPath) {} +func (c *noopCache) CleanAll() {} +func (c *noopCache) Shutdown() {} diff --git a/cli/internal/cache/cache_signature_authentication.go b/cli/internal/cache/cache_signature_authentication.go new file mode 100644 index 0000000..f9fe4c0 --- /dev/null +++ b/cli/internal/cache/cache_signature_authentication.go @@ -0,0 +1,88 @@ +// Adapted from https://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package cache + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "hash" + "os" +) + +type ArtifactSignatureAuthentication struct { + teamId string + enabled bool +} + +func (asa *ArtifactSignatureAuthentication) isEnabled() bool { + return asa.enabled +} + +// If the secret key is not found or the secret key length is 0, an error is returned +// Preference is given to the environment specified secret key. +func (asa *ArtifactSignatureAuthentication) secretKey() ([]byte, error) { + secret := os.Getenv("TURBO_REMOTE_CACHE_SIGNATURE_KEY") + if len(secret) == 0 { + return nil, errors.New("signature secret key not found. You must specify a secret key in the TURBO_REMOTE_CACHE_SIGNATURE_KEY environment variable") + } + return []byte(secret), nil +} + +func (asa *ArtifactSignatureAuthentication) generateTag(hash string, artifactBody []byte) (string, error) { + tag, err := asa.getTagGenerator(hash) + if err != nil { + return "", err + } + tag.Write(artifactBody) + return base64.StdEncoding.EncodeToString(tag.Sum(nil)), nil +} + +func (asa *ArtifactSignatureAuthentication) getTagGenerator(hash string) (hash.Hash, error) { + teamId := asa.teamId + secret, err := asa.secretKey() + if err != nil { + return nil, err + } + artifactMetadata := &struct { + Hash string `json:"hash"` + TeamId string `json:"teamId"` + }{ + Hash: hash, + TeamId: teamId, + } + metadata, err := json.Marshal(artifactMetadata) + if err != nil { + return nil, err + } + + // TODO(Gaspar) Support additional signing algorithms here + h := hmac.New(sha256.New, secret) + h.Write(metadata) + return h, nil +} + +func (asa *ArtifactSignatureAuthentication) validate(hash string, artifactBody []byte, expectedTag string) (bool, error) { + computedTag, err := asa.generateTag(hash, artifactBody) + if err != nil { + return false, fmt.Errorf("failed to verify artifact tag: %w", err) + } + return hmac.Equal([]byte(computedTag), []byte(expectedTag)), nil +} + +type StreamValidator struct { + currentHash hash.Hash +} + +func (sv *StreamValidator) Validate(expectedTag string) bool { + computedTag := base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil)) + return hmac.Equal([]byte(computedTag), []byte(expectedTag)) +} + +func (sv *StreamValidator) CurrentValue() string { + return base64.StdEncoding.EncodeToString(sv.currentHash.Sum(nil)) +} diff --git a/cli/internal/cache/cache_signature_authentication_test.go b/cli/internal/cache/cache_signature_authentication_test.go new file mode 100644 index 0000000..7f3f865 --- /dev/null +++ b/cli/internal/cache/cache_signature_authentication_test.go @@ -0,0 +1,195 @@ +// Adapted from ghttps://github.com/thought-machine/please +// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package cache + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_SecretKeySuccess(t *testing.T) { + teamId := "team_someid" + secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY" + secretKeyEnvValue := "my-secret-key-env" + t.Setenv(secretKeyEnvName, secretKeyEnvValue) + + cases := []struct { + name string + asa *ArtifactSignatureAuthentication + expectedSecretKey string + expectedSecretKeyError bool + }{ + { + name: "Accepts secret key", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedSecretKey: secretKeyEnvValue, + expectedSecretKeyError: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + secretKey, err := tc.asa.secretKey() + if tc.expectedSecretKeyError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedSecretKey, string(secretKey)) + } + }) + } +} + +func Test_SecretKeyErrors(t *testing.T) { + teamId := "team_someid" + + // Env secret key TURBO_REMOTE_CACHE_SIGNATURE_KEY is not set + + cases := []struct { + name string + asa *ArtifactSignatureAuthentication + expectedSecretKey string + expectedSecretKeyError bool + }{ + { + name: "Secret key not defined errors", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedSecretKey: "", + expectedSecretKeyError: true, + }, + { + name: "Secret key is empty errors", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedSecretKey: "", + expectedSecretKeyError: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + secretKey, err := tc.asa.secretKey() + if tc.expectedSecretKeyError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedSecretKey, string(secretKey)) + } + }) + } +} + +func Test_GenerateTagAndValidate(t *testing.T) { + teamId := "team_someid" + hash := "the-artifact-hash" + artifactBody := []byte("the artifact body as bytes") + secretKeyEnvName := "TURBO_REMOTE_CACHE_SIGNATURE_KEY" + secretKeyEnvValue := "my-secret-key-env" + t.Setenv(secretKeyEnvName, secretKeyEnvValue) + + cases := []struct { + name string + asa *ArtifactSignatureAuthentication + expectedTagMatches string + expectedTagDoesNotMatch string + }{ + { + name: "Uses hash to generate tag", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), + expectedTagDoesNotMatch: testUtilGetHMACTag("wrong-hash", teamId, artifactBody, secretKeyEnvValue), + }, + { + name: "Uses teamId to generate tag", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), + expectedTagDoesNotMatch: testUtilGetHMACTag(hash, "wrong-teamId", artifactBody, secretKeyEnvValue), + }, + { + name: "Uses artifactBody to generate tag", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), + expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, []byte("wrong-artifact-body"), secretKeyEnvValue), + }, + { + name: "Uses secret to generate tag", + asa: &ArtifactSignatureAuthentication{ + teamId: teamId, + enabled: true, + }, + expectedTagMatches: testUtilGetHMACTag(hash, teamId, artifactBody, secretKeyEnvValue), + expectedTagDoesNotMatch: testUtilGetHMACTag(hash, teamId, artifactBody, "wrong-secret"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tag, err := tc.asa.generateTag(hash, artifactBody) + assert.NoError(t, err) + + // validates the tag + assert.Equal(t, tc.expectedTagMatches, tag) + isValid, err := tc.asa.validate(hash, artifactBody, tc.expectedTagMatches) + assert.NoError(t, err) + assert.True(t, isValid) + + // does not validate the tag + assert.NotEqual(t, tc.expectedTagDoesNotMatch, tag) + isValid, err = tc.asa.validate(hash, artifactBody, tc.expectedTagDoesNotMatch) + assert.NoError(t, err) + assert.False(t, isValid) + + }) + } +} + +// Test utils + +// Return the Base64 encoded HMAC given the artifact metadata and artifact body +func testUtilGetHMACTag(hash string, teamId string, artifactBody []byte, secret string) string { + artifactMetadata := &struct { + Hash string `json:"hash"` + TeamId string `json:"teamId"` + }{ + Hash: hash, + TeamId: teamId, + } + metadata, _ := json.Marshal(artifactMetadata) + h := hmac.New(sha256.New, []byte(secret)) + h.Write(metadata) + h.Write(artifactBody) + return base64.StdEncoding.EncodeToString(h.Sum(nil)) +} + +func Test_Utils(t *testing.T) { + teamId := "team_someid" + secret := "my-secret" + hash := "the-artifact-hash" + artifactBody := []byte("the artifact body as bytes") + testTag := testUtilGetHMACTag(hash, teamId, artifactBody, secret) + expectedTag := "9Fu8YniPZ2dEBolTPQoNlFWG0LNMW8EXrBsRmf/fEHk=" + assert.True(t, hmac.Equal([]byte(testTag), []byte(expectedTag))) +} diff --git a/cli/internal/cache/cache_test.go b/cli/internal/cache/cache_test.go new file mode 100644 index 0000000..3f17877 --- /dev/null +++ b/cli/internal/cache/cache_test.go @@ -0,0 +1,318 @@ +package cache + +import ( + "net/http" + "reflect" + "sync/atomic" + "testing" + + "github.com/vercel/turbo/cli/internal/analytics" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/util" +) + +type testCache struct { + disabledErr *util.CacheDisabledError + entries map[string][]turbopath.AnchoredSystemPath +} + +func (tc *testCache) Fetch(_ turbopath.AbsoluteSystemPath, hash string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { + if tc.disabledErr != nil { + return ItemStatus{}, nil, 0, tc.disabledErr + } + foundFiles, ok := tc.entries[hash] + if ok { + duration := 5 + return ItemStatus{Local: true}, foundFiles, duration, nil + } + return ItemStatus{}, nil, 0, nil +} + +func (tc *testCache) Exists(hash string) ItemStatus { + if tc.disabledErr != nil { + return ItemStatus{} + } + _, ok := tc.entries[hash] + if ok { + return ItemStatus{Local: true} + } + return ItemStatus{} +} + +func (tc *testCache) Put(_ turbopath.AbsoluteSystemPath, hash string, _ int, files []turbopath.AnchoredSystemPath) error { + if tc.disabledErr != nil { + return tc.disabledErr + } + tc.entries[hash] = files + return nil +} + +func (tc *testCache) Clean(_ turbopath.AbsoluteSystemPath) {} +func (tc *testCache) CleanAll() {} +func (tc *testCache) Shutdown() {} + +func newEnabledCache() *testCache { + return &testCache{ + entries: make(map[string][]turbopath.AnchoredSystemPath), + } +} + +func newDisabledCache() *testCache { + return &testCache{ + disabledErr: &util.CacheDisabledError{ + Status: util.CachingStatusDisabled, + Message: "remote caching is disabled", + }, + } +} + +func TestPutCachingDisabled(t *testing.T) { + disabledCache := newDisabledCache() + caches := []Cache{ + newEnabledCache(), + disabledCache, + newEnabledCache(), + newEnabledCache(), + } + var removeCalled uint64 + mplex := &cacheMultiplexer{ + caches: caches, + onCacheRemoved: func(cache Cache, err error) { + atomic.AddUint64(&removeCalled, 1) + }, + } + + err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"}) + if err != nil { + // don't leak the cache removal + t.Errorf("Put got error %v, want ", err) + } + + removes := atomic.LoadUint64(&removeCalled) + if removes != 1 { + t.Errorf("removes count: %v, want 1", removes) + } + + mplex.mu.RLock() + if len(mplex.caches) != 3 { + t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches)) + } + for _, cache := range mplex.caches { + if cache == disabledCache { + t.Error("found disabled cache, expected it to be removed") + } + } + mplex.mu.RUnlock() + + // subsequent Fetch should still work + cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) + if err != nil { + t.Errorf("got error fetching files: %v", err) + } + hit := cacheStatus.Local || cacheStatus.Remote + if !hit { + t.Error("failed to find previously stored files") + } + + removes = atomic.LoadUint64(&removeCalled) + if removes != 1 { + t.Errorf("removes count: %v, want 1", removes) + } +} + +func TestExists(t *testing.T) { + caches := []Cache{ + newEnabledCache(), + } + + mplex := &cacheMultiplexer{ + caches: caches, + } + + itemStatus := mplex.Exists("some-hash") + if itemStatus.Local { + t.Error("did not expect file to exist") + } + + err := mplex.Put("unused-target", "some-hash", 5, []turbopath.AnchoredSystemPath{"a-file"}) + if err != nil { + // don't leak the cache removal + t.Errorf("Put got error %v, want ", err) + } + + itemStatus = mplex.Exists("some-hash") + if !itemStatus.Local { + t.Error("failed to find previously stored files") + } +} + +type fakeClient struct{} + +// FetchArtifact implements client +func (*fakeClient) FetchArtifact(hash string) (*http.Response, error) { + panic("unimplemented") +} + +func (*fakeClient) ArtifactExists(hash string) (*http.Response, error) { + panic("unimplemented") +} + +// GetTeamID implements client +func (*fakeClient) GetTeamID() string { + return "fake-team-id" +} + +// PutArtifact implements client +func (*fakeClient) PutArtifact(hash string, body []byte, duration int, tag string) error { + panic("unimplemented") +} + +var _ client = &fakeClient{} + +func TestFetchCachingDisabled(t *testing.T) { + disabledCache := newDisabledCache() + caches := []Cache{ + newEnabledCache(), + disabledCache, + newEnabledCache(), + newEnabledCache(), + } + var removeCalled uint64 + mplex := &cacheMultiplexer{ + caches: caches, + onCacheRemoved: func(cache Cache, err error) { + atomic.AddUint64(&removeCalled, 1) + }, + } + + cacheStatus, _, _, err := mplex.Fetch("unused-target", "some-hash", []string{"unused", "files"}) + if err != nil { + // don't leak the cache removal + t.Errorf("Fetch got error %v, want ", err) + } + hit := cacheStatus.Local || cacheStatus.Remote + if hit { + t.Error("hit on empty cache, expected miss") + } + + removes := atomic.LoadUint64(&removeCalled) + if removes != 1 { + t.Errorf("removes count: %v, want 1", removes) + } + + mplex.mu.RLock() + if len(mplex.caches) != 3 { + t.Errorf("found %v caches, expected to have 3 after one was removed", len(mplex.caches)) + } + for _, cache := range mplex.caches { + if cache == disabledCache { + t.Error("found disabled cache, expected it to be removed") + } + } + mplex.mu.RUnlock() +} + +type nullRecorder struct{} + +func (nullRecorder) LogEvent(analytics.EventPayload) {} + +func TestNew(t *testing.T) { + // Test will bomb if this fails, no need to specially handle the error + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + type args struct { + opts Opts + recorder analytics.Recorder + onCacheRemoved OnCacheRemoved + client fakeClient + } + tests := []struct { + name string + args args + want Cache + wantErr bool + }{ + { + name: "With no caches configured, new returns a noopCache and an error", + args: args{ + opts: Opts{ + SkipFilesystem: true, + SkipRemote: true, + }, + recorder: &nullRecorder{}, + onCacheRemoved: func(Cache, error) {}, + }, + want: &noopCache{}, + wantErr: true, + }, + { + name: "With just httpCache configured, new returns an httpCache and a noopCache", + args: args{ + opts: Opts{ + SkipFilesystem: true, + RemoteCacheOpts: fs.RemoteCacheOptions{ + Signature: true, + }, + }, + recorder: &nullRecorder{}, + onCacheRemoved: func(Cache, error) {}, + }, + want: &cacheMultiplexer{ + caches: []Cache{&httpCache{}, &noopCache{}}, + }, + wantErr: false, + }, + { + name: "With just fsCache configured, new returns only an fsCache", + args: args{ + opts: Opts{ + SkipRemote: true, + }, + recorder: &nullRecorder{}, + onCacheRemoved: func(Cache, error) {}, + }, + want: &fsCache{}, + }, + { + name: "With both configured, new returns an fsCache and httpCache", + args: args{ + opts: Opts{ + RemoteCacheOpts: fs.RemoteCacheOptions{ + Signature: true, + }, + }, + recorder: &nullRecorder{}, + onCacheRemoved: func(Cache, error) {}, + }, + want: &cacheMultiplexer{ + caches: []Cache{&fsCache{}, &httpCache{}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := New(tt.args.opts, repoRoot, &tt.args.client, tt.args.recorder, tt.args.onCacheRemoved) + if (err != nil) != tt.wantErr { + t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) + return + } + switch multiplexer := got.(type) { + case *cacheMultiplexer: + want := tt.want.(*cacheMultiplexer) + for i := range multiplexer.caches { + if reflect.TypeOf(multiplexer.caches[i]) != reflect.TypeOf(want.caches[i]) { + t.Errorf("New() = %v, want %v", reflect.TypeOf(multiplexer.caches[i]), reflect.TypeOf(want.caches[i])) + } + } + case *fsCache: + if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want)) + } + case *noopCache: + if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { + t.Errorf("New() = %v, want %v", reflect.TypeOf(got), reflect.TypeOf(tt.want)) + } + } + }) + } +} -- cgit v1.2.3-70-g09d2