| | |
|---|---|
| author | 2023-04-28 01:36:44 +0800 |
| committer | 2023-04-28 01:36:44 +0800 |
| commit | dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch) |
| tree | b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/run/dry_run.go |
| parent | 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff) |
| download | HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz, HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip |
Diffstat (limited to 'cli/internal/run/dry_run.go')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | cli/internal/run/dry_run.go | 122 |

1 file changed, 122 insertions, 0 deletions
```diff
diff --git a/cli/internal/run/dry_run.go b/cli/internal/run/dry_run.go
new file mode 100644
index 0000000..eeee431
--- /dev/null
+++ b/cli/internal/run/dry_run.go
@@ -0,0 +1,122 @@
+// Package run implements `turbo run`
+// This file implements the logic for `turbo run --dry`
+package run
+
+import (
+	gocontext "context"
+	"sync"
+
+	"github.com/pkg/errors"
+	"github.com/vercel/turbo/cli/internal/cache"
+	"github.com/vercel/turbo/cli/internal/cmdutil"
+	"github.com/vercel/turbo/cli/internal/core"
+	"github.com/vercel/turbo/cli/internal/fs"
+	"github.com/vercel/turbo/cli/internal/graph"
+	"github.com/vercel/turbo/cli/internal/nodes"
+	"github.com/vercel/turbo/cli/internal/runsummary"
+	"github.com/vercel/turbo/cli/internal/taskhash"
+	"github.com/vercel/turbo/cli/internal/util"
+)
+
+// DryRun gets all the info needed from tasks and prints out a summary, but doesn't actually
+// execute the tasks.
+func DryRun(
+	ctx gocontext.Context,
+	g *graph.CompleteGraph,
+	rs *runSpec,
+	engine *core.Engine,
+	_ *taskhash.Tracker, // unused, but kept here for parity with the RealRun method signature
+	turboCache cache.Cache,
+	_ *fs.TurboJSON, // unused, but kept here for parity with the RealRun method signature
+	globalEnvMode util.EnvMode,
+	base *cmdutil.CmdBase,
+	summary runsummary.Meta,
+) error {
+	defer turboCache.Shutdown()
+
+	taskSummaries := []*runsummary.TaskSummary{}
+
+	mu := sync.Mutex{}
+	execFunc := func(ctx gocontext.Context, packageTask *nodes.PackageTask, taskSummary *runsummary.TaskSummary) error {
+		// Assign some fallbacks if they were missing
+		if taskSummary.Command == "" {
+			taskSummary.Command = runsummary.MissingTaskLabel
+		}
+
+		if taskSummary.Framework == "" {
+			taskSummary.Framework = runsummary.MissingFrameworkLabel
+		}
+
+		// This mutex is not _really_ required, since we are using Concurrency: 1 as an execution
+		// option, but we add it here to match the shape of RealRun's execFunc.
+		mu.Lock()
+		defer mu.Unlock()
+		taskSummaries = append(taskSummaries, taskSummary)
+		return nil
+	}
+
+	// This setup mirrors a real run. We call engine.Execute() with
+	// a visitor function and some hardcoded execOpts.
+	// Note: we do not currently attempt to parallelize the graph walking
+	// (as we do in real execution)
+	getArgs := func(taskID string) []string {
+		return rs.ArgsForTask(taskID)
+	}
+
+	visitorFn := g.GetPackageTaskVisitor(ctx, engine.TaskGraph, globalEnvMode, getArgs, base.Logger, execFunc)
+	execOpts := core.EngineExecutionOptions{
+		Concurrency: 1,
+		Parallel:    false,
+	}
+
+	if errs := engine.Execute(visitorFn, execOpts); len(errs) > 0 {
+		for _, err := range errs {
+			base.UI.Error(err.Error())
+		}
+		return errors.New("errors occurred during dry-run graph traversal")
+	}
+
+	// We walk the graph with no concurrency.
+	// Populating the cache state is parallelizable.
+	// Do this _after_ walking the graph.
+	populateCacheState(turboCache, taskSummaries)
+
+	// Assign the Task Summaries to the main summary
+	summary.RunSummary.Tasks = taskSummaries
+
+	// The exitCode isn't really used by the Run Summary Close() method for dry runs,
+	// but we pass in a successful value to match Real Runs.
+	return summary.Close(ctx, 0, g.WorkspaceInfos)
+}
+
+func populateCacheState(turboCache cache.Cache, taskSummaries []*runsummary.TaskSummary) {
+	// We make at most 8 requests at a time for cache state.
+	maxParallelRequests := 8
+	taskCount := len(taskSummaries)
+
+	parallelRequestCount := maxParallelRequests
+	if taskCount < maxParallelRequests {
+		parallelRequestCount = taskCount
+	}
+
+	queue := make(chan int, taskCount)
+
+	wg := &sync.WaitGroup{}
+	for i := 0; i < parallelRequestCount; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for index := range queue {
+				task := taskSummaries[index]
+				itemStatus := turboCache.Exists(task.Hash)
+				task.CacheSummary = runsummary.NewTaskCacheSummary(itemStatus, nil)
+			}
+		}()
+	}
+
+	for index := range taskSummaries {
+		queue <- index
+	}
+	close(queue)
+	wg.Wait()
+}
```
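
The `populateCacheState` helper at the bottom of the patch is a standard bounded worker pool: a buffered channel of indices acts as the work queue, a fixed number of goroutines drain it, and a `sync.WaitGroup` waits for them to finish. A minimal, self-contained sketch of the same pattern follows; `checkExists` is a hypothetical stand-in for `turboCache.Exists`, used only so the example runs on its own:

```go
package main

import (
	"fmt"
	"sync"
)

// checkExists stands in for a cache lookup such as turboCache.Exists(hash);
// it fabricates a deterministic result so the example is runnable.
func checkExists(hash string) bool {
	return len(hash)%2 == 0
}

func main() {
	hashes := []string{"a1", "b22", "c333", "d4444", "e5"}
	results := make([]bool, len(hashes))

	// Bound concurrency, clamping to the amount of work available,
	// mirroring the maxParallelRequests logic in populateCacheState.
	maxWorkers := 8
	if len(hashes) < maxWorkers {
		maxWorkers = len(hashes)
	}

	// The buffered channel is the work queue: the buffer holds every
	// index, so the producer loop below never blocks.
	queue := make(chan int, len(hashes))

	var wg sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker drains indices until the queue is closed.
			for idx := range queue {
				// No mutex is needed: each index goes to exactly one
				// worker, so no two goroutines write the same element.
				results[idx] = checkExists(hashes[idx])
			}
		}()
	}

	for idx := range hashes {
		queue <- idx
	}
	close(queue) // lets each worker's range loop terminate
	wg.Wait()

	fmt.Println(results)
}
```

Unlike `execFunc` above, which serializes appends to a shared slice behind a mutex, this fan-out needs no locking because every goroutine writes to a distinct, pre-allocated slice element. Per the file's header comment, this whole path is exercised via `turbo run --dry`.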
