diff options
Diffstat (limited to 'cli/internal/core')
| -rw-r--r-- | cli/internal/core/engine.go | 591 | ||||
| -rw-r--r-- | cli/internal/core/engine_test.go | 88 |
2 files changed, 0 insertions, 679 deletions
diff --git a/cli/internal/core/engine.go b/cli/internal/core/engine.go deleted file mode 100644 index 7f08ea8..0000000 --- a/cli/internal/core/engine.go +++ /dev/null @@ -1,591 +0,0 @@ -package core - -import ( - "errors" - "fmt" - "os" - "sort" - "strings" - "sync/atomic" - - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/graph" - "github.com/vercel/turbo/cli/internal/util" - - "github.com/pyr-sh/dag" -) - -const ROOT_NODE_NAME = "___ROOT___" - -// Task is a higher level struct that contains the underlying TaskDefinition -// but also some adjustments to it, based on business logic. -type Task struct { - Name string - // TaskDefinition contains the config for the task from turbo.json - TaskDefinition fs.TaskDefinition -} - -type Visitor = func(taskID string) error - -// Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them -type Engine struct { - // TaskGraph is a graph of package-tasks - TaskGraph *dag.AcyclicGraph - PackageTaskDeps map[string][]string - rootEnabledTasks util.Set - - // completeGraph is the CompleteGraph. We need this to look up the Pipeline, etc. - completeGraph *graph.CompleteGraph - // isSinglePackage is used to load turbo.json correctly - isSinglePackage bool -} - -// NewEngine creates a new engine given a topologic graph of workspace package names -func NewEngine( - completeGraph *graph.CompleteGraph, - isSinglePackage bool, -) *Engine { - return &Engine{ - completeGraph: completeGraph, - TaskGraph: &dag.AcyclicGraph{}, - PackageTaskDeps: map[string][]string{}, - rootEnabledTasks: make(util.Set), - isSinglePackage: isSinglePackage, - } -} - -// EngineBuildingOptions help construct the TaskGraph -type EngineBuildingOptions struct { - // Packages in the execution scope, if nil, all packages will be considered in scope - Packages []string - // TaskNames in the execution scope, if nil, all tasks will be executed - TaskNames []string - // Restrict execution to only the listed task names - TasksOnly bool -} - -// EngineExecutionOptions controls a single walk of the task graph -type EngineExecutionOptions struct { - // Parallel is whether to run tasks in parallel - Parallel bool - // Concurrency is the number of concurrent tasks that can be executed - Concurrency int -} - -// Execute executes the pipeline, constructing an internal task graph and walking it accordingly. -func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error { - var sema = util.NewSemaphore(opts.Concurrency) - var errored int32 - return e.TaskGraph.Walk(func(v dag.Vertex) error { - // If something has already errored, short-circuit. - // There is a race here between concurrent tasks. However, if there is not a - // dependency edge between them, we are not required to have a strict order - // between them, so a failed task can fail to short-circuit a concurrent - // task that happened to be starting at the same time. - if atomic.LoadInt32(&errored) != 0 { - return nil - } - // Each vertex in the graph is a taskID (package#task format) - taskID := dag.VertexName(v) - - // Always return if it is the root node - if strings.Contains(taskID, ROOT_NODE_NAME) { - return nil - } - - // Acquire the semaphore unless parallel - if !opts.Parallel { - sema.Acquire() - defer sema.Release() - } - - if err := visitor(taskID); err != nil { - // We only ever flip from false to true, so we don't need to compare and swap the atomic - atomic.StoreInt32(&errored, 1) - return err - } - return nil - }) -} - -// MissingTaskError is a specialized Error thrown in the case that we can't find a task. -// We want to allow this error when getting task definitions, so we have to special case it. -type MissingTaskError struct { - workspaceName string - taskID string - taskName string -} - -func (m *MissingTaskError) Error() string { - return fmt.Sprintf("Could not find \"%s\" or \"%s\" in workspace \"%s\"", m.taskName, m.taskID, m.workspaceName) -} - -func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) (*Task, error) { - pipeline, err := e.completeGraph.GetPipelineFromWorkspace(pkg, e.isSinglePackage) - - if err != nil { - if pkg != util.RootPkgName { - // If there was no turbo.json in the workspace, fallback to the root turbo.json - if errors.Is(err, os.ErrNotExist) { - return e.getTaskDefinition(util.RootPkgName, taskName, taskID) - } - - // otherwise bubble it up - return nil, err - } - - return nil, err - } - - if task, ok := pipeline[taskID]; ok { - return &Task{ - Name: taskName, - TaskDefinition: task.GetTaskDefinition(), - }, nil - } - - if task, ok := pipeline[taskName]; ok { - return &Task{ - Name: taskName, - TaskDefinition: task.GetTaskDefinition(), - }, nil - } - - // An error here means turbo.json exists, but didn't define the task. - // Fallback to the root pipeline to find the task. - if pkg != util.RootPkgName { - return e.getTaskDefinition(util.RootPkgName, taskName, taskID) - } - - // Return this as a custom type so we can ignore it specifically - return nil, &MissingTaskError{ - taskName: taskName, - taskID: taskID, - workspaceName: pkg, - } -} - -// Prepare constructs the Task Graph for a list of packages and tasks -func (e *Engine) Prepare(options *EngineBuildingOptions) error { - pkgs := options.Packages - taskNames := options.TaskNames - tasksOnly := options.TasksOnly - - // If there are no affected packages, we don't need to go through all this work - // we can just exit early. - // TODO(mehulkar): but we still need to validate bad task names? - if len(pkgs) == 0 { - return nil - } - - traversalQueue := []string{} - - // get a set of taskNames passed in. we'll remove the ones that have a definition - missing := util.SetFromStrings(taskNames) - - // Get a list of entry points into our TaskGraph. - // We do this by taking the input taskNames, and pkgs - // and creating a queue of taskIDs that we can traverse and gather dependencies from. - for _, pkg := range pkgs { - for _, taskName := range taskNames { - taskID := util.GetTaskId(pkg, taskName) - - // Look up the task in the package - foundTask, err := e.getTaskDefinition(pkg, taskName, taskID) - - // We can skip MissingTaskErrors because we'll validate against them later - // Return all other errors - if err != nil { - var e *MissingTaskError - if errors.As(err, &e) { - // Initially, non-package tasks are not required to exist, as long as some - // package in the list packages defines it as a package-task. Dependencies - // *are* required to have a definition. - continue - } - - return err - } - - // If we found a task definition, remove it from the missing list - if foundTask != nil { - // delete taskName if it was found - missing.Delete(taskName) - - // Even if a task definition was found, we _only_ want to add it as an entry point to - // the task graph (i.e. the traversalQueue), if it's: - // - A task from the non-root workspace (i.e. tasks from every other workspace) - // - A task that we *know* is rootEnabled task (in which case, the root workspace is acceptable) - isRootPkg := pkg == util.RootPkgName - if !isRootPkg || e.rootEnabledTasks.Includes(taskName) { - traversalQueue = append(traversalQueue, taskID) - } - } - } - } - - visited := make(util.Set) - - // validate that all tasks passed were found - missingList := missing.UnsafeListOfStrings() - sort.Strings(missingList) - - if len(missingList) > 0 { - return fmt.Errorf("Could not find the following tasks in project: %s", strings.Join(missingList, ", ")) - } - - // Things get appended to traversalQueue inside this loop, so we use the len() check instead of range. - for len(traversalQueue) > 0 { - // pop off the first item from the traversalQueue - taskID := traversalQueue[0] - traversalQueue = traversalQueue[1:] - - pkg, taskName := util.GetPackageTaskFromId(taskID) - - if pkg == util.RootPkgName && !e.rootEnabledTasks.Includes(taskName) { - return fmt.Errorf("%v needs an entry in turbo.json before it can be depended on because it is a task run from the root package", taskID) - } - - if pkg != ROOT_NODE_NAME { - if _, ok := e.completeGraph.WorkspaceInfos.PackageJSONs[pkg]; !ok { - // If we have a pkg it should be in WorkspaceInfos. - // If we're hitting this error something has gone wrong earlier when building WorkspaceInfos - // or the workspace really doesn't exist and turbo.json is misconfigured. - return fmt.Errorf("Could not find workspace \"%s\" from task \"%s\" in project", pkg, taskID) - } - } - - taskDefinitions, err := e.getTaskDefinitionChain(taskID, taskName) - if err != nil { - return err - } - - taskDefinition, err := fs.MergeTaskDefinitions(taskDefinitions) - if err != nil { - return err - } - - // Skip this iteration of the loop if we've already seen this taskID - if visited.Includes(taskID) { - continue - } - - visited.Add(taskID) - - // Put this taskDefinition into the Graph so we can look it up later during execution. - e.completeGraph.TaskDefinitions[taskID] = taskDefinition - - topoDeps := util.SetFromStrings(taskDefinition.TopologicalDependencies) - deps := make(util.Set) - isPackageTask := util.IsPackageTask(taskName) - - for _, dependency := range taskDefinition.TaskDependencies { - // If the current task is a workspace-specific task (including root Task) - // and its dependency is _also_ a workspace-specific task, we need to add - // a reference to this dependency directly into the engine. - // TODO @mehulkar: Why do we need this? - if isPackageTask && util.IsPackageTask(dependency) { - if err := e.AddDep(dependency, taskName); err != nil { - return err - } - } else { - // For non-workspace-specific dependencies, we attach a reference to - // the task that is added into the engine. - deps.Add(dependency) - } - } - - // Filter down the tasks if there's a filter in place - // https: //turbo.build/repo/docs/reference/command-line-reference#--only - if tasksOnly { - deps = deps.Filter(func(d interface{}) bool { - for _, target := range taskNames { - return fmt.Sprintf("%v", d) == target - } - return false - }) - topoDeps = topoDeps.Filter(func(d interface{}) bool { - for _, target := range taskNames { - return fmt.Sprintf("%v", d) == target - } - return false - }) - } - - toTaskID := taskID - - // hasTopoDeps will be true if the task depends on any tasks from dependency packages - // E.g. `dev: { dependsOn: [^dev] }` - hasTopoDeps := topoDeps.Len() > 0 && e.completeGraph.WorkspaceGraph.DownEdges(pkg).Len() > 0 - - // hasDeps will be true if the task depends on any tasks from its own package - // E.g. `build: { dependsOn: [dev] }` - hasDeps := deps.Len() > 0 - - // hasPackageTaskDeps will be true if this is a workspace-specific task, and - // it depends on another workspace-specific tasks - // E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`. - hasPackageTaskDeps := false - if _, ok := e.PackageTaskDeps[toTaskID]; ok { - hasPackageTaskDeps = true - } - - if hasTopoDeps { - depPkgs := e.completeGraph.WorkspaceGraph.DownEdges(pkg) - for _, from := range topoDeps.UnsafeListOfStrings() { - // add task dep from all the package deps within repo - for depPkg := range depPkgs { - fromTaskID := util.GetTaskId(depPkg, from) - e.TaskGraph.Add(fromTaskID) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) - traversalQueue = append(traversalQueue, fromTaskID) - } - } - } - - if hasDeps { - for _, from := range deps.UnsafeListOfStrings() { - fromTaskID := util.GetTaskId(pkg, from) - e.TaskGraph.Add(fromTaskID) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) - traversalQueue = append(traversalQueue, fromTaskID) - } - } - - if hasPackageTaskDeps { - if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok { - for _, fromTaskID := range pkgTaskDeps { - e.TaskGraph.Add(fromTaskID) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID)) - traversalQueue = append(traversalQueue, fromTaskID) - } - } - } - - // Add the root node into the graph - if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps { - e.TaskGraph.Add(ROOT_NODE_NAME) - e.TaskGraph.Add(toTaskID) - e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME)) - } - } - - return nil -} - -// AddTask adds root tasks to the engine so they can be looked up later. -func (e *Engine) AddTask(taskName string) { - if util.IsPackageTask(taskName) { - pkg, taskName := util.GetPackageTaskFromId(taskName) - if pkg == util.RootPkgName { - e.rootEnabledTasks.Add(taskName) - } - } -} - -// AddDep adds tuples from+to task ID combos in tuple format so they can be looked up later. -func (e *Engine) AddDep(fromTaskID string, toTaskID string) error { - fromPkg, _ := util.GetPackageTaskFromId(fromTaskID) - if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.completeGraph.WorkspaceGraph.HasVertex(fromPkg) { - return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID) - } - - if _, ok := e.PackageTaskDeps[toTaskID]; !ok { - e.PackageTaskDeps[toTaskID] = []string{} - } - - e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID) - - return nil -} - -// ValidatePersistentDependencies checks if any task dependsOn persistent tasks and throws -// an error if that task is actually implemented -func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph, concurrency int) error { - var validationError error - persistentCount := 0 - - // Adding in a lock because otherwise walking the graph can introduce a data race - // (reproducible with `go test -race`) - var sema = util.NewSemaphore(1) - - errs := e.TaskGraph.Walk(func(v dag.Vertex) error { - vertexName := dag.VertexName(v) // vertexName is a taskID - - // No need to check the root node if that's where we are. - if strings.Contains(vertexName, ROOT_NODE_NAME) { - return nil - } - - // Aquire a lock, because otherwise walking this group can cause a race condition - // writing to the same validationError var defined outside the Walk(). This shows - // up when running tests with the `-race` flag. - sema.Acquire() - defer sema.Release() - - currentTaskDefinition, currentTaskExists := e.completeGraph.TaskDefinitions[vertexName] - if currentTaskExists && currentTaskDefinition.Persistent { - persistentCount++ - } - - currentPackageName, currentTaskName := util.GetPackageTaskFromId(vertexName) - - // For each "downEdge" (i.e. each task that _this_ task dependsOn) - // check if the downEdge is a Persistent task, and if it actually has the script implemented - // in that package's package.json - for dep := range e.TaskGraph.DownEdges(vertexName) { - depTaskID := dep.(string) - // No need to check the root node - if strings.Contains(depTaskID, ROOT_NODE_NAME) { - return nil - } - - // Parse the taskID of this dependency task - packageName, taskName := util.GetPackageTaskFromId(depTaskID) - - // Get the Task Definition so we can check if it is Persistent - depTaskDefinition, taskExists := e.completeGraph.TaskDefinitions[depTaskID] - - if !taskExists { - return fmt.Errorf("Cannot find task definition for %v in package %v", depTaskID, packageName) - } - - // Get information about the package - pkg, pkgExists := graph.WorkspaceInfos.PackageJSONs[packageName] - if !pkgExists { - return fmt.Errorf("Cannot find package %v", packageName) - } - _, hasScript := pkg.Scripts[taskName] - - // If both conditions are true set a value and break out of checking the dependencies - if depTaskDefinition.Persistent && hasScript { - validationError = fmt.Errorf( - "\"%s\" is a persistent task, \"%s\" cannot depend on it", - util.GetTaskId(packageName, taskName), - util.GetTaskId(currentPackageName, currentTaskName), - ) - - break - } - } - - return nil - }) - - for _, err := range errs { - return fmt.Errorf("Validation failed: %v", err) - } - - if validationError != nil { - return validationError - } else if persistentCount >= concurrency { - return fmt.Errorf("You have %v persistent tasks but `turbo` is configured for concurrency of %v. Set --concurrency to at least %v", persistentCount, concurrency, persistentCount+1) - } - - return nil -} - -// getTaskDefinitionChain gets a set of TaskDefinitions that apply to the taskID. -// These definitions should be merged by the consumer. -func (e *Engine) getTaskDefinitionChain(taskID string, taskName string) ([]fs.BookkeepingTaskDefinition, error) { - // Start a list of TaskDefinitions we've found for this TaskID - taskDefinitions := []fs.BookkeepingTaskDefinition{} - - rootPipeline, err := e.completeGraph.GetPipelineFromWorkspace(util.RootPkgName, e.isSinglePackage) - if err != nil { - // It should be very unlikely that we can't find a root pipeline. Even for single package repos - // the pipeline is synthesized from package.json, so there should be _something_ here. - return nil, err - } - - // Look for the taskDefinition in the root pipeline. - if rootTaskDefinition, err := rootPipeline.GetTask(taskID, taskName); err == nil { - taskDefinitions = append(taskDefinitions, *rootTaskDefinition) - } - - // If we're in a single package repo, we can just exit with the TaskDefinition in the root pipeline - // since there are no workspaces, and we don't need to follow any extends keys. - if e.isSinglePackage { - if len(taskDefinitions) == 0 { - return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json", taskID) - } - return taskDefinitions, nil - } - - // If the taskID is a root task (e.g. //#build), we don't need to look - // for a workspace task, since these can only be defined in the root turbo.json. - taskIDPackage, _ := util.GetPackageTaskFromId(taskID) - if taskIDPackage != util.RootPkgName && taskIDPackage != ROOT_NODE_NAME { - // If there is an error, we can ignore it, since turbo.json config is not required in the workspace. - if workspaceTurboJSON, err := e.completeGraph.GetTurboConfigFromWorkspace(taskIDPackage, e.isSinglePackage); err != nil { - // swallow the error where the config file doesn't exist, but bubble up other things - if !errors.Is(err, os.ErrNotExist) { - return nil, err - } - } else { - // Run some validations on a workspace turbo.json. Note that these validations are on - // the whole struct, and not relevant to the taskID we're looking at right now. - validationErrors := workspaceTurboJSON.Validate([]fs.TurboJSONValidation{ - validateNoPackageTaskSyntax, - validateExtends, - }) - - if len(validationErrors) > 0 { - fullError := errors.New("Invalid turbo.json") - for _, validationErr := range validationErrors { - fullError = fmt.Errorf("%w\n - %s", fullError, validationErr) - } - - return nil, fullError - } - - // If there are no errors, we can (try to) add the TaskDefinition to our list. - if workspaceDefinition, ok := workspaceTurboJSON.Pipeline[taskName]; ok { - taskDefinitions = append(taskDefinitions, workspaceDefinition) - } - } - } - - if len(taskDefinitions) == 0 { - return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json or \"%s\" workspace", taskID, taskIDPackage) - } - - return taskDefinitions, nil -} - -func validateNoPackageTaskSyntax(turboJSON *fs.TurboJSON) []error { - errors := []error{} - - for taskIDOrName := range turboJSON.Pipeline { - if util.IsPackageTask(taskIDOrName) { - taskName := util.StripPackageName(taskIDOrName) - errors = append(errors, fmt.Errorf("\"%s\". Use \"%s\" instead", taskIDOrName, taskName)) - } - } - - return errors -} - -func validateExtends(turboJSON *fs.TurboJSON) []error { - extendErrors := []error{} - extends := turboJSON.Extends - // TODO(mehulkar): Enable extending from more than one workspace. - if len(extends) > 1 { - extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace")) - } - - // We don't support this right now - if len(extends) == 0 { - extendErrors = append(extendErrors, fmt.Errorf("No \"extends\" key found")) - } - - // TODO(mehulkar): Enable extending from non-root workspace. - if len(extends) == 1 && extends[0] != util.RootPkgName { - extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace")) - } - - return extendErrors -} diff --git a/cli/internal/core/engine_test.go b/cli/internal/core/engine_test.go deleted file mode 100644 index a92264d..0000000 --- a/cli/internal/core/engine_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package core - -import ( - "errors" - "testing" - - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/graph" - "github.com/vercel/turbo/cli/internal/workspace" - "gotest.tools/v3/assert" - - "github.com/pyr-sh/dag" -) - -func TestShortCircuiting(t *testing.T) { - var workspaceGraph dag.AcyclicGraph - workspaceGraph.Add("a") - workspaceGraph.Add("b") - workspaceGraph.Add("c") - // Dependencies: a -> b -> c - workspaceGraph.Connect(dag.BasicEdge("a", "b")) - workspaceGraph.Connect(dag.BasicEdge("b", "c")) - - buildTask := &fs.BookkeepingTaskDefinition{} - err := buildTask.UnmarshalJSON([]byte("{\"dependsOn\": [\"^build\"]}")) - assert.NilError(t, err, "BookkeepingTaskDefinition unmarshall") - - pipeline := map[string]fs.BookkeepingTaskDefinition{ - "build": *buildTask, - } - - p := NewEngine(&graph.CompleteGraph{ - WorkspaceGraph: workspaceGraph, - Pipeline: pipeline, - TaskDefinitions: map[string]*fs.TaskDefinition{}, - WorkspaceInfos: workspace.Catalog{ - PackageJSONs: map[string]*fs.PackageJSON{ - "//": {}, - "a": {}, - "b": {}, - "c": {}, - }, - TurboConfigs: map[string]*fs.TurboJSON{ - "//": { - Pipeline: pipeline, - }, - }, - }, - }, false) - - p.AddTask("build") - - err = p.Prepare(&EngineBuildingOptions{ - Packages: []string{"a", "b", "c"}, - TaskNames: []string{"build"}, - TasksOnly: false, - }) - - if err != nil { - t.Fatalf("%v", err) - } - - executed := map[string]bool{ - "a#build": false, - "b#build": false, - "c#build": false, - } - expectedErr := errors.New("an error occurred") - // b#build is going to error, we expect to not execute a#build, which depends on b - testVisitor := func(taskID string) error { - println(taskID) - executed[taskID] = true - if taskID == "b#build" { - return expectedErr - } - return nil - } - - errs := p.Execute(testVisitor, EngineExecutionOptions{ - Concurrency: 10, - }) - assert.Equal(t, len(errs), 1) - assert.Equal(t, errs[0], expectedErr) - - assert.Equal(t, executed["c#build"], true) - assert.Equal(t, executed["b#build"], true) - assert.Equal(t, executed["a#build"], false) -} |
