author     简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
committer  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
commit     dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree       b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/cache/async_cache.go
parent     0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
Diffstat (limited to 'cli/internal/cache/async_cache.go')
-rw-r--r--  cli/internal/cache/async_cache.go | 82
1 file changed, 82 insertions(+), 0 deletions(-)
diff --git a/cli/internal/cache/async_cache.go b/cli/internal/cache/async_cache.go
new file mode 100644
index 0000000..0a8f467
--- /dev/null
+++ b/cli/internal/cache/async_cache.go
@@ -0,0 +1,82 @@
+// Adapted from https://github.com/thought-machine/please
+// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package cache
+
+import (
+ "sync"
+
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// An asyncCache is a wrapper around a Cache interface that handles incoming
+// store requests asynchronously and attempts to return immediately.
+// The requests are handled on an internal queue; if that fills up,
+// incoming requests will start to block until it empties.
+// Retrieval requests are still handled synchronously.
+type asyncCache struct {
+ requests chan cacheRequest
+ realCache Cache
+ wg sync.WaitGroup
+}
+
+// A cacheRequest models an incoming cache request on our queue.
+type cacheRequest struct {
+ anchor turbopath.AbsoluteSystemPath
+ key string
+ duration int
+ files []turbopath.AnchoredSystemPath
+}
+
+func newAsyncCache(realCache Cache, opts Opts) Cache {
+ c := &asyncCache{
+ requests: make(chan cacheRequest),
+ realCache: realCache,
+ }
+ c.wg.Add(opts.Workers)
+ for i := 0; i < opts.Workers; i++ {
+ go c.run()
+ }
+ return c
+}
+
+func (c *asyncCache) Put(anchor turbopath.AbsoluteSystemPath, key string, duration int, files []turbopath.AnchoredSystemPath) error {
+ c.requests <- cacheRequest{
+ anchor: anchor,
+ key: key,
+ files: files,
+ duration: duration,
+ }
+ return nil
+}
+
+func (c *asyncCache) Fetch(anchor turbopath.AbsoluteSystemPath, key string, files []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
+ return c.realCache.Fetch(anchor, key, files)
+}
+
+func (c *asyncCache) Exists(key string) ItemStatus {
+ return c.realCache.Exists(key)
+}
+
+func (c *asyncCache) Clean(anchor turbopath.AbsoluteSystemPath) {
+ c.realCache.Clean(anchor)
+}
+
+func (c *asyncCache) CleanAll() {
+ c.realCache.CleanAll()
+}
+
+func (c *asyncCache) Shutdown() {
+ // fmt.Println("Shutting down cache workers...")
+ close(c.requests)
+ c.wg.Wait()
+ // fmt.Println("Shut down all cache workers")
+}
+
+// run implements the actual async logic.
+func (c *asyncCache) run() {
+ for r := range c.requests {
+ _ = c.realCache.Put(r.anchor, r.key, r.duration, r.files)
+ }
+ c.wg.Done()
+}