diff options
Diffstat (limited to 'cli/internal/run/real_run.go')
| -rw-r--r-- | cli/internal/run/real_run.go | 420 |
1 files changed, 420 insertions, 0 deletions
diff --git a/cli/internal/run/real_run.go b/cli/internal/run/real_run.go new file mode 100644 index 0000000..32c7965 --- /dev/null +++ b/cli/internal/run/real_run.go @@ -0,0 +1,420 @@ +package run + +import ( + gocontext "context" + "fmt" + "log" + "os/exec" + "strings" + "sync" + "time" + + "github.com/fatih/color" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/cli" + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/cache" + "github.com/vercel/turbo/cli/internal/cmdutil" + "github.com/vercel/turbo/cli/internal/colorcache" + "github.com/vercel/turbo/cli/internal/core" + "github.com/vercel/turbo/cli/internal/env" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/graph" + "github.com/vercel/turbo/cli/internal/logstreamer" + "github.com/vercel/turbo/cli/internal/nodes" + "github.com/vercel/turbo/cli/internal/packagemanager" + "github.com/vercel/turbo/cli/internal/process" + "github.com/vercel/turbo/cli/internal/runcache" + "github.com/vercel/turbo/cli/internal/runsummary" + "github.com/vercel/turbo/cli/internal/spinner" + "github.com/vercel/turbo/cli/internal/taskhash" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/ui" + "github.com/vercel/turbo/cli/internal/util" +) + +// RealRun executes a set of tasks +func RealRun( + ctx gocontext.Context, + g *graph.CompleteGraph, + rs *runSpec, + engine *core.Engine, + taskHashTracker *taskhash.Tracker, + turboCache cache.Cache, + turboJSON *fs.TurboJSON, + globalEnvMode util.EnvMode, + packagesInScope []string, + base *cmdutil.CmdBase, + runSummary runsummary.Meta, + packageManager *packagemanager.PackageManager, + processes *process.Manager, +) error { + singlePackage := rs.Opts.runOpts.SinglePackage + + if singlePackage { + base.UI.Output(fmt.Sprintf("%s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))))) + } else { + base.UI.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", "))) + base.UI.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) + } + + // Log whether remote cache is enabled + useHTTPCache := !rs.Opts.cacheOpts.SkipRemote + if useHTTPCache { + base.UI.Info(ui.Dim("• Remote caching enabled")) + } else { + base.UI.Info(ui.Dim("• Remote caching disabled")) + } + + defer func() { + _ = spinner.WaitFor(ctx, turboCache.Shutdown, base.UI, "...writing to cache...", 1500*time.Millisecond) + }() + colorCache := colorcache.New() + + runCache := runcache.New(turboCache, base.RepoRoot, rs.Opts.runcacheOpts, colorCache) + + ec := &execContext{ + colorCache: colorCache, + runSummary: runSummary, + rs: rs, + ui: &cli.ConcurrentUi{Ui: base.UI}, + runCache: runCache, + env: turboJSON.GlobalEnv, + passthroughEnv: turboJSON.GlobalPassthroughEnv, + logger: base.Logger, + packageManager: packageManager, + processes: processes, + taskHashTracker: taskHashTracker, + repoRoot: base.RepoRoot, + isSinglePackage: singlePackage, + } + + // run the thing + execOpts := core.EngineExecutionOptions{ + Parallel: rs.Opts.runOpts.Parallel, + Concurrency: rs.Opts.runOpts.Concurrency, + } + + mu := sync.Mutex{} + taskSummaries := []*runsummary.TaskSummary{} + execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error { + taskExecutionSummary, err := ec.exec(ctx, packageTask) + + // taskExecutionSummary will be nil if the task never executed + // (i.e. if the workspace didn't implement the script corresponding to the task) + // We don't need to collect any of the outputs or execution if the task didn't execute. + if taskExecutionSummary != nil { + taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID) + taskSummary.Execution = taskExecutionSummary + taskSummary.CacheSummary = taskHashTracker.GetCacheStatus(taskSummary.TaskID) + + // lock since multiple things to be appending to this array at the same time + mu.Lock() + taskSummaries = append(taskSummaries, taskSummary) + // not using defer, just release the lock + mu.Unlock() + } + + // Return the error when there is one + if err != nil { + return err + } + + return nil + } + + getArgs := func(taskID string) []string { + return rs.ArgsForTask(taskID) + } + + visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, globalEnvMode, getArgs, base.Logger, execFunc) + errs := engine.Execute(visitorFn, execOpts) + + // Track if we saw any child with a non-zero exit code + exitCode := 0 + exitCodeErr := &process.ChildExit{} + + // Assign tasks after execution + runSummary.RunSummary.Tasks = taskSummaries + + for _, err := range errs { + if errors.As(err, &exitCodeErr) { + // If a process gets killed via a signal, Go reports it's exit code as -1. + // We take the absolute value of the exit code so we don't select '0' as + // the greatest exit code. + childExit := exitCodeErr.ExitCode + if childExit < 0 { + childExit = -childExit + } + if childExit > exitCode { + exitCode = childExit + } + } else if exitCode == 0 { + // We hit some error, it shouldn't be exit code 0 + exitCode = 1 + } + base.UI.Error(err.Error()) + } + + // When continue on error is enabled don't register failed tasks as errors + // and instead must inspect the task summaries. + if ec.rs.Opts.runOpts.ContinueOnError { + for _, summary := range runSummary.RunSummary.Tasks { + if childExit := summary.Execution.ExitCode(); childExit != nil { + childExit := *childExit + if childExit < 0 { + childExit = -childExit + } + if childExit > exitCode { + exitCode = childExit + } + } + } + } + + if err := runSummary.Close(ctx, exitCode, g.WorkspaceInfos); err != nil { + // We don't need to throw an error, but we can warn on this. + // Note: this method doesn't actually return an error for Real Runs at the time of writing. + base.UI.Info(fmt.Sprintf("Failed to close Run Summary %v", err)) + } + + if exitCode != 0 { + return &process.ChildExit{ + ExitCode: exitCode, + } + } + return nil +} + +type execContext struct { + colorCache *colorcache.ColorCache + runSummary runsummary.Meta + rs *runSpec + ui cli.Ui + runCache *runcache.RunCache + env []string + passthroughEnv []string + logger hclog.Logger + packageManager *packagemanager.PackageManager + processes *process.Manager + taskHashTracker *taskhash.Tracker + repoRoot turbopath.AbsoluteSystemPath + isSinglePackage bool +} + +func (ec *execContext) logError(prefix string, err error) { + ec.logger.Error(prefix, "error", err) + + if prefix != "" { + prefix += ": " + } + + ec.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) +} + +func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTask) (*runsummary.TaskExecutionSummary, error) { + // Setup tracer. Every time tracer() is called the taskExecutionSummary's duration is updated + // So make sure to call it before returning. + tracer, taskExecutionSummary := ec.runSummary.RunSummary.TrackTask(packageTask.TaskID) + + progressLogger := ec.logger.Named("") + progressLogger.Debug("start") + + passThroughArgs := ec.rs.ArgsForTask(packageTask.Task) + hash := packageTask.Hash + ec.logger.Debug("task hash", "value", hash) + // TODO(gsoltis): if/when we fix https://github.com/vercel/turbo/issues/937 + // the following block should never get hit. In the meantime, keep it after hashing + // so that downstream tasks can count on the hash existing + // + // bail if the script doesn't exist + if packageTask.Command == "" { + progressLogger.Debug("no task in package, skipping") + progressLogger.Debug("done", "status", "skipped", "duration", taskExecutionSummary.Duration) + // Return nil here because there was no execution, so there is no task execution summary + return nil, nil + } + + // Set building status now that we know it's going to run. + tracer(runsummary.TargetBuilding, nil, &successCode) + + var prefix string + var prettyPrefix string + if ec.rs.Opts.runOpts.LogPrefix == "none" { + prefix = "" + } else { + prefix = packageTask.OutputPrefix(ec.isSinglePackage) + } + + prettyPrefix = ec.colorCache.PrefixWithColor(packageTask.PackageName, prefix) + + // Cache --------------------------------------------- + taskCache := ec.runCache.TaskCache(packageTask, hash) + // Create a logger for replaying + prefixedUI := &cli.PrefixedUi{ + Ui: ec.ui, + OutputPrefix: prettyPrefix, + InfoPrefix: prettyPrefix, + ErrorPrefix: prettyPrefix, + WarnPrefix: prettyPrefix, + } + + cacheStatus, timeSaved, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger) + + // It's safe to set the CacheStatus even if there's an error, because if there's + // an error, the 0 values are actually what we want. We save cacheStatus and timeSaved + // for the task, so that even if there's an error, we have those values for the taskSummary. + ec.taskHashTracker.SetCacheStatus( + packageTask.TaskID, + runsummary.NewTaskCacheSummary(cacheStatus, &timeSaved), + ) + + if err != nil { + prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err)) + } else if cacheStatus.Local || cacheStatus.Remote { // If there was a cache hit + ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs) + // We only cache successful executions, so we can assume this is a successCode exit. + tracer(runsummary.TargetCached, nil, &successCode) + return taskExecutionSummary, nil + } + + // Setup command execution + argsactual := append([]string{"run"}, packageTask.Task) + if len(passThroughArgs) > 0 { + // This will be either '--' or a typed nil + argsactual = append(argsactual, ec.packageManager.ArgSeparator...) + argsactual = append(argsactual, passThroughArgs...) + } + + cmd := exec.Command(ec.packageManager.Command, argsactual...) + cmd.Dir = packageTask.Pkg.Dir.ToSystemPath().RestoreAnchor(ec.repoRoot).ToString() + + currentState := env.GetEnvMap() + passthroughEnv := env.EnvironmentVariableMap{} + + if packageTask.EnvMode == util.Strict { + defaultPassthrough := []string{ + "PATH", + "SHELL", + "SYSTEMROOT", // Go will always include this on Windows, but we're being explicit here + } + + passthroughEnv.Merge(env.FromKeys(currentState, defaultPassthrough)) + passthroughEnv.Merge(env.FromKeys(currentState, ec.env)) + passthroughEnv.Merge(env.FromKeys(currentState, ec.passthroughEnv)) + passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.EnvVarDependencies)) + passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.PassthroughEnv)) + } else { + passthroughEnv.Merge(currentState) + } + + // Always last to make sure it clobbers. + passthroughEnv.Add("TURBO_HASH", hash) + + cmd.Env = passthroughEnv.ToHashable() + + // Setup stdout/stderr + // If we are not caching anything, then we don't need to write logs to disk + // be careful about this conditional given the default of cache = true + writer, err := taskCache.OutputWriter(prettyPrefix) + if err != nil { + tracer(runsummary.TargetBuildFailed, err, nil) + + ec.logError(prettyPrefix, err) + if !ec.rs.Opts.runOpts.ContinueOnError { + return nil, errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID) + } + } + + // Create a logger + logger := log.New(writer, "", 0) + // Setup a streamer that we'll pipe cmd.Stdout to + logStreamerOut := logstreamer.NewLogstreamer(logger, prettyPrefix, false) + // Setup a streamer that we'll pipe cmd.Stderr to. + logStreamerErr := logstreamer.NewLogstreamer(logger, prettyPrefix, false) + cmd.Stderr = logStreamerErr + cmd.Stdout = logStreamerOut + // Flush/Reset any error we recorded + logStreamerErr.FlushRecord() + logStreamerOut.FlushRecord() + + closeOutputs := func() error { + var closeErrors []error + + if err := logStreamerOut.Close(); err != nil { + closeErrors = append(closeErrors, errors.Wrap(err, "log stdout")) + } + if err := logStreamerErr.Close(); err != nil { + closeErrors = append(closeErrors, errors.Wrap(err, "log stderr")) + } + + if err := writer.Close(); err != nil { + closeErrors = append(closeErrors, errors.Wrap(err, "log file")) + } + if len(closeErrors) > 0 { + msgs := make([]string, len(closeErrors)) + for i, err := range closeErrors { + msgs[i] = err.Error() + } + return fmt.Errorf("could not flush log output: %v", strings.Join(msgs, ", ")) + } + return nil + } + + // Run the command + if err := ec.processes.Exec(cmd); err != nil { + // close off our outputs. We errored, so we mostly don't care if we fail to close + _ = closeOutputs() + // if we already know we're in the process of exiting, + // we don't need to record an error to that effect. + if errors.Is(err, process.ErrClosing) { + return taskExecutionSummary, nil + } + + // If the error we got is a ChildExit, it will have an ExitCode field + // Pass that along into the tracer. + var e *process.ChildExit + if errors.As(err, &e) { + tracer(runsummary.TargetBuildFailed, err, &e.ExitCode) + } else { + // If it wasn't a ChildExit, and something else went wrong, we don't have an exitCode + tracer(runsummary.TargetBuildFailed, err, nil) + } + + progressLogger.Error(fmt.Sprintf("Error: command finished with error: %v", err)) + if !ec.rs.Opts.runOpts.ContinueOnError { + prefixedUI.Error(fmt.Sprintf("ERROR: command finished with error: %s", err)) + ec.processes.Close() + } else { + prefixedUI.Warn("command finished with error, but continuing...") + // Set to nil so we don't short-circuit any other execution + err = nil + } + + // If there was an error, flush the buffered output + taskCache.OnError(prefixedUI, progressLogger) + + return taskExecutionSummary, err + } + + // Add another timestamp into the tracer, so we have an accurate timestamp for how long the task took. + tracer(runsummary.TargetExecuted, nil, nil) + + // Close off our outputs and cache them + if err := closeOutputs(); err != nil { + ec.logError("", err) + } else { + if err = taskCache.SaveOutputs(ctx, progressLogger, prefixedUI, int(taskExecutionSummary.Duration.Milliseconds())); err != nil { + ec.logError("", fmt.Errorf("error caching output: %w", err)) + } else { + ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs) + } + } + + // Clean up tracing + tracer(runsummary.TargetBuilt, nil, &successCode) + progressLogger.Debug("done", "status", "complete", "duration", taskExecutionSummary.Duration) + return taskExecutionSummary, nil +} + +var successCode = 0 |
