Diffstat (limited to 'cli/internal/run/run.go')
| Mode | File | Lines |
| -rw-r--r-- | cli/internal/run/run.go | 487 |
1 file changed, 487 insertions, 0 deletions
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
new file mode 100644
index 0000000..2ac1141
--- /dev/null
+++ b/cli/internal/run/run.go
@@ -0,0 +1,487 @@
+package run
+
+import (
+    gocontext "context"
+    "fmt"
+    "os"
+    "sort"
+    "sync"
+    "time"
+
+    "github.com/vercel/turbo/cli/internal/analytics"
+    "github.com/vercel/turbo/cli/internal/cache"
+    "github.com/vercel/turbo/cli/internal/cmdutil"
+    "github.com/vercel/turbo/cli/internal/context"
+    "github.com/vercel/turbo/cli/internal/core"
+    "github.com/vercel/turbo/cli/internal/daemon"
+    "github.com/vercel/turbo/cli/internal/daemonclient"
+    "github.com/vercel/turbo/cli/internal/env"
+    "github.com/vercel/turbo/cli/internal/fs"
+    "github.com/vercel/turbo/cli/internal/graph"
+    "github.com/vercel/turbo/cli/internal/process"
+    "github.com/vercel/turbo/cli/internal/runsummary"
+    "github.com/vercel/turbo/cli/internal/scm"
+    "github.com/vercel/turbo/cli/internal/scope"
+    "github.com/vercel/turbo/cli/internal/signals"
+    "github.com/vercel/turbo/cli/internal/taskhash"
+    "github.com/vercel/turbo/cli/internal/turbostate"
+    "github.com/vercel/turbo/cli/internal/ui"
+    "github.com/vercel/turbo/cli/internal/util"
+
+    "github.com/pkg/errors"
+)
+
+// ExecuteRun executes the run command
+func ExecuteRun(ctx gocontext.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
+    base, err := helper.GetCmdBase(args)
+    if err != nil {
+        return err
+    }
+    LogTag(base.Logger)
+    tasks := args.Command.Run.Tasks
+    passThroughArgs := args.Command.Run.PassThroughArgs
+    if len(tasks) == 0 {
+        return errors.New("at least one task must be specified")
+    }
+    opts, err := optsFromArgs(args)
+    if err != nil {
+        return err
+    }
+
+    opts.runOpts.PassThroughArgs = passThroughArgs
+    run := configureRun(base, opts, signalWatcher)
+    if err := run.run(ctx, tasks); err != nil {
+        base.LogError("run failed: %v", err)
+        return err
+    }
+    return nil
+}
+
+func optsFromArgs(args *turbostate.ParsedArgsFromRust) (*Opts, error) {
+    runPayload := args.Command.Run
+
+    opts := getDefaultOptions()
+    // aliases := make(map[string]string)
+    if err := scope.OptsFromArgs(&opts.scopeOpts, args); err != nil {
+        return nil, err
+    }
+
+    // Cache flags
+    opts.clientOpts.Timeout = args.RemoteCacheTimeout
+    opts.cacheOpts.SkipFilesystem = runPayload.RemoteOnly
+    opts.cacheOpts.OverrideDir = runPayload.CacheDir
+    opts.cacheOpts.Workers = runPayload.CacheWorkers
+
+    // Run flags
+    opts.runOpts.LogPrefix = runPayload.LogPrefix
+    opts.runOpts.Summarize = runPayload.Summarize
+    opts.runOpts.ExperimentalSpaceID = runPayload.ExperimentalSpaceID
+    opts.runOpts.EnvMode = runPayload.EnvMode
+
+    // Runcache flags
+    opts.runcacheOpts.SkipReads = runPayload.Force
+    opts.runcacheOpts.SkipWrites = runPayload.NoCache
+
+    if runPayload.OutputLogs != "" {
+        err := opts.runcacheOpts.SetTaskOutputMode(runPayload.OutputLogs)
+        if err != nil {
+            return nil, err
+        }
+    }
+
+    // Run flags
+    if runPayload.Concurrency != "" {
+        concurrency, err := util.ParseConcurrency(runPayload.Concurrency)
+        if err != nil {
+            return nil, err
+        }
+        opts.runOpts.Concurrency = concurrency
+    }
+    opts.runOpts.Parallel = runPayload.Parallel
+    opts.runOpts.Profile = runPayload.Profile
+    opts.runOpts.ContinueOnError = runPayload.ContinueExecution
+    opts.runOpts.Only = runPayload.Only
+    opts.runOpts.NoDaemon = runPayload.NoDaemon
+    opts.runOpts.SinglePackage = args.Command.Run.SinglePackage
+
+    // See comment on Graph in turbostate.go for an explanation on Graph's representation.
+    // If the flag is passed...
+    if runPayload.Graph != nil {
+        // If no value is attached, we print to stdout
+        if *runPayload.Graph == "" {
+            opts.runOpts.GraphDot = true
+        } else {
+            // Otherwise, we emit to the file name attached as value
+            opts.runOpts.GraphDot = false
+            opts.runOpts.GraphFile = *runPayload.Graph
+        }
+    }
+
+    if runPayload.DryRun != "" {
+        opts.runOpts.DryRunJSON = runPayload.DryRun == _dryRunJSONValue
+
+        if runPayload.DryRun == _dryRunTextValue || runPayload.DryRun == _dryRunJSONValue {
+            opts.runOpts.DryRun = true
+        } else {
+            return nil, fmt.Errorf("invalid dry-run mode: %v", runPayload.DryRun)
+        }
+    }
+
+    return opts, nil
+}
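
The --concurrency value handled in optsFromArgs above goes through util.ParseConcurrency, which accepts either an absolute task count or a percentage of the available CPUs (e.g. "50%"). The parser itself is not part of this diff; a minimal, self-contained sketch of that behavior (names and the clamp-to-1 rule are assumptions, not the real implementation) might look like:

```go
package main

import (
	"fmt"
	"math"
	"runtime"
	"strconv"
	"strings"
)

// parseConcurrency is an illustrative stand-in for util.ParseConcurrency:
// it accepts an absolute count ("10") or a percentage of available CPUs
// ("50%"), clamping the result to at least 1.
func parseConcurrency(value string) (int, error) {
	if strings.HasSuffix(value, "%") {
		pct, err := strconv.ParseFloat(strings.TrimSuffix(value, "%"), 64)
		if err != nil || pct <= 0 {
			return 0, fmt.Errorf("invalid percentage: %q", value)
		}
		cpus := float64(runtime.NumCPU())
		return int(math.Max(1, math.Floor(cpus*pct/100))), nil
	}
	n, err := strconv.Atoi(value)
	if err != nil || n < 1 {
		return 0, fmt.Errorf("invalid concurrency: %q", value)
	}
	return n, nil
}

func main() {
	n, _ := parseConcurrency("50%")
	fmt.Println(n) // half the CPUs on this machine, never less than 1
}
```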
+
+func configureRun(base *cmdutil.CmdBase, opts *Opts, signalWatcher *signals.Watcher) *run {
+    if os.Getenv("TURBO_FORCE") == "true" {
+        opts.runcacheOpts.SkipReads = true
+    }
+
+    if os.Getenv("TURBO_REMOTE_ONLY") == "true" {
+        opts.cacheOpts.SkipFilesystem = true
+    }
+
+    processes := process.NewManager(base.Logger.Named("processes"))
+    signalWatcher.AddOnClose(processes.Close)
+    return &run{
+        base:      base,
+        opts:      opts,
+        processes: processes,
+    }
+}
+
+type run struct {
+    base      *cmdutil.CmdBase
+    opts      *Opts
+    processes *process.Manager
+}
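
Note that configureRun honors TURBO_FORCE and TURBO_REMOTE_ONLY only when they equal the exact string "true"; a value like "1" or "TRUE" is silently ignored. If looser parsing were ever wanted, strconv.ParseBool is the usual tool. A sketch of that alternative (this is not what the code above does, just a common Go idiom):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envBool reports whether an environment variable is set to a truthy
// value. Unlike an exact string comparison against "true",
// strconv.ParseBool also accepts "1", "t", "T", "TRUE", and "True".
func envBool(key string) bool {
	v, ok := os.LookupEnv(key)
	if !ok {
		return false
	}
	b, err := strconv.ParseBool(v)
	return err == nil && b
}

func main() {
	os.Setenv("TURBO_FORCE", "1")
	fmt.Println(envBool("TURBO_FORCE")) // true, where an exact-match check would say false
}
```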
+
+func (r *run) run(ctx gocontext.Context, targets []string) error {
+    startAt := time.Now()
+    packageJSONPath := r.base.RepoRoot.UntypedJoin("package.json")
+    rootPackageJSON, err := fs.ReadPackageJSON(packageJSONPath)
+    if err != nil {
+        return fmt.Errorf("failed to read package.json: %w", err)
+    }
+
+    isStructuredOutput := r.opts.runOpts.GraphDot || r.opts.runOpts.DryRunJSON
+
+    var pkgDepGraph *context.Context
+    if r.opts.runOpts.SinglePackage {
+        pkgDepGraph, err = context.SinglePackageGraph(r.base.RepoRoot, rootPackageJSON)
+    } else {
+        pkgDepGraph, err = context.BuildPackageGraph(r.base.RepoRoot, rootPackageJSON)
+    }
+    if err != nil {
+        var warnings *context.Warnings
+        if errors.As(err, &warnings) {
+            r.base.LogWarning("Issues occurred when constructing package graph. Turbo will function, but some features may not be available", err)
+        } else {
+            return err
+        }
+    }
+
+    if ui.IsCI && !r.opts.runOpts.NoDaemon {
+        r.base.Logger.Info("skipping turbod since we appear to be in a non-interactive context")
+    } else if !r.opts.runOpts.NoDaemon {
+        turbodClient, err := daemon.GetClient(ctx, r.base.RepoRoot, r.base.Logger, r.base.TurboVersion, daemon.ClientOpts{})
+        if err != nil {
+            r.base.LogWarning("", errors.Wrap(err, "failed to contact turbod. Continuing in standalone mode"))
+        } else {
+            defer func() { _ = turbodClient.Close() }()
+            r.base.Logger.Debug("running in daemon mode")
+            daemonClient := daemonclient.New(turbodClient)
+            r.opts.runcacheOpts.OutputWatcher = daemonClient
+        }
+    }
+
+    if err := util.ValidateGraph(&pkgDepGraph.WorkspaceGraph); err != nil {
+        return errors.Wrap(err, "Invalid package dependency graph")
+    }
+
+    // TODO: consolidate some of these arguments
+    // Note: not all properties are set here. GlobalHash and Pipeline keys are set later
+    g := &graph.CompleteGraph{
+        WorkspaceGraph:  pkgDepGraph.WorkspaceGraph,
+        WorkspaceInfos:  pkgDepGraph.WorkspaceInfos,
+        RootNode:        pkgDepGraph.RootNode,
+        TaskDefinitions: map[string]*fs.TaskDefinition{},
+        RepoRoot:        r.base.RepoRoot,
+    }
+
+    turboJSON, err := g.GetTurboConfigFromWorkspace(util.RootPkgName, r.opts.runOpts.SinglePackage)
+    if err != nil {
+        return err
+    }
+
+    // TODO: these values come from a config file, hopefully viper can help us merge these
+    r.opts.cacheOpts.RemoteCacheOpts = turboJSON.RemoteCacheOptions
+
+    pipeline := turboJSON.Pipeline
+    g.Pipeline = pipeline
+    scmInstance, err := scm.FromInRepo(r.base.RepoRoot)
+    if err != nil {
+        if errors.Is(err, scm.ErrFallback) {
+            r.base.Logger.Debug("", err)
+        } else {
+            return errors.Wrap(err, "failed to create SCM")
+        }
+    }
+    filteredPkgs, isAllPackages, err := scope.ResolvePackages(&r.opts.scopeOpts, r.base.RepoRoot, scmInstance, pkgDepGraph, r.base.UI, r.base.Logger)
+    if err != nil {
+        return errors.Wrap(err, "failed to resolve packages to run")
+    }
+    if isAllPackages {
+        // if there is a root task for any of our targets, we need to add it
+        for _, target := range targets {
+            key := util.RootTaskID(target)
+            if _, ok := pipeline[key]; ok {
+                filteredPkgs.Add(util.RootPkgName)
+                // we only need to know we're running a root task once to add it for consideration
+                break
+            }
+        }
+    }
+
+    globalHashable, err := calculateGlobalHash(
+        r.base.RepoRoot,
+        rootPackageJSON,
+        pipeline,
+        turboJSON.GlobalEnv,
+        turboJSON.GlobalDeps,
+        pkgDepGraph.PackageManager,
+        pkgDepGraph.Lockfile,
+        turboJSON.GlobalPassthroughEnv,
+        r.opts.runOpts.EnvMode,
+        r.base.Logger,
+        r.base.UI,
+        isStructuredOutput,
+    )
+
+    if err != nil {
+        return fmt.Errorf("failed to collect global hash inputs: %v", err)
+    }
+
+    if globalHash, err := calculateGlobalHashFromHashable(globalHashable); err == nil {
+        r.base.Logger.Debug("global hash", "value", globalHash)
+        g.GlobalHash = globalHash
+    } else {
+        return fmt.Errorf("failed to calculate global hash: %v", err)
+    }
+
+    r.base.Logger.Debug("local cache folder", "path", r.opts.cacheOpts.OverrideDir)
+
+    rs := &runSpec{
+        Targets:      targets,
+        FilteredPkgs: filteredPkgs,
+        Opts:         r.opts,
+    }
+    packageManager := pkgDepGraph.PackageManager
+
+    engine, err := buildTaskGraphEngine(
+        g,
+        rs,
+        r.opts.runOpts.SinglePackage,
+    )
+
+    if err != nil {
+        return errors.Wrap(err, "error preparing engine")
+    }
+
+    taskHashTracker := taskhash.NewTracker(
+        g.RootNode,
+        g.GlobalHash,
+        // TODO(mehulkar): remove g.Pipeline, because we need to get task definitions from CompleteGraph instead
+        g.Pipeline,
+    )
+
+    g.TaskHashTracker = taskHashTracker
+
+    // CalculateFileHashes assigns PackageInputsExpandedHashes as a side-effect
+    err = taskHashTracker.CalculateFileHashes(
+        engine.TaskGraph.Vertices(),
+        rs.Opts.runOpts.Concurrency,
+        g.WorkspaceInfos,
+        g.TaskDefinitions,
+        r.base.RepoRoot,
+    )
+
+    if err != nil {
+        return errors.Wrap(err, "error hashing package files")
+    }
+
+    // If we are running in parallel, then we remove all the edges in the graph
+    // except for the root. Rebuild the task graph for backwards compatibility.
+    // We still use dependencies specified by the pipeline configuration.
+    if rs.Opts.runOpts.Parallel {
+        for _, edge := range g.WorkspaceGraph.Edges() {
+            if edge.Target() != g.RootNode {
+                g.WorkspaceGraph.RemoveEdge(edge)
+            }
+        }
+        engine, err = buildTaskGraphEngine(
+            g,
+            rs,
+            r.opts.runOpts.SinglePackage,
+        )
+        if err != nil {
+            return errors.Wrap(err, "error preparing engine")
+        }
+    }
+
+    // Graph Run
+    if rs.Opts.runOpts.GraphFile != "" || rs.Opts.runOpts.GraphDot {
+        return GraphRun(ctx, rs, engine, r.base)
+    }
+
+    packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings()
+    sort.Strings(packagesInScope)
+    // Initiate analytics and cache
+    analyticsClient := r.initAnalyticsClient(ctx)
+    defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)
+    turboCache, err := r.initCache(ctx, rs, analyticsClient)
+
+    if err != nil {
+        if errors.Is(err, cache.ErrNoCachesEnabled) {
+            r.base.UI.Warn("No caches are enabled. You can try \"turbo login\", \"turbo link\", or ensuring you are not passing --remote-only to enable caching")
+        } else {
+            return errors.Wrap(err, "failed to set up caching")
+        }
+    }
+
+    var envVarPassthroughMap env.EnvironmentVariableMap
+    if globalHashable.envVarPassthroughs != nil {
+        if envVarPassthroughDetailedMap, err := env.GetHashableEnvVars(globalHashable.envVarPassthroughs, nil, ""); err == nil {
+            envVarPassthroughMap = envVarPassthroughDetailedMap.BySource.Explicit
+        }
+    }
+
+    globalEnvMode := rs.Opts.runOpts.EnvMode
+    if globalEnvMode == util.Infer && turboJSON.GlobalPassthroughEnv != nil {
+        globalEnvMode = util.Strict
+    }
+
+    // RunSummary contains information that is statically analyzable about
+    // the tasks that we expect to run based on the user command.
+    summary := runsummary.NewRunSummary(
+        startAt,
+        r.base.UI,
+        r.base.RepoRoot,
+        rs.Opts.scopeOpts.PackageInferenceRoot,
+        r.base.TurboVersion,
+        r.base.APIClient,
+        rs.Opts.runOpts,
+        packagesInScope,
+        globalEnvMode,
+        runsummary.NewGlobalHashSummary(
+            globalHashable.globalFileHashMap,
+            globalHashable.rootExternalDepsHash,
+            globalHashable.envVars,
+            envVarPassthroughMap,
+            globalHashable.globalCacheKey,
+            globalHashable.pipeline,
+        ),
+        rs.Opts.SynthesizeCommand(rs.Targets),
+    )
+
+    // Dry Run
+    if rs.Opts.runOpts.DryRun {
+        return DryRun(
+            ctx,
+            g,
+            rs,
+            engine,
+            taskHashTracker,
+            turboCache,
+            turboJSON,
+            globalEnvMode,
+            r.base,
+            summary,
+        )
+    }
+
+    // Regular run
+    return RealRun(
+        ctx,
+        g,
+        rs,
+        engine,
+        taskHashTracker,
+        turboCache,
+        turboJSON,
+        globalEnvMode,
+        packagesInScope,
+        r.base,
+        summary,
+        // Extra arg only for regular runs, dry-run doesn't get this
+        packageManager,
+        r.processes,
+    )
+}
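
Inside run() above, the isAllPackages branch re-adds the repository root as a runnable package whenever one of the requested targets is declared as a root task. util.RootTaskID itself is not shown in this diff; assuming turbo's documented "//#<task>" naming for root tasks in turbo.json, an illustrative stand-in could be:

```go
package main

import "fmt"

// rootPkgName mirrors util.RootPkgName under the assumption that "//"
// is how turbo refers to the repository root (as in "//#build" entries
// in a turbo.json pipeline).
const rootPkgName = "//"

// rootTaskID is an illustrative stand-in for util.RootTaskID, assuming
// task IDs take the "<package>#<task>" form.
func rootTaskID(target string) string {
	return fmt.Sprintf("%s#%s", rootPkgName, target)
}

func main() {
	fmt.Println(rootTaskID("build")) // "//#build"
}
```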
+
+func (r *run) initAnalyticsClient(ctx gocontext.Context) analytics.Client {
+    apiClient := r.base.APIClient
+    var analyticsSink analytics.Sink
+    if apiClient.IsLinked() {
+        analyticsSink = apiClient
+    } else {
+        r.opts.cacheOpts.SkipRemote = true
+        analyticsSink = analytics.NullSink
+    }
+    analyticsClient := analytics.NewClient(ctx, analyticsSink, r.base.Logger.Named("analytics"))
+    return analyticsClient
+}
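
initAnalyticsClient falls back to analytics.NullSink when the repository is not linked to the remote API, so downstream code can record events unconditionally instead of nil-checking the sink. The Sink interface is outside this diff; a minimal sketch of the null-object pattern it appears to rely on (the method name and signature here are assumptions) is:

```go
package main

import "fmt"

// Sink is an assumed shape for an analytics sink; the real interface
// lives in cli/internal/analytics and may differ.
type Sink interface {
	RecordAnalyticsEvents(events []string) error
}

// nullSink swallows events, letting callers hold a Sink unconditionally.
type nullSink struct{}

func (nullSink) RecordAnalyticsEvents([]string) error { return nil }

// NullSink is the shared no-op instance.
var NullSink Sink = nullSink{}

func main() {
	var sink Sink = NullSink // e.g., when the API client is not linked
	fmt.Println(sink.RecordAnalyticsEvents([]string{"cache-hit"})) // <nil>
}
```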
+
+func (r *run) initCache(ctx gocontext.Context, rs *runSpec, analyticsClient analytics.Client) (cache.Cache, error) {
+    apiClient := r.base.APIClient
+    // Theoretically this is overkill, but bias towards not spamming the console
+    once := &sync.Once{}
+
+    return cache.New(rs.Opts.cacheOpts, r.base.RepoRoot, apiClient, analyticsClient, func(_cache cache.Cache, err error) {
+        // Currently the HTTP Cache is the only one that can be disabled.
+        // With a cache system refactor, we might consider giving names to the caches so
+        // we can accurately report them here.
+        once.Do(func() {
+            r.base.LogWarning("Remote Caching is unavailable", err)
+        })
+    })
+}
+
+func buildTaskGraphEngine(
+    g *graph.CompleteGraph,
+    rs *runSpec,
+    isSinglePackage bool,
+) (*core.Engine, error) {
+    engine := core.NewEngine(g, isSinglePackage)
+
+    // Note: g.Pipeline is a map, but this for loop only cares about the keys
+    for taskName := range g.Pipeline {
+        engine.AddTask(taskName)
+    }
+
+    if err := engine.Prepare(&core.EngineBuildingOptions{
+        Packages:  rs.FilteredPkgs.UnsafeListOfStrings(),
+        TaskNames: rs.Targets,
+        TasksOnly: rs.Opts.runOpts.Only,
+    }); err != nil {
+        return nil, err
+    }
+
+    // Check for cycles in the DAG.
+    if err := util.ValidateGraph(engine.TaskGraph); err != nil {
+        return nil, fmt.Errorf("Invalid task dependency graph:\n%v", err)
+    }
+
+    // Check that no tasks would be blocked by a persistent task
+    if err := engine.ValidatePersistentDependencies(g, rs.Opts.runOpts.Concurrency); err != nil {
+        return nil, fmt.Errorf("Invalid persistent task configuration:\n%v", err)
+    }
+
+    return engine, nil
+}
+
+// dry run custom flag
+// NOTE: These *must* be kept in sync with the corresponding Rust
+// enum definitions in shim/src/commands/mod.rs
+const (
+    _dryRunJSONValue = "Json"
+    _dryRunTextValue = "Text"
+)
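
The sync.Once in initCache caps the "Remote Caching is unavailable" warning at a single emission even if the cache error callback fires once per task. A self-contained sketch of that deduplication pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	warn := func(msg string) {
		// Only the first call's function body ever runs; later calls
		// are no-ops, so a flaky remote cache cannot flood the console.
		once.Do(func() { fmt.Println("WARNING:", msg) })
	}

	for i := 0; i < 5; i++ {
		warn("Remote Caching is unavailable")
	}
	// Output: a single "WARNING: Remote Caching is unavailable"
}
```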
