path: root/cli/internal/run
author     简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
committer  简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
commit     dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree       b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/run
parent     0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
download   HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
           HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/run')
-rw-r--r--  cli/internal/run/dry_run.go        122
-rw-r--r--  cli/internal/run/global_hash.go    164
-rw-r--r--  cli/internal/run/graph_run.go       46
-rw-r--r--  cli/internal/run/log_tag_go.go      11
-rw-r--r--  cli/internal/run/log_tag_rust.go    11
-rw-r--r--  cli/internal/run/real_run.go       420
-rw-r--r--  cli/internal/run/run.go            487
-rw-r--r--  cli/internal/run/run_spec.go        90
-rw-r--r--  cli/internal/run/run_spec_test.go  107
9 files changed, 1458 insertions, 0 deletions
diff --git a/cli/internal/run/dry_run.go b/cli/internal/run/dry_run.go
new file mode 100644
index 0000000..eeee431
--- /dev/null
+++ b/cli/internal/run/dry_run.go
@@ -0,0 +1,122 @@
+// Package run implements `turbo run`
+// This file implements the logic for `turbo run --dry`
+package run
+
+import (
+ gocontext "context"
+ "sync"
+
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/cache"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/core"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/graph"
+ "github.com/vercel/turbo/cli/internal/nodes"
+ "github.com/vercel/turbo/cli/internal/runsummary"
+ "github.com/vercel/turbo/cli/internal/taskhash"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// DryRun gets all the info needed from tasks and prints out a summary, but doesn't actually
+// execute the tasks.
+func DryRun(
+ ctx gocontext.Context,
+ g *graph.CompleteGraph,
+ rs *runSpec,
+ engine *core.Engine,
+ _ *taskhash.Tracker, // unused, but keep here for parity with RealRun method signature
+ turboCache cache.Cache,
+ _ *fs.TurboJSON, // unused, but keep here for parity with RealRun method signature
+ globalEnvMode util.EnvMode,
+ base *cmdutil.CmdBase,
+ summary runsummary.Meta,
+) error {
+ defer turboCache.Shutdown()
+
+ taskSummaries := []*runsummary.TaskSummary{}
+
+ mu := sync.Mutex{}
+ execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
+ // Assign some fallbacks if they were missing
+ if taskSummary.Command == "" {
+ taskSummary.Command = runsummary.MissingTaskLabel
+ }
+
+ if taskSummary.Framework == "" {
+ taskSummary.Framework = runsummary.MissingFrameworkLabel
+ }
+
+ // This mutex is not _really_ required, since we are using Concurrency: 1 as an execution
+ // option, but we add it here to match the shape of RealRun's execFunc.
+ mu.Lock()
+ defer mu.Unlock()
+ taskSummaries = append(taskSummaries, taskSummary)
+ return nil
+ }
+
+ // This setup mirrors a real run. We call engine.Execute() with
+ // a visitor function and some hardcoded execOpts.
+ // Note: we do not currently attempt to parallelize the graph walking
+ // (as we do in real execution)
+ getArgs := func(taskID string) []string {
+ return rs.ArgsForTask(taskID)
+ }
+
+ visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, globalEnvMode, getArgs, base.Logger, execFunc)
+ execOpts := core.EngineExecutionOptions{
+ Concurrency: 1,
+ Parallel: false,
+ }
+
+ if errs := engine.Execute(visitorFn, execOpts); len(errs) > 0 {
+ for _, err := range errs {
+ base.UI.Error(err.Error())
+ }
+ return errors.New("errors occurred during dry-run graph traversal")
+ }
+
+ // We walk the graph with no concurrency.
+ // Populating the cache state is parallelizable.
+ // Do this _after_ walking the graph.
+ populateCacheState(turboCache, taskSummaries)
+
+ // Assign the Task Summaries to the main summary
+ summary.RunSummary.Tasks = taskSummaries
+
+ // The exitCode isn't really used by the Run Summary Close() method for dry runs
+ // but we pass in a successful value to match Real Runs.
+ return summary.Close(ctx, 0, g.WorkspaceInfos)
+}
+
+func populateCacheState(turboCache cache.Cache, taskSummaries []*runsummary.TaskSummary) {
+ // We make at most 8 requests at a time for cache state.
+ maxParallelRequests := 8
+ taskCount := len(taskSummaries)
+
+ parallelRequestCount := maxParallelRequests
+ if taskCount < maxParallelRequests {
+ parallelRequestCount = taskCount
+ }
+
+ queue := make(chan int, taskCount)
+
+ wg := &sync.WaitGroup{}
+ for i := 0; i < parallelRequestCount; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for index := range queue {
+ task := taskSummaries[index]
+ itemStatus := turboCache.Exists(task.Hash)
+ task.CacheSummary = runsummary.NewTaskCacheSummary(itemStatus, nil)
+ }
+ }()
+ }
+
+ for index := range taskSummaries {
+ queue <- index
+ }
+ close(queue)
+ wg.Wait()
+}
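
A note on the pattern above: populateCacheState fans the cache-existence checks out over at most eight goroutines fed from a buffered channel. The same bounded-worker pattern as a minimal standalone Go sketch, with illustrative names (processAll and fn are not turbo identifiers):

package main

import (
	"fmt"
	"sync"
)

// processAll runs fn over every index of items using at most maxWorkers goroutines.
func processAll(items []string, maxWorkers int, fn func(int, string)) {
	workers := maxWorkers
	if len(items) < workers {
		workers = len(items)
	}

	queue := make(chan int, len(items))
	wg := &sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range queue {
				fn(idx, items[idx])
			}
		}()
	}

	for idx := range items {
		queue <- idx
	}
	close(queue)
	wg.Wait()
}

func main() {
	processAll([]string{"a", "b", "c"}, 8, func(i int, s string) {
		fmt.Println(i, s)
	})
}
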
diff --git a/cli/internal/run/global_hash.go b/cli/internal/run/global_hash.go
new file mode 100644
index 0000000..2ebf642
--- /dev/null
+++ b/cli/internal/run/global_hash.go
@@ -0,0 +1,164 @@
+package run
+
+import (
+ "fmt"
+ "path/filepath"
+ "strings"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/mitchellh/cli"
+ "github.com/vercel/turbo/cli/internal/env"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/globby"
+ "github.com/vercel/turbo/cli/internal/hashing"
+ "github.com/vercel/turbo/cli/internal/lockfile"
+ "github.com/vercel/turbo/cli/internal/packagemanager"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+const _globalCacheKey = "Buffalo buffalo Buffalo buffalo buffalo buffalo Buffalo buffalo"
+
+// Variables that we always include
+var _defaultEnvVars = []string{
+ "VERCEL_ANALYTICS_ID",
+}
+
+// GlobalHashable represents all the things that we use to create the global hash
+type GlobalHashable struct {
+ globalFileHashMap map[turbopath.AnchoredUnixPath]string
+ rootExternalDepsHash string
+ envVars env.DetailedMap
+ globalCacheKey string
+ pipeline fs.PristinePipeline
+ envVarPassthroughs []string
+ envMode util.EnvMode
+}
+
+// This exists because the global hash used to have different fields. Changing
+// to a new struct layout changes the global hash. We can remove this converter
+// the next time the global hash needs to change for some other reason.
+type oldGlobalHashable struct {
+ globalFileHashMap map[turbopath.AnchoredUnixPath]string
+ rootExternalDepsHash string
+ envVars env.EnvironmentVariablePairs
+ globalCacheKey string
+ pipeline fs.PristinePipeline
+}
+
+// calculateGlobalHashFromHashable returns a hash string from the globalHashable
+func calculateGlobalHashFromHashable(full GlobalHashable) (string, error) {
+ switch full.envMode {
+ case util.Infer:
+ if full.envVarPassthroughs != nil {
+ // In infer mode, if there is any passThru config (even if it is an empty array)
+ // we'll hash the whole object, so we can detect changes to that config
+ // Further, resolve the envMode to the concrete value.
+ full.envMode = util.Strict
+ return fs.HashObject(full)
+ }
+
+ // If we're in infer mode and there is no global passthrough config,
+ // we use the old struct layout. This will be true for everyone not using the strict env
+ // feature, and we don't want to break their cache.
+ return fs.HashObject(oldGlobalHashable{
+ globalFileHashMap: full.globalFileHashMap,
+ rootExternalDepsHash: full.rootExternalDepsHash,
+ envVars: full.envVars.All.ToHashable(),
+ globalCacheKey: full.globalCacheKey,
+ pipeline: full.pipeline,
+ })
+ case util.Loose:
+ // Remove the passthroughs from hash consideration if we're explicitly loose.
+ full.envVarPassthroughs = nil
+ return fs.HashObject(full)
+ case util.Strict:
+ // Collapse `nil` and `[]` in strict mode.
+ if full.envVarPassthroughs == nil {
+ full.envVarPassthroughs = make([]string, 0)
+ }
+ return fs.HashObject(full)
+ default:
+ panic("unimplemented environment mode")
+ }
+}
+
+func calculateGlobalHash(
+ rootpath turbopath.AbsoluteSystemPath,
+ rootPackageJSON *fs.PackageJSON,
+ pipeline fs.Pipeline,
+ envVarDependencies []string,
+ globalFileDependencies []string,
+ packageManager *packagemanager.PackageManager,
+ lockFile lockfile.Lockfile,
+ envVarPassthroughs []string,
+ envMode util.EnvMode,
+ logger hclog.Logger,
+ ui cli.Ui,
+ isStructuredOutput bool,
+) (GlobalHashable, error) {
+ // Calculate env var dependencies
+ envVars := []string{}
+ envVars = append(envVars, envVarDependencies...)
+ envVars = append(envVars, _defaultEnvVars...)
+ globalHashableEnvVars, err := env.GetHashableEnvVars(envVars, []string{".*THASH.*"}, "")
+ if err != nil {
+ return GlobalHashable{}, err
+ }
+
+ // The only way we can add env vars into the hash via matching is via THASH,
+ // so we only do a simple check here for entries in `BySource.Matching`.
+ // If we enable globalEnv to accept wildcard characters, we'll need to update this
+ // check.
+ if !isStructuredOutput && len(globalHashableEnvVars.BySource.Matching) > 0 {
+ ui.Warn(fmt.Sprintf("[DEPRECATED] Using .*THASH.* to specify an environment variable for inclusion into the hash is deprecated. You specified: %s.", strings.Join(globalHashableEnvVars.BySource.Matching.Names(), ", ")))
+ }
+
+ logger.Debug("global hash env vars", "vars", globalHashableEnvVars.All.Names())
+
+ // Calculate global file dependencies
+ globalDeps := make(util.Set)
+ if len(globalFileDependencies) > 0 {
+ ignores, err := packageManager.GetWorkspaceIgnores(rootpath)
+ if err != nil {
+ return GlobalHashable{}, err
+ }
+
+ f, err := globby.GlobFiles(rootpath.ToStringDuringMigration(), globalFileDependencies, ignores)
+ if err != nil {
+ return GlobalHashable{}, err
+ }
+
+ for _, val := range f {
+ globalDeps.Add(val)
+ }
+ }
+
+ if lockFile == nil {
+ // If we don't have lockfile information available, add the specfile and lockfile to global deps
+ globalDeps.Add(filepath.Join(rootpath.ToStringDuringMigration(), packageManager.Specfile))
+ globalDeps.Add(filepath.Join(rootpath.ToStringDuringMigration(), packageManager.Lockfile))
+ }
+
+ // No prefix, global deps already have full paths
+ globalDepsArray := globalDeps.UnsafeListOfStrings()
+ globalDepsPaths := make([]turbopath.AbsoluteSystemPath, len(globalDepsArray))
+ for i, path := range globalDepsArray {
+ globalDepsPaths[i] = turbopath.AbsoluteSystemPathFromUpstream(path)
+ }
+
+ globalFileHashMap, err := hashing.GetHashableDeps(rootpath, globalDepsPaths)
+ if err != nil {
+ return GlobalHashable{}, fmt.Errorf("error hashing files: %w", err)
+ }
+
+ return GlobalHashable{
+ globalFileHashMap: globalFileHashMap,
+ rootExternalDepsHash: rootPackageJSON.ExternalDepsHash,
+ envVars: globalHashableEnvVars,
+ globalCacheKey: _globalCacheKey,
+ pipeline: pipeline.Pristine(),
+ envVarPassthroughs: envVarPassthroughs,
+ envMode: envMode,
+ }, nil
+}
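
The oldGlobalHashable struct above is kept because the global hash is computed over a serialized struct, so changing the field set changes the digest even when the shared values are identical. A standalone illustration of that effect using sha256 over JSON (turbo's fs.HashObject serializes differently, so this is an analogy rather than the real hashing code, and the type names are illustrative):

package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
)

type oldShape struct {
	Files map[string]string
	Key   string
}

type newShape struct {
	Files   map[string]string
	Key     string
	EnvMode string // added field: changes the digest even when left empty
}

// digest hashes the JSON serialization of v.
func digest(v interface{}) string {
	b, _ := json.Marshal(v)
	return fmt.Sprintf("%x", sha256.Sum256(b))
}

func main() {
	files := map[string]string{"turbo.json": "abc123"}
	fmt.Println(digest(oldShape{Files: files, Key: "k"}))
	fmt.Println(digest(newShape{Files: files, Key: "k"}))
	// The two digests differ, which is why the old layout is preserved
	// for the infer-mode / no-passthrough case above.
}
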
diff --git a/cli/internal/run/graph_run.go b/cli/internal/run/graph_run.go
new file mode 100644
index 0000000..8531718
--- /dev/null
+++ b/cli/internal/run/graph_run.go
@@ -0,0 +1,46 @@
+package run
+
+import (
+ gocontext "context"
+
+ "github.com/pyr-sh/dag"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/core"
+ "github.com/vercel/turbo/cli/internal/graphvisualizer"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// GraphRun generates a visualization of the task graph rather than executing it.
+func GraphRun(ctx gocontext.Context, rs *runSpec, engine *core.Engine, base *cmdutil.CmdBase) error {
+ graph := engine.TaskGraph
+ if rs.Opts.runOpts.SinglePackage {
+ graph = filterSinglePackageGraphForDisplay(engine.TaskGraph)
+ }
+ visualizer := graphvisualizer.New(base.RepoRoot, base.UI, graph)
+
+ if rs.Opts.runOpts.GraphDot {
+ visualizer.RenderDotGraph()
+ } else {
+ err := visualizer.GenerateGraphFile(rs.Opts.runOpts.GraphFile)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// filterSinglePackageGraphForDisplay builds an equivalent graph with package names stripped from tasks.
+// Given that this should only be used in a single-package context, all of the package names are expected
+// to be //. Also, all nodes are always connected to the root node, so we are not concerned with leaving
+// behind any unconnected nodes.
+func filterSinglePackageGraphForDisplay(originalGraph *dag.AcyclicGraph) *dag.AcyclicGraph {
+ graph := &dag.AcyclicGraph{}
+ for _, edge := range originalGraph.Edges() {
+ src := util.StripPackageName(edge.Source().(string))
+ tgt := util.StripPackageName(edge.Target().(string))
+ graph.Add(src)
+ graph.Add(tgt)
+ graph.Connect(dag.BasicEdge(src, tgt))
+ }
+ return graph
+}
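
filterSinglePackageGraphForDisplay leans on util.StripPackageName to drop the workspace portion of a task ID. Assuming the usual "package#task" ID convention (e.g. "//#build" in a single-package repo), a rough standalone equivalent of that helper might look like this (stripPackageName is an illustrative name, not the internal implementation):

package main

import (
	"fmt"
	"strings"
)

// stripPackageName drops everything up to and including the last '#'
// of a task ID, leaving only the task name (e.g. "//#build" -> "build").
func stripPackageName(taskID string) string {
	if idx := strings.LastIndex(taskID, "#"); idx >= 0 {
		return taskID[idx+1:]
	}
	return taskID
}

func main() {
	fmt.Println(stripPackageName("//#build")) // build
	fmt.Println(stripPackageName("lint"))     // lint
}
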
diff --git a/cli/internal/run/log_tag_go.go b/cli/internal/run/log_tag_go.go
new file mode 100644
index 0000000..a3e825f
--- /dev/null
+++ b/cli/internal/run/log_tag_go.go
@@ -0,0 +1,11 @@
+//go:build go || !rust
+// +build go !rust
+
+package run
+
+import "github.com/hashicorp/go-hclog"
+
+// LogTag logs out the build tag (in this case "go") for the current build.
+func LogTag(logger hclog.Logger) {
+ logger.Debug("build tag: go")
+}
diff --git a/cli/internal/run/log_tag_rust.go b/cli/internal/run/log_tag_rust.go
new file mode 100644
index 0000000..065f438
--- /dev/null
+++ b/cli/internal/run/log_tag_rust.go
@@ -0,0 +1,11 @@
+//go:build rust
+// +build rust
+
+package run
+
+import "github.com/hashicorp/go-hclog"
+
+// LogTag logs out the build tag (in this case "rust") for the current build.
+func LogTag(logger hclog.Logger) {
+ logger.Debug("build tag: rust")
+}
diff --git a/cli/internal/run/real_run.go b/cli/internal/run/real_run.go
new file mode 100644
index 0000000..32c7965
--- /dev/null
+++ b/cli/internal/run/real_run.go
@@ -0,0 +1,420 @@
+package run
+
+import (
+ gocontext "context"
+ "fmt"
+ "log"
+ "os/exec"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/fatih/color"
+ "github.com/hashicorp/go-hclog"
+ "github.com/mitchellh/cli"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/cache"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/colorcache"
+ "github.com/vercel/turbo/cli/internal/core"
+ "github.com/vercel/turbo/cli/internal/env"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/graph"
+ "github.com/vercel/turbo/cli/internal/logstreamer"
+ "github.com/vercel/turbo/cli/internal/nodes"
+ "github.com/vercel/turbo/cli/internal/packagemanager"
+ "github.com/vercel/turbo/cli/internal/process"
+ "github.com/vercel/turbo/cli/internal/runcache"
+ "github.com/vercel/turbo/cli/internal/runsummary"
+ "github.com/vercel/turbo/cli/internal/spinner"
+ "github.com/vercel/turbo/cli/internal/taskhash"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/ui"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// RealRun executes a set of tasks
+func RealRun(
+ ctx gocontext.Context,
+ g *graph.CompleteGraph,
+ rs *runSpec,
+ engine *core.Engine,
+ taskHashTracker *taskhash.Tracker,
+ turboCache cache.Cache,
+ turboJSON *fs.TurboJSON,
+ globalEnvMode util.EnvMode,
+ packagesInScope []string,
+ base *cmdutil.CmdBase,
+ runSummary runsummary.Meta,
+ packageManager *packagemanager.PackageManager,
+ processes *process.Manager,
+) error {
+ singlePackage := rs.Opts.runOpts.SinglePackage
+
+ if singlePackage {
+ base.UI.Output(fmt.Sprintf("%s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", ")))))
+ } else {
+ base.UI.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", ")))
+ base.UI.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
+ }
+
+ // Log whether remote cache is enabled
+ useHTTPCache := !rs.Opts.cacheOpts.SkipRemote
+ if useHTTPCache {
+ base.UI.Info(ui.Dim("• Remote caching enabled"))
+ } else {
+ base.UI.Info(ui.Dim("• Remote caching disabled"))
+ }
+
+ defer func() {
+ _ = spinner.WaitFor(ctx, turboCache.Shutdown, base.UI, "...writing to cache...", 1500*time.Millisecond)
+ }()
+ colorCache := colorcache.New()
+
+ runCache := runcache.New(turboCache, base.RepoRoot, rs.Opts.runcacheOpts, colorCache)
+
+ ec := &execContext{
+ colorCache: colorCache,
+ runSummary: runSummary,
+ rs: rs,
+ ui: &cli.ConcurrentUi{Ui: base.UI},
+ runCache: runCache,
+ env: turboJSON.GlobalEnv,
+ passthroughEnv: turboJSON.GlobalPassthroughEnv,
+ logger: base.Logger,
+ packageManager: packageManager,
+ processes: processes,
+ taskHashTracker: taskHashTracker,
+ repoRoot: base.RepoRoot,
+ isSinglePackage: singlePackage,
+ }
+
+ // run the thing
+ execOpts := core.EngineExecutionOptions{
+ Parallel: rs.Opts.runOpts.Parallel,
+ Concurrency: rs.Opts.runOpts.Concurrency,
+ }
+
+ mu := sync.Mutex{}
+ taskSummaries := []*runsummary.TaskSummary{}
+ execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
+ taskExecutionSummary, err := ec.exec(ctx, packageTask)
+
+ // taskExecutionSummary will be nil if the task never executed
+ // (i.e. if the workspace didn't implement the script corresponding to the task)
+ // We don't need to collect any of the outputs or execution if the task didn't execute.
+ if taskExecutionSummary != nil {
+ taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID)
+ taskSummary.Execution = taskExecutionSummary
+ taskSummary.CacheSummary = taskHashTracker.GetCacheStatus(taskSummary.TaskID)
+
+ // lock since multiple goroutines may be appending to this array at the same time
+ mu.Lock()
+ taskSummaries = append(taskSummaries, taskSummary)
+ // not using defer, just release the lock
+ mu.Unlock()
+ }
+
+ // Return the error when there is one
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ getArgs := func(taskID string) []string {
+ return rs.ArgsForTask(taskID)
+ }
+
+ visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, globalEnvMode, getArgs, base.Logger, execFunc)
+ errs := engine.Execute(visitorFn, execOpts)
+
+ // Track if we saw any child with a non-zero exit code
+ exitCode := 0
+ exitCodeErr := &process.ChildExit{}
+
+ // Assign tasks after execution
+ runSummary.RunSummary.Tasks = taskSummaries
+
+ for _, err := range errs {
+ if errors.As(err, &exitCodeErr) {
+ // If a process gets killed via a signal, Go reports its exit code as -1.
+ // We take the absolute value of the exit code so we don't select '0' as
+ // the greatest exit code.
+ childExit := exitCodeErr.ExitCode
+ if childExit < 0 {
+ childExit = -childExit
+ }
+ if childExit > exitCode {
+ exitCode = childExit
+ }
+ } else if exitCode == 0 {
+ // We hit some error, it shouldn't be exit code 0
+ exitCode = 1
+ }
+ base.UI.Error(err.Error())
+ }
+
+ // When continue-on-error is enabled, failed tasks don't surface as errors here,
+ // so we must inspect the task summaries instead.
+ if ec.rs.Opts.runOpts.ContinueOnError {
+ for _, summary := range runSummary.RunSummary.Tasks {
+ if childExit := summary.Execution.ExitCode(); childExit != nil {
+ childExit := *childExit
+ if childExit < 0 {
+ childExit = -childExit
+ }
+ if childExit > exitCode {
+ exitCode = childExit
+ }
+ }
+ }
+ }
+
+ if err := runSummary.Close(ctx, exitCode, g.WorkspaceInfos); err != nil {
+ // We don't need to throw an error, but we can warn on this.
+ // Note: this method doesn't actually return an error for Real Runs at the time of writing.
+ base.UI.Info(fmt.Sprintf("Failed to close Run Summary %v", err))
+ }
+
+ if exitCode != 0 {
+ return &process.ChildExit{
+ ExitCode: exitCode,
+ }
+ }
+ return nil
+}
+
+type execContext struct {
+ colorCache *colorcache.ColorCache
+ runSummary runsummary.Meta
+ rs *runSpec
+ ui cli.Ui
+ runCache *runcache.RunCache
+ env []string
+ passthroughEnv []string
+ logger hclog.Logger
+ packageManager *packagemanager.PackageManager
+ processes *process.Manager
+ taskHashTracker *taskhash.Tracker
+ repoRoot turbopath.AbsoluteSystemPath
+ isSinglePackage bool
+}
+
+func (ec *execContext) logError(prefix string, err error) {
+ ec.logger.Error(prefix, "error", err)
+
+ if prefix != "" {
+ prefix += ": "
+ }
+
+ ec.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err)))
+}
+
+func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTask) (*runsummary.TaskExecutionSummary, error) {
+ // Set up the tracer. Every time tracer() is called, the taskExecutionSummary's duration is updated,
+ // so make sure to call it before returning.
+ tracer, taskExecutionSummary := ec.runSummary.RunSummary.TrackTask(packageTask.TaskID)
+
+ progressLogger := ec.logger.Named("")
+ progressLogger.Debug("start")
+
+ passThroughArgs := ec.rs.ArgsForTask(packageTask.Task)
+ hash := packageTask.Hash
+ ec.logger.Debug("task hash", "value", hash)
+ // TODO(gsoltis): if/when we fix https://github.com/vercel/turbo/issues/937
+ // the following block should never get hit. In the meantime, keep it after hashing
+ // so that downstream tasks can count on the hash existing
+ //
+ // bail if the script doesn't exist
+ if packageTask.Command == "" {
+ progressLogger.Debug("no task in package, skipping")
+ progressLogger.Debug("done", "status", "skipped", "duration", taskExecutionSummary.Duration)
+ // Return nil here because there was no execution, so there is no task execution summary
+ return nil, nil
+ }
+
+ // Set building status now that we know it's going to run.
+ tracer(runsummary.TargetBuilding, nil, &successCode)
+
+ var prefix string
+ var prettyPrefix string
+ if ec.rs.Opts.runOpts.LogPrefix == "none" {
+ prefix = ""
+ } else {
+ prefix = packageTask.OutputPrefix(ec.isSinglePackage)
+ }
+
+ prettyPrefix = ec.colorCache.PrefixWithColor(packageTask.PackageName, prefix)
+
+ // Cache ---------------------------------------------
+ taskCache := ec.runCache.TaskCache(packageTask, hash)
+ // Create a logger for replaying
+ prefixedUI := &cli.PrefixedUi{
+ Ui: ec.ui,
+ OutputPrefix: prettyPrefix,
+ InfoPrefix: prettyPrefix,
+ ErrorPrefix: prettyPrefix,
+ WarnPrefix: prettyPrefix,
+ }
+
+ cacheStatus, timeSaved, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
+
+ // It's safe to set the CacheStatus even if there's an error, because if there's
+ // an error, the 0 values are actually what we want. We save cacheStatus and timeSaved
+ // for the task, so that even if there's an error, we have those values for the taskSummary.
+ ec.taskHashTracker.SetCacheStatus(
+ packageTask.TaskID,
+ runsummary.NewTaskCacheSummary(cacheStatus, &timeSaved),
+ )
+
+ if err != nil {
+ prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err))
+ } else if cacheStatus.Local || cacheStatus.Remote { // If there was a cache hit
+ ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs)
+ // We only cache successful executions, so we can assume this is a successCode exit.
+ tracer(runsummary.TargetCached, nil, &successCode)
+ return taskExecutionSummary, nil
+ }
+
+ // Setup command execution
+ argsactual := append([]string{"run"}, packageTask.Task)
+ if len(passThroughArgs) > 0 {
+ // This will be either '--' or a typed nil
+ argsactual = append(argsactual, ec.packageManager.ArgSeparator...)
+ argsactual = append(argsactual, passThroughArgs...)
+ }
+
+ cmd := exec.Command(ec.packageManager.Command, argsactual...)
+ cmd.Dir = packageTask.Pkg.Dir.ToSystemPath().RestoreAnchor(ec.repoRoot).ToString()
+
+ currentState := env.GetEnvMap()
+ passthroughEnv := env.EnvironmentVariableMap{}
+
+ if packageTask.EnvMode == util.Strict {
+ defaultPassthrough := []string{
+ "PATH",
+ "SHELL",
+ "SYSTEMROOT", // Go will always include this on Windows, but we're being explicit here
+ }
+
+ passthroughEnv.Merge(env.FromKeys(currentState, defaultPassthrough))
+ passthroughEnv.Merge(env.FromKeys(currentState, ec.env))
+ passthroughEnv.Merge(env.FromKeys(currentState, ec.passthroughEnv))
+ passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.EnvVarDependencies))
+ passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.PassthroughEnv))
+ } else {
+ passthroughEnv.Merge(currentState)
+ }
+
+ // Always last to make sure it clobbers.
+ passthroughEnv.Add("TURBO_HASH", hash)
+
+ cmd.Env = passthroughEnv.ToHashable()
+
+ // Setup stdout/stderr
+ // If we are not caching anything, then we don't need to write logs to disk;
+ // be careful about this conditional, given that caching defaults to true.
+ writer, err := taskCache.OutputWriter(prettyPrefix)
+ if err != nil {
+ tracer(runsummary.TargetBuildFailed, err, nil)
+
+ ec.logError(prettyPrefix, err)
+ if !ec.rs.Opts.runOpts.ContinueOnError {
+ return nil, errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID)
+ }
+ }
+
+ // Create a logger
+ logger := log.New(writer, "", 0)
+ // Setup a streamer that we'll pipe cmd.Stdout to
+ logStreamerOut := logstreamer.NewLogstreamer(logger, prettyPrefix, false)
+ // Setup a streamer that we'll pipe cmd.Stderr to.
+ logStreamerErr := logstreamer.NewLogstreamer(logger, prettyPrefix, false)
+ cmd.Stderr = logStreamerErr
+ cmd.Stdout = logStreamerOut
+ // Flush/Reset any error we recorded
+ logStreamerErr.FlushRecord()
+ logStreamerOut.FlushRecord()
+
+ closeOutputs := func() error {
+ var closeErrors []error
+
+ if err := logStreamerOut.Close(); err != nil {
+ closeErrors = append(closeErrors, errors.Wrap(err, "log stdout"))
+ }
+ if err := logStreamerErr.Close(); err != nil {
+ closeErrors = append(closeErrors, errors.Wrap(err, "log stderr"))
+ }
+
+ if err := writer.Close(); err != nil {
+ closeErrors = append(closeErrors, errors.Wrap(err, "log file"))
+ }
+ if len(closeErrors) > 0 {
+ msgs := make([]string, len(closeErrors))
+ for i, err := range closeErrors {
+ msgs[i] = err.Error()
+ }
+ return fmt.Errorf("could not flush log output: %v", strings.Join(msgs, ", "))
+ }
+ return nil
+ }
+
+ // Run the command
+ if err := ec.processes.Exec(cmd); err != nil {
+ // close off our outputs. We errored, so we mostly don't care if we fail to close
+ _ = closeOutputs()
+ // if we already know we're in the process of exiting,
+ // we don't need to record an error to that effect.
+ if errors.Is(err, process.ErrClosing) {
+ return taskExecutionSummary, nil
+ }
+
+ // If the error we got is a ChildExit, it will have an ExitCode field
+ // Pass that along into the tracer.
+ var e *process.ChildExit
+ if errors.As(err, &e) {
+ tracer(runsummary.TargetBuildFailed, err, &e.ExitCode)
+ } else {
+ // If it wasn't a ChildExit, and something else went wrong, we don't have an exitCode
+ tracer(runsummary.TargetBuildFailed, err, nil)
+ }
+
+ progressLogger.Error(fmt.Sprintf("Error: command finished with error: %v", err))
+ if !ec.rs.Opts.runOpts.ContinueOnError {
+ prefixedUI.Error(fmt.Sprintf("ERROR: command finished with error: %s", err))
+ ec.processes.Close()
+ } else {
+ prefixedUI.Warn("command finished with error, but continuing...")
+ // Set to nil so we don't short-circuit any other execution
+ err = nil
+ }
+
+ // If there was an error, flush the buffered output
+ taskCache.OnError(prefixedUI, progressLogger)
+
+ return taskExecutionSummary, err
+ }
+
+ // Add another timestamp into the tracer, so we have an accurate timestamp for how long the task took.
+ tracer(runsummary.TargetExecuted, nil, nil)
+
+ // Close off our outputs and cache them
+ if err := closeOutputs(); err != nil {
+ ec.logError("", err)
+ } else {
+ if err = taskCache.SaveOutputs(ctx, progressLogger, prefixedUI, int(taskExecutionSummary.Duration.Milliseconds())); err != nil {
+ ec.logError("", fmt.Errorf("error caching output: %w", err))
+ } else {
+ ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs)
+ }
+ }
+
+ // Clean up tracing
+ tracer(runsummary.TargetBuilt, nil, &successCode)
+ progressLogger.Debug("done", "status", "complete", "duration", taskExecutionSummary.Duration)
+ return taskExecutionSummary, nil
+}
+
+var successCode = 0
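
RealRun reduces all child exit codes to a single process exit code: codes from signal-killed children come back as -1, so the absolute value is taken before keeping the maximum. The same logic pulled out as a standalone helper for clarity (aggregateExitCode is an illustrative name):

package main

import "fmt"

// aggregateExitCode returns the largest absolute exit code seen, so a
// signal-killed child reported as -1 still registers as a failure.
func aggregateExitCode(codes []int) int {
	exitCode := 0
	for _, c := range codes {
		if c < 0 {
			c = -c
		}
		if c > exitCode {
			exitCode = c
		}
	}
	return exitCode
}

func main() {
	fmt.Println(aggregateExitCode([]int{0, -1, 2})) // 2
	fmt.Println(aggregateExitCode([]int{0, -9}))    // 9
	fmt.Println(aggregateExitCode([]int{0, 0}))     // 0
}
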
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
new file mode 100644
index 0000000..2ac1141
--- /dev/null
+++ b/cli/internal/run/run.go
@@ -0,0 +1,487 @@
+package run
+
+import (
+ gocontext "context"
+ "fmt"
+ "os"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/vercel/turbo/cli/internal/analytics"
+ "github.com/vercel/turbo/cli/internal/cache"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/context"
+ "github.com/vercel/turbo/cli/internal/core"
+ "github.com/vercel/turbo/cli/internal/daemon"
+ "github.com/vercel/turbo/cli/internal/daemonclient"
+ "github.com/vercel/turbo/cli/internal/env"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/graph"
+ "github.com/vercel/turbo/cli/internal/process"
+ "github.com/vercel/turbo/cli/internal/runsummary"
+ "github.com/vercel/turbo/cli/internal/scm"
+ "github.com/vercel/turbo/cli/internal/scope"
+ "github.com/vercel/turbo/cli/internal/signals"
+ "github.com/vercel/turbo/cli/internal/taskhash"
+ "github.com/vercel/turbo/cli/internal/turbostate"
+ "github.com/vercel/turbo/cli/internal/ui"
+ "github.com/vercel/turbo/cli/internal/util"
+
+ "github.com/pkg/errors"
+)
+
+// ExecuteRun executes the run command
+func ExecuteRun(ctx gocontext.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
+ base, err := helper.GetCmdBase(args)
+ if err != nil {
+ return err
+ }
+ LogTag(base.Logger)
+ tasks := args.Command.Run.Tasks
+ passThroughArgs := args.Command.Run.PassThroughArgs
+ if len(tasks) == 0 {
+ return errors.New("at least one task must be specified")
+ }
+ opts, err := optsFromArgs(args)
+ if err != nil {
+ return err
+ }
+
+ opts.runOpts.PassThroughArgs = passThroughArgs
+ run := configureRun(base, opts, signalWatcher)
+ if err := run.run(ctx, tasks); err != nil {
+ base.LogError("run failed: %v", err)
+ return err
+ }
+ return nil
+}
+
+func optsFromArgs(args *turbostate.ParsedArgsFromRust) (*Opts, error) {
+ runPayload := args.Command.Run
+
+ opts := getDefaultOptions()
+ // aliases := make(map[string]string)
+ if err := scope.OptsFromArgs(&opts.scopeOpts, args); err != nil {
+ return nil, err
+ }
+
+ // Cache flags
+ opts.clientOpts.Timeout = args.RemoteCacheTimeout
+ opts.cacheOpts.SkipFilesystem = runPayload.RemoteOnly
+ opts.cacheOpts.OverrideDir = runPayload.CacheDir
+ opts.cacheOpts.Workers = runPayload.CacheWorkers
+
+ // Run flags
+ opts.runOpts.LogPrefix = runPayload.LogPrefix
+ opts.runOpts.Summarize = runPayload.Summarize
+ opts.runOpts.ExperimentalSpaceID = runPayload.ExperimentalSpaceID
+ opts.runOpts.EnvMode = runPayload.EnvMode
+
+ // Runcache flags
+ opts.runcacheOpts.SkipReads = runPayload.Force
+ opts.runcacheOpts.SkipWrites = runPayload.NoCache
+
+ if runPayload.OutputLogs != "" {
+ err := opts.runcacheOpts.SetTaskOutputMode(runPayload.OutputLogs)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // Run flags
+ if runPayload.Concurrency != "" {
+ concurrency, err := util.ParseConcurrency(runPayload.Concurrency)
+ if err != nil {
+ return nil, err
+ }
+ opts.runOpts.Concurrency = concurrency
+ }
+ opts.runOpts.Parallel = runPayload.Parallel
+ opts.runOpts.Profile = runPayload.Profile
+ opts.runOpts.ContinueOnError = runPayload.ContinueExecution
+ opts.runOpts.Only = runPayload.Only
+ opts.runOpts.NoDaemon = runPayload.NoDaemon
+ opts.runOpts.SinglePackage = args.Command.Run.SinglePackage
+
+ // See comment on Graph in turbostate.go for an explanation on Graph's representation.
+ // If flag is passed...
+ if runPayload.Graph != nil {
+ // If no value is attached, we print to stdout
+ if *runPayload.Graph == "" {
+ opts.runOpts.GraphDot = true
+ } else {
+ // Otherwise, we emit to the file name attached as value
+ opts.runOpts.GraphDot = false
+ opts.runOpts.GraphFile = *runPayload.Graph
+ }
+ }
+
+ if runPayload.DryRun != "" {
+ opts.runOpts.DryRunJSON = runPayload.DryRun == _dryRunJSONValue
+
+ if runPayload.DryRun == _dryRunTextValue || runPayload.DryRun == _dryRunJSONValue {
+ opts.runOpts.DryRun = true
+ } else {
+ return nil, fmt.Errorf("invalid dry-run mode: %v", runPayload.DryRun)
+ }
+ }
+
+ return opts, nil
+}
+
+func configureRun(base *cmdutil.CmdBase, opts *Opts, signalWatcher *signals.Watcher) *run {
+ if os.Getenv("TURBO_FORCE") == "true" {
+ opts.runcacheOpts.SkipReads = true
+ }
+
+ if os.Getenv("TURBO_REMOTE_ONLY") == "true" {
+ opts.cacheOpts.SkipFilesystem = true
+ }
+
+ processes := process.NewManager(base.Logger.Named("processes"))
+ signalWatcher.AddOnClose(processes.Close)
+ return &run{
+ base: base,
+ opts: opts,
+ processes: processes,
+ }
+}
+
+type run struct {
+ base *cmdutil.CmdBase
+ opts *Opts
+ processes *process.Manager
+}
+
+func (r *run) run(ctx gocontext.Context, targets []string) error {
+ startAt := time.Now()
+ packageJSONPath := r.base.RepoRoot.UntypedJoin("package.json")
+ rootPackageJSON, err := fs.ReadPackageJSON(packageJSONPath)
+ if err != nil {
+ return fmt.Errorf("failed to read package.json: %w", err)
+ }
+
+ isStructuredOutput := r.opts.runOpts.GraphDot || r.opts.runOpts.DryRunJSON
+
+ var pkgDepGraph *context.Context
+ if r.opts.runOpts.SinglePackage {
+ pkgDepGraph, err = context.SinglePackageGraph(r.base.RepoRoot, rootPackageJSON)
+ } else {
+ pkgDepGraph, err = context.BuildPackageGraph(r.base.RepoRoot, rootPackageJSON)
+ }
+ if err != nil {
+ var warnings *context.Warnings
+ if errors.As(err, &warnings) {
+ r.base.LogWarning("Issues occurred when constructing package graph. Turbo will function, but some features may not be available", err)
+ } else {
+ return err
+ }
+ }
+
+ if ui.IsCI && !r.opts.runOpts.NoDaemon {
+ r.base.Logger.Info("skipping turbod since we appear to be in a non-interactive context")
+ } else if !r.opts.runOpts.NoDaemon {
+ turbodClient, err := daemon.GetClient(ctx, r.base.RepoRoot, r.base.Logger, r.base.TurboVersion, daemon.ClientOpts{})
+ if err != nil {
+ r.base.LogWarning("", errors.Wrap(err, "failed to contact turbod. Continuing in standalone mode"))
+ } else {
+ defer func() { _ = turbodClient.Close() }()
+ r.base.Logger.Debug("running in daemon mode")
+ daemonClient := daemonclient.New(turbodClient)
+ r.opts.runcacheOpts.OutputWatcher = daemonClient
+ }
+ }
+
+ if err := util.ValidateGraph(&pkgDepGraph.WorkspaceGraph); err != nil {
+ return errors.Wrap(err, "Invalid package dependency graph")
+ }
+
+ // TODO: consolidate some of these arguments
+ // Note: not all properties are set here. GlobalHash and Pipeline keys are set later
+ g := &graph.CompleteGraph{
+ WorkspaceGraph: pkgDepGraph.WorkspaceGraph,
+ WorkspaceInfos: pkgDepGraph.WorkspaceInfos,
+ RootNode: pkgDepGraph.RootNode,
+ TaskDefinitions: map[string]*fs.TaskDefinition{},
+ RepoRoot: r.base.RepoRoot,
+ }
+
+ turboJSON, err := g.GetTurboConfigFromWorkspace(util.RootPkgName, r.opts.runOpts.SinglePackage)
+ if err != nil {
+ return err
+ }
+
+ // TODO: these values come from a config file, hopefully viper can help us merge these
+ r.opts.cacheOpts.RemoteCacheOpts = turboJSON.RemoteCacheOptions
+
+ pipeline := turboJSON.Pipeline
+ g.Pipeline = pipeline
+ scmInstance, err := scm.FromInRepo(r.base.RepoRoot)
+ if err != nil {
+ if errors.Is(err, scm.ErrFallback) {
+ r.base.Logger.Debug("", err)
+ } else {
+ return errors.Wrap(err, "failed to create SCM")
+ }
+ }
+ filteredPkgs, isAllPackages, err := scope.ResolvePackages(&r.opts.scopeOpts, r.base.RepoRoot, scmInstance, pkgDepGraph, r.base.UI, r.base.Logger)
+ if err != nil {
+ return errors.Wrap(err, "failed to resolve packages to run")
+ }
+ if isAllPackages {
+ // if there is a root task for any of our targets, we need to add it
+ for _, target := range targets {
+ key := util.RootTaskID(target)
+ if _, ok := pipeline[key]; ok {
+ filteredPkgs.Add(util.RootPkgName)
+ // we only need to know we're running a root task once to add it for consideration
+ break
+ }
+ }
+ }
+
+ globalHashable, err := calculateGlobalHash(
+ r.base.RepoRoot,
+ rootPackageJSON,
+ pipeline,
+ turboJSON.GlobalEnv,
+ turboJSON.GlobalDeps,
+ pkgDepGraph.PackageManager,
+ pkgDepGraph.Lockfile,
+ turboJSON.GlobalPassthroughEnv,
+ r.opts.runOpts.EnvMode,
+ r.base.Logger,
+ r.base.UI,
+ isStructuredOutput,
+ )
+
+ if err != nil {
+ return fmt.Errorf("failed to collect global hash inputs: %v", err)
+ }
+
+ if globalHash, err := calculateGlobalHashFromHashable(globalHashable); err == nil {
+ r.base.Logger.Debug("global hash", "value", globalHash)
+ g.GlobalHash = globalHash
+ } else {
+ return fmt.Errorf("failed to calculate global hash: %v", err)
+ }
+
+ r.base.Logger.Debug("local cache folder", "path", r.opts.cacheOpts.OverrideDir)
+
+ rs := &runSpec{
+ Targets: targets,
+ FilteredPkgs: filteredPkgs,
+ Opts: r.opts,
+ }
+ packageManager := pkgDepGraph.PackageManager
+
+ engine, err := buildTaskGraphEngine(
+ g,
+ rs,
+ r.opts.runOpts.SinglePackage,
+ )
+
+ if err != nil {
+ return errors.Wrap(err, "error preparing engine")
+ }
+
+ taskHashTracker := taskhash.NewTracker(
+ g.RootNode,
+ g.GlobalHash,
+ // TODO(mehulkar): remove g.Pipeline, because we need to get task definitions from the CompleteGraph instead
+ g.Pipeline,
+ )
+
+ g.TaskHashTracker = taskHashTracker
+
+ // CalculateFileHashes assigns PackageInputsExpandedHashes as a side-effect
+ err = taskHashTracker.CalculateFileHashes(
+ engine.TaskGraph.Vertices(),
+ rs.Opts.runOpts.Concurrency,
+ g.WorkspaceInfos,
+ g.TaskDefinitions,
+ r.base.RepoRoot,
+ )
+
+ if err != nil {
+ return errors.Wrap(err, "error hashing package files")
+ }
+
+ // If we are running in parallel, then we remove all the edges in the graph
+ // except for the root. Rebuild the task graph for backwards compatibility.
+ // We still use dependencies specified by the pipeline configuration.
+ if rs.Opts.runOpts.Parallel {
+ for _, edge := range g.WorkspaceGraph.Edges() {
+ if edge.Target() != g.RootNode {
+ g.WorkspaceGraph.RemoveEdge(edge)
+ }
+ }
+ engine, err = buildTaskGraphEngine(
+ g,
+ rs,
+ r.opts.runOpts.SinglePackage,
+ )
+ if err != nil {
+ return errors.Wrap(err, "error preparing engine")
+ }
+ }
+
+ // Graph Run
+ if rs.Opts.runOpts.GraphFile != "" || rs.Opts.runOpts.GraphDot {
+ return GraphRun(ctx, rs, engine, r.base)
+ }
+
+ packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings()
+ sort.Strings(packagesInScope)
+ // Initiate analytics and cache
+ analyticsClient := r.initAnalyticsClient(ctx)
+ defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)
+ turboCache, err := r.initCache(ctx, rs, analyticsClient)
+
+ if err != nil {
+ if errors.Is(err, cache.ErrNoCachesEnabled) {
+ r.base.UI.Warn("No caches are enabled. You can try \"turbo login\", \"turbo link\", or ensuring you are not passing --remote-only to enable caching")
+ } else {
+ return errors.Wrap(err, "failed to set up caching")
+ }
+ }
+
+ var envVarPassthroughMap env.EnvironmentVariableMap
+ if globalHashable.envVarPassthroughs != nil {
+ if envVarPassthroughDetailedMap, err := env.GetHashableEnvVars(globalHashable.envVarPassthroughs, nil, ""); err == nil {
+ envVarPassthroughMap = envVarPassthroughDetailedMap.BySource.Explicit
+ }
+ }
+
+ globalEnvMode := rs.Opts.runOpts.EnvMode
+ if globalEnvMode == util.Infer && turboJSON.GlobalPassthroughEnv != nil {
+ globalEnvMode = util.Strict
+ }
+
+ // RunSummary contains information that is statically analyzable about
+ // the tasks that we expect to run based on the user command.
+ summary := runsummary.NewRunSummary(
+ startAt,
+ r.base.UI,
+ r.base.RepoRoot,
+ rs.Opts.scopeOpts.PackageInferenceRoot,
+ r.base.TurboVersion,
+ r.base.APIClient,
+ rs.Opts.runOpts,
+ packagesInScope,
+ globalEnvMode,
+ runsummary.NewGlobalHashSummary(
+ globalHashable.globalFileHashMap,
+ globalHashable.rootExternalDepsHash,
+ globalHashable.envVars,
+ envVarPassthroughMap,
+ globalHashable.globalCacheKey,
+ globalHashable.pipeline,
+ ),
+ rs.Opts.SynthesizeCommand(rs.Targets),
+ )
+
+ // Dry Run
+ if rs.Opts.runOpts.DryRun {
+ return DryRun(
+ ctx,
+ g,
+ rs,
+ engine,
+ taskHashTracker,
+ turboCache,
+ turboJSON,
+ globalEnvMode,
+ r.base,
+ summary,
+ )
+ }
+
+ // Regular run
+ return RealRun(
+ ctx,
+ g,
+ rs,
+ engine,
+ taskHashTracker,
+ turboCache,
+ turboJSON,
+ globalEnvMode,
+ packagesInScope,
+ r.base,
+ summary,
+ // Extra args only for real runs; dry runs don't get these
+ packageManager,
+ r.processes,
+ )
+}
+
+func (r *run) initAnalyticsClient(ctx gocontext.Context) analytics.Client {
+ apiClient := r.base.APIClient
+ var analyticsSink analytics.Sink
+ if apiClient.IsLinked() {
+ analyticsSink = apiClient
+ } else {
+ r.opts.cacheOpts.SkipRemote = true
+ analyticsSink = analytics.NullSink
+ }
+ analyticsClient := analytics.NewClient(ctx, analyticsSink, r.base.Logger.Named("analytics"))
+ return analyticsClient
+}
+
+func (r *run) initCache(ctx gocontext.Context, rs *runSpec, analyticsClient analytics.Client) (cache.Cache, error) {
+ apiClient := r.base.APIClient
+ // Theoretically this is overkill, but bias towards not spamming the console
+ once := &sync.Once{}
+
+ return cache.New(rs.Opts.cacheOpts, r.base.RepoRoot, apiClient, analyticsClient, func(_cache cache.Cache, err error) {
+ // Currently the HTTP Cache is the only one that can be disabled.
+ // With a cache system refactor, we might consider giving names to the caches so
+ // we can accurately report them here.
+ once.Do(func() {
+ r.base.LogWarning("Remote Caching is unavailable", err)
+ })
+ })
+}
+
+func buildTaskGraphEngine(
+ g *graph.CompleteGraph,
+ rs *runSpec,
+ isSinglePackage bool,
+) (*core.Engine, error) {
+ engine := core.NewEngine(g, isSinglePackage)
+
+ // Note: g.Pipeline is a map, but this for loop only cares about the keys
+ for taskName := range g.Pipeline {
+ engine.AddTask(taskName)
+ }
+
+ if err := engine.Prepare(&core.EngineBuildingOptions{
+ Packages: rs.FilteredPkgs.UnsafeListOfStrings(),
+ TaskNames: rs.Targets,
+ TasksOnly: rs.Opts.runOpts.Only,
+ }); err != nil {
+ return nil, err
+ }
+
+ // Check for cycles in the DAG.
+ if err := util.ValidateGraph(engine.TaskGraph); err != nil {
+ return nil, fmt.Errorf("Invalid task dependency graph:\n%v", err)
+ }
+
+ // Check that no tasks would be blocked by a persistent task
+ if err := engine.ValidatePersistentDependencies(g, rs.Opts.runOpts.Concurrency); err != nil {
+ return nil, fmt.Errorf("Invalid persistent task configuration:\n%v", err)
+ }
+
+ return engine, nil
+}
+
+// dry-run custom flag values
+// NOTE: These *must* be kept in sync with the corresponding Rust
+// enum definitions in shim/src/commands/mod.rs
+const (
+ _dryRunJSONValue = "Json"
+ _dryRunTextValue = "Text"
+)
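
optsFromArgs maps the Rust-side dry-run values ("Text" and "Json") onto the DryRun and DryRunJSON booleans. That mapping isolated as a small sketch (parseDryRun is an illustrative name; the constants mirror the ones defined at the bottom of run.go):

package main

import "fmt"

const (
	dryRunJSONValue = "Json"
	dryRunTextValue = "Text"
)

// parseDryRun returns (dryRun, dryRunJSON) for a --dry flag value,
// or an error for anything other than "Text" or "Json".
func parseDryRun(value string) (bool, bool, error) {
	switch value {
	case "":
		return false, false, nil
	case dryRunTextValue:
		return true, false, nil
	case dryRunJSONValue:
		return true, true, nil
	default:
		return false, false, fmt.Errorf("invalid dry-run mode: %v", value)
	}
}

func main() {
	fmt.Println(parseDryRun("Json"))  // true true <nil>
	fmt.Println(parseDryRun("Text"))  // true false <nil>
	fmt.Println(parseDryRun("bogus")) // false false invalid dry-run mode: bogus
}
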
diff --git a/cli/internal/run/run_spec.go b/cli/internal/run/run_spec.go
new file mode 100644
index 0000000..14402d3
--- /dev/null
+++ b/cli/internal/run/run_spec.go
@@ -0,0 +1,90 @@
+// Package run implements `turbo run`
+// This file implements some structs for options
+package run
+
+import (
+ "strings"
+
+ "github.com/vercel/turbo/cli/internal/cache"
+ "github.com/vercel/turbo/cli/internal/client"
+ "github.com/vercel/turbo/cli/internal/runcache"
+ "github.com/vercel/turbo/cli/internal/scope"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// runSpec contains the run-specific configuration elements that come from a particular
+// invocation of turbo.
+type runSpec struct {
+ // Targets is the list of tasks that are going to run this time
+ // E.g. in `turbo run build lint` Targets will be ["build", "lint"]
+ Targets []string
+
+ // FilteredPkgs is the list of packages that are relevant for this run.
+ FilteredPkgs util.Set
+
+ // Opts contains various opts, gathered from CLI flags,
+ // but bucketed in smaller structs based on what they mean.
+ Opts *Opts
+}
+
+// ArgsForTask returns the set of args that need to be passed through to the task
+func (rs *runSpec) ArgsForTask(task string) []string {
+ passThroughArgs := make([]string, 0, len(rs.Opts.runOpts.PassThroughArgs))
+ for _, target := range rs.Targets {
+ if target == task {
+ passThroughArgs = append(passThroughArgs, rs.Opts.runOpts.PassThroughArgs...)
+ }
+ }
+ return passThroughArgs
+}
+
+// Opts holds the current run operations configuration
+type Opts struct {
+ runOpts util.RunOpts
+ cacheOpts cache.Opts
+ clientOpts client.Opts
+ runcacheOpts runcache.Opts
+ scopeOpts scope.Opts
+}
+
+// SynthesizeCommand produces a command line that would select an equivalent set of packages,
+// tasks, and task arguments to what the current set of opts selects.
+func (o *Opts) SynthesizeCommand(tasks []string) string {
+ cmd := "turbo run"
+ cmd += " " + strings.Join(tasks, " ")
+ for _, filterPattern := range o.scopeOpts.FilterPatterns {
+ cmd += " --filter=" + filterPattern
+ }
+ for _, filterPattern := range o.scopeOpts.LegacyFilter.AsFilterPatterns() {
+ cmd += " --filter=" + filterPattern
+ }
+ if o.runOpts.Parallel {
+ cmd += " --parallel"
+ }
+ if o.runOpts.ContinueOnError {
+ cmd += " --continue"
+ }
+ if o.runOpts.DryRun {
+ if o.runOpts.DryRunJSON {
+ cmd += " --dry=json"
+ } else {
+ cmd += " --dry"
+ }
+ }
+ if len(o.runOpts.PassThroughArgs) > 0 {
+ cmd += " -- " + strings.Join(o.runOpts.PassThroughArgs, " ")
+ }
+ return cmd
+}
+
+// getDefaultOptions returns the default set of Opts for every run
+func getDefaultOptions() *Opts {
+ return &Opts{
+ runOpts: util.RunOpts{
+ Concurrency: 10,
+ },
+ clientOpts: client.Opts{
+ Timeout: client.ClientTimeout,
+ },
+ }
+}
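
ArgsForTask only forwards the post-"--" pass-through arguments to tasks that were actually named on the command line. A small standalone sketch of that rule, independent of the internal runSpec type (argsForTask is an illustrative name):

package main

import "fmt"

// argsForTask forwards passThroughArgs only when task is one of the named targets.
func argsForTask(targets, passThroughArgs []string, task string) []string {
	out := make([]string, 0, len(passThroughArgs))
	for _, target := range targets {
		if target == task {
			out = append(out, passThroughArgs...)
		}
	}
	return out
}

func main() {
	targets := []string{"build", "lint"}
	passthrough := []string{"-v", "--foo=bar"}
	fmt.Println(argsForTask(targets, passthrough, "build")) // [-v --foo=bar]
	fmt.Println(argsForTask(targets, passthrough, "test"))  // []
}
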
diff --git a/cli/internal/run/run_spec_test.go b/cli/internal/run/run_spec_test.go
new file mode 100644
index 0000000..2bcfe2b
--- /dev/null
+++ b/cli/internal/run/run_spec_test.go
@@ -0,0 +1,107 @@
+package run
+
+import (
+ "testing"
+
+ "github.com/vercel/turbo/cli/internal/scope"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+func TestSynthesizeCommand(t *testing.T) {
+ testCases := []struct {
+ filterPatterns []string
+ legacyFilter scope.LegacyFilter
+ passThroughArgs []string
+ parallel bool
+ continueOnError bool
+ dryRun bool
+ dryRunJSON bool
+ tasks []string
+ expected string
+ }{
+ {
+ filterPatterns: []string{"my-app"},
+ tasks: []string{"build"},
+ expected: "turbo run build --filter=my-app",
+ },
+ {
+ filterPatterns: []string{"my-app"},
+ tasks: []string{"build"},
+ passThroughArgs: []string{"-v", "--foo=bar"},
+ expected: "turbo run build --filter=my-app -- -v --foo=bar",
+ },
+ {
+ legacyFilter: scope.LegacyFilter{
+ Entrypoints: []string{"my-app"},
+ SkipDependents: true,
+ },
+ tasks: []string{"build"},
+ passThroughArgs: []string{"-v", "--foo=bar"},
+ expected: "turbo run build --filter=my-app -- -v --foo=bar",
+ },
+ {
+ legacyFilter: scope.LegacyFilter{
+ Entrypoints: []string{"my-app"},
+ SkipDependents: true,
+ },
+ filterPatterns: []string{"other-app"},
+ tasks: []string{"build"},
+ passThroughArgs: []string{"-v", "--foo=bar"},
+ expected: "turbo run build --filter=other-app --filter=my-app -- -v --foo=bar",
+ },
+ {
+ legacyFilter: scope.LegacyFilter{
+ Entrypoints: []string{"my-app"},
+ IncludeDependencies: true,
+ Since: "some-ref",
+ },
+ filterPatterns: []string{"other-app"},
+ tasks: []string{"build"},
+ expected: "turbo run build --filter=other-app --filter=...my-app...[some-ref]...",
+ },
+ {
+ filterPatterns: []string{"my-app"},
+ tasks: []string{"build"},
+ parallel: true,
+ continueOnError: true,
+ expected: "turbo run build --filter=my-app --parallel --continue",
+ },
+ {
+ filterPatterns: []string{"my-app"},
+ tasks: []string{"build"},
+ dryRun: true,
+ expected: "turbo run build --filter=my-app --dry",
+ },
+ {
+ filterPatterns: []string{"my-app"},
+ tasks: []string{"build"},
+ dryRun: true,
+ dryRunJSON: true,
+ expected: "turbo run build --filter=my-app --dry=json",
+ },
+ }
+
+ for _, testCase := range testCases {
+ testCase := testCase
+ t.Run(testCase.expected, func(t *testing.T) {
+ o := Opts{
+ scopeOpts: scope.Opts{
+ FilterPatterns: testCase.filterPatterns,
+ LegacyFilter: testCase.legacyFilter,
+ },
+ runOpts: util.RunOpts{
+ PassThroughArgs: testCase.passThroughArgs,
+ Parallel: testCase.parallel,
+ ContinueOnError: testCase.continueOnError,
+ DryRun: testCase.dryRun,
+ DryRunJSON: testCase.dryRunJSON,
+ },
+ }
+ cmd := o.SynthesizeCommand(testCase.tasks)
+ if cmd != testCase.expected {
+ t.Errorf("SynthesizeCommand() got %v, want %v", cmd, testCase.expected)
+ }
+ })
+ }
+
+}