author     简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:55 +0800
committer  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:55 +0800
commit     fc8c5fdce62fb229202659408798a7b6c98f6e8b (patch)
tree       7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/filewatcher
parent     dd84b9d64fb98746a230cd24233ff50a562c39c9 (diff)
Diffstat (limited to 'cli/internal/filewatcher')
-rw-r--r--  cli/internal/filewatcher/backend.go           209
-rw-r--r--  cli/internal/filewatcher/backend_darwin.go    220
-rw-r--r--  cli/internal/filewatcher/cookie.go            160
-rw-r--r--  cli/internal/filewatcher/cookie_test.go       130
-rw-r--r--  cli/internal/filewatcher/filewatcher.go       167
-rw-r--r--  cli/internal/filewatcher/filewatcher_test.go  152
6 files changed, 0 insertions, 1038 deletions
diff --git a/cli/internal/filewatcher/backend.go b/cli/internal/filewatcher/backend.go
deleted file mode 100644
index b8b7fa8..0000000
--- a/cli/internal/filewatcher/backend.go
+++ /dev/null
@@ -1,209 +0,0 @@
-//go:build !darwin
-// +build !darwin
-
-package filewatcher
-
-import (
- "fmt"
- "os"
- "path/filepath"
- "sync"
-
- "github.com/fsnotify/fsnotify"
- "github.com/hashicorp/go-hclog"
- "github.com/karrick/godirwalk"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/doublestar"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbopath"
-)
-
-// watchAddMode is used to indicate whether watchRecursively should synthesize events
-// for existing files.
-type watchAddMode int
-
-const (
- dontSynthesizeEvents watchAddMode = iota
- synthesizeEvents
-)
-
-type fsNotifyBackend struct {
- watcher *fsnotify.Watcher
- events chan Event
- errors chan error
- logger hclog.Logger
-
- mu sync.Mutex
- allExcludes []string
- closed bool
-}
-
-func (f *fsNotifyBackend) Events() <-chan Event {
- return f.events
-}
-
-func (f *fsNotifyBackend) Errors() <-chan error {
- return f.errors
-}
-
-func (f *fsNotifyBackend) Close() error {
- f.mu.Lock()
- defer f.mu.Unlock()
- if f.closed {
- return ErrFilewatchingClosed
- }
- f.closed = true
- close(f.events)
- close(f.errors)
- if err := f.watcher.Close(); err != nil {
- return err
- }
- return nil
-}
-
-// onFileAdded helps us paper over cross-platform inconsistencies in fsnotify.
-// Some fsnotify backends automatically add the contents of directories. Some do
-// not. Adding a watch is idempotent, so anytime any file we care about gets added,
-// watch it.
-func (f *fsNotifyBackend) onFileAdded(name turbopath.AbsoluteSystemPath) error {
- info, err := name.Lstat()
- if err != nil {
- if errors.Is(err, os.ErrNotExist) {
- // We can race with a file being added and removed. Ignore it
- return nil
- }
- return errors.Wrapf(err, "error checking lstat of new file %v", name)
- }
- if info.IsDir() {
- // If a directory has been added, we need to synthesize events for everything it contains
- if err := f.watchRecursively(name, []string{}, synthesizeEvents); err != nil {
- return errors.Wrapf(err, "failed recursive watch of %v", name)
- }
- } else {
- if err := f.watcher.Add(name.ToString()); err != nil {
- return errors.Wrapf(err, "failed adding watch to %v", name)
- }
- }
- return nil
-}
-
-func (f *fsNotifyBackend) watchRecursively(root turbopath.AbsoluteSystemPath, excludePatterns []string, addMode watchAddMode) error {
- f.mu.Lock()
- defer f.mu.Unlock()
- err := fs.WalkMode(root.ToString(), func(name string, isDir bool, info os.FileMode) error {
- for _, excludePattern := range excludePatterns {
- excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(name))
- if err != nil {
- return err
- }
- if excluded {
- return godirwalk.SkipThis
- }
- }
- if info.IsDir() && (info&os.ModeSymlink == 0) {
- if err := f.watcher.Add(name); err != nil {
- return errors.Wrapf(err, "failed adding watch to %v", name)
- }
- f.logger.Debug(fmt.Sprintf("watching directory %v", name))
- }
- if addMode == synthesizeEvents {
- f.events <- Event{
- Path: fs.AbsoluteSystemPathFromUpstream(name),
- EventType: FileAdded,
- }
- }
- return nil
- })
- if err != nil {
- return err
- }
- f.allExcludes = append(f.allExcludes, excludePatterns...)
-
- return nil
-}
-
-func (f *fsNotifyBackend) watch() {
-outer:
- for {
- select {
- case ev, ok := <-f.watcher.Events:
- if !ok {
- break outer
- }
- eventType := toFileEvent(ev.Op)
- path := fs.AbsoluteSystemPathFromUpstream(ev.Name)
- if eventType == FileAdded {
- if err := f.onFileAdded(path); err != nil {
- f.errors <- err
- }
- }
- f.events <- Event{
- Path: path,
- EventType: eventType,
- }
- case err, ok := <-f.watcher.Errors:
- if !ok {
- break outer
- }
- f.errors <- err
- }
- }
-}
-
-var _modifiedMask = fsnotify.Chmod | fsnotify.Write
-
-func toFileEvent(op fsnotify.Op) FileEvent {
- if op&fsnotify.Create != 0 {
- return FileAdded
- } else if op&fsnotify.Remove != 0 {
- return FileDeleted
- } else if op&_modifiedMask != 0 {
- return FileModified
- } else if op&fsnotify.Rename != 0 {
- return FileRenamed
- }
- return FileOther
-}
-
-func (f *fsNotifyBackend) Start() error {
- f.mu.Lock()
- defer f.mu.Unlock()
- if f.closed {
- return ErrFilewatchingClosed
- }
- for _, dir := range f.watcher.WatchList() {
- for _, excludePattern := range f.allExcludes {
- excluded, err := doublestar.Match(excludePattern, filepath.ToSlash(dir))
- if err != nil {
- return err
- }
- if excluded {
- if err := f.watcher.Remove(dir); err != nil {
- return err
- }
- }
- }
- }
- go f.watch()
- return nil
-}
-
-func (f *fsNotifyBackend) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
- // We don't synthesize events for the initial watch
- return f.watchRecursively(root, excludePatterns, dontSynthesizeEvents)
-}
-
-// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are
-// running on.
-func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- return nil, err
- }
- return &fsNotifyBackend{
- watcher: watcher,
- events: make(chan Event),
- errors: make(chan error),
- logger: logger.Named("fsnotify"),
- }, nil
-}
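
For context on the pattern the deleted backend.go implements: fsnotify watches are per-directory on most platforms, so a recursive watch is built by walking the tree, adding a watch for each directory, and re-walking whenever a new directory appears. Below is a minimal standalone sketch of that pattern, not the deleted code; the command-line handling and log output are illustrative only.

package main

import (
	"io/fs"
	"log"
	"os"
	"path/filepath"

	"github.com/fsnotify/fsnotify"
)

// watchTree adds a watch for every directory under root. fsnotify watch
// additions are idempotent, so re-walking an already-watched subtree is safe.
func watchTree(w *fsnotify.Watcher, root string) error {
	return filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return w.Add(path)
		}
		return nil
	})
}

func main() {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()
	if err := watchTree(w, os.Args[1]); err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case ev, ok := <-w.Events:
			if !ok {
				return
			}
			// A newly created directory is not covered by existing watches;
			// extend the watch to include it and everything beneath it.
			if ev.Op&fsnotify.Create != 0 {
				if info, statErr := os.Lstat(ev.Name); statErr == nil && info.IsDir() {
					_ = watchTree(w, ev.Name)
				}
			}
			log.Printf("%v %s", ev.Op, ev.Name)
		case err, ok := <-w.Errors:
			if !ok {
				return
			}
			log.Println("watch error:", err)
		}
	}
}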
diff --git a/cli/internal/filewatcher/backend_darwin.go b/cli/internal/filewatcher/backend_darwin.go
deleted file mode 100644
index 4c029c4..0000000
--- a/cli/internal/filewatcher/backend_darwin.go
+++ /dev/null
@@ -1,220 +0,0 @@
-//go:build darwin
-// +build darwin
-
-package filewatcher
-
-import (
- "fmt"
- "strings"
- "sync"
- "time"
-
- "github.com/pkg/errors"
- "github.com/yookoala/realpath"
-
- "github.com/fsnotify/fsevents"
- "github.com/hashicorp/go-hclog"
- "github.com/vercel/turbo/cli/internal/doublestar"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbopath"
-)
-
-type fseventsBackend struct {
- events chan Event
- errors chan error
- logger hclog.Logger
- mu sync.Mutex
- streams []*fsevents.EventStream
- closed bool
-}
-
-func (f *fseventsBackend) Events() <-chan Event {
- return f.events
-}
-
-func (f *fseventsBackend) Errors() <-chan error {
- return f.errors
-}
-
-func (f *fseventsBackend) Close() error {
- f.mu.Lock()
- defer f.mu.Unlock()
- if f.closed {
- return ErrFilewatchingClosed
- }
- f.closed = true
- for _, stream := range f.streams {
- stream.Stop()
- }
- close(f.events)
- close(f.errors)
- return nil
-}
-
-func (f *fseventsBackend) Start() error {
- return nil
-}
-
-var (
- _eventLatency = 10 * time.Millisecond
- _cookieTimeout = 500 * time.Millisecond
-)
-
-// AddRoot starts watching a new directory hierarchy. Events matching the provided excludePatterns
-// will not be forwarded.
-func (f *fseventsBackend) AddRoot(someRoot turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
- // We need to resolve the real path to the hierarchy that we are going to watch
- realRoot, err := realpath.Realpath(someRoot.ToString())
- if err != nil {
- return err
- }
- root := fs.AbsoluteSystemPathFromUpstream(realRoot)
- dev, err := fsevents.DeviceForPath(root.ToString())
- if err != nil {
- return err
- }
-
- // Optimistically set up and start a stream, assuming the watch is still valid.
- s := &fsevents.EventStream{
- Paths: []string{root.ToString()},
- Latency: _eventLatency,
- Device: dev,
- Flags: fsevents.FileEvents | fsevents.WatchRoot,
- }
- s.Start()
- events := s.Events
-
- // fsevents delivers events for all existing files first, so use a cookie to detect when we're ready for new events
- if err := waitForCookie(root, events, _cookieTimeout); err != nil {
- s.Stop()
- return err
- }
-
- // Now try to persist the stream.
- f.mu.Lock()
- defer f.mu.Unlock()
- if f.closed {
- s.Stop()
- return ErrFilewatchingClosed
- }
- f.streams = append(f.streams, s)
- f.logger.Debug(fmt.Sprintf("watching root %v, excluding %v", root, excludePatterns))
-
- go func() {
- for evs := range events {
- for _, ev := range evs {
- isExcluded := false
-
- // 1. Ensure that we have a `/`-prefixed path from the event.
- var eventPath string
- if !strings.HasPrefix(ev.Path, "/") {
- eventPath = "/" + ev.Path
- } else {
- eventPath = ev.Path
- }
-
- // 2. We're getting events from the real path, but we need to translate
- // back to the path we were provided since that's what the caller will
- // expect in terms of event paths.
- watchRootRelativePath := eventPath[len(realRoot):]
- processedEventPath := someRoot.UntypedJoin(watchRootRelativePath)
-
- // 3. Compare the event to all exclude patterns, short-circuit if we know
- // we are not watching this file.
- processedPathString := processedEventPath.ToString() // loop invariant
- for _, pattern := range excludePatterns {
- matches, err := doublestar.Match(pattern, processedPathString)
- if err != nil {
- f.errors <- err
- } else if matches {
- isExcluded = true
- break
- }
- }
-
- // 4. Report the file events we care about.
- if !isExcluded {
- f.events <- Event{
- Path: processedEventPath,
- EventType: toFileEvent(ev.Flags),
- }
- }
- }
- }
- }()
-
- return nil
-}
-
-func waitForCookie(root turbopath.AbsoluteSystemPath, events <-chan []fsevents.Event, timeout time.Duration) error {
- // This cookie needs to be in a location that we're watching, and at this point we can't guarantee
- // what the root is, or if something like "node_modules/.cache/turbo" would make sense. As a compromise, ensure
- // that we clean it up even in the event of a failure.
- cookiePath := root.UntypedJoin(".turbo-cookie")
- if err := cookiePath.WriteFile([]byte("cookie"), 0755); err != nil {
- return err
- }
- expected := cookiePath.ToString()[1:] // trim leading slash
- if err := waitForEvent(events, expected, fsevents.ItemCreated, timeout); err != nil {
- // Attempt to not leave the cookie file lying around.
- // Ignore the error, since there's not much we can do with it.
- _ = cookiePath.Remove()
- return err
- }
- if err := cookiePath.Remove(); err != nil {
- return err
- }
- if err := waitForEvent(events, expected, fsevents.ItemRemoved, timeout); err != nil {
- return err
- }
- return nil
-}
-
-func waitForEvent(events <-chan []fsevents.Event, path string, flag fsevents.EventFlags, timeout time.Duration) error {
- ch := make(chan struct{})
- go func() {
- for evs := range events {
- for _, ev := range evs {
- if ev.Path == path && ev.Flags&flag != 0 {
- close(ch)
- return
- }
- }
- }
- }()
- select {
- case <-time.After(timeout):
- return errors.Wrap(ErrFailedToStart, "timed out waiting for initial fsevents cookie")
- case <-ch:
- return nil
- }
-}
-
-var _modifiedMask = fsevents.ItemModified | fsevents.ItemInodeMetaMod | fsevents.ItemFinderInfoMod | fsevents.ItemChangeOwner | fsevents.ItemXattrMod
-
-func toFileEvent(flags fsevents.EventFlags) FileEvent {
- if flags&fsevents.ItemCreated != 0 {
- return FileAdded
- } else if flags&fsevents.ItemRemoved != 0 {
- return FileDeleted
- } else if flags&_modifiedMask != 0 {
- return FileModified
- } else if flags&fsevents.ItemRenamed != 0 {
- return FileRenamed
- } else if flags&fsevents.RootChanged != 0 {
- // count this as a delete, something affected the path to the root
- // of the stream
- return FileDeleted
- }
- return FileOther
-}
-
-// GetPlatformSpecificBackend returns a filewatching backend appropriate for the OS we are
-// running on.
-func GetPlatformSpecificBackend(logger hclog.Logger) (Backend, error) {
- return &fseventsBackend{
- events: make(chan Event),
- errors: make(chan error),
- logger: logger.Named("fsevents"),
- }, nil
-}
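
The darwin backend above drives the fsevents stream API. As a reference, here is a minimal darwin-only sketch of that API as it appears in the deleted file (a per-device EventStream with FileEvents|WatchRoot flags and batched events). It assumes the same library version, in which Start() returns nothing and event paths arrive without a leading slash.

//go:build darwin

package main

import (
	"log"
	"os"
	"time"

	"github.com/fsnotify/fsevents"
)

func main() {
	root := os.Args[1]
	dev, err := fsevents.DeviceForPath(root)
	if err != nil {
		log.Fatal(err)
	}
	stream := &fsevents.EventStream{
		Paths:   []string{root},
		Latency: 10 * time.Millisecond,
		Device:  dev,
		Flags:   fsevents.FileEvents | fsevents.WatchRoot,
	}
	stream.Start()
	defer stream.Stop()
	// Events are delivered in batches; paths are relative to the device root
	// and lack a leading slash, hence the prefix handling in the code above.
	for batch := range stream.Events {
		for _, ev := range batch {
			log.Printf("flags=%v path=/%s", ev.Flags, ev.Path)
		}
	}
}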
diff --git a/cli/internal/filewatcher/cookie.go b/cli/internal/filewatcher/cookie.go
deleted file mode 100644
index 7a4931e..0000000
--- a/cli/internal/filewatcher/cookie.go
+++ /dev/null
@@ -1,160 +0,0 @@
-package filewatcher
-
-import (
- "fmt"
- "os"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbopath"
-)
-
-// CookieWaiter is the interface used by clients that need to wait
-// for a roundtrip through the filewatching API.
-type CookieWaiter interface {
- WaitForCookie() error
-}
-
-var (
- // ErrCookieTimeout is returned when we did not see our cookie file within the given time constraints
- ErrCookieTimeout = errors.New("timed out waiting for cookie")
- // ErrCookieWatchingClosed is returned when the underlying filewatching has been closed.
- ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies")
-)
-
-// CookieJar is used for tracking roundtrips through the filesystem watching API
-type CookieJar struct {
- timeout time.Duration
- dir turbopath.AbsoluteSystemPath
- serial uint64
- mu sync.Mutex
- cookies map[turbopath.AbsoluteSystemPath]chan error
- closed bool
-}
-
-// NewCookieJar returns a new instance of a CookieJar. There should only ever be a single
-// instance live per cookieDir, since they expect to have full control over that directory.
-func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) {
- if err := cookieDir.RemoveAll(); err != nil {
- return nil, err
- }
- if err := cookieDir.MkdirAll(0775); err != nil {
- return nil, err
- }
- return &CookieJar{
- timeout: timeout,
- dir: cookieDir,
- cookies: make(map[turbopath.AbsoluteSystemPath]chan error),
- }, nil
-}
-
-// removeAllCookiesWithError sends the error to every channel, closes every channel,
-// and attempts to remove every cookie file. Must be called while the cj.mu is held.
-// If the cookie jar is going to be reused afterwards, the cookies map must be reinitialized.
-func (cj *CookieJar) removeAllCookiesWithError(err error) {
- for p, ch := range cj.cookies {
- _ = p.Remove()
- ch <- err
- close(ch)
- }
- // Drop all of the references so they can be cleaned up
- cj.cookies = nil
-}
-
-// OnFileWatchClosed handles the case where filewatching had to close for some reason
-// We send an error to all of our cookies and stop accepting new ones.
-func (cj *CookieJar) OnFileWatchClosed() {
- cj.mu.Lock()
- defer cj.mu.Unlock()
- cj.closed = true
- cj.removeAllCookiesWithError(ErrCookieWatchingClosed)
-
-}
-
-// OnFileWatchError handles when filewatching has encountered an error.
-// In the error case, we remove all cookies and send them errors. We remain
-// available for later cookies.
-func (cj *CookieJar) OnFileWatchError(err error) {
- // We are now in an inconsistent state. Drop all of our cookies,
- // but we still allow new ones to be created
- cj.mu.Lock()
- defer cj.mu.Unlock()
- cj.removeAllCookiesWithError(err)
- cj.cookies = make(map[turbopath.AbsoluteSystemPath]chan error)
-}
-
-// OnFileWatchEvent determines if the specified event is relevant
-// for cookie watching and notifies the appropriate cookie if so.
-func (cj *CookieJar) OnFileWatchEvent(ev Event) {
- if ev.EventType == FileAdded {
- isCookie, err := fs.DirContainsPath(cj.dir.ToStringDuringMigration(), ev.Path.ToStringDuringMigration())
- if err != nil {
- cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path))
- } else if isCookie {
- cj.notifyCookie(ev.Path, nil)
- }
- }
-}
-
-// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications.
-// This provides a theoretical bound on filesystem operations, although it's possible
-// that underlying filewatch mechanisms don't respect this ordering.
-func (cj *CookieJar) WaitForCookie() error {
- // we're only ever going to send a single error on the channel, add a buffer so that we never
- // block sending it.
- ch := make(chan error, 1)
- serial := atomic.AddUint64(&cj.serial, 1)
- cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial))
- cj.mu.Lock()
- if cj.closed {
- cj.mu.Unlock()
- return ErrCookieWatchingClosed
- }
- cj.cookies[cookiePath] = ch
- cj.mu.Unlock()
- if err := touchCookieFile(cookiePath); err != nil {
- cj.notifyCookie(cookiePath, err)
- return err
- }
- select {
- case <-time.After(cj.timeout):
- return ErrCookieTimeout
- case err, ok := <-ch:
- if !ok {
- // the channel closed without an error, we're all set
- return nil
- }
- // the channel didn't close, meaning we got some error.
- // We don't need to wait on channel close, it's going to be closed
- // immediately by whoever sent the error. Return the error directly
- return err
- }
-}
-
-func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) {
- cj.mu.Lock()
- ch, ok := cj.cookies[cookie]
- // delete is a no-op if the key doesn't exist
- delete(cj.cookies, cookie)
- cj.mu.Unlock()
- if ok {
- if err != nil {
- ch <- err
- }
- close(ch)
- }
-}
-
-func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error {
- f, err := cookie.OpenFile(os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700)
- if err != nil {
- return err
- }
- if err := f.Close(); err != nil {
- return err
- }
- return nil
-}
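
The intended use of CookieJar is shown by the tests below; distilled, a caller registers the jar as a FileWatchClient, watches the cookie directory, and calls WaitForCookie to bound outstanding filesystem events. A minimal sketch written as if inside this package; the flushWatcher helper and the ".turbo/cookies" directory are illustrative, not part of the deleted code.

package filewatcher

import (
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/vercel/turbo/cli/internal/turbopath"
)

// flushWatcher is a hypothetical helper showing the intended wiring: register
// the jar as a client, watch its directory, then round-trip a cookie so that
// all earlier filesystem events have been delivered.
func flushWatcher(repoRoot turbopath.AbsoluteSystemPath) error {
	logger := hclog.Default()
	// The cookie directory is illustrative; any directory the jar fully owns works.
	cookieDir := repoRoot.UntypedJoin(".turbo", "cookies")
	jar, err := NewCookieJar(cookieDir, 5*time.Second)
	if err != nil {
		return err
	}
	backend, err := GetPlatformSpecificBackend(logger)
	if err != nil {
		return err
	}
	fw := New(logger, repoRoot, backend)
	if err := fw.Start(); err != nil {
		return err
	}
	fw.AddClient(jar)
	if err := fw.AddRoot(cookieDir); err != nil {
		return err
	}
	return jar.WaitForCookie()
}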
diff --git a/cli/internal/filewatcher/cookie_test.go b/cli/internal/filewatcher/cookie_test.go
deleted file mode 100644
index 96241b4..0000000
--- a/cli/internal/filewatcher/cookie_test.go
+++ /dev/null
@@ -1,130 +0,0 @@
-package filewatcher
-
-import (
- "testing"
- "time"
-
- "github.com/hashicorp/go-hclog"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/fs"
- "gotest.tools/v3/assert"
-)
-
-func TestWaitForCookie(t *testing.T) {
- logger := hclog.Default()
- cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
- jar, err := NewCookieJar(cookieDir, 5*time.Second)
- assert.NilError(t, err, "NewCookieJar")
-
- watcher, err := GetPlatformSpecificBackend(logger)
- assert.NilError(t, err, "NewWatcher")
- fw := New(logger, repoRoot, watcher)
- err = fw.Start()
- assert.NilError(t, err, "Start")
- fw.AddClient(jar)
- err = fw.AddRoot(cookieDir)
- assert.NilError(t, err, "Add")
-
- err = jar.WaitForCookie()
- assert.NilError(t, err, "failed to roundtrip cookie")
-}
-
-func TestWaitForCookieAfterClose(t *testing.T) {
- logger := hclog.Default()
- cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
- jar, err := NewCookieJar(cookieDir, 5*time.Second)
- assert.NilError(t, err, "NewCookieJar")
-
- watcher, err := GetPlatformSpecificBackend(logger)
- assert.NilError(t, err, "NewWatcher")
- fw := New(logger, repoRoot, watcher)
- err = fw.Start()
- assert.NilError(t, err, "Start")
- fw.AddClient(jar)
- err = fw.AddRoot(cookieDir)
- assert.NilError(t, err, "Add")
-
- err = fw.Close()
- assert.NilError(t, err, "Close")
- err = jar.WaitForCookie()
- assert.ErrorIs(t, err, ErrCookieWatchingClosed)
-}
-
-func TestWaitForCookieTimeout(t *testing.T) {
- logger := hclog.Default()
- cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
- jar, err := NewCookieJar(cookieDir, 10*time.Millisecond)
- assert.NilError(t, err, "NewCookieJar")
-
- watcher, err := GetPlatformSpecificBackend(logger)
- assert.NilError(t, err, "NewWatcher")
- fw := New(logger, repoRoot, watcher)
- err = fw.Start()
- assert.NilError(t, err, "Start")
- fw.AddClient(jar)
-
- // NOTE: don't call fw.Add here so that no file event gets delivered
-
- err = jar.WaitForCookie()
- assert.ErrorIs(t, err, ErrCookieTimeout)
-}
-
-func TestWaitForCookieWithError(t *testing.T) {
- logger := hclog.Default()
- cookieDir := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
- jar, err := NewCookieJar(cookieDir, 10*time.Second)
- assert.NilError(t, err, "NewCookieJar")
-
- watcher, err := GetPlatformSpecificBackend(logger)
- assert.NilError(t, err, "NewWatcher")
- fw := New(logger, repoRoot, watcher)
- err = fw.Start()
- assert.NilError(t, err, "Start")
- fw.AddClient(jar)
-
- // NOTE: don't call fw.Add here so that no file event gets delivered
- myErr := errors.New("an error")
- ch := make(chan error)
- go func() {
- if err := jar.WaitForCookie(); err != nil {
- ch <- err
- }
- close(ch)
- }()
- // wait for the cookie to be registered in the jar
- for {
- found := false
- jar.mu.Lock()
- if len(jar.cookies) == 1 {
- found = true
- }
- jar.mu.Unlock()
- if found {
- break
- }
- <-time.After(10 * time.Millisecond)
- }
- jar.OnFileWatchError(myErr)
-
- err, ok := <-ch
- if !ok {
- t.Error("expected to get an error from cookie watching")
- }
- assert.ErrorIs(t, err, myErr)
-
- // ensure waiting for a new cookie still works.
- // Add the filewatch to allow cookies to work normally
- err = fw.AddRoot(cookieDir)
- assert.NilError(t, err, "Add")
-
- err = jar.WaitForCookie()
- assert.NilError(t, err, "WaitForCookie")
-}
diff --git a/cli/internal/filewatcher/filewatcher.go b/cli/internal/filewatcher/filewatcher.go
deleted file mode 100644
index 4f79495..0000000
--- a/cli/internal/filewatcher/filewatcher.go
+++ /dev/null
@@ -1,167 +0,0 @@
-// Package filewatcher is used to handle watching for file changes inside the monorepo
-package filewatcher
-
-import (
- "path/filepath"
- "strings"
- "sync"
-
- "github.com/hashicorp/go-hclog"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/turbopath"
-)
-
-// _ignores is the set of paths we exempt from file-watching
-var _ignores = []string{".git", "node_modules"}
-
-// FileWatchClient defines the callbacks used by the file watching loop.
-// All methods are called from the same goroutine so they:
-// 1) do not need synchronization
-// 2) should minimize the work they are doing when called, if possible
-type FileWatchClient interface {
- OnFileWatchEvent(ev Event)
- OnFileWatchError(err error)
- OnFileWatchClosed()
-}
-
-// FileEvent is an enum covering the kinds of things that can happen
-// to files that we might be interested in
-type FileEvent int
-
-const (
- // FileAdded - this is a new file
- FileAdded FileEvent = iota + 1
- // FileDeleted - this file has been removed
- FileDeleted
- // FileModified - this file has been changed in some way
- FileModified
- // FileRenamed - a file's name has changed
- FileRenamed
- // FileOther - some other backend-specific event has happened
- FileOther
-)
-
-var (
- // ErrFilewatchingClosed is returned when filewatching has been closed
- ErrFilewatchingClosed = errors.New("Close() has already been called for filewatching")
- // ErrFailedToStart is returned when filewatching fails to start up
- ErrFailedToStart = errors.New("filewatching failed to start")
-)
-
-// Event is the backend-independent information about a file change
-type Event struct {
- Path turbopath.AbsoluteSystemPath
- EventType FileEvent
-}
-
-// Backend is the interface that describes what an underlying filesystem watching backend
-// must provide.
-type Backend interface {
- AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error
- Events() <-chan Event
- Errors() <-chan error
- Close() error
- Start() error
-}
-
-// FileWatcher handles watching all of the files in the monorepo.
-// We currently ignore .git and top-level node_modules. We can revisit
-// if necessary.
-type FileWatcher struct {
- backend Backend
-
- logger hclog.Logger
- repoRoot turbopath.AbsoluteSystemPath
- excludePattern string
-
- clientsMu sync.RWMutex
- clients []FileWatchClient
- closed bool
-}
-
-// New returns a new FileWatcher instance
-func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, backend Backend) *FileWatcher {
- excludes := make([]string, len(_ignores))
- for i, ignore := range _ignores {
- excludes[i] = filepath.ToSlash(repoRoot.UntypedJoin(ignore).ToString() + "/**")
- }
- excludePattern := "{" + strings.Join(excludes, ",") + "}"
- return &FileWatcher{
- backend: backend,
- logger: logger,
- repoRoot: repoRoot,
- excludePattern: excludePattern,
- }
-}
-
-// Close shuts down filewatching
-func (fw *FileWatcher) Close() error {
- return fw.backend.Close()
-}
-
-// Start recursively adds all directories from the repo root, skipping the excluded ones,
-// then fires off a goroutine to respond to filesystem events
-func (fw *FileWatcher) Start() error {
- if err := fw.backend.AddRoot(fw.repoRoot, fw.excludePattern); err != nil {
- return err
- }
- if err := fw.backend.Start(); err != nil {
- return err
- }
- go fw.watch()
- return nil
-}
-
-// AddRoot registers the root of a filesystem hierarchy to be watched for changes. Events are *not*
-// fired for existing files when AddRoot is called, only for subsequent changes.
-// NOTE: if it appears helpful, we could change this behavior so that we provide a stream of initial
-// events.
-func (fw *FileWatcher) AddRoot(root turbopath.AbsoluteSystemPath, excludePatterns ...string) error {
- return fw.backend.AddRoot(root, excludePatterns...)
-}
-
-// watch is the main file-watching loop. Watching is not recursive,
-// so when new directories are added, they are manually recursively watched.
-func (fw *FileWatcher) watch() {
-outer:
- for {
- select {
- case ev, ok := <-fw.backend.Events():
- if !ok {
- fw.logger.Info("Events channel closed. Exiting watch loop")
- break outer
- }
- fw.clientsMu.RLock()
- for _, client := range fw.clients {
- client.OnFileWatchEvent(ev)
- }
- fw.clientsMu.RUnlock()
- case err, ok := <-fw.backend.Errors():
- if !ok {
- fw.logger.Info("Errors channel closed. Exiting watch loop")
- break outer
- }
- fw.clientsMu.RLock()
- for _, client := range fw.clients {
- client.OnFileWatchError(err)
- }
- fw.clientsMu.RUnlock()
- }
- }
- fw.clientsMu.Lock()
- fw.closed = true
- for _, client := range fw.clients {
- client.OnFileWatchClosed()
- }
- fw.clientsMu.Unlock()
-}
-
-// AddClient registers a client for filesystem events
-func (fw *FileWatcher) AddClient(client FileWatchClient) {
- fw.clientsMu.Lock()
- defer fw.clientsMu.Unlock()
- fw.clients = append(fw.clients, client)
- if fw.closed {
- client.OnFileWatchClosed()
- }
-}
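
All FileWatchClient callbacks arrive from the single watch goroutine, but a client whose state is also read from other goroutines still needs its own locking (the testClient in the test file below does the same). Here is a sketch of a minimal client that records added or modified paths, written as if inside this package; the changedFiles type is hypothetical.

package filewatcher

import "sync"

// changedFiles is a hypothetical FileWatchClient that records added or
// modified paths. Callbacks come from the single watch goroutine, but the
// mutex is still needed because other goroutines may read the collected set.
type changedFiles struct {
	mu     sync.Mutex
	paths  map[string]struct{}
	closed bool
}

func (c *changedFiles) OnFileWatchEvent(ev Event) {
	if ev.EventType != FileAdded && ev.EventType != FileModified {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.paths == nil {
		c.paths = make(map[string]struct{})
	}
	c.paths[ev.Path.ToString()] = struct{}{}
}

func (c *changedFiles) OnFileWatchError(err error) {
	// A real client might invalidate cached state here; the sketch ignores errors.
}

func (c *changedFiles) OnFileWatchClosed() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}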
diff --git a/cli/internal/filewatcher/filewatcher_test.go b/cli/internal/filewatcher/filewatcher_test.go
deleted file mode 100644
index 72b48ba..0000000
--- a/cli/internal/filewatcher/filewatcher_test.go
+++ /dev/null
@@ -1,152 +0,0 @@
-package filewatcher
-
-import (
- "fmt"
- "sync"
- "testing"
- "time"
-
- "github.com/hashicorp/go-hclog"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "gotest.tools/v3/assert"
-)
-
-type testClient struct {
- mu sync.Mutex
- createEvents []Event
- notify chan Event
-}
-
-func (c *testClient) OnFileWatchEvent(ev Event) {
- if ev.EventType == FileAdded {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.createEvents = append(c.createEvents, ev)
- c.notify <- ev
- }
-}
-
-func (c *testClient) OnFileWatchError(err error) {}
-
-func (c *testClient) OnFileWatchClosed() {}
-
-func expectFilesystemEvent(t *testing.T, ch <-chan Event, expected Event) {
- // mark this method as a helper
- t.Helper()
- timeout := time.After(1 * time.Second)
- for {
- select {
- case ev := <-ch:
- t.Logf("got event %v", ev)
- if ev.Path == expected.Path && ev.EventType == expected.EventType {
- return
- }
- case <-timeout:
- t.Errorf("Timed out waiting for filesystem event at %v", expected.Path)
- return
- }
- }
-}
-
-func expectNoFilesystemEvent(t *testing.T, ch <-chan Event) {
- // mark this method as a helper
- t.Helper()
- select {
- case ev, ok := <-ch:
- if ok {
- t.Errorf("got unexpected filesystem event %v", ev)
- } else {
- t.Error("filewatching closed unexpectedly")
- }
- case <-time.After(500 * time.Millisecond):
- return
- }
-}
-
-func expectWatching(t *testing.T, c *testClient, dirs []turbopath.AbsoluteSystemPath) {
- t.Helper()
- now := time.Now()
- filename := fmt.Sprintf("test-%v", now.UnixMilli())
- for _, dir := range dirs {
- file := dir.UntypedJoin(filename)
- err := file.WriteFile([]byte("hello"), 0755)
- assert.NilError(t, err, "WriteFile")
- expectFilesystemEvent(t, c.notify, Event{
- Path: file,
- EventType: FileAdded,
- })
- }
-}
-
-func TestFileWatching(t *testing.T) {
- logger := hclog.Default()
- logger.SetLevel(hclog.Debug)
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
- err := repoRoot.UntypedJoin(".git").MkdirAll(0775)
- assert.NilError(t, err, "MkdirAll")
- err = repoRoot.UntypedJoin("node_modules", "some-dep").MkdirAll(0775)
- assert.NilError(t, err, "MkdirAll")
- err = repoRoot.UntypedJoin("parent", "child").MkdirAll(0775)
- assert.NilError(t, err, "MkdirAll")
- err = repoRoot.UntypedJoin("parent", "sibling").MkdirAll(0775)
- assert.NilError(t, err, "MkdirAll")
-
- // Directory layout:
- // <repoRoot>/
- // .git/
- // node_modules/
- // some-dep/
- // parent/
- // child/
- // sibling/
-
- watcher, err := GetPlatformSpecificBackend(logger)
- assert.NilError(t, err, "GetPlatformSpecificBackend")
- fw := New(logger, repoRoot, watcher)
- err = fw.Start()
- assert.NilError(t, err, "fw.Start")
-
- // Add a client
- ch := make(chan Event, 1)
- c := &testClient{
- notify: ch,
- }
- fw.AddClient(c)
- expectedWatching := []turbopath.AbsoluteSystemPath{
- repoRoot,
- repoRoot.UntypedJoin("parent"),
- repoRoot.UntypedJoin("parent", "child"),
- repoRoot.UntypedJoin("parent", "sibling"),
- }
- expectWatching(t, c, expectedWatching)
-
- fooPath := repoRoot.UntypedJoin("parent", "child", "foo")
- err = fooPath.WriteFile([]byte("hello"), 0644)
- assert.NilError(t, err, "WriteFile")
- expectFilesystemEvent(t, ch, Event{
- EventType: FileAdded,
- Path: fooPath,
- })
-
- deepPath := repoRoot.UntypedJoin("parent", "sibling", "deep", "path")
- err = deepPath.MkdirAll(0775)
- assert.NilError(t, err, "MkdirAll")
- // We'll catch an event for "deep", but not "deep/path" since
- // we don't have a recursive watch
- expectFilesystemEvent(t, ch, Event{
- Path: repoRoot.UntypedJoin("parent", "sibling", "deep"),
- EventType: FileAdded,
- })
- expectFilesystemEvent(t, ch, Event{
- Path: repoRoot.UntypedJoin("parent", "sibling", "deep", "path"),
- EventType: FileAdded,
- })
- expectedWatching = append(expectedWatching, deepPath, repoRoot.UntypedJoin("parent", "sibling", "deep"))
- expectWatching(t, c, expectedWatching)
-
- gitFilePath := repoRoot.UntypedJoin(".git", "git-file")
- err = gitFilePath.WriteFile([]byte("nope"), 0644)
- assert.NilError(t, err, "WriteFile")
- expectNoFilesystemEvent(t, ch)
-}