Diffstat (limited to 'cli/internal/globwatcher/globwatcher.go')
| Mode | File | Lines added |
| --- | --- | --- |
| -rw-r--r-- | cli/internal/globwatcher/globwatcher.go | 210 |
1 file changed, 210 insertions, 0 deletions
diff --git a/cli/internal/globwatcher/globwatcher.go b/cli/internal/globwatcher/globwatcher.go
new file mode 100644
index 0000000..9226cfa
--- /dev/null
+++ b/cli/internal/globwatcher/globwatcher.go
@@ -0,0 +1,210 @@
+package globwatcher
+
+import (
+	"errors"
+	"fmt"
+	"path/filepath"
+	"sync"
+
+	"github.com/hashicorp/go-hclog"
+	"github.com/vercel/turbo/cli/internal/doublestar"
+	"github.com/vercel/turbo/cli/internal/filewatcher"
+	"github.com/vercel/turbo/cli/internal/fs"
+	"github.com/vercel/turbo/cli/internal/turbopath"
+	"github.com/vercel/turbo/cli/internal/util"
+)
+
+// ErrClosed is returned when attempting to get changed globs after glob watching has closed
+var ErrClosed = errors.New("glob watching is closed")
+
+type globs struct {
+	Inclusions util.Set
+	Exclusions util.Set
+}
+
+// GlobWatcher is used to track unchanged globs by hash. Once a glob registers a file change
+// it is no longer tracked until a new hash requests it. Once all globs for a particular hash
+// have changed, that hash is no longer tracked.
+type GlobWatcher struct {
+	logger       hclog.Logger
+	repoRoot     turbopath.AbsoluteSystemPath
+	cookieWaiter filewatcher.CookieWaiter
+
+	mu         sync.RWMutex // protects field below
+	hashGlobs  map[string]globs
+	globStatus map[string]util.Set // glob -> hashes where this glob hasn't changed
+
+	closed bool
+}
+
+// New returns a new GlobWatcher instance
+func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, cookieWaiter filewatcher.CookieWaiter) *GlobWatcher {
+	return &GlobWatcher{
+		logger:       logger,
+		repoRoot:     repoRoot,
+		cookieWaiter: cookieWaiter,
+		hashGlobs:    make(map[string]globs),
+		globStatus:   make(map[string]util.Set),
+	}
+}
+
+func (g *GlobWatcher) setClosed() {
+	g.mu.Lock()
+	g.closed = true
+	g.mu.Unlock()
+}
+
+func (g *GlobWatcher) isClosed() bool {
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+	return g.closed
+}
+
+// WatchGlobs registers the given set of globs to be watched for changes and grouped
+// under the given hash. This method pairs with GetChangedGlobs to determine which globs
+// out of a set of candidates have changed since WatchGlobs was called for the same hash.
+func (g *GlobWatcher) WatchGlobs(hash string, globsToWatch fs.TaskOutputs) error {
+	if g.isClosed() {
+		return ErrClosed
+	}
+	// Wait for a cookie here
+	// that will ensure that we have seen all filesystem writes
+	// *by the calling client*. Other tasks _could_ write to the
+	// same output directories, however we are relying on task
+	// execution dependencies to prevent that.
+	if err := g.cookieWaiter.WaitForCookie(); err != nil {
+		return err
+	}
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	g.hashGlobs[hash] = globs{
+		Inclusions: util.SetFromStrings(globsToWatch.Inclusions),
+		Exclusions: util.SetFromStrings(globsToWatch.Exclusions),
+	}
+
+	for _, glob := range globsToWatch.Inclusions {
+		existing, ok := g.globStatus[glob]
+		if !ok {
+			existing = make(util.Set)
+		}
+		existing.Add(hash)
+		g.globStatus[glob] = existing
+	}
+	return nil
+}
+
+// GetChangedGlobs returns the subset of the given candidates that we are not currently
+// tracking as "unchanged".
+func (g *GlobWatcher) GetChangedGlobs(hash string, candidates []string) ([]string, error) {
+	if g.isClosed() {
+		// If filewatching has crashed, return all candidates as changed.
+		return candidates, nil
+	}
+	// Wait for a cookie here
+	// that will ensure that we have seen all filesystem writes
+	// *by the calling client*. Other tasks _could_ write to the
+	// same output directories, however we are relying on task
+	// execution dependencies to prevent that.
+	if err := g.cookieWaiter.WaitForCookie(); err != nil {
+		return nil, err
+	}
+	// hashGlobs tracks all of the unchanged globs for a given hash
+	// If hashGlobs doesn't have our hash, either everything has changed,
+	// or we were never tracking it. Either way, consider all the candidates
+	// to be changed globs.
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+	globsToCheck, ok := g.hashGlobs[hash]
+	if !ok {
+		return candidates, nil
+	}
+	allGlobs := util.SetFromStrings(candidates)
+	diff := allGlobs.Difference(globsToCheck.Inclusions)
+
+	return diff.UnsafeListOfStrings(), nil
+}
+
+// OnFileWatchEvent implements FileWatchClient.OnFileWatchEvent
+// On a file change, check if we have a glob that matches this file. Invalidate
+// any matching globs, and remove them from the set of unchanged globs for the corresponding
+// hashes. If this is the last glob for a hash, remove the hash from being tracked.
+func (g *GlobWatcher) OnFileWatchEvent(ev filewatcher.Event) {
+	// At this point, we don't care what the Op is, any Op represents a change
+	// that should invalidate matching globs
+	g.logger.Trace(fmt.Sprintf("Got fsnotify event %v", ev))
+	absolutePath := ev.Path
+	repoRelativePath, err := g.repoRoot.RelativePathString(absolutePath.ToStringDuringMigration())
+	if err != nil {
+		g.logger.Debug(fmt.Sprintf("could not get relative path from %v to %v: %v", g.repoRoot, absolutePath, err))
+		return
+	}
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	for glob, hashStatus := range g.globStatus {
+		matches, err := doublestar.Match(glob, filepath.ToSlash(repoRelativePath))
+		if err != nil {
+			g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+			continue
+		}
+		// If this glob matches, we know that it has changed for every hash that included this glob
+		// and is not excluded by a hash's exclusion globs.
+		// So, we can delete this glob from every hash tracking it as well as stop watching this glob.
+		// To stop watching, we unref each of the directories corresponding to this glob.
+		if matches {
+			for hashUntyped := range hashStatus {
+				hash := hashUntyped.(string)
+				hashGlobs, ok := g.hashGlobs[hash]
+
+				if !ok {
+					g.logger.Warn(fmt.Sprintf("failed to find hash %v referenced from glob %v", hash, glob))
+					continue
+				}
+
+				isExcluded := false
+				// Check if we've excluded this path by going through exclusion globs
+				for exclusionGlob := range hashGlobs.Exclusions {
+					matches, err := doublestar.Match(exclusionGlob.(string), filepath.ToSlash(repoRelativePath))
+					if err != nil {
+						g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
+						continue
+					}
+
+					if matches {
+						isExcluded = true
+						break
+					}
+				}
+
+				// If we have excluded this path, then we skip it
+				if isExcluded {
+					continue
+				}
+
+				// We delete hash from the globStatus entry
+				g.globStatus[glob].Delete(hash)
+
+				// If we've deleted the last hash for a glob in globStatus, delete the whole glob entry
+				if len(g.globStatus[glob]) == 0 {
+					delete(g.globStatus, glob)
+				}
+
+				hashGlobs.Inclusions.Delete(glob)
+				// If we've deleted the last glob for a hash, delete the whole hash entry
+				if hashGlobs.Inclusions.Len() == 0 {
+					delete(g.hashGlobs, hash)
+				}
+			}
+		}
+	}
+}
+
+// OnFileWatchError implements FileWatchClient.OnFileWatchError
+func (g *GlobWatcher) OnFileWatchError(err error) {
+	g.logger.Error(fmt.Sprintf("file watching received an error: %v", err))
+}
+
+// OnFileWatchClosed implements FileWatchClient.OnFileWatchClosed
+func (g *GlobWatcher) OnFileWatchClosed() {
+	g.setClosed()
+	g.logger.Warn("GlobWatching is closing due to file watching closing")
+}
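The comment on WatchGlobs notes that it pairs with GetChangedGlobs. The sketch below illustrates that round trip from a hypothetical daemon-side caller. It is not part of the diff: it assumes a GlobWatcher already constructed with New(logger, repoRoot, cookieWaiter), reuses the internal import paths shown above (which only resolve from inside the turbo repository), and the hash and glob strings are placeholders.

```go
package example

import (
	"fmt"

	"github.com/vercel/turbo/cli/internal/fs"
	"github.com/vercel/turbo/cli/internal/globwatcher"
)

// exampleRoundTrip is an illustrative sketch. It assumes gw was built with
// globwatcher.New(logger, repoRoot, cookieWaiter) elsewhere in the daemon.
func exampleRoundTrip(gw *globwatcher.GlobWatcher) error {
	hash := "task-hash-abc123" // hypothetical task hash

	// After a task runs, register its output globs under the task's hash.
	if err := gw.WatchGlobs(hash, fs.TaskOutputs{
		Inclusions: []string{"dist/**"},
		Exclusions: []string{"dist/**/*.map"},
	}); err != nil {
		return err
	}

	// On a later run with the same hash, ask which of those globs have seen
	// file changes. An empty result means the cached outputs are still valid.
	changed, err := gw.GetChangedGlobs(hash, []string{"dist/**"})
	if err != nil {
		return err
	}
	fmt.Printf("globs with changes since registration: %v\n", changed)
	return nil
}
```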
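OnFileWatchEvent only invalidates a glob when the changed path matches an inclusion glob and is not caught by any of that hash's exclusion globs. The self-contained sketch below demonstrates that decision on its own; it uses the upstream github.com/bmatcuk/doublestar/v4 module on the assumption that it behaves like the vendored doublestar package in the diff, and the globs and paths are made up.

```go
package main

import (
	"fmt"
	"path/filepath"

	"github.com/bmatcuk/doublestar/v4"
)

// changeInvalidates mirrors the OnFileWatchEvent decision: a changed
// repo-relative path invalidates an inclusion glob only if it matches that
// glob and matches none of the exclusion globs.
func changeInvalidates(inclusion string, exclusions []string, repoRelativePath string) (bool, error) {
	slashPath := filepath.ToSlash(repoRelativePath)
	matched, err := doublestar.Match(inclusion, slashPath)
	if err != nil || !matched {
		return false, err
	}
	for _, exclusion := range exclusions {
		excluded, err := doublestar.Match(exclusion, slashPath)
		if err != nil {
			return false, err
		}
		if excluded {
			// The path is covered by an exclusion, so the glob stays "unchanged".
			return false, nil
		}
	}
	return true, nil
}

func main() {
	exclusions := []string{"dist/**/*.map"}
	ok, _ := changeInvalidates("dist/**", exclusions, "dist/main.js")
	fmt.Println(ok) // true: matches the inclusion, no exclusion applies
	ok, _ = changeInvalidates("dist/**", exclusions, "dist/assets/main.js.map")
	fmt.Println(ok) // false: an exclusion glob suppresses the invalidation
}
```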
