| author | 2023-04-28 01:36:44 +0800 |
|---|---|
| committer | 2023-04-28 01:36:44 +0800 |
| commit | dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch) |
| tree | b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/daemon |
| parent | 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff) |
Diffstat (limited to 'cli/internal/daemon')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | cli/internal/daemon/connector/connector.go | 391 |
| -rw-r--r-- | cli/internal/daemon/connector/connector_test.go | 256 |
| -rw-r--r-- | cli/internal/daemon/connector/fork.go | 15 |
| -rw-r--r-- | cli/internal/daemon/connector/fork_windows.go | 15 |
| -rw-r--r-- | cli/internal/daemon/daemon.go | 307 |
| -rw-r--r-- | cli/internal/daemon/daemon_test.go | 262 |
6 files changed, 1246 insertions, 0 deletions
diff --git a/cli/internal/daemon/connector/connector.go b/cli/internal/daemon/connector/connector.go
new file mode 100644
index 0000000..d05ef59
--- /dev/null
+++ b/cli/internal/daemon/connector/connector.go
@@ -0,0 +1,391 @@
+package connector
+
+import (
+    "context"
+    "fmt"
+    "io/fs"
+    "os"
+    "os/exec"
+    "time"
+
+    "github.com/cenkalti/backoff/v4"
+    "github.com/hashicorp/go-hclog"
+    "github.com/nightlyone/lockfile"
+    "github.com/pkg/errors"
+    "github.com/vercel/turbo/cli/internal/turbodprotocol"
+    "github.com/vercel/turbo/cli/internal/turbopath"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/status"
+)
+
+var (
+    // ErrFailedToStart is returned when the daemon process cannot be started
+    ErrFailedToStart = errors.New("daemon could not be started")
+    // ErrVersionMismatch is returned when the daemon process was spawned by a different version than the connecting client
+    ErrVersionMismatch   = errors.New("daemon version does not match client version")
+    errConnectionFailure = errors.New("could not connect to daemon")
+    // ErrTooManyAttempts is returned when the client fails to connect too many times
+    ErrTooManyAttempts = errors.New("reached maximum number of attempts contacting daemon")
+    // ErrDaemonNotRunning is returned when the client cannot contact the daemon and has
+    // been instructed not to attempt to start a new daemon
+    ErrDaemonNotRunning = errors.New("the daemon is not running")
+)
+
+// Opts is the set of configurable options for the client connection,
+// including some options to be passed through to the daemon process if
+// it needs to be started.
+type Opts struct {
+    ServerTimeout time.Duration
+    DontStart     bool // if true, don't attempt to start the daemon
+    DontKill      bool // if true, don't attempt to kill the daemon
+}
+
+// Client represents a connection to the daemon process
+type Client struct {
+    turbodprotocol.TurbodClient
+    *grpc.ClientConn
+    SockPath turbopath.AbsoluteSystemPath
+    PidPath  turbopath.AbsoluteSystemPath
+    LogPath  turbopath.AbsoluteSystemPath
+}
+
+// Connector instances are used to create a connection to turbo's daemon process.
+// The daemon will be started, or killed and restarted, if necessary.
+type Connector struct {
+    Logger       hclog.Logger
+    Bin          string
+    Opts         Opts
+    SockPath     turbopath.AbsoluteSystemPath
+    PidPath      turbopath.AbsoluteSystemPath
+    LogPath      turbopath.AbsoluteSystemPath
+    TurboVersion string
+}
+
+// ConnectionError is returned in the error case from connect. It wraps the underlying
+// cause and adds a message with the relevant files for the user to check.
+type ConnectionError struct {
+    SockPath turbopath.AbsoluteSystemPath
+    PidPath  turbopath.AbsoluteSystemPath
+    LogPath  turbopath.AbsoluteSystemPath
+    cause    error
+}
+
+func (ce *ConnectionError) Error() string {
+    return fmt.Sprintf(`connection to turbo daemon process failed.
+Please ensure the following:
+ - the process identified by the pid in the file at %v is not running, and remove %v
+ - check the logs at %v
+ - the unix domain socket at %v has been removed
+You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath)
+}
+
+// Unwrap allows a connection error to work with standard library "errors" and compatible packages
+func (ce *ConnectionError) Unwrap() error {
+    return ce.cause
+}
+
+func (c *Connector) wrapConnectionError(err error) error {
+    return &ConnectionError{
+        SockPath: c.SockPath,
+        PidPath:  c.PidPath,
+        LogPath:  c.LogPath,
+        cause:    err,
+    }
+}
+
+// lockFile returns a pointer to where a lockfile should be.
+// lockfile.New does not perform IO and the only error it produces
+// is in the case a non-absolute path was provided. We're guaranteeing a
+// turbopath.AbsoluteSystemPath, so an error here is an indication of a bug and
+// we should crash.
+func (c *Connector) lockFile() lockfile.Lockfile {
+    lockFile, err := lockfile.New(c.PidPath.ToString())
+    if err != nil {
+        panic(err)
+    }
+    return lockFile
+}
+
+func (c *Connector) addr() string {
+    // grpc special-cases parsing of unix:<path> urls
+    // to avoid url.Parse. This lets us pass through our absolute
+    // paths unmodified, even on windows.
+    // See code here: https://github.com/grpc/grpc-go/blob/d83070ec0d9043f713b6a63e1963c593b447208c/internal/transport/http_util.go#L392
+    return fmt.Sprintf("unix:%v", c.SockPath.ToString())
+}
+
+// We defer to the daemon's pid file as the locking mechanism.
+// If it doesn't exist, we will attempt to start the daemon.
+// If the daemon has a different version, ask it to shut down.
+// If the pid file exists but we can't connect, try to kill
+// the daemon.
+// If we can't cause the daemon to remove the pid file, report
+// an error to the user that includes the file location so that
+// they can resolve it.
+const (
+    _maxAttempts       = 3
+    _shutdownTimeout   = 1 * time.Second
+    _socketPollTimeout = 1 * time.Second
+)
+
+// killLiveServer tells a running server to shut down. This method is also responsible
+// for closing this client connection.
+func (c *Connector) killLiveServer(ctx context.Context, client *Client, serverPid int) error {
+    defer func() { _ = client.Close() }()
+
+    _, err := client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{})
+    if err != nil {
+        c.Logger.Error(fmt.Sprintf("failed to shutdown running daemon. attempting to force it closed: %v", err))
+        return c.killDeadServer(serverPid)
+    }
+    // Wait for the server to gracefully exit
+    err = backoff.Retry(func() error {
+        lockFile := c.lockFile()
+        owner, err := lockFile.GetOwner()
+        if os.IsNotExist(err) {
+            // If there is no pid file anymore, we can conclude that the daemon successfully
+            // exited and cleaned up after itself.
+            return nil
+        } else if err != nil {
+            // some other error occurred getting the lockfile owner
+            return backoff.Permanent(err)
+        } else if owner.Pid == serverPid {
+            // We're still waiting for the server to shut down
+            return errNeedsRetry
+        }
+        // if there's no error and the lockfile has a new pid, someone else must've started a new daemon.
+        // Consider the old one killed and move on.
+        return nil
+    }, backoffWithTimeout(_shutdownTimeout))
+    if errors.Is(err, errNeedsRetry) {
+        c.Logger.Error(fmt.Sprintf("daemon did not exit after %v, attempting to force it closed", _shutdownTimeout.String()))
+        return c.killDeadServer(serverPid)
+    } else if err != nil {
+        return err
+    }
+    return nil
+}
+
+func (c *Connector) killDeadServer(pid int) error {
+    // currently the only error that this constructor returns is
+    // in the case that you don't provide an absolute path.
+    // Given that we require an absolute path as input, this should
+    // hopefully never happen.
+    lockFile := c.lockFile()
+    process, err := lockFile.GetOwner()
+    if err == nil {
+        // Check that this is the same process that we failed to connect to.
+        // Otherwise, connectInternal will loop around again and start with whatever
+        // new process has the pid file.
+        if process.Pid == pid {
+            // we have a process that we need to kill
+            // TODO(gsoltis): graceful kill? the process is already not responding to requests,
+            // but it could be in the middle of a graceful shutdown. Probably should let it clean
+            // itself up, and report an error and defer to a force-kill by the user
+            if err := process.Kill(); err != nil {
+                return err
+            }
+        }
+        return nil
+    } else if errors.Is(err, os.ErrNotExist) {
+        // There's no pid file. Someone else killed it. Returning no error will cause the
+        // connectInternal to loop around and try the connection again.
+        return nil
+    }
+    return err
+}
+
+// Connect attempts to create a connection to a turbo daemon.
+// Retries and daemon restarts are built in. If this fails,
+// it is unlikely to succeed after an automated retry.
+func (c *Connector) Connect(ctx context.Context) (*Client, error) {
+    client, err := c.connectInternal(ctx)
+    if err != nil {
+        return nil, c.wrapConnectionError(err)
+    }
+    return client, nil
+}
+
+func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
+    // for each attempt, we:
+    // 1. try to find or start a daemon process, getting its pid
+    // 2. wait for the unix domain socket file to appear
+    // 3. connect to the unix domain socket. Note that this connection is not validated
+    // 4. send a hello message. This validates the connection as a side effect of
+    //    negotiating versions, which currently requires exact match.
+    // In the event of a live, but incompatible server, we attempt to shut it down and start
+    // a new one. In the event of an unresponsive server, we attempt to kill the process
+    // identified by the pid file, with the hope that it will clean up after itself.
+    // Failures include details about where to find logs, the pid file, and the socket file.
+    for i := 0; i < _maxAttempts; i++ {
+        serverPid, err := c.getOrStartDaemon()
+        if err != nil {
+            // If we fail to even start the daemon process, return immediately, we're unlikely
+            // to succeed without user intervention
+            return nil, err
+        }
+        if err := c.waitForSocket(); errors.Is(err, ErrFailedToStart) {
+            // If we didn't see the socket file, try again. It's possible that
+            // the daemon encountered a transitory error
+            continue
+        } else if err != nil {
+            return nil, err
+        }
+        client, err := c.getClientConn()
+        if err != nil {
+            return nil, err
+        }
+        if err := c.sendHello(ctx, client); err == nil {
+            // We connected and negotiated a version, we're all set
+            return client, nil
+        } else if errors.Is(err, ErrVersionMismatch) {
+            // We don't want to knock down a perfectly fine daemon in a status check.
+            if c.Opts.DontKill {
+                return nil, err
+            }
+
+            // We now know we aren't going to return this client,
+            // but killLiveServer still needs it to send the Shutdown request.
+            // killLiveServer will close the client when it is done with it.
+            if err := c.killLiveServer(ctx, client, serverPid); err != nil {
+                return nil, err
+            }
+            // Loops back around and tries again.
+        } else if errors.Is(err, errConnectionFailure) {
+            // close the client, see if we can kill the stale daemon
+            _ = client.Close()
+            if err := c.killDeadServer(serverPid); err != nil {
+                return nil, err
+            }
+            // if we successfully killed the dead server, loop around and try again
+        } else if err != nil {
+            // Some other error occurred, close the client and
+            // report the error to the user
+            if closeErr := client.Close(); closeErr != nil {
+                // In the event that we fail to close the client, bundle that error along also.
+                // Keep the original error in the error chain, as it's more likely to be useful
+                // or needed for matching on later.
+                err = errors.Wrapf(err, "also failed to close client connection: %v", closeErr)
+            }
+            return nil, err
+        }
+    }
+    return nil, ErrTooManyAttempts
+}
+
+// getOrStartDaemon returns the PID of the daemon process on success. It may start
+// the daemon if it doesn't find one running.
+func (c *Connector) getOrStartDaemon() (int, error) {
+    lockFile := c.lockFile()
+    daemonProcess, getDaemonProcessErr := lockFile.GetOwner()
+    if getDaemonProcessErr != nil {
+        // If we're in a clean state this isn't an "error" per se.
+        // We attempt to start a daemon.
+        if errors.Is(getDaemonProcessErr, fs.ErrNotExist) {
+            if c.Opts.DontStart {
+                return 0, ErrDaemonNotRunning
+            }
+            pid, startDaemonErr := c.startDaemon()
+            if startDaemonErr != nil {
+                return 0, startDaemonErr
+            }
+            return pid, nil
+        }
+
+        // We could have hit any number of errors.
+        // - Failed to read the file for permission reasons.
+        // - User emptied the file's contents.
+        // - etc.
+        return 0, errors.Wrapf(getDaemonProcessErr, "An issue was encountered with the pid file. Please remove it and try again: %v", c.PidPath)
+    }
+
+    return daemonProcess.Pid, nil
+}
+
+func (c *Connector) getClientConn() (*Client, error) {
+    creds := insecure.NewCredentials()
+    conn, err := grpc.Dial(c.addr(), grpc.WithTransportCredentials(creds))
+    if err != nil {
+        return nil, err
+    }
+    tc := turbodprotocol.NewTurbodClient(conn)
+    return &Client{
+        TurbodClient: tc,
+        ClientConn:   conn,
+        SockPath:     c.SockPath,
+        PidPath:      c.PidPath,
+        LogPath:      c.LogPath,
+    }, nil
+}
+
+func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodClient) error {
+    _, err := client.Hello(ctx, &turbodprotocol.HelloRequest{
+        Version: c.TurboVersion,
+        // TODO: add session id
+    })
+    status := status.Convert(err)
+    switch status.Code() {
+    case codes.OK:
+        return nil
+    case codes.FailedPrecondition:
+        return ErrVersionMismatch
+    case codes.Unavailable:
+        return errConnectionFailure
+    default:
+        return err
+    }
+}
+
+var errNeedsRetry = errors.New("retry the operation")
+
+// backoffWithTimeout returns an exponential backoff, starting at 2ms and doubling until
+// the specified timeout has elapsed. Note that backoff instances are stateful, so we need
+// a new one each time we do a Retry.
+func backoffWithTimeout(timeout time.Duration) *backoff.ExponentialBackOff {
+    return &backoff.ExponentialBackOff{
+        Multiplier:      2,
+        InitialInterval: 2 * time.Millisecond,
+        MaxElapsedTime:  timeout,
+        Clock:           backoff.SystemClock,
+        Stop:            backoff.Stop,
+    }
+}
+
+// waitForSocket waits for the unix domain socket to appear
+func (c *Connector) waitForSocket() error {
+    // Note that we don't care if this is our daemon
+    // or not. We started a process, but someone else could beat
+    // us to listening. That's fine, we'll check the version
+    // later.
+    err := backoff.Retry(func() error {
+        if !c.SockPath.FileExists() {
+            return errNeedsRetry
+        }
+        return nil
+    }, backoffWithTimeout(_socketPollTimeout))
+    if errors.Is(err, errNeedsRetry) {
+        return ErrFailedToStart
+    } else if err != nil {
+        return err
+    }
+    return nil
+}
+
+// startDaemon starts the daemon and returns the pid for the new process
+func (c *Connector) startDaemon() (int, error) {
+    args := []string{"daemon"}
+    if c.Opts.ServerTimeout != 0 {
+        args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
+    }
+    c.Logger.Debug(fmt.Sprintf("starting turbod binary %v", c.Bin))
+    cmd := exec.Command(c.Bin, args...)
+    // For the daemon to have its own process group id so that any attempts
+    // to kill it and its process tree don't kill this client.
+    cmd.SysProcAttr = getSysProcAttrs()
+    err := cmd.Start()
+    if err != nil {
+        return 0, err
+    }
+    return cmd.Process.Pid, nil
+}
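The connector above is the client-side entry point. Below is a minimal sketch (not part of this commit) of how calling code might drive it; the daemon file root, binary name, and version string are placeholders, and in the real CLI these are derived from the repo root and build metadata.

```go
package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/go-hclog"
	"github.com/vercel/turbo/cli/internal/daemon/connector"
	"github.com/vercel/turbo/cli/internal/turbopath"
)

func main() {
	// Hypothetical daemon file root; the real CLI derives these paths from the repo root.
	root := turbopath.AbsoluteSystemPath("/tmp/turbod/0123456789abcdef")
	c := &connector.Connector{
		Logger:       hclog.Default(),
		Bin:          "turbo", // hypothetical path to the turbo binary
		Opts:         connector.Opts{},
		SockPath:     root.UntypedJoin("turbod.sock"),
		PidPath:      root.UntypedJoin("turbod.pid"),
		LogPath:      root.UntypedJoin("turbod.log"),
		TurboVersion: "1.9.0", // must exactly match the daemon's version
	}
	client, err := c.Connect(context.Background())
	if err != nil {
		// On failure this is a *ConnectionError listing the pid, log, and socket paths to inspect.
		fmt.Println(err)
		return
	}
	defer func() { _ = client.Close() }()
}
```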
diff --git a/cli/internal/daemon/connector/connector_test.go b/cli/internal/daemon/connector/connector_test.go
new file mode 100644
index 0000000..62b4504
--- /dev/null
+++ b/cli/internal/daemon/connector/connector_test.go
@@ -0,0 +1,256 @@
+package connector
+
+import (
+    "context"
+    "errors"
+    "net"
+    "os/exec"
+    "runtime"
+    "strconv"
+    "testing"
+
+    "github.com/hashicorp/go-hclog"
+    "github.com/nightlyone/lockfile"
+    "github.com/vercel/turbo/cli/internal/fs"
+    "github.com/vercel/turbo/cli/internal/turbodprotocol"
+    "github.com/vercel/turbo/cli/internal/turbopath"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/status"
+    "google.golang.org/grpc/test/bufconn"
+    "gotest.tools/v3/assert"
+)
+
+// testBin returns a platform-appropriate executable to run node.
+// Node works here as an arbitrary process to start, since it's
+// required for turbo development. It will obviously not implement
+// our grpc service, use a mockServer instance where that's needed.
+func testBin() string {
+    if runtime.GOOS == "windows" {
+        return "node.exe"
+    }
+    return "node"
+}
+
+func getUnixSocket(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+    return dir.UntypedJoin("turbod-test.sock")
+}
+
+func getPidFile(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+    return dir.UntypedJoin("turbod-test.pid")
+}
+
+func TestGetOrStartDaemonInvalidPIDFile(t *testing.T) {
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    pidPath := getPidFile(dirPath)
+    writeFileErr := pidPath.WriteFile(nil, 0777)
+    assert.NilError(t, writeFileErr, "WriteFile")
+
+    c := &Connector{
+        Logger:  logger,
+        Opts:    Opts{},
+        PidPath: pidPath,
+    }
+
+    pid, err := c.getOrStartDaemon()
+    assert.Equal(t, pid, 0)
+    assert.ErrorContains(t, err, "issue was encountered with the pid file")
+}
+
+func TestConnectFailsWithoutGrpcServer(t *testing.T) {
+    // We aren't starting a server that is going to write
+    // to our socket file, so we should see a series of connection
+    // failures, followed by ErrTooManyAttempts
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    sockPath := getUnixSocket(dirPath)
+    pidPath := getPidFile(dirPath)
+    ctx := context.Background()
+    bin := testBin()
+    c := &Connector{
+        Logger:   logger,
+        Bin:      bin,
+        Opts:     Opts{},
+        SockPath: sockPath,
+        PidPath:  pidPath,
+    }
+    // Note that we expect ~3s here, for 3 attempts with a timeout of 1s
+    _, err := c.connectInternal(ctx)
+    assert.ErrorIs(t, err, ErrTooManyAttempts)
+}
+
+func TestKillDeadServerNoPid(t *testing.T) {
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    sockPath := getUnixSocket(dirPath)
+    pidPath := getPidFile(dirPath)
+    c := &Connector{
+        Logger:   logger,
+        Bin:      "nonexistent",
+        Opts:     Opts{},
+        SockPath: sockPath,
+        PidPath:  pidPath,
+    }
+
+    err := c.killDeadServer(99999)
+    assert.NilError(t, err, "killDeadServer")
+}
+
+func TestKillDeadServerNoProcess(t *testing.T) {
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    sockPath := getUnixSocket(dirPath)
+    pidPath := getPidFile(dirPath)
+    // Simulate the socket already existing, with no live daemon
+    err := sockPath.WriteFile([]byte("junk"), 0644)
+    assert.NilError(t, err, "WriteFile")
+    err = pidPath.WriteFile([]byte("99999"), 0644)
+    assert.NilError(t, err, "WriteFile")
+    c := &Connector{
+        Logger:   logger,
+        Bin:      "nonexistent",
+        Opts:     Opts{},
+        SockPath: sockPath,
+        PidPath:  pidPath,
+    }
+
+    err = c.killDeadServer(99999)
+    assert.ErrorIs(t, err, lockfile.ErrDeadOwner)
+    stillExists := pidPath.FileExists()
+    if !stillExists {
+        t.Error("pidPath should still exist, expected the user to clean it up")
+    }
+}
+
+func TestKillDeadServerWithProcess(t *testing.T) {
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    sockPath := getUnixSocket(dirPath)
+    pidPath := getPidFile(dirPath)
+    // Simulate the socket already existing, with no live daemon
+    err := sockPath.WriteFile([]byte("junk"), 0644)
+    assert.NilError(t, err, "WriteFile")
+    bin := testBin()
+    cmd := exec.Command(bin)
+    err = cmd.Start()
+    assert.NilError(t, err, "cmd.Start")
+    pid := cmd.Process.Pid
+    if pid == 0 {
+        t.Fatalf("failed to start process %v", bin)
+    }
+
+    err = pidPath.WriteFile([]byte(strconv.Itoa(pid)), 0644)
+    assert.NilError(t, err, "WriteFile")
+    c := &Connector{
+        Logger:   logger,
+        Bin:      "nonexistent",
+        Opts:     Opts{},
+        SockPath: sockPath,
+        PidPath:  pidPath,
+    }
+
+    err = c.killDeadServer(pid)
+    assert.NilError(t, err, "killDeadServer")
+    stillExists := pidPath.FileExists()
+    if !stillExists {
+        t.Error("pidPath no longer exists, expected client to not clean it up")
+    }
+    err = cmd.Wait()
+    exitErr := &exec.ExitError{}
+    if !errors.As(err, &exitErr) {
+        t.Errorf("expected an exit error from %v, got %v", bin, err)
+    }
+}
+
+type mockServer struct {
+    turbodprotocol.UnimplementedTurbodServer
+    helloErr     error
+    shutdownResp *turbodprotocol.ShutdownResponse
+    pidFile      turbopath.AbsoluteSystemPath
+}
+
+// Simulates server exiting by cleaning up the pid file
+func (s *mockServer) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) {
+    if err := s.pidFile.Remove(); err != nil {
+        return nil, err
+    }
+    return s.shutdownResp, nil
+}
+
+func (s *mockServer) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) {
+    if req.Version == "" {
+        return nil, errors.New("missing version")
+    }
+    return nil, s.helloErr
+}
+
+func TestKillLiveServer(t *testing.T) {
+    logger := hclog.Default()
+    dir := t.TempDir()
+    dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
+
+    sockPath := getUnixSocket(dirPath)
+    pidPath := getPidFile(dirPath)
+    err := pidPath.WriteFile([]byte("99999"), 0644)
+    assert.NilError(t, err, "WriteFile")
+
+    ctx := context.Background()
+    c := &Connector{
+        Logger:       logger,
+        Bin:          "nonexistent",
+        Opts:         Opts{},
+        SockPath:     sockPath,
+        PidPath:      pidPath,
+        TurboVersion: "some-version",
+    }
+
+    st := status.New(codes.FailedPrecondition, "version mismatch")
+    mock := &mockServer{
+        shutdownResp: &turbodprotocol.ShutdownResponse{},
+        helloErr:     st.Err(),
+        pidFile:      pidPath,
+    }
+    lis := bufconn.Listen(1024 * 1024)
+    grpcServer := grpc.NewServer()
+    turbodprotocol.RegisterTurbodServer(grpcServer, mock)
+    go func(t *testing.T) {
+        if err := grpcServer.Serve(lis); err != nil {
+            t.Logf("server closed: %v", err)
+        }
+    }(t)
+
+    conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
+        return lis.Dial()
+    }), grpc.WithTransportCredentials(insecure.NewCredentials()))
+    assert.NilError(t, err, "DialContext")
+    turboClient := turbodprotocol.NewTurbodClient(conn)
+    client := &Client{
+        TurbodClient: turboClient,
+        ClientConn:   conn,
+    }
+    err = c.sendHello(ctx, client)
+    if !errors.Is(err, ErrVersionMismatch) {
+        t.Errorf("sendHello error got %v, want %v", err, ErrVersionMismatch)
+    }
+    err = c.killLiveServer(ctx, client, 99999)
+    assert.NilError(t, err, "killLiveServer")
+    // Expect the pid file and socket files to have been cleaned up
+    if pidPath.FileExists() {
+        t.Errorf("expected pid file to have been deleted: %v", pidPath)
+    }
+    if sockPath.FileExists() {
+        t.Errorf("expected socket file to have been deleted: %v", sockPath)
+    }
+}
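These tests lean heavily on the semantics of github.com/nightlyone/lockfile, in particular how GetOwner distinguishes a missing pid file from one whose owning process has died. A small illustrative sketch (not part of the commit), with a placeholder pid-file path:

```go
package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/nightlyone/lockfile"
)

func main() {
	// Placeholder path; lockfile.New requires it to be absolute.
	lf, err := lockfile.New("/tmp/example-turbod.pid")
	if err != nil {
		panic(err) // only happens for non-absolute paths
	}
	owner, err := lf.GetOwner()
	switch {
	case os.IsNotExist(err):
		fmt.Println("no pid file: no daemon has been started")
	case errors.Is(err, lockfile.ErrDeadOwner):
		fmt.Println("pid file exists, but its process is gone (the TestKillDeadServerNoProcess case)")
	case err != nil:
		fmt.Println("unexpected error reading pid file:", err)
	default:
		fmt.Printf("a daemon appears to be running as pid %d\n", owner.Pid)
	}
}
```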
diff --git a/cli/internal/daemon/connector/fork.go b/cli/internal/daemon/connector/fork.go
new file mode 100644
index 0000000..8a6d01d
--- /dev/null
+++ b/cli/internal/daemon/connector/fork.go
@@ -0,0 +1,15 @@
+//go:build !windows
+// +build !windows
+
+package connector
+
+import "syscall"
+
+// getSysProcAttrs returns the platform-specific attributes we want to
+// use while forking the daemon process. Currently this is limited to
+// forcing a new process group
+func getSysProcAttrs() *syscall.SysProcAttr {
+    return &syscall.SysProcAttr{
+        Setpgid: true,
+    }
+}
diff --git a/cli/internal/daemon/connector/fork_windows.go b/cli/internal/daemon/connector/fork_windows.go
new file mode 100644
index 0000000..b9d6e77
--- /dev/null
+++ b/cli/internal/daemon/connector/fork_windows.go
@@ -0,0 +1,15 @@
+//go:build windows
+// +build windows
+
+package connector
+
+import "syscall"
+
+// getSysProcAttrs returns the platform-specific attributes we want to
+// use while forking the daemon process. Currently this is limited to
+// forcing a new process group
+func getSysProcAttrs() *syscall.SysProcAttr {
+    return &syscall.SysProcAttr{
+        CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
+    }
+}
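Both variants exist only to put the forked daemon into its own process group so that killing the client, or its process tree, does not take the daemon down with it. A rough unix-only sketch of the same idea in isolation (not part of the commit), using sleep as a stand-in child process:

```go
//go:build !windows

package main

import (
	"os/exec"
	"syscall"
)

func main() {
	// "sleep" stands in for the turbod binary here.
	cmd := exec.Command("sleep", "60")
	// Give the child its own process group, as getSysProcAttrs does on unix.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// Because the child is in its own group, a Ctrl-C delivered to this
	// process's foreground group does not also terminate it.
	_ = cmd.Process.Pid
}
```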
diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go
new file mode 100644
index 0000000..81d5283
--- /dev/null
+++ b/cli/internal/daemon/daemon.go
@@ -0,0 +1,307 @@
+package daemon
+
+import (
+    "context"
+    "crypto/sha256"
+    "encoding/hex"
+    "fmt"
+    "io"
+    "net"
+    "os"
+    "path/filepath"
+    "strings"
+    "time"
+
+    grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
+    "github.com/hashicorp/go-hclog"
+    "github.com/nightlyone/lockfile"
+    "github.com/pkg/errors"
+    "github.com/vercel/turbo/cli/internal/cmdutil"
+    "github.com/vercel/turbo/cli/internal/daemon/connector"
+    "github.com/vercel/turbo/cli/internal/fs"
+    "github.com/vercel/turbo/cli/internal/server"
+    "github.com/vercel/turbo/cli/internal/signals"
+    "github.com/vercel/turbo/cli/internal/turbopath"
+    "github.com/vercel/turbo/cli/internal/turbostate"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
+)
+
+type daemon struct {
+    logger     hclog.Logger
+    repoRoot   turbopath.AbsoluteSystemPath
+    timeout    time.Duration
+    reqCh      chan struct{}
+    timedOutCh chan struct{}
+}
+
+func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string {
+    pathHash := sha256.Sum256([]byte(repoRoot.ToString()))
+    // We grab a substring of the hash because there is a 108-character limit on the length
+    // of a filepath for a unix domain socket.
+    return hex.EncodeToString(pathHash[:])[:16]
+}
+
+func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+    tempDir := fs.TempDir("turbod")
+    hexHash := getRepoHash(repoRoot)
+    return tempDir.UntypedJoin(hexHash)
+}
+
+func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) {
+    hexHash := getRepoHash(repoRoot)
+    base := repoRoot.Base()
+    logFilename := fmt.Sprintf("%v-%v.log", hexHash, base)
+
+    logsDir := fs.GetTurboDataDir().UntypedJoin("logs")
+    return logsDir.UntypedJoin(logFilename), nil
+}
+
+func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+    root := getDaemonFileRoot(repoRoot)
+    return root.UntypedJoin("turbod.sock")
+}
+
+func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
+    root := getDaemonFileRoot(repoRoot)
+    return root.UntypedJoin("turbod.pid")
+}
+
+// logError logs an error and outputs it to the UI.
+func (d *daemon) logError(err error) {
+    d.logger.Error(fmt.Sprintf("error %v", err))
+}
+
+// we're only appending, and we're creating the file if it doesn't exist.
+// we do not need to read the log file.
+var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE
+
+// ExecuteDaemon executes the root daemon command
+func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
+    base, err := helper.GetCmdBase(args)
+    if err != nil {
+        return err
+    }
+    if args.TestRun {
+        base.UI.Info("Daemon test run successful")
+        return nil
+    }
+
+    idleTimeout := 4 * time.Hour
+    if args.Command.Daemon.IdleTimeout != "" {
+        idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout)
+        if err != nil {
+            return err
+        }
+    }
+
+    logFilePath, err := getLogFilePath(base.RepoRoot)
+    if err != nil {
+        return err
+    }
+    if err := logFilePath.EnsureDir(); err != nil {
+        return err
+    }
+    logFile, err := logFilePath.OpenFile(_logFileFlags, 0644)
+    if err != nil {
+        return err
+    }
+    defer func() { _ = logFile.Close() }()
+    logger := hclog.New(&hclog.LoggerOptions{
+        Output: io.MultiWriter(logFile, os.Stdout),
+        Level:  hclog.Info,
+        Color:  hclog.ColorOff,
+        Name:   "turbod",
+    })
+
+    d := &daemon{
+        logger:     logger,
+        repoRoot:   base.RepoRoot,
+        timeout:    idleTimeout,
+        reqCh:      make(chan struct{}),
+        timedOutCh: make(chan struct{}),
+    }
+    serverName := getRepoHash(base.RepoRoot)
+    turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath)
+    if err != nil {
+        d.logError(err)
+        return err
+    }
+    defer func() { _ = turboServer.Close() }()
+    err = d.runTurboServer(ctx, turboServer, signalWatcher)
+    if err != nil {
+        d.logError(err)
+        return err
+    }
+    return nil
+}
+
+var errInactivityTimeout = errors.New("turbod shut down from inactivity")
+
+// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path
+// at a time. If this process fails to write its PID to the lockfile, it must exit.
+func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) {
+    if err := pidPath.EnsureDir(); err != nil {
+        return "", err
+    }
+    lockFile, err := lockfile.New(pidPath.ToString())
+    if err != nil {
+        // lockfile.New should only return an error if it wasn't given an absolute path.
+        // We are attempting to use the type system to enforce that we are passing an
+        // absolute path. An error here likely means a bug, and we should crash.
+        panic(err)
+    }
+    if err := lockFile.TryLock(); err != nil {
+        return "", err
+    }
+    return lockFile, nil
+}
+
+type rpcServer interface {
+    Register(grpcServer server.GRPCServer)
+}
+
+func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error {
+    ctx, cancel := context.WithCancel(parentContext)
+    defer cancel()
+    pidPath := getPidFile(d.repoRoot)
+    lock, err := tryAcquirePidfileLock(pidPath)
+    if err != nil {
+        return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock)
+    }
+    // When we're done serving, clean up the pid file.
+    // Also, if *this* goroutine panics, make sure we unlock the pid file.
+    defer func() {
+        if err := lock.Unlock(); err != nil {
+            d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error())
+        }
+    }()
+    // This handler runs in request goroutines. If a request causes a panic,
+    // this handler will get called after a call to recover(), meaning we are
+    // no longer panicking. We return a server error and cancel our context,
+    // which triggers a shutdown of the server.
+    panicHandler := func(thePanic interface{}) error {
+        cancel()
+        d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic))
+        return status.Error(codes.Internal, "server panicked")
+    }
+
+    // If we have the lock, assume that we are the owners of the socket file,
+    // whether it already exists or not. That means we are free to remove it.
+    sockPath := getUnixSocket(d.repoRoot)
+    if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
+        return err
+    }
+    d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath)))
+    lis, err := net.Listen("unix", sockPath.ToString())
+    if err != nil {
+        return err
+    }
+    // We don't need to explicitly close 'lis', the grpc server will handle that
+    s := grpc.NewServer(
+        grpc.ChainUnaryInterceptor(
+            d.onRequest,
+            grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)),
+        ),
+    )
+    go d.timeoutLoop(ctx)
+
+    rpcServer.Register(s)
+    errCh := make(chan error)
+    go func(errCh chan<- error) {
+        if err := s.Serve(lis); err != nil {
+            errCh <- err
+        }
+        close(errCh)
+    }(errCh)
+
+    // Note that we aren't deferring s.GracefulStop here because we also need
+    // to drain the error channel, which isn't guaranteed to happen until
+    // the server has stopped. That in turn may depend on GracefulStop being
+    // called.
+    // Future work could restructure this to make that simpler.
+    var exitErr error
+    select {
+    case err, ok := <-errCh:
+        // The server exited
+        if ok {
+            exitErr = err
+        }
+    case <-d.timedOutCh:
+        // This is the inactivity timeout case
+        exitErr = errInactivityTimeout
+        s.GracefulStop()
+    case <-ctx.Done():
+        // If a request handler panics, it will cancel this context
+        s.GracefulStop()
+    case <-signalWatcher.Done():
+        // This is fired if we caught a signal
+        s.GracefulStop()
+    }
+    // Wait for the server to exit, if it hasn't already.
+    // When it does, this channel will close. We don't
+    // care about the error in this scenario because we've
+    // either requested a close via cancelling the context,
+    // an inactivity timeout, or caught a signal.
+    for range errCh {
+    }
+    return exitErr
+}
+
+func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+    d.reqCh <- struct{}{}
+    return handler(ctx, req)
+}
+
+func (d *daemon) timeoutLoop(ctx context.Context) {
+    timeoutCh := time.After(d.timeout)
+outer:
+    for {
+        select {
+        case <-d.reqCh:
+            timeoutCh = time.After(d.timeout)
+        case <-timeoutCh:
+            close(d.timedOutCh)
+            break outer
+        case <-ctx.Done():
+            break outer
+        }
+    }
+}
+
+// ClientOpts re-exports connector.Opts to encapsulate the connector package
+type ClientOpts = connector.Opts
+
+// Client re-exports connector.Client to encapsulate the connector package
+type Client = connector.Client
+
+// GetClient returns a client that can be used to interact with the daemon
+func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) {
+    sockPath := getUnixSocket(repoRoot)
+    pidPath := getPidFile(repoRoot)
+    logPath, err := getLogFilePath(repoRoot)
+    if err != nil {
+        return nil, err
+    }
+    bin, err := os.Executable()
+    if err != nil {
+        return nil, err
+    }
+    // The Go binary can no longer be called directly, so we need to route back to the rust wrapper
+    if strings.HasSuffix(bin, "go-turbo") {
+        bin = filepath.Join(filepath.Dir(bin), "turbo")
+    } else if strings.HasSuffix(bin, "go-turbo.exe") {
+        bin = filepath.Join(filepath.Dir(bin), "turbo.exe")
+    }
+    c := &connector.Connector{
+        Logger:       logger.Named("TurbodClient"),
+        Bin:          bin,
+        Opts:         opts,
+        SockPath:     sockPath,
+        PidPath:      pidPath,
+        LogPath:      logPath,
+        TurboVersion: turboVersion,
+    }
+    return c.Connect(ctx)
+}
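For reference, a small sketch (not part of the commit) mirroring the file-layout scheme implemented above: the repo root is hashed, a 16-character prefix of the hash names the daemon directory, and the log file name combines that prefix with the repo's base name. The repo root and the /tmp/turbod path below are placeholders standing in for fs.TempDir("turbod") and the turbo data dir.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path/filepath"
)

func main() {
	repoRoot := "/home/alice/projects/my-monorepo" // hypothetical repository root
	hash := sha256.Sum256([]byte(repoRoot))
	hexHash := hex.EncodeToString(hash[:])[:16] // short prefix keeps the socket path under the ~108-char unix limit

	daemonRoot := filepath.Join("/tmp/turbod", hexHash) // stand-in for fs.TempDir("turbod")
	fmt.Println("socket:", filepath.Join(daemonRoot, "turbod.sock"))
	fmt.Println("pid:   ", filepath.Join(daemonRoot, "turbod.pid"))
	// The log lives under the turbo data dir's logs/ folder.
	fmt.Println("log:   ", hexHash+"-"+filepath.Base(repoRoot)+".log")
}
```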
diff --git a/cli/internal/daemon/daemon_test.go b/cli/internal/daemon/daemon_test.go
new file mode 100644
index 0000000..66a714d
--- /dev/null
+++ b/cli/internal/daemon/daemon_test.go
@@ -0,0 +1,262 @@
+package daemon
+
+import (
+    "context"
+    "errors"
+    "os/exec"
+    "runtime"
+    "strconv"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/hashicorp/go-hclog"
+    "github.com/nightlyone/lockfile"
+    "github.com/vercel/turbo/cli/internal/fs"
+    "github.com/vercel/turbo/cli/internal/server"
+    "github.com/vercel/turbo/cli/internal/signals"
+    "github.com/vercel/turbo/cli/internal/turbopath"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/test/grpc_testing"
+    "gotest.tools/v3/assert"
+)
+
+// testBin returns a platform-appropriate node binary.
+// We need some process to be running and findable by the
+// lockfile library, and we don't particularly care what it is.
+// Since node is required for turbo development, it makes a decent
+// candidate.
+func testBin() string {
+    if runtime.GOOS == "windows" {
+        return "node.exe"
+    }
+    return "node"
+}
+
+func TestPidFileLock(t *testing.T) {
+    repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+    pidPath := getPidFile(repoRoot)
+    // the lockfile library handles removing pids from dead owners
+    _, err := tryAcquirePidfileLock(pidPath)
+    assert.NilError(t, err, "acquirePidLock")
+
+    // Start up a node process and fake a pid file for it.
+    // Ensure that we can't start the daemon while the node process is live
+    bin := testBin()
+    node := exec.Command(bin)
+    err = node.Start()
+    assert.NilError(t, err, "Start")
+    stopNode := func() error {
+        if err := node.Process.Kill(); err != nil {
+            return err
+        }
+        // We expect an error from node, we just sent a kill signal
+        _ = node.Wait()
+        return nil
+    }
+    // In case we fail the test, still try to kill the node process
+    t.Cleanup(func() { _ = stopNode() })
+    nodePid := node.Process.Pid
+    err = pidPath.WriteFile([]byte(strconv.Itoa(nodePid)), 0644)
+    assert.NilError(t, err, "WriteFile")
+
+    _, err = tryAcquirePidfileLock(pidPath)
+    assert.ErrorIs(t, err, lockfile.ErrBusy)
+
+    // Stop the node process, but leave the pid file there
+    // This simulates a crash
+    err = stopNode()
+    assert.NilError(t, err, "stopNode")
+    // the lockfile library handles removing pids from dead owners
+    _, err = tryAcquirePidfileLock(pidPath)
+    assert.NilError(t, err, "acquirePidLock")
+}
+
+type testRPCServer struct {
+    grpc_testing.UnimplementedTestServiceServer
+    registered chan struct{}
+}
+
+func (ts *testRPCServer) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) {
+    panic("intended to panic")
+}
+
+func (ts *testRPCServer) Register(grpcServer server.GRPCServer) {
+    grpc_testing.RegisterTestServiceServer(grpcServer, ts)
+    ts.registered <- struct{}{}
+}
+
+func newTestRPCServer() *testRPCServer {
+    return &testRPCServer{
+        registered: make(chan struct{}, 1),
+    }
+}
+
+func waitForFile(t *testing.T, filename turbopath.AbsoluteSystemPath, timeout time.Duration) {
+    t.Helper()
+    deadline := time.After(timeout)
+outer:
+    for !filename.FileExists() {
+        select {
+        case <-deadline:
+            break outer
+        case <-time.After(10 * time.Millisecond):
+        }
+    }
+    if !filename.FileExists() {
+        t.Errorf("timed out waiting for %v to exist after %v", filename, timeout)
+    }
+}
+
+func TestDaemonLifecycle(t *testing.T) {
+    logger := hclog.Default()
+    logger.SetLevel(hclog.Debug)
+    repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+    ts := newTestRPCServer()
+    watcher := signals.NewWatcher()
+    ctx, cancel := context.WithCancel(context.Background())
+
+    d := &daemon{
+        logger:     logger,
+        repoRoot:   repoRoot,
+        timeout:    10 * time.Second,
+        reqCh:      make(chan struct{}),
+        timedOutCh: make(chan struct{}),
+    }
+
+    var serverErr error
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        serverErr = d.runTurboServer(ctx, ts, watcher)
+        wg.Done()
+    }()
+
+    sockPath := getUnixSocket(repoRoot)
+    waitForFile(t, sockPath, 30*time.Second)
+    pidPath := getPidFile(repoRoot)
+    waitForFile(t, pidPath, 1*time.Second)
+    cancel()
+    wg.Wait()
+    assert.NilError(t, serverErr, "runTurboServer")
+    if sockPath.FileExists() {
+        t.Errorf("%v still exists, should have been cleaned up", sockPath)
+    }
+    if pidPath.FileExists() {
+        t.Errorf("%v still exists, should have been cleaned up", pidPath)
+    }
+}
+
+func TestTimeout(t *testing.T) {
+    logger := hclog.Default()
+    logger.SetLevel(hclog.Debug)
+    repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+    ts := newTestRPCServer()
+    watcher := signals.NewWatcher()
+    ctx := context.Background()
+
+    d := &daemon{
+        logger:     logger,
+        repoRoot:   repoRoot,
+        timeout:    5 * time.Millisecond,
+        reqCh:      make(chan struct{}),
+        timedOutCh: make(chan struct{}),
+    }
+    err := d.runTurboServer(ctx, ts, watcher)
+    if !errors.Is(err, errInactivityTimeout) {
+        t.Errorf("server error got %v, want %v", err, errInactivityTimeout)
+    }
+}
+
+func TestCaughtSignal(t *testing.T) {
+    logger := hclog.Default()
+    logger.SetLevel(hclog.Debug)
+    repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+    ts := newTestRPCServer()
+    watcher := signals.NewWatcher()
+    ctx := context.Background()
+
+    d := &daemon{
+        logger:     logger,
+        repoRoot:   repoRoot,
+        timeout:    5 * time.Second,
+        reqCh:      make(chan struct{}),
+        timedOutCh: make(chan struct{}),
+    }
+    errCh := make(chan error)
+    go func() {
+        err := d.runTurboServer(ctx, ts, watcher)
+        errCh <- err
+    }()
+    <-ts.registered
+    // grpc doesn't provide a signal to know when the server is serving.
+    // So while this call to Close can race with the call to grpc.Server.Serve, if we've
+    // registered with the turboserver, we've registered all of our
+    // signal handlers as well. We just may or may not be serving when Close()
+    // is called. It shouldn't matter for the purposes of this test:
+    // Either we are serving, and Serve will return with nil when GracefulStop is
+    // called, or we aren't serving yet, and the subsequent call to Serve will
+    // immediately return with grpc.ErrServerStopped. So, both nil and grpc.ErrServerStopped
+    // are acceptable outcomes for runTurboServer. Any other error, or a timeout, is a
+    // failure.
+    watcher.Close()
+
+    err := <-errCh
+    pidPath := getPidFile(repoRoot)
+    if pidPath.FileExists() {
+        t.Errorf("expected to clean up %v, but it still exists", pidPath)
+    }
+    // We'll either get nil or ErrServerStopped, depending on whether
+    // or not we close the signal watcher before grpc.Server.Serve was
+    // called.
+    if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
+        t.Errorf("runTurboServer got err %v, want nil or ErrServerStopped", err)
+    }
+}
+
+func TestCleanupOnPanic(t *testing.T) {
+    logger := hclog.Default()
+    logger.SetLevel(hclog.Debug)
+    repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
+
+    ts := newTestRPCServer()
+    watcher := signals.NewWatcher()
+    ctx := context.Background()
+
+    d := &daemon{
+        logger:     logger,
+        repoRoot:   repoRoot,
+        timeout:    5 * time.Second,
+        reqCh:      make(chan struct{}),
+        timedOutCh: make(chan struct{}),
+    }
+    errCh := make(chan error)
+    go func() {
+        err := d.runTurboServer(ctx, ts, watcher)
+        errCh <- err
+    }()
+    <-ts.registered
+
+    creds := insecure.NewCredentials()
+    sockFile := getUnixSocket(repoRoot)
+    conn, err := grpc.Dial("unix://"+sockFile.ToString(), grpc.WithTransportCredentials(creds))
+    assert.NilError(t, err, "Dial")
+
+    client := grpc_testing.NewTestServiceClient(conn)
+    _, err = client.EmptyCall(ctx, &grpc_testing.Empty{})
+    if err == nil {
+        t.Error("nil error")
+    }
+    // wait for the server to finish
+    <-errCh
+
+    pidPath := getPidFile(repoRoot)
+    if pidPath.FileExists() {
+        t.Errorf("expected to clean up %v, but it still exists", pidPath)
+    }
+}
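Finally, a hedged sketch (not part of the commit) of how other CLI code might obtain a daemon client through the GetClient helper added in daemon.go; the repo root and version string are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/hashicorp/go-hclog"
	"github.com/vercel/turbo/cli/internal/daemon"
	"github.com/vercel/turbo/cli/internal/fs"
)

func main() {
	// Hypothetical repo root; the version string must match the daemon that gets started.
	repoRoot := fs.AbsoluteSystemPathFromUpstream("/home/alice/projects/my-monorepo")
	client, err := daemon.GetClient(
		context.Background(),
		repoRoot,
		hclog.Default(),
		"1.9.0",
		daemon.ClientOpts{},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Close() }()
}
```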
