path: root/cli/internal/cache/cache.go
author  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:55 +0800
committer  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:55 +0800
commit  fc8c5fdce62fb229202659408798a7b6c98f6e8b (patch)
tree  7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/cache/cache.go
parent  dd84b9d64fb98746a230cd24233ff50a562c39c9 (diff)
download  HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.tar.gz
download  HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.zip
Diffstat (limited to 'cli/internal/cache/cache.go')
-rw-r--r--  cli/internal/cache/cache.go  317
1 file changed, 0 insertions, 317 deletions
diff --git a/cli/internal/cache/cache.go b/cli/internal/cache/cache.go
deleted file mode 100644
index 8b74272..0000000
--- a/cli/internal/cache/cache.go
+++ /dev/null
@@ -1,317 +0,0 @@
-// Package cache abstracts storing and fetching previously run tasks
-//
-// Adapted from https://github.com/thought-machine/please
-// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-package cache
-
-import (
- "errors"
- "sync"
-
- "github.com/vercel/turbo/cli/internal/analytics"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "github.com/vercel/turbo/cli/internal/util"
- "golang.org/x/sync/errgroup"
-)
-
-// Cache is an abstracted way to cache/fetch previously run tasks
-type Cache interface {
- // Fetch returns the cache status for a given hash. It is expected to move files
- // into their correct position as a side effect.
- Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
- Exists(hash string) ItemStatus
- // Put caches files for a given hash
- Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
- Clean(anchor turbopath.AbsoluteSystemPath)
- CleanAll()
- Shutdown()
-}
-
-// ItemStatus holds whether artifacts exist for a given hash on local
-// and/or remote caching server
-type ItemStatus struct {
- Local bool `json:"local"`
- Remote bool `json:"remote"`
-}
-
-const (
- // CacheSourceFS is a constant to indicate a local cache hit
- CacheSourceFS = "LOCAL"
- // CacheSourceRemote is a constant to indicate a remote cache hit
- CacheSourceRemote = "REMOTE"
- // CacheEventHit is a constant to indicate a cache hit
- CacheEventHit = "HIT"
- // CacheEventMiss is a constant to indicate a cache miss
- CacheEventMiss = "MISS"
-)
-
-type CacheEvent struct {
- Source string `mapstructure:"source"`
- Event string `mapstructure:"event"`
- Hash string `mapstructure:"hash"`
- Duration int `mapstructure:"duration"`
-}
-
-// DefaultLocation returns the default filesystem cache location, given a repo root
-func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- return repoRoot.UntypedJoin("node_modules", ".cache", "turbo")
-}
-
-// OnCacheRemoved defines a callback that the cache system calls if a particular cache
-// needs to be removed. In practice, this happens when Remote Caching has been disabled
-// but the CLI continues to try to use it.
-type OnCacheRemoved = func(cache Cache, err error)
-
-// ErrNoCachesEnabled is returned when both the filesystem and http cache are unavailable
-var ErrNoCachesEnabled = errors.New("no caches are enabled")
-
-// Opts holds configuration options for the cache
-// TODO(gsoltis): further refactor this into fs cache opts and http cache opts
-type Opts struct {
- OverrideDir string
- SkipRemote bool
- SkipFilesystem bool
- Workers int
- RemoteCacheOpts fs.RemoteCacheOptions
-}
-
-// resolveCacheDir calculates the location turbo should use to cache artifacts,
-// based on the options supplied by the user.
-func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- if o.OverrideDir != "" {
- return fs.ResolveUnknownPath(repoRoot, o.OverrideDir)
- }
- return DefaultLocation(repoRoot)
-}
-
-var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only
-allow reading and caching artifacts using the remote cache.`
-
-// New creates a new cache
-func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
- c, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved)
- if err != nil && !errors.Is(err, ErrNoCachesEnabled) {
- return nil, err
- }
- if opts.Workers > 0 {
- return newAsyncCache(c, opts), err
- }
- return c, err
-}
-
-// newSyncCache can return an error with a usable noopCache.
-func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
- // Check to see if the user has turned off particular cache implementations.
- useFsCache := !opts.SkipFilesystem
- useHTTPCache := !opts.SkipRemote
-
- // Since the above two flags are not mutually exclusive, it is possible to configure
- // yourself out of having a cache. We should tell you about it but we shouldn't fail
- // your build for that reason.
- //
- // Further, since the httpCache can be removed at runtime, we need to insert a noopCache
- // as a backup if you are configured to have *just* an httpCache.
- //
- // This is reduced from (!useFsCache && !useHTTPCache) || (!useFsCache && useHTTPCache)
- useNoopCache := !useFsCache
-
- // Build up a slice of cache implementations; we can only ever have 1 or 2.
- cacheImplementations := make([]Cache, 0, 2)
-
- if useFsCache {
- implementation, err := newFsCache(opts, recorder, repoRoot)
- if err != nil {
- return nil, err
- }
- cacheImplementations = append(cacheImplementations, implementation)
- }
-
- if useHTTPCache {
- implementation := newHTTPCache(opts, client, recorder)
- cacheImplementations = append(cacheImplementations, implementation)
- }
-
- if useNoopCache {
- implementation := newNoopCache()
- cacheImplementations = append(cacheImplementations, implementation)
- }
-
- // Precisely two cache implementations:
- // fsCache and httpCache OR httpCache and noopCache
- useMultiplexer := len(cacheImplementations) > 1
- if useMultiplexer {
- // We have early-returned any possible errors for this scenario.
- return &cacheMultiplexer{
- onCacheRemoved: onCacheRemoved,
- opts: opts,
- caches: cacheImplementations,
- }, nil
- }
-
- // Precisely one cache implementation: fsCache OR noopCache
- implementation := cacheImplementations[0]
- _, isNoopCache := implementation.(*noopCache)
-
- // We want to let the user know something is wonky, but we don't want
- // to trigger their build to fail.
- if isNoopCache {
- return implementation, ErrNoCachesEnabled
- }
- return implementation, nil
-}
-
-// A cacheMultiplexer multiplexes several caches into one.
-// Used when we have several caches active (e.g. http, dir).
-type cacheMultiplexer struct {
- caches []Cache
- opts Opts
- mu sync.RWMutex
- onCacheRemoved OnCacheRemoved
-}
-
-func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
- return mplex.storeUntil(anchor, key, duration, files, len(mplex.caches))
-}
-
-type cacheRemoval struct {
- cache Cache
- err *util.CacheDisabledError
-}
-
-// storeUntil stores artifacts into higher-priority caches than the given one.
-// Used after artifact retrieval to ensure we have them in e.g. the directory cache after
-// downloading from the RPC cache.
-func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error {
- // Attempt to store on all caches simultaneously.
- toRemove := make([]*cacheRemoval, stopAt)
- g := &errgroup.Group{}
- mplex.mu.RLock()
- for i, cache := range mplex.caches {
- if i == stopAt {
- break
- }
- c := cache
- i := i
- g.Go(func() error {
- err := c.Put(anchor, key, duration, files)
- if err != nil {
- cd := &util.CacheDisabledError{}
- if errors.As(err, &cd) {
- toRemove[i] = &cacheRemoval{
- cache: c,
- err: cd,
- }
- // we don't want this to cancel other cache actions
- return nil
- }
- return err
- }
- return nil
- })
- }
- mplex.mu.RUnlock()
-
- if err := g.Wait(); err != nil {
- return err
- }
-
- for _, removal := range toRemove {
- if removal != nil {
- mplex.removeCache(removal)
- }
- }
- return nil
-}
-
-// removeCache takes a requested removal and tries to actually remove it. However,
-// multiple callers could race to remove the same cache concurrently.
-// Let one of them win and propagate the error; the rest will no-op.
-func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
- mplex.mu.Lock()
- defer mplex.mu.Unlock()
- for i, cache := range mplex.caches {
- if cache == removal.cache {
- mplex.caches = append(mplex.caches[:i], mplex.caches[i+1:]...)
- mplex.onCacheRemoved(cache, removal.err)
- break
- }
- }
-}
-
-func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
- // Make a shallow copy of the caches, since storeUntil can call removeCache
- mplex.mu.RLock()
- caches := make([]Cache, len(mplex.caches))
- copy(caches, mplex.caches)
- mplex.mu.RUnlock()
-
- // We need to return a composite cache status from multiple caches.
- // Initialize the empty struct so we can assign values to it. This is similar
- // to how the Exists() method works.
- combinedCacheState := ItemStatus{}
-
- // Retrieve from caches sequentially; if we did them simultaneously we could
- // easily write the same file from two goroutines at once.
- for i, cache := range caches {
- itemStatus, actualFiles, duration, err := cache.Fetch(anchor, key, files)
- ok := itemStatus.Local || itemStatus.Remote
-
- if err != nil {
- cd := &util.CacheDisabledError{}
- if errors.As(err, &cd) {
- mplex.removeCache(&cacheRemoval{
- cache: cache,
- err: cd,
- })
- }
- // We're ignoring the error in the else case, since with this cache
- // abstraction, we want to check lower priority caches rather than fail
- // the operation. Future work that plumbs UI / Logging into the cache system
- // should probably log this at least.
- }
- if ok {
- // Store this into other caches. We can ignore errors here because we know
- // we have previously successfully stored in a higher-priority cache, and so the overall
- // result is a success at fetching. Storing in lower-priority caches is an optimization.
- _ = mplex.storeUntil(anchor, key, duration, actualFiles, i)
-
- // If another cache had already set this to true, we don't need to set it again from this cache
- combinedCacheState.Local = combinedCacheState.Local || itemStatus.Local
- combinedCacheState.Remote = combinedCacheState.Remote || itemStatus.Remote
- return combinedCacheState, actualFiles, duration, err
- }
- }
-
- return ItemStatus{Local: false, Remote: false}, nil, 0, nil
-}
-
-func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
- syncCacheState := ItemStatus{}
- for _, cache := range mplex.caches {
- itemStatus := cache.Exists(target)
- syncCacheState.Local = syncCacheState.Local || itemStatus.Local
- syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote
- }
-
- return syncCacheState
-}
-
-func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
- for _, cache := range mplex.caches {
- cache.Clean(anchor)
- }
-}
-
-func (mplex *cacheMultiplexer) CleanAll() {
- for _, cache := range mplex.caches {
- cache.CleanAll()
- }
-}
-
-func (mplex *cacheMultiplexer) Shutdown() {
- for _, cache := range mplex.caches {
- cache.Shutdown()
- }
-}
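
For context on how the deleted API fits together, below is a minimal sketch of the fetch-or-run-then-put pattern that the Cache interface above supports. It is written as if it lived alongside the deleted file; runWithCache and the runTask callback are illustrative placeholders, not code from this repository.

package cache

import "github.com/vercel/turbo/cli/internal/turbopath"

// runWithCache is a hypothetical caller of the Cache interface defined above:
// restore outputs on a hit, otherwise run the task and store its outputs.
func runWithCache(c Cache, anchor turbopath.AbsoluteSystemPath, hash string,
	runTask func() ([]turbopath.AnchoredSystemPath, int, error)) error {
	// Ask the configured cache layers (filesystem, remote, or noop) for this hash.
	status, _, _, err := c.Fetch(anchor, hash, nil)
	if err != nil {
		return err
	}
	if status.Local || status.Remote {
		// Hit: Fetch has already moved the cached files into place under anchor.
		return nil
	}

	// Miss: run the task, then cache its outputs for the next invocation.
	files, duration, err := runTask()
	if err != nil {
		return err
	}
	return c.Put(anchor, hash, duration, files)
}

A caller holding the multiplexer gets the backfill behaviour transparently: when only a lower-priority cache hits, storeUntil writes the artifact back into the higher-priority caches before Fetch returns.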
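
The layer-selection logic in newSyncCache reduces to a small decision table over the two skip flags. As a standalone illustration (again, not code from this repository), the same choice can be written as a pure function:

package cache

// selectLayers restates newSyncCache's decision table: which cache layers
// would be instantiated for a given pair of skip flags.
func selectLayers(skipFilesystem, skipRemote bool) []string {
	layers := []string{}
	if !skipFilesystem {
		layers = append(layers, "fs")
	}
	if !skipRemote {
		layers = append(layers, "http")
	}
	// Whenever the filesystem cache is skipped, a noop layer is added so that
	// removing the http cache at runtime still leaves a usable Cache behind.
	if skipFilesystem {
		layers = append(layers, "noop")
	}
	return layers
}

The four cases are: (false, false) gives fs+http behind the multiplexer; (false, true) gives fs alone; (true, false) gives http+noop behind the multiplexer; (true, true) gives noop alone, which is the ErrNoCachesEnabled case.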