diff options
Diffstat (limited to 'cli/internal/filewatcher')
| -rw-r--r-- | cli/internal/filewatcher/backend.go | 209 | ||||
| -rw-r--r-- | cli/internal/filewatcher/backend_darwin.go | 220 | ||||
| -rw-r--r-- | cli/internal/filewatcher/cookie.go | 160 | ||||
| -rw-r--r-- | cli/internal/filewatcher/cookie_test.go | 130 | ||||
| -rw-r--r-- | cli/internal/filewatcher/filewatcher.go | 167 | ||||
| -rw-r--r-- | cli/internal/filewatcher/filewatcher_test.go | 152 |
6 files changed, 1038 insertions, 0 deletions
diff --git a/cli/internal/filewatcher/backend.go b/cli/internal/filewatcher/backend.go new file mode 100644 index 0000000..b8b7fa8 --- /dev/null +++ b/cli/internal/filewatcher/backend.go @@ -0,0 +1,209 @@ +//go:build !darwin +// +build !darwin + +package filewatcher + +import ( + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/fsnotify/fsnotify" + "github.com/hashicorp/go-hclog" + "github.com/karrick/godirwalk" + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/doublestar" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +// watchAddMode is used to indicate whether watchRecursively should synthesize events +// for existing files. +type watchAddMode int + +const ( + dontSynthesizeEvents watchAddMode = iota + synthesizeEvents +) + +type fsNotifyBackend struct { + watcher *fsnotify.Watcher + events chan Event + errors chan error + logger hclog.Logger + + mu sync.Mutex + allExcludes []string + closed bool +} + +func (f *fsNotifyBackend) Events() <-chan Event { + return f.events +} + +func (f *fsNotifyBackend) Errors() <-chan error { + return f.errors +} + +func (f *fsNotifyBackend) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return ErrFilewatchingClosed + } + f.closed = true + close(f.events) + close(f.errors) + if err := f.watcher.Close(); err != nil { + return err + } + return nil +} + +// onFileAdded helps up paper over cross-platform inconsistencies in fsnotify. +// Some fsnotify backends automatically add the contents of directories. Some do +// not. Adding a watch is idempotent, so anytime any file we care about gets added, +// watch it. +func (f *fsNotifyBackend) onFileAdded(name turbopath.AbsoluteSystemPath) error { + info, err := name.Lstat() + if err != nil { + if errors.Is(err, os.ErrNotExist) { + // We can race with a file being added and removed. Ignore it + return nil + } + return errors.Wrapf(err, "error checking lstat of new file %v", name) + } + if info.IsDir() { + // If a directory has been added, we need to synthesize events for everything it contains + if err := f.watchRecursively(name, []string{}, synthesizeEvents); err != nil { + return errors.Wrapf(err, "failed recursive watch of %v", name) + } + } else { + if err := f.watcher.Add(name.ToString()); err != nil { + return errors.Wrapf(err, "failed adding watch to %v", name) + } + } + return nil +} + +func (f *fsNotifyBackend) watchRecursively(root turbopath.AbsoluteSystemPath, excludePatterns []string, addMode watchAddMode) error { + f.mu.Lock() + defer f.mu.Unlock() + err := fs.WalkMode(root.ToString(), func(name string, isDir bool, info os.FileMode) error { + for _, excludePattern := range excludePatterns { + excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(name)) + if err != nil { + return err + } + if excluded { + return godirwalk.SkipThis + } + } + if info.IsDir() && (info&os.ModeSymlink == 0) { + if err := f.watcher.Add(name); err != nil { + return errors.Wrapf(err, "failed adding watch to %v", name) + } + f.logger.Debug(fmt.Sprintf("watching directory %v", name)) + } + if addMode == synthesizeEvents { + f.events <- Event{ + Path: fs.AbsoluteSystemPathFromUpstream(name), + EventType: FileAdded, + } + } + return nil + }) + if err != nil { + return err + } + f.allExcludes = append(f.allExcludes, excludePatterns...) + + return nil +} + +func (f *fsNotifyBackend) watch() { +outer: + for { + select { + case ev, ok := <-f.watcher.Events: + if !ok { + break outer + } + eventType := toFileEvent(ev.Op) + path := fs.AbsoluteSystemPathFromUpstream(ev.Name) + if eventType == FileAdded { + if err := f.onFileAdded(path); err != nil { + f.errors <- err + } + } + f.events <- Event{ + Path: path, + EventType: eventType, + } + case err, ok := <-f.watcher.Errors: + if !ok { + break outer + } + f.errors <- err + } + } +} + +var _modifiedMask = fsnotify.Chmod | fsnotify.Write + +func toFileEvent(op fsnotify.Op) FileEvent { + if op&fsnotify.Create != 0 { + return FileAdded + } else if op&fsnotify.Remove != 0 { + return FileDeleted + } else if op&_modifiedMask != 0 { + return FileModified + } else if op&fsnotify.Rename != 0 { + return FileRenamed + } + return FileOther +} + +func (f *fsNotifyBackend) Start() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return ErrFilewatchingClosed + } + for _, dir := range f.watcher.WatchList() { + for _, excludePattern := range f.allExcludes { + excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(dir)) + if err != nil { + return err + } + if excluded { + if err := f.watcher.Remove(dir); err != nil { + return err + } + } + } + } + go f.watch() + return nil +} + +func (f *fsNotifyBackend) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error { + // We don't synthesize events for the initial watch + return f.watchRecursively(root, excludePatterns, dontSynthesizeEvents) +} + +// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are +// running on. +func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyBackend{ + watcher: watcher, + events: make(chan Event), + errors: make(chan error), + logger: logger.Named("fsnotify"), + }, nil +} diff --git a/cli/internal/filewatcher/backend_darwin.go b/cli/internal/filewatcher/backend_darwin.go new file mode 100644 index 0000000..4c029c4 --- /dev/null +++ b/cli/internal/filewatcher/backend_darwin.go @@ -0,0 +1,220 @@ +//go:build darwin +// +build darwin + +package filewatcher + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/yookoala/realpath" + + "github.com/fsnotify/fsevents" + "github.com/hashicorp/go-hclog" + "github.com/vercel/turbo/cli/internal/doublestar" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +type fseventsBackend struct { + events chan Event + errors chan error + logger hclog.Logger + mu sync.Mutex + streams []*fsevents.EventStream + closed bool +} + +func (f *fseventsBackend) Events() <-chan Event { + return f.events +} + +func (f *fseventsBackend) Errors() <-chan error { + return f.errors +} + +func (f *fseventsBackend) Close() error { + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + return ErrFilewatchingClosed + } + f.closed = true + for _, stream := range f.streams { + stream.Stop() + } + close(f.events) + close(f.errors) + return nil +} + +func (f *fseventsBackend) Start() error { + return nil +} + +var ( + _eventLatency = 10 * time.Millisecond + _cookieTimeout = 500 * time.Millisecond +) + +// AddRoot starts watching a new directory hierarchy. Events matching the provided excludePatterns +// will not be forwarded. +func (f *fseventsBackend) AddRoot(someRoot turbopath.AbsoluteSystemPath, excludePatterns ...string) error { + // We need to resolve the real path to the hierarchy that we are going to watch + realRoot, err := realpath.Realpath(someRoot.ToString()) + if err != nil { + return err + } + root := fs.AbsoluteSystemPathFromUpstream(realRoot) + dev, err := fsevents.DeviceForPath(root.ToString()) + if err != nil { + return err + } + + // Optimistically set up and start a stream, assuming the watch is still valid. + s := &fsevents.EventStream{ + Paths: []string{root.ToString()}, + Latency: _eventLatency, + Device: dev, + Flags: fsevents.FileEvents | fsevents.WatchRoot, + } + s.Start() + events := s.Events + + // fsevents delivers events for all existing files first, so use a cookie to detect when we're ready for new events + if err := waitForCookie(root, events, _cookieTimeout); err != nil { + s.Stop() + return err + } + + // Now try to persist the stream. + f.mu.Lock() + defer f.mu.Unlock() + if f.closed { + s.Stop() + return ErrFilewatchingClosed + } + f.streams = append(f.streams, s) + f.logger.Debug(fmt.Sprintf("watching root %v, excluding %v", root, excludePatterns)) + + go func() { + for evs := range events { + for _, ev := range evs { + isExcluded := false + + // 1. Ensure that we have a `/`-prefixed path from the event. + var eventPath string + if !strings.HasPrefix("/", ev.Path) { + eventPath = "/" + ev.Path + } else { + eventPath = ev.Path + } + + // 2. We're getting events from the real path, but we need to translate + // back to the path we were provided since that's what the caller will + // expect in terms of event paths. + watchRootRelativePath := eventPath[len(realRoot):] + processedEventPath := someRoot.UntypedJoin(watchRootRelativePath) + + // 3. Compare the event to all exclude patterns, short-circuit if we know + // we are not watching this file. + processedPathString := processedEventPath.ToString() // loop invariant + for _, pattern := range excludePatterns { + matches, err := doublestar.Match(pattern, processedPathString) + if err != nil { + f.errors <- err + } else if matches { + isExcluded = true + break + } + } + + // 4. Report the file events we care about. + if !isExcluded { + f.events <- Event{ + Path: processedEventPath, + EventType: toFileEvent(ev.Flags), + } + } + } + } + }() + + return nil +} + +func waitForCookie(root turbopath.AbsoluteSystemPath, events <-chan []fsevents.Event, timeout time.Duration) error { + // This cookie needs to be in a location that we're watching, and at this point we can't guarantee + // what the root is, or if something like "node_modules/.cache/turbo" would make sense. As a compromise, ensure + // that we clean it up even in the event of a failure. + cookiePath := root.UntypedJoin(".turbo-cookie") + if err := cookiePath.WriteFile([]byte("cookie"), 0755); err != nil { + return err + } + expected := cookiePath.ToString()[1:] // trim leading slash + if err := waitForEvent(events, expected, fsevents.ItemCreated, timeout); err != nil { + // Attempt to not leave the cookie file lying around. + // Ignore the error, since there's not much we can do with it. + _ = cookiePath.Remove() + return err + } + if err := cookiePath.Remove(); err != nil { + return err + } + if err := waitForEvent(events, expected, fsevents.ItemRemoved, timeout); err != nil { + return err + } + return nil +} + +func waitForEvent(events <-chan []fsevents.Event, path string, flag fsevents.EventFlags, timeout time.Duration) error { + ch := make(chan struct{}) + go func() { + for evs := range events { + for _, ev := range evs { + if ev.Path == path && ev.Flags&flag != 0 { + close(ch) + return + } + } + } + }() + select { + case <-time.After(timeout): + return errors.Wrap(ErrFailedToStart, "timed out waiting for initial fsevents cookie") + case <-ch: + return nil + } +} + +var _modifiedMask = fsevents.ItemModified | fsevents.ItemInodeMetaMod | fsevents.ItemFinderInfoMod | fsevents.ItemChangeOwner | fsevents.ItemXattrMod + +func toFileEvent(flags fsevents.EventFlags) FileEvent { + if flags&fsevents.ItemCreated != 0 { + return FileAdded + } else if flags&fsevents.ItemRemoved != 0 { + return FileDeleted + } else if flags&_modifiedMask != 0 { + return FileModified + } else if flags&fsevents.ItemRenamed != 0 { + return FileRenamed + } else if flags&fsevents.RootChanged != 0 { + // count this as a delete, something affected the path to the root + // of the stream + return FileDeleted + } + return FileOther +} + +// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are +// running on. +func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) { + return &fseventsBackend{ + events: make(chan Event), + errors: make(chan error), + logger: logger.Named("fsevents"), + }, nil +} diff --git a/cli/internal/filewatcher/cookie.go b/cli/internal/filewatcher/cookie.go new file mode 100644 index 0000000..7a4931e --- /dev/null +++ b/cli/internal/filewatcher/cookie.go @@ -0,0 +1,160 @@ +package filewatcher + +import ( + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +// CookieWaiter is the interface used by clients that need to wait +// for a roundtrip through the filewatching API. +type CookieWaiter interface { + WaitForCookie() error +} + +var ( + // ErrCookieTimeout is returned when we did not see our cookie file within the given time constraints + ErrCookieTimeout = errors.New("timed out waiting for cookie") + // ErrCookieWatchingClosed is returned when the underlying filewatching has been closed. + ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies") +) + +// CookieJar is used for tracking roundtrips through the filesystem watching API +type CookieJar struct { + timeout time.Duration + dir turbopath.AbsoluteSystemPath + serial uint64 + mu sync.Mutex + cookies map[turbopath.AbsoluteSystemPath]chan error + closed bool +} + +// NewCookieJar returns a new instance of a CookieJar. There should only ever be a single +// instance live per cookieDir, since they expect to have full control over that directory. +func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) { + if err := cookieDir.RemoveAll(); err != nil { + return nil, err + } + if err := cookieDir.MkdirAll(0775); err != nil { + return nil, err + } + return &CookieJar{ + timeout: timeout, + dir: cookieDir, + cookies: make(map[turbopath.AbsoluteSystemPath]chan error), + }, nil +} + +// removeAllCookiesWithError sends the error to every channel, closes every channel, +// and attempts to remove every cookie file. Must be called while the cj.mu is held. +// If the cookie jar is going to be reused afterwards, the cookies map must be reinitialized. +func (cj *CookieJar) removeAllCookiesWithError(err error) { + for p, ch := range cj.cookies { + _ = p.Remove() + ch <- err + close(ch) + } + // Drop all of the references so they can be cleaned up + cj.cookies = nil +} + +// OnFileWatchClosed handles the case where filewatching had to close for some reason +// We send an error to all of our cookies and stop accepting new ones. +func (cj *CookieJar) OnFileWatchClosed() { + cj.mu.Lock() + defer cj.mu.Unlock() + cj.closed = true + cj.removeAllCookiesWithError(ErrCookieWatchingClosed) + +} + +// OnFileWatchError handles when filewatching has encountered an error. +// In the error case, we remove all cookies and send them errors. We remain +// available for later cookies. +func (cj *CookieJar) OnFileWatchError(err error) { + // We are now in an inconsistent state. Drop all of our cookies, + // but we still allow new ones to be created + cj.mu.Lock() + defer cj.mu.Unlock() + cj.removeAllCookiesWithError(err) + cj.cookies = make(map[turbopath.AbsoluteSystemPath]chan error) +} + +// OnFileWatchEvent determines if the specified event is relevant +// for cookie watching and notifies the appropriate cookie if so. +func (cj *CookieJar) OnFileWatchEvent(ev Event) { + if ev.EventType == FileAdded { + isCookie, err := fs.DirContainsPath(cj.dir.ToStringDuringMigration(), ev.Path.ToStringDuringMigration()) + if err != nil { + cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path)) + } else if isCookie { + cj.notifyCookie(ev.Path, nil) + } + } +} + +// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications. +// This provides a theoretical bound on filesystem operations, although it's possible +// that underlying filewatch mechanisms don't respect this ordering. +func (cj *CookieJar) WaitForCookie() error { + // we're only ever going to send a single error on the channel, add a buffer so that we never + // block sending it. + ch := make(chan error, 1) + serial := atomic.AddUint64(&cj.serial, 1) + cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial)) + cj.mu.Lock() + if cj.closed { + cj.mu.Unlock() + return ErrCookieWatchingClosed + } + cj.cookies[cookiePath] = ch + cj.mu.Unlock() + if err := touchCookieFile(cookiePath); err != nil { + cj.notifyCookie(cookiePath, err) + return err + } + select { + case <-time.After(cj.timeout): + return ErrCookieTimeout + case err, ok := <-ch: + if !ok { + // the channel closed without an error, we're all set + return nil + } + // the channel didn't close, meaning we got some error. + // We don't need to wait on channel close, it's going to be closed + // immediately by whoever sent the error. Return the error directly + return err + } +} + +func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) { + cj.mu.Lock() + ch, ok := cj.cookies[cookie] + // delete is a no-op if the key doesn't exist + delete(cj.cookies, cookie) + cj.mu.Unlock() + if ok { + if err != nil { + ch <- err + } + close(ch) + } +} + +func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error { + f, err := cookie.OpenFile(os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700) + if err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + return nil +} diff --git a/cli/internal/filewatcher/cookie_test.go b/cli/internal/filewatcher/cookie_test.go new file mode 100644 index 0000000..96241b4 --- /dev/null +++ b/cli/internal/filewatcher/cookie_test.go @@ -0,0 +1,130 @@ +package filewatcher + +import ( + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/fs" + "gotest.tools/v3/assert" +) + +func TestWaitForCookie(t *testing.T) { + logger := hclog.Default() + cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + + jar, err := NewCookieJar(cookieDir, 5*time.Second) + assert.NilError(t, err, "NewCookieJar") + + watcher, err := GetPlatformSpecificBackend(logger) + assert.NilError(t, err, "NewWatcher") + fw := New(logger, repoRoot, watcher) + err = fw.Start() + assert.NilError(t, err, "Start") + fw.AddClient(jar) + err = fw.AddRoot(cookieDir) + assert.NilError(t, err, "Add") + + err = jar.WaitForCookie() + assert.NilError(t, err, "failed to roundtrip cookie") +} + +func TestWaitForCookieAfterClose(t *testing.T) { + logger := hclog.Default() + cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + + jar, err := NewCookieJar(cookieDir, 5*time.Second) + assert.NilError(t, err, "NewCookieJar") + + watcher, err := GetPlatformSpecificBackend(logger) + assert.NilError(t, err, "NewWatcher") + fw := New(logger, repoRoot, watcher) + err = fw.Start() + assert.NilError(t, err, "Start") + fw.AddClient(jar) + err = fw.AddRoot(cookieDir) + assert.NilError(t, err, "Add") + + err = fw.Close() + assert.NilError(t, err, "Close") + err = jar.WaitForCookie() + assert.ErrorIs(t, err, ErrCookieWatchingClosed) +} + +func TestWaitForCookieTimeout(t *testing.T) { + logger := hclog.Default() + cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + + jar, err := NewCookieJar(cookieDir, 10*time.Millisecond) + assert.NilError(t, err, "NewCookieJar") + + watcher, err := GetPlatformSpecificBackend(logger) + assert.NilError(t, err, "NewWatcher") + fw := New(logger, repoRoot, watcher) + err = fw.Start() + assert.NilError(t, err, "Start") + fw.AddClient(jar) + + // NOTE: don't call fw.Add here so that no file event gets delivered + + err = jar.WaitForCookie() + assert.ErrorIs(t, err, ErrCookieTimeout) +} + +func TestWaitForCookieWithError(t *testing.T) { + logger := hclog.Default() + cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + + jar, err := NewCookieJar(cookieDir, 10*time.Second) + assert.NilError(t, err, "NewCookieJar") + + watcher, err := GetPlatformSpecificBackend(logger) + assert.NilError(t, err, "NewWatcher") + fw := New(logger, repoRoot, watcher) + err = fw.Start() + assert.NilError(t, err, "Start") + fw.AddClient(jar) + + // NOTE: don't call fw.Add here so that no file event gets delivered + myErr := errors.New("an error") + ch := make(chan error) + go func() { + if err := jar.WaitForCookie(); err != nil { + ch <- err + } + close(ch) + }() + // wait for the cookie to be registered in the jar + for { + found := false + jar.mu.Lock() + if len(jar.cookies) == 1 { + found = true + } + jar.mu.Unlock() + if found { + break + } + <-time.After(10 * time.Millisecond) + } + jar.OnFileWatchError(myErr) + + err, ok := <-ch + if !ok { + t.Error("expected to get an error from cookie watching") + } + assert.ErrorIs(t, err, myErr) + + // ensure waiting for a new cookie still works. + // Add the filewatch to allow cookies work normally + err = fw.AddRoot(cookieDir) + assert.NilError(t, err, "Add") + + err = jar.WaitForCookie() + assert.NilError(t, err, "WaitForCookie") +} diff --git a/cli/internal/filewatcher/filewatcher.go b/cli/internal/filewatcher/filewatcher.go new file mode 100644 index 0000000..4f79495 --- /dev/null +++ b/cli/internal/filewatcher/filewatcher.go @@ -0,0 +1,167 @@ +// Package filewatcher is used to handle watching for file changes inside the monorepo +package filewatcher + +import ( + "path/filepath" + "strings" + "sync" + + "github.com/hashicorp/go-hclog" + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/turbopath" +) + +// _ignores is the set of paths we exempt from file-watching +var _ignores = []string{".git", "node_modules"} + +// FileWatchClient defines the callbacks used by the file watching loop. +// All methods are called from the same goroutine so they: +// 1) do not need synchronization +// 2) should minimize the work they are doing when called, if possible +type FileWatchClient interface { + OnFileWatchEvent(ev Event) + OnFileWatchError(err error) + OnFileWatchClosed() +} + +// FileEvent is an enum covering the kinds of things that can happen +// to files that we might be interested in +type FileEvent int + +const ( + // FileAdded - this is a new file + FileAdded FileEvent = iota + 1 + // FileDeleted - this file has been removed + FileDeleted + // FileModified - this file has been changed in some way + FileModified + // FileRenamed - a file's name has changed + FileRenamed + // FileOther - some other backend-specific event has happened + FileOther +) + +var ( + // ErrFilewatchingClosed is returned when filewatching has been closed + ErrFilewatchingClosed = errors.New("Close() has already been called for filewatching") + // ErrFailedToStart is returned when filewatching fails to start up + ErrFailedToStart = errors.New("filewatching failed to start") +) + +// Event is the backend-independent information about a file change +type Event struct { + Path turbopath.AbsoluteSystemPath + EventType FileEvent +} + +// Backend is the interface that describes what an underlying filesystem watching backend +// must provide. +type Backend interface { + AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error + Events() <-chan Event + Errors() <-chan error + Close() error + Start() error +} + +// FileWatcher handles watching all of the files in the monorepo. +// We currently ignore .git and top-level node_modules. We can revisit +// if necessary. +type FileWatcher struct { + backend Backend + + logger hclog.Logger + repoRoot turbopath.AbsoluteSystemPath + excludePattern string + + clientsMu sync.RWMutex + clients []FileWatchClient + closed bool +} + +// New returns a new FileWatcher instance +func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, backend Backend) *FileWatcher { + excludes := make([]string, len(_ignores)) + for i, ignore := range _ignores { + excludes[i] = filepath.ToSlash(repoRoot.UntypedJoin(ignore).ToString() + "/**") + } + excludePattern := "{" + strings.Join(excludes, ",") + "}" + return &FileWatcher{ + backend: backend, + logger: logger, + repoRoot: repoRoot, + excludePattern: excludePattern, + } +} + +// Close shuts down filewatching +func (fw *FileWatcher) Close() error { + return fw.backend.Close() +} + +// Start recursively adds all directories from the repo root, redacts the excluded ones, +// then fires off a goroutine to respond to filesystem events +func (fw *FileWatcher) Start() error { + if err := fw.backend.AddRoot(fw.repoRoot, fw.excludePattern); err != nil { + return err + } + if err := fw.backend.Start(); err != nil { + return err + } + go fw.watch() + return nil +} + +// AddRoot registers the root a filesystem hierarchy to be watched for changes. Events are *not* +// fired for existing files when AddRoot is called, only for subsequent changes. +// NOTE: if it appears helpful, we could change this behavior so that we provide a stream of initial +// events. +func (fw *FileWatcher) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error { + return fw.backend.AddRoot(root, excludePatterns...) +} + +// watch is the main file-watching loop. Watching is not recursive, +// so when new directories are added, they are manually recursively watched. +func (fw *FileWatcher) watch() { +outer: + for { + select { + case ev, ok := <-fw.backend.Events(): + if !ok { + fw.logger.Info("Events channel closed. Exiting watch loop") + break outer + } + fw.clientsMu.RLock() + for _, client := range fw.clients { + client.OnFileWatchEvent(ev) + } + fw.clientsMu.RUnlock() + case err, ok := <-fw.backend.Errors(): + if !ok { + fw.logger.Info("Errors channel closed. Exiting watch loop") + break outer + } + fw.clientsMu.RLock() + for _, client := range fw.clients { + client.OnFileWatchError(err) + } + fw.clientsMu.RUnlock() + } + } + fw.clientsMu.Lock() + fw.closed = true + for _, client := range fw.clients { + client.OnFileWatchClosed() + } + fw.clientsMu.Unlock() +} + +// AddClient registers a client for filesystem events +func (fw *FileWatcher) AddClient(client FileWatchClient) { + fw.clientsMu.Lock() + defer fw.clientsMu.Unlock() + fw.clients = append(fw.clients, client) + if fw.closed { + client.OnFileWatchClosed() + } +} diff --git a/cli/internal/filewatcher/filewatcher_test.go b/cli/internal/filewatcher/filewatcher_test.go new file mode 100644 index 0000000..72b48ba --- /dev/null +++ b/cli/internal/filewatcher/filewatcher_test.go @@ -0,0 +1,152 @@ +package filewatcher + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/turbopath" + "gotest.tools/v3/assert" +) + +type testClient struct { + mu sync.Mutex + createEvents []Event + notify chan Event +} + +func (c *testClient) OnFileWatchEvent(ev Event) { + if ev.EventType == FileAdded { + c.mu.Lock() + defer c.mu.Unlock() + c.createEvents = append(c.createEvents, ev) + c.notify <- ev + } +} + +func (c *testClient) OnFileWatchError(err error) {} + +func (c *testClient) OnFileWatchClosed() {} + +func expectFilesystemEvent(t *testing.T, ch <-chan Event, expected Event) { + // mark this method as a helper + t.Helper() + timeout := time.After(1 * time.Second) + for { + select { + case ev := <-ch: + t.Logf("got event %v", ev) + if ev.Path == expected.Path && ev.EventType == expected.EventType { + return + } + case <-timeout: + t.Errorf("Timed out waiting for filesystem event at %v", expected.Path) + return + } + } +} + +func expectNoFilesystemEvent(t *testing.T, ch <-chan Event) { + // mark this method as a helper + t.Helper() + select { + case ev, ok := <-ch: + if ok { + t.Errorf("got unexpected filesystem event %v", ev) + } else { + t.Error("filewatching closed unexpectedly") + } + case <-time.After(500 * time.Millisecond): + return + } +} + +func expectWatching(t *testing.T, c *testClient, dirs []turbopath.AbsoluteSystemPath) { + t.Helper() + now := time.Now() + filename := fmt.Sprintf("test-%v", now.UnixMilli()) + for _, dir := range dirs { + file := dir.UntypedJoin(filename) + err := file.WriteFile([]byte("hello"), 0755) + assert.NilError(t, err, "WriteFile") + expectFilesystemEvent(t, c.notify, Event{ + Path: file, + EventType: FileAdded, + }) + } +} + +func TestFileWatching(t *testing.T) { + logger := hclog.Default() + logger.SetLevel(hclog.Debug) + repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) + err := repoRoot.UntypedJoin(".git").MkdirAll(0775) + assert.NilError(t, err, "MkdirAll") + err = repoRoot.UntypedJoin("node_modules", "some-dep").MkdirAll(0775) + assert.NilError(t, err, "MkdirAll") + err = repoRoot.UntypedJoin("parent", "child").MkdirAll(0775) + assert.NilError(t, err, "MkdirAll") + err = repoRoot.UntypedJoin("parent", "sibling").MkdirAll(0775) + assert.NilError(t, err, "MkdirAll") + + // Directory layout: + // <repoRoot>/ + // .git/ + // node_modules/ + // some-dep/ + // parent/ + // child/ + // sibling/ + + watcher, err := GetPlatformSpecificBackend(logger) + assert.NilError(t, err, "GetPlatformSpecificBackend") + fw := New(logger, repoRoot, watcher) + err = fw.Start() + assert.NilError(t, err, "fw.Start") + + // Add a client + ch := make(chan Event, 1) + c := &testClient{ + notify: ch, + } + fw.AddClient(c) + expectedWatching := []turbopath.AbsoluteSystemPath{ + repoRoot, + repoRoot.UntypedJoin("parent"), + repoRoot.UntypedJoin("parent", "child"), + repoRoot.UntypedJoin("parent", "sibling"), + } + expectWatching(t, c, expectedWatching) + + fooPath := repoRoot.UntypedJoin("parent", "child", "foo") + err = fooPath.WriteFile([]byte("hello"), 0644) + assert.NilError(t, err, "WriteFile") + expectFilesystemEvent(t, ch, Event{ + EventType: FileAdded, + Path: fooPath, + }) + + deepPath := repoRoot.UntypedJoin("parent", "sibling", "deep", "path") + err = deepPath.MkdirAll(0775) + assert.NilError(t, err, "MkdirAll") + // We'll catch an event for "deep", but not "deep/path" since + // we don't have a recursive watch + expectFilesystemEvent(t, ch, Event{ + Path: repoRoot.UntypedJoin("parent", "sibling", "deep"), + EventType: FileAdded, + }) + expectFilesystemEvent(t, ch, Event{ + Path: repoRoot.UntypedJoin("parent", "sibling", "deep", "path"), + EventType: FileAdded, + }) + expectedWatching = append(expectedWatching, deepPath, repoRoot.UntypedJoin("parent", "sibling", "deep")) + expectWatching(t, c, expectedWatching) + + gitFilePath := repoRoot.UntypedJoin(".git", "git-file") + err = gitFilePath.WriteFile([]byte("nope"), 0644) + assert.NilError(t, err, "WriteFile") + expectNoFilesystemEvent(t, ch) +} |
