author     简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
committer  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
commit     dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree       b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/core/engine.go
parent     0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
Diffstat (limited to 'cli/internal/core/engine.go')
-rw-r--r--  cli/internal/core/engine.go  591
1 file changed, 591 insertions, 0 deletions
diff --git a/cli/internal/core/engine.go b/cli/internal/core/engine.go
new file mode 100644
index 0000000..7f08ea8
--- /dev/null
+++ b/cli/internal/core/engine.go
@@ -0,0 +1,591 @@
+package core
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "sort"
+ "strings"
+ "sync/atomic"
+
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/graph"
+ "github.com/vercel/turbo/cli/internal/util"
+
+ "github.com/pyr-sh/dag"
+)
+
+const ROOT_NODE_NAME = "___ROOT___"
+
+// Task is a higher level struct that contains the underlying TaskDefinition
+// but also some adjustments to it, based on business logic.
+type Task struct {
+ Name string
+ // TaskDefinition contains the config for the task from turbo.json
+ TaskDefinition fs.TaskDefinition
+}
+
+type Visitor = func(taskID string) error
+
+// Engine contains both the DAG for the packages and the tasks and implements the methods to execute tasks in them
+type Engine struct {
+ // TaskGraph is a graph of package-tasks
+ TaskGraph *dag.AcyclicGraph
+ PackageTaskDeps map[string][]string
+ rootEnabledTasks util.Set
+
+ // completeGraph is the CompleteGraph. We need this to look up the Pipeline, etc.
+ completeGraph *graph.CompleteGraph
+ // isSinglePackage is used to load turbo.json correctly
+ isSinglePackage bool
+}
+
+// NewEngine creates a new engine given a topological graph of workspace package names
+func NewEngine(
+ completeGraph *graph.CompleteGraph,
+ isSinglePackage bool,
+) *Engine {
+ return &Engine{
+ completeGraph: completeGraph,
+ TaskGraph: &dag.AcyclicGraph{},
+ PackageTaskDeps: map[string][]string{},
+ rootEnabledTasks: make(util.Set),
+ isSinglePackage: isSinglePackage,
+ }
+}
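+
+// Usage sketch (illustrative; the completeGraph value and the error handling
+// are assumed for the example): a caller builds an Engine from a
+// CompleteGraph, prepares the task graph for a set of packages and task
+// names, and then executes it.
+//
+//  engine := NewEngine(completeGraph, false)
+//  err := engine.Prepare(&EngineBuildingOptions{
+//      Packages:  []string{"web", "docs"},
+//      TaskNames: []string{"build"},
+//  })
+//  if err != nil {
+//      // handle a misconfigured pipeline or an unknown task
+//  }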
+
+// EngineBuildingOptions help construct the TaskGraph
+type EngineBuildingOptions struct {
+ // Packages in the execution scope; if nil, all packages will be considered in scope
+ Packages []string
+ // TaskNames in the execution scope; if nil, all tasks will be executed
+ TaskNames []string
+ // Restrict execution to only the listed task names
+ TasksOnly bool
+}
+
+// EngineExecutionOptions controls a single walk of the task graph
+type EngineExecutionOptions struct {
+ // Parallel is whether to run tasks in parallel
+ Parallel bool
+ // Concurrency is the number of concurrent tasks that can be executed
+ Concurrency int
+}
+
+// Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
+func (e *Engine) Execute(visitor Visitor, opts EngineExecutionOptions) []error {
+ var sema = util.NewSemaphore(opts.Concurrency)
+ var errored int32
+ return e.TaskGraph.Walk(func(v dag.Vertex) error {
+ // If something has already errored, short-circuit.
+ // There is a race here between concurrent tasks. However, if there is no
+ // dependency edge between them, we are not required to enforce a strict order
+ // between them, so a failure may not short-circuit a concurrent task that
+ // happened to start at the same time.
+ if atomic.LoadInt32(&errored) != 0 {
+ return nil
+ }
+ // Each vertex in the graph is a taskID (package#task format)
+ taskID := dag.VertexName(v)
+
+ // Always return if it is the root node
+ if strings.Contains(taskID, ROOT_NODE_NAME) {
+ return nil
+ }
+
+ // Acquire the semaphore unless parallel
+ if !opts.Parallel {
+ sema.Acquire()
+ defer sema.Release()
+ }
+
+ if err := visitor(taskID); err != nil {
+ // We only ever flip from false to true, so we don't need to compare and swap the atomic
+ atomic.StoreInt32(&errored, 1)
+ return err
+ }
+ return nil
+ })
+}
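+
+// Execute usage sketch (illustrative; runTask is a hypothetical callback):
+// the visitor receives each taskID in "package#task" form once its
+// dependencies have been walked, and any returned errors are collected.
+//
+//  errs := engine.Execute(func(taskID string) error {
+//      return runTask(taskID)
+//  }, EngineExecutionOptions{Concurrency: 10})
+//  if len(errs) > 0 {
+//      // at least one task failed
+//  }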
+
+// MissingTaskError is a specialized error returned when we can't find a task.
+// We want to tolerate this error when getting task definitions, so we have to special-case it.
+type MissingTaskError struct {
+ workspaceName string
+ taskID string
+ taskName string
+}
+
+func (m *MissingTaskError) Error() string {
+ return fmt.Sprintf("Could not find \"%s\" or \"%s\" in workspace \"%s\"", m.taskName, m.taskID, m.workspaceName)
+}
+
+func (e *Engine) getTaskDefinition(pkg string, taskName string, taskID string) (*Task, error) {
+ pipeline, err := e.completeGraph.GetPipelineFromWorkspace(pkg, e.isSinglePackage)
+
+ if err != nil {
+ if pkg != util.RootPkgName {
+ // If there was no turbo.json in the workspace, fall back to the root turbo.json
+ if errors.Is(err, os.ErrNotExist) {
+ return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
+ }
+
+ // otherwise bubble it up
+ return nil, err
+ }
+
+ return nil, err
+ }
+
+ if task, ok := pipeline[taskID]; ok {
+ return &Task{
+ Name: taskName,
+ TaskDefinition: task.GetTaskDefinition(),
+ }, nil
+ }
+
+ if task, ok := pipeline[taskName]; ok {
+ return &Task{
+ Name: taskName,
+ TaskDefinition: task.GetTaskDefinition(),
+ }, nil
+ }
+
+ // An error here means turbo.json exists, but didn't define the task.
+ // Fall back to the root pipeline to find the task.
+ if pkg != util.RootPkgName {
+ return e.getTaskDefinition(util.RootPkgName, taskName, taskID)
+ }
+
+ // Return this as a custom type so we can ignore it specifically
+ return nil, &MissingTaskError{
+ taskName: taskName,
+ taskID: taskID,
+ workspaceName: pkg,
+ }
+}
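+
+// Lookup order sketch (illustrative task names): for taskID "web#build" and
+// taskName "build" in workspace "web", the workspace pipeline is checked for
+// "web#build" first, then for "build"; if neither is defined, the same lookup
+// is retried against the root pipeline before a MissingTaskError is returned.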
+
+// Prepare constructs the Task Graph for a list of packages and tasks
+func (e *Engine) Prepare(options *EngineBuildingOptions) error {
+ pkgs := options.Packages
+ taskNames := options.TaskNames
+ tasksOnly := options.TasksOnly
+
+ // If there are no affected packages, we don't need to go through all this work;
+ // we can just exit early.
+ // TODO(mehulkar): but we still need to validate bad task names?
+ if len(pkgs) == 0 {
+ return nil
+ }
+
+ traversalQueue := []string{}
+
+ // Get a set of the taskNames passed in. We'll remove the ones that have a definition.
+ missing := util.SetFromStrings(taskNames)
+
+ // Get a list of entry points into our TaskGraph.
+ // We do this by taking the input taskNames, and pkgs
+ // and creating a queue of taskIDs that we can traverse and gather dependencies from.
+ for _, pkg := range pkgs {
+ for _, taskName := range taskNames {
+ taskID := util.GetTaskId(pkg, taskName)
+
+ // Look up the task in the package
+ foundTask, err := e.getTaskDefinition(pkg, taskName, taskID)
+
+ // We can skip MissingTaskErrors because we'll validate against them later
+ // Return all other errors
+ if err != nil {
+ var e *MissingTaskError
+ if errors.As(err, &e) {
+ // Initially, non-package tasks are not required to exist, as long as some
+ // package in the list of packages defines it as a package-task. Dependencies
+ // *are* required to have a definition.
+ continue
+ }
+
+ return err
+ }
+
+ // If we found a task definition, remove it from the missing list
+ if foundTask != nil {
+ // delete taskName if it was found
+ missing.Delete(taskName)
+
+ // Even if a task definition was found, we _only_ want to add it as an entry point to
+ // the task graph (i.e. the traversalQueue), if it's:
+ // - A task from the non-root workspace (i.e. tasks from every other workspace)
+ // - A task that we *know* is a rootEnabled task (in which case, the root workspace is acceptable)
+ isRootPkg := pkg == util.RootPkgName
+ if !isRootPkg || e.rootEnabledTasks.Includes(taskName) {
+ traversalQueue = append(traversalQueue, taskID)
+ }
+ }
+ }
+ }
+
+ visited := make(util.Set)
+
+ // validate that all tasks passed were found
+ missingList := missing.UnsafeListOfStrings()
+ sort.Strings(missingList)
+
+ if len(missingList) > 0 {
+ return fmt.Errorf("Could not find the following tasks in project: %s", strings.Join(missingList, ", "))
+ }
+
+ // Things get appended to traversalQueue inside this loop, so we use the len() check instead of range.
+ for len(traversalQueue) > 0 {
+ // pop off the first item from the traversalQueue
+ taskID := traversalQueue[0]
+ traversalQueue = traversalQueue[1:]
+
+ pkg, taskName := util.GetPackageTaskFromId(taskID)
+
+ if pkg == util.RootPkgName && !e.rootEnabledTasks.Includes(taskName) {
+ return fmt.Errorf("%v needs an entry in turbo.json before it can be depended on because it is a task run from the root package", taskID)
+ }
+
+ if pkg != ROOT_NODE_NAME {
+ if _, ok := e.completeGraph.WorkspaceInfos.PackageJSONs[pkg]; !ok {
+ // If we have a pkg it should be in WorkspaceInfos.
+ // If we're hitting this error something has gone wrong earlier when building WorkspaceInfos
+ // or the workspace really doesn't exist and turbo.json is misconfigured.
+ return fmt.Errorf("Could not find workspace \"%s\" from task \"%s\" in project", pkg, taskID)
+ }
+ }
+
+ taskDefinitions, err := e.getTaskDefinitionChain(taskID, taskName)
+ if err != nil {
+ return err
+ }
+
+ taskDefinition, err := fs.MergeTaskDefinitions(taskDefinitions)
+ if err != nil {
+ return err
+ }
+
+ // Skip this iteration of the loop if we've already seen this taskID
+ if visited.Includes(taskID) {
+ continue
+ }
+
+ visited.Add(taskID)
+
+ // Put this taskDefinition into the Graph so we can look it up later during execution.
+ e.completeGraph.TaskDefinitions[taskID] = taskDefinition
+
+ topoDeps := util.SetFromStrings(taskDefinition.TopologicalDependencies)
+ deps := make(util.Set)
+ isPackageTask := util.IsPackageTask(taskName)
+
+ for _, dependency := range taskDefinition.TaskDependencies {
+ // If the current task is a workspace-specific task (including root Task)
+ // and its dependency is _also_ a workspace-specific task, we need to add
+ // a reference to this dependency directly into the engine.
+ // TODO @mehulkar: Why do we need this?
+ if isPackageTask && util.IsPackageTask(dependency) {
+ if err := e.AddDep(dependency, taskName); err != nil {
+ return err
+ }
+ } else {
+ // For non-workspace-specific dependencies, we attach a reference to
+ // the task that is added into the engine.
+ deps.Add(dependency)
+ }
+ }
+
+ // Filter down the tasks if there's a filter in place
+ // https://turbo.build/repo/docs/reference/command-line-reference#--only
+ if tasksOnly {
+ // Keep a dependency only if it matches one of the requested task names.
+ deps = deps.Filter(func(d interface{}) bool {
+ for _, target := range taskNames {
+ if fmt.Sprintf("%v", d) == target {
+ return true
+ }
+ }
+ return false
+ })
+ topoDeps = topoDeps.Filter(func(d interface{}) bool {
+ for _, target := range taskNames {
+ if fmt.Sprintf("%v", d) == target {
+ return true
+ }
+ }
+ return false
+ })
+ }
+
+ toTaskID := taskID
+
+ // hasTopoDeps will be true if the task depends on any tasks from dependency packages
+ // E.g. `dev: { dependsOn: [^dev] }`
+ hasTopoDeps := topoDeps.Len() > 0 && e.completeGraph.WorkspaceGraph.DownEdges(pkg).Len() > 0
+
+ // hasDeps will be true if the task depends on any tasks from its own package
+ // E.g. `build: { dependsOn: [dev] }`
+ hasDeps := deps.Len() > 0
+
+ // hasPackageTaskDeps will be true if this is a workspace-specific task, and
+ // it depends on other workspace-specific tasks
+ // E.g. `my-package#build: { dependsOn: [my-package#beforebuild] }`.
+ hasPackageTaskDeps := false
+ if _, ok := e.PackageTaskDeps[toTaskID]; ok {
+ hasPackageTaskDeps = true
+ }
+
+ if hasTopoDeps {
+ depPkgs := e.completeGraph.WorkspaceGraph.DownEdges(pkg)
+ for _, from := range topoDeps.UnsafeListOfStrings() {
+ // add task dep from all the package deps within repo
+ for depPkg := range depPkgs {
+ fromTaskID := util.GetTaskId(depPkg, from)
+ e.TaskGraph.Add(fromTaskID)
+ e.TaskGraph.Add(toTaskID)
+ e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+ traversalQueue = append(traversalQueue, fromTaskID)
+ }
+ }
+ }
+
+ if hasDeps {
+ for _, from := range deps.UnsafeListOfStrings() {
+ fromTaskID := util.GetTaskId(pkg, from)
+ e.TaskGraph.Add(fromTaskID)
+ e.TaskGraph.Add(toTaskID)
+ e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+ traversalQueue = append(traversalQueue, fromTaskID)
+ }
+ }
+
+ if hasPackageTaskDeps {
+ if pkgTaskDeps, ok := e.PackageTaskDeps[toTaskID]; ok {
+ for _, fromTaskID := range pkgTaskDeps {
+ e.TaskGraph.Add(fromTaskID)
+ e.TaskGraph.Add(toTaskID)
+ e.TaskGraph.Connect(dag.BasicEdge(toTaskID, fromTaskID))
+ traversalQueue = append(traversalQueue, fromTaskID)
+ }
+ }
+ }
+
+ // Add the root node into the graph
+ if !hasDeps && !hasTopoDeps && !hasPackageTaskDeps {
+ e.TaskGraph.Add(ROOT_NODE_NAME)
+ e.TaskGraph.Add(toTaskID)
+ e.TaskGraph.Connect(dag.BasicEdge(toTaskID, ROOT_NODE_NAME))
+ }
+ }
+
+ return nil
+}
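+
+// dependsOn sketch (illustrative turbo.json; the task names are made up):
+// "^build" is a topological dependency on the same task in dependency
+// packages, "generate" is a dependency within the same package, and a
+// dependency like "my-pkg#codegen" pins the dependency to one specific
+// workspace task.
+//
+//  {
+//    "pipeline": {
+//      "build": { "dependsOn": ["^build", "generate", "my-pkg#codegen"] }
+//    }
+//  }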
+
+// AddTask adds root tasks to the engine so they can be looked up later.
+func (e *Engine) AddTask(taskName string) {
+ if util.IsPackageTask(taskName) {
+ pkg, taskName := util.GetPackageTaskFromId(taskName)
+ if pkg == util.RootPkgName {
+ e.rootEnabledTasks.Add(taskName)
+ }
+ }
+}
+
+// AddDep records a (fromTaskID, toTaskID) dependency pair so it can be looked up later.
+func (e *Engine) AddDep(fromTaskID string, toTaskID string) error {
+ fromPkg, _ := util.GetPackageTaskFromId(fromTaskID)
+ if fromPkg != ROOT_NODE_NAME && fromPkg != util.RootPkgName && !e.completeGraph.WorkspaceGraph.HasVertex(fromPkg) {
+ return fmt.Errorf("found reference to unknown package: %v in task %v", fromPkg, fromTaskID)
+ }
+
+ if _, ok := e.PackageTaskDeps[toTaskID]; !ok {
+ e.PackageTaskDeps[toTaskID] = []string{}
+ }
+
+ e.PackageTaskDeps[toTaskID] = append(e.PackageTaskDeps[toTaskID], fromTaskID)
+
+ return nil
+}
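+
+// AddDep usage sketch (illustrative; the task IDs are made up): after this
+// call, "my-pkg#build" is recorded as depending on "my-pkg#codegen", and the
+// corresponding edge is added to the task graph during Prepare.
+//
+//  if err := engine.AddDep("my-pkg#codegen", "my-pkg#build"); err != nil {
+//      // the from-package was not found in the workspace graph
+//  }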
+
+// ValidatePersistentDependencies checks whether any task depends on a persistent task and returns
+// an error if that persistent task is actually implemented
+func (e *Engine) ValidatePersistentDependencies(graph *graph.CompleteGraph, concurrency int) error {
+ var validationError error
+ persistentCount := 0
+
+ // Adding in a lock because otherwise walking the graph can introduce a data race
+ // (reproducible with `go test -race`)
+ var sema = util.NewSemaphore(1)
+
+ errs := e.TaskGraph.Walk(func(v dag.Vertex) error {
+ vertexName := dag.VertexName(v) // vertexName is a taskID
+
+ // No need to check the root node if that's where we are.
+ if strings.Contains(vertexName, ROOT_NODE_NAME) {
+ return nil
+ }
+
+ // Acquire a lock, because otherwise walking this graph can cause a race condition
+ // writing to the same validationError var defined outside the Walk(). This shows
+ // up when running tests with the `-race` flag.
+ sema.Acquire()
+ defer sema.Release()
+
+ currentTaskDefinition, currentTaskExists := e.completeGraph.TaskDefinitions[vertexName]
+ if currentTaskExists && currentTaskDefinition.Persistent {
+ persistentCount++
+ }
+
+ currentPackageName, currentTaskName := util.GetPackageTaskFromId(vertexName)
+
+ // For each "downEdge" (i.e. each task that _this_ task dependsOn)
+ // check if the downEdge is a Persistent task, and if it actually has the script implemented
+ // in that package's package.json
+ for dep := range e.TaskGraph.DownEdges(vertexName) {
+ depTaskID := dep.(string)
+ // No need to check the root node
+ if strings.Contains(depTaskID, ROOT_NODE_NAME) {
+ return nil
+ }
+
+ // Parse the taskID of this dependency task
+ packageName, taskName := util.GetPackageTaskFromId(depTaskID)
+
+ // Get the Task Definition so we can check if it is Persistent
+ depTaskDefinition, taskExists := e.completeGraph.TaskDefinitions[depTaskID]
+
+ if !taskExists {
+ return fmt.Errorf("Cannot find task definition for %v in package %v", depTaskID, packageName)
+ }
+
+ // Get information about the package
+ pkg, pkgExists := graph.WorkspaceInfos.PackageJSONs[packageName]
+ if !pkgExists {
+ return fmt.Errorf("Cannot find package %v", packageName)
+ }
+ _, hasScript := pkg.Scripts[taskName]
+
+ // If both conditions are true, record the validation error and break out of checking the dependencies
+ if depTaskDefinition.Persistent && hasScript {
+ validationError = fmt.Errorf(
+ "\"%s\" is a persistent task, \"%s\" cannot depend on it",
+ util.GetTaskId(packageName, taskName),
+ util.GetTaskId(currentPackageName, currentTaskName),
+ )
+
+ break
+ }
+ }
+
+ return nil
+ })
+
+ for _, err := range errs {
+ return fmt.Errorf("Validation failed: %v", err)
+ }
+
+ if validationError != nil {
+ return validationError
+ } else if persistentCount >= concurrency {
+ return fmt.Errorf("You have %v persistent tasks but `turbo` is configured for concurrency of %v. Set --concurrency to at least %v", persistentCount, concurrency, persistentCount+1)
+ }
+
+ return nil
+}
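+
+// Failure sketch (illustrative pipeline; the task names are made up): a
+// configuration such as
+//
+//  "dev":   { "persistent": true },
+//  "build": { "dependsOn": ["dev"] }
+//
+// fails this validation when the dependency package actually implements a
+// "dev" script, because a persistent task never exits and would block "build"
+// indefinitely.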
+
+// getTaskDefinitionChain gets a set of TaskDefinitions that apply to the taskID.
+// These definitions should be merged by the consumer.
+func (e *Engine) getTaskDefinitionChain(taskID string, taskName string) ([]fs.BookkeepingTaskDefinition, error) {
+ // Start a list of TaskDefinitions we've found for this TaskID
+ taskDefinitions := []fs.BookkeepingTaskDefinition{}
+
+ rootPipeline, err := e.completeGraph.GetPipelineFromWorkspace(util.RootPkgName, e.isSinglePackage)
+ if err != nil {
+ // It should be very unlikely that we can't find a root pipeline. Even for single package repos
+ // the pipeline is synthesized from package.json, so there should be _something_ here.
+ return nil, err
+ }
+
+ // Look for the taskDefinition in the root pipeline.
+ if rootTaskDefinition, err := rootPipeline.GetTask(taskID, taskName); err == nil {
+ taskDefinitions = append(taskDefinitions, *rootTaskDefinition)
+ }
+
+ // If we're in a single package repo, we can just exit with the TaskDefinition in the root pipeline
+ // since there are no workspaces, and we don't need to follow any extends keys.
+ if e.isSinglePackage {
+ if len(taskDefinitions) == 0 {
+ return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json", taskID)
+ }
+ return taskDefinitions, nil
+ }
+
+ // If the taskID is a root task (e.g. //#build), we don't need to look
+ // for a workspace task, since these can only be defined in the root turbo.json.
+ taskIDPackage, _ := util.GetPackageTaskFromId(taskID)
+ if taskIDPackage != util.RootPkgName && taskIDPackage != ROOT_NODE_NAME {
+ // A workspace turbo.json is optional, so we can ignore a missing config; other errors still bubble up.
+ if workspaceTurboJSON, err := e.completeGraph.GetTurboConfigFromWorkspace(taskIDPackage, e.isSinglePackage); err != nil {
+ // swallow the error where the config file doesn't exist, but bubble up other things
+ if !errors.Is(err, os.ErrNotExist) {
+ return nil, err
+ }
+ } else {
+ // Run some validations on a workspace turbo.json. Note that these validations are on
+ // the whole struct, and not relevant to the taskID we're looking at right now.
+ validationErrors := workspaceTurboJSON.Validate([]fs.TurboJSONValidation{
+ validateNoPackageTaskSyntax,
+ validateExtends,
+ })
+
+ if len(validationErrors) > 0 {
+ fullError := errors.New("Invalid turbo.json")
+ for _, validationErr := range validationErrors {
+ fullError = fmt.Errorf("%w\n - %s", fullError, validationErr)
+ }
+
+ return nil, fullError
+ }
+
+ // If there are no errors, we can (try to) add the TaskDefinition to our list.
+ if workspaceDefinition, ok := workspaceTurboJSON.Pipeline[taskName]; ok {
+ taskDefinitions = append(taskDefinitions, workspaceDefinition)
+ }
+ }
+ }
+
+ if len(taskDefinitions) == 0 {
+ return nil, fmt.Errorf("Could not find \"%s\" in root turbo.json or \"%s\" workspace", taskID, taskIDPackage)
+ }
+
+ return taskDefinitions, nil
+}
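+
+// Extends sketch (illustrative workspace turbo.json): a workspace config must
+// extend the root config and may then override task definitions for its own
+// tasks; the chain returned above (root definition first, then the workspace
+// definition) is what fs.MergeTaskDefinitions flattens during Prepare.
+//
+//  {
+//    "extends": ["//"],
+//    "pipeline": {
+//      "build": { "outputs": ["dist/**"] }
+//    }
+//  }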
+
+func validateNoPackageTaskSyntax(turboJSON *fs.TurboJSON) []error {
+ errors := []error{}
+
+ for taskIDOrName := range turboJSON.Pipeline {
+ if util.IsPackageTask(taskIDOrName) {
+ taskName := util.StripPackageName(taskIDOrName)
+ errors = append(errors, fmt.Errorf("\"%s\". Use \"%s\" instead", taskIDOrName, taskName))
+ }
+ }
+
+ return errors
+}
+
+func validateExtends(turboJSON *fs.TurboJSON) []error {
+ extendErrors := []error{}
+ extends := turboJSON.Extends
+ // TODO(mehulkar): Enable extending from more than one workspace.
+ if len(extends) > 1 {
+ extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
+ }
+
+ // An empty extends key is not supported right now
+ if len(extends) == 0 {
+ extendErrors = append(extendErrors, fmt.Errorf("No \"extends\" key found"))
+ }
+
+ // TODO(mehulkar): Enable extending from non-root workspace.
+ if len(extends) == 1 && extends[0] != util.RootPkgName {
+ extendErrors = append(extendErrors, fmt.Errorf("You can only extend from the root workspace"))
+ }
+
+ return extendErrors
+}