about | summary | refs | log | tree | commit | diff | stats | homepage
path: root/cli/internal/taskhash/taskhash.go
diff options
context:
space:
mode:
author 简律纯 <hsiangnianian@outlook.com> 2023-04-28 01:36:44 +0800
committer 简律纯 <hsiangnianian@outlook.com> 2023-04-28 01:36:44 +0800
commit dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/taskhash/taskhash.go
parent 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
download HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/taskhash/taskhash.go')
-rw-r--r-- cli/internal/taskhash/taskhash.go 497
1 files changed, 497 insertions, 0 deletions
diff --git a/cli/internal/taskhash/taskhash.go b/cli/internal/taskhash/taskhash.go
new file mode 100644
index 0000000..a912ad9
--- /dev/null
+++ b/cli/internal/taskhash/taskhash.go
@@ -0,0 +1,497 @@
+// Package taskhash handles calculating dependency hashes for nodes in the task execution graph.
+package taskhash
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+ "sync"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/pyr-sh/dag"
+ gitignore "github.com/sabhiram/go-gitignore"
+ "github.com/vercel/turbo/cli/internal/doublestar"
+ "github.com/vercel/turbo/cli/internal/env"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/hashing"
+ "github.com/vercel/turbo/cli/internal/inference"
+ "github.com/vercel/turbo/cli/internal/nodes"
+ "github.com/vercel/turbo/cli/internal/runsummary"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+ "github.com/vercel/turbo/cli/internal/workspace"
+ "golang.org/x/sync/errgroup"
+)
+
+// Tracker caches package-inputs hashes, as well as package-task hashes.
+// Package-inputs hashes must be calculated before package-task hashes,
+// and package-task hashes must be calculated in topological order.
+// Package-task hashing is threadsafe, provided topological order is
+// respected.
+type Tracker struct {
+ rootNode string
+ globalHash string
+ pipeline fs.Pipeline
+
+ packageInputsHashes packageFileHashes
+
+ // packageInputsExpandedHashes maps a package-inputs key to the files matched for it (path -> file hash).
+ // It is assigned once, at the end of CalculateFileHashes(). Since that happens synchronously
+ // before walking the task graph, it does not need to be protected by a mutex.
+ packageInputsExpandedHashes map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string
+
+ // mu is the mutex to lock/unlock when reading from or writing to the maps below;
+ // every field declared after it should only be accessed while holding it.
+ mu sync.RWMutex
+ packageTaskEnvVars map[string]env.DetailedMap // taskID -> env var pairs that affect the hash.
+ packageTaskHashes map[string]string // taskID -> hash
+ packageTaskFramework map[string]string // taskID -> inferred framework for package
+ packageTaskOutputs map[string][]turbopath.AnchoredSystemPath // taskID -> expanded outputs
+ packageTaskCacheStatus map[string]runsummary.TaskCacheSummary // taskID -> cache status
+}
+
+// NewTracker returns a Tracker for the given root node, global hash and pipeline.
+// All per-task maps are allocated empty; the package-inputs hash maps are left
+// unset until CalculateFileHashes fills them in.
+func NewTracker(rootNode string, globalHash string, pipeline fs.Pipeline) *Tracker {
+	tracker := new(Tracker)
+
+	tracker.rootNode = rootNode
+	tracker.globalHash = globalHash
+	tracker.pipeline = pipeline
+
+	// Per-task state, keyed by task ID.
+	tracker.packageTaskHashes = map[string]string{}
+	tracker.packageTaskFramework = map[string]string{}
+	tracker.packageTaskEnvVars = map[string]env.DetailedMap{}
+	tracker.packageTaskOutputs = map[string][]turbopath.AnchoredSystemPath{}
+	tracker.packageTaskCacheStatus = map[string]runsummary.TaskCacheSummary{}
+
+	return tracker
+}
+
+// packageFileSpec defines a combination of a package and an optional set of input globs.
+type packageFileSpec struct {
+ pkg string // package name
+ inputs []string // input glob patterns; may be empty
+}
+
+func specFromPackageTask(packageTask *nodes.PackageTask) packageFileSpec {
+ return packageFileSpec{
+ pkg: packageTask.PackageName,
+ inputs: packageTask.TaskDefinition.Inputs,
+ }
+}
+
+// packageFileHashKey is a hashable (map-key) representation of a packageFileSpec, as built by packageFileSpec.ToKey.
+type packageFileHashKey string
+
+// ToKey returns the map key for this package-inputs combination, formatted as
+// "<pkg>#<input>!<input>...". The inputs are sorted so that two specs listing
+// the same globs in a different order produce the same key.
+func (pfs packageFileSpec) ToKey() packageFileHashKey {
+	// Sort a private copy. The value receiver copies the struct but not the
+	// slice's backing array, so sorting pfs.inputs directly would reorder the
+	// caller's slice (and race with any concurrent reader of it).
+	sortedInputs := make([]string, len(pfs.inputs))
+	copy(sortedInputs, pfs.inputs)
+	sort.Strings(sortedInputs)
+	return packageFileHashKey(fmt.Sprintf("%v#%v", pfs.pkg, strings.Join(sortedInputs, "!")))
+}
+
+// safeCompileIgnoreFile compiles the gitignore file at filepath. If no file
+// exists there, it returns a matcher compiled from zero lines rather than an error.
+func safeCompileIgnoreFile(filepath string) (*gitignore.GitIgnore, error) {
+	if !fs.FileExists(filepath) {
+		// Nothing to read: compile an empty rule list.
+		noLines := []string{}
+		return gitignore.CompileIgnoreLines(noLines...), nil
+	}
+	return gitignore.CompileIgnoreFile(filepath)
+}
+
+// getHashObject returns a path -> hash map for the files of pkg selected by pfs.inputs.
+// hashing.GetPackageDeps is tried first; if it fails, manuallyHashPackage is used instead.
+// If that fallback fails as well, its error is dropped and an empty map is returned.
+func (pfs *packageFileSpec) getHashObject(pkg *fs.PackageJSON, repoRoot turbopath.AbsoluteSystemPath) map[turbopath.AnchoredUnixPath]string {
+	depsOptions := &hashing.PackageDepsOptions{
+		PackagePath:   pkg.Dir,
+		InputPatterns: pfs.inputs,
+	}
+	fileHashes, depsErr := hashing.GetPackageDeps(repoRoot, depsOptions)
+	if depsErr == nil {
+		return fileHashes
+	}
+
+	fallbackHashes, fallbackErr := manuallyHashPackage(pkg, pfs.inputs, repoRoot)
+	if fallbackErr != nil {
+		return make(map[turbopath.AnchoredUnixPath]string)
+	}
+	fileHashes = fallbackHashes
+	return fileHashes
+}
+
+// hash reduces a path -> hash map to a single hash string via fs.HashObject.
+// On failure it returns the empty string together with the error.
+func (pfs *packageFileSpec) hash(hashObject map[turbopath.AnchoredUnixPath]string) (string, error) {
+	digest, hashErr := fs.HashObject(hashObject)
+	if hashErr == nil {
+		return digest, nil
+	}
+	return "", hashErr
+}
+
+// manuallyHashPackage walks the package directory and returns a map from each
+// file's path (relative to the package directory, in unix form) to its
+// fs.GitLikeHashFile hash.
+//
+// Files are skipped when they match the .gitignore at rootPath or the one in
+// the package directory. Entries in inputs are joined onto the package
+// directory; those starting with '!' act as excludes, all others as includes.
+// When at least one include is present a file must match it to be hashed.
+func manuallyHashPackage(pkg *fs.PackageJSON, inputs []string, rootPath turbopath.AbsoluteSystemPath) (map[turbopath.AnchoredUnixPath]string, error) {
+	// Rather than implementing gitignore resolution in full, only two files are
+	// honoured: the one in the repo root and the one in the package directory.
+	rootIgnore, err := safeCompileIgnoreFile(rootPath.UntypedJoin(".gitignore").ToString())
+	if err != nil {
+		return nil, err
+	}
+	pkgIgnore, err := safeCompileIgnoreFile(rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration(), ".gitignore").ToString())
+	if err != nil {
+		return nil, err
+	}
+
+	pkgRoot := rootPath.UntypedJoin(pkg.Dir.ToStringDuringMigration())
+
+	// Split the inputs into include and exclude globs anchored at the package directory.
+	var includes, excludes []string
+	for _, pattern := range inputs {
+		if strings.HasPrefix(pattern, "!") {
+			excludes = append(excludes, pkgRoot.UntypedJoin(pattern[1:]).ToString())
+			continue
+		}
+		includes = append(includes, pkgRoot.UntypedJoin(pattern).ToString())
+	}
+
+	// Fold each list into a single brace-alternation glob; empty means "no filter".
+	var includeGlob, excludeGlob string
+	if len(includes) > 0 {
+		includeGlob = "{" + strings.Join(includes, ",") + "}"
+	}
+	if len(excludes) > 0 {
+		excludeGlob = "{" + strings.Join(excludes, ",") + "}"
+	}
+
+	fileHashes := make(map[turbopath.AnchoredUnixPath]string)
+	walkErr := fs.Walk(pkgRoot.ToStringDuringMigration(), func(name string, isDir bool) error {
+		absPath := turbopath.AbsoluteSystemPathFromUpstream(name)
+		ignoredByRoot := rootIgnore.MatchesPath(absPath.ToString())
+		ignoredByPkg := pkgIgnore.MatchesPath(absPath.ToString())
+		if ignoredByRoot || ignoredByPkg || isDir {
+			return nil
+		}
+
+		if includeGlob != "" {
+			included, matchErr := doublestar.PathMatch(includeGlob, absPath.ToString())
+			if matchErr != nil {
+				return matchErr
+			}
+			if !included {
+				return nil
+			}
+		}
+		if excludeGlob != "" {
+			excluded, matchErr := doublestar.PathMatch(excludeGlob, absPath.ToString())
+			if matchErr != nil {
+				return matchErr
+			}
+			if excluded {
+				return nil
+			}
+		}
+
+		fileHash, hashErr := fs.GitLikeHashFile(absPath.ToString())
+		if hashErr != nil {
+			return fmt.Errorf("could not hash file %v. \n%w", absPath.ToString(), hashErr)
+		}
+
+		relPath, relErr := absPath.RelativeTo(pkgRoot)
+		if relErr != nil {
+			return fmt.Errorf("File path cannot be made relative: %w", relErr)
+		}
+		fileHashes[relPath.ToUnixPath()] = fileHash
+		return nil
+	})
+	if walkErr != nil {
+		return nil, walkErr
+	}
+	return fileHashes, nil
+}
+
+// packageFileHashes maps a package plus optional input globs (as a packageFileHashKey)
+// to the hash of the matched files in that package.
+type packageFileHashes map[packageFileHashKey]string
+
+// CalculateFileHashes hashes each unique package-inputs combination that is present
+// in the task graph. Must be called before calculating task hashes.
+func (th *Tracker) CalculateFileHashes(
+ allTasks []dag.Vertex,
+ workerCount int,
+ workspaceInfos workspace.Catalog,
+ taskDefinitions map[string]*fs.TaskDefinition,
+ repoRoot turbopath.AbsoluteSystemPath,
+) error {
+ hashTasks := make(util.Set)
+
+ for _, v := range allTasks {
+ taskID, ok := v.(string)
+ if !ok {
+ return fmt.Errorf("unknown task %v", taskID)
+ }
+ if taskID == th.rootNode {
+ continue
+ }
+ pkgName, _ := util.GetPackageTaskFromId(taskID)
+ if pkgName == th.rootNode {
+ continue
+ }
+
+ taskDefinition, ok := taskDefinitions[taskID]
+ if !ok {
+ return fmt.Errorf("missing pipeline entry %v", taskID)
+ }
+
+ pfs := &packageFileSpec{
+ pkg: pkgName,
+ inputs: taskDefinition.Inputs,
+ }
+
+ hashTasks.Add(pfs)
+ }
+
+ hashes := make(map[packageFileHashKey]string, len(hashTasks))
+ hashObjects := make(map[packageFileHashKey]map[turbopath.AnchoredUnixPath]string, len(hashTasks))
+ hashQueue := make(chan *packageFileSpec, workerCount)
+ hashErrs := &errgroup.Group{}
+
+ for i := 0; i < workerCount; i++ {
+ hashErrs.Go(func() error {
+ for packageFileSpec := range hashQueue {
+ pkg, ok := workspaceInfos.PackageJSONs[packageFileSpec.pkg]
+ if !ok {
+ return fmt.Errorf("cannot find package %v", packageFileSpec.pkg)
+ }
+ hashObject := packageFileSpec.getHashObject(pkg, repoRoot)
+ hash, err := packageFileSpec.hash(hashObject)
+ if err != nil {
+ return err
+ }
+ th.mu.Lock()
+ pfsKey := packageFileSpec.ToKey()
+ hashes[pfsKey] = hash
+ hashObjects[pfsKey] = hashObject
+ th.mu.Unlock()
+ }
+ return nil
+ })
+ }
+ for ht := range hashTasks {
+ hashQueue <- ht.(*packageFileSpec)
+ }
+ close(hashQueue)
+ err := hashErrs.Wait()
+ if err != nil {
+ return err
+ }
+ th.packageInputsHashes = hashes
+ th.packageInputsExpandedHashes = hashObjects
+ return nil
+}
+
+// taskHashable collects the values that are hashed to produce a task hash. It is
+// the object calculateTaskHashFromHashable hands to fs.HashObject when the old
+// hashable is not requested.
+// NOTE(review): field names and order may feed into the resulting hash — confirm
+// against fs.HashObject before renaming or reordering.
+type taskHashable struct {
+ packageDir turbopath.AnchoredUnixPath
+ hashOfFiles string
+ externalDepsHash string
+ task string
+ outputs fs.TaskOutputs
+ passThruArgs []string
+ envMode util.EnvMode // selects how passthroughEnv is normalized before hashing
+ passthroughEnv []string // set to nil in loose mode; nil becomes an empty slice in strict mode
+ hashableEnvPairs []string
+ globalHash string
+ taskDependencyHashes []string
+}
+
+// oldTaskHashable has the same fields as taskHashable except envMode and
+// passthroughEnv. calculateTaskHashFromHashable hashes it instead of
+// taskHashable when useOldTaskHashable is true.
+type oldTaskHashable struct {
+ packageDir turbopath.AnchoredUnixPath
+ hashOfFiles string
+ externalDepsHash string
+ task string
+ outputs fs.TaskOutputs
+ passThruArgs []string
+ hashableEnvPairs []string
+ globalHash string
+ taskDependencyHashes []string
+}
+
+// calculateTaskHashFromHashable turns a taskHashable into its hash string.
+//
+// With useOldTaskHashable set, the fields are copied into an oldTaskHashable
+// (which has no envMode or passthroughEnv) and that is hashed. Otherwise
+// full.passthroughEnv is normalized in place according to full.envMode and full
+// itself is hashed. It panics if envMode is still util.Infer or is not a known mode.
+func calculateTaskHashFromHashable(full *taskHashable, useOldTaskHashable bool) (string, error) {
+	if useOldTaskHashable {
+		// The user is not using the strict environment variables feature.
+		legacy := &oldTaskHashable{
+			packageDir:           full.packageDir,
+			hashOfFiles:          full.hashOfFiles,
+			externalDepsHash:     full.externalDepsHash,
+			task:                 full.task,
+			outputs:              full.outputs,
+			passThruArgs:         full.passThruArgs,
+			hashableEnvPairs:     full.hashableEnvPairs,
+			globalHash:           full.globalHash,
+			taskDependencyHashes: full.taskDependencyHashes,
+		}
+		return fs.HashObject(legacy)
+	}
+
+	switch mode := full.envMode; {
+	case mode == util.Infer:
+		panic("task inferred status should have already been resolved")
+	case mode == util.Loose:
+		// Explicitly loose: passthroughs play no part in the hash.
+		full.passthroughEnv = nil
+	case mode == util.Strict:
+		// Strict: treat `nil` and `[]` as the same thing.
+		if full.passthroughEnv == nil {
+			full.passthroughEnv = make([]string, 0)
+		}
+	default:
+		panic("unimplemented environment mode")
+	}
+	return fs.HashObject(full)
+}
+
+// calculateDependencyHashes returns the sorted, de-duplicated hashes of the tasks
+// in dependencySet, looked up in th.packageTaskHashes under the read lock.
+// The root node and tasks prefixed with the root node are skipped. It fails if a
+// dependency is not a string or has no recorded hash yet.
+func (th *Tracker) calculateDependencyHashes(dependencySet dag.Set) ([]string, error) {
+	uniqueHashes := make(util.Set)
+	rootTaskPrefix := th.rootNode + util.TaskDelimiter
+
+	th.mu.RLock()
+	defer th.mu.RUnlock()
+
+	for _, vertex := range dependencySet {
+		if vertex == th.rootNode {
+			continue
+		}
+		taskID, isString := vertex.(string)
+		if !isString {
+			return nil, fmt.Errorf("unknown task: %v", vertex)
+		}
+		if strings.HasPrefix(taskID, rootTaskPrefix) {
+			continue
+		}
+		if taskHash, found := th.packageTaskHashes[taskID]; found {
+			uniqueHashes.Add(taskHash)
+			continue
+		}
+		return nil, fmt.Errorf("missing hash for dependent task: %v", taskID)
+	}
+
+	sortedHashes := uniqueHashes.UnsafeListOfStrings()
+	sort.Strings(sortedHashes)
+	return sortedHashes, nil
+}
+
+// CalculateTaskHash calculates the hash for a package-task combination. It is threadsafe, provided
+// that it has previously been called on its task-graph dependencies. File hashes must be calculated
+// first (see CalculateFileHashes).
+//
+// The hash combines the package's file hash, external deps hash, task name, outputs, args,
+// env mode and passthrough env, hashable env vars, the global hash and the hashes of the tasks
+// in dependencySet. On success the hash, the env vars and (when one was inferred) the framework
+// slug are recorded on the Tracker under packageTask.TaskID.
+func (th *Tracker) CalculateTaskHash(packageTask *nodes.PackageTask, dependencySet dag.Set, logger hclog.Logger, args []string, useOldTaskHashable bool) (string, error) {
+	pfs := specFromPackageTask(packageTask)
+	pkgFileHashKey := pfs.ToKey()
+
+	hashOfFiles, ok := th.packageInputsHashes[pkgFileHashKey]
+	if !ok {
+		return "", fmt.Errorf("cannot find package-file hash for %v", pkgFileHashKey)
+	}
+
+	var keyMatchers []string
+	framework := inference.InferFramework(packageTask.Pkg)
+	if framework != nil && framework.EnvMatcher != "" {
+		// log auto detected framework and env prefix
+		logger.Debug(fmt.Sprintf("auto detected framework for %s", packageTask.PackageName), "framework", framework.Slug, "env_prefix", framework.EnvMatcher)
+		keyMatchers = append(keyMatchers, framework.EnvMatcher)
+	}
+
+	envVars, err := env.GetHashableEnvVars(
+		packageTask.TaskDefinition.EnvVarDependencies,
+		keyMatchers,
+		"TURBO_CI_VENDOR_ENV_KEY",
+	)
+	if err != nil {
+		return "", err
+	}
+	hashableEnvPairs := envVars.All.ToHashable()
+	outputs := packageTask.HashableOutputs()
+	taskDependencyHashes, err := th.calculateDependencyHashes(dependencySet)
+	if err != nil {
+		return "", err
+	}
+	// log any auto detected env vars
+	logger.Debug(fmt.Sprintf("task hash env vars for %s:%s", packageTask.PackageName, packageTask.Task), "vars", hashableEnvPairs)
+
+	hash, err := calculateTaskHashFromHashable(&taskHashable{
+		packageDir:           packageTask.Pkg.Dir.ToUnixPath(),
+		hashOfFiles:          hashOfFiles,
+		externalDepsHash:     packageTask.Pkg.ExternalDepsHash,
+		task:                 packageTask.Task,
+		outputs:              outputs.Sort(),
+		passThruArgs:         args,
+		envMode:              packageTask.EnvMode,
+		passthroughEnv:       packageTask.TaskDefinition.PassthroughEnv,
+		hashableEnvPairs:     hashableEnvPairs,
+		globalHash:           th.globalHash,
+		taskDependencyHashes: taskDependencyHashes,
+	}, useOldTaskHashable)
+	if err != nil {
+		// Report the underlying error; hash carries no information on failure.
+		return "", fmt.Errorf("failed to hash task %v: %w", packageTask.TaskID, err)
+	}
+
+	th.mu.Lock()
+	th.packageTaskEnvVars[packageTask.TaskID] = envVars
+	th.packageTaskHashes[packageTask.TaskID] = hash
+	if framework != nil {
+		th.packageTaskFramework[packageTask.TaskID] = framework.Slug
+	}
+	th.mu.Unlock()
+	return hash, nil
+}
+
+// GetExpandedInputs returns a copy of the path -> hash map recorded for the
+// package-inputs combination of the given PackageTask. The returned map is
+// always non-nil and may be modified freely by the caller.
+func (th *Tracker) GetExpandedInputs(packageTask *nodes.PackageTask) map[turbopath.AnchoredUnixPath]string {
+	key := specFromPackageTask(packageTask).ToKey()
+	recorded := th.packageInputsExpandedHashes[key]
+
+	clone := make(map[turbopath.AnchoredUnixPath]string, len(recorded))
+	for filePath, fileHash := range recorded {
+		clone[filePath] = fileHash
+	}
+	return clone
+}
+
+// GetEnvVars returns the env vars recorded for taskID when its hash was
+// calculated, or the zero env.DetailedMap if none were recorded.
+func (th *Tracker) GetEnvVars(taskID string) env.DetailedMap {
+	th.mu.RLock()
+	recorded := th.packageTaskEnvVars[taskID]
+	th.mu.RUnlock()
+	return recorded
+}
+
+// GetFramework returns the framework slug inferred for taskID, or the empty
+// string if none was recorded.
+func (th *Tracker) GetFramework(taskID string) string {
+	th.mu.RLock()
+	slug := th.packageTaskFramework[taskID]
+	th.mu.RUnlock()
+	return slug
+}
+
+// GetExpandedOutputs returns the outputs stored for taskID by SetExpandedOutputs,
+// or an empty (non-nil) slice when nothing has been stored.
+func (th *Tracker) GetExpandedOutputs(taskID string) []turbopath.AnchoredSystemPath {
+	th.mu.RLock()
+	defer th.mu.RUnlock()
+
+	if stored, found := th.packageTaskOutputs[taskID]; found {
+		return stored
+	}
+	return []turbopath.AnchoredSystemPath{}
+}
+
+// SetExpandedOutputs stores the list of outputs for taskID so it can be read
+// back later with GetExpandedOutputs.
+func (th *Tracker) SetExpandedOutputs(taskID string, outputs []turbopath.AnchoredSystemPath) {
+	th.mu.Lock()
+	th.packageTaskOutputs[taskID] = outputs
+	th.mu.Unlock()
+}
+
+// SetCacheStatus stores the cache summary for taskID, replacing any earlier one.
+func (th *Tracker) SetCacheStatus(taskID string, cacheSummary runsummary.TaskCacheSummary) {
+	th.mu.Lock()
+	th.packageTaskCacheStatus[taskID] = cacheSummary
+	th.mu.Unlock()
+}
+
+// GetCacheStatus returns the cache summary recorded for taskID by SetCacheStatus.
+// If nothing has been recorded it returns the zero TaskCacheSummary.
+func (th *Tracker) GetCacheStatus(taskID string) runsummary.TaskCacheSummary {
+	// Read-only access: take the shared lock, like the other getters do.
+	th.mu.RLock()
+	defer th.mu.RUnlock()
+
+	// A missing key yields the zero value, which is exactly the empty summary.
+	return th.packageTaskCacheStatus[taskID]
+}