1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
|
package globwatcher
import (
"errors"
"fmt"
"path/filepath"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/vercel/turbo/cli/internal/doublestar"
"github.com/vercel/turbo/cli/internal/filewatcher"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/turbopath"
"github.com/vercel/turbo/cli/internal/util"
)
// ErrClosed is returned when attempting to get changed globs after glob watching has closed
var ErrClosed = errors.New("glob watching is closed")

// globs groups the inclusion and exclusion glob patterns registered for a single hash.
type globs struct {
	Inclusions util.Set // glob patterns whose matches contribute to the hash's outputs
	Exclusions util.Set // glob patterns carving out paths that must NOT invalidate the hash
}
// GlobWatcher is used to track unchanged globs by hash. Once a glob registers a file change
// it is no longer tracked until a new hash requests it. Once all globs for a particular hash
// have changed, that hash is no longer tracked.
type GlobWatcher struct {
	logger       hclog.Logger
	repoRoot     turbopath.AbsoluteSystemPath // root against which file events are made relative
	cookieWaiter filewatcher.CookieWaiter     // used to flush pending filesystem events before reads/writes

	mu         sync.RWMutex        // protects the fields below
	hashGlobs  map[string]globs    // hash -> its registered inclusion/exclusion globs (unchanged so far)
	globStatus map[string]util.Set // glob -> hashes where this glob hasn't changed
	closed     bool                // set once file watching shuts down; all queries then fail open
}
// New returns a new GlobWatcher instance
func New(logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, cookieWaiter filewatcher.CookieWaiter) *GlobWatcher {
	watcher := &GlobWatcher{
		logger:       logger,
		repoRoot:     repoRoot,
		cookieWaiter: cookieWaiter,
		hashGlobs:    map[string]globs{},
		globStatus:   map[string]util.Set{},
	}
	return watcher
}
// setClosed marks this GlobWatcher as closed under the write lock.
func (g *GlobWatcher) setClosed() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.closed = true
}
// isClosed reports whether glob watching has shut down.
func (g *GlobWatcher) isClosed() bool {
	g.mu.RLock()
	closed := g.closed
	g.mu.RUnlock()
	return closed
}
// WatchGlobs registers the given set of globs to be watched for changes and grouped
// under the given hash. This method pairs with GetChangedGlobs to determine which globs
// out of a set of candidates have changed since WatchGlobs was called for the same hash.
func (g *GlobWatcher) WatchGlobs(hash string, globsToWatch fs.TaskOutputs) error {
	if g.isClosed() {
		return ErrClosed
	}
	// Wait for a cookie here
	// that will ensure that we have seen all filesystem writes
	// *by the calling client*. Other tasks _could_ write to the
	// same output directories, however we are relying on task
	// execution dependencies to prevent that.
	if err := g.cookieWaiter.WaitForCookie(); err != nil {
		return err
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	g.hashGlobs[hash] = globs{
		Inclusions: util.SetFromStrings(globsToWatch.Inclusions),
		Exclusions: util.SetFromStrings(globsToWatch.Exclusions),
	}
	// Record, for each inclusion glob, that this hash is now tracking it as unchanged.
	for _, inclusion := range globsToWatch.Inclusions {
		hashes, tracked := g.globStatus[inclusion]
		if !tracked {
			hashes = make(util.Set)
			g.globStatus[inclusion] = hashes
		}
		hashes.Add(hash)
	}
	return nil
}
// GetChangedGlobs returns the subset of the given candidates that we are not currently
// tracking as "unchanged".
func (g *GlobWatcher) GetChangedGlobs(hash string, candidates []string) ([]string, error) {
	if g.isClosed() {
		// If filewatching has crashed, return all candidates as changed.
		return candidates, nil
	}
	// Wait for a cookie here
	// that will ensure that we have seen all filesystem writes
	// *by the calling client*. Other tasks _could_ write to the
	// same output directories, however we are relying on task
	// execution dependencies to prevent that.
	if err := g.cookieWaiter.WaitForCookie(); err != nil {
		return nil, err
	}
	g.mu.RLock()
	defer g.mu.RUnlock()
	// hashGlobs tracks all of the unchanged globs for a given hash
	// If hashGlobs doesn't have our hash, either everything has changed,
	// or we were never tracking it. Either way, consider all the candidates
	// to be changed globs.
	trackedGlobs, found := g.hashGlobs[hash]
	if !found {
		return candidates, nil
	}
	// Anything the hash is still tracking as unchanged is removed from the candidates.
	changed := util.SetFromStrings(candidates).Difference(trackedGlobs.Inclusions)
	return changed.UnsafeListOfStrings(), nil
}
// OnFileWatchEvent implements FileWatchClient.OnFileWatchEvent
// On a file change, check if we have a glob that matches this file. Invalidate
// any matching globs, and remove them from the set of unchanged globs for the corresponding
// hashes. If this is the last glob for a hash, remove the hash from being tracked.
func (g *GlobWatcher) OnFileWatchEvent(ev filewatcher.Event) {
	// At this point, we don't care what the Op is, any Op represents a change
	// that should invalidate matching globs
	g.logger.Trace(fmt.Sprintf("Got fsnotify event %v", ev))
	absolutePath := ev.Path
	repoRelativePath, err := g.repoRoot.RelativePathString(absolutePath.ToStringDuringMigration())
	if err != nil {
		g.logger.Debug(fmt.Sprintf("could not get relative path from %v to %v: %v", g.repoRoot, absolutePath, err))
		return
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	for glob, hashStatus := range g.globStatus {
		matches, err := doublestar.Match(glob, filepath.ToSlash(repoRelativePath))
		if err != nil {
			g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, glob, err))
			continue
		}
		// If this glob matches, we know that it has changed for every hash that included this glob
		// and is not excluded by a hash's exclusion globs.
		// So, we can delete this glob from every hash tracking it as well as stop watching this glob.
		// To stop watching, we unref each of the directories corresponding to this glob.
		if matches {
			for hashUntyped := range hashStatus {
				hash := hashUntyped.(string)
				hashGlobs, ok := g.hashGlobs[hash]
				if !ok {
					g.logger.Warn(fmt.Sprintf("failed to find hash %v referenced from glob %v", hash, glob))
					continue
				}
				isExcluded := false
				// Check if we've excluded this path by going through exclusion globs
				for exclusionGlob := range hashGlobs.Exclusions {
					// Distinct names avoid shadowing the inclusion-match result above,
					// which is what previously let the wrong glob leak into the log line.
					excluded, exclusionErr := doublestar.Match(exclusionGlob.(string), filepath.ToSlash(repoRelativePath))
					if exclusionErr != nil {
						// Bug fix: report the exclusion glob that failed to match,
						// not the enclosing inclusion glob.
						g.logger.Error(fmt.Sprintf("failed to check path %v against glob %v: %v", repoRelativePath, exclusionGlob, exclusionErr))
						continue
					}
					if excluded {
						isExcluded = true
						break
					}
				}
				// If we have excluded this path, then we skip it
				if isExcluded {
					continue
				}
				// We delete hash from the globStatus entry
				g.globStatus[glob].Delete(hash)
				// If we've deleted the last hash for a glob in globStatus, delete the whole glob entry
				if len(g.globStatus[glob]) == 0 {
					delete(g.globStatus, glob)
				}
				hashGlobs.Inclusions.Delete(glob)
				// If we've deleted the last glob for a hash, delete the whole hash entry
				if hashGlobs.Inclusions.Len() == 0 {
					delete(g.hashGlobs, hash)
				}
			}
		}
	}
}
// OnFileWatchError implements FileWatchClient.OnFileWatchError
func (g *GlobWatcher) OnFileWatchError(err error) {
	msg := fmt.Sprintf("file watching received an error: %v", err)
	g.logger.Error(msg)
}
// OnFileWatchClosed implements FileWatchClient.OnFileWatchClosed
func (g *GlobWatcher) OnFileWatchClosed() {
	// Mark closed first so concurrent WatchGlobs/GetChangedGlobs callers fail open
	// (ErrClosed / all-candidates-changed) before the warning is emitted.
	g.setClosed()
	g.logger.Warn("GlobWatching is closing due to file watching closing")
}
|