Diffstat (limited to 'cli/internal/daemon/daemon.go')
-rw-r--r--  cli/internal/daemon/daemon.go  307
1 file changed, 307 insertions, 0 deletions
diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go
new file mode 100644
index 0000000..81d5283
--- /dev/null
+++ b/cli/internal/daemon/daemon.go
@@ -0,0 +1,307 @@
+package daemon
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+ "github.com/hashicorp/go-hclog"
+ "github.com/nightlyone/lockfile"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/daemon/connector"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/server"
+ "github.com/vercel/turbo/cli/internal/signals"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/turbostate"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
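+// daemon tracks the state for a running turbod instance: where it logs,
+// which repo it serves, and the channels that implement its idle timeout.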
+type daemon struct {
+ logger hclog.Logger
+ repoRoot turbopath.AbsoluteSystemPath
+ timeout time.Duration
+ reqCh chan struct{}
+ timedOutCh chan struct{}
+}
+
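+// getRepoHash returns a short hash of the repo root, used to keep one
+// repo's daemon files (socket, pid file, log) from colliding with another's.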
+func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string {
+ pathHash := sha256.Sum256([]byte(repoRoot.ToString()))
+ // We grab a substring of the hash because there is a 108-character limit
+ // on the length of a Unix domain socket path.
+ return hex.EncodeToString(pathHash[:])[:16]
+}
+
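+// getDaemonFileRoot returns the per-repo directory under the system temp
+// dir that holds the daemon's unix socket and pid file.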
+func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ tempDir := fs.TempDir("turbod")
+ hexHash := getRepoHash(repoRoot)
+ return tempDir.UntypedJoin(hexHash)
+}
+
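+// getLogFilePath returns the daemon log file path, named with both the repo
+// hash and the repo's base name so it is unique as well as recognizable.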
+func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) {
+ hexHash := getRepoHash(repoRoot)
+ base := repoRoot.Base()
+ logFilename := fmt.Sprintf("%v-%v.log", hexHash, base)
+
+ logsDir := fs.GetTurboDataDir().UntypedJoin("logs")
+ return logsDir.UntypedJoin(logFilename), nil
+}
+
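+// getUnixSocket returns the path of the unix domain socket the daemon
+// listens on for this repo.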
+func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ root := getDaemonFileRoot(repoRoot)
+ return root.UntypedJoin("turbod.sock")
+}
+
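+// getPidFile returns the path of the pid file used to guarantee that at
+// most one daemon runs per repo.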
+func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ root := getDaemonFileRoot(repoRoot)
+ return root.UntypedJoin("turbod.pid")
+}
+
+// logError writes an error to the daemon's log output.
+func (d *daemon) logError(err error) {
+ d.logger.Error(fmt.Sprintf("error %v", err))
+}
+
+// We're only appending, and we're creating the file if it doesn't exist.
+// We do not need to read the log file.
+var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE
+
+// ExecuteDaemon executes the root daemon command
+func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
+ base, err := helper.GetCmdBase(args)
+ if err != nil {
+ return err
+ }
+ if args.TestRun {
+ base.UI.Info("Daemon test run successful")
+ return nil
+ }
+
+ idleTimeout := 4 * time.Hour
+ if args.Command.Daemon.IdleTimeout != "" {
+ idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout)
+ if err != nil {
+ return err
+ }
+ }
+
+ logFilePath, err := getLogFilePath(base.RepoRoot)
+ if err != nil {
+ return err
+ }
+ if err := logFilePath.EnsureDir(); err != nil {
+ return err
+ }
+ logFile, err := logFilePath.OpenFile(_logFileFlags, 0644)
+ if err != nil {
+ return err
+ }
+ defer func() { _ = logFile.Close() }()
+ logger := hclog.New(&hclog.LoggerOptions{
+ Output: io.MultiWriter(logFile, os.Stdout),
+ Level: hclog.Info,
+ Color: hclog.ColorOff,
+ Name: "turbod",
+ })
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: base.RepoRoot,
+ timeout: idleTimeout,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+ serverName := getRepoHash(base.RepoRoot)
+ turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath)
+ if err != nil {
+ d.logError(err)
+ return err
+ }
+ defer func() { _ = turboServer.Close() }()
+ err = d.runTurboServer(ctx, turboServer, signalWatcher)
+ if err != nil {
+ d.logError(err)
+ return err
+ }
+ return nil
+}
+
+var errInactivityTimeout = errors.New("turbod shut down from inactivity")
+
+// tryAcquirePidfileLock attempts to ensure that only one daemon is running
+// for this repo at a time by locking the pid file at the given path. If this
+// process fails to write its PID to the lockfile, it must exit.
+func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) {
+ if err := pidPath.EnsureDir(); err != nil {
+ return "", err
+ }
+ lockFile, err := lockfile.New(pidPath.ToString())
+ if err != nil {
+ // lockfile.New should only return an error if it wasn't given an absolute path.
+ // We are attempting to use the type system to enforce that we are passing an
+ // absolute path. An error here likely means a bug, and we should crash.
+ panic(err)
+ }
+ if err := lockFile.TryLock(); err != nil {
+ return "", err
+ }
+ return lockFile, nil
+}
+
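+// rpcServer abstracts the turbod server down to the single operation
+// runTurboServer needs from it: registering itself with a gRPC server.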
+type rpcServer interface {
+ Register(grpcServer server.GRPCServer)
+}
+
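+// runTurboServer acquires the pid file lock, binds the unix socket, and
+// serves gRPC requests until the server errors, the context is cancelled,
+// the idle timeout fires, or a signal is caught.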
+func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error {
+ ctx, cancel := context.WithCancel(parentContext)
+ defer cancel()
+ pidPath := getPidFile(d.repoRoot)
+ lock, err := tryAcquirePidfileLock(pidPath)
+ if err != nil {
+ return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", pidPath)
+ }
+ // When we're done serving, clean up the pid file.
+ // Also, if *this* goroutine panics, make sure we unlock the pid file.
+ defer func() {
+ if err := lock.Unlock(); err != nil {
+ d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error())
+ }
+ }()
+ // This handler runs in request goroutines. If a request causes a panic,
+ // this handler will get called after a call to recover(), meaning we are
+ // no longer panicking. We return a server error and cancel our context,
+ // which triggers a shutdown of the server.
+ panicHandler := func(thePanic interface{}) error {
+ cancel()
+ d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic))
+ return status.Error(codes.Internal, "server panicked")
+ }
+
+ // If we have the lock, assume that we are the owners of the socket file,
+ // whether it already exists or not. That means we are free to remove it.
+ sockPath := getUnixSocket(d.repoRoot)
+ if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)", sockPath, len(sockPath)))
+ lis, err := net.Listen("unix", sockPath.ToString())
+ if err != nil {
+ return err
+ }
+ // We don't need to explicitly close 'lis'; the grpc server will handle that.
+ s := grpc.NewServer(
+ grpc.ChainUnaryInterceptor(
+ d.onRequest,
+ grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)),
+ ),
+ )
+ go d.timeoutLoop(ctx)
+
+ rpcServer.Register(s)
+ errCh := make(chan error)
+ go func(errCh chan<- error) {
+ if err := s.Serve(lis); err != nil {
+ errCh <- err
+ }
+ close(errCh)
+ }(errCh)
+
+ // Note that we aren't deferring s.GracefulStop here because we also need
+ // to drain the error channel, which isn't guaranteed to happen until
+ // the server has stopped. That in turn may depend on GracefulStop being
+ // called.
+ // Future work could restructure this to make that simpler.
+ var exitErr error
+ select {
+ case err, ok := <-errCh:
+ // The server exited
+ if ok {
+ exitErr = err
+ }
+ case <-d.timedOutCh:
+ // This is the inactivity timeout case
+ exitErr = errInactivityTimeout
+ s.GracefulStop()
+ case <-ctx.Done():
+ // If a request handler panics, it will cancel this context
+ s.GracefulStop()
+ case <-signalWatcher.Done():
+ // This fires if we catch a signal
+ s.GracefulStop()
+ }
+ // Wait for the server to exit, if it hasn't already.
+ // When it does, this channel will close. We don't
+ // care about the error in this scenario because we've
+ // either requested a close via cancelling the context,
+ // hit the inactivity timeout, or caught a signal.
+ for range errCh {
+ }
+ return exitErr
+}
+
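+// onRequest is a unary interceptor that notes that a request arrived,
+// resetting the idle timer, before handing off to the real handler.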
+func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+ d.reqCh <- struct{}{}
+ return handler(ctx, req)
+}
+
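+// timeoutLoop closes timedOutCh once d.timeout elapses with no requests.
+// Each value on reqCh resets the timer; context cancellation ends the loop.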
+func (d *daemon) timeoutLoop(ctx context.Context) {
+ timeoutCh := time.After(d.timeout)
+outer:
+ for {
+ select {
+ case <-d.reqCh:
+ timeoutCh = time.After(d.timeout)
+ case <-timeoutCh:
+ close(d.timedOutCh)
+ break outer
+ case <-ctx.Done():
+ break outer
+ }
+ }
+}
+
+// ClientOpts re-exports connector.Opts to encapsulate the connector package
+type ClientOpts = connector.Opts
+
+// Client re-exports connector.Client to encapsulate the connector package
+type Client = connector.Client
+
+// GetClient returns a client that can be used to interact with the daemon
+func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) {
+ sockPath := getUnixSocket(repoRoot)
+ pidPath := getPidFile(repoRoot)
+ logPath, err := getLogFilePath(repoRoot)
+ if err != nil {
+ return nil, err
+ }
+ bin, err := os.Executable()
+ if err != nil {
+ return nil, err
+ }
+ // The Go binary can no longer be called directly, so we need to route back to the Rust wrapper
+ if strings.HasSuffix(bin, "go-turbo") {
+ bin = filepath.Join(filepath.Dir(bin), "turbo")
+ } else if strings.HasSuffix(bin, "go-turbo.exe") {
+ bin = filepath.Join(filepath.Dir(bin), "turbo.exe")
+ }
+ c := &connector.Connector{
+ Logger: logger.Named("TurbodClient"),
+ Bin: bin,
+ Opts: opts,
+ SockPath: sockPath,
+ PidPath: pidPath,
+ LogPath: logPath,
+ TurboVersion: turboVersion,
+ }
+ return c.Connect(ctx)
+}