aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/cache/async_cache.go
blob: 0a8f467cc579c72eeba1930ba84adae82fac414f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Adapted from https://github.com/thought-machine/please
// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package cache

import (
	"sync"

	"github.com/vercel/turbo/cli/internal/turbopath"
)

// An asyncCache is a wrapper around a Cache interface that handles incoming
// store requests asynchronously and attempts to return immediately.
// The requests are handled on an internal queue, if that fills up then
// incoming requests will start to block again until it empties.
// Retrieval requests are still handled synchronously.
type asyncCache struct {
	requests  chan cacheRequest
	realCache Cache
	wg        sync.WaitGroup
}

// A cacheRequest models an incoming cache request on our queue.
type cacheRequest struct {
	anchor   turbopath.AbsoluteSystemPath
	key      string
	duration int
	files    []turbopath.AnchoredSystemPath
}

func newAsyncCache(realCache Cache, opts Opts) Cache {
	c := &asyncCache{
		requests:  make(chan cacheRequest),
		realCache: realCache,
	}
	c.wg.Add(opts.Workers)
	for i := 0; i < opts.Workers; i++ {
		go c.run()
	}
	return c
}

func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
	c.requests <- cacheRequest{
		anchor:   anchor,
		key:      key,
		files:    files,
		duration: duration,
	}
	return nil
}

func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
	return c.realCache.Fetch(anchor, key, files)
}

func (c *asyncCache) Exists(key string) ItemStatus {
	return c.realCache.Exists(key)
}

func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) {
	c.realCache.Clean(anchor)
}

func (c *asyncCache) CleanAll() {
	c.realCache.CleanAll()
}

func (c *asyncCache) Shutdown() {
	// fmt.Println("Shutting down cache workers...")
	close(c.requests)
	c.wg.Wait()
	// fmt.Println("Shut down all cache workers")
}

// run implements the actual async logic.
func (c *asyncCache) run() {
	for r := range c.requests {
		_ = c.realCache.Put(r.anchor, r.key, r.duration, r.files)
	}
	c.wg.Done()
}