From fc8c5fdce62fb229202659408798a7b6c98f6e8b Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Fri, 28 Apr 2023 01:36:55 +0800 Subject: --- cli/internal/filewatcher/backend.go | 209 ------------------------- cli/internal/filewatcher/backend_darwin.go | 220 --------------------------- cli/internal/filewatcher/cookie.go | 160 ------------------- cli/internal/filewatcher/cookie_test.go | 130 ---------------- cli/internal/filewatcher/filewatcher.go | 167 -------------------- cli/internal/filewatcher/filewatcher_test.go | 152 ------------------ 6 files changed, 1038 deletions(-) delete mode 100644 cli/internal/filewatcher/backend.go delete mode 100644 cli/internal/filewatcher/backend_darwin.go delete mode 100644 cli/internal/filewatcher/cookie.go delete mode 100644 cli/internal/filewatcher/cookie_test.go delete mode 100644 cli/internal/filewatcher/filewatcher.go delete mode 100644 cli/internal/filewatcher/filewatcher_test.go (limited to 'cli/internal/filewatcher') diff --git a/cli/internal/filewatcher/backend.go b/cli/internal/filewatcher/backend.go deleted file mode 100644 index b8b7fa8..0000000 --- a/cli/internal/filewatcher/backend.go +++ /dev/null @@ -1,209 +0,0 @@ -//go:build !darwin -// +build !darwin - -package filewatcher - -import ( - "fmt" - "os" - "path/filepath" - "sync" - - "github.com/fsnotify/fsnotify" - "github.com/hashicorp/go-hclog" - "github.com/karrick/godirwalk" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/doublestar" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -// watchAddMode is used to indicate whether watchRecursively should synthesize events -// for existing files. -type watchAddMode int - -const ( - dontSynthesizeEvents watchAddMode = iota - synthesizeEvents -) - -type fsNotifyBackend struct { - watcher *fsnotify.Watcher - events chan Event - errors chan error - logger hclog.Logger - - mu sync.Mutex - allExcludes []string - closed bool -} - -func (f *fsNotifyBackend) Events() <-chan Event { - return f.events -} - -func (f *fsNotifyBackend) Errors() <-chan error { - return f.errors -} - -func (f *fsNotifyBackend) Close() error { - f.mu.Lock() - defer f.mu.Unlock() - if f.closed { - return ErrFilewatchingClosed - } - f.closed = true - close(f.events) - close(f.errors) - if err := f.watcher.Close(); err != nil { - return err - } - return nil -} - -// onFileAdded helps up paper over cross-platform inconsistencies in fsnotify. -// Some fsnotify backends automatically add the contents of directories. Some do -// not. Adding a watch is idempotent, so anytime any file we care about gets added, -// watch it. -func (f *fsNotifyBackend) onFileAdded(name turbopath.AbsoluteSystemPath) error { - info, err := name.Lstat() - if err != nil { - if errors.Is(err, os.ErrNotExist) { - // We can race with a file being added and removed. Ignore it - return nil - } - return errors.Wrapf(err, "error checking lstat of new file %v", name) - } - if info.IsDir() { - // If a directory has been added, we need to synthesize events for everything it contains - if err := f.watchRecursively(name, []string{}, synthesizeEvents); err != nil { - return errors.Wrapf(err, "failed recursive watch of %v", name) - } - } else { - if err := f.watcher.Add(name.ToString()); err != nil { - return errors.Wrapf(err, "failed adding watch to %v", name) - } - } - return nil -} - -func (f *fsNotifyBackend) watchRecursively(root turbopath.AbsoluteSystemPath, excludePatterns []string, addMode watchAddMode) error { - f.mu.Lock() - defer f.mu.Unlock() - err := fs.WalkMode(root.ToString(), func(name string, isDir bool, info os.FileMode) error { - for _, excludePattern := range excludePatterns { - excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(name)) - if err != nil { - return err - } - if excluded { - return godirwalk.SkipThis - } - } - if info.IsDir() && (info&os.ModeSymlink == 0) { - if err := f.watcher.Add(name); err != nil { - return errors.Wrapf(err, "failed adding watch to %v", name) - } - f.logger.Debug(fmt.Sprintf("watching directory %v", name)) - } - if addMode == synthesizeEvents { - f.events <- Event{ - Path: fs.AbsoluteSystemPathFromUpstream(name), - EventType: FileAdded, - } - } - return nil - }) - if err != nil { - return err - } - f.allExcludes = append(f.allExcludes, excludePatterns...) - - return nil -} - -func (f *fsNotifyBackend) watch() { -outer: - for { - select { - case ev, ok := <-f.watcher.Events: - if !ok { - break outer - } - eventType := toFileEvent(ev.Op) - path := fs.AbsoluteSystemPathFromUpstream(ev.Name) - if eventType == FileAdded { - if err := f.onFileAdded(path); err != nil { - f.errors <- err - } - } - f.events <- Event{ - Path: path, - EventType: eventType, - } - case err, ok := <-f.watcher.Errors: - if !ok { - break outer - } - f.errors <- err - } - } -} - -var _modifiedMask = fsnotify.Chmod | fsnotify.Write - -func toFileEvent(op fsnotify.Op) FileEvent { - if op&fsnotify.Create != 0 { - return FileAdded - } else if op&fsnotify.Remove != 0 { - return FileDeleted - } else if op&_modifiedMask != 0 { - return FileModified - } else if op&fsnotify.Rename != 0 { - return FileRenamed - } - return FileOther -} - -func (f *fsNotifyBackend) Start() error { - f.mu.Lock() - defer f.mu.Unlock() - if f.closed { - return ErrFilewatchingClosed - } - for _, dir := range f.watcher.WatchList() { - for _, excludePattern := range f.allExcludes { - excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(dir)) - if err != nil { - return err - } - if excluded { - if err := f.watcher.Remove(dir); err != nil { - return err - } - } - } - } - go f.watch() - return nil -} - -func (f *fsNotifyBackend) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error { - // We don't synthesize events for the initial watch - return f.watchRecursively(root, excludePatterns, dontSynthesizeEvents) -} - -// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are -// running on. -func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err - } - return &fsNotifyBackend{ - watcher: watcher, - events: make(chan Event), - errors: make(chan error), - logger: logger.Named("fsnotify"), - }, nil -} diff --git a/cli/internal/filewatcher/backend_darwin.go b/cli/internal/filewatcher/backend_darwin.go deleted file mode 100644 index 4c029c4..0000000 --- a/cli/internal/filewatcher/backend_darwin.go +++ /dev/null @@ -1,220 +0,0 @@ -//go:build darwin -// +build darwin - -package filewatcher - -import ( - "fmt" - "strings" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/yookoala/realpath" - - "github.com/fsnotify/fsevents" - "github.com/hashicorp/go-hclog" - "github.com/vercel/turbo/cli/internal/doublestar" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -type fseventsBackend struct { - events chan Event - errors chan error - logger hclog.Logger - mu sync.Mutex - streams []*fsevents.EventStream - closed bool -} - -func (f *fseventsBackend) Events() <-chan Event { - return f.events -} - -func (f *fseventsBackend) Errors() <-chan error { - return f.errors -} - -func (f *fseventsBackend) Close() error { - f.mu.Lock() - defer f.mu.Unlock() - if f.closed { - return ErrFilewatchingClosed - } - f.closed = true - for _, stream := range f.streams { - stream.Stop() - } - close(f.events) - close(f.errors) - return nil -} - -func (f *fseventsBackend) Start() error { - return nil -} - -var ( - _eventLatency = 10 * time.Millisecond - _cookieTimeout = 500 * time.Millisecond -) - -// AddRoot starts watching a new directory hierarchy. Events matching the provided excludePatterns -// will not be forwarded. -func (f *fseventsBackend) AddRoot(someRoot turbopath.AbsoluteSystemPath, excludePatterns ...string) error { - // We need to resolve the real path to the hierarchy that we are going to watch - realRoot, err := realpath.Realpath(someRoot.ToString()) - if err != nil { - return err - } - root := fs.AbsoluteSystemPathFromUpstream(realRoot) - dev, err := fsevents.DeviceForPath(root.ToString()) - if err != nil { - return err - } - - // Optimistically set up and start a stream, assuming the watch is still valid. - s := &fsevents.EventStream{ - Paths: []string{root.ToString()}, - Latency: _eventLatency, - Device: dev, - Flags: fsevents.FileEvents | fsevents.WatchRoot, - } - s.Start() - events := s.Events - - // fsevents delivers events for all existing files first, so use a cookie to detect when we're ready for new events - if err := waitForCookie(root, events, _cookieTimeout); err != nil { - s.Stop() - return err - } - - // Now try to persist the stream. - f.mu.Lock() - defer f.mu.Unlock() - if f.closed { - s.Stop() - return ErrFilewatchingClosed - } - f.streams = append(f.streams, s) - f.logger.Debug(fmt.Sprintf("watching root %v, excluding %v", root, excludePatterns)) - - go func() { - for evs := range events { - for _, ev := range evs { - isExcluded := false - - // 1. Ensure that we have a `/`-prefixed path from the event. - var eventPath string - if !strings.HasPrefix("/", ev.Path) { - eventPath = "/" + ev.Path - } else { - eventPath = ev.Path - } - - // 2. We're getting events from the real path, but we need to translate - // back to the path we were provided since that's what the caller will - // expect in terms of event paths. - watchRootRelativePath := eventPath[len(realRoot):] - processedEventPath := someRoot.UntypedJoin(watchRootRelativePath) - - // 3. Compare the event to all exclude patterns, short-circuit if we know - // we are not watching this file. - processedPathString := processedEventPath.ToString() // loop invariant - for _, pattern := range excludePatterns { - matches, err := doublestar.Match(pattern, processedPathString) - if err != nil { - f.errors <- err - } else if matches { - isExcluded = true - break - } - } - - // 4. Report the file events we care about. - if !isExcluded { - f.events <- Event{ - Path: processedEventPath, - EventType: toFileEvent(ev.Flags), - } - } - } - } - }() - - return nil -} - -func waitForCookie(root turbopath.AbsoluteSystemPath, events <-chan []fsevents.Event, timeout time.Duration) error { - // This cookie needs to be in a location that we're watching, and at this point we can't guarantee - // what the root is, or if something like "node_modules/.cache/turbo" would make sense. As a compromise, ensure - // that we clean it up even in the event of a failure. - cookiePath := root.UntypedJoin(".turbo-cookie") - if err := cookiePath.WriteFile([]byte("cookie"), 0755); err != nil { - return err - } - expected := cookiePath.ToString()[1:] // trim leading slash - if err := waitForEvent(events, expected, fsevents.ItemCreated, timeout); err != nil { - // Attempt to not leave the cookie file lying around. - // Ignore the error, since there's not much we can do with it. - _ = cookiePath.Remove() - return err - } - if err := cookiePath.Remove(); err != nil { - return err - } - if err := waitForEvent(events, expected, fsevents.ItemRemoved, timeout); err != nil { - return err - } - return nil -} - -func waitForEvent(events <-chan []fsevents.Event, path string, flag fsevents.EventFlags, timeout time.Duration) error { - ch := make(chan struct{}) - go func() { - for evs := range events { - for _, ev := range evs { - if ev.Path == path && ev.Flags&flag != 0 { - close(ch) - return - } - } - } - }() - select { - case <-time.After(timeout): - return errors.Wrap(ErrFailedToStart, "timed out waiting for initial fsevents cookie") - case <-ch: - return nil - } -} - -var _modifiedMask = fsevents.ItemModified | fsevents.ItemInodeMetaMod | fsevents.ItemFinderInfoMod | fsevents.ItemChangeOwner | fsevents.ItemXattrMod - -func toFileEvent(flags fsevents.EventFlags) FileEvent { - if flags&fsevents.ItemCreated != 0 { - return FileAdded - } else if flags&fsevents.ItemRemoved != 0 { - return FileDeleted - } else if flags&_modifiedMask != 0 { - return FileModified - } else if flags&fsevents.ItemRenamed != 0 { - return FileRenamed - } else if flags&fsevents.RootChanged != 0 { - // count this as a delete, something affected the path to the root - // of the stream - return FileDeleted - } - return FileOther -} - -// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are -// running on. -func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) { - return &fseventsBackend{ - events: make(chan Event), - errors: make(chan error), - logger: logger.Named("fsevents"), - }, nil -} diff --git a/cli/internal/filewatcher/cookie.go b/cli/internal/filewatcher/cookie.go deleted file mode 100644 index 7a4931e..0000000 --- a/cli/internal/filewatcher/cookie.go +++ /dev/null @@ -1,160 +0,0 @@ -package filewatcher - -import ( - "fmt" - "os" - "sync" - "sync/atomic" - "time" - - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -// CookieWaiter is the interface used by clients that need to wait -// for a roundtrip through the filewatching API. -type CookieWaiter interface { - WaitForCookie() error -} - -var ( - // ErrCookieTimeout is returned when we did not see our cookie file within the given time constraints - ErrCookieTimeout = errors.New("timed out waiting for cookie") - // ErrCookieWatchingClosed is returned when the underlying filewatching has been closed. - ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies") -) - -// CookieJar is used for tracking roundtrips through the filesystem watching API -type CookieJar struct { - timeout time.Duration - dir turbopath.AbsoluteSystemPath - serial uint64 - mu sync.Mutex - cookies map[turbopath.AbsoluteSystemPath]chan error - closed bool -} - -// NewCookieJar returns a new instance of a CookieJar. There should only ever be a single -// instance live per cookieDir, since they expect to have full control over that directory. -func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) { - if err := cookieDir.RemoveAll(); err != nil { - return nil, err - } - if err := cookieDir.MkdirAll(0775); err != nil { - return nil, err - } - return &CookieJar{ - timeout: timeout, - dir: cookieDir, - cookies: make(map[turbopath.AbsoluteSystemPath]chan error), - }, nil -} - -// removeAllCookiesWithError sends the error to every channel, closes every channel, -// and attempts to remove every cookie file. Must be called while the cj.mu is held. -// If the cookie jar is going to be reused afterwards, the cookies map must be reinitialized. -func (cj *CookieJar) removeAllCookiesWithError(err error) { - for p, ch := range cj.cookies { - _ = p.Remove() - ch <- err - close(ch) - } - // Drop all of the references so they can be cleaned up - cj.cookies = nil -} - -// OnFileWatchClosed handles the case where filewatching had to close for some reason -// We send an error to all of our cookies and stop accepting new ones. -func (cj *CookieJar) OnFileWatchClosed() { - cj.mu.Lock() - defer cj.mu.Unlock() - cj.closed = true - cj.removeAllCookiesWithError(ErrCookieWatchingClosed) - -} - -// OnFileWatchError handles when filewatching has encountered an error. -// In the error case, we remove all cookies and send them errors. We remain -// available for later cookies. -func (cj *CookieJar) OnFileWatchError(err error) { - // We are now in an inconsistent state. Drop all of our cookies, - // but we still allow new ones to be created - cj.mu.Lock() - defer cj.mu.Unlock() - cj.removeAllCookiesWithError(err) - cj.cookies = make(map[turbopath.AbsoluteSystemPath]chan error) -} - -// OnFileWatchEvent determines if the specified event is relevant -// for cookie watching and notifies the appropriate cookie if so. -func (cj *CookieJar) OnFileWatchEvent(ev Event) { - if ev.EventType == FileAdded { - isCookie, err := fs.DirContainsPath(cj.dir.ToStringDuringMigration(), ev.Path.ToStringDuringMigration()) - if err != nil { - cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path)) - } else if isCookie { - cj.notifyCookie(ev.Path, nil) - } - } -} - -// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications. -// This provides a theoretical bound on filesystem operations, although it's possible -// that underlying filewatch mechanisms don't respect this ordering. -func (cj *CookieJar) WaitForCookie() error { - // we're only ever going to send a single error on the channel, add a buffer so that we never - // block sending it. - ch := make(chan error, 1) - serial := atomic.AddUint64(&cj.serial, 1) - cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial)) - cj.mu.Lock() - if cj.closed { - cj.mu.Unlock() - return ErrCookieWatchingClosed - } - cj.cookies[cookiePath] = ch - cj.mu.Unlock() - if err := touchCookieFile(cookiePath); err != nil { - cj.notifyCookie(cookiePath, err) - return err - } - select { - case <-time.After(cj.timeout): - return ErrCookieTimeout - case err, ok := <-ch: - if !ok { - // the channel closed without an error, we're all set - return nil - } - // the channel didn't close, meaning we got some error. - // We don't need to wait on channel close, it's going to be closed - // immediately by whoever sent the error. Return the error directly - return err - } -} - -func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) { - cj.mu.Lock() - ch, ok := cj.cookies[cookie] - // delete is a no-op if the key doesn't exist - delete(cj.cookies, cookie) - cj.mu.Unlock() - if ok { - if err != nil { - ch <- err - } - close(ch) - } -} - -func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error { - f, err := cookie.OpenFile(os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700) - if err != nil { - return err - } - if err := f.Close(); err != nil { - return err - } - return nil -} diff --git a/cli/internal/filewatcher/cookie_test.go b/cli/internal/filewatcher/cookie_test.go deleted file mode 100644 index 96241b4..0000000 --- a/cli/internal/filewatcher/cookie_test.go +++ /dev/null @@ -1,130 +0,0 @@ -package filewatcher - -import ( - "testing" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/fs" - "gotest.tools/v3/assert" -) - -func TestWaitForCookie(t *testing.T) { - logger := hclog.Default() - cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - jar, err := NewCookieJar(cookieDir, 5*time.Second) - assert.NilError(t, err, "NewCookieJar") - - watcher, err := GetPlatformSpecificBackend(logger) - assert.NilError(t, err, "NewWatcher") - fw := New(logger, repoRoot, watcher) - err = fw.Start() - assert.NilError(t, err, "Start") - fw.AddClient(jar) - err = fw.AddRoot(cookieDir) - assert.NilError(t, err, "Add") - - err = jar.WaitForCookie() - assert.NilError(t, err, "failed to roundtrip cookie") -} - -func TestWaitForCookieAfterClose(t *testing.T) { - logger := hclog.Default() - cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - jar, err := NewCookieJar(cookieDir, 5*time.Second) - assert.NilError(t, err, "NewCookieJar") - - watcher, err := GetPlatformSpecificBackend(logger) - assert.NilError(t, err, "NewWatcher") - fw := New(logger, repoRoot, watcher) - err = fw.Start() - assert.NilError(t, err, "Start") - fw.AddClient(jar) - err = fw.AddRoot(cookieDir) - assert.NilError(t, err, "Add") - - err = fw.Close() - assert.NilError(t, err, "Close") - err = jar.WaitForCookie() - assert.ErrorIs(t, err, ErrCookieWatchingClosed) -} - -func TestWaitForCookieTimeout(t *testing.T) { - logger := hclog.Default() - cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - jar, err := NewCookieJar(cookieDir, 10*time.Millisecond) - assert.NilError(t, err, "NewCookieJar") - - watcher, err := GetPlatformSpecificBackend(logger) - assert.NilError(t, err, "NewWatcher") - fw := New(logger, repoRoot, watcher) - err = fw.Start() - assert.NilError(t, err, "Start") - fw.AddClient(jar) - - // NOTE: don't call fw.Add here so that no file event gets delivered - - err = jar.WaitForCookie() - assert.ErrorIs(t, err, ErrCookieTimeout) -} - -func TestWaitForCookieWithError(t *testing.T) { - logger := hclog.Default() - cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - jar, err := NewCookieJar(cookieDir, 10*time.Second) - assert.NilError(t, err, "NewCookieJar") - - watcher, err := GetPlatformSpecificBackend(logger) - assert.NilError(t, err, "NewWatcher") - fw := New(logger, repoRoot, watcher) - err = fw.Start() - assert.NilError(t, err, "Start") - fw.AddClient(jar) - - // NOTE: don't call fw.Add here so that no file event gets delivered - myErr := errors.New("an error") - ch := make(chan error) - go func() { - if err := jar.WaitForCookie(); err != nil { - ch <- err - } - close(ch) - }() - // wait for the cookie to be registered in the jar - for { - found := false - jar.mu.Lock() - if len(jar.cookies) == 1 { - found = true - } - jar.mu.Unlock() - if found { - break - } - <-time.After(10 * time.Millisecond) - } - jar.OnFileWatchError(myErr) - - err, ok := <-ch - if !ok { - t.Error("expected to get an error from cookie watching") - } - assert.ErrorIs(t, err, myErr) - - // ensure waiting for a new cookie still works. - // Add the filewatch to allow cookies work normally - err = fw.AddRoot(cookieDir) - assert.NilError(t, err, "Add") - - err = jar.WaitForCookie() - assert.NilError(t, err, "WaitForCookie") -} diff --git a/cli/internal/filewatcher/filewatcher.go b/cli/internal/filewatcher/filewatcher.go deleted file mode 100644 index 4f79495..0000000 --- a/cli/internal/filewatcher/filewatcher.go +++ /dev/null @@ -1,167 +0,0 @@ -// Package filewatcher is used to handle watching for file changes inside the monorepo -package filewatcher - -import ( - "path/filepath" - "strings" - "sync" - - "github.com/hashicorp/go-hclog" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -// _ignores is the set of paths we exempt from file-watching -var _ignores = []string{".git", "node_modules"} - -// FileWatchClient defines the callbacks used by the file watching loop. -// All methods are called from the same goroutine so they: -// 1) do not need synchronization -// 2) should minimize the work they are doing when called, if possible -type FileWatchClient interface { - OnFileWatchEvent(ev Event) - OnFileWatchError(err error) - OnFileWatchClosed() -} - -// FileEvent is an enum covering the kinds of things that can happen -// to files that we might be interested in -type FileEvent int - -const ( - // FileAdded - this is a new file - FileAdded FileEvent = iota + 1 - // FileDeleted - this file has been removed - FileDeleted - // FileModified - this file has been changed in some way - FileModified - // FileRenamed - a file's name has changed - FileRenamed - // FileOther - some other backend-specific event has happened - FileOther -) - -var ( - // ErrFilewatchingClosed is returned when filewatching has been closed - ErrFilewatchingClosed = errors.New("Close() has already been called for filewatching") - // ErrFailedToStart is returned when filewatching fails to start up - ErrFailedToStart = errors.New("filewatching failed to start") -) - -// Event is the backend-independent information about a file change -type Event struct { - Path turbopath.AbsoluteSystemPath - EventType FileEvent -} - -// Backend is the interface that describes what an underlying filesystem watching backend -// must provide. -type Backend interface { - AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error - Events() <-chan Event - Errors() <-chan error - Close() error - Start() error -} - -// FileWatcher handles watching all of the files in the monorepo. -// We currently ignore .git and top-level node_modules. We can revisit -// if necessary. -type FileWatcher struct { - backend Backend - - logger hclog.Logger - repoRoot turbopath.AbsoluteSystemPath - excludePattern string - - clientsMu sync.RWMutex - clients []FileWatchClient - closed bool -} - -// New returns a new FileWatcher instance -func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, backend Backend) *FileWatcher { - excludes := make([]string, len(_ignores)) - for i, ignore := range _ignores { - excludes[i] = filepath.ToSlash(repoRoot.UntypedJoin(ignore).ToString() + "/**") - } - excludePattern := "{" + strings.Join(excludes, ",") + "}" - return &FileWatcher{ - backend: backend, - logger: logger, - repoRoot: repoRoot, - excludePattern: excludePattern, - } -} - -// Close shuts down filewatching -func (fw *FileWatcher) Close() error { - return fw.backend.Close() -} - -// Start recursively adds all directories from the repo root, redacts the excluded ones, -// then fires off a goroutine to respond to filesystem events -func (fw *FileWatcher) Start() error { - if err := fw.backend.AddRoot(fw.repoRoot, fw.excludePattern); err != nil { - return err - } - if err := fw.backend.Start(); err != nil { - return err - } - go fw.watch() - return nil -} - -// AddRoot registers the root a filesystem hierarchy to be watched for changes. Events are *not* -// fired for existing files when AddRoot is called, only for subsequent changes. -// NOTE: if it appears helpful, we could change this behavior so that we provide a stream of initial -// events. -func (fw *FileWatcher) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error { - return fw.backend.AddRoot(root, excludePatterns...) -} - -// watch is the main file-watching loop. Watching is not recursive, -// so when new directories are added, they are manually recursively watched. -func (fw *FileWatcher) watch() { -outer: - for { - select { - case ev, ok := <-fw.backend.Events(): - if !ok { - fw.logger.Info("Events channel closed. Exiting watch loop") - break outer - } - fw.clientsMu.RLock() - for _, client := range fw.clients { - client.OnFileWatchEvent(ev) - } - fw.clientsMu.RUnlock() - case err, ok := <-fw.backend.Errors(): - if !ok { - fw.logger.Info("Errors channel closed. Exiting watch loop") - break outer - } - fw.clientsMu.RLock() - for _, client := range fw.clients { - client.OnFileWatchError(err) - } - fw.clientsMu.RUnlock() - } - } - fw.clientsMu.Lock() - fw.closed = true - for _, client := range fw.clients { - client.OnFileWatchClosed() - } - fw.clientsMu.Unlock() -} - -// AddClient registers a client for filesystem events -func (fw *FileWatcher) AddClient(client FileWatchClient) { - fw.clientsMu.Lock() - defer fw.clientsMu.Unlock() - fw.clients = append(fw.clients, client) - if fw.closed { - client.OnFileWatchClosed() - } -} diff --git a/cli/internal/filewatcher/filewatcher_test.go b/cli/internal/filewatcher/filewatcher_test.go deleted file mode 100644 index 72b48ba..0000000 --- a/cli/internal/filewatcher/filewatcher_test.go +++ /dev/null @@ -1,152 +0,0 @@ -package filewatcher - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbopath" - "gotest.tools/v3/assert" -) - -type testClient struct { - mu sync.Mutex - createEvents []Event - notify chan Event -} - -func (c *testClient) OnFileWatchEvent(ev Event) { - if ev.EventType == FileAdded { - c.mu.Lock() - defer c.mu.Unlock() - c.createEvents = append(c.createEvents, ev) - c.notify <- ev - } -} - -func (c *testClient) OnFileWatchError(err error) {} - -func (c *testClient) OnFileWatchClosed() {} - -func expectFilesystemEvent(t *testing.T, ch <-chan Event, expected Event) { - // mark this method as a helper - t.Helper() - timeout := time.After(1 * time.Second) - for { - select { - case ev := <-ch: - t.Logf("got event %v", ev) - if ev.Path == expected.Path && ev.EventType == expected.EventType { - return - } - case <-timeout: - t.Errorf("Timed out waiting for filesystem event at %v", expected.Path) - return - } - } -} - -func expectNoFilesystemEvent(t *testing.T, ch <-chan Event) { - // mark this method as a helper - t.Helper() - select { - case ev, ok := <-ch: - if ok { - t.Errorf("got unexpected filesystem event %v", ev) - } else { - t.Error("filewatching closed unexpectedly") - } - case <-time.After(500 * time.Millisecond): - return - } -} - -func expectWatching(t *testing.T, c *testClient, dirs []turbopath.AbsoluteSystemPath) { - t.Helper() - now := time.Now() - filename := fmt.Sprintf("test-%v", now.UnixMilli()) - for _, dir := range dirs { - file := dir.UntypedJoin(filename) - err := file.WriteFile([]byte("hello"), 0755) - assert.NilError(t, err, "WriteFile") - expectFilesystemEvent(t, c.notify, Event{ - Path: file, - EventType: FileAdded, - }) - } -} - -func TestFileWatching(t *testing.T) { - logger := hclog.Default() - logger.SetLevel(hclog.Debug) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - err := repoRoot.UntypedJoin(".git").MkdirAll(0775) - assert.NilError(t, err, "MkdirAll") - err = repoRoot.UntypedJoin("node_modules", "some-dep").MkdirAll(0775) - assert.NilError(t, err, "MkdirAll") - err = repoRoot.UntypedJoin("parent", "child").MkdirAll(0775) - assert.NilError(t, err, "MkdirAll") - err = repoRoot.UntypedJoin("parent", "sibling").MkdirAll(0775) - assert.NilError(t, err, "MkdirAll") - - // Directory layout: - // / - // .git/ - // node_modules/ - // some-dep/ - // parent/ - // child/ - // sibling/ - - watcher, err := GetPlatformSpecificBackend(logger) - assert.NilError(t, err, "GetPlatformSpecificBackend") - fw := New(logger, repoRoot, watcher) - err = fw.Start() - assert.NilError(t, err, "fw.Start") - - // Add a client - ch := make(chan Event, 1) - c := &testClient{ - notify: ch, - } - fw.AddClient(c) - expectedWatching := []turbopath.AbsoluteSystemPath{ - repoRoot, - repoRoot.UntypedJoin("parent"), - repoRoot.UntypedJoin("parent", "child"), - repoRoot.UntypedJoin("parent", "sibling"), - } - expectWatching(t, c, expectedWatching) - - fooPath := repoRoot.UntypedJoin("parent", "child", "foo") - err = fooPath.WriteFile([]byte("hello"), 0644) - assert.NilError(t, err, "WriteFile") - expectFilesystemEvent(t, ch, Event{ - EventType: FileAdded, - Path: fooPath, - }) - - deepPath := repoRoot.UntypedJoin("parent", "sibling", "deep", "path") - err = deepPath.MkdirAll(0775) - assert.NilError(t, err, "MkdirAll") - // We'll catch an event for "deep", but not "deep/path" since - // we don't have a recursive watch - expectFilesystemEvent(t, ch, Event{ - Path: repoRoot.UntypedJoin("parent", "sibling", "deep"), - EventType: FileAdded, - }) - expectFilesystemEvent(t, ch, Event{ - Path: repoRoot.UntypedJoin("parent", "sibling", "deep", "path"), - EventType: FileAdded, - }) - expectedWatching = append(expectedWatching, deepPath, repoRoot.UntypedJoin("parent", "sibling", "deep")) - expectWatching(t, c, expectedWatching) - - gitFilePath := repoRoot.UntypedJoin(".git", "git-file") - err = gitFilePath.WriteFile([]byte("nope"), 0644) - assert.NilError(t, err, "WriteFile") - expectNoFilesystemEvent(t, ch) -} -- cgit v1.2.3-70-g09d2