diff options
| author | 2023-04-28 01:36:44 +0800 | |
|---|---|---|
| committer | 2023-04-28 01:36:44 +0800 | |
| commit | dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch) | |
| tree | b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/daemon/daemon.go | |
| parent | 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff) | |
| download | HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip | |
Diffstat (limited to 'cli/internal/daemon/daemon.go')
| -rw-r--r-- | cli/internal/daemon/daemon.go | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go new file mode 100644 index 0000000..81d5283 --- /dev/null +++ b/cli/internal/daemon/daemon.go @@ -0,0 +1,307 @@ +package daemon + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "net" + "os" + "path/filepath" + "strings" + "time" + + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashicorp/go-hclog" + "github.com/nightlyone/lockfile" + "github.com/pkg/errors" + "github.com/vercel/turbo/cli/internal/cmdutil" + "github.com/vercel/turbo/cli/internal/daemon/connector" + "github.com/vercel/turbo/cli/internal/fs" + "github.com/vercel/turbo/cli/internal/server" + "github.com/vercel/turbo/cli/internal/signals" + "github.com/vercel/turbo/cli/internal/turbopath" + "github.com/vercel/turbo/cli/internal/turbostate" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type daemon struct { + logger hclog.Logger + repoRoot turbopath.AbsoluteSystemPath + timeout time.Duration + reqCh chan struct{} + timedOutCh chan struct{} +} + +func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string { + pathHash := sha256.Sum256([]byte(repoRoot.ToString())) + // We grab a substring of the hash because there is a 108-character limit on the length + // of a filepath for unix domain socket. + return hex.EncodeToString(pathHash[:])[:16] +} + +func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { + tempDir := fs.TempDir("turbod") + hexHash := getRepoHash(repoRoot) + return tempDir.UntypedJoin(hexHash) +} + +func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) { + hexHash := getRepoHash(repoRoot) + base := repoRoot.Base() + logFilename := fmt.Sprintf("%v-%v.log", hexHash, base) + + logsDir := fs.GetTurboDataDir().UntypedJoin("logs") + return logsDir.UntypedJoin(logFilename), nil +} + +func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { + root := getDaemonFileRoot(repoRoot) + return root.UntypedJoin("turbod.sock") +} + +func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { + root := getDaemonFileRoot(repoRoot) + return root.UntypedJoin("turbod.pid") +} + +// logError logs an error and outputs it to the UI. +func (d *daemon) logError(err error) { + d.logger.Error(fmt.Sprintf("error %v", err)) +} + +// we're only appending, and we're creating the file if it doesn't exist. +// we do not need to read the log file. +var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE + +// ExecuteDaemon executes the root daemon command +func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error { + base, err := helper.GetCmdBase(args) + if err != nil { + return err + } + if args.TestRun { + base.UI.Info("Daemon test run successful") + return nil + } + + idleTimeout := 4 * time.Hour + if args.Command.Daemon.IdleTimeout != "" { + idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout) + if err != nil { + return err + } + } + + logFilePath, err := getLogFilePath(base.RepoRoot) + if err != nil { + return err + } + if err := logFilePath.EnsureDir(); err != nil { + return err + } + logFile, err := logFilePath.OpenFile(_logFileFlags, 0644) + if err != nil { + return err + } + defer func() { _ = logFile.Close() }() + logger := hclog.New(&hclog.LoggerOptions{ + Output: io.MultiWriter(logFile, os.Stdout), + Level: hclog.Info, + Color: hclog.ColorOff, + Name: "turbod", + }) + + d := &daemon{ + logger: logger, + repoRoot: base.RepoRoot, + timeout: idleTimeout, + reqCh: make(chan struct{}), + timedOutCh: make(chan struct{}), + } + serverName := getRepoHash(base.RepoRoot) + turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath) + if err != nil { + d.logError(err) + return err + } + defer func() { _ = turboServer.Close() }() + err = d.runTurboServer(ctx, turboServer, signalWatcher) + if err != nil { + d.logError(err) + return err + } + return nil +} + +var errInactivityTimeout = errors.New("turbod shut down from inactivity") + +// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path +// at a time. If this process fails to write its PID to the lockfile, it must exit. +func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) { + if err := pidPath.EnsureDir(); err != nil { + return "", err + } + lockFile, err := lockfile.New(pidPath.ToString()) + if err != nil { + // lockfile.New should only return an error if it wasn't given an absolute path. + // We are attempting to use the type system to enforce that we are passing an + // absolute path. An error here likely means a bug, and we should crash. + panic(err) + } + if err := lockFile.TryLock(); err != nil { + return "", err + } + return lockFile, nil +} + +type rpcServer interface { + Register(grpcServer server.GRPCServer) +} + +func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error { + ctx, cancel := context.WithCancel(parentContext) + defer cancel() + pidPath := getPidFile(d.repoRoot) + lock, err := tryAcquirePidfileLock(pidPath) + if err != nil { + return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock) + } + // When we're done serving, clean up the pid file. + // Also, if *this* goroutine panics, make sure we unlock the pid file. + defer func() { + if err := lock.Unlock(); err != nil { + d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error()) + } + }() + // This handler runs in request goroutines. If a request causes a panic, + // this handler will get called after a call to recover(), meaning we are + // no longer panicking. We return a server error and cancel our context, + // which triggers a shutdown of the server. + panicHandler := func(thePanic interface{}) error { + cancel() + d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic)) + return status.Error(codes.Internal, "server panicked") + } + + // If we have the lock, assume that we are the owners of the socket file, + // whether it already exists or not. That means we are free to remove it. + sockPath := getUnixSocket(d.repoRoot) + if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath))) + lis, err := net.Listen("unix", sockPath.ToString()) + if err != nil { + return err + } + // We don't need to explicitly close 'lis', the grpc server will handle that + s := grpc.NewServer( + grpc.ChainUnaryInterceptor( + d.onRequest, + grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)), + ), + ) + go d.timeoutLoop(ctx) + + rpcServer.Register(s) + errCh := make(chan error) + go func(errCh chan<- error) { + if err := s.Serve(lis); err != nil { + errCh <- err + } + close(errCh) + }(errCh) + + // Note that we aren't deferring s.GracefulStop here because we also need + // to drain the error channel, which isn't guaranteed to happen until + // the server has stopped. That in turn may depend on GracefulStop being + // called. + // Future work could restructure this to make that simpler. + var exitErr error + select { + case err, ok := <-errCh: + // The server exited + if ok { + exitErr = err + } + case <-d.timedOutCh: + // This is the inactivity timeout case + exitErr = errInactivityTimeout + s.GracefulStop() + case <-ctx.Done(): + // If a request handler panics, it will cancel this context + s.GracefulStop() + case <-signalWatcher.Done(): + // This is fired if caught a signal + s.GracefulStop() + } + // Wait for the server to exit, if it hasn't already. + // When it does, this channel will close. We don't + // care about the error in this scenario because we've + // either requested a close via cancelling the context, + // an inactivity timeout, or caught a signal. + for range errCh { + } + return exitErr +} + +func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + d.reqCh <- struct{}{} + return handler(ctx, req) +} + +func (d *daemon) timeoutLoop(ctx context.Context) { + timeoutCh := time.After(d.timeout) +outer: + for { + select { + case <-d.reqCh: + timeoutCh = time.After(d.timeout) + case <-timeoutCh: + close(d.timedOutCh) + break outer + case <-ctx.Done(): + break outer + } + } +} + +// ClientOpts re-exports connector.Ops to encapsulate the connector package +type ClientOpts = connector.Opts + +// Client re-exports connector.Client to encapsulate the connector package +type Client = connector.Client + +// GetClient returns a client that can be used to interact with the daemon +func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) { + sockPath := getUnixSocket(repoRoot) + pidPath := getPidFile(repoRoot) + logPath, err := getLogFilePath(repoRoot) + if err != nil { + return nil, err + } + bin, err := os.Executable() + if err != nil { + return nil, err + } + // The Go binary can no longer be called directly, so we need to route back to the rust wrapper + if strings.HasSuffix(bin, "go-turbo") { + bin = filepath.Join(filepath.Dir(bin), "turbo") + } else if strings.HasSuffix(bin, "go-turbo.exe") { + bin = filepath.Join(filepath.Dir(bin), "turbo.exe") + } + c := &connector.Connector{ + Logger: logger.Named("TurbodClient"), + Bin: bin, + Opts: opts, + SockPath: sockPath, + PidPath: pidPath, + LogPath: logPath, + TurboVersion: turboVersion, + } + return c.Connect(ctx) +} |
