aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/cache/cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'cli/internal/cache/cache.go')
-rw-r--r--cli/internal/cache/cache.go317
1 files changed, 317 insertions, 0 deletions
diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go
new file mode 100644
index 0000000..8b74272
--- /dev/null
+++ b/cli/internal/cache/cache.go
@@ -0,0 +1,317 @@
+// Package cache abstracts storing and fetching previously run tasks
+//
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+ "golang.org/x/sync/errgroup"
+)
+
+// Cache is abstracted way to cache/fetch previously run tasks
+type Cache interface {
+ // Fetch returns true if there is a cache it. It is expected to move files
+ // into their correct position as a side effect
+ Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
+ Exists(hash string) ItemStatus
+ // Put caches files for a given hash
+ Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
+ Clean(anchor turbopath.AbsoluteSystemPath)
+ CleanAll()
+ Shutdown()
+}
+
+// ItemStatus holds whether artifacts exists for a given hash on local
+// and/or remote caching server
+type ItemStatus struct {
+ Local bool `json:"local"`
+ Remote bool `json:"remote"`
+}
+
+const (
+ // CacheSourceFS is a constant to indicate local cache hit
+ CacheSourceFS = "LOCAL"
+ // CacheSourceRemote is a constant to indicate remote cache hit
+ CacheSourceRemote = "REMOTE"
+ // CacheEventHit is a constant to indicate a cache hit
+ CacheEventHit = "HIT"
+ // CacheEventMiss is a constant to indicate a cache miss
+ CacheEventMiss = "MISS"
+)
+
+type CacheEvent struct {
+ Source string `mapstructure:"source"`
+ Event string `mapstructure:"event"`
+ Hash string `mapstructure:"hash"`
+ Duration int `mapstructure:"duration"`
+}
+
+// DefaultLocation returns the default filesystem cache location, given a repo root
+func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ return repoRoot.UntypedJoin("node_modules", ".cache", "turbo")
+}
+
+// OnCacheRemoved defines a callback that the cache system calls if a particular cache
+// needs to be removed. In practice, this happens when Remote Caching has been disabled
+// the but CLI continues to try to use it.
+type OnCacheRemoved = func(cache Cache, err error)
+
+// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable
+var ErrNoCachesEnabled = errors.New("no caches are enabled")
+
+// Opts holds configuration options for the cache
+// TODO(gsoltis): further refactor this into fs cache opts and http cache opts
+type Opts struct {
+ OverrideDir string
+ SkipRemote bool
+ SkipFilesystem bool
+ Workers int
+ RemoteCacheOpts fs.RemoteCacheOptions
+}
+
+// resolveCacheDir calculates the location turbo should use to cache artifacts,
+// based on the options supplied by the user.
+func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ if o.OverrideDir != "" {
+ return fs.ResolveUnknownPath(repoRoot, o.OverrideDir)
+ }
+ return DefaultLocation(repoRoot)
+}
+
+var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only
+allow reading and caching artifacts using the remote cache.`
+
+// New creates a new cache
+func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
+ c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved)
+ if err != nil && !errors.Is(err, ErrNoCachesEnabled) {
+ return nil, err
+ }
+ if opts.Workers > 0 {
+ return newAsyncCache(c, opts), err
+ }
+ return c, err
+}
+
+// newSyncCache can return an error with a usable noopCache.
+func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
+ // Check to see if the user has turned off particular cache implementations.
+ useFsCache := !opts.SkipFilesystem
+ useHTTPCache := !opts.SkipRemote
+
+ // Since the above two flags are not mutually exclusive it is possible to configure
+ // yourself out of having a cache. We should tell you about it but we shouldn't fail
+ // your build for that reason.
+ //
+ // Further, since the httpCache can be removed at runtime, we need to insert a noopCache
+ // as a backup if you are configured to have *just* an httpCache.
+ //
+ // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache & useHTTPCache)
+ useNoopCache := !useFsCache
+
+ // Build up an array of cache implementations, we can only ever have 1 or 2.
+ cacheImplementations := make([]Cache, 0, 2)
+
+ if useFsCache {
+ implementation, err := newFsCache(opts, recorder, repoRoot)
+ if err != nil {
+ return nil, err
+ }
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ if useHTTPCache {
+ implementation := newHTTPCache(opts, client, recorder)
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ if useNoopCache {
+ implementation := newNoopCache()
+ cacheImplementations = append(cacheImplementations, implementation)
+ }
+
+ // Precisely two cache implementations:
+ // fsCache and httpCache OR httpCache and noopCache
+ useMultiplexer := len(cacheImplementations) > 1
+ if useMultiplexer {
+ // We have early-returned any possible errors for this scenario.
+ return &cacheMultiplexer{
+ onCacheRemoved: onCacheRemoved,
+ opts: opts,
+ caches: cacheImplementations,
+ }, nil
+ }
+
+ // Precisely one cache implementation: fsCache OR noopCache
+ implementation := cacheImplementations[0]
+ _, isNoopCache := implementation.(*noopCache)
+
+ // We want to let the user know something is wonky, but we don't want
+ // to trigger their build to fail.
+ if isNoopCache {
+ return implementation, ErrNoCachesEnabled
+ }
+ return implementation, nil
+}
+
+// A cacheMultiplexer multiplexes several caches into one.
+// Used when we have several active (eg. http, dir).
+type cacheMultiplexer struct {
+ caches []Cache
+ opts Opts
+ mu sync.RWMutex
+ onCacheRemoved OnCacheRemoved
+}
+
+func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
+ return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches))
+}
+
+type cacheRemoval struct {
+ cache Cache
+ err *util.CacheDisabledError
+}
+
+// storeUntil stores artifacts into higher priority caches than the given one.
+// Used after artifact retrieval to ensure we have them in eg. the directory cache after
+// downloading from the RPC cache.
+func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error {
+ // Attempt to store on all caches simultaneously.
+ toRemove := make([]*cacheRemoval, stopAt)
+ g := &errgroup.Group{}
+ mplex.mu.RLock()
+ for i, cache := range mplex.caches {
+ if i == stopAt {
+ break
+ }
+ c := cache
+ i := i
+ g.Go(func() error {
+ err := c.Put(anchor, key, duration, files)
+ if err != nil {
+ cd := &util.CacheDisabledError{}
+ if errors.As(err, &cd) {
+ toRemove[i] = &cacheRemoval{
+ cache: c,
+ err: cd,
+ }
+ // we don't want this to cancel other cache actions
+ return nil
+ }
+ return err
+ }
+ return nil
+ })
+ }
+ mplex.mu.RUnlock()
+
+ if err := g.Wait(); err != nil {
+ return err
+ }
+
+ for _, removal := range toRemove {
+ if removal != nil {
+ mplex.removeCache(removal)
+ }
+ }
+ return nil
+}
+
+// removeCache takes a requested removal and tries to actually remove it. However,
+// multiple requests could result in concurrent requests to remove the same cache.
+// Let one of them win and propagate the error, the rest will no-op.
+func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
+ mplex.mu.Lock()
+ defer mplex.mu.Unlock()
+ for i, cache := range mplex.caches {
+ if cache == removal.cache {
+ mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...)
+ mplex.onCacheRemoved(cache, removal.err)
+ break
+ }
+ }
+}
+
+func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ // Make a shallow copy of the caches, since storeUntil can call removeCache
+ mplex.mu.RLock()
+ caches := make([]Cache, len(mplex.caches))
+ copy(caches, mplex.caches)
+ mplex.mu.RUnlock()
+
+ // We need to return a composite cache status from multiple caches
+ // Initialize the empty struct so we can assign values to it. This is similar
+ // to how the Exists() method works.
+ combinedCacheState := ItemStatus{}
+
+ // Retrieve from caches sequentially; if we did them simultaneously we could
+ // easily write the same file from two goroutines at once.
+ for i, cache := range caches {
+ itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
+ ok := itemStatus.Local || itemStatus.Remote
+
+ if err != nil {
+ cd := &util.CacheDisabledError{}
+ if errors.As(err, &cd) {
+ mplex.removeCache(&cacheRemoval{
+ cache: cache,
+ err: cd,
+ })
+ }
+ // We're ignoring the error in the else case, since with this cache
+ // abstraction, we want to check lower priority caches rather than fail
+ // the operation. Future work that plumbs UI / Logging into the cache system
+ // should probably log this at least.
+ }
+ if ok {
+ // Store this into other caches. We can ignore errors here because we know
+ // we have previously successfully stored in a higher-priority cache, and so the overall
+ // result is a success at fetching. Storing in lower-priority caches is an optimization.
+ _ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
+
+ // If another cache had already set this to true, we don't need to set it again from this cache
+ combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
+ combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
+ return combinedCacheState, actualFiles, duration, err
+ }
+ }
+
+ return ItemStatus{Local: false, Remote: false}, nil, 0, nil
+}
+
+func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
+ syncCacheState := ItemStatus{}
+ for _, cache := range mplex.caches {
+ itemStatus := cache.Exists(target)
+ syncCacheState.Local = syncCacheState.Local || itemStatus.Local
+ syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote
+ }
+
+ return syncCacheState
+}
+
+func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
+ for _, cache := range mplex.caches {
+ cache.Clean(anchor)
+ }
+}
+
+func (mplex *cacheMultiplexer) CleanAll() {
+ for _, cache := range mplex.caches {
+ cache.CleanAll()
+ }
+}
+
+func (mplex *cacheMultiplexer) Shutdown() {
+ for _, cache := range mplex.caches {
+ cache.Shutdown()
+ }
+}