aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/filewatcher
diff options
context:
space:
mode:
author简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:44 +0800
committer简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:44 +0800
commitdd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
treeb583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/filewatcher
parent0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
downloadHydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/filewatcher')
-rw-r--r--cli/internal/filewatcher/backend.go209
-rw-r--r--cli/internal/filewatcher/backend_darwin.go220
-rw-r--r--cli/internal/filewatcher/cookie.go160
-rw-r--r--cli/internal/filewatcher/cookie_test.go130
-rw-r--r--cli/internal/filewatcher/filewatcher.go167
-rw-r--r--cli/internal/filewatcher/filewatcher_test.go152
6 files changed, 1038 insertions, 0 deletions
diff --git a/cli/internal/filewatcher/backend.go b/cli/internal/filewatcher/backend.go
new file mode 100644
index 0000000..b8b7fa8
--- /dev/null
+++ b/cli/internal/filewatcher/backend.go
@@ -0,0 +1,209 @@
+//go:build !darwin
+// +build !darwin
+
+package filewatcher
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "sync"
+
+ "github.com/fsnotify/fsnotify"
+ "github.com/hashicorp/go-hclog"
+ "github.com/karrick/godirwalk"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/doublestar"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// watchAddMode is used to indicate whether watchRecursively should synthesize events
+// for existing files.
+type watchAddMode int
+
+const (
+ dontSynthesizeEvents watchAddMode = iota
+ synthesizeEvents
+)
+
+type fsNotifyBackend struct {
+ watcher *fsnotify.Watcher
+ events chan Event
+ errors chan error
+ logger hclog.Logger
+
+ mu sync.Mutex
+ allExcludes []string
+ closed bool
+}
+
+func (f *fsNotifyBackend) Events() <-chan Event {
+ return f.events
+}
+
+func (f *fsNotifyBackend) Errors() <-chan error {
+ return f.errors
+}
+
+func (f *fsNotifyBackend) Close() error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ if f.closed {
+ return ErrFilewatchingClosed
+ }
+ f.closed = true
+ close(f.events)
+ close(f.errors)
+ if err := f.watcher.Close(); err != nil {
+ return err
+ }
+ return nil
+}
+
+// onFileAdded helps up paper over cross-platform inconsistencies in fsnotify.
+// Some fsnotify backends automatically add the contents of directories. Some do
+// not. Adding a watch is idempotent, so anytime any file we care about gets added,
+// watch it.
+func (f *fsNotifyBackend) onFileAdded(name turbopath.AbsoluteSystemPath) error {
+ info, err := name.Lstat()
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ // We can race with a file being added and removed. Ignore it
+ return nil
+ }
+ return errors.Wrapf(err, "error checking lstat of new file %v", name)
+ }
+ if info.IsDir() {
+ // If a directory has been added, we need to synthesize events for everything it contains
+ if err := f.watchRecursively(name, []string{}, synthesizeEvents); err != nil {
+ return errors.Wrapf(err, "failed recursive watch of %v", name)
+ }
+ } else {
+ if err := f.watcher.Add(name.ToString()); err != nil {
+ return errors.Wrapf(err, "failed adding watch to %v", name)
+ }
+ }
+ return nil
+}
+
+func (f *fsNotifyBackend) watchRecursively(root turbopath.AbsoluteSystemPath, excludePatterns []string, addMode watchAddMode) error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ err := fs.WalkMode(root.ToString(), func(name string, isDir bool, info os.FileMode) error {
+ for _, excludePattern := range excludePatterns {
+ excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(name))
+ if err != nil {
+ return err
+ }
+ if excluded {
+ return godirwalk.SkipThis
+ }
+ }
+ if info.IsDir() && (info&os.ModeSymlink == 0) {
+ if err := f.watcher.Add(name); err != nil {
+ return errors.Wrapf(err, "failed adding watch to %v", name)
+ }
+ f.logger.Debug(fmt.Sprintf("watching directory %v", name))
+ }
+ if addMode == synthesizeEvents {
+ f.events <- Event{
+ Path: fs.AbsoluteSystemPathFromUpstream(name),
+ EventType: FileAdded,
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ f.allExcludes = append(f.allExcludes, excludePatterns...)
+
+ return nil
+}
+
+func (f *fsNotifyBackend) watch() {
+outer:
+ for {
+ select {
+ case ev, ok := <-f.watcher.Events:
+ if !ok {
+ break outer
+ }
+ eventType := toFileEvent(ev.Op)
+ path := fs.AbsoluteSystemPathFromUpstream(ev.Name)
+ if eventType == FileAdded {
+ if err := f.onFileAdded(path); err != nil {
+ f.errors <- err
+ }
+ }
+ f.events <- Event{
+ Path: path,
+ EventType: eventType,
+ }
+ case err, ok := <-f.watcher.Errors:
+ if !ok {
+ break outer
+ }
+ f.errors <- err
+ }
+ }
+}
+
+var _modifiedMask = fsnotify.Chmod | fsnotify.Write
+
+func toFileEvent(op fsnotify.Op) FileEvent {
+ if op&fsnotify.Create != 0 {
+ return FileAdded
+ } else if op&fsnotify.Remove != 0 {
+ return FileDeleted
+ } else if op&_modifiedMask != 0 {
+ return FileModified
+ } else if op&fsnotify.Rename != 0 {
+ return FileRenamed
+ }
+ return FileOther
+}
+
+func (f *fsNotifyBackend) Start() error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ if f.closed {
+ return ErrFilewatchingClosed
+ }
+ for _, dir := range f.watcher.WatchList() {
+ for _, excludePattern := range f.allExcludes {
+ excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(dir))
+ if err != nil {
+ return err
+ }
+ if excluded {
+ if err := f.watcher.Remove(dir); err != nil {
+ return err
+ }
+ }
+ }
+ }
+ go f.watch()
+ return nil
+}
+
+func (f *fsNotifyBackend) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
+ // We don't synthesize events for the initial watch
+ return f.watchRecursively(root, excludePatterns, dontSynthesizeEvents)
+}
+
+// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are
+// running on.
+func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return nil, err
+ }
+ return &fsNotifyBackend{
+ watcher: watcher,
+ events: make(chan Event),
+ errors: make(chan error),
+ logger: logger.Named("fsnotify"),
+ }, nil
+}
diff --git a/cli/internal/filewatcher/backend_darwin.go b/cli/internal/filewatcher/backend_darwin.go
new file mode 100644
index 0000000..4c029c4
--- /dev/null
+++ b/cli/internal/filewatcher/backend_darwin.go
@@ -0,0 +1,220 @@
+//go:build darwin
+// +build darwin
+
+package filewatcher
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/yookoala/realpath"
+
+ "github.com/fsnotify/fsevents"
+ "github.com/hashicorp/go-hclog"
+ "github.com/vercel/turbo/cli/internal/doublestar"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+type fseventsBackend struct {
+ events chan Event
+ errors chan error
+ logger hclog.Logger
+ mu sync.Mutex
+ streams []*fsevents.EventStream
+ closed bool
+}
+
+func (f *fseventsBackend) Events() <-chan Event {
+ return f.events
+}
+
+func (f *fseventsBackend) Errors() <-chan error {
+ return f.errors
+}
+
+func (f *fseventsBackend) Close() error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ if f.closed {
+ return ErrFilewatchingClosed
+ }
+ f.closed = true
+ for _, stream := range f.streams {
+ stream.Stop()
+ }
+ close(f.events)
+ close(f.errors)
+ return nil
+}
+
+func (f *fseventsBackend) Start() error {
+ return nil
+}
+
+var (
+ _eventLatency = 10 * time.Millisecond
+ _cookieTimeout = 500 * time.Millisecond
+)
+
+// AddRoot starts watching a new directory hierarchy. Events matching the provided excludePatterns
+// will not be forwarded.
+func (f *fseventsBackend) AddRoot(someRoot turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
+ // We need to resolve the real path to the hierarchy that we are going to watch
+ realRoot, err := realpath.Realpath(someRoot.ToString())
+ if err != nil {
+ return err
+ }
+ root := fs.AbsoluteSystemPathFromUpstream(realRoot)
+ dev, err := fsevents.DeviceForPath(root.ToString())
+ if err != nil {
+ return err
+ }
+
+ // Optimistically set up and start a stream, assuming the watch is still valid.
+ s := &fsevents.EventStream{
+ Paths: []string{root.ToString()},
+ Latency: _eventLatency,
+ Device: dev,
+ Flags: fsevents.FileEvents | fsevents.WatchRoot,
+ }
+ s.Start()
+ events := s.Events
+
+ // fsevents delivers events for all existing files first, so use a cookie to detect when we're ready for new events
+ if err := waitForCookie(root, events, _cookieTimeout); err != nil {
+ s.Stop()
+ return err
+ }
+
+ // Now try to persist the stream.
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ if f.closed {
+ s.Stop()
+ return ErrFilewatchingClosed
+ }
+ f.streams = append(f.streams, s)
+ f.logger.Debug(fmt.Sprintf("watching root %v, excluding %v", root, excludePatterns))
+
+ go func() {
+ for evs := range events {
+ for _, ev := range evs {
+ isExcluded := false
+
+ // 1. Ensure that we have a `/`-prefixed path from the event.
+ var eventPath string
+ if !strings.HasPrefix("/", ev.Path) {
+ eventPath = "/" + ev.Path
+ } else {
+ eventPath = ev.Path
+ }
+
+ // 2. We're getting events from the real path, but we need to translate
+ // back to the path we were provided since that's what the caller will
+ // expect in terms of event paths.
+ watchRootRelativePath := eventPath[len(realRoot):]
+ processedEventPath := someRoot.UntypedJoin(watchRootRelativePath)
+
+ // 3. Compare the event to all exclude patterns, short-circuit if we know
+ // we are not watching this file.
+ processedPathString := processedEventPath.ToString() // loop invariant
+ for _, pattern := range excludePatterns {
+ matches, err := doublestar.Match(pattern, processedPathString)
+ if err != nil {
+ f.errors <- err
+ } else if matches {
+ isExcluded = true
+ break
+ }
+ }
+
+ // 4. Report the file events we care about.
+ if !isExcluded {
+ f.events <- Event{
+ Path: processedEventPath,
+ EventType: toFileEvent(ev.Flags),
+ }
+ }
+ }
+ }
+ }()
+
+ return nil
+}
+
+func waitForCookie(root turbopath.AbsoluteSystemPath, events <-chan []fsevents.Event, timeout time.Duration) error {
+ // This cookie needs to be in a location that we're watching, and at this point we can't guarantee
+ // what the root is, or if something like "node_modules/.cache/turbo" would make sense. As a compromise, ensure
+ // that we clean it up even in the event of a failure.
+ cookiePath := root.UntypedJoin(".turbo-cookie")
+ if err := cookiePath.WriteFile([]byte("cookie"), 0755); err != nil {
+ return err
+ }
+ expected := cookiePath.ToString()[1:] // trim leading slash
+ if err := waitForEvent(events, expected, fsevents.ItemCreated, timeout); err != nil {
+ // Attempt to not leave the cookie file lying around.
+ // Ignore the error, since there's not much we can do with it.
+ _ = cookiePath.Remove()
+ return err
+ }
+ if err := cookiePath.Remove(); err != nil {
+ return err
+ }
+ if err := waitForEvent(events, expected, fsevents.ItemRemoved, timeout); err != nil {
+ return err
+ }
+ return nil
+}
+
+func waitForEvent(events <-chan []fsevents.Event, path string, flag fsevents.EventFlags, timeout time.Duration) error {
+ ch := make(chan struct{})
+ go func() {
+ for evs := range events {
+ for _, ev := range evs {
+ if ev.Path == path && ev.Flags&flag != 0 {
+ close(ch)
+ return
+ }
+ }
+ }
+ }()
+ select {
+ case <-time.After(timeout):
+ return errors.Wrap(ErrFailedToStart, "timed out waiting for initial fsevents cookie")
+ case <-ch:
+ return nil
+ }
+}
+
+var _modifiedMask = fsevents.ItemModified | fsevents.ItemInodeMetaMod | fsevents.ItemFinderInfoMod | fsevents.ItemChangeOwner | fsevents.ItemXattrMod
+
+func toFileEvent(flags fsevents.EventFlags) FileEvent {
+ if flags&fsevents.ItemCreated != 0 {
+ return FileAdded
+ } else if flags&fsevents.ItemRemoved != 0 {
+ return FileDeleted
+ } else if flags&_modifiedMask != 0 {
+ return FileModified
+ } else if flags&fsevents.ItemRenamed != 0 {
+ return FileRenamed
+ } else if flags&fsevents.RootChanged != 0 {
+ // count this as a delete, something affected the path to the root
+ // of the stream
+ return FileDeleted
+ }
+ return FileOther
+}
+
+// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are
+// running on.
+func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) {
+ return &fseventsBackend{
+ events: make(chan Event),
+ errors: make(chan error),
+ logger: logger.Named("fsevents"),
+ }, nil
+}
diff --git a/cli/internal/filewatcher/cookie.go b/cli/internal/filewatcher/cookie.go
new file mode 100644
index 0000000..7a4931e
--- /dev/null
+++ b/cli/internal/filewatcher/cookie.go
@@ -0,0 +1,160 @@
+package filewatcher
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// CookieWaiter is the interface used by clients that need to wait
+// for a roundtrip through the filewatching API.
+type CookieWaiter interface {
+ WaitForCookie() error
+}
+
+var (
+ // ErrCookieTimeout is returned when we did not see our cookie file within the given time constraints
+ ErrCookieTimeout = errors.New("timed out waiting for cookie")
+ // ErrCookieWatchingClosed is returned when the underlying filewatching has been closed.
+ ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies")
+)
+
+// CookieJar is used for tracking roundtrips through the filesystem watching API
+type CookieJar struct {
+ timeout time.Duration
+ dir turbopath.AbsoluteSystemPath
+ serial uint64
+ mu sync.Mutex
+ cookies map[turbopath.AbsoluteSystemPath]chan error
+ closed bool
+}
+
+// NewCookieJar returns a new instance of a CookieJar. There should only ever be a single
+// instance live per cookieDir, since they expect to have full control over that directory.
+func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) {
+ if err := cookieDir.RemoveAll(); err != nil {
+ return nil, err
+ }
+ if err := cookieDir.MkdirAll(0775); err != nil {
+ return nil, err
+ }
+ return &CookieJar{
+ timeout: timeout,
+ dir: cookieDir,
+ cookies: make(map[turbopath.AbsoluteSystemPath]chan error),
+ }, nil
+}
+
+// removeAllCookiesWithError sends the error to every channel, closes every channel,
+// and attempts to remove every cookie file. Must be called while the cj.mu is held.
+// If the cookie jar is going to be reused afterwards, the cookies map must be reinitialized.
+func (cj *CookieJar) removeAllCookiesWithError(err error) {
+ for p, ch := range cj.cookies {
+ _ = p.Remove()
+ ch <- err
+ close(ch)
+ }
+ // Drop all of the references so they can be cleaned up
+ cj.cookies = nil
+}
+
+// OnFileWatchClosed handles the case where filewatching had to close for some reason
+// We send an error to all of our cookies and stop accepting new ones.
+func (cj *CookieJar) OnFileWatchClosed() {
+ cj.mu.Lock()
+ defer cj.mu.Unlock()
+ cj.closed = true
+ cj.removeAllCookiesWithError(ErrCookieWatchingClosed)
+
+}
+
+// OnFileWatchError handles when filewatching has encountered an error.
+// In the error case, we remove all cookies and send them errors. We remain
+// available for later cookies.
+func (cj *CookieJar) OnFileWatchError(err error) {
+ // We are now in an inconsistent state. Drop all of our cookies,
+ // but we still allow new ones to be created
+ cj.mu.Lock()
+ defer cj.mu.Unlock()
+ cj.removeAllCookiesWithError(err)
+ cj.cookies = make(map[turbopath.AbsoluteSystemPath]chan error)
+}
+
+// OnFileWatchEvent determines if the specified event is relevant
+// for cookie watching and notifies the appropriate cookie if so.
+func (cj *CookieJar) OnFileWatchEvent(ev Event) {
+ if ev.EventType == FileAdded {
+ isCookie, err := fs.DirContainsPath(cj.dir.ToStringDuringMigration(), ev.Path.ToStringDuringMigration())
+ if err != nil {
+ cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path))
+ } else if isCookie {
+ cj.notifyCookie(ev.Path, nil)
+ }
+ }
+}
+
+// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications.
+// This provides a theoretical bound on filesystem operations, although it's possible
+// that underlying filewatch mechanisms don't respect this ordering.
+func (cj *CookieJar) WaitForCookie() error {
+ // we're only ever going to send a single error on the channel, add a buffer so that we never
+ // block sending it.
+ ch := make(chan error, 1)
+ serial := atomic.AddUint64(&cj.serial, 1)
+ cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial))
+ cj.mu.Lock()
+ if cj.closed {
+ cj.mu.Unlock()
+ return ErrCookieWatchingClosed
+ }
+ cj.cookies[cookiePath] = ch
+ cj.mu.Unlock()
+ if err := touchCookieFile(cookiePath); err != nil {
+ cj.notifyCookie(cookiePath, err)
+ return err
+ }
+ select {
+ case <-time.After(cj.timeout):
+ return ErrCookieTimeout
+ case err, ok := <-ch:
+ if !ok {
+ // the channel closed without an error, we're all set
+ return nil
+ }
+ // the channel didn't close, meaning we got some error.
+ // We don't need to wait on channel close, it's going to be closed
+ // immediately by whoever sent the error. Return the error directly
+ return err
+ }
+}
+
+func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) {
+ cj.mu.Lock()
+ ch, ok := cj.cookies[cookie]
+ // delete is a no-op if the key doesn't exist
+ delete(cj.cookies, cookie)
+ cj.mu.Unlock()
+ if ok {
+ if err != nil {
+ ch <- err
+ }
+ close(ch)
+ }
+}
+
+func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error {
+ f, err := cookie.OpenFile(os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700)
+ if err != nil {
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/cli/internal/filewatcher/cookie_test.go b/cli/internal/filewatcher/cookie_test.go
new file mode 100644
index 0000000..96241b4
--- /dev/null
+++ b/cli/internal/filewatcher/cookie_test.go
@@ -0,0 +1,130 @@
+package filewatcher
+
+import (
+ "testing"
+ "time"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "gotest.tools/v3/assert"
+)
+
+func TestWaitForCookie(t *testing.T) {
+ logger := hclog.Default()
+ cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ jar, err := NewCookieJar(cookieDir, 5*time.Second)
+ assert.NilError(t, err, "NewCookieJar")
+
+ watcher, err := GetPlatformSpecificBackend(logger)
+ assert.NilError(t, err, "NewWatcher")
+ fw := New(logger, repoRoot, watcher)
+ err = fw.Start()
+ assert.NilError(t, err, "Start")
+ fw.AddClient(jar)
+ err = fw.AddRoot(cookieDir)
+ assert.NilError(t, err, "Add")
+
+ err = jar.WaitForCookie()
+ assert.NilError(t, err, "failed to roundtrip cookie")
+}
+
+func TestWaitForCookieAfterClose(t *testing.T) {
+ logger := hclog.Default()
+ cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ jar, err := NewCookieJar(cookieDir, 5*time.Second)
+ assert.NilError(t, err, "NewCookieJar")
+
+ watcher, err := GetPlatformSpecificBackend(logger)
+ assert.NilError(t, err, "NewWatcher")
+ fw := New(logger, repoRoot, watcher)
+ err = fw.Start()
+ assert.NilError(t, err, "Start")
+ fw.AddClient(jar)
+ err = fw.AddRoot(cookieDir)
+ assert.NilError(t, err, "Add")
+
+ err = fw.Close()
+ assert.NilError(t, err, "Close")
+ err = jar.WaitForCookie()
+ assert.ErrorIs(t, err, ErrCookieWatchingClosed)
+}
+
+func TestWaitForCookieTimeout(t *testing.T) {
+ logger := hclog.Default()
+ cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ jar, err := NewCookieJar(cookieDir, 10*time.Millisecond)
+ assert.NilError(t, err, "NewCookieJar")
+
+ watcher, err := GetPlatformSpecificBackend(logger)
+ assert.NilError(t, err, "NewWatcher")
+ fw := New(logger, repoRoot, watcher)
+ err = fw.Start()
+ assert.NilError(t, err, "Start")
+ fw.AddClient(jar)
+
+ // NOTE: don't call fw.Add here so that no file event gets delivered
+
+ err = jar.WaitForCookie()
+ assert.ErrorIs(t, err, ErrCookieTimeout)
+}
+
+func TestWaitForCookieWithError(t *testing.T) {
+ logger := hclog.Default()
+ cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ jar, err := NewCookieJar(cookieDir, 10*time.Second)
+ assert.NilError(t, err, "NewCookieJar")
+
+ watcher, err := GetPlatformSpecificBackend(logger)
+ assert.NilError(t, err, "NewWatcher")
+ fw := New(logger, repoRoot, watcher)
+ err = fw.Start()
+ assert.NilError(t, err, "Start")
+ fw.AddClient(jar)
+
+ // NOTE: don't call fw.Add here so that no file event gets delivered
+ myErr := errors.New("an error")
+ ch := make(chan error)
+ go func() {
+ if err := jar.WaitForCookie(); err != nil {
+ ch <- err
+ }
+ close(ch)
+ }()
+ // wait for the cookie to be registered in the jar
+ for {
+ found := false
+ jar.mu.Lock()
+ if len(jar.cookies) == 1 {
+ found = true
+ }
+ jar.mu.Unlock()
+ if found {
+ break
+ }
+ <-time.After(10 * time.Millisecond)
+ }
+ jar.OnFileWatchError(myErr)
+
+ err, ok := <-ch
+ if !ok {
+ t.Error("expected to get an error from cookie watching")
+ }
+ assert.ErrorIs(t, err, myErr)
+
+ // ensure waiting for a new cookie still works.
+ // Add the filewatch to allow cookies work normally
+ err = fw.AddRoot(cookieDir)
+ assert.NilError(t, err, "Add")
+
+ err = jar.WaitForCookie()
+ assert.NilError(t, err, "WaitForCookie")
+}
diff --git a/cli/internal/filewatcher/filewatcher.go b/cli/internal/filewatcher/filewatcher.go
new file mode 100644
index 0000000..4f79495
--- /dev/null
+++ b/cli/internal/filewatcher/filewatcher.go
@@ -0,0 +1,167 @@
+// Package filewatcher is used to handle watching for file changes inside the monorepo
+package filewatcher
+
+import (
+ "path/filepath"
+ "strings"
+ "sync"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// _ignores is the set of paths we exempt from file-watching
+var _ignores = []string{".git", "node_modules"}
+
+// FileWatchClient defines the callbacks used by the file watching loop.
+// All methods are called from the same goroutine so they:
+// 1) do not need synchronization
+// 2) should minimize the work they are doing when called, if possible
+type FileWatchClient interface {
+ OnFileWatchEvent(ev Event)
+ OnFileWatchError(err error)
+ OnFileWatchClosed()
+}
+
+// FileEvent is an enum covering the kinds of things that can happen
+// to files that we might be interested in
+type FileEvent int
+
+const (
+ // FileAdded - this is a new file
+ FileAdded FileEvent = iota + 1
+ // FileDeleted - this file has been removed
+ FileDeleted
+ // FileModified - this file has been changed in some way
+ FileModified
+ // FileRenamed - a file's name has changed
+ FileRenamed
+ // FileOther - some other backend-specific event has happened
+ FileOther
+)
+
+var (
+ // ErrFilewatchingClosed is returned when filewatching has been closed
+ ErrFilewatchingClosed = errors.New("Close() has already been called for filewatching")
+ // ErrFailedToStart is returned when filewatching fails to start up
+ ErrFailedToStart = errors.New("filewatching failed to start")
+)
+
+// Event is the backend-independent information about a file change
+type Event struct {
+ Path turbopath.AbsoluteSystemPath
+ EventType FileEvent
+}
+
+// Backend is the interface that describes what an underlying filesystem watching backend
+// must provide.
+type Backend interface {
+ AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error
+ Events() <-chan Event
+ Errors() <-chan error
+ Close() error
+ Start() error
+}
+
+// FileWatcher handles watching all of the files in the monorepo.
+// We currently ignore .git and top-level node_modules. We can revisit
+// if necessary.
+type FileWatcher struct {
+ backend Backend
+
+ logger hclog.Logger
+ repoRoot turbopath.AbsoluteSystemPath
+ excludePattern string
+
+ clientsMu sync.RWMutex
+ clients []FileWatchClient
+ closed bool
+}
+
+// New returns a new FileWatcher instance
+func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, backend Backend) *FileWatcher {
+ excludes := make([]string, len(_ignores))
+ for i, ignore := range _ignores {
+ excludes[i] = filepath.ToSlash(repoRoot.UntypedJoin(ignore).ToString() + "/**")
+ }
+ excludePattern := "{" + strings.Join(excludes, ",") + "}"
+ return &FileWatcher{
+ backend: backend,
+ logger: logger,
+ repoRoot: repoRoot,
+ excludePattern: excludePattern,
+ }
+}
+
+// Close shuts down filewatching
+func (fw *FileWatcher) Close() error {
+ return fw.backend.Close()
+}
+
+// Start recursively adds all directories from the repo root, redacts the excluded ones,
+// then fires off a goroutine to respond to filesystem events
+func (fw *FileWatcher) Start() error {
+ if err := fw.backend.AddRoot(fw.repoRoot, fw.excludePattern); err != nil {
+ return err
+ }
+ if err := fw.backend.Start(); err != nil {
+ return err
+ }
+ go fw.watch()
+ return nil
+}
+
+// AddRoot registers the root a filesystem hierarchy to be watched for changes. Events are *not*
+// fired for existing files when AddRoot is called, only for subsequent changes.
+// NOTE: if it appears helpful, we could change this behavior so that we provide a stream of initial
+// events.
+func (fw *FileWatcher) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
+ return fw.backend.AddRoot(root, excludePatterns...)
+}
+
+// watch is the main file-watching loop. Watching is not recursive,
+// so when new directories are added, they are manually recursively watched.
+func (fw *FileWatcher) watch() {
+outer:
+ for {
+ select {
+ case ev, ok := <-fw.backend.Events():
+ if !ok {
+ fw.logger.Info("Events channel closed. Exiting watch loop")
+ break outer
+ }
+ fw.clientsMu.RLock()
+ for _, client := range fw.clients {
+ client.OnFileWatchEvent(ev)
+ }
+ fw.clientsMu.RUnlock()
+ case err, ok := <-fw.backend.Errors():
+ if !ok {
+ fw.logger.Info("Errors channel closed. Exiting watch loop")
+ break outer
+ }
+ fw.clientsMu.RLock()
+ for _, client := range fw.clients {
+ client.OnFileWatchError(err)
+ }
+ fw.clientsMu.RUnlock()
+ }
+ }
+ fw.clientsMu.Lock()
+ fw.closed = true
+ for _, client := range fw.clients {
+ client.OnFileWatchClosed()
+ }
+ fw.clientsMu.Unlock()
+}
+
+// AddClient registers a client for filesystem events
+func (fw *FileWatcher) AddClient(client FileWatchClient) {
+ fw.clientsMu.Lock()
+ defer fw.clientsMu.Unlock()
+ fw.clients = append(fw.clients, client)
+ if fw.closed {
+ client.OnFileWatchClosed()
+ }
+}
diff --git a/cli/internal/filewatcher/filewatcher_test.go b/cli/internal/filewatcher/filewatcher_test.go
new file mode 100644
index 0000000..72b48ba
--- /dev/null
+++ b/cli/internal/filewatcher/filewatcher_test.go
@@ -0,0 +1,152 @@
+package filewatcher
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "gotest.tools/v3/assert"
+)
+
+type testClient struct {
+ mu sync.Mutex
+ createEvents []Event
+ notify chan Event
+}
+
+func (c *testClient) OnFileWatchEvent(ev Event) {
+ if ev.EventType == FileAdded {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.createEvents = append(c.createEvents, ev)
+ c.notify <- ev
+ }
+}
+
+func (c *testClient) OnFileWatchError(err error) {}
+
+func (c *testClient) OnFileWatchClosed() {}
+
+func expectFilesystemEvent(t *testing.T, ch <-chan Event, expected Event) {
+ // mark this method as a helper
+ t.Helper()
+ timeout := time.After(1 * time.Second)
+ for {
+ select {
+ case ev := <-ch:
+ t.Logf("got event %v", ev)
+ if ev.Path == expected.Path && ev.EventType == expected.EventType {
+ return
+ }
+ case <-timeout:
+ t.Errorf("Timed out waiting for filesystem event at %v", expected.Path)
+ return
+ }
+ }
+}
+
+func expectNoFilesystemEvent(t *testing.T, ch <-chan Event) {
+ // mark this method as a helper
+ t.Helper()
+ select {
+ case ev, ok := <-ch:
+ if ok {
+ t.Errorf("got unexpected filesystem event %v", ev)
+ } else {
+ t.Error("filewatching closed unexpectedly")
+ }
+ case <-time.After(500 * time.Millisecond):
+ return
+ }
+}
+
+func expectWatching(t *testing.T, c *testClient, dirs []turbopath.AbsoluteSystemPath) {
+ t.Helper()
+ now := time.Now()
+ filename := fmt.Sprintf("test-%v", now.UnixMilli())
+ for _, dir := range dirs {
+ file := dir.UntypedJoin(filename)
+ err := file.WriteFile([]byte("hello"), 0755)
+ assert.NilError(t, err, "WriteFile")
+ expectFilesystemEvent(t, c.notify, Event{
+ Path: file,
+ EventType: FileAdded,
+ })
+ }
+}
+
+func TestFileWatching(t *testing.T) {
+ logger := hclog.Default()
+ logger.SetLevel(hclog.Debug)
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+ err := repoRoot.UntypedJoin(".git").MkdirAll(0775)
+ assert.NilError(t, err, "MkdirAll")
+ err = repoRoot.UntypedJoin("node_modules", "some-dep").MkdirAll(0775)
+ assert.NilError(t, err, "MkdirAll")
+ err = repoRoot.UntypedJoin("parent", "child").MkdirAll(0775)
+ assert.NilError(t, err, "MkdirAll")
+ err = repoRoot.UntypedJoin("parent", "sibling").MkdirAll(0775)
+ assert.NilError(t, err, "MkdirAll")
+
+ // Directory layout:
+ // <repoRoot>/
+ // .git/
+ // node_modules/
+ // some-dep/
+ // parent/
+ // child/
+ // sibling/
+
+ watcher, err := GetPlatformSpecificBackend(logger)
+ assert.NilError(t, err, "GetPlatformSpecificBackend")
+ fw := New(logger, repoRoot, watcher)
+ err = fw.Start()
+ assert.NilError(t, err, "fw.Start")
+
+ // Add a client
+ ch := make(chan Event, 1)
+ c := &testClient{
+ notify: ch,
+ }
+ fw.AddClient(c)
+ expectedWatching := []turbopath.AbsoluteSystemPath{
+ repoRoot,
+ repoRoot.UntypedJoin("parent"),
+ repoRoot.UntypedJoin("parent", "child"),
+ repoRoot.UntypedJoin("parent", "sibling"),
+ }
+ expectWatching(t, c, expectedWatching)
+
+ fooPath := repoRoot.UntypedJoin("parent", "child", "foo")
+ err = fooPath.WriteFile([]byte("hello"), 0644)
+ assert.NilError(t, err, "WriteFile")
+ expectFilesystemEvent(t, ch, Event{
+ EventType: FileAdded,
+ Path: fooPath,
+ })
+
+ deepPath := repoRoot.UntypedJoin("parent", "sibling", "deep", "path")
+ err = deepPath.MkdirAll(0775)
+ assert.NilError(t, err, "MkdirAll")
+ // We'll catch an event for "deep", but not "deep/path" since
+ // we don't have a recursive watch
+ expectFilesystemEvent(t, ch, Event{
+ Path: repoRoot.UntypedJoin("parent", "sibling", "deep"),
+ EventType: FileAdded,
+ })
+ expectFilesystemEvent(t, ch, Event{
+ Path: repoRoot.UntypedJoin("parent", "sibling", "deep", "path"),
+ EventType: FileAdded,
+ })
+ expectedWatching = append(expectedWatching, deepPath, repoRoot.UntypedJoin("parent", "sibling", "deep"))
+ expectWatching(t, c, expectedWatching)
+
+ gitFilePath := repoRoot.UntypedJoin(".git", "git-file")
+ err = gitFilePath.WriteFile([]byte("nope"), 0644)
+ assert.NilError(t, err, "WriteFile")
+ expectNoFilesystemEvent(t, ch)
+}