| author | 2023-04-28 01:36:44 +0800 |
|---|---|
| committer | 2023-04-28 01:36:44 +0800 |
| commit | dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch) |
| tree | b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/core |
| parent | 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff) |
| download | HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip |
Diffstat (limited to 'cli/internal/core')
| -rw-r--r-- | cli/internal/core/engine.go | 591 |
|---|---|---|
| -rw-r--r-- | cli/internal/core/engine_test.go | 88 |
2 files changed, 679 insertions, 0 deletions
```diff
diff --git a/cli/internal/core/engine.go b/cli/internal/core/engine.go
new file mode 100644
index 0000000..7f08ea8
--- /dev/null
+++ b/cli/internal/core/engine.go
@@ -0,0 +1,591 @@
+package core
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"sort"
+	"strings"
+	"sync/atomic"
+
+	"github.com/vercel/turbo/cli/internal/fs"
+	"github.com/vercel/turbo/cli/internal/graph"
+	"github.com/vercel/turbo/cli/internal/util"
+
+	"github.com/pyr-sh/dag"
+)
+
+const ROOT_NODE_NAME = "___ROOT___"
+
+// Task is a higher level struct that contains the underlying TaskDefinition
+// but also some adjustments to it, based on business logic.
+type Task struct {
+	Name string
+	// TaskDefinition contains the config for the task from turbo.json
+	TaskDefinition fs.TaskDefinition
+}
+
+type Visitor = func(taskID string) error
+
+// Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them
+type Engine struct {
+	// TaskGraph is a graph of package-tasks
+	TaskGraph        *dag.AcyclicGraph
+	PackageTaskDeps  map[string][]string
+	rootEnabledTasks util.Set
+
+	// completeGraph is the CompleteGraph. We need this to look up the Pipeline, etc.
+	completeGraph *graph.CompleteGraph
+	// isSinglePackage is used to load turbo.json correctly
+	isSinglePackage bool
+}
+
+// NewEngine creates a new engine given a topological graph of workspace package names
+func NewEngine(
+	completeGraph *graph.CompleteGraph,
+	isSinglePackage bool,
+) *Engine {
+	return &Engine{
+		completeGraph:    completeGraph,
+		TaskGraph:        &dag.AcyclicGraph{},
+		PackageTaskDeps:  map[string][]string{},
+		rootEnabledTasks: make(util.Set),
+		isSinglePackage:  isSinglePackage,
+	}
+}
+
+// EngineBuildingOptions help construct the TaskGraph
+type EngineBuildingOptions struct {
+	// Packages in the execution scope; if nil, all packages will be considered in scope
+	Packages []string
+	// TaskNames in the execution scope; if nil, all tasks will be executed
+	TaskNames []string
+	// Restrict execution to only the listed task names
+	TasksOnly bool
+}
+
+// EngineExecutionOptions controls a single walk of the task graph
+type EngineExecutionOptions struct {
+	// Parallel is whether to run tasks in parallel
+	Parallel bool
+	// Concurrency is the number of concurrent tasks that can be executed
+	Concurrency int
+}
+
+// Execute walks the task graph constructed by Prepare, running the visitor for each task.
+func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
+	var sema = util.NewSemaphore(opts.Concurrency)
+	var errored int32
+	return e.TaskGraph.Walk(func(v dag.Vertex) error {
+		// If something has already errored, short-circuit.
+		// There is a race here between concurrent tasks. However, if there is not a
+		// dependency edge between them, we are not required to have a strict order
+		// between them, so a failed task can fail to short-circuit a concurrent
+		// task that happened to be starting at the same time.
+		if atomic.LoadInt32(&errored) != 0 {
+			return nil
+		}
+		// Each vertex in the graph is a taskID (package#task format)
+		taskID := dag.VertexName(v)
+
+		// Always return if it is the root node
+		if strings.Contains(taskID, ROOT_NODE_NAME) {
+			return nil
+		}
+
+		// Acquire the semaphore unless parallel
+		if !opts.Parallel {
+			sema.Acquire()
+			defer sema.Release()
+		}
+
+		if err := visitor(taskID); err != nil {
+			// We only ever flip from false to true, so we don't need to compare and swap the atomic
+			atomic.StoreInt32(&errored, 1)
+			return err
+		}
+		return nil
+	})
+}
+
+// MissingTaskError is a specialized Error thrown in the case that we can't find a task.
+// We want to allow this error when getting task definitions, so we have to special case it.
+type MissingTaskError struct {
+	workspaceName string
+	taskID        string
+	taskName      string
+}
+
+func (m *MissingTaskError) Error() string {
+	return fmt.Sprintf("Could not find \"%s\" or \"%s\" in workspace \"%s\"", m.taskName, m.taskID, m.workspaceName)
+}
+
+func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) (*Task, error) {
+	pipeline, err := e.completeGraph.GetPipelineFromWorkspace(pkg, e.isSinglePackage)
+
+	if err != nil {
+		if pkg != util.RootPkgName {
+			// If there was no turbo.json in the workspace, fallback to the root turbo.json
+			if errors.Is(err, os.ErrNotExist) {
+				return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
+			}
+
+			// otherwise bubble it up
+			return nil, err
+		}
+
+		return nil, err
+	}
+
+	if task, ok := pipeline[taskID]; ok {
+		return &Task{
+			Name:           taskName,
+			TaskDefinition: task.GetTaskDefinition(),
+		}, nil
+	}
+
+	if task, ok := pipeline[taskName]; ok {
+		return &Task{
+			Name:           taskName,
+			TaskDefinition: task.GetTaskDefinition(),
+		}, nil
+	}
+
+	// An error here means turbo.json exists, but didn't define the task.
+	// Fallback to the root pipeline to find the task.
+	if pkg != util.RootPkgName {
+		return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
+	}
+
+	// Return this as a custom type so we can ignore it specifically
+	return nil, &MissingTaskError{
+		taskName:      taskName,
+		taskID:        taskID,
+		workspaceName: pkg,
+	}
+}
+
+// Prepare constructs the Task Graph for a list of packages and tasks
+func (e *Engine) Prepare(options *EngineBuildingOptions) error {
+	pkgs := options.Packages
+	taskNames := options.TaskNames
+	tasksOnly := options.TasksOnly
+
+	// If there are no affected packages, we don't need to go through all this work;
+	// we can just exit early.
+	// TODO(mehulkar): but we still need to validate bad task names?
+	if len(pkgs) == 0 {
+		return nil
+	}
+
+	traversalQueue := []string{}
+
+	// get a set of taskNames passed in. we'll remove the ones that have a definition
+	missing := util.SetFromStrings(taskNames)
+
+	// Get a list of entry points into our TaskGraph.
+	// We do this by taking the input taskNames and pkgs
+	// and creating a queue of taskIDs that we can traverse and gather dependencies from.
+	for _, pkg := range pkgs {
+		for _, taskName := range taskNames {
+			taskID := util.GetTaskId(pkg, taskName)
+
+			// Look up the task in the package
+			foundTask, err := e.getTaskDefinition(pkg, taskName, taskID)
+
+			// We can skip MissingTaskErrors because we'll validate against them later.
+			// Return all other errors.
+			if err != nil {
+				var e *MissingTaskError
+				if errors.As(err, &e) {
+					// Initially, non-package tasks are not required to exist, as long as some
+					// package in the list of packages defines it as a package-task. Dependencies
+					// *are* required to have a definition.
+					continue
+				}
+
+				return err
+			}
+
+			// If we found a task definition, remove it from the missing list
+			if foundTask != nil {
+				// delete taskName if it was found
+				missing.Delete(taskName)
+
+				// Even if a task definition was found, we _only_ want to add it as an entry point to
+				// the task graph (i.e. the traversalQueue), if it's:
+				// - A task from a non-root workspace (i.e. tasks from every other workspace)
+				// - A task that we *know* is a rootEnabled task (in which case, the root workspace is acceptable)
+				isRootPkg := pkg == util.RootPkgName
+				if !isRootPkg || e.rootEnabledTasks.Includes(taskName) {
+					traversalQueue = append(traversalQueue, taskID)
+				}
+			}
+		}
+	}
+
+	visited := make(util.Set)
+
+	// validate that all tasks passed were found
+	missingList := missing.UnsafeListOfStrings()
+	sort.Strings(missingList)
+
+	if len(missingList) > 0 {
+		return fmt.Errorf("Could not find the following tasks in project: %s", strings.Join(missingList, ", "))
+	}
+
+	// Things get appended to traversalQueue inside this loop, so we use the len() check instead of range.
+	for len(traversalQueue) > 0 {
+		// pop off the first item from the traversalQueue
+		taskID := traversalQueue[0]
+		traversalQueue = traversalQueue[1:]
+
+		pkg, taskName := util.GetPackageTaskFromId(taskID)
+
+		if pkg == util.RootPkgName && !e.rootEnabledTasks.Includes(taskName) {
+			return fmt.Errorf("%v needs an entry in turbo.json before it can be depended on because it is a task run from the root package", taskID)
+		}
+
+		if pkg != ROOT_NODE_NAME {
+			if _, ok := e.completeGraph.WorkspaceInfos.PackageJSONs[pkg]; !ok {
+				// If we have a pkg it should be in WorkspaceInfos.
+				// If we're hitting this error something has gone wrong earlier when building WorkspaceInfos
+				// or the workspace really doesn't exist and turbo.json is misconfigured.
+				return fmt.Errorf("Could not find workspace \"%s\" from task \"%s\" in project", pkg, taskID)
+			}
+		}
+
+		taskDefinitions, err := e.getTaskDefinitionChain(taskID, taskName)
+		if err != nil {
+			return err
+		}
+
+		taskDefinition, err := fs.MergeTaskDefinitions(taskDefinitions)
+		if err != nil {
+			return err
+		}
+
+		// Skip this iteration of the loop if we've already seen this taskID
+		if visited.Includes(taskID) {
+			continue
+		}
+
+		visited.Add(taskID)
+
+		// Put this taskDefinition into the Graph so we can look it up later during execution.
+		e.completeGraph.TaskDefinitions[taskID] = taskDefinition
+
+		topoDeps := util.SetFromStrings(taskDefinition.TopologicalDependencies)
+		deps := make(util.Set)
+		isPackageTask := util.IsPackageTask(taskName)
+
+		for _, dependency := range taskDefinition.TaskDependencies {
+			// If the current task is a workspace-specific task (including root Task)
+			// and its dependency is _also_ a workspace-specific task, we need to add
+			// a reference to this dependency directly into the engine.
+			// TODO @mehulkar: Why do we need this?
+			if isPackageTask && util.IsPackageTask(dependency) {
+				if err := e.AddDep(dependency, taskName); err != nil {
+					return err
+				}
+			} else {
+				// For non-workspace-specific dependencies, we attach a reference to
+				// the task that is added into the engine.
+				deps.Add(dependency)
+			}
+		}
+
+		// Filter down the tasks if there's a filter in place
+		// https://turbo.build/repo/docs/reference/command-line-reference#--only
+		if tasksOnly {
+			deps = deps.Filter(func(d interface{}) bool {
+				for _, target := range taskNames {
+					return fmt.Sprintf("%v", d) == target
+				}
+				return false
+			})
+			topoDeps = topoDeps.Filter(func(d interface{}) bool {
+				for _, target := range taskNames {
+					return fmt.Sprintf("%v", d) == target
+				}
+				return false
+			})
+		}
+
+		toTaskID := taskID
+
+		// hasTopoDeps will be true if the task depends on any tasks from dependency packages
+		// E.g. `dev: { dependsOn: [^dev] }`
+		hasTopoDeps := topoDeps.Len() > 0 && e.completeGraph.WorkspaceGraph.DownEdges(pkg).Len() > 0
+
+		// hasDeps will be true if the task depends on any tasks from its own package
+		// E.g. `build: { dependsOn: [dev] }`
+		hasDeps := deps.Len() > 0
+
+		// hasPackageTaskDeps will be true if this is a workspace-specific task, and
+		// it depends on another workspace-specific task
+		// E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`.
+		hasPackageTaskDeps := false
+		if _, ok := e.PackageTaskDeps[toTaskID]; ok {
+			hasPackageTaskDeps = true
+		}
+
+		if hasTopoDeps {
+			depPkgs := e.completeGraph.WorkspaceGraph.DownEdges(pkg)
+			for _, from := range topoDeps.UnsafeListOfStrings() {
+				// add task dep from all the package deps within repo
+				for depPkg := range depPkgs {
+					fromTaskID := util.GetTaskId(depPkg, from)
+					e.TaskGraph.Add(fromTaskID)
+					e.TaskGraph.Add(toTaskID)
+					e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+					traversalQueue = append(traversalQueue, fromTaskID)
+				}
+			}
+		}
+
+		if hasDeps {
+			for _, from := range deps.UnsafeListOfStrings() {
+				fromTaskID := util.GetTaskId(pkg, from)
+				e.TaskGraph.Add(fromTaskID)
+				e.TaskGraph.Add(toTaskID)
+				e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+				traversalQueue = append(traversalQueue, fromTaskID)
+			}
+		}
+
+		if hasPackageTaskDeps {
+			if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok {
+				for _, fromTaskID := range pkgTaskDeps {
+					e.TaskGraph.Add(fromTaskID)
+					e.TaskGraph.Add(toTaskID)
+					e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+					traversalQueue = append(traversalQueue, fromTaskID)
+				}
+			}
+		}
+
+		// Add the root node into the graph
+		if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps {
+			e.TaskGraph.Add(ROOT_NODE_NAME)
+			e.TaskGraph.Add(toTaskID)
+			e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME))
+		}
+	}
+
+	return nil
+}
+
+// AddTask adds root tasks to the engine so they can be looked up later.
+func (e *Engine) AddTask(taskName string) {
+	if util.IsPackageTask(taskName) {
+		pkg, taskName := util.GetPackageTaskFromId(taskName)
+		if pkg == util.RootPkgName {
+			e.rootEnabledTasks.Add(taskName)
+		}
+	}
+}
+
+// AddDep records a from/to task ID pair so it can be looked up later.
+func (e *Engine) AddDep(fromTaskID string, toTaskID string) error {
+	fromPkg, _ := util.GetPackageTaskFromId(fromTaskID)
+	if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.completeGraph.WorkspaceGraph.HasVertex(fromPkg) {
+		return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID)
+	}
+
+	if _, ok := e.PackageTaskDeps[toTaskID]; !ok {
+		e.PackageTaskDeps[toTaskID] = []string{}
+	}
+
+	e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID)
+
+	return nil
+}
+
+// ValidatePersistentDependencies checks if any task dependsOn persistent tasks and throws
+// an error if that task is actually implemented
+func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph, concurrency int) error {
+	var validationError error
+	persistentCount := 0
+
+	// Adding in a lock because otherwise walking the graph can introduce a data race
+	// (reproducible with `go test -race`)
+	var sema = util.NewSemaphore(1)
+
+	errs := e.TaskGraph.Walk(func(v dag.Vertex) error {
+		vertexName := dag.VertexName(v) // vertexName is a taskID
+
+		// No need to check the root node if that's where we are.
+		if strings.Contains(vertexName, ROOT_NODE_NAME) {
+			return nil
+		}
+
+		// Acquire a lock, because otherwise walking this group can cause a race condition
+		// writing to the same validationError var defined outside the Walk(). This shows
+		// up when running tests with the `-race` flag.
+		sema.Acquire()
+		defer sema.Release()
+
+		currentTaskDefinition, currentTaskExists := e.completeGraph.TaskDefinitions[vertexName]
+		if currentTaskExists && currentTaskDefinition.Persistent {
+			persistentCount++
+		}
+
+		currentPackageName, currentTaskName := util.GetPackageTaskFromId(vertexName)
+
+		// For each "downEdge" (i.e. each task that _this_ task dependsOn)
+		// check if the downEdge is a Persistent task, and if it actually has the script implemented
+		// in that package's package.json
+		for dep := range e.TaskGraph.DownEdges(vertexName) {
+			depTaskID := dep.(string)
+			// No need to check the root node
+			if strings.Contains(depTaskID, ROOT_NODE_NAME) {
+				return nil
+			}
+
+			// Parse the taskID of this dependency task
+			packageName, taskName := util.GetPackageTaskFromId(depTaskID)
+
+			// Get the Task Definition so we can check if it is Persistent
+			depTaskDefinition, taskExists := e.completeGraph.TaskDefinitions[depTaskID]
+
+			if !taskExists {
+				return fmt.Errorf("Cannot find task definition for %v in package %v", depTaskID, packageName)
+			}
+
+			// Get information about the package
+			pkg, pkgExists := graph.WorkspaceInfos.PackageJSONs[packageName]
+			if !pkgExists {
+				return fmt.Errorf("Cannot find package %v", packageName)
+			}
+			_, hasScript := pkg.Scripts[taskName]
+
+			// If both conditions are true set a value and break out of checking the dependencies
+			if depTaskDefinition.Persistent && hasScript {
+				validationError = fmt.Errorf(
+					"\"%s\" is a persistent task, \"%s\" cannot depend on it",
+					util.GetTaskId(packageName, taskName),
+					util.GetTaskId(currentPackageName, currentTaskName),
+				)
+
+				break
+			}
+		}
+
+		return nil
+	})
+
+	for _, err := range errs {
+		return fmt.Errorf("Validation failed: %v", err)
+	}
+
+	if validationError != nil {
+		return validationError
+	} else if persistentCount >= concurrency {
+		return fmt.Errorf("You have %v persistent tasks but `turbo` is configured for concurrency of %v. Set --concurrency to at least %v", persistentCount, concurrency, persistentCount+1)
+	}
+
+	return nil
+}
+
+// getTaskDefinitionChain gets a set of TaskDefinitions that apply to the taskID.
+// These definitions should be merged by the consumer.
+func (e *Engine) getTaskDefinitionChain(taskID string, taskName string) ([]fs.BookkeepingTaskDefinition, error) {
+	// Start a list of TaskDefinitions we've found for this TaskID
+	taskDefinitions := []fs.BookkeepingTaskDefinition{}
+
+	rootPipeline, err := e.completeGraph.GetPipelineFromWorkspace(util.RootPkgName, e.isSinglePackage)
+	if err != nil {
+		// It should be very unlikely that we can't find a root pipeline. Even for single package repos
+		// the pipeline is synthesized from package.json, so there should be _something_ here.
+		return nil, err
+	}
+
+	// Look for the taskDefinition in the root pipeline.
+	if rootTaskDefinition, err := rootPipeline.GetTask(taskID, taskName); err == nil {
+		taskDefinitions = append(taskDefinitions, *rootTaskDefinition)
+	}
+
+	// If we're in a single package repo, we can just exit with the TaskDefinition in the root pipeline
+	// since there are no workspaces, and we don't need to follow any extends keys.
+	if e.isSinglePackage {
+		if len(taskDefinitions) == 0 {
+			return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json", taskID)
+		}
+		return taskDefinitions, nil
+	}
+
+	// If the taskID is a root task (e.g. //#build), we don't need to look
+	// for a workspace task, since these can only be defined in the root turbo.json.
+	taskIDPackage, _ := util.GetPackageTaskFromId(taskID)
+	if taskIDPackage != util.RootPkgName && taskIDPackage != ROOT_NODE_NAME {
+		// If there is an error, we can ignore it, since turbo.json config is not required in the workspace.
+		if workspaceTurboJSON, err := e.completeGraph.GetTurboConfigFromWorkspace(taskIDPackage, e.isSinglePackage); err != nil {
+			// swallow the error where the config file doesn't exist, but bubble up other things
+			if !errors.Is(err, os.ErrNotExist) {
+				return nil, err
+			}
+		} else {
+			// Run some validations on a workspace turbo.json. Note that these validations are on
+			// the whole struct, and not relevant to the taskID we're looking at right now.
+			validationErrors := workspaceTurboJSON.Validate([]fs.TurboJSONValidation{
+				validateNoPackageTaskSyntax,
+				validateExtends,
+			})
+
+			if len(validationErrors) > 0 {
+				fullError := errors.New("Invalid turbo.json")
+				for _, validationErr := range validationErrors {
+					fullError = fmt.Errorf("%w\n - %s", fullError, validationErr)
+				}
+
+				return nil, fullError
+			}
+
+			// If there are no errors, we can (try to) add the TaskDefinition to our list.
+			if workspaceDefinition, ok := workspaceTurboJSON.Pipeline[taskName]; ok {
+				taskDefinitions = append(taskDefinitions, workspaceDefinition)
+			}
+		}
+	}
+
+	if len(taskDefinitions) == 0 {
+		return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json or \"%s\" workspace", taskID, taskIDPackage)
+	}
+
+	return taskDefinitions, nil
+}
+
+func validateNoPackageTaskSyntax(turboJSON *fs.TurboJSON) []error {
+	errors := []error{}
+
+	for taskIDOrName := range turboJSON.Pipeline {
+		if util.IsPackageTask(taskIDOrName) {
+			taskName := util.StripPackageName(taskIDOrName)
+			errors = append(errors, fmt.Errorf("\"%s\". Use \"%s\" instead", taskIDOrName, taskName))
+		}
+	}
+
+	return errors
+}
+
+func validateExtends(turboJSON *fs.TurboJSON) []error {
+	extendErrors := []error{}
+	extends := turboJSON.Extends
+	// TODO(mehulkar): Enable extending from more than one workspace.
+	if len(extends) > 1 {
+		extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
+	}
+
+	// We don't support this right now
+	if len(extends) == 0 {
+		extendErrors = append(extendErrors, fmt.Errorf("No \"extends\" key found"))
+	}
+
+	// TODO(mehulkar): Enable extending from non-root workspace.
+	if len(extends) == 1 && extends[0] != util.RootPkgName {
+		extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
+	}
+
+	return extendErrors
+}
diff --git a/cli/internal/core/engine_test.go b/cli/internal/core/engine_test.go
new file mode 100644
index 0000000..a92264d
--- /dev/null
+++ b/cli/internal/core/engine_test.go
@@ -0,0 +1,88 @@
+package core
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/vercel/turbo/cli/internal/fs"
+	"github.com/vercel/turbo/cli/internal/graph"
+	"github.com/vercel/turbo/cli/internal/workspace"
+	"gotest.tools/v3/assert"
+
+	"github.com/pyr-sh/dag"
+)
+
+func TestShortCircuiting(t *testing.T) {
+	var workspaceGraph dag.AcyclicGraph
+	workspaceGraph.Add("a")
+	workspaceGraph.Add("b")
+	workspaceGraph.Add("c")
+	// Dependencies: a -> b -> c
+	workspaceGraph.Connect(dag.BasicEdge("a", "b"))
+	workspaceGraph.Connect(dag.BasicEdge("b", "c"))
+
+	buildTask := &fs.BookkeepingTaskDefinition{}
+	err := buildTask.UnmarshalJSON([]byte("{\"dependsOn\": [\"^build\"]}"))
+	assert.NilError(t, err, "BookkeepingTaskDefinition unmarshal")
+
+	pipeline := map[string]fs.BookkeepingTaskDefinition{
+		"build": *buildTask,
+	}
+
+	p := NewEngine(&graph.CompleteGraph{
+		WorkspaceGraph:  workspaceGraph,
+		Pipeline:        pipeline,
+		TaskDefinitions: map[string]*fs.TaskDefinition{},
+		WorkspaceInfos: workspace.Catalog{
+			PackageJSONs: map[string]*fs.PackageJSON{
+				"//": {},
+				"a":  {},
+				"b":  {},
+				"c":  {},
+			},
+			TurboConfigs: map[string]*fs.TurboJSON{
+				"//": {
+					Pipeline: pipeline,
+				},
+			},
+		},
+	}, false)
+
+	p.AddTask("build")
+
+	err = p.Prepare(&EngineBuildingOptions{
+		Packages:  []string{"a", "b", "c"},
+		TaskNames: []string{"build"},
+		TasksOnly: false,
+	})
+
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+
+	executed := map[string]bool{
+		"a#build": false,
+		"b#build": false,
+		"c#build": false,
+	}
+	expectedErr := errors.New("an error occurred")
+	// b#build is going to error; we expect a#build, which depends on b, to not execute
+	testVisitor := func(taskID string) error {
+		println(taskID)
+		executed[taskID] = true
+		if taskID == "b#build" {
+			return expectedErr
+		}
+		return nil
+	}
+
+	errs := p.Execute(testVisitor, EngineExecutionOptions{
+		Concurrency: 10,
+	})
+	assert.Equal(t, len(errs), 1)
+	assert.Equal(t, errs[0], expectedErr)
+
+	assert.Equal(t, executed["c#build"], true)
+	assert.Equal(t, executed["b#build"], true)
+	assert.Equal(t, executed["a#build"], false)
+}
```
