author    简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
committer 简律纯 <hsiangnianian@outlook.com>  2023-04-28 01:36:44 +0800
commit    dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree      b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/daemon
parent    0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
download  HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
download  HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/daemon')
-rw-r--r--  cli/internal/daemon/connector/connector.go        391
-rw-r--r--  cli/internal/daemon/connector/connector_test.go   256
-rw-r--r--  cli/internal/daemon/connector/fork.go              15
-rw-r--r--  cli/internal/daemon/connector/fork_windows.go      15
-rw-r--r--  cli/internal/daemon/daemon.go                     307
-rw-r--r--  cli/internal/daemon/daemon_test.go                262
6 files changed, 1246 insertions, 0 deletions
diff --git a/cli/internal/daemon/connector/connector.go b/cli/internal/daemon/connector/connector.go
new file mode 100644
index 0000000..d05ef59
--- /dev/null
+++ b/cli/internal/daemon/connector/connector.go
@@ -0,0 +1,391 @@
+package connector
+
+import (
+ "context"
+ "fmt"
+ "io/fs"
+ "os"
+ "os/exec"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/hashicorp/go-hclog"
+ "github.com/nightlyone/lockfile"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/turbodprotocol"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ // ErrFailedToStart is returned when the daemon process cannot be started
+ ErrFailedToStart = errors.New("daemon could not be started")
+ // ErrVersionMismatch is returned when the daemon process was spawned by a different version than the connecting client
+ ErrVersionMismatch = errors.New("daemon version does not match client version")
+ errConnectionFailure = errors.New("could not connect to daemon")
+ // ErrTooManyAttempts is returned when the client fails to connect too many times
+ ErrTooManyAttempts = errors.New("reached maximum number of attempts contacting daemon")
+ // ErrDaemonNotRunning is returned when the client cannot contact the daemon and has
+ // been instructed not to attempt to start a new daemon
+ ErrDaemonNotRunning = errors.New("the daemon is not running")
+)
+
+// Opts is the set of configurable options for the client connection,
+// including some options to be passed through to the daemon process if
+// it needs to be started.
+type Opts struct {
+ ServerTimeout time.Duration
+ DontStart bool // if true, don't attempt to start the daemon
+ DontKill bool // if true, don't attempt to kill the daemon
+}
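+
+// Note that within this package, ServerTimeout is only consulted when the
+// connector has to spawn a new daemon, where it is forwarded as the
+// --idle-time flag (see startDaemon below).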
+
+// Client represents a connection to the daemon process
+type Client struct {
+ turbodprotocol.TurbodClient
+ *grpc.ClientConn
+ SockPath turbopath.AbsoluteSystemPath
+ PidPath turbopath.AbsoluteSystemPath
+ LogPath turbopath.AbsoluteSystemPath
+}
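+
+// Because Client embeds *grpc.ClientConn, callers can Close() the underlying
+// connection directly; killLiveServer and connectInternal rely on this.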
+
+// Connector instances are used to create a connection to turbo's daemon process
+// The daemon will be started, or killed and restarted, if necessary
+type Connector struct {
+ Logger hclog.Logger
+ Bin string
+ Opts Opts
+ SockPath turbopath.AbsoluteSystemPath
+ PidPath turbopath.AbsoluteSystemPath
+ LogPath turbopath.AbsoluteSystemPath
+ TurboVersion string
+}
+
+// ConnectionError is returned in the error case from connect. It wraps the underlying
+// cause and adds a message with the relevant files for the user to check.
+type ConnectionError struct {
+ SockPath turbopath.AbsoluteSystemPath
+ PidPath turbopath.AbsoluteSystemPath
+ LogPath turbopath.AbsoluteSystemPath
+ cause error
+}
+
+func (ce *ConnectionError) Error() string {
+ return fmt.Sprintf(`connection to turbo daemon process failed. Please ensure the following:
+ - the process identified by the pid in the file at %v is not running, and remove %v
+ - check the logs at %v
+ - the unix domain socket at %v has been removed
+ You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath)
+}
+
+// Unwrap allows a connection error to work with standard library "errors" and compatible packages
+func (ce *ConnectionError) Unwrap() error {
+ return ce.cause
+}
+
+func (c *Connector) wrapConnectionError(err error) error {
+ return &ConnectionError{
+ SockPath: c.SockPath,
+ PidPath: c.PidPath,
+ LogPath: c.LogPath,
+ cause: err,
+ }
+}
+
+// lockFile returns a handle for the lockfile at the daemon's pid path.
+// lockfile.New does not perform IO and the only error it produces
+// is in the case a non-absolute path was provided. We're guaranteeing a
+// turbopath.AbsoluteSystemPath, so an error here is an indication of a bug and
+// we should crash.
+func (c *Connector) lockFile() lockfile.Lockfile {
+ lockFile, err := lockfile.New(c.PidPath.ToString())
+ if err != nil {
+ panic(err)
+ }
+ return lockFile
+}
+
+func (c *Connector) addr() string {
+ // grpc special-cases parsing of unix:<path> urls
+ // to avoid url.Parse. This lets us pass through our absolute
+ // paths unmodified, even on windows.
+ // See code here: https://github.com/grpc/grpc-go/blob/d83070ec0d9043f713b6a63e1963c593b447208c/internal/transport/http_util.go#L392
+ return fmt.Sprintf("unix:%v", c.SockPath.ToString())
+}
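+
+// For example, with SockPath set to /tmp/turbod/abc123/turbod.sock (an
+// illustrative path), addr returns "unix:/tmp/turbod/abc123/turbod.sock".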
+
+// We defer to the daemon's pid file as the locking mechanism.
+// If it doesn't exist, we will attempt to start the daemon.
+// If the daemon has a different version, ask it to shut down.
+// If the pid file exists but we can't connect, try to kill
+// the daemon.
+// If we can't cause the daemon to remove the pid file, report
+// an error to the user that includes the file location so that
+// they can resolve it.
+const (
+ _maxAttempts = 3
+ _shutdownTimeout = 1 * time.Second
+ _socketPollTimeout = 1 * time.Second
+)
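+
+// With these values, a client that never sees a socket file appear gives up
+// after roughly three seconds: _maxAttempts attempts, each polling the socket
+// for up to _socketPollTimeout.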
+
+// killLiveServer tells a running server to shut down. This method is also responsible
+// for closing this client connection.
+func (c *Connector) killLiveServer(ctx context.Context, client *Client, serverPid int) error {
+ defer func() { _ = client.Close() }()
+
+ _, err := client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{})
+ if err != nil {
+ c.Logger.Error(fmt.Sprintf("failed to shutdown running daemon. attempting to force it closed: %v", err))
+ return c.killDeadServer(serverPid)
+ }
+ // Wait for the server to gracefully exit
+ err = backoff.Retry(func() error {
+ lockFile := c.lockFile()
+ owner, err := lockFile.GetOwner()
+ if os.IsNotExist(err) {
+ // If there is no pid file anymore, we can conclude that the daemon successfully
+ // exited and cleaned up after itself.
+ return nil
+ } else if err != nil {
+ // some other error occurred getting the lockfile owner
+ return backoff.Permanent(err)
+ } else if owner.Pid == serverPid {
+ // We're still waiting for the server to shut down
+ return errNeedsRetry
+ }
+ // if there's no error and the lockfile has a new pid, someone else must've started a new daemon.
+ // Consider the old one killed and move on.
+ return nil
+ }, backoffWithTimeout(_shutdownTimeout))
+ if errors.Is(err, errNeedsRetry) {
+ c.Logger.Error(fmt.Sprintf("daemon did not exit after %v, attempting to force it closed", _shutdownTimeout.String()))
+ return c.killDeadServer(serverPid)
+ } else if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *Connector) killDeadServer(pid int) error {
+ // c.lockFile() panics rather than returning an error, and only when the
+ // pid path is not absolute. Given that we require an absolute path as
+ // input, that should never happen.
+ lockFile := c.lockFile()
+ process, err := lockFile.GetOwner()
+ if err == nil {
+ // Check that this is the same process that we failed to connect to.
+ // Otherwise, connectInternal will loop around again and start with whatever
+ // new process has the pid file.
+ if process.Pid == pid {
+ // we have a process that we need to kill
+ // TODO(gsoltis): graceful kill? the process is already not responding to requests,
+ // but it could be in the middle of a graceful shutdown. Probably should let it clean
+ // itself up, and report an error and defer to a force-kill by the user
+ if err := process.Kill(); err != nil {
+ return err
+ }
+ }
+ return nil
+ } else if errors.Is(err, os.ErrNotExist) {
+ // There's no pid file. Someone else killed it. Returning no error will cause the
+ // connectInternal to loop around and try the connection again.
+ return nil
+ }
+ return err
+}
+
+// Connect attempts to create a connection to a turbo daemon.
+// Retries and daemon restarts are built in. If this fails,
+// it is unlikely to succeed after an automated retry.
+func (c *Connector) Connect(ctx context.Context) (*Client, error) {
+ client, err := c.connectInternal(ctx)
+ if err != nil {
+ return nil, c.wrapConnectionError(err)
+ }
+ return client, nil
+}
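+
+// A typical caller (hypothetical sketch, not part of this commit; see
+// GetClient in daemon.go for the real construction) might look like:
+//
+//  c := &Connector{
+//      Logger:       logger,
+//      Bin:          binPath,
+//      SockPath:     sockPath,
+//      PidPath:      pidPath,
+//      LogPath:      logPath,
+//      TurboVersion: version,
+//  }
+//  client, err := c.Connect(ctx)
+//  if err != nil {
+//      return err
+//  }
+//  defer func() { _ = client.Close() }()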
+
+func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
+ // for each attempt, we:
+ // 1. try to find or start a daemon process, getting its pid
+ // 2. wait for the unix domain socket file to appear
+ // 3. connect to the unix domain socket. Note that this connection is not validated
+ // 4. send a hello message. This validates the connection as a side effect of
+ // negotiating versions, which currently requires exact match.
+ // In the event of a live, but incompatible server, we attempt to shut it down and start
+ // a new one. In the event of an unresponsive server, we attempt to kill the process
+ // identified by the pid file, with the hope that it will clean up after itself.
+ // Failures include details about where to find logs, the pid file, and the socket file.
+ for i := 0; i < _maxAttempts; i++ {
+ serverPid, err := c.getOrStartDaemon()
+ if err != nil {
+ // If we fail to even start the daemon process, return immediately, we're unlikely
+ // to succeed without user intervention
+ return nil, err
+ }
+ if err := c.waitForSocket(); errors.Is(err, ErrFailedToStart) {
+ // If we didn't see the socket file, try again. It's possible that
+ // the daemon encountered a transitory error
+ continue
+ } else if err != nil {
+ return nil, err
+ }
+ client, err := c.getClientConn()
+ if err != nil {
+ return nil, err
+ }
+ if err := c.sendHello(ctx, client); err == nil {
+ // We connected and negotiated a version, we're all set
+ return client, nil
+ } else if errors.Is(err, ErrVersionMismatch) {
+ // We don't want to knock down a perfectly fine daemon in a status check.
+ if c.Opts.DontKill {
+ return nil, err
+ }
+
+ // We now know we aren't going to return this client,
+ // but killLiveServer still needs it to send the Shutdown request.
+ // killLiveServer will close the client when it is done with it.
+ if err := c.killLiveServer(ctx, client, serverPid); err != nil {
+ return nil, err
+ }
+ // Loops back around and tries again.
+ } else if errors.Is(err, errConnectionFailure) {
+ // close the client, see if we can kill the stale daemon
+ _ = client.Close()
+ if err := c.killDeadServer(serverPid); err != nil {
+ return nil, err
+ }
+ // if we successfully killed the dead server, loop around and try again
+ } else if err != nil {
+ // Some other error occurred, close the client and
+ // report the error to the user
+ if closeErr := client.Close(); closeErr != nil {
+ // In the event that we fail to close the client, bundle that error along also.
+ // Keep the original error in the error chain, as it's more likely to be useful
+ // or needed for matching on later.
+ err = errors.Wrapf(err, "also failed to close client connection: %v", closeErr)
+ }
+ return nil, err
+ }
+ }
+ return nil, ErrTooManyAttempts
+}
+
+// getOrStartDaemon returns the PID of the daemon process on success. It may start
+// the daemon if it doesn't find one running.
+func (c *Connector) getOrStartDaemon() (int, error) {
+ lockFile := c.lockFile()
+ daemonProcess, getDaemonProcessErr := lockFile.GetOwner()
+ if getDaemonProcessErr != nil {
+ // If we're in a clean state this isn't an "error" per se.
+ // We attempt to start a daemon.
+ if errors.Is(getDaemonProcessErr, fs.ErrNotExist) {
+ if c.Opts.DontStart {
+ return 0, ErrDaemonNotRunning
+ }
+ pid, startDaemonErr := c.startDaemon()
+ if startDaemonErr != nil {
+ return 0, startDaemonErr
+ }
+ return pid, nil
+ }
+
+ // We could have hit any number of errors.
+ // - Failed to read the file for permission reasons.
+ // - User emptied the file's contents.
+ // - etc.
+ return 0, errors.Wrapf(getDaemonProcessErr, "An issue was encountered with the pid file. Please remove it and try again: %v", c.PidPath)
+ }
+
+ return daemonProcess.Pid, nil
+}
+
+func (c *Connector) getClientConn() (*Client, error) {
+ creds := insecure.NewCredentials()
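+ // Note that grpc.Dial is non-blocking by default: a successful return here
+ // does not mean the daemon is reachable. sendHello performs the actual
+ // validation of the connection.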
+ conn, err := grpc.Dial(c.addr(), grpc.WithTransportCredentials(creds))
+ if err != nil {
+ return nil, err
+ }
+ tc := turbodprotocol.NewTurbodClient(conn)
+ return &Client{
+ TurbodClient: tc,
+ ClientConn: conn,
+ SockPath: c.SockPath,
+ PidPath: c.PidPath,
+ LogPath: c.LogPath,
+ }, nil
+}
+
+func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodClient) error {
+ _, err := client.Hello(ctx, &turbodprotocol.HelloRequest{
+ Version: c.TurboVersion,
+ // TODO: add session id
+ })
+ status := status.Convert(err)
+ switch status.Code() {
+ case codes.OK:
+ return nil
+ case codes.FailedPrecondition:
+ return ErrVersionMismatch
+ case codes.Unavailable:
+ return errConnectionFailure
+ default:
+ return err
+ }
+}
+
+var errNeedsRetry = errors.New("retry the operation")
+
+// backoffWithTimeout returns an exponential backoff, starting at 2ms and doubling until
+// the specified timeout has elapsed. Note that backoff instances are stateful, so we need
+// a new one each time we do a Retry.
+func backoffWithTimeout(timeout time.Duration) *backoff.ExponentialBackOff {
+ return &backoff.ExponentialBackOff{
+ Multiplier: 2,
+ InitialInterval: 2 * time.Millisecond,
+ MaxElapsedTime: timeout,
+ Clock: backoff.SystemClock,
+ Stop: backoff.Stop,
+ }
+}
+
+// waitForSocket waits for the unix domain socket to appear
+func (c *Connector) waitForSocket() error {
+ // Note that we don't care if this is our daemon
+ // or not. We started a process, but someone else could beat
+ // us to listening. That's fine, we'll check the version
+ // later.
+ err := backoff.Retry(func() error {
+ if !c.SockPath.FileExists() {
+ return errNeedsRetry
+ }
+ return nil
+ }, backoffWithTimeout(_socketPollTimeout))
+ if errors.Is(err, errNeedsRetry) {
+ return ErrFailedToStart
+ } else if err != nil {
+ return err
+ }
+ return nil
+}
+
+// startDaemon starts the daemon and returns the pid for the new process
+func (c *Connector) startDaemon() (int, error) {
+ args := []string{"daemon"}
+ if c.Opts.ServerTimeout != 0 {
+ args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
+ }
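+ // e.g. args is now ["daemon", "--idle-time=4h0m0s"] for a four-hour timeout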
+ c.Logger.Debug(fmt.Sprintf("starting turbod binary %v", c.Bin))
+ cmd := exec.Command(c.Bin, args...)
+ // Give the daemon its own process group id so that any attempts
+ // to kill it and its process tree don't kill this client.
+ cmd.SysProcAttr = getSysProcAttrs()
+ err := cmd.Start()
+ if err != nil {
+ return 0, err
+ }
+ return cmd.Process.Pid, nil
+}
diff --git a/cli/internal/daemon/connector/connector_test.go b/cli/internal/daemon/connector/connector_test.go
new file mode 100644
index 0000000..62b4504
--- /dev/null
+++ b/cli/internal/daemon/connector/connector_test.go
@@ -0,0 +1,256 @@
+package connector
+
+import (
+ "context"
+ "errors"
+ "net"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "testing"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/nightlyone/lockfile"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/turbodprotocol"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
+ "google.golang.org/grpc/test/bufconn"
+ "gotest.tools/v3/assert"
+)
+
+// testBin returns a platform-appropriate executable to run node.
+// Node works here as an arbitrary process to start, since it's
+// required for turbo development. It will obviously not implement
+// our grpc service; use a mockServer instance where that's needed.
+func testBin() string {
+ if runtime.GOOS == "windows" {
+ return "node.exe"
+ }
+ return "node"
+}
+
+func getUnixSocket(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ return dir.UntypedJoin("turbod-test.sock")
+}
+
+func getPidFile(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ return dir.UntypedJoin("turbod-test.pid")
+}
+
+func TestGetOrStartDaemonInvalidPIDFile(t *testing.T) {
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ pidPath := getPidFile(dirPath)
+ writeFileErr := pidPath.WriteFile(nil, 0777)
+ assert.NilError(t, writeFileErr, "WriteFile")
+
+ c := &Connector{
+ Logger: logger,
+ Opts: Opts{},
+ PidPath: pidPath,
+ }
+
+ pid, err := c.getOrStartDaemon()
+ assert.Equal(t, pid, 0)
+ assert.ErrorContains(t, err, "issue was encountered with the pid file")
+}
+
+func TestConnectFailsWithoutGrpcServer(t *testing.T) {
+ // We aren't starting a server that is going to write
+ // to our socket file, so we should see a series of connection
+ // failures, followed by ErrTooManyAttempts
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ sockPath := getUnixSocket(dirPath)
+ pidPath := getPidFile(dirPath)
+ ctx := context.Background()
+ bin := testBin()
+ c := &Connector{
+ Logger: logger,
+ Bin: bin,
+ Opts: Opts{},
+ SockPath: sockPath,
+ PidPath: pidPath,
+ }
+ // Note that we expect ~3s here, for 3 attempts with a timeout of 1s
+ _, err := c.connectInternal(ctx)
+ assert.ErrorIs(t, err, ErrTooManyAttempts)
+}
+
+func TestKillDeadServerNoPid(t *testing.T) {
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ sockPath := getUnixSocket(dirPath)
+ pidPath := getPidFile(dirPath)
+ c := &Connector{
+ Logger: logger,
+ Bin: "nonexistent",
+ Opts: Opts{},
+ SockPath: sockPath,
+ PidPath: pidPath,
+ }
+
+ err := c.killDeadServer(99999)
+ assert.NilError(t, err, "killDeadServer")
+}
+
+func TestKillDeadServerNoProcess(t *testing.T) {
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ sockPath := getUnixSocket(dirPath)
+ pidPath := getPidFile(dirPath)
+ // Simulate the socket already existing, with no live daemon
+ err := sockPath.WriteFile([]byte("junk"), 0644)
+ assert.NilError(t, err, "WriteFile")
+ err = pidPath.WriteFile([]byte("99999"), 0644)
+ assert.NilError(t, err, "WriteFile")
+ c := &Connector{
+ Logger: logger,
+ Bin: "nonexistent",
+ Opts: Opts{},
+ SockPath: sockPath,
+ PidPath: pidPath,
+ }
+
+ err = c.killDeadServer(99999)
+ assert.ErrorIs(t, err, lockfile.ErrDeadOwner)
+ stillExists := pidPath.FileExists()
+ if !stillExists {
+ t.Error("pidPath should still exist, expected the user to clean it up")
+ }
+}
+
+func TestKillDeadServerWithProcess(t *testing.T) {
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ sockPath := getUnixSocket(dirPath)
+ pidPath := getPidFile(dirPath)
+ // Simulate the socket already existing, with no live daemon
+ err := sockPath.WriteFile([]byte("junk"), 0644)
+ assert.NilError(t, err, "WriteFile")
+ bin := testBin()
+ cmd := exec.Command(bin)
+ err = cmd.Start()
+ assert.NilError(t, err, "cmd.Start")
+ pid := cmd.Process.Pid
+ if pid == 0 {
+ t.Fatalf("failed to start process %v", bin)
+ }
+
+ err = pidPath.WriteFile([]byte(strconv.Itoa(pid)), 0644)
+ assert.NilError(t, err, "WriteFile")
+ c := &Connector{
+ Logger: logger,
+ Bin: "nonexistent",
+ Opts: Opts{},
+ SockPath: sockPath,
+ PidPath: pidPath,
+ }
+
+ err = c.killDeadServer(pid)
+ assert.NilError(t, err, "killDeadServer")
+ stillExists := pidPath.FileExists()
+ if !stillExists {
+ t.Error("pidPath no longer exists, expected client to not clean it up")
+ }
+ err = cmd.Wait()
+ exitErr := &exec.ExitError{}
+ if !errors.As(err, &exitErr) {
+ t.Errorf("expected an exit error from %v, got %v", bin, err)
+ }
+}
+
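+// mockServer is a minimal in-process turbod implementation that lets tests
+// script the Hello and Shutdown responses.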
+type mockServer struct {
+ turbodprotocol.UnimplementedTurbodServer
+ helloErr error
+ shutdownResp *turbodprotocol.ShutdownResponse
+ pidFile turbopath.AbsoluteSystemPath
+}
+
+// Shutdown simulates the server exiting by cleaning up the pid file
+func (s *mockServer) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) {
+ if err := s.pidFile.Remove(); err != nil {
+ return nil, err
+ }
+ return s.shutdownResp, nil
+}
+
+func (s *mockServer) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) {
+ if req.Version == "" {
+ return nil, errors.New("missing version")
+ }
+ return nil, s.helloErr
+}
+
+func TestKillLiveServer(t *testing.T) {
+ logger := hclog.Default()
+ dir := t.TempDir()
+ dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+ sockPath := getUnixSocket(dirPath)
+ pidPath := getPidFile(dirPath)
+ err := pidPath.WriteFile([]byte("99999"), 0644)
+ assert.NilError(t, err, "WriteFile")
+
+ ctx := context.Background()
+ c := &Connector{
+ Logger: logger,
+ Bin: "nonexistent",
+ Opts: Opts{},
+ SockPath: sockPath,
+ PidPath: pidPath,
+ TurboVersion: "some-version",
+ }
+
+ st := status.New(codes.FailedPrecondition, "version mismatch")
+ mock := &mockServer{
+ shutdownResp: &turbodprotocol.ShutdownResponse{},
+ helloErr: st.Err(),
+ pidFile: pidPath,
+ }
+ lis := bufconn.Listen(1024 * 1024)
+ grpcServer := grpc.NewServer()
+ turbodprotocol.RegisterTurbodServer(grpcServer, mock)
+ go func(t *testing.T) {
+ if err := grpcServer.Serve(lis); err != nil {
+ t.Logf("server closed: %v", err)
+ }
+ }(t)
+
+ conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
+ return lis.Dial()
+ }), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ assert.NilError(t, err, "DialContext")
+ turboClient := turbodprotocol.NewTurbodClient(conn)
+ client := &Client{
+ TurbodClient: turboClient,
+ ClientConn: conn,
+ }
+ err = c.sendHello(ctx, client)
+ if !errors.Is(err, ErrVersionMismatch) {
+ t.Errorf("sendHello error got %v, want %v", err, ErrVersionMismatch)
+ }
+ err = c.killLiveServer(ctx, client, 99999)
+ assert.NilError(t, err, "killLiveServer")
+ // Expect the pid file and socket files to have been cleaned up
+ if pidPath.FileExists() {
+ t.Errorf("expected pid file to have been deleted: %v", pidPath)
+ }
+ if sockPath.FileExists() {
+ t.Errorf("expected socket file to have been deleted: %v", sockPath)
+ }
+}
diff --git a/cli/internal/daemon/connector/fork.go b/cli/internal/daemon/connector/fork.go
new file mode 100644
index 0000000..8a6d01d
--- /dev/null
+++ b/cli/internal/daemon/connector/fork.go
@@ -0,0 +1,15 @@
+//go:build !windows
+// +build !windows
+
+package connector
+
+import "syscall"
+
+// getSysProcAttrs returns the platform-specific attributes we want to
+// use while forking the daemon process. Currently this is limited to
+// forcing a new process group
+func getSysProcAttrs() *syscall.SysProcAttr {
+ return &syscall.SysProcAttr{
+ Setpgid: true,
+ }
+}
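+
+// The Windows counterpart in fork_windows.go achieves the same isolation via
+// the CREATE_NEW_PROCESS_GROUP creation flag.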
diff --git a/cli/internal/daemon/connector/fork_windows.go b/cli/internal/daemon/connector/fork_windows.go
new file mode 100644
index 0000000..b9d6e77
--- /dev/null
+++ b/cli/internal/daemon/connector/fork_windows.go
@@ -0,0 +1,15 @@
+//go:build windows
+// +build windows
+
+package connector
+
+import "syscall"
+
+// getSysProcAttrs returns the platform-specific attributes we want to
+// use while forking the daemon process. Currently this is limited to
+// forcing a new process group
+func getSysProcAttrs() *syscall.SysProcAttr {
+ return &syscall.SysProcAttr{
+ CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
+ }
+}
diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go
new file mode 100644
index 0000000..81d5283
--- /dev/null
+++ b/cli/internal/daemon/daemon.go
@@ -0,0 +1,307 @@
+package daemon
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+ "github.com/hashicorp/go-hclog"
+ "github.com/nightlyone/lockfile"
+ "github.com/pkg/errors"
+ "github.com/vercel/turbo/cli/internal/cmdutil"
+ "github.com/vercel/turbo/cli/internal/daemon/connector"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/server"
+ "github.com/vercel/turbo/cli/internal/signals"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "github.com/vercel/turbo/cli/internal/turbostate"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type daemon struct {
+ logger hclog.Logger
+ repoRoot turbopath.AbsoluteSystemPath
+ timeout time.Duration
+ reqCh chan struct{}
+ timedOutCh chan struct{}
+}
+
+func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string {
+ pathHash := sha256.Sum256([]byte(repoRoot.ToString()))
+ // We grab a substring of the hash because there is a 108-character limit on the length
+ // of a filepath for unix domain socket.
+ return hex.EncodeToString(pathHash[:])[:16]
+}
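+
+// For a repo root such as /home/user/repo (illustrative), the result is a
+// 16-character hex string like "a1b2c3d4e5f60718", i.e. the first 8 bytes of
+// the SHA-256 digest of the path.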
+
+func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ tempDir := fs.TempDir("turbod")
+ hexHash := getRepoHash(repoRoot)
+ return tempDir.UntypedJoin(hexHash)
+}
+
+func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) {
+ hexHash := getRepoHash(repoRoot)
+ base := repoRoot.Base()
+ logFilename := fmt.Sprintf("%v-%v.log", hexHash, base)
+
+ logsDir := fs.GetTurboDataDir().UntypedJoin("logs")
+ return logsDir.UntypedJoin(logFilename), nil
+}
+
+func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ root := getDaemonFileRoot(repoRoot)
+ return root.UntypedJoin("turbod.sock")
+}
+
+func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+ root := getDaemonFileRoot(repoRoot)
+ return root.UntypedJoin("turbod.pid")
+}
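+
+// Taken together, the daemon's runtime files for a repo live under
+// <tempdir>/turbod/<repo hash>/ as turbod.sock and turbod.pid, while logs go
+// to <turbo data dir>/logs/<repo hash>-<repo basename>.log.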
+
+// logError logs an error and outputs it to the UI.
+func (d *daemon) logError(err error) {
+ d.logger.Error(fmt.Sprintf("error %v", err))
+}
+
+// we're only appending, and we're creating the file if it doesn't exist.
+// we do not need to read the log file.
+var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE
+
+// ExecuteDaemon executes the root daemon command
+func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
+ base, err := helper.GetCmdBase(args)
+ if err != nil {
+ return err
+ }
+ if args.TestRun {
+ base.UI.Info("Daemon test run successful")
+ return nil
+ }
+
+ idleTimeout := 4 * time.Hour
+ if args.Command.Daemon.IdleTimeout != "" {
+ idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout)
+ if err != nil {
+ return err
+ }
+ }
+
+ logFilePath, err := getLogFilePath(base.RepoRoot)
+ if err != nil {
+ return err
+ }
+ if err := logFilePath.EnsureDir(); err != nil {
+ return err
+ }
+ logFile, err := logFilePath.OpenFile(_logFileFlags, 0644)
+ if err != nil {
+ return err
+ }
+ defer func() { _ = logFile.Close() }()
+ logger := hclog.New(&hclog.LoggerOptions{
+ Output: io.MultiWriter(logFile, os.Stdout),
+ Level: hclog.Info,
+ Color: hclog.ColorOff,
+ Name: "turbod",
+ })
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: base.RepoRoot,
+ timeout: idleTimeout,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+ serverName := getRepoHash(base.RepoRoot)
+ turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath)
+ if err != nil {
+ d.logError(err)
+ return err
+ }
+ defer func() { _ = turboServer.Close() }()
+ err = d.runTurboServer(ctx, turboServer, signalWatcher)
+ if err != nil {
+ d.logError(err)
+ return err
+ }
+ return nil
+}
+
+var errInactivityTimeout = errors.New("turbod shut down from inactivity")
+
+// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path
+// at a time. If this process fails to write its PID to the lockfile, it must exit.
+func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) {
+ if err := pidPath.EnsureDir(); err != nil {
+ return "", err
+ }
+ lockFile, err := lockfile.New(pidPath.ToString())
+ if err != nil {
+ // lockfile.New should only return an error if it wasn't given an absolute path.
+ // We are attempting to use the type system to enforce that we are passing an
+ // absolute path. An error here likely means a bug, and we should crash.
+ panic(err)
+ }
+ if err := lockFile.TryLock(); err != nil {
+ return "", err
+ }
+ return lockFile, nil
+}
+
+type rpcServer interface {
+ Register(grpcServer server.GRPCServer)
+}
+
+func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error {
+ ctx, cancel := context.WithCancel(parentContext)
+ defer cancel()
+ pidPath := getPidFile(d.repoRoot)
+ lock, err := tryAcquirePidfileLock(pidPath)
+ if err != nil {
+ return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock)
+ }
+ // When we're done serving, clean up the pid file.
+ // Also, if *this* goroutine panics, make sure we unlock the pid file.
+ defer func() {
+ if err := lock.Unlock(); err != nil {
+ d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error())
+ }
+ }()
+ // This handler runs in request goroutines. If a request causes a panic,
+ // this handler will get called after a call to recover(), meaning we are
+ // no longer panicking. We return a server error and cancel our context,
+ // which triggers a shutdown of the server.
+ panicHandler := func(thePanic interface{}) error {
+ cancel()
+ d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic))
+ return status.Error(codes.Internal, "server panicked")
+ }
+
+ // If we have the lock, assume that we are the owners of the socket file,
+ // whether it already exists or not. That means we are free to remove it.
+ sockPath := getUnixSocket(d.repoRoot)
+ if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath)))
+ lis, err := net.Listen("unix", sockPath.ToString())
+ if err != nil {
+ return err
+ }
+ // We don't need to explicitly close 'lis', the grpc server will handle that
+ s := grpc.NewServer(
+ grpc.ChainUnaryInterceptor(
+ d.onRequest,
+ grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)),
+ ),
+ )
+ go d.timeoutLoop(ctx)
+
+ rpcServer.Register(s)
+ errCh := make(chan error)
+ go func(errCh chan<- error) {
+ if err := s.Serve(lis); err != nil {
+ errCh <- err
+ }
+ close(errCh)
+ }(errCh)
+
+ // Note that we aren't deferring s.GracefulStop here because we also need
+ // to drain the error channel, which isn't guaranteed to happen until
+ // the server has stopped. That in turn may depend on GracefulStop being
+ // called.
+ // Future work could restructure this to make that simpler.
+ var exitErr error
+ select {
+ case err, ok := <-errCh:
+ // The server exited
+ if ok {
+ exitErr = err
+ }
+ case <-d.timedOutCh:
+ // This is the inactivity timeout case
+ exitErr = errInactivityTimeout
+ s.GracefulStop()
+ case <-ctx.Done():
+ // If a request handler panics, it will cancel this context
+ s.GracefulStop()
+ case <-signalWatcher.Done():
+ // This is fired if we caught a signal
+ s.GracefulStop()
+ }
+ // Wait for the server to exit, if it hasn't already.
+ // When it does, this channel will close. We don't
+ // care about the error in this scenario because we've
+ // either requested a close via cancelling the context,
+ // an inactivity timeout, or caught a signal.
+ for range errCh {
+ }
+ return exitErr
+}
+
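+// onRequest is a grpc unary interceptor that reports activity to the timeout
+// loop, resetting the inactivity timer, before delegating to the handler.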
+func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+ d.reqCh <- struct{}{}
+ return handler(ctx, req)
+}
+
+func (d *daemon) timeoutLoop(ctx context.Context) {
+ timeoutCh := time.After(d.timeout)
+outer:
+ for {
+ select {
+ case <-d.reqCh:
+ timeoutCh = time.After(d.timeout)
+ case <-timeoutCh:
+ close(d.timedOutCh)
+ break outer
+ case <-ctx.Done():
+ break outer
+ }
+ }
+}
+
+// ClientOpts re-exports connector.Opts to encapsulate the connector package
+type ClientOpts = connector.Opts
+
+// Client re-exports connector.Client to encapsulate the connector package
+type Client = connector.Client
+
+// GetClient returns a client that can be used to interact with the daemon
+func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) {
+ sockPath := getUnixSocket(repoRoot)
+ pidPath := getPidFile(repoRoot)
+ logPath, err := getLogFilePath(repoRoot)
+ if err != nil {
+ return nil, err
+ }
+ bin, err := os.Executable()
+ if err != nil {
+ return nil, err
+ }
+ // The Go binary can no longer be called directly, so we need to route back to the rust wrapper
+ if strings.HasSuffix(bin, "go-turbo") {
+ bin = filepath.Join(filepath.Dir(bin), "turbo")
+ } else if strings.HasSuffix(bin, "go-turbo.exe") {
+ bin = filepath.Join(filepath.Dir(bin), "turbo.exe")
+ }
+ c := &connector.Connector{
+ Logger: logger.Named("TurbodClient"),
+ Bin: bin,
+ Opts: opts,
+ SockPath: sockPath,
+ PidPath: pidPath,
+ LogPath: logPath,
+ TurboVersion: turboVersion,
+ }
+ return c.Connect(ctx)
+}
diff --git a/cli/internal/daemon/daemon_test.go b/cli/internal/daemon/daemon_test.go
new file mode 100644
index 0000000..66a714d
--- /dev/null
+++ b/cli/internal/daemon/daemon_test.go
@@ -0,0 +1,262 @@
+package daemon
+
+import (
+ "context"
+ "errors"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/go-hclog"
+ "github.com/nightlyone/lockfile"
+ "github.com/vercel/turbo/cli/internal/fs"
+ "github.com/vercel/turbo/cli/internal/server"
+ "github.com/vercel/turbo/cli/internal/signals"
+ "github.com/vercel/turbo/cli/internal/turbopath"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/grpc_testing"
+ "gotest.tools/v3/assert"
+)
+
+// testBin returns a platform-appropriate node binary.
+// We need some process to be running and findable by the
+// lockfile library, and we don't particularly care what it is.
+// Since node is required for turbo development, it makes a decent
+// candidate.
+func testBin() string {
+ if runtime.GOOS == "windows" {
+ return "node.exe"
+ }
+ return "node"
+}
+
+func TestPidFileLock(t *testing.T) {
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ pidPath := getPidFile(repoRoot)
+ // No pid file exists yet, so acquiring the lock should succeed
+ _, err := tryAcquirePidfileLock(pidPath)
+ assert.NilError(t, err, "acquirePidLock")
+
+ // Start up a node process and fake a pid file for it.
+ // Ensure that we can't start the daemon while the node process is live
+ bin := testBin()
+ node := exec.Command(bin)
+ err = node.Start()
+ assert.NilError(t, err, "Start")
+ stopNode := func() error {
+ if err := node.Process.Kill(); err != nil {
+ return err
+ }
+ // We expect an error from node, we just sent a kill signal
+ _ = node.Wait()
+ return nil
+ }
+ // In case we fail the test, still try to kill the node process
+ t.Cleanup(func() { _ = stopNode() })
+ nodePid := node.Process.Pid
+ err = pidPath.WriteFile([]byte(strconv.Itoa(nodePid)), 0644)
+ assert.NilError(t, err, "WriteFile")
+
+ _, err = tryAcquirePidfileLock(pidPath)
+ assert.ErrorIs(t, err, lockfile.ErrBusy)
+
+ // Stop the node process, but leave the pid file there
+ // This simulates a crash
+ err = stopNode()
+ assert.NilError(t, err, "stopNode")
+ // the lockfile library handles removing pids from dead owners
+ _, err = tryAcquirePidfileLock(pidPath)
+ assert.NilError(t, err, "acquirePidLock")
+}
+
+type testRPCServer struct {
+ grpc_testing.UnimplementedTestServiceServer
+ registered chan struct{}
+}
+
+func (ts *testRPCServer) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) {
+ panic("intended to panic")
+}
+
+func (ts *testRPCServer) Register(grpcServer server.GRPCServer) {
+ grpc_testing.RegisterTestServiceServer(grpcServer, ts)
+ ts.registered <- struct{}{}
+}
+
+func newTestRPCServer() *testRPCServer {
+ return &testRPCServer{
+ registered: make(chan struct{}, 1),
+ }
+}
+
+func waitForFile(t *testing.T, filename turbopath.AbsoluteSystemPath, timeout time.Duration) {
+ t.Helper()
+ deadline := time.After(timeout)
+outer:
+ for !filename.FileExists() {
+ select {
+ case <-deadline:
+ break outer
+ case <-time.After(10 * time.Millisecond):
+ }
+ }
+ if !filename.FileExists() {
+ t.Errorf("timed out waiting for %v to exist after %v", filename, timeout)
+ }
+}
+
+func TestDaemonLifecycle(t *testing.T) {
+ logger := hclog.Default()
+ logger.SetLevel(hclog.Debug)
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ ts := newTestRPCServer()
+ watcher := signals.NewWatcher()
+ ctx, cancel := context.WithCancel(context.Background())
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: repoRoot,
+ timeout: 10 * time.Second,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+
+ var serverErr error
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ serverErr = d.runTurboServer(ctx, ts, watcher)
+ wg.Done()
+ }()
+
+ sockPath := getUnixSocket(repoRoot)
+ waitForFile(t, sockPath, 30*time.Second)
+ pidPath := getPidFile(repoRoot)
+ waitForFile(t, pidPath, 1*time.Second)
+ cancel()
+ wg.Wait()
+ assert.NilError(t, serverErr, "runTurboServer")
+ if sockPath.FileExists() {
+ t.Errorf("%v still exists, should have been cleaned up", sockPath)
+ }
+ if pidPath.FileExists() {
+ t.Errorf("%v still exists, should have been cleaned up", sockPath)
+ }
+}
+
+func TestTimeout(t *testing.T) {
+ logger := hclog.Default()
+ logger.SetLevel(hclog.Debug)
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ ts := newTestRPCServer()
+ watcher := signals.NewWatcher()
+ ctx := context.Background()
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: repoRoot,
+ timeout: 5 * time.Millisecond,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+ err := d.runTurboServer(ctx, ts, watcher)
+ if !errors.Is(err, errInactivityTimeout) {
+ t.Errorf("server error got %v, want %v", err, errInactivityTimeout)
+ }
+}
+
+func TestCaughtSignal(t *testing.T) {
+ logger := hclog.Default()
+ logger.SetLevel(hclog.Debug)
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ ts := newTestRPCServer()
+ watcher := signals.NewWatcher()
+ ctx := context.Background()
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: repoRoot,
+ timeout: 5 * time.Second,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+ errCh := make(chan error)
+ go func() {
+ err := d.runTurboServer(ctx, ts, watcher)
+ errCh <- err
+ }()
+ <-ts.registered
+ // grpc doesn't provide a signal to know when the server is serving.
+ // So while this call to Close can race with the call to grpc.Server.Serve, if we've
+ // registered with the turboserver, we've registered all of our
+ // signal handlers as well. We just may or may not be serving when Close()
+ // is called. It shouldn't matter for the purposes of this test:
+ // Either we are serving, and Serve will return with nil when GracefulStop is
+ // called, or we aren't serving yet, and the subsequent call to Serve will
+ // immediately return with grpc.ErrServerStopped. So, both nil and grpc.ErrServerStopped
+ // are acceptable outcomes for runTurboServer. Any other error, or a timeout, is a
+ // failure.
+ watcher.Close()
+
+ err := <-errCh
+ pidPath := getPidFile(repoRoot)
+ if pidPath.FileExists() {
+ t.Errorf("expected to clean up %v, but it still exists", pidPath)
+ }
+ // We'll either get nil or ErrServerStopped, depending on whether
+ // or not we close the signal watcher before grpc.Server.Serve was
+ // called.
+ if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+ t.Errorf("runTurboServer got err %v, want nil or ErrServerStopped", err)
+ }
+}
+
+func TestCleanupOnPanic(t *testing.T) {
+ logger := hclog.Default()
+ logger.SetLevel(hclog.Debug)
+ repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+ ts := newTestRPCServer()
+ watcher := signals.NewWatcher()
+ ctx := context.Background()
+
+ d := &daemon{
+ logger: logger,
+ repoRoot: repoRoot,
+ timeout: 5 * time.Second,
+ reqCh: make(chan struct{}),
+ timedOutCh: make(chan struct{}),
+ }
+ errCh := make(chan error)
+ go func() {
+ err := d.runTurboServer(ctx, ts, watcher)
+ errCh <- err
+ }()
+ <-ts.registered
+
+ creds := insecure.NewCredentials()
+ sockFile := getUnixSocket(repoRoot)
+ conn, err := grpc.Dial("unix://"+sockFile.ToString(), grpc.WithTransportCredentials(creds))
+ assert.NilError(t, err, "Dial")
+
+ client := grpc_testing.NewTestServiceClient(conn)
+ _, err = client.EmptyCall(ctx, &grpc_testing.Empty{})
+ if err == nil {
+ t.Error("nil error")
+ }
+ // wait for the server to finish
+ <-errCh
+
+ pidPath := getPidFile(repoRoot)
+ if pidPath.FileExists() {
+ t.Errorf("expected to clean up %v, but it still exists", pidPath)
+ }
+}