aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/taskhash/taskhash.go
blob: a912ad9370b449229be0d3699bd27cbd84f7a74d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
// Package taskhash handles calculating dependency hashes for nodes in the task execution graph.
package taskhash

import (
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/hashicorp/go-hclog"
	"github.com/pyr-sh/dag"
	gitignore "github.com/sabhiram/go-gitignore"
	"github.com/vercel/turbo/cli/internal/doublestar"
	"github.com/vercel/turbo/cli/internal/env"
	"github.com/vercel/turbo/cli/internal/fs"
	"github.com/vercel/turbo/cli/internal/hashing"
	"github.com/vercel/turbo/cli/internal/inference"
	"github.com/vercel/turbo/cli/internal/nodes"
	"github.com/vercel/turbo/cli/internal/runsummary"
	"github.com/vercel/turbo/cli/internal/turbopath"
	"github.com/vercel/turbo/cli/internal/util"
	"github.com/vercel/turbo/cli/internal/workspace"
	"golang.org/x/sync/errgroup"
)

// Tracker caches package-inputs hashes, as well as package-task hashes.
// package-inputs hashes must be calculated before package-task hashes,
// and package-task hashes must be calculated in topographical order.
// package-task hashing is threadsafe, provided topographical order is
// respected.
type Tracker struct {
	rootNode   string
	globalHash string
	pipeline   fs.Pipeline

	packageInputsHashes packageFileHashes

	// packageInputsExpandedHashes is a map of a hashkey to a list of files that are inputs to the task.
	// Writes to this map happen during CalculateFileHash(). Since this happens synchronously
	// before walking the task graph, it does not need to be protected by a mutex.
	packageInputsExpandedHashes map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string

	// mu is a mutex that we can lock/unlock to read/write from maps
	// the fields below should be protected by the mutex.
	mu                     sync.RWMutex
	packageTaskEnvVars     map[string]env.DetailedMap // taskId -> envvar pairs that affect the hash.
	packageTaskHashes      map[string]string          // taskID -> hash
	packageTaskFramework   map[string]string          // taskID -> inferred framework for package
	packageTaskOutputs     map[string][]turbopath.AnchoredSystemPath
	packageTaskCacheStatus map[string]runsummary.TaskCacheSummary
}

// NewTracker creates a tracker for package-inputs combinations and package-task combinations.
func NewTracker(rootNode string, globalHash string, pipeline fs.Pipeline) *Tracker {
	return &Tracker{
		rootNode:               rootNode,
		globalHash:             globalHash,
		pipeline:               pipeline,
		packageTaskHashes:      make(map[string]string),
		packageTaskFramework:   make(map[string]string),
		packageTaskEnvVars:     make(map[string]env.DetailedMap),
		packageTaskOutputs:     make(map[string][]turbopath.AnchoredSystemPath),
		packageTaskCacheStatus: make(map[string]runsummary.TaskCacheSummary),
	}
}

// packageFileSpec defines a combination of a package and optional set of input globs
type packageFileSpec struct {
	pkg    string
	inputs []string
}

func specFromPackageTask(packageTask *nodes.PackageTask) packageFileSpec {
	return packageFileSpec{
		pkg:    packageTask.PackageName,
		inputs: packageTask.TaskDefinition.Inputs,
	}
}

// packageFileHashKey is a hashable representation of a packageFileSpec.
type packageFileHashKey string

// hashes the inputs for a packageTask
func (pfs packageFileSpec) ToKey() packageFileHashKey {
	sort.Strings(pfs.inputs)
	return packageFileHashKey(fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(pfs.inputs, "!")))
}

func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) {
	if fs.FileExists(filepath) {
		return gitignore.CompileIgnoreFile(filepath)
	}
	// no op
	return gitignore.CompileIgnoreLines([]string{}...), nil
}

func (pfs *packageFileSpec) getHashObject(pkg *fs.PackageJSON, repoRoot turbopath.AbsoluteSystemPath) map[turbopath.AnchoredUnixPath]string {
	hashObject, pkgDepsErr := hashing.GetPackageDeps(repoRoot, &hashing.PackageDepsOptions{
		PackagePath:   pkg.Dir,
		InputPatterns: pfs.inputs,
	})
	if pkgDepsErr != nil {
		manualHashObject, err := manuallyHashPackage(pkg, pfs.inputs, repoRoot)
		if err != nil {
			return make(map[turbopath.AnchoredUnixPath]string)
		}
		hashObject = manualHashObject
	}

	return hashObject
}

func (pfs *packageFileSpec) hash(hashObject map[turbopath.AnchoredUnixPath]string) (string, error) {
	hashOfFiles, otherErr := fs.HashObject(hashObject)
	if otherErr != nil {
		return "", otherErr
	}
	return hashOfFiles, nil
}

func manuallyHashPackage(pkg *fs.PackageJSON, inputs []string, rootPath turbopath.AbsoluteSystemPath) (map[turbopath.AnchoredUnixPath]string, error) {
	hashObject := make(map[turbopath.AnchoredUnixPath]string)
	// Instead of implementing all gitignore properly, we hack it. We only respect .gitignore in the root and in
	// the directory of a package.
	ignore, err := safeCompileIgnoreFile(rootPath.UntypedJoin(".gitignore").ToString())
	if err != nil {
		return nil, err
	}

	ignorePkg, err := safeCompileIgnoreFile(rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration(), ".gitignore").ToString())
	if err != nil {
		return nil, err
	}

	pathPrefix := rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration())
	includePattern := ""
	excludePattern := ""
	if len(inputs) > 0 {
		var includePatterns []string
		var excludePatterns []string
		for _, pattern := range inputs {
			if len(pattern) > 0 && pattern[0] == '!' {
				excludePatterns = append(excludePatterns, pathPrefix.UntypedJoin(pattern[1:]).ToString())
			} else {
				includePatterns = append(includePatterns, pathPrefix.UntypedJoin(pattern).ToString())
			}
		}
		if len(includePatterns) > 0 {
			includePattern = "{" + strings.Join(includePatterns, ",") + "}"
		}
		if len(excludePatterns) > 0 {
			excludePattern = "{" + strings.Join(excludePatterns, ",") + "}"
		}
	}

	err = fs.Walk(pathPrefix.ToStringDuringMigration(), func(name string, isDir bool) error {
		convertedName := turbopath.AbsoluteSystemPathFromUpstream(name)
		rootMatch := ignore.MatchesPath(convertedName.ToString())
		otherMatch := ignorePkg.MatchesPath(convertedName.ToString())
		if !rootMatch && !otherMatch {
			if !isDir {
				if includePattern != "" {
					val, err := doublestar.PathMatch(includePattern, convertedName.ToString())
					if err != nil {
						return err
					}
					if !val {
						return nil
					}
				}
				if excludePattern != "" {
					val, err := doublestar.PathMatch(excludePattern, convertedName.ToString())
					if err != nil {
						return err
					}
					if val {
						return nil
					}
				}
				hash, err := fs.GitLikeHashFile(convertedName.ToString())
				if err != nil {
					return fmt.Errorf("could not hash file %v. \n%w", convertedName.ToString(), err)
				}

				relativePath, err := convertedName.RelativeTo(pathPrefix)
				if err != nil {
					return fmt.Errorf("File path cannot be made relative: %w", err)
				}
				hashObject[relativePath.ToUnixPath()] = hash
			}
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return hashObject, nil
}

// packageFileHashes is a map from a package and optional input globs to the hash of
// the matched files in the package.
type packageFileHashes map[packageFileHashKey]string

// CalculateFileHashes hashes each unique package-inputs combination that is present
// in the task graph. Must be called before calculating task hashes.
func (th *Tracker) CalculateFileHashes(
	allTasks []dag.Vertex,
	workerCount int,
	workspaceInfos workspace.Catalog,
	taskDefinitions map[string]*fs.TaskDefinition,
	repoRoot turbopath.AbsoluteSystemPath,
) error {
	hashTasks := make(util.Set)

	for _, v := range allTasks {
		taskID, ok := v.(string)
		if !ok {
			return fmt.Errorf("unknown task %v", taskID)
		}
		if taskID == th.rootNode {
			continue
		}
		pkgName, _ := util.GetPackageTaskFromId(taskID)
		if pkgName == th.rootNode {
			continue
		}

		taskDefinition, ok := taskDefinitions[taskID]
		if !ok {
			return fmt.Errorf("missing pipeline entry %v", taskID)
		}

		pfs := &packageFileSpec{
			pkg:    pkgName,
			inputs: taskDefinition.Inputs,
		}

		hashTasks.Add(pfs)
	}

	hashes := make(map[packageFileHashKey]string, len(hashTasks))
	hashObjects := make(map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string, len(hashTasks))
	hashQueue := make(chan *packageFileSpec, workerCount)
	hashErrs := &errgroup.Group{}

	for i := 0; i < workerCount; i++ {
		hashErrs.Go(func() error {
			for packageFileSpec := range hashQueue {
				pkg, ok := workspaceInfos.PackageJSONs[packageFileSpec.pkg]
				if !ok {
					return fmt.Errorf("cannot find package %v", packageFileSpec.pkg)
				}
				hashObject := packageFileSpec.getHashObject(pkg, repoRoot)
				hash, err := packageFileSpec.hash(hashObject)
				if err != nil {
					return err
				}
				th.mu.Lock()
				pfsKey := packageFileSpec.ToKey()
				hashes[pfsKey] = hash
				hashObjects[pfsKey] = hashObject
				th.mu.Unlock()
			}
			return nil
		})
	}
	for ht := range hashTasks {
		hashQueue <- ht.(*packageFileSpec)
	}
	close(hashQueue)
	err := hashErrs.Wait()
	if err != nil {
		return err
	}
	th.packageInputsHashes = hashes
	th.packageInputsExpandedHashes = hashObjects
	return nil
}

type taskHashable struct {
	packageDir           turbopath.AnchoredUnixPath
	hashOfFiles          string
	externalDepsHash     string
	task                 string
	outputs              fs.TaskOutputs
	passThruArgs         []string
	envMode              util.EnvMode
	passthroughEnv       []string
	hashableEnvPairs     []string
	globalHash           string
	taskDependencyHashes []string
}

type oldTaskHashable struct {
	packageDir           turbopath.AnchoredUnixPath
	hashOfFiles          string
	externalDepsHash     string
	task                 string
	outputs              fs.TaskOutputs
	passThruArgs         []string
	hashableEnvPairs     []string
	globalHash           string
	taskDependencyHashes []string
}

// calculateTaskHashFromHashable returns a hash string from the taskHashable
func calculateTaskHashFromHashable(full *taskHashable, useOldTaskHashable bool) (string, error) {
	// The user is not using the strict environment variables feature.
	if useOldTaskHashable {
		return fs.HashObject(&oldTaskHashable{
			packageDir:           full.packageDir,
			hashOfFiles:          full.hashOfFiles,
			externalDepsHash:     full.externalDepsHash,
			task:                 full.task,
			outputs:              full.outputs,
			passThruArgs:         full.passThruArgs,
			hashableEnvPairs:     full.hashableEnvPairs,
			globalHash:           full.globalHash,
			taskDependencyHashes: full.taskDependencyHashes,
		})
	}

	switch full.envMode {
	case util.Loose:
		// Remove the passthroughs from hash consideration if we're explicitly loose.
		full.passthroughEnv = nil
		return fs.HashObject(full)
	case util.Strict:
		// Collapse `nil` and `[]` in strict mode.
		if full.passthroughEnv == nil {
			full.passthroughEnv = make([]string, 0)
		}
		return fs.HashObject(full)
	case util.Infer:
		panic("task inferred status should have already been resolved")
	default:
		panic("unimplemented environment mode")
	}
}

func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, error) {
	dependencyHashSet := make(util.Set)

	rootPrefix := th.rootNode + util.TaskDelimiter
	th.mu.RLock()
	defer th.mu.RUnlock()
	for _, dependency := range dependencySet {
		if dependency == th.rootNode {
			continue
		}
		dependencyTask, ok := dependency.(string)
		if !ok {
			return nil, fmt.Errorf("unknown task: %v", dependency)
		}
		if strings.HasPrefix(dependencyTask, rootPrefix) {
			continue
		}
		dependencyHash, ok := th.packageTaskHashes[dependencyTask]
		if !ok {
			return nil, fmt.Errorf("missing hash for dependent task: %v", dependencyTask)
		}
		dependencyHashSet.Add(dependencyHash)
	}
	dependenciesHashList := dependencyHashSet.UnsafeListOfStrings()
	sort.Strings(dependenciesHashList)
	return dependenciesHashList, nil
}

// CalculateTaskHash calculates the hash for package-task combination. It is threadsafe, provided
// that it has previously been called on its task-graph dependencies. File hashes must be calculated
// first.
func (th *Tracker) CalculateTaskHash(packageTask *nodes.PackageTask, dependencySet dag.Set, logger hclog.Logger, args []string, useOldTaskHashable bool) (string, error) {
	pfs := specFromPackageTask(packageTask)
	pkgFileHashKey := pfs.ToKey()

	hashOfFiles, ok := th.packageInputsHashes[pkgFileHashKey]
	if !ok {
		return "", fmt.Errorf("cannot find package-file hash for %v", pkgFileHashKey)
	}

	var keyMatchers []string
	framework := inference.InferFramework(packageTask.Pkg)
	if framework != nil && framework.EnvMatcher != "" {
		// log auto detected framework and env prefix
		logger.Debug(fmt.Sprintf("auto detected framework for %s", packageTask.PackageName), "framework", framework.Slug, "env_prefix", framework.EnvMatcher)
		keyMatchers = append(keyMatchers, framework.EnvMatcher)
	}

	envVars, err := env.GetHashableEnvVars(
		packageTask.TaskDefinition.EnvVarDependencies,
		keyMatchers,
		"TURBO_CI_VENDOR_ENV_KEY",
	)
	if err != nil {
		return "", err
	}
	hashableEnvPairs := envVars.All.ToHashable()
	outputs := packageTask.HashableOutputs()
	taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet)
	if err != nil {
		return "", err
	}
	// log any auto detected env vars
	logger.Debug(fmt.Sprintf("task hash env vars for %s:%s", packageTask.PackageName, packageTask.Task), "vars", hashableEnvPairs)

	hash, err := calculateTaskHashFromHashable(&taskHashable{
		packageDir:           packageTask.Pkg.Dir.ToUnixPath(),
		hashOfFiles:          hashOfFiles,
		externalDepsHash:     packageTask.Pkg.ExternalDepsHash,
		task:                 packageTask.Task,
		outputs:              outputs.Sort(),
		passThruArgs:         args,
		envMode:              packageTask.EnvMode,
		passthroughEnv:       packageTask.TaskDefinition.PassthroughEnv,
		hashableEnvPairs:     hashableEnvPairs,
		globalHash:           th.globalHash,
		taskDependencyHashes: taskDependencyHashes,
	}, useOldTaskHashable)
	if err != nil {
		return "", fmt.Errorf("failed to hash task %v: %v", packageTask.TaskID, hash)
	}
	th.mu.Lock()
	th.packageTaskEnvVars[packageTask.TaskID] = envVars
	th.packageTaskHashes[packageTask.TaskID] = hash
	if framework != nil {
		th.packageTaskFramework[packageTask.TaskID] = framework.Slug
	}
	th.mu.Unlock()
	return hash, nil
}

// GetExpandedInputs gets the expanded set of inputs for a given PackageTask
func (th *Tracker) GetExpandedInputs(packageTask *nodes.PackageTask) map[turbopath.AnchoredUnixPath]string {
	pfs := specFromPackageTask(packageTask)
	expandedInputs := th.packageInputsExpandedHashes[pfs.ToKey()]
	inputsCopy := make(map[turbopath.AnchoredUnixPath]string, len(expandedInputs))

	for path, hash := range expandedInputs {
		inputsCopy[path] = hash
	}

	return inputsCopy
}

// GetEnvVars returns the hashed env vars for a given taskID
func (th *Tracker) GetEnvVars(taskID string) env.DetailedMap {
	th.mu.RLock()
	defer th.mu.RUnlock()
	return th.packageTaskEnvVars[taskID]
}

// GetFramework returns the inferred framework for a given taskID
func (th *Tracker) GetFramework(taskID string) string {
	th.mu.RLock()
	defer th.mu.RUnlock()
	return th.packageTaskFramework[taskID]
}

// GetExpandedOutputs returns a list of outputs for a given taskID
func (th *Tracker) GetExpandedOutputs(taskID string) []turbopath.AnchoredSystemPath {
	th.mu.RLock()
	defer th.mu.RUnlock()
	outputs, ok := th.packageTaskOutputs[taskID]

	if !ok {
		return []turbopath.AnchoredSystemPath{}
	}

	return outputs
}

// SetExpandedOutputs a list of outputs for a given taskID so it can be read later
func (th *Tracker) SetExpandedOutputs(taskID string, outputs []turbopath.AnchoredSystemPath) {
	th.mu.Lock()
	defer th.mu.Unlock()
	th.packageTaskOutputs[taskID] = outputs
}

// SetCacheStatus records the task status for the given taskID
func (th *Tracker) SetCacheStatus(taskID string, cacheSummary runsummary.TaskCacheSummary) {
	th.mu.Lock()
	defer th.mu.Unlock()
	th.packageTaskCacheStatus[taskID] = cacheSummary
}

// GetCacheStatus records the task status for the given taskID
func (th *Tracker) GetCacheStatus(taskID string) runsummary.TaskCacheSummary {
	th.mu.Lock()
	defer th.mu.Unlock()

	if status, ok := th.packageTaskCacheStatus[taskID]; ok {
		return status
	}

	// Return an empty one, all the fields will be false and 0
	return runsummary.TaskCacheSummary{}
}