path: root/cli/internal/globwatcher/globwatcher.go
author    简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
committer 简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
commit    dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree      b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/globwatcher/globwatcher.go
parent    0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
Diffstat (limited to 'cli/internal/globwatcher/globwatcher.go')
-rw-r--r--  cli/internal/globwatcher/globwatcher.go | 210
1 file changed, 210 insertions(+), 0 deletions(-)
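
The new file exposes a small API: New constructs a watcher, WatchGlobs registers a task's output globs under its hash, and GetChangedGlobs later reports which of those globs have seen changes. The sketch below shows roughly how a caller might wire this up. It is illustrative only: the task hash, the example output globs, and the exampleUsage wrapper are assumptions rather than code from this commit; the logger and cookieWaiter are taken as given, since their construction lives elsewhere in the daemon.

package example // hypothetical illustration, not part of this commit

import (
    "fmt"

    "github.com/hashicorp/go-hclog"
    "github.com/vercel/turbo/cli/internal/filewatcher"
    "github.com/vercel/turbo/cli/internal/fs"
    "github.com/vercel/turbo/cli/internal/globwatcher"
    "github.com/vercel/turbo/cli/internal/turbopath"
)

// exampleUsage drives the GlobWatcher the way a daemon-side caller might:
// register a task's output globs under its hash, then later ask which of
// those globs have changed.
func exampleUsage(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, cookieWaiter filewatcher.CookieWaiter) error {
    gw := globwatcher.New(logger, repoRoot, cookieWaiter)

    // Hypothetical output globs for a finished task, keyed by its hash.
    outputs := fs.TaskOutputs{
        Inclusions: []string{"dist/**"},
        Exclusions: []string{"dist/**/*.map"},
    }
    if err := gw.WatchGlobs("example-task-hash", outputs); err != nil {
        return err
    }

    // On a later run with the same hash, only globs that saw file changes
    // (and were not excluded) come back from GetChangedGlobs.
    changed, err := gw.GetChangedGlobs("example-task-hash", []string{"dist/**"})
    if err != nil {
        return err
    }
    fmt.Printf("globs with changes: %v\n", changed)
    return nil
}
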
diff --git a/cli/internal/globwatcher/globwatcher.go b/cli/internal/globwatcher/globwatcher.go
new file mode 100644
index 0000000..9226cfa
--- /dev/null
+++ b/cli/internal/globwatcher/globwatcher.go
@@ -0,0 +1,210 @@
+package globwatcher
+
+import (
+ "errors"
+ "fmt"
+ "path/filepath"
+ "sync"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/vercel/turbo/cli/internal/doublestar"
+ "github.com/vercel/turbo/cli/internal/filewatcher"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// ErrClosed is returned when attempting to get changed globs after glob watching has closed
+var ErrClosed = errors.New("glob watching is closed")
+
+type globs struct {
+ Inclusions util.Set
+ Exclusions util.Set
+}
+
+// GlobWatcher is used to track unchanged globs by hash. Once a glob registers a file change
+// it is no longer tracked until a new hash requests it. Once all globs for a particular hash
+// have changed, that hash is no longer tracked.
+type GlobWatcher struct {
+ logger hclog.Logger
+ repoRoot turbopath.AbsoluteSystemPath
+ cookieWaiter filewatcher.CookieWaiter
+
+ mu sync.RWMutex // protects the fields below
+ hashGlobs map[string]globs
+ globStatus map[string]util.Set // glob -> hashes where this glob hasn't changed
+
+ closed bool
+}
+
+// New returns a new GlobWatcher instance
+func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, cookieWaiter filewatcher.CookieWaiter) *GlobWatcher {
+ return &GlobWatcher{
+ logger: logger,
+ repoRoot: repoRoot,
+ cookieWaiter: cookieWaiter,
+ hashGlobs: make(map[string]globs),
+ globStatus: make(map[string]util.Set),
+ }
+}
+
+func (g *GlobWatcher) setClosed() {
+ g.mu.Lock()
+ g.closed = true
+ g.mu.Unlock()
+}
+
+func (g *GlobWatcher) isClosed() bool {
+ g.mu.RLock()
+ defer g.mu.RUnlock()
+ return g.closed
+}
+
+// WatchGlobs registers the given set of globs to be watched for changes and grouped
+// under the given hash. This method pairs with GetChangedGlobs to determine which globs
+// out of a set of candidates have changed since WatchGlobs was called for the same hash.
+func (g *GlobWatcher) WatchGlobs(hash string, globsToWatch fs.TaskOutputs) error {
+ if g.isClosed() {
+ return ErrClosed
+ }
+ // Wait for a cookie here
+ // that will ensure that we have seen all filesystem writes
+ // *by the calling client*. Other tasks _could_ write to the
+ // same output directories; however, we are relying on task
+ // execution dependencies to prevent that.
+ if err := g.cookieWaiter.WaitForCookie(); err != nil {
+ return err
+ }
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ g.hashGlobs[hash] = globs{
+ Inclusions: util.SetFromStrings(globsToWatch.Inclusions),
+ Exclusions: util.SetFromStrings(globsToWatch.Exclusions),
+ }
+
+ for _, glob := range globsToWatch.Inclusions {
+ existing, ok := g.globStatus[glob]
+ if !ok {
+ existing = make(util.Set)
+ }
+ existing.Add(hash)
+ g.globStatus[glob] = existing
+ }
+ return nil
+}
+
+// GetChangedGlobs returns the subset of the given candidates that we are not currently
+// tracking as "unchanged".
+func (g *GlobWatcher) GetChangedGlobs(hash string, candidates []string) ([]string, error) {
+ if g.isClosed() {
+ // If filewatching has crashed, return all candidates as changed.
+ return candidates, nil
+ }
+ // Wait for a cookie here
+ // that will ensure that we have seen all filesystem writes
+ // *by the calling client*. Other tasks _could_ write to the
+ // same output directories; however, we are relying on task
+ // execution dependencies to prevent that.
+ if err := g.cookieWaiter.WaitForCookie(); err != nil {
+ return nil, err
+ }
+ // hashGlobs tracks all of the unchanged globs for a given hash
+ // If hashGlobs doesn't have our hash, either everything has changed,
+ // or we were never tracking it. Either way, consider all the candidates
+ // to be changed globs.
+ g.mu.RLock()
+ defer g.mu.RUnlock()
+ globsToCheck, ok := g.hashGlobs[hash]
+ if !ok {
+ return candidates, nil
+ }
+ allGlobs := util.SetFromStrings(candidates)
+ diff := allGlobs.Difference(globsToCheck.Inclusions)
+
+ return diff.UnsafeListOfStrings(), nil
+}
+
+// OnFileWatchEvent implements FileWatchClient.OnFileWatchEvent
+// On a file change, check if we have a glob that matches this file. Invalidate
+// any matching globs, and remove them from the set of unchanged globs for the corresponding
+// hashes. If this is the last glob for a hash, remove the hash from being tracked.
+func (g *GlobWatcher) OnFileWatchEvent(ev filewatcher.Event) {
+ // At this point, we don't care what the Op is; any Op represents a change
+ // that should invalidate matching globs
+ g.logger.Trace(fmt.Sprintf("Got fsnotify event %v", ev))
+ absolutePath := ev.Path
+ repoRelativePath, err := g.repoRoot.RelativePathString(absolutePath.ToStringDuringMigration())
+ if err != nil {
+ g.logger.Debug(fmt.Sprintf("could not get relative path from %v to %v: %v", g.repoRoot, absolutePath, err))
+ return
+ }
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ for glob, hashStatus := range g.globStatus {
+ matches, err := doublestar.Match(glob, filepath.ToSlash(repoRelativePath))
+ if err != nil {
+ g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+ continue
+ }
+ // If this glob matches, we know that it has changed for every hash that included this glob
+ // and is not excluded by a hash's exclusion globs.
+ // So, we can delete this glob from every hash tracking it as well as stop watching this glob.
+ // To stop watching, we unref each of the directories corresponding to this glob.
+ if matches {
+ for hashUntyped := range hashStatus {
+ hash := hashUntyped.(string)
+ hashGlobs, ok := g.hashGlobs[hash]
+
+ if !ok {
+ g.logger.Warn(fmt.Sprintf("failed to find hash %v referenced from glob %v", hash, glob))
+ continue
+ }
+
+ isExcluded := false
+ // Check if we've excluded this path by going through exclusion globs
+ for exclusionGlob := range hashGlobs.Exclusions {
+ matches, err := doublestar.Match(exclusionGlob.(string), filepath.ToSlash(repoRelativePath))
+ if err != nil {
+ g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+ continue
+ }
+
+ if matches {
+ isExcluded = true
+ break
+ }
+ }
+
+ // If we have excluded this path, then we skip it
+ if isExcluded {
+ continue
+ }
+
+ // Delete this hash from this glob's globStatus entry
+ g.globStatus[glob].Delete(hash)
+
+ // If we've deleted the last hash for a glob in globStatus, delete the whole glob entry
+ if len(g.globStatus[glob]) == 0 {
+ delete(g.globStatus, glob)
+ }
+
+ hashGlobs.Inclusions.Delete(glob)
+ // If we've deleted the last glob for a hash, delete the whole hash entry
+ if hashGlobs.Inclusions.Len() == 0 {
+ delete(g.hashGlobs, hash)
+ }
+ }
+ }
+ }
+}
+
+// OnFileWatchError implements FileWatchClient.OnFileWatchError
+func (g *GlobWatcher) OnFileWatchError(err error) {
+ g.logger.Error(fmt.Sprintf("file watching received an error: %v", err))
+}
+
+// OnFileWatchClosed implements FileWatchClient.OnFileWatchClosed
+func (g *GlobWatcher) OnFileWatchClosed() {
+ g.setClosed()
+ g.logger.Warn("GlobWatching is closing due to file watching closing")
+}