From fc8c5fdce62fb229202659408798a7b6c98f6e8b Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Fri, 28 Apr 2023 01:36:55 +0800 Subject: --- cli/internal/cache/cache.go | 317 -------------------------------------------- 1 file changed, 317 deletions(-) delete mode 100644 cli/internal/cache/cache.go (limited to 'cli/internal/cache/cache.go') diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go deleted file mode 100644 index 8b74272..0000000 --- a/cli/internal/cache/cache.go +++ /dev/null @@ -1,317 +0,0 @@ -// Package cache abstracts storing and fetching previously run tasks -// -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "errors" - "sync" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/util" - "golang.org/x/sync/errgroup" -) - -// Cache is abstracted way to cache/fetch previously run tasks -type Cache interface { - // Fetch returns true if there is a cache it. It is expected to move files - // into their correct position as a side effect - Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) - Exists(hash string) ItemStatus - // Put caches files for a given hash - Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error - Clean(anchor turbopath.AbsoluteSystemPath) - CleanAll() - Shutdown() -} - -// ItemStatus holds whether artifacts exists for a given hash on local -// and/or remote caching server -type ItemStatus struct { - Local bool `json:"local"` - Remote bool `json:"remote"` -} - -const ( - // CacheSourceFS is a constant to indicate local cache hit - CacheSourceFS = "LOCAL" - // CacheSourceRemote is a constant to indicate remote cache hit - CacheSourceRemote = "REMOTE" - // CacheEventHit is a constant to indicate a cache hit - CacheEventHit = "HIT" - // CacheEventMiss is a constant to indicate a cache miss - CacheEventMiss = "MISS" -) - -type CacheEvent struct { - Source string `mapstructure:"source"` - Event string `mapstructure:"event"` - Hash string `mapstructure:"hash"` - Duration int `mapstructure:"duration"` -} - -// DefaultLocation returns the default filesystem cache location, given a repo root -func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - return repoRoot.UntypedJoin("node_modules", ".cache", "turbo") -} - -// OnCacheRemoved defines a callback that the cache system calls if a particular cache -// needs to be removed. In practice, this happens when Remote Caching has been disabled -// the but CLI continues to try to use it. -type OnCacheRemoved = func(cache Cache, err error) - -// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable -var ErrNoCachesEnabled = errors.New("no caches are enabled") - -// Opts holds configuration options for the cache -// TODO(gsoltis): further refactor this into fs cache opts and http cache opts -type Opts struct { - OverrideDir string - SkipRemote bool - SkipFilesystem bool - Workers int - RemoteCacheOpts fs.RemoteCacheOptions -} - -// resolveCacheDir calculates the location turbo should use to cache artifacts, -// based on the options supplied by the user. -func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - if o.OverrideDir != "" { - return fs.ResolveUnknownPath(repoRoot, o.OverrideDir) - } - return DefaultLocation(repoRoot) -} - -var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only -allow reading and caching artifacts using the remote cache.` - -// New creates a new cache -func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { - c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved) - if err != nil && !errors.Is(err, ErrNoCachesEnabled) { - return nil, err - } - if opts.Workers > 0 { - return newAsyncCache(c, opts), err - } - return c, err -} - -// newSyncCache can return an error with a usable noopCache. -func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) { - // Check to see if the user has turned off particular cache implementations. - useFsCache := !opts.SkipFilesystem - useHTTPCache := !opts.SkipRemote - - // Since the above two flags are not mutually exclusive it is possible to configure - // yourself out of having a cache. We should tell you about it but we shouldn't fail - // your build for that reason. - // - // Further, since the httpCache can be removed at runtime, we need to insert a noopCache - // as a backup if you are configured to have *just* an httpCache. - // - // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache & useHTTPCache) - useNoopCache := !useFsCache - - // Build up an array of cache implementations, we can only ever have 1 or 2. - cacheImplementations := make([]Cache, 0, 2) - - if useFsCache { - implementation, err := newFsCache(opts, recorder, repoRoot) - if err != nil { - return nil, err - } - cacheImplementations = append(cacheImplementations, implementation) - } - - if useHTTPCache { - implementation := newHTTPCache(opts, client, recorder) - cacheImplementations = append(cacheImplementations, implementation) - } - - if useNoopCache { - implementation := newNoopCache() - cacheImplementations = append(cacheImplementations, implementation) - } - - // Precisely two cache implementations: - // fsCache and httpCache OR httpCache and noopCache - useMultiplexer := len(cacheImplementations) > 1 - if useMultiplexer { - // We have early-returned any possible errors for this scenario. - return &cacheMultiplexer{ - onCacheRemoved: onCacheRemoved, - opts: opts, - caches: cacheImplementations, - }, nil - } - - // Precisely one cache implementation: fsCache OR noopCache - implementation := cacheImplementations[0] - _, isNoopCache := implementation.(*noopCache) - - // We want to let the user know something is wonky, but we don't want - // to trigger their build to fail. - if isNoopCache { - return implementation, ErrNoCachesEnabled - } - return implementation, nil -} - -// A cacheMultiplexer multiplexes several caches into one. -// Used when we have several active (eg. http, dir). -type cacheMultiplexer struct { - caches []Cache - opts Opts - mu sync.RWMutex - onCacheRemoved OnCacheRemoved -} - -func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error { - return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches)) -} - -type cacheRemoval struct { - cache Cache - err *util.CacheDisabledError -} - -// storeUntil stores artifacts into higher priority caches than the given one. -// Used after artifact retrieval to ensure we have them in eg. the directory cache after -// downloading from the RPC cache. -func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error { - // Attempt to store on all caches simultaneously. - toRemove := make([]*cacheRemoval, stopAt) - g := &errgroup.Group{} - mplex.mu.RLock() - for i, cache := range mplex.caches { - if i == stopAt { - break - } - c := cache - i := i - g.Go(func() error { - err := c.Put(anchor, key, duration, files) - if err != nil { - cd := &util.CacheDisabledError{} - if errors.As(err, &cd) { - toRemove[i] = &cacheRemoval{ - cache: c, - err: cd, - } - // we don't want this to cancel other cache actions - return nil - } - return err - } - return nil - }) - } - mplex.mu.RUnlock() - - if err := g.Wait(); err != nil { - return err - } - - for _, removal := range toRemove { - if removal != nil { - mplex.removeCache(removal) - } - } - return nil -} - -// removeCache takes a requested removal and tries to actually remove it. However, -// multiple requests could result in concurrent requests to remove the same cache. -// Let one of them win and propagate the error, the rest will no-op. -func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) { - mplex.mu.Lock() - defer mplex.mu.Unlock() - for i, cache := range mplex.caches { - if cache == removal.cache { - mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...) - mplex.onCacheRemoved(cache, removal.err) - break - } - } -} - -func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - // Make a shallow copy of the caches, since storeUntil can call removeCache - mplex.mu.RLock() - caches := make([]Cache, len(mplex.caches)) - copy(caches, mplex.caches) - mplex.mu.RUnlock() - - // We need to return a composite cache status from multiple caches - // Initialize the empty struct so we can assign values to it. This is similar - // to how the Exists() method works. - combinedCacheState := ItemStatus{} - - // Retrieve from caches sequentially; if we did them simultaneously we could - // easily write the same file from two goroutines at once. - for i, cache := range caches { - itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files) - ok := itemStatus.Local || itemStatus.Remote - - if err != nil { - cd := &util.CacheDisabledError{} - if errors.As(err, &cd) { - mplex.removeCache(&cacheRemoval{ - cache: cache, - err: cd, - }) - } - // We're ignoring the error in the else case, since with this cache - // abstraction, we want to check lower priority caches rather than fail - // the operation. Future work that plumbs UI / Logging into the cache system - // should probably log this at least. - } - if ok { - // Store this into other caches. We can ignore errors here because we know - // we have previously successfully stored in a higher-priority cache, and so the overall - // result is a success at fetching. Storing in lower-priority caches is an optimization. - _ = mplex.storeUntil(anchor, key, duration, actualFiles, i) - - // If another cache had already set this to true, we don't need to set it again from this cache - combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local - combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote - return combinedCacheState, actualFiles, duration, err - } - } - - return ItemStatus{Local: false, Remote: false}, nil, 0, nil -} - -func (mplex *cacheMultiplexer) Exists(target string) ItemStatus { - syncCacheState := ItemStatus{} - for _, cache := range mplex.caches { - itemStatus := cache.Exists(target) - syncCacheState.Local = syncCacheState.Local || itemStatus.Local - syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote - } - - return syncCacheState -} - -func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) { - for _, cache := range mplex.caches { - cache.Clean(anchor) - } -} - -func (mplex *cacheMultiplexer) CleanAll() { - for _, cache := range mplex.caches { - cache.CleanAll() - } -} - -func (mplex *cacheMultiplexer) Shutdown() { - for _, cache := range mplex.caches { - cache.Shutdown() - } -} -- cgit v1.2.3-70-g09d2