aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/filewatcher/cookie.go
diff options
context:
space:
mode:
Diffstat (limited to 'cli/internal/filewatcher/cookie.go')
-rw-r--r--cli/internal/filewatcher/cookie.go160
1 files changed, 160 insertions, 0 deletions
diff --git a/cli/internal/filewatcher/cookie.go b/cli/internal/filewatcher/cookie.go
new file mode 100644
index 0000000..7a4931e
--- /dev/null
+++ b/cli/internal/filewatcher/cookie.go
@@ -0,0 +1,160 @@
+package filewatcher
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+)
+
+// CookieWaiter is the interface used by clients that need to wait
+// for a roundtrip through the filewatching API.
+type CookieWaiter interface {
+ WaitForCookie() error
+}
+
+var (
+ // ErrCookieTimeout is returned when we did not see our cookie file within the given time constraints
+ ErrCookieTimeout = errors.New("timed out waiting for cookie")
+ // ErrCookieWatchingClosed is returned when the underlying filewatching has been closed.
+ ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies")
+)
+
+// CookieJar is used for tracking roundtrips through the filesystem watching API
+type CookieJar struct {
+ timeout time.Duration
+ dir turbopath.AbsoluteSystemPath
+ serial uint64
+ mu sync.Mutex
+ cookies map[turbopath.AbsoluteSystemPath]chan error
+ closed bool
+}
+
+// NewCookieJar returns a new instance of a CookieJar. There should only ever be a single
+// instance live per cookieDir, since they expect to have full control over that directory.
+func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) {
+ if err := cookieDir.RemoveAll(); err != nil {
+ return nil, err
+ }
+ if err := cookieDir.MkdirAll(0775); err != nil {
+ return nil, err
+ }
+ return &CookieJar{
+ timeout: timeout,
+ dir: cookieDir,
+ cookies: make(map[turbopath.AbsoluteSystemPath]chan error),
+ }, nil
+}
+
+// removeAllCookiesWithError sends the error to every channel, closes every channel,
+// and attempts to remove every cookie file. Must be called while the cj.mu is held.
+// If the cookie jar is going to be reused afterwards, the cookies map must be reinitialized.
+func (cj *CookieJar) removeAllCookiesWithError(err error) {
+ for p, ch := range cj.cookies {
+ _ = p.Remove()
+ ch <- err
+ close(ch)
+ }
+ // Drop all of the references so they can be cleaned up
+ cj.cookies = nil
+}
+
+// OnFileWatchClosed handles the case where filewatching had to close for some reason
+// We send an error to all of our cookies and stop accepting new ones.
+func (cj *CookieJar) OnFileWatchClosed() {
+ cj.mu.Lock()
+ defer cj.mu.Unlock()
+ cj.closed = true
+ cj.removeAllCookiesWithError(ErrCookieWatchingClosed)
+
+}
+
+// OnFileWatchError handles when filewatching has encountered an error.
+// In the error case, we remove all cookies and send them errors. We remain
+// available for later cookies.
+func (cj *CookieJar) OnFileWatchError(err error) {
+ // We are now in an inconsistent state. Drop all of our cookies,
+ // but we still allow new ones to be created
+ cj.mu.Lock()
+ defer cj.mu.Unlock()
+ cj.removeAllCookiesWithError(err)
+ cj.cookies = make(map[turbopath.AbsoluteSystemPath]chan error)
+}
+
+// OnFileWatchEvent determines if the specified event is relevant
+// for cookie watching and notifies the appropriate cookie if so.
+func (cj *CookieJar) OnFileWatchEvent(ev Event) {
+ if ev.EventType == FileAdded {
+ isCookie, err := fs.DirContainsPath(cj.dir.ToStringDuringMigration(), ev.Path.ToStringDuringMigration())
+ if err != nil {
+ cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path))
+ } else if isCookie {
+ cj.notifyCookie(ev.Path, nil)
+ }
+ }
+}
+
+// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications.
+// This provides a theoretical bound on filesystem operations, although it's possible
+// that underlying filewatch mechanisms don't respect this ordering.
+func (cj *CookieJar) WaitForCookie() error {
+ // we're only ever going to send a single error on the channel, add a buffer so that we never
+ // block sending it.
+ ch := make(chan error, 1)
+ serial := atomic.AddUint64(&cj.serial, 1)
+ cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial))
+ cj.mu.Lock()
+ if cj.closed {
+ cj.mu.Unlock()
+ return ErrCookieWatchingClosed
+ }
+ cj.cookies[cookiePath] = ch
+ cj.mu.Unlock()
+ if err := touchCookieFile(cookiePath); err != nil {
+ cj.notifyCookie(cookiePath, err)
+ return err
+ }
+ select {
+ case <-time.After(cj.timeout):
+ return ErrCookieTimeout
+ case err, ok := <-ch:
+ if !ok {
+ // the channel closed without an error, we're all set
+ return nil
+ }
+ // the channel didn't close, meaning we got some error.
+ // We don't need to wait on channel close, it's going to be closed
+ // immediately by whoever sent the error. Return the error directly
+ return err
+ }
+}
+
+func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) {
+ cj.mu.Lock()
+ ch, ok := cj.cookies[cookie]
+ // delete is a no-op if the key doesn't exist
+ delete(cj.cookies, cookie)
+ cj.mu.Unlock()
+ if ok {
+ if err != nil {
+ ch <- err
+ }
+ close(ch)
+ }
+}
+
+func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error {
+ f, err := cookie.OpenFile(os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700)
+ if err != nil {
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return nil
+}