diff options
| author | 2023-04-28 01:36:55 +0800 | |
|---|---|---|
| committer | 2023-04-28 01:36:55 +0800 | |
| commit | fc8c5fdce62fb229202659408798a7b6c98f6e8b (patch) | |
| tree | 7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/run/run.go | |
| parent | dd84b9d64fb98746a230cd24233ff50a562c39c9 (diff) | |
| download | HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.tar.gz HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.zip | |
Diffstat (limited to 'cli/internal/run/run.go')
| -rw-r--r-- | cli/internal/run/run.go | 487 |
1 files changed, 0 insertions, 487 deletions
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go deleted file mode 100644 index 2ac1141..0000000 --- a/cli/internal/run/run.go +++ /dev/null @@ -1,487 +0,0 @@ -package run - -import ( - gocontext "context" - "fmt" - "os" - "sort" - "sync" - "time" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/cache" - "github.com/vercel/turbo/cli/internal/cmdutil" - "github.com/vercel/turbo/cli/internal/context" - "github.com/vercel/turbo/cli/internal/core" - "github.com/vercel/turbo/cli/internal/daemon" - "github.com/vercel/turbo/cli/internal/daemonclient" - "github.com/vercel/turbo/cli/internal/env" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/graph" - "github.com/vercel/turbo/cli/internal/process" - "github.com/vercel/turbo/cli/internal/runsummary" - "github.com/vercel/turbo/cli/internal/scm" - "github.com/vercel/turbo/cli/internal/scope" - "github.com/vercel/turbo/cli/internal/signals" - "github.com/vercel/turbo/cli/internal/taskhash" - "github.com/vercel/turbo/cli/internal/turbostate" - "github.com/vercel/turbo/cli/internal/ui" - "github.com/vercel/turbo/cli/internal/util" - - "github.com/pkg/errors" -) - -// ExecuteRun executes the run command -func ExecuteRun(ctx gocontext.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error { - base, err := helper.GetCmdBase(args) - LogTag(base.Logger) - if err != nil { - return err - } - tasks := args.Command.Run.Tasks - passThroughArgs := args.Command.Run.PassThroughArgs - if len(tasks) == 0 { - return errors.New("at least one task must be specified") - } - opts, err := optsFromArgs(args) - if err != nil { - return err - } - - opts.runOpts.PassThroughArgs = passThroughArgs - run := configureRun(base, opts, signalWatcher) - if err := run.run(ctx, tasks); err != nil { - base.LogError("run failed: %v", err) - return err - } - return nil -} - -func optsFromArgs(args *turbostate.ParsedArgsFromRust) (*Opts, error) { - runPayload := args.Command.Run - - opts := getDefaultOptions() - // aliases := make(map[string]string) - if err := scope.OptsFromArgs(&opts.scopeOpts, args); err != nil { - return nil, err - } - - // Cache flags - opts.clientOpts.Timeout = args.RemoteCacheTimeout - opts.cacheOpts.SkipFilesystem = runPayload.RemoteOnly - opts.cacheOpts.OverrideDir = runPayload.CacheDir - opts.cacheOpts.Workers = runPayload.CacheWorkers - - // Run flags - opts.runOpts.LogPrefix = runPayload.LogPrefix - opts.runOpts.Summarize = runPayload.Summarize - opts.runOpts.ExperimentalSpaceID = runPayload.ExperimentalSpaceID - opts.runOpts.EnvMode = runPayload.EnvMode - - // Runcache flags - opts.runcacheOpts.SkipReads = runPayload.Force - opts.runcacheOpts.SkipWrites = runPayload.NoCache - - if runPayload.OutputLogs != "" { - err := opts.runcacheOpts.SetTaskOutputMode(runPayload.OutputLogs) - if err != nil { - return nil, err - } - } - - // Run flags - if runPayload.Concurrency != "" { - concurrency, err := util.ParseConcurrency(runPayload.Concurrency) - if err != nil { - return nil, err - } - opts.runOpts.Concurrency = concurrency - } - opts.runOpts.Parallel = runPayload.Parallel - opts.runOpts.Profile = runPayload.Profile - opts.runOpts.ContinueOnError = runPayload.ContinueExecution - opts.runOpts.Only = runPayload.Only - opts.runOpts.NoDaemon = runPayload.NoDaemon - opts.runOpts.SinglePackage = args.Command.Run.SinglePackage - - // See comment on Graph in turbostate.go for an explanation on Graph's representation. - // If flag is passed... - if runPayload.Graph != nil { - // If no value is attached, we print to stdout - if *runPayload.Graph == "" { - opts.runOpts.GraphDot = true - } else { - // Otherwise, we emit to the file name attached as value - opts.runOpts.GraphDot = false - opts.runOpts.GraphFile = *runPayload.Graph - } - } - - if runPayload.DryRun != "" { - opts.runOpts.DryRunJSON = runPayload.DryRun == _dryRunJSONValue - - if runPayload.DryRun == _dryRunTextValue || runPayload.DryRun == _dryRunJSONValue { - opts.runOpts.DryRun = true - } else { - return nil, fmt.Errorf("invalid dry-run mode: %v", runPayload.DryRun) - } - } - - return opts, nil -} - -func configureRun(base *cmdutil.CmdBase, opts *Opts, signalWatcher *signals.Watcher) *run { - if os.Getenv("TURBO_FORCE") == "true" { - opts.runcacheOpts.SkipReads = true - } - - if os.Getenv("TURBO_REMOTE_ONLY") == "true" { - opts.cacheOpts.SkipFilesystem = true - } - - processes := process.NewManager(base.Logger.Named("processes")) - signalWatcher.AddOnClose(processes.Close) - return &run{ - base: base, - opts: opts, - processes: processes, - } -} - -type run struct { - base *cmdutil.CmdBase - opts *Opts - processes *process.Manager -} - -func (r *run) run(ctx gocontext.Context, targets []string) error { - startAt := time.Now() - packageJSONPath := r.base.RepoRoot.UntypedJoin("package.json") - rootPackageJSON, err := fs.ReadPackageJSON(packageJSONPath) - if err != nil { - return fmt.Errorf("failed to read package.json: %w", err) - } - - isStructuredOutput := r.opts.runOpts.GraphDot || r.opts.runOpts.DryRunJSON - - var pkgDepGraph *context.Context - if r.opts.runOpts.SinglePackage { - pkgDepGraph, err = context.SinglePackageGraph(r.base.RepoRoot, rootPackageJSON) - } else { - pkgDepGraph, err = context.BuildPackageGraph(r.base.RepoRoot, rootPackageJSON) - } - if err != nil { - var warnings *context.Warnings - if errors.As(err, &warnings) { - r.base.LogWarning("Issues occurred when constructing package graph. Turbo will function, but some features may not be available", err) - } else { - return err - } - } - - if ui.IsCI && !r.opts.runOpts.NoDaemon { - r.base.Logger.Info("skipping turbod since we appear to be in a non-interactive context") - } else if !r.opts.runOpts.NoDaemon { - turbodClient, err := daemon.GetClient(ctx, r.base.RepoRoot, r.base.Logger, r.base.TurboVersion, daemon.ClientOpts{}) - if err != nil { - r.base.LogWarning("", errors.Wrap(err, "failed to contact turbod. Continuing in standalone mode")) - } else { - defer func() { _ = turbodClient.Close() }() - r.base.Logger.Debug("running in daemon mode") - daemonClient := daemonclient.New(turbodClient) - r.opts.runcacheOpts.OutputWatcher = daemonClient - } - } - - if err := util.ValidateGraph(&pkgDepGraph.WorkspaceGraph); err != nil { - return errors.Wrap(err, "Invalid package dependency graph") - } - - // TODO: consolidate some of these arguments - // Note: not all properties are set here. GlobalHash and Pipeline keys are set later - g := &graph.CompleteGraph{ - WorkspaceGraph: pkgDepGraph.WorkspaceGraph, - WorkspaceInfos: pkgDepGraph.WorkspaceInfos, - RootNode: pkgDepGraph.RootNode, - TaskDefinitions: map[string]*fs.TaskDefinition{}, - RepoRoot: r.base.RepoRoot, - } - - turboJSON, err := g.GetTurboConfigFromWorkspace(util.RootPkgName, r.opts.runOpts.SinglePackage) - if err != nil { - return err - } - - // TODO: these values come from a config file, hopefully viper can help us merge these - r.opts.cacheOpts.RemoteCacheOpts = turboJSON.RemoteCacheOptions - - pipeline := turboJSON.Pipeline - g.Pipeline = pipeline - scmInstance, err := scm.FromInRepo(r.base.RepoRoot) - if err != nil { - if errors.Is(err, scm.ErrFallback) { - r.base.Logger.Debug("", err) - } else { - return errors.Wrap(err, "failed to create SCM") - } - } - filteredPkgs, isAllPackages, err := scope.ResolvePackages(&r.opts.scopeOpts, r.base.RepoRoot, scmInstance, pkgDepGraph, r.base.UI, r.base.Logger) - if err != nil { - return errors.Wrap(err, "failed to resolve packages to run") - } - if isAllPackages { - // if there is a root task for any of our targets, we need to add it - for _, target := range targets { - key := util.RootTaskID(target) - if _, ok := pipeline[key]; ok { - filteredPkgs.Add(util.RootPkgName) - // we only need to know we're running a root task once to add it for consideration - break - } - } - } - - globalHashable, err := calculateGlobalHash( - r.base.RepoRoot, - rootPackageJSON, - pipeline, - turboJSON.GlobalEnv, - turboJSON.GlobalDeps, - pkgDepGraph.PackageManager, - pkgDepGraph.Lockfile, - turboJSON.GlobalPassthroughEnv, - r.opts.runOpts.EnvMode, - r.base.Logger, - r.base.UI, - isStructuredOutput, - ) - - if err != nil { - return fmt.Errorf("failed to collect global hash inputs: %v", err) - } - - if globalHash, err := calculateGlobalHashFromHashable(globalHashable); err == nil { - r.base.Logger.Debug("global hash", "value", globalHash) - g.GlobalHash = globalHash - } else { - return fmt.Errorf("failed to calculate global hash: %v", err) - } - - r.base.Logger.Debug("local cache folder", "path", r.opts.cacheOpts.OverrideDir) - - rs := &runSpec{ - Targets: targets, - FilteredPkgs: filteredPkgs, - Opts: r.opts, - } - packageManager := pkgDepGraph.PackageManager - - engine, err := buildTaskGraphEngine( - g, - rs, - r.opts.runOpts.SinglePackage, - ) - - if err != nil { - return errors.Wrap(err, "error preparing engine") - } - - taskHashTracker := taskhash.NewTracker( - g.RootNode, - g.GlobalHash, - // TODO(mehulkar): remove g,Pipeline, because we need to get task definitions from CompleteGaph instead - g.Pipeline, - ) - - g.TaskHashTracker = taskHashTracker - - // CalculateFileHashes assigns PackageInputsExpandedHashes as a side-effect - err = taskHashTracker.CalculateFileHashes( - engine.TaskGraph.Vertices(), - rs.Opts.runOpts.Concurrency, - g.WorkspaceInfos, - g.TaskDefinitions, - r.base.RepoRoot, - ) - - if err != nil { - return errors.Wrap(err, "error hashing package files") - } - - // If we are running in parallel, then we remove all the edges in the graph - // except for the root. Rebuild the task graph for backwards compatibility. - // We still use dependencies specified by the pipeline configuration. - if rs.Opts.runOpts.Parallel { - for _, edge := range g.WorkspaceGraph.Edges() { - if edge.Target() != g.RootNode { - g.WorkspaceGraph.RemoveEdge(edge) - } - } - engine, err = buildTaskGraphEngine( - g, - rs, - r.opts.runOpts.SinglePackage, - ) - if err != nil { - return errors.Wrap(err, "error preparing engine") - } - } - - // Graph Run - if rs.Opts.runOpts.GraphFile != "" || rs.Opts.runOpts.GraphDot { - return GraphRun(ctx, rs, engine, r.base) - } - - packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings() - sort.Strings(packagesInScope) - // Initiate analytics and cache - analyticsClient := r.initAnalyticsClient(ctx) - defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) - turboCache, err := r.initCache(ctx, rs, analyticsClient) - - if err != nil { - if errors.Is(err, cache.ErrNoCachesEnabled) { - r.base.UI.Warn("No caches are enabled. You can try \"turbo login\", \"turbo link\", or ensuring you are not passing --remote-only to enable caching") - } else { - return errors.Wrap(err, "failed to set up caching") - } - } - - var envVarPassthroughMap env.EnvironmentVariableMap - if globalHashable.envVarPassthroughs != nil { - if envVarPassthroughDetailedMap, err := env.GetHashableEnvVars(globalHashable.envVarPassthroughs, nil, ""); err == nil { - envVarPassthroughMap = envVarPassthroughDetailedMap.BySource.Explicit - } - } - - globalEnvMode := rs.Opts.runOpts.EnvMode - if globalEnvMode == util.Infer && turboJSON.GlobalPassthroughEnv != nil { - globalEnvMode = util.Strict - } - - // RunSummary contains information that is statically analyzable about - // the tasks that we expect to run based on the user command. - summary := runsummary.NewRunSummary( - startAt, - r.base.UI, - r.base.RepoRoot, - rs.Opts.scopeOpts.PackageInferenceRoot, - r.base.TurboVersion, - r.base.APIClient, - rs.Opts.runOpts, - packagesInScope, - globalEnvMode, - runsummary.NewGlobalHashSummary( - globalHashable.globalFileHashMap, - globalHashable.rootExternalDepsHash, - globalHashable.envVars, - envVarPassthroughMap, - globalHashable.globalCacheKey, - globalHashable.pipeline, - ), - rs.Opts.SynthesizeCommand(rs.Targets), - ) - - // Dry Run - if rs.Opts.runOpts.DryRun { - return DryRun( - ctx, - g, - rs, - engine, - taskHashTracker, - turboCache, - turboJSON, - globalEnvMode, - r.base, - summary, - ) - } - - // Regular run - return RealRun( - ctx, - g, - rs, - engine, - taskHashTracker, - turboCache, - turboJSON, - globalEnvMode, - packagesInScope, - r.base, - summary, - // Extra arg only for regular runs, dry-run doesn't get this - packageManager, - r.processes, - ) -} - -func (r *run) initAnalyticsClient(ctx gocontext.Context) analytics.Client { - apiClient := r.base.APIClient - var analyticsSink analytics.Sink - if apiClient.IsLinked() { - analyticsSink = apiClient - } else { - r.opts.cacheOpts.SkipRemote = true - analyticsSink = analytics.NullSink - } - analyticsClient := analytics.NewClient(ctx, analyticsSink, r.base.Logger.Named("analytics")) - return analyticsClient -} - -func (r *run) initCache(ctx gocontext.Context, rs *runSpec, analyticsClient analytics.Client) (cache.Cache, error) { - apiClient := r.base.APIClient - // Theoretically this is overkill, but bias towards not spamming the console - once := &sync.Once{} - - return cache.New(rs.Opts.cacheOpts, r.base.RepoRoot, apiClient, analyticsClient, func(_cache cache.Cache, err error) { - // Currently the HTTP Cache is the only one that can be disabled. - // With a cache system refactor, we might consider giving names to the caches so - // we can accurately report them here. - once.Do(func() { - r.base.LogWarning("Remote Caching is unavailable", err) - }) - }) -} - -func buildTaskGraphEngine( - g *graph.CompleteGraph, - rs *runSpec, - isSinglePackage bool, -) (*core.Engine, error) { - engine := core.NewEngine(g, isSinglePackage) - - // Note: g.Pipeline is a map, but this for loop only cares about the keys - for taskName := range g.Pipeline { - engine.AddTask(taskName) - } - - if err := engine.Prepare(&core.EngineBuildingOptions{ - Packages: rs.FilteredPkgs.UnsafeListOfStrings(), - TaskNames: rs.Targets, - TasksOnly: rs.Opts.runOpts.Only, - }); err != nil { - return nil, err - } - - // Check for cycles in the DAG. - if err := util.ValidateGraph(engine.TaskGraph); err != nil { - return nil, fmt.Errorf("Invalid task dependency graph:\n%v", err) - } - - // Check that no tasks would be blocked by a persistent task - if err := engine.ValidatePersistentDependencies(g, rs.Opts.runOpts.Concurrency); err != nil { - return nil, fmt.Errorf("Invalid persistent task configuration:\n%v", err) - } - - return engine, nil -} - -// dry run custom flag -// NOTE: These *must* be kept in sync with the corresponding Rust -// enum definitions in shim/src/commands/mod.rs -const ( - _dryRunJSONValue = "Json" - _dryRunTextValue = "Text" -) |
