aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/globwatcher
diff options
context:
space:
mode:
author简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:44 +0800
committer简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:44 +0800
commitdd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
treeb583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/globwatcher
parent0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
downloadHydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/globwatcher')
-rw-r--r--cli/internal/globwatcher/globwatcher.go210
-rw-r--r--cli/internal/globwatcher/globwatcher_test.go232
2 files changed, 442 insertions, 0 deletions
diff --git a/cli/internal/globwatcher/globwatcher.go b/cli/internal/globwatcher/globwatcher.go
new file mode 100644
index 0000000..9226cfa
--- /dev/null
+++ b/cli/internal/globwatcher/globwatcher.go
@@ -0,0 +1,210 @@
+package globwatcher
+
+import (
+ "errors"
+ "fmt"
+ "path/filepath"
+ "sync"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/vercel/turbo/cli/internal/doublestar"
+ "github.com/vercel/turbo/cli/internal/filewatcher"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// ErrClosed is returned by WatchGlobs when it is called after glob watching has closed.
+// GetChangedGlobs does not return it; once closed, it reports every candidate as changed.
+var ErrClosed = errors.New("glob watching is closed")
+
+// globs holds the glob patterns registered for a single hash.
+type globs struct {
+ // Inclusions is the set of inclusion globs for the hash that have not yet registered a change.
+ Inclusions util.Set
+ // Exclusions is the set of globs whose matching paths are ignored when invalidating this hash's inclusions.
+ Exclusions util.Set
+}
+
+// GlobWatcher is used to track unchanged globs by hash. Once a glob registers a file change
+// it is no longer tracked until a new hash requests it. Once all globs for a particular hash
+// have changed, that hash is no longer tracked.
+type GlobWatcher struct {
+ logger hclog.Logger
+ // repoRoot is the root that file event paths are made relative to before glob matching.
+ repoRoot turbopath.AbsoluteSystemPath
+ // cookieWaiter is waited on before globs are registered or queried.
+ cookieWaiter filewatcher.CookieWaiter
+
+ mu sync.RWMutex // protects the fields below
+ hashGlobs map[string]globs // hash -> inclusion globs still unchanged, plus that hash's exclusion globs
+ globStatus map[string]util.Set // glob -> hashes where this glob hasn't changed
+
+ // closed is set once file watching has closed; it is read and written under mu.
+ closed bool
+}
+
+// New builds a GlobWatcher that resolves file events relative to repoRoot, logs through
+// logger, and waits on cookieWaiter before registering or querying globs. The returned
+// watcher starts out tracking no hashes and no globs.
+func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, cookieWaiter filewatcher.CookieWaiter) *GlobWatcher {
+	watcher := &GlobWatcher{
+		logger:       logger,
+		repoRoot:     repoRoot,
+		cookieWaiter: cookieWaiter,
+	}
+	watcher.hashGlobs = map[string]globs{}
+	watcher.globStatus = map[string]util.Set{}
+	return watcher
+}
+
+// setClosed marks the watcher as closed while holding the write lock.
+func (g *GlobWatcher) setClosed() {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	g.closed = true
+}
+
+// isClosed reports whether the watcher has been marked closed, reading the flag under
+// the read lock.
+func (g *GlobWatcher) isClosed() bool {
+	g.mu.RLock()
+	closed := g.closed
+	g.mu.RUnlock()
+	return closed
+}
+
+// WatchGlobs starts tracking globsToWatch under the given hash, replacing any globs
+// previously stored for that hash. It is the counterpart of GetChangedGlobs, which reports
+// which candidate globs have changed since WatchGlobs was called for the same hash.
+//
+// It returns ErrClosed if glob watching has already closed, or the error from waiting
+// on the cookie if that wait fails.
+func (g *GlobWatcher) WatchGlobs(hash string, globsToWatch fs.TaskOutputs) error {
+	if g.isClosed() {
+		return ErrClosed
+	}
+
+	// Waiting for a cookie ensures that we have seen all filesystem writes
+	// *by the calling client*. Other tasks _could_ write to the same output
+	// directories, however we are relying on task execution dependencies to
+	// prevent that.
+	err := g.cookieWaiter.WaitForCookie()
+	if err != nil {
+		return err
+	}
+
+	tracked := globs{
+		Inclusions: util.SetFromStrings(globsToWatch.Inclusions),
+		Exclusions: util.SetFromStrings(globsToWatch.Exclusions),
+	}
+
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	g.hashGlobs[hash] = tracked
+
+	// Record, for every inclusion glob, that this hash still sees it as unchanged.
+	for _, inclusion := range globsToWatch.Inclusions {
+		hashes, found := g.globStatus[inclusion]
+		if !found {
+			hashes = make(util.Set)
+			g.globStatus[inclusion] = hashes
+		}
+		hashes.Add(hash)
+	}
+	return nil
+}
+
+// GetChangedGlobs reports which of the candidate globs are not currently tracked as
+// "unchanged" for the given hash.
+//
+// If glob watching has closed, or the hash is not being tracked, every candidate is
+// returned as changed. An error is returned only when waiting on the cookie fails.
+func (g *GlobWatcher) GetChangedGlobs(hash string, candidates []string) ([]string, error) {
+	// If filewatching has crashed, return all candidates as changed.
+	if g.isClosed() {
+		return candidates, nil
+	}
+
+	// Waiting for a cookie ensures that we have seen all filesystem writes
+	// *by the calling client*. Other tasks _could_ write to the same output
+	// directories, however we are relying on task execution dependencies to
+	// prevent that.
+	waitErr := g.cookieWaiter.WaitForCookie()
+	if waitErr != nil {
+		return nil, waitErr
+	}
+
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+
+	// hashGlobs holds the still-unchanged globs per hash. A missing hash means
+	// either everything has changed or the hash was never tracked; in both cases
+	// all candidates count as changed.
+	tracked, found := g.hashGlobs[hash]
+	if !found {
+		return candidates, nil
+	}
+
+	// Whatever is not in the unchanged set has changed.
+	changed := util.SetFromStrings(candidates).Difference(tracked.Inclusions)
+	return changed.UnsafeListOfStrings(), nil
+}
+
+// OnFileWatchEvent implements FileWatchClient.OnFileWatchEvent.
+// On a file change, check if we have a glob that matches this file. Every hash that is
+// tracking a matching glob, and that does not exclude the path through one of its
+// exclusion globs, has that glob removed from its set of unchanged globs. If this is the
+// last glob for a hash, the hash stops being tracked; if this is the last hash for a
+// glob, the glob stops being tracked.
+//
+// Events whose path cannot be made relative to the repo root are logged and ignored.
+func (g *GlobWatcher) OnFileWatchEvent(ev filewatcher.Event) {
+	// At this point, we don't care what the Op is, any Op represents a change
+	// that should invalidate matching globs
+	g.logger.Trace(fmt.Sprintf("Got fsnotify event %v", ev))
+	absolutePath := ev.Path
+	repoRelativePath, err := g.repoRoot.RelativePathString(absolutePath.ToStringDuringMigration())
+	if err != nil {
+		g.logger.Debug(fmt.Sprintf("could not get relative path from %v to %v: %v", g.repoRoot, absolutePath, err))
+		return
+	}
+	// The path is matched in slash-separated form; convert it once rather than
+	// on every iteration.
+	slashPath := filepath.ToSlash(repoRelativePath)
+
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	for glob, hashStatus := range g.globStatus {
+		matches, err := doublestar.Match(glob, slashPath)
+		if err != nil {
+			g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+			continue
+		}
+		if !matches {
+			continue
+		}
+
+		// This glob matches, so it has changed for every hash that included it
+		// and does not exclude the path. Entries are removed from hashStatus
+		// while ranging over it, which Go permits for maps.
+		for hashUntyped := range hashStatus {
+			hash := hashUntyped.(string)
+			hashGlobs, ok := g.hashGlobs[hash]
+			if !ok {
+				g.logger.Warn(fmt.Sprintf("failed to find hash %v referenced from glob %v", hash, glob))
+				continue
+			}
+
+			// Check if this hash has excluded the path by going through its exclusion globs
+			isExcluded := false
+			for exclusionGlob := range hashGlobs.Exclusions {
+				excluded, err := doublestar.Match(exclusionGlob.(string), slashPath)
+				if err != nil {
+					// Report the exclusion glob that failed, not the inclusion glob.
+					g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, exclusionGlob, err))
+					continue
+				}
+				if excluded {
+					isExcluded = true
+					break
+				}
+			}
+
+			// If this hash has excluded the path, the glob stays unchanged for it
+			if isExcluded {
+				continue
+			}
+
+			// We delete hash from the globStatus entry
+			g.globStatus[glob].Delete(hash)
+
+			// If we've deleted the last hash for a glob in globStatus, delete the whole glob entry
+			if len(g.globStatus[glob]) == 0 {
+				delete(g.globStatus, glob)
+			}
+
+			hashGlobs.Inclusions.Delete(glob)
+			// If we've deleted the last glob for a hash, delete the whole hash entry
+			if hashGlobs.Inclusions.Len() == 0 {
+				delete(g.hashGlobs, hash)
+			}
+		}
+	}
+}
+
+// OnFileWatchError implements FileWatchClient.OnFileWatchError.
+// The error is only logged; the watcher's tracking state is left untouched.
+func (g *GlobWatcher) OnFileWatchError(err error) {
+	message := fmt.Sprintf("file watching received an error: %v", err)
+	g.logger.Error(message)
+}
+
+// OnFileWatchClosed implements FileWatchClient.OnFileWatchClosed.
+// It marks the watcher as closed under the write lock and then logs a warning.
+func (g *GlobWatcher) OnFileWatchClosed() {
+	g.mu.Lock()
+	g.closed = true
+	g.mu.Unlock()
+
+	g.logger.Warn("GlobWatching is closing due to file watching closing")
+}
diff --git a/cli/internal/globwatcher/globwatcher_test.go b/cli/internal/globwatcher/globwatcher_test.go
new file mode 100644
index 0000000..6fb89a7
--- /dev/null
+++ b/cli/internal/globwatcher/globwatcher_test.go
@@ -0,0 +1,232 @@
+package globwatcher
+
+import (
+ "testing"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/vercel/turbo/cli/internal/filewatcher"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "gotest.tools/v3/assert"
+)
+
+// setup creates the fixture files used by the tests in this file underneath repoRoot,
+// failing the test on the first filesystem error.
+func setup(t *testing.T, repoRoot turbopath.AbsoluteSystemPath) {
+	// Directory layout:
+	// <repoRoot>/
+	//   my-pkg/
+	//     irrelevant
+	//     dist/
+	//       dist-file
+	//       distChild/
+	//         child-file
+	//     .next/
+	//       next-file
+	fixtures := []struct {
+		segments  []string
+		ensureDir bool // create the parent directories before creating the file
+	}{
+		{segments: []string{"my-pkg", "dist", "distChild", "child-file"}, ensureDir: true},
+		// my-pkg/dist already exists thanks to the previous fixture
+		{segments: []string{"my-pkg", "dist", "dist-file"}},
+		{segments: []string{"my-pkg", ".next", "next-file"}, ensureDir: true},
+		// my-pkg already exists as well
+		{segments: []string{"my-pkg", "irrelevant"}},
+	}
+
+	for _, fixture := range fixtures {
+		target := repoRoot.UntypedJoin(fixture.segments...)
+		if fixture.ensureDir {
+			assert.NilError(t, target.EnsureDir(), "EnsureDir")
+		}
+		file, err := target.Create()
+		assert.NilError(t, err, "Create")
+		assert.NilError(t, file.Close(), "Close")
+	}
+}
+
+// noopCookieWaiter is a cookie waiter for tests whose wait returns immediately.
+type noopCookieWaiter struct{}
+
+// WaitForCookie always succeeds without waiting.
+func (*noopCookieWaiter) WaitForCookie() error { return nil }
+
+// _noopCookieWaiter is the shared instance handed to New by the tests in this file.
+var _noopCookieWaiter = new(noopCookieWaiter)
+
+// TestTrackOutputs watches two inclusion globs plus one exclusion glob under a single
+// hash and checks that irrelevant and excluded file events leave both globs unchanged,
+// while matching events invalidate them one at a time until the hash is dropped.
+func TestTrackOutputs(t *testing.T) {
+	repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+	setup(t, repoRoot)
+
+	globWatcher := New(hclog.Default(), repoRoot, _noopCookieWaiter)
+
+	hash := "the-hash"
+	outputs := fs.TaskOutputs{
+		Inclusions: []string{
+			"my-pkg/dist/**",
+			"my-pkg/.next/**",
+		},
+		Exclusions: []string{"my-pkg/.next/cache/**"},
+	}
+
+	// notify feeds a "file added" event for path into the watcher.
+	notify := func(path turbopath.AbsoluteSystemPath) {
+		globWatcher.OnFileWatchEvent(filewatcher.Event{
+			EventType: filewatcher.FileAdded,
+			Path:      path,
+		})
+	}
+	// changedGlobs asks the watcher which of the inclusion globs have changed.
+	changedGlobs := func() []string {
+		changed, err := globWatcher.GetChangedGlobs(hash, outputs.Inclusions)
+		assert.NilError(t, err, "GetChangedGlobs")
+		return changed
+	}
+
+	assert.NilError(t, globWatcher.WatchGlobs(hash, outputs), "WatchGlobs")
+	assert.Equal(t, 0, len(changedGlobs()), "Expected no changed paths")
+
+	// Make an irrelevant change
+	notify(repoRoot.UntypedJoin("my-pkg", "irrelevant"))
+	assert.Equal(t, 0, len(changedGlobs()), "Expected no changed paths")
+
+	// Make an excluded change
+	notify(repoRoot.Join("my-pkg", ".next", "cache", "foo"))
+	assert.Equal(t, 0, len(changedGlobs()), "Expected no changed paths")
+
+	// Make a relevant change
+	notify(repoRoot.UntypedJoin("my-pkg", "dist", "foo"))
+	changed := changedGlobs()
+	assert.Equal(t, 1, len(changed), "Expected one changed path remaining")
+	assert.Equal(t, "my-pkg/dist/**", changed[0], "Expected dist glob to have changed")
+
+	// Change a file matching the other glob
+	notify(repoRoot.UntypedJoin("my-pkg", ".next", "foo"))
+	// Both globs have now registered changes, so nothing should be watched anymore
+	if tracked := len(globWatcher.hashGlobs); tracked != 0 {
+		t.Errorf("expected to not track any hashes, found %v", globWatcher.hashGlobs)
+	}
+
+	// Since the hash is no longer tracked, every candidate comes back as changed
+	assert.DeepEqual(t, outputs.Inclusions, changedGlobs())
+}
+
+// TestTrackMultipleHashes registers two hashes that share an inclusion glob, where only
+// the second hash excludes the cache directory, and checks that a cache change
+// invalidates the shared glob for the first hash only.
+func TestTrackMultipleHashes(t *testing.T) {
+	repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+	setup(t, repoRoot)
+
+	globWatcher := New(hclog.Default(), repoRoot, _noopCookieWaiter)
+
+	// notify feeds a "file added" event for the repo-relative path segments into the watcher.
+	notify := func(segments ...string) {
+		globWatcher.OnFileWatchEvent(filewatcher.Event{
+			EventType: filewatcher.FileAdded,
+			Path:      repoRoot.UntypedJoin(segments...),
+		})
+	}
+	// changedCount returns how many of the outputs' inclusion globs have changed for hash.
+	changedCount := func(hash string, outputs fs.TaskOutputs) int {
+		changed, err := globWatcher.GetChangedGlobs(hash, outputs.Inclusions)
+		assert.NilError(t, err, "GetChangedGlobs")
+		return len(changed)
+	}
+
+	firstHash := "the-hash"
+	firstOutputs := fs.TaskOutputs{
+		Inclusions: []string{
+			"my-pkg/dist/**",
+			"my-pkg/.next/**",
+		},
+	}
+	assert.NilError(t, globWatcher.WatchGlobs(firstHash, firstOutputs), "WatchGlobs")
+
+	secondHash := "the-second-hash"
+	secondOutputs := fs.TaskOutputs{
+		Inclusions: []string{
+			"my-pkg/.next/**",
+		},
+		Exclusions: []string{"my-pkg/.next/cache/**"},
+	}
+	assert.NilError(t, globWatcher.WatchGlobs(secondHash, secondOutputs), "WatchGlobs")
+
+	assert.Equal(t, 0, changedCount(firstHash, firstOutputs), "Expected no changed paths")
+	assert.Equal(t, 0, changedCount(secondHash, secondOutputs), "Expected no changed paths")
+
+	// Make a change that is excluded in one of the hashes but not in the other
+	notify("my-pkg", ".next", "cache", "foo")
+
+	assert.Equal(t, 1, changedCount(firstHash, firstOutputs), "Expected one changed path remaining")
+	assert.Equal(t, 0, changedCount(secondHash, secondOutputs), "Expected no changed paths")
+
+	assert.Equal(t, 1, len(globWatcher.globStatus["my-pkg/.next/**"]), "Expected to be still watching `my-pkg/.next/**`")
+
+	// Make a change for secondHash
+	notify("my-pkg", ".next", "bar")
+
+	assert.Equal(t, 0, len(globWatcher.globStatus["my-pkg/.next/**"]), "Expected to be no longer watching `my-pkg/.next/**`")
+}
+
+// TestWatchSingleFile watches a glob that names exactly one file and checks that only an
+// event for that file causes the hash to stop being tracked.
+func TestWatchSingleFile(t *testing.T) {
+	repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+	setup(t, repoRoot)
+
+	globWatcher := New(hclog.Default(), repoRoot, _noopCookieWaiter)
+
+	// notify feeds a "file added" event for a file inside my-pkg/.next into the watcher.
+	notify := func(fileName string) {
+		globWatcher.OnFileWatchEvent(filewatcher.Event{
+			EventType: filewatcher.FileAdded,
+			Path:      repoRoot.UntypedJoin("my-pkg", ".next", fileName),
+		})
+	}
+
+	outputs := fs.TaskOutputs{
+		Inclusions: []string{"my-pkg/.next/next-file"},
+		Exclusions: []string{},
+	}
+	assert.NilError(t, globWatcher.WatchGlobs("the-hash", outputs), "WatchGlobs")
+	assert.Equal(t, 1, len(globWatcher.hashGlobs))
+
+	// A change to an irrelevant file
+	notify("foo")
+	assert.Equal(t, 1, len(globWatcher.hashGlobs))
+
+	// Change the watched file
+	notify("next-file")
+	assert.Equal(t, 0, len(globWatcher.hashGlobs))
+}