aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/run/real_run.go
diff options
context:
space:
mode:
author简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:55 +0800
committer简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:55 +0800
commitfc8c5fdce62fb229202659408798a7b6c98f6e8b (patch)
tree7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/run/real_run.go
parentdd84b9d64fb98746a230cd24233ff50a562c39c9 (diff)
downloadHydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.tar.gz
HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.zip
Diffstat (limited to 'cli/internal/run/real_run.go')
-rw-r--r--cli/internal/run/real_run.go420
1 files changed, 0 insertions, 420 deletions
diff --git a/cli/internal/run/real_run.go b/cli/internal/run/real_run.go
deleted file mode 100644
index 32c7965..0000000
--- a/cli/internal/run/real_run.go
+++ /dev/null
@@ -1,420 +0,0 @@
-package run
-
-import (
- gocontext "context"
- "fmt"
- "log"
- "os/exec"
- "strings"
- "sync"
- "time"
-
- "github.com/fatih/color"
- "github.com/hashicorp/go-hclog"
- "github.com/mitchellh/cli"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/cache"
- "github.com/vercel/turbo/cli/internal/cmdutil"
- "github.com/vercel/turbo/cli/internal/colorcache"
- "github.com/vercel/turbo/cli/internal/core"
- "github.com/vercel/turbo/cli/internal/env"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/graph"
- "github.com/vercel/turbo/cli/internal/logstreamer"
- "github.com/vercel/turbo/cli/internal/nodes"
- "github.com/vercel/turbo/cli/internal/packagemanager"
- "github.com/vercel/turbo/cli/internal/process"
- "github.com/vercel/turbo/cli/internal/runcache"
- "github.com/vercel/turbo/cli/internal/runsummary"
- "github.com/vercel/turbo/cli/internal/spinner"
- "github.com/vercel/turbo/cli/internal/taskhash"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "github.com/vercel/turbo/cli/internal/ui"
- "github.com/vercel/turbo/cli/internal/util"
-)
-
-// RealRun executes a set of tasks
-func RealRun(
- ctx gocontext.Context,
- g *graph.CompleteGraph,
- rs *runSpec,
- engine *core.Engine,
- taskHashTracker *taskhash.Tracker,
- turboCache cache.Cache,
- turboJSON *fs.TurboJSON,
- globalEnvMode util.EnvMode,
- packagesInScope []string,
- base *cmdutil.CmdBase,
- runSummary runsummary.Meta,
- packageManager *packagemanager.PackageManager,
- processes *process.Manager,
-) error {
- singlePackage := rs.Opts.runOpts.SinglePackage
-
- if singlePackage {
- base.UI.Output(fmt.Sprintf("%s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", ")))))
- } else {
- base.UI.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", ")))
- base.UI.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
- }
-
- // Log whether remote cache is enabled
- useHTTPCache := !rs.Opts.cacheOpts.SkipRemote
- if useHTTPCache {
- base.UI.Info(ui.Dim("• Remote caching enabled"))
- } else {
- base.UI.Info(ui.Dim("• Remote caching disabled"))
- }
-
- defer func() {
- _ = spinner.WaitFor(ctx, turboCache.Shutdown, base.UI, "...writing to cache...", 1500*time.Millisecond)
- }()
- colorCache := colorcache.New()
-
- runCache := runcache.New(turboCache, base.RepoRoot, rs.Opts.runcacheOpts, colorCache)
-
- ec := &execContext{
- colorCache: colorCache,
- runSummary: runSummary,
- rs: rs,
- ui: &cli.ConcurrentUi{Ui: base.UI},
- runCache: runCache,
- env: turboJSON.GlobalEnv,
- passthroughEnv: turboJSON.GlobalPassthroughEnv,
- logger: base.Logger,
- packageManager: packageManager,
- processes: processes,
- taskHashTracker: taskHashTracker,
- repoRoot: base.RepoRoot,
- isSinglePackage: singlePackage,
- }
-
- // run the thing
- execOpts := core.EngineExecutionOptions{
- Parallel: rs.Opts.runOpts.Parallel,
- Concurrency: rs.Opts.runOpts.Concurrency,
- }
-
- mu := sync.Mutex{}
- taskSummaries := []*runsummary.TaskSummary{}
- execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
- taskExecutionSummary, err := ec.exec(ctx, packageTask)
-
- // taskExecutionSummary will be nil if the task never executed
- // (i.e. if the workspace didn't implement the script corresponding to the task)
- // We don't need to collect any of the outputs or execution if the task didn't execute.
- if taskExecutionSummary != nil {
- taskSummary.ExpandedOutputs = taskHashTracker.GetExpandedOutputs(taskSummary.TaskID)
- taskSummary.Execution = taskExecutionSummary
- taskSummary.CacheSummary = taskHashTracker.GetCacheStatus(taskSummary.TaskID)
-
- // lock since multiple things to be appending to this array at the same time
- mu.Lock()
- taskSummaries = append(taskSummaries, taskSummary)
- // not using defer, just release the lock
- mu.Unlock()
- }
-
- // Return the error when there is one
- if err != nil {
- return err
- }
-
- return nil
- }
-
- getArgs := func(taskID string) []string {
- return rs.ArgsForTask(taskID)
- }
-
- visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, globalEnvMode, getArgs, base.Logger, execFunc)
- errs := engine.Execute(visitorFn, execOpts)
-
- // Track if we saw any child with a non-zero exit code
- exitCode := 0
- exitCodeErr := &process.ChildExit{}
-
- // Assign tasks after execution
- runSummary.RunSummary.Tasks = taskSummaries
-
- for _, err := range errs {
- if errors.As(err, &exitCodeErr) {
- // If a process gets killed via a signal, Go reports it's exit code as -1.
- // We take the absolute value of the exit code so we don't select '0' as
- // the greatest exit code.
- childExit := exitCodeErr.ExitCode
- if childExit < 0 {
- childExit = -childExit
- }
- if childExit > exitCode {
- exitCode = childExit
- }
- } else if exitCode == 0 {
- // We hit some error, it shouldn't be exit code 0
- exitCode = 1
- }
- base.UI.Error(err.Error())
- }
-
- // When continue on error is enabled don't register failed tasks as errors
- // and instead must inspect the task summaries.
- if ec.rs.Opts.runOpts.ContinueOnError {
- for _, summary := range runSummary.RunSummary.Tasks {
- if childExit := summary.Execution.ExitCode(); childExit != nil {
- childExit := *childExit
- if childExit < 0 {
- childExit = -childExit
- }
- if childExit > exitCode {
- exitCode = childExit
- }
- }
- }
- }
-
- if err := runSummary.Close(ctx, exitCode, g.WorkspaceInfos); err != nil {
- // We don't need to throw an error, but we can warn on this.
- // Note: this method doesn't actually return an error for Real Runs at the time of writing.
- base.UI.Info(fmt.Sprintf("Failed to close Run Summary %v", err))
- }
-
- if exitCode != 0 {
- return &process.ChildExit{
- ExitCode: exitCode,
- }
- }
- return nil
-}
-
-type execContext struct {
- colorCache *colorcache.ColorCache
- runSummary runsummary.Meta
- rs *runSpec
- ui cli.Ui
- runCache *runcache.RunCache
- env []string
- passthroughEnv []string
- logger hclog.Logger
- packageManager *packagemanager.PackageManager
- processes *process.Manager
- taskHashTracker *taskhash.Tracker
- repoRoot turbopath.AbsoluteSystemPath
- isSinglePackage bool
-}
-
-func (ec *execContext) logError(prefix string, err error) {
- ec.logger.Error(prefix, "error", err)
-
- if prefix != "" {
- prefix += ": "
- }
-
- ec.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err)))
-}
-
-func (ec *execContext) exec(ctx gocontext.Context, packageTask *nodes.PackageTask) (*runsummary.TaskExecutionSummary, error) {
- // Setup tracer. Every time tracer() is called the taskExecutionSummary's duration is updated
- // So make sure to call it before returning.
- tracer, taskExecutionSummary := ec.runSummary.RunSummary.TrackTask(packageTask.TaskID)
-
- progressLogger := ec.logger.Named("")
- progressLogger.Debug("start")
-
- passThroughArgs := ec.rs.ArgsForTask(packageTask.Task)
- hash := packageTask.Hash
- ec.logger.Debug("task hash", "value", hash)
- // TODO(gsoltis): if/when we fix https://github.com/vercel/turbo/issues/937
- // the following block should never get hit. In the meantime, keep it after hashing
- // so that downstream tasks can count on the hash existing
- //
- // bail if the script doesn't exist
- if packageTask.Command == "" {
- progressLogger.Debug("no task in package, skipping")
- progressLogger.Debug("done", "status", "skipped", "duration", taskExecutionSummary.Duration)
- // Return nil here because there was no execution, so there is no task execution summary
- return nil, nil
- }
-
- // Set building status now that we know it's going to run.
- tracer(runsummary.TargetBuilding, nil, &successCode)
-
- var prefix string
- var prettyPrefix string
- if ec.rs.Opts.runOpts.LogPrefix == "none" {
- prefix = ""
- } else {
- prefix = packageTask.OutputPrefix(ec.isSinglePackage)
- }
-
- prettyPrefix = ec.colorCache.PrefixWithColor(packageTask.PackageName, prefix)
-
- // Cache ---------------------------------------------
- taskCache := ec.runCache.TaskCache(packageTask, hash)
- // Create a logger for replaying
- prefixedUI := &cli.PrefixedUi{
- Ui: ec.ui,
- OutputPrefix: prettyPrefix,
- InfoPrefix: prettyPrefix,
- ErrorPrefix: prettyPrefix,
- WarnPrefix: prettyPrefix,
- }
-
- cacheStatus, timeSaved, err := taskCache.RestoreOutputs(ctx, prefixedUI, progressLogger)
-
- // It's safe to set the CacheStatus even if there's an error, because if there's
- // an error, the 0 values are actually what we want. We save cacheStatus and timeSaved
- // for the task, so that even if there's an error, we have those values for the taskSummary.
- ec.taskHashTracker.SetCacheStatus(
- packageTask.TaskID,
- runsummary.NewTaskCacheSummary(cacheStatus, &timeSaved),
- )
-
- if err != nil {
- prefixedUI.Error(fmt.Sprintf("error fetching from cache: %s", err))
- } else if cacheStatus.Local || cacheStatus.Remote { // If there was a cache hit
- ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs)
- // We only cache successful executions, so we can assume this is a successCode exit.
- tracer(runsummary.TargetCached, nil, &successCode)
- return taskExecutionSummary, nil
- }
-
- // Setup command execution
- argsactual := append([]string{"run"}, packageTask.Task)
- if len(passThroughArgs) > 0 {
- // This will be either '--' or a typed nil
- argsactual = append(argsactual, ec.packageManager.ArgSeparator...)
- argsactual = append(argsactual, passThroughArgs...)
- }
-
- cmd := exec.Command(ec.packageManager.Command, argsactual...)
- cmd.Dir = packageTask.Pkg.Dir.ToSystemPath().RestoreAnchor(ec.repoRoot).ToString()
-
- currentState := env.GetEnvMap()
- passthroughEnv := env.EnvironmentVariableMap{}
-
- if packageTask.EnvMode == util.Strict {
- defaultPassthrough := []string{
- "PATH",
- "SHELL",
- "SYSTEMROOT", // Go will always include this on Windows, but we're being explicit here
- }
-
- passthroughEnv.Merge(env.FromKeys(currentState, defaultPassthrough))
- passthroughEnv.Merge(env.FromKeys(currentState, ec.env))
- passthroughEnv.Merge(env.FromKeys(currentState, ec.passthroughEnv))
- passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.EnvVarDependencies))
- passthroughEnv.Merge(env.FromKeys(currentState, packageTask.TaskDefinition.PassthroughEnv))
- } else {
- passthroughEnv.Merge(currentState)
- }
-
- // Always last to make sure it clobbers.
- passthroughEnv.Add("TURBO_HASH", hash)
-
- cmd.Env = passthroughEnv.ToHashable()
-
- // Setup stdout/stderr
- // If we are not caching anything, then we don't need to write logs to disk
- // be careful about this conditional given the default of cache = true
- writer, err := taskCache.OutputWriter(prettyPrefix)
- if err != nil {
- tracer(runsummary.TargetBuildFailed, err, nil)
-
- ec.logError(prettyPrefix, err)
- if !ec.rs.Opts.runOpts.ContinueOnError {
- return nil, errors.Wrapf(err, "failed to capture outputs for \"%v\"", packageTask.TaskID)
- }
- }
-
- // Create a logger
- logger := log.New(writer, "", 0)
- // Setup a streamer that we'll pipe cmd.Stdout to
- logStreamerOut := logstreamer.NewLogstreamer(logger, prettyPrefix, false)
- // Setup a streamer that we'll pipe cmd.Stderr to.
- logStreamerErr := logstreamer.NewLogstreamer(logger, prettyPrefix, false)
- cmd.Stderr = logStreamerErr
- cmd.Stdout = logStreamerOut
- // Flush/Reset any error we recorded
- logStreamerErr.FlushRecord()
- logStreamerOut.FlushRecord()
-
- closeOutputs := func() error {
- var closeErrors []error
-
- if err := logStreamerOut.Close(); err != nil {
- closeErrors = append(closeErrors, errors.Wrap(err, "log stdout"))
- }
- if err := logStreamerErr.Close(); err != nil {
- closeErrors = append(closeErrors, errors.Wrap(err, "log stderr"))
- }
-
- if err := writer.Close(); err != nil {
- closeErrors = append(closeErrors, errors.Wrap(err, "log file"))
- }
- if len(closeErrors) > 0 {
- msgs := make([]string, len(closeErrors))
- for i, err := range closeErrors {
- msgs[i] = err.Error()
- }
- return fmt.Errorf("could not flush log output: %v", strings.Join(msgs, ", "))
- }
- return nil
- }
-
- // Run the command
- if err := ec.processes.Exec(cmd); err != nil {
- // close off our outputs. We errored, so we mostly don't care if we fail to close
- _ = closeOutputs()
- // if we already know we're in the process of exiting,
- // we don't need to record an error to that effect.
- if errors.Is(err, process.ErrClosing) {
- return taskExecutionSummary, nil
- }
-
- // If the error we got is a ChildExit, it will have an ExitCode field
- // Pass that along into the tracer.
- var e *process.ChildExit
- if errors.As(err, &e) {
- tracer(runsummary.TargetBuildFailed, err, &e.ExitCode)
- } else {
- // If it wasn't a ChildExit, and something else went wrong, we don't have an exitCode
- tracer(runsummary.TargetBuildFailed, err, nil)
- }
-
- progressLogger.Error(fmt.Sprintf("Error: command finished with error: %v", err))
- if !ec.rs.Opts.runOpts.ContinueOnError {
- prefixedUI.Error(fmt.Sprintf("ERROR: command finished with error: %s", err))
- ec.processes.Close()
- } else {
- prefixedUI.Warn("command finished with error, but continuing...")
- // Set to nil so we don't short-circuit any other execution
- err = nil
- }
-
- // If there was an error, flush the buffered output
- taskCache.OnError(prefixedUI, progressLogger)
-
- return taskExecutionSummary, err
- }
-
- // Add another timestamp into the tracer, so we have an accurate timestamp for how long the task took.
- tracer(runsummary.TargetExecuted, nil, nil)
-
- // Close off our outputs and cache them
- if err := closeOutputs(); err != nil {
- ec.logError("", err)
- } else {
- if err = taskCache.SaveOutputs(ctx, progressLogger, prefixedUI, int(taskExecutionSummary.Duration.Milliseconds())); err != nil {
- ec.logError("", fmt.Errorf("error caching output: %w", err))
- } else {
- ec.taskHashTracker.SetExpandedOutputs(packageTask.TaskID, taskCache.ExpandedOutputs)
- }
- }
-
- // Clean up tracing
- tracer(runsummary.TargetBuilt, nil, &successCode)
- progressLogger.Debug("done", "status", "complete", "duration", taskExecutionSummary.Duration)
- return taskExecutionSummary, nil
-}
-
-var successCode = 0