diff options
Diffstat (limited to 'cli/internal/taskhash/taskhash.go')
| -rw-r--r-- | cli/internal/taskhash/taskhash.go | 497 |
1 files changed, 497 insertions, 0 deletions
diff --git a/cli/internal/taskhash/taskhash.go b/cli/internal/taskhash/taskhash.go new file mode 100644 index 0000000..a912ad9 --- /dev/null +++ b/cli/internal/taskhash/taskhash.go @@ -0,0 +1,497 @@ +// Package taskhash handles calculating dependency hashes for nodes in the task execution graph. +package taskhash + +import ( + "fmt" + "sort" + "strings" + "sync" + + "github.com/hashicorp/go-hclog" + "github.com/pyr-sh/dag" + gitignore "github.com/sabhiram/go-gitignore" + "github.com/vercel/turbo/cli/internal/doublestar" + "github.com/vercel/turbo/cli/internal/env" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/hashing" + "github.com/vercel/turbo/cli/internal/inference" + "github.com/vercel/turbo/cli/internal/nodes" + "github.com/vercel/turbo/cli/internal/runsummary" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/util" + "github.com/vercel/turbo/cli/internal/workspace" + "golang.org/x/sync/errgroup" +) + +// Tracker caches package-inputs hashes, as well as package-task hashes. +// package-inputs hashes must be calculated before package-task hashes, +// and package-task hashes must be calculated in topographical order. +// package-task hashing is threadsafe, provided topographical order is +// respected. +type Tracker struct { + rootNode string + globalHash string + pipeline fs.Pipeline + + packageInputsHashes packageFileHashes + + // packageInputsExpandedHashes is a map of a hashkey to a list of files that are inputs to the task. + // Writes to this map happen during CalculateFileHash(). Since this happens synchronously + // before walking the task graph, it does not need to be protected by a mutex. + packageInputsExpandedHashes map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string + + // mu is a mutex that we can lock/unlock to read/write from maps + // the fields below should be protected by the mutex. + mu sync.RWMutex + packageTaskEnvVars map[string]env.DetailedMap // taskId -> envvar pairs that affect the hash. + packageTaskHashes map[string]string // taskID -> hash + packageTaskFramework map[string]string // taskID -> inferred framework for package + packageTaskOutputs map[string][]turbopath.AnchoredSystemPath + packageTaskCacheStatus map[string]runsummary.TaskCacheSummary +} + +// NewTracker creates a tracker for package-inputs combinations and package-task combinations. +func NewTracker(rootNode string, globalHash string, pipeline fs.Pipeline) *Tracker { + return &Tracker{ + rootNode: rootNode, + globalHash: globalHash, + pipeline: pipeline, + packageTaskHashes: make(map[string]string), + packageTaskFramework: make(map[string]string), + packageTaskEnvVars: make(map[string]env.DetailedMap), + packageTaskOutputs: make(map[string][]turbopath.AnchoredSystemPath), + packageTaskCacheStatus: make(map[string]runsummary.TaskCacheSummary), + } +} + +// packageFileSpec defines a combination of a package and optional set of input globs +type packageFileSpec struct { + pkg string + inputs []string +} + +func specFromPackageTask(packageTask *nodes.PackageTask) packageFileSpec { + return packageFileSpec{ + pkg: packageTask.PackageName, + inputs: packageTask.TaskDefinition.Inputs, + } +} + +// packageFileHashKey is a hashable representation of a packageFileSpec. +type packageFileHashKey string + +// hashes the inputs for a packageTask +func (pfs packageFileSpec) ToKey() packageFileHashKey { + sort.Strings(pfs.inputs) + return packageFileHashKey(fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(pfs.inputs, "!"))) +} + +func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) { + if fs.FileExists(filepath) { + return gitignore.CompileIgnoreFile(filepath) + } + // no op + return gitignore.CompileIgnoreLines([]string{}...), nil +} + +func (pfs *packageFileSpec) getHashObject(pkg *fs.PackageJSON, repoRoot turbopath.AbsoluteSystemPath) map[turbopath.AnchoredUnixPath]string { + hashObject, pkgDepsErr := hashing.GetPackageDeps(repoRoot, &hashing.PackageDepsOptions{ + PackagePath: pkg.Dir, + InputPatterns: pfs.inputs, + }) + if pkgDepsErr != nil { + manualHashObject, err := manuallyHashPackage(pkg, pfs.inputs, repoRoot) + if err != nil { + return make(map[turbopath.AnchoredUnixPath]string) + } + hashObject = manualHashObject + } + + return hashObject +} + +func (pfs *packageFileSpec) hash(hashObject map[turbopath.AnchoredUnixPath]string) (string, error) { + hashOfFiles, otherErr := fs.HashObject(hashObject) + if otherErr != nil { + return "", otherErr + } + return hashOfFiles, nil +} + +func manuallyHashPackage(pkg *fs.PackageJSON, inputs []string, rootPath turbopath.AbsoluteSystemPath) (map[turbopath.AnchoredUnixPath]string, error) { + hashObject := make(map[turbopath.AnchoredUnixPath]string) + // Instead of implementing all gitignore properly, we hack it. We only respect .gitignore in the root and in + // the directory of a package. + ignore, err := safeCompileIgnoreFile(rootPath.UntypedJoin(".gitignore").ToString()) + if err != nil { + return nil, err + } + + ignorePkg, err := safeCompileIgnoreFile(rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration(), ".gitignore").ToString()) + if err != nil { + return nil, err + } + + pathPrefix := rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration()) + includePattern := "" + excludePattern := "" + if len(inputs) > 0 { + var includePatterns []string + var excludePatterns []string + for _, pattern := range inputs { + if len(pattern) > 0 && pattern[0] == '!' { + excludePatterns = append(excludePatterns, pathPrefix.UntypedJoin(pattern[1:]).ToString()) + } else { + includePatterns = append(includePatterns, pathPrefix.UntypedJoin(pattern).ToString()) + } + } + if len(includePatterns) > 0 { + includePattern = "{" + strings.Join(includePatterns, ",") + "}" + } + if len(excludePatterns) > 0 { + excludePattern = "{" + strings.Join(excludePatterns, ",") + "}" + } + } + + err = fs.Walk(pathPrefix.ToStringDuringMigration(), func(name string, isDir bool) error { + convertedName := turbopath.AbsoluteSystemPathFromUpstream(name) + rootMatch := ignore.MatchesPath(convertedName.ToString()) + otherMatch := ignorePkg.MatchesPath(convertedName.ToString()) + if !rootMatch && !otherMatch { + if !isDir { + if includePattern != "" { + val, err := doublestar.PathMatch(includePattern, convertedName.ToString()) + if err != nil { + return err + } + if !val { + return nil + } + } + if excludePattern != "" { + val, err := doublestar.PathMatch(excludePattern, convertedName.ToString()) + if err != nil { + return err + } + if val { + return nil + } + } + hash, err := fs.GitLikeHashFile(convertedName.ToString()) + if err != nil { + return fmt.Errorf("could not hash file %v. \n%w", convertedName.ToString(), err) + } + + relativePath, err := convertedName.RelativeTo(pathPrefix) + if err != nil { + return fmt.Errorf("File path cannot be made relative: %w", err) + } + hashObject[relativePath.ToUnixPath()] = hash + } + } + return nil + }) + if err != nil { + return nil, err + } + return hashObject, nil +} + +// packageFileHashes is a map from a package and optional input globs to the hash of +// the matched files in the package. +type packageFileHashes map[packageFileHashKey]string + +// CalculateFileHashes hashes each unique package-inputs combination that is present +// in the task graph. Must be called before calculating task hashes. +func (th *Tracker) CalculateFileHashes( + allTasks []dag.Vertex, + workerCount int, + workspaceInfos workspace.Catalog, + taskDefinitions map[string]*fs.TaskDefinition, + repoRoot turbopath.AbsoluteSystemPath, +) error { + hashTasks := make(util.Set) + + for _, v := range allTasks { + taskID, ok := v.(string) + if !ok { + return fmt.Errorf("unknown task %v", taskID) + } + if taskID == th.rootNode { + continue + } + pkgName, _ := util.GetPackageTaskFromId(taskID) + if pkgName == th.rootNode { + continue + } + + taskDefinition, ok := taskDefinitions[taskID] + if !ok { + return fmt.Errorf("missing pipeline entry %v", taskID) + } + + pfs := &packageFileSpec{ + pkg: pkgName, + inputs: taskDefinition.Inputs, + } + + hashTasks.Add(pfs) + } + + hashes := make(map[packageFileHashKey]string, len(hashTasks)) + hashObjects := make(map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string, len(hashTasks)) + hashQueue := make(chan *packageFileSpec, workerCount) + hashErrs := &errgroup.Group{} + + for i := 0; i < workerCount; i++ { + hashErrs.Go(func() error { + for packageFileSpec := range hashQueue { + pkg, ok := workspaceInfos.PackageJSONs[packageFileSpec.pkg] + if !ok { + return fmt.Errorf("cannot find package %v", packageFileSpec.pkg) + } + hashObject := packageFileSpec.getHashObject(pkg, repoRoot) + hash, err := packageFileSpec.hash(hashObject) + if err != nil { + return err + } + th.mu.Lock() + pfsKey := packageFileSpec.ToKey() + hashes[pfsKey] = hash + hashObjects[pfsKey] = hashObject + th.mu.Unlock() + } + return nil + }) + } + for ht := range hashTasks { + hashQueue <- ht.(*packageFileSpec) + } + close(hashQueue) + err := hashErrs.Wait() + if err != nil { + return err + } + th.packageInputsHashes = hashes + th.packageInputsExpandedHashes = hashObjects + return nil +} + +type taskHashable struct { + packageDir turbopath.AnchoredUnixPath + hashOfFiles string + externalDepsHash string + task string + outputs fs.TaskOutputs + passThruArgs []string + envMode util.EnvMode + passthroughEnv []string + hashableEnvPairs []string + globalHash string + taskDependencyHashes []string +} + +type oldTaskHashable struct { + packageDir turbopath.AnchoredUnixPath + hashOfFiles string + externalDepsHash string + task string + outputs fs.TaskOutputs + passThruArgs []string + hashableEnvPairs []string + globalHash string + taskDependencyHashes []string +} + +// calculateTaskHashFromHashable returns a hash string from the taskHashable +func calculateTaskHashFromHashable(full *taskHashable, useOldTaskHashable bool) (string, error) { + // The user is not using the strict environment variables feature. + if useOldTaskHashable { + return fs.HashObject(&oldTaskHashable{ + packageDir: full.packageDir, + hashOfFiles: full.hashOfFiles, + externalDepsHash: full.externalDepsHash, + task: full.task, + outputs: full.outputs, + passThruArgs: full.passThruArgs, + hashableEnvPairs: full.hashableEnvPairs, + globalHash: full.globalHash, + taskDependencyHashes: full.taskDependencyHashes, + }) + } + + switch full.envMode { + case util.Loose: + // Remove the passthroughs from hash consideration if we're explicitly loose. + full.passthroughEnv = nil + return fs.HashObject(full) + case util.Strict: + // Collapse `nil` and `[]` in strict mode. + if full.passthroughEnv == nil { + full.passthroughEnv = make([]string, 0) + } + return fs.HashObject(full) + case util.Infer: + panic("task inferred status should have already been resolved") + default: + panic("unimplemented environment mode") + } +} + +func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, error) { + dependencyHashSet := make(util.Set) + + rootPrefix := th.rootNode + util.TaskDelimiter + th.mu.RLock() + defer th.mu.RUnlock() + for _, dependency := range dependencySet { + if dependency == th.rootNode { + continue + } + dependencyTask, ok := dependency.(string) + if !ok { + return nil, fmt.Errorf("unknown task: %v", dependency) + } + if strings.HasPrefix(dependencyTask, rootPrefix) { + continue + } + dependencyHash, ok := th.packageTaskHashes[dependencyTask] + if !ok { + return nil, fmt.Errorf("missing hash for dependent task: %v", dependencyTask) + } + dependencyHashSet.Add(dependencyHash) + } + dependenciesHashList := dependencyHashSet.UnsafeListOfStrings() + sort.Strings(dependenciesHashList) + return dependenciesHashList, nil +} + +// CalculateTaskHash calculates the hash for package-task combination. It is threadsafe, provided +// that it has previously been called on its task-graph dependencies. File hashes must be calculated +// first. +func (th *Tracker) CalculateTaskHash(packageTask *nodes.PackageTask, dependencySet dag.Set, logger hclog.Logger, args []string, useOldTaskHashable bool) (string, error) { + pfs := specFromPackageTask(packageTask) + pkgFileHashKey := pfs.ToKey() + + hashOfFiles, ok := th.packageInputsHashes[pkgFileHashKey] + if !ok { + return "", fmt.Errorf("cannot find package-file hash for %v", pkgFileHashKey) + } + + var keyMatchers []string + framework := inference.InferFramework(packageTask.Pkg) + if framework != nil && framework.EnvMatcher != "" { + // log auto detected framework and env prefix + logger.Debug(fmt.Sprintf("auto detected framework for %s", packageTask.PackageName), "framework", framework.Slug, "env_prefix", framework.EnvMatcher) + keyMatchers = append(keyMatchers, framework.EnvMatcher) + } + + envVars, err := env.GetHashableEnvVars( + packageTask.TaskDefinition.EnvVarDependencies, + keyMatchers, + "TURBO_CI_VENDOR_ENV_KEY", + ) + if err != nil { + return "", err + } + hashableEnvPairs := envVars.All.ToHashable() + outputs := packageTask.HashableOutputs() + taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet) + if err != nil { + return "", err + } + // log any auto detected env vars + logger.Debug(fmt.Sprintf("task hash env vars for %s:%s", packageTask.PackageName, packageTask.Task), "vars", hashableEnvPairs) + + hash, err := calculateTaskHashFromHashable(&taskHashable{ + packageDir: packageTask.Pkg.Dir.ToUnixPath(), + hashOfFiles: hashOfFiles, + externalDepsHash: packageTask.Pkg.ExternalDepsHash, + task: packageTask.Task, + outputs: outputs.Sort(), + passThruArgs: args, + envMode: packageTask.EnvMode, + passthroughEnv: packageTask.TaskDefinition.PassthroughEnv, + hashableEnvPairs: hashableEnvPairs, + globalHash: th.globalHash, + taskDependencyHashes: taskDependencyHashes, + }, useOldTaskHashable) + if err != nil { + return "", fmt.Errorf("failed to hash task %v: %v", packageTask.TaskID, hash) + } + th.mu.Lock() + th.packageTaskEnvVars[packageTask.TaskID] = envVars + th.packageTaskHashes[packageTask.TaskID] = hash + if framework != nil { + th.packageTaskFramework[packageTask.TaskID] = framework.Slug + } + th.mu.Unlock() + return hash, nil +} + +// GetExpandedInputs gets the expanded set of inputs for a given PackageTask +func (th *Tracker) GetExpandedInputs(packageTask *nodes.PackageTask) map[turbopath.AnchoredUnixPath]string { + pfs := specFromPackageTask(packageTask) + expandedInputs := th.packageInputsExpandedHashes[pfs.ToKey()] + inputsCopy := make(map[turbopath.AnchoredUnixPath]string, len(expandedInputs)) + + for path, hash := range expandedInputs { + inputsCopy[path] = hash + } + + return inputsCopy +} + +// GetEnvVars returns the hashed env vars for a given taskID +func (th *Tracker) GetEnvVars(taskID string) env.DetailedMap { + th.mu.RLock() + defer th.mu.RUnlock() + return th.packageTaskEnvVars[taskID] +} + +// GetFramework returns the inferred framework for a given taskID +func (th *Tracker) GetFramework(taskID string) string { + th.mu.RLock() + defer th.mu.RUnlock() + return th.packageTaskFramework[taskID] +} + +// GetExpandedOutputs returns a list of outputs for a given taskID +func (th *Tracker) GetExpandedOutputs(taskID string) []turbopath.AnchoredSystemPath { + th.mu.RLock() + defer th.mu.RUnlock() + outputs, ok := th.packageTaskOutputs[taskID] + + if !ok { + return []turbopath.AnchoredSystemPath{} + } + + return outputs +} + +// SetExpandedOutputs a list of outputs for a given taskID so it can be read later +func (th *Tracker) SetExpandedOutputs(taskID string, outputs []turbopath.AnchoredSystemPath) { + th.mu.Lock() + defer th.mu.Unlock() + th.packageTaskOutputs[taskID] = outputs +} + +// SetCacheStatus records the task status for the given taskID +func (th *Tracker) SetCacheStatus(taskID string, cacheSummary runsummary.TaskCacheSummary) { + th.mu.Lock() + defer th.mu.Unlock() + th.packageTaskCacheStatus[taskID] = cacheSummary +} + +// GetCacheStatus records the task status for the given taskID +func (th *Tracker) GetCacheStatus(taskID string) runsummary.TaskCacheSummary { + th.mu.Lock() + defer th.mu.Unlock() + + if status, ok := th.packageTaskCacheStatus[taskID]; ok { + return status + } + + // Return an empty one, all the fields will be false and 0 + return runsummary.TaskCacheSummary{} +} |
