aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/cache/cache.go
blob: 8b74272ed10dd1c5b90a5c2dcafc289917944168 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
// Package cache abstracts storing and fetching previously run tasks
//
// Adapted from https://github.com/thought-machine/please
// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package cache

import (
	"errors"
	"sync"

	"github.com/vercel/turbo/cli/internal/analytics"
	"github.com/vercel/turbo/cli/internal/fs"
	"github.com/vercel/turbo/cli/internal/turbopath"
	"github.com/vercel/turbo/cli/internal/util"
	"golang.org/x/sync/errgroup"
)

// Cache is an abstracted way to cache/fetch previously run tasks.
type Cache interface {
	// Fetch looks up the artifacts stored under hash. The returned ItemStatus
	// reports whether there was a cache hit (locally and/or remotely). It is
	// expected to move files into their correct position as a side effect.
	// NOTE(review): the int result is forwarded by cacheMultiplexer.Fetch as the
	// duration argument of Put — confirm its unit against the implementations.
	Fetch(anchor turbopath.AbsoluteSystemPath, hash string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error)
	// Exists reports, as an ItemStatus, where artifacts for the given hash are
	// available.
	Exists(hash string) ItemStatus
	// Put caches files for a given hash
	Put(anchor turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error
	// NOTE(review): the exact semantics of Clean, CleanAll and Shutdown are
	// defined by the implementations, which are not visible in this file.
	Clean(anchor turbopath.AbsoluteSystemPath)
	CleanAll()
	Shutdown()
}

// ItemStatus holds whether artifacts exist for a given hash on the local
// and/or remote caching server. It is serialized to JSON under the keys
// "local" and "remote".
type ItemStatus struct {
	Local  bool `json:"local"`
	Remote bool `json:"remote"`
}

// String labels describing cache activity.
// NOTE(review): presumably these are the values carried by CacheEvent.Source
// and CacheEvent.Event — confirm against the code that builds those events.
const (
	// CacheSourceFS is a constant to indicate a local cache hit
	CacheSourceFS = "LOCAL"
	// CacheSourceRemote is a constant to indicate a remote cache hit
	CacheSourceRemote = "REMOTE"
	// CacheEventHit is a constant to indicate a cache hit
	CacheEventHit = "HIT"
	// CacheEventMiss is a constant to indicate a cache miss
	CacheEventMiss = "MISS"
)

// CacheEvent is a record describing a single cache event for a given hash.
// Its fields are decoded/encoded via mapstructure under the keys "source",
// "event", "hash" and "duration".
// NOTE(review): Source and Event presumably hold the CacheSource* and
// CacheEvent* constants respectively; the unit of Duration is not visible
// here — confirm against the producers of these events.
type CacheEvent struct {
	Source   string `mapstructure:"source"`
	Event    string `mapstructure:"event"`
	Hash     string `mapstructure:"hash"`
	Duration int    `mapstructure:"duration"`
}

// DefaultLocation computes the filesystem cache directory used when no
// override is configured: node_modules/.cache/turbo underneath repoRoot.
func DefaultLocation(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
	cacheDir := repoRoot.UntypedJoin("node_modules", ".cache", "turbo")
	return cacheDir
}

// OnCacheRemoved defines a callback that the cache system calls if a particular cache
// needs to be removed. In practice, this happens when Remote Caching has been disabled
// but the CLI continues to try to use it. The callback receives the cache that
// was removed and the error that triggered the removal.
type OnCacheRemoved = func(cache Cache, err error)

// ErrNoCachesEnabled is returned when both the filesystem and http cache are
// unavailable. It is informational: newSyncCache returns it together with a
// usable no-op cache, and New does not treat it as a fatal error.
var ErrNoCachesEnabled = errors.New("no caches are enabled")

// Opts holds configuration options for the cache
//
//   - OverrideDir: when non-empty, the cache directory to use instead of
//     DefaultLocation; it is resolved against the repo root by resolveCacheDir.
//   - SkipRemote: disables the http (remote) cache.
//   - SkipFilesystem: disables the filesystem cache.
//   - Workers: when greater than zero, New wraps the cache in an async cache.
//   - RemoteCacheOpts: remote cache settings (defined in the fs package).
//
// TODO(gsoltis): further refactor this into fs cache opts and http cache opts
type Opts struct {
	OverrideDir     string
	SkipRemote      bool
	SkipFilesystem  bool
	Workers         int
	RemoteCacheOpts fs.RemoteCacheOptions
}

// resolveCacheDir picks the directory turbo caches artifacts in. Without a
// user-supplied OverrideDir the default location under repoRoot is used;
// otherwise the override is resolved relative to repoRoot.
func (o *Opts) resolveCacheDir(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
	override := o.OverrideDir
	if override == "" {
		return DefaultLocation(repoRoot)
	}
	return fs.ResolveUnknownPath(repoRoot, override)
}

// _remoteOnlyHelp is the help text for an option that ignores the local
// filesystem cache and uses only the remote cache.
// NOTE(review): not referenced anywhere in the visible part of this file —
// confirm it is still used elsewhere in the package.
var _remoteOnlyHelp = `Ignore the local filesystem cache for all tasks. Only
allow reading and caching artifacts using the remote cache.`

// New builds the cache described by opts. When opts.Workers is positive the
// synchronous cache is wrapped in an async cache.
//
// ErrNoCachesEnabled is not fatal: it is returned together with a usable
// cache so the caller can warn the user. Any other construction error yields
// a nil cache.
func New(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
	syncCache, err := newSyncCache(opts, repoRoot, client, recorder, onCacheRemoved)
	fatal := err != nil && !errors.Is(err, ErrNoCachesEnabled)
	if fatal {
		return nil, err
	}

	// err is either nil or ErrNoCachesEnabled from here on; pass it through.
	if opts.Workers <= 0 {
		return syncCache, err
	}
	return newAsyncCache(syncCache, opts), err
}

// newSyncCache assembles the synchronous cache from the implementations the
// user has left enabled. It can return an error together with a usable
// noopCache (ErrNoCachesEnabled).
//
// Possible outcomes:
//   - fs + http enabled: multiplexer over [fsCache, httpCache]
//   - fs only:           the fsCache itself
//   - http only:         multiplexer over [httpCache, noopCache]
//   - neither:           the noopCache, plus ErrNoCachesEnabled
func newSyncCache(opts Opts, repoRoot turbopath.AbsoluteSystemPath, client client, recorder analytics.Recorder, onCacheRemoved OnCacheRemoved) (Cache, error) {
	// The two skip flags are independent, so the user can disable everything.
	// That deserves a warning, not a failed build.
	fsEnabled := !opts.SkipFilesystem
	httpEnabled := !opts.SkipRemote

	// At most two implementations are ever active.
	impls := make([]Cache, 0, 2)

	if fsEnabled {
		fsImpl, err := newFsCache(opts, recorder, repoRoot)
		if err != nil {
			return nil, err
		}
		impls = append(impls, fsImpl)
	}

	if httpEnabled {
		httpImpl := newHTTPCache(opts, client, recorder)
		impls = append(impls, httpImpl)
	}

	// The http cache can be removed at runtime, so whenever there is no
	// filesystem cache a noopCache is added as the fallback. This covers both
	// "nothing enabled" and "only http enabled".
	if !fsEnabled {
		noopImpl := newNoopCache()
		impls = append(impls, noopImpl)
	}

	// Two implementations: fsCache+httpCache or httpCache+noopCache. Every
	// construction error has already been returned above.
	if len(impls) > 1 {
		mplex := &cacheMultiplexer{
			caches:         impls,
			opts:           opts,
			onCacheRemoved: onCacheRemoved,
		}
		return mplex, nil
	}

	// Exactly one implementation: fsCache or noopCache. A lone noopCache means
	// the user configured every cache away; report it without failing.
	only := impls[0]
	if _, isNoop := only.(*noopCache); isNoop {
		return only, ErrNoCachesEnabled
	}
	return only, nil
}

// A cacheMultiplexer multiplexes several caches into one.
// Used when we have several active (eg. http, dir).
//
//   - caches: the underlying caches; lower indices are treated as higher
//     priority by Fetch and storeUntil.
//   - opts: the options the multiplexer was created with.
//   - mu: guards caches; removeCache modifies the slice under the write lock.
//   - onCacheRemoved: callback invoked by removeCache when a cache is dropped.
type cacheMultiplexer struct {
	caches         []Cache
	opts           Opts
	mu             sync.RWMutex
	onCacheRemoved OnCacheRemoved
}

// Put stores files under key in every cache currently held by the
// multiplexer. It returns the first non-CacheDisabledError reported by any of
// the underlying caches, or nil.
func (mplex *cacheMultiplexer) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
	// removeCache may reassign mplex.caches concurrently, so the length has to
	// be read under the lock to avoid a data race.
	mplex.mu.RLock()
	cacheCount := len(mplex.caches)
	mplex.mu.RUnlock()

	return mplex.storeUntil(anchor, key, duration, files, cacheCount)
}

// cacheRemoval is a request to drop a cache from the multiplexer: cache is the
// implementation to remove and err is the CacheDisabledError that caused the
// request. It is consumed by removeCache.
type cacheRemoval struct {
	cache Cache
	err   *util.CacheDisabledError
}

// storeUntil stores artifacts into the caches with a higher priority than the
// one at index stopAt, i.e. the first stopAt entries of mplex.caches.
// Used after artifact retrieval to ensure we have them in eg. the directory cache after
// downloading from the RPC cache. Put passes the number of caches to store
// into all of them.
//
// All Puts run concurrently. A cache that fails with a
// util.CacheDisabledError is removed from the multiplexer rather than failing
// the operation. Any other error is returned (the first one the errgroup
// observed) once every Put has finished; caches that reported being disabled
// are removed in that case as well.
func (mplex *cacheMultiplexer) storeUntil(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath, stopAt int) error {
	// One slot per cache index: each goroutine writes only its own element, so
	// the slice needs no extra synchronization.
	toRemove := make([]*cacheRemoval, stopAt)
	g := &errgroup.Group{}
	mplex.mu.RLock()
	for i, cache := range mplex.caches {
		if i == stopAt {
			break
		}
		// Per-iteration copies for the closure below.
		c := cache
		i := i
		g.Go(func() error {
			err := c.Put(anchor, key, duration, files)
			if err == nil {
				return nil
			}
			cd := &util.CacheDisabledError{}
			if errors.As(err, &cd) {
				toRemove[i] = &cacheRemoval{
					cache: c,
					err:   cd,
				}
				// A disabled cache is not a failed store; it is handled by
				// removing the cache below.
				return nil
			}
			return err
		})
	}
	mplex.mu.RUnlock()

	// Wait returns only after every goroutine has finished, so toRemove is
	// fully populated from here on.
	putErr := g.Wait()

	// Apply removals even when another cache failed with a real error;
	// returning early here would leave a disabled cache registered.
	for _, removal := range toRemove {
		if removal != nil {
			mplex.removeCache(removal)
		}
	}
	return putErr
}

// removeCache drops the cache named by removal from the multiplexer and
// notifies onCacheRemoved. Several callers may ask to remove the same cache
// concurrently; only the first one finds it and reports the error, the others
// find nothing and do nothing.
func (mplex *cacheMultiplexer) removeCache(removal *cacheRemoval) {
	mplex.mu.Lock()
	defer mplex.mu.Unlock()

	for idx := range mplex.caches {
		candidate := mplex.caches[idx]
		if candidate != removal.cache {
			continue
		}
		// Cut the entry out, then report which cache went away and why.
		mplex.caches = append(mplex.caches[:idx], mplex.caches[idx+1:]...)
		mplex.onCacheRemoved(candidate, removal.err)
		return
	}
}

// Fetch asks each cache in priority order for the artifacts stored under key
// and returns the first hit, after storing the artifacts into the
// higher-priority caches that missed. If no cache has the artifacts, it
// returns an all-false ItemStatus and a nil error.
//
// A cache that fails with a util.CacheDisabledError is removed from the
// multiplexer. Errors from caches that miss are dropped so that
// lower-priority caches still get a chance; the error of the cache that
// produced the hit, if any, is returned alongside the result.
func (mplex *cacheMultiplexer) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
	// Iterate over a snapshot: removeCache (called here and by storeUntil) may
	// modify mplex.caches while the loop is running.
	mplex.mu.RLock()
	snapshot := append([]Cache(nil), mplex.caches...)
	mplex.mu.RUnlock()

	// Query the caches one after another; fetching simultaneously could
	// easily write the same file from two goroutines at once.
	for idx, candidate := range snapshot {
		status, restored, duration, err := candidate.Fetch(anchor, key, files)

		if err != nil {
			var disabled *util.CacheDisabledError
			if errors.As(err, &disabled) {
				mplex.removeCache(&cacheRemoval{cache: candidate, err: disabled})
			}
			// Other errors are intentionally not acted upon: with this cache
			// abstraction we would rather consult lower priority caches than
			// fail. Once UI / logging is plumbed into the cache system this
			// should probably be logged.
		}

		if !status.Local && !status.Remote {
			continue
		}

		// Hit. Back-fill the caches ahead of this one; a failure there is not
		// a failed fetch, since storing into them is only an optimization.
		_ = mplex.storeUntil(anchor, key, duration, restored, idx)

		// Only the cache that hit contributes to the reported status.
		hit := ItemStatus{Local: status.Local, Remote: status.Remote}
		return hit, restored, duration, err
	}

	return ItemStatus{}, nil, 0, nil
}

// Exists asks every underlying cache about target and returns the combined
// result: Local and Remote are each true if any cache reported them as true.
func (mplex *cacheMultiplexer) Exists(target string) ItemStatus {
	// Take a snapshot under the read lock: removeCache can modify mplex.caches
	// concurrently, and iterating it unguarded is a data race.
	mplex.mu.RLock()
	caches := make([]Cache, len(mplex.caches))
	copy(caches, mplex.caches)
	mplex.mu.RUnlock()

	syncCacheState := ItemStatus{}
	for _, cache := range caches {
		itemStatus := cache.Exists(target)
		syncCacheState.Local = syncCacheState.Local || itemStatus.Local
		syncCacheState.Remote = syncCacheState.Remote || itemStatus.Remote
	}

	return syncCacheState
}

// Clean forwards the Clean call for anchor to every underlying cache.
func (mplex *cacheMultiplexer) Clean(anchor turbopath.AbsoluteSystemPath) {
	// Take a snapshot under the read lock: removeCache can modify mplex.caches
	// concurrently, and iterating it unguarded is a data race.
	mplex.mu.RLock()
	caches := make([]Cache, len(mplex.caches))
	copy(caches, mplex.caches)
	mplex.mu.RUnlock()

	for _, cache := range caches {
		cache.Clean(anchor)
	}
}

// CleanAll forwards the CleanAll call to every underlying cache.
func (mplex *cacheMultiplexer) CleanAll() {
	// Take a snapshot under the read lock: removeCache can modify mplex.caches
	// concurrently, and iterating it unguarded is a data race.
	mplex.mu.RLock()
	caches := make([]Cache, len(mplex.caches))
	copy(caches, mplex.caches)
	mplex.mu.RUnlock()

	for _, cache := range caches {
		cache.CleanAll()
	}
}

// Shutdown forwards the Shutdown call to every underlying cache.
func (mplex *cacheMultiplexer) Shutdown() {
	// Take a snapshot under the read lock: removeCache can modify mplex.caches
	// concurrently, and iterating it unguarded is a data race.
	mplex.mu.RLock()
	caches := make([]Cache, len(mplex.caches))
	copy(caches, mplex.caches)
	mplex.mu.RUnlock()

	for _, cache := range caches {
		cache.Shutdown()
	}
}