diff options
| author | 2023-04-28 01:36:55 +0800 | |
|---|---|---|
| committer | 2023-04-28 01:36:55 +0800 | |
| commit | fc8c5fdce62fb229202659408798a7b6c98f6e8b (patch) | |
| tree | 7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/daemon | |
| parent | dd84b9d64fb98746a230cd24233ff50a562c39c9 (diff) | |
| download | HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.tar.gz HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.zip | |
Diffstat (limited to 'cli/internal/daemon')
| -rw-r--r-- | cli/internal/daemon/connector/connector.go | 391 | ||||
| -rw-r--r-- | cli/internal/daemon/connector/connector_test.go | 256 | ||||
| -rw-r--r-- | cli/internal/daemon/connector/fork.go | 15 | ||||
| -rw-r--r-- | cli/internal/daemon/connector/fork_windows.go | 15 | ||||
| -rw-r--r-- | cli/internal/daemon/daemon.go | 307 | ||||
| -rw-r--r-- | cli/internal/daemon/daemon_test.go | 262 |
6 files changed, 0 insertions, 1246 deletions
diff --git a/cli/internal/daemon/connector/connector.go b/cli/internal/daemon/connector/connector.go deleted file mode 100644 index d05ef59..0000000 --- a/cli/internal/daemon/connector/connector.go +++ /dev/null @@ -1,391 +0,0 @@ -package connector - -import ( - "context" - "fmt" - "io/fs" - "os" - "os/exec" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/hashicorp/go-hclog" - "github.com/nightlyone/lockfile" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/turbodprotocol" - "github.com/vercel/turbo/cli/internal/turbopath" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" -) - -var ( - // ErrFailedToStart is returned when the daemon process cannot be started - ErrFailedToStart = errors.New("daemon could not be started") - // ErrVersionMismatch is returned when the daemon process was spawned by a different version than the connecting client - ErrVersionMismatch = errors.New("daemon version does not match client version") - errConnectionFailure = errors.New("could not connect to daemon") - // ErrTooManyAttempts is returned when the client fails to connect too many times - ErrTooManyAttempts = errors.New("reached maximum number of attempts contacting daemon") - // ErrDaemonNotRunning is returned when the client cannot contact the daemon and has - // been instructed not to attempt to start a new daemon - ErrDaemonNotRunning = errors.New("the daemon is not running") -) - -// Opts is the set of configurable options for the client connection, -// including some options to be passed through to the daemon process if -// it needs to be started. -type Opts struct { - ServerTimeout time.Duration - DontStart bool // if true, don't attempt to start the daemon - DontKill bool // if true, don't attempt to kill the daemon -} - -// Client represents a connection to the daemon process -type Client struct { - turbodprotocol.TurbodClient - *grpc.ClientConn - SockPath turbopath.AbsoluteSystemPath - PidPath turbopath.AbsoluteSystemPath - LogPath turbopath.AbsoluteSystemPath -} - -// Connector instances are used to create a connection to turbo's daemon process -// The daemon will be started , or killed and restarted, if necessary -type Connector struct { - Logger hclog.Logger - Bin string - Opts Opts - SockPath turbopath.AbsoluteSystemPath - PidPath turbopath.AbsoluteSystemPath - LogPath turbopath.AbsoluteSystemPath - TurboVersion string -} - -// ConnectionError is returned in the error case from connect. It wraps the underlying -// cause and adds a message with the relevant files for the user to check. -type ConnectionError struct { - SockPath turbopath.AbsoluteSystemPath - PidPath turbopath.AbsoluteSystemPath - LogPath turbopath.AbsoluteSystemPath - cause error -} - -func (ce *ConnectionError) Error() string { - return fmt.Sprintf(`connection to turbo daemon process failed. Please ensure the following: - - the process identified by the pid in the file at %v is not running, and remove %v - - check the logs at %v - - the unix domain socket at %v has been removed - You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath) -} - -// Unwrap allows a connection error to work with standard library "errors" and compatible packages -func (ce *ConnectionError) Unwrap() error { - return ce.cause -} - -func (c *Connector) wrapConnectionError(err error) error { - return &ConnectionError{ - SockPath: c.SockPath, - PidPath: c.PidPath, - LogPath: c.LogPath, - cause: err, - } -} - -// lockFile returns a pointer to where a lockfile should be. -// lockfile.New does not perform IO and the only error it produces -// is in the case a non-absolute path was provided. We're guaranteeing an -// turbopath.AbsoluteSystemPath, so an error here is an indication of a bug and -// we should crash. -func (c *Connector) lockFile() lockfile.Lockfile { - lockFile, err := lockfile.New(c.PidPath.ToString()) - if err != nil { - panic(err) - } - return lockFile -} - -func (c *Connector) addr() string { - // grpc special-cases parsing of unix:<path> urls - // to avoid url.Parse. This lets us pass through our absolute - // paths unmodified, even on windows. - // See code here: https://github.com/grpc/grpc-go/blob/d83070ec0d9043f713b6a63e1963c593b447208c/internal/transport/http_util.go#L392 - return fmt.Sprintf("unix:%v", c.SockPath.ToString()) -} - -// We defer to the daemon's pid file as the locking mechanism. -// If it doesn't exist, we will attempt to start the daemon. -// If the daemon has a different version, ask it to shut down. -// If the pid file exists but we can't connect, try to kill -// the daemon. -// If we can't cause the daemon to remove the pid file, report -// an error to the user that includes the file location so that -// they can resolve it. -const ( - _maxAttempts = 3 - _shutdownTimeout = 1 * time.Second - _socketPollTimeout = 1 * time.Second -) - -// killLiveServer tells a running server to shut down. This method is also responsible -// for closing this client connection. -func (c *Connector) killLiveServer(ctx context.Context, client *Client, serverPid int) error { - defer func() { _ = client.Close() }() - - _, err := client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{}) - if err != nil { - c.Logger.Error(fmt.Sprintf("failed to shutdown running daemon. attempting to force it closed: %v", err)) - return c.killDeadServer(serverPid) - } - // Wait for the server to gracefully exit - err = backoff.Retry(func() error { - lockFile := c.lockFile() - owner, err := lockFile.GetOwner() - if os.IsNotExist(err) { - // If there is no pid more file, we can conclude that the daemon successfully - // exited and cleaned up after itself. - return nil - } else if err != nil { - // some other error occurred getting the lockfile owner - return backoff.Permanent(err) - } else if owner.Pid == serverPid { - // // We're still waiting for the server to shut down - return errNeedsRetry - } - // if there's no error and the lockfile has a new pid, someone else must've started a new daemon. - // Consider the old one killed and move on. - return nil - }, backoffWithTimeout(_shutdownTimeout)) - if errors.Is(err, errNeedsRetry) { - c.Logger.Error(fmt.Sprintf("daemon did not exit after %v, attempting to force it closed", _shutdownTimeout.String())) - return c.killDeadServer(serverPid) - } else if err != nil { - return err - } - return nil -} - -func (c *Connector) killDeadServer(pid int) error { - // currently the only error that this constructor returns is - // in the case that you don't provide an absolute path. - // Given that we require an absolute path as input, this should - // hopefully never happen. - lockFile := c.lockFile() - process, err := lockFile.GetOwner() - if err == nil { - // Check that this is the same process that we failed to connect to. - // Otherwise, connectInternal will loop around again and start with whatever - // new process has the pid file. - if process.Pid == pid { - // we have a process that we need to kill - // TODO(gsoltis): graceful kill? the process is already not responding to requests, - // but it could be in the middle of a graceful shutdown. Probably should let it clean - // itself up, and report an error and defer to a force-kill by the user - if err := process.Kill(); err != nil { - return err - } - } - return nil - } else if errors.Is(err, os.ErrNotExist) { - // There's no pid file. Someone else killed it. Returning no error will cause the - // connectInternal to loop around and try the connection again. - return nil - } - return err -} - -// Connect attempts to create a connection to a turbo daemon. -// Retries and daemon restarts are built in. If this fails, -// it is unlikely to succeed after an automated retry. -func (c *Connector) Connect(ctx context.Context) (*Client, error) { - client, err := c.connectInternal(ctx) - if err != nil { - return nil, c.wrapConnectionError(err) - } - return client, nil -} - -func (c *Connector) connectInternal(ctx context.Context) (*Client, error) { - // for each attempt, we: - // 1. try to find or start a daemon process, getting its pid - // 2. wait for the unix domain socket file to appear - // 3. connect to the unix domain socket. Note that this connection is not validated - // 4. send a hello message. This validates the connection as a side effect of - // negotiating versions, which currently requires exact match. - // In the event of a live, but incompatible server, we attempt to shut it down and start - // a new one. In the event of an unresponsive server, we attempt to kill the process - // identified by the pid file, with the hope that it will clean up after itself. - // Failures include details about where to find logs, the pid file, and the socket file. - for i := 0; i < _maxAttempts; i++ { - serverPid, err := c.getOrStartDaemon() - if err != nil { - // If we fail to even start the daemon process, return immediately, we're unlikely - // to succeed without user intervention - return nil, err - } - if err := c.waitForSocket(); errors.Is(err, ErrFailedToStart) { - // If we didn't see the socket file, try again. It's possible that - // the daemon encountered an transitory error - continue - } else if err != nil { - return nil, err - } - client, err := c.getClientConn() - if err != nil { - return nil, err - } - if err := c.sendHello(ctx, client); err == nil { - // We connected and negotiated a version, we're all set - return client, nil - } else if errors.Is(err, ErrVersionMismatch) { - // We don't want to knock down a perfectly fine daemon in a status check. - if c.Opts.DontKill { - return nil, err - } - - // We now know we aren't going to return this client, - // but killLiveServer still needs it to send the Shutdown request. - // killLiveServer will close the client when it is done with it. - if err := c.killLiveServer(ctx, client, serverPid); err != nil { - return nil, err - } - // Loops back around and tries again. - } else if errors.Is(err, errConnectionFailure) { - // close the client, see if we can kill the stale daemon - _ = client.Close() - if err := c.killDeadServer(serverPid); err != nil { - return nil, err - } - // if we successfully killed the dead server, loop around and try again - } else if err != nil { - // Some other error occurred, close the client and - // report the error to the user - if closeErr := client.Close(); closeErr != nil { - // In the event that we fail to close the client, bundle that error along also. - // Keep the original error in the error chain, as it's more likely to be useful - // or needed for matching on later. - err = errors.Wrapf(err, "also failed to close client connection: %v", closeErr) - } - return nil, err - } - } - return nil, ErrTooManyAttempts -} - -// getOrStartDaemon returns the PID of the daemon process on success. It may start -// the daemon if it doesn't find one running. -func (c *Connector) getOrStartDaemon() (int, error) { - lockFile := c.lockFile() - daemonProcess, getDaemonProcessErr := lockFile.GetOwner() - if getDaemonProcessErr != nil { - // If we're in a clean state this isn't an "error" per se. - // We attempt to start a daemon. - if errors.Is(getDaemonProcessErr, fs.ErrNotExist) { - if c.Opts.DontStart { - return 0, ErrDaemonNotRunning - } - pid, startDaemonErr := c.startDaemon() - if startDaemonErr != nil { - return 0, startDaemonErr - } - return pid, nil - } - - // We could have hit any number of errors. - // - Failed to read the file for permission reasons. - // - User emptied the file's contents. - // - etc. - return 0, errors.Wrapf(getDaemonProcessErr, "An issue was encountered with the pid file. Please remove it and try again: %v", c.PidPath) - } - - return daemonProcess.Pid, nil -} - -func (c *Connector) getClientConn() (*Client, error) { - creds := insecure.NewCredentials() - conn, err := grpc.Dial(c.addr(), grpc.WithTransportCredentials(creds)) - if err != nil { - return nil, err - } - tc := turbodprotocol.NewTurbodClient(conn) - return &Client{ - TurbodClient: tc, - ClientConn: conn, - SockPath: c.SockPath, - PidPath: c.PidPath, - LogPath: c.LogPath, - }, nil -} - -func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodClient) error { - _, err := client.Hello(ctx, &turbodprotocol.HelloRequest{ - Version: c.TurboVersion, - // TODO: add session id - }) - status := status.Convert(err) - switch status.Code() { - case codes.OK: - return nil - case codes.FailedPrecondition: - return ErrVersionMismatch - case codes.Unavailable: - return errConnectionFailure - default: - return err - } -} - -var errNeedsRetry = errors.New("retry the operation") - -// backoffWithTimeout returns an exponential backoff, starting at 2ms and doubling until -// the specific timeout has elapsed. Note that backoff instances are stateful, so we need -// a new one each time we do a Retry. -func backoffWithTimeout(timeout time.Duration) *backoff.ExponentialBackOff { - return &backoff.ExponentialBackOff{ - Multiplier: 2, - InitialInterval: 2 * time.Millisecond, - MaxElapsedTime: timeout, - Clock: backoff.SystemClock, - Stop: backoff.Stop, - } -} - -// waitForSocket waits for the unix domain socket to appear -func (c *Connector) waitForSocket() error { - // Note that we don't care if this is our daemon - // or not. We started a process, but someone else could beat - // use to listening. That's fine, we'll check the version - // later. - err := backoff.Retry(func() error { - if !c.SockPath.FileExists() { - return errNeedsRetry - } - return nil - }, backoffWithTimeout(_socketPollTimeout)) - if errors.Is(err, errNeedsRetry) { - return ErrFailedToStart - } else if err != nil { - return err - } - return nil -} - -// startDaemon starts the daemon and returns the pid for the new process -func (c *Connector) startDaemon() (int, error) { - args := []string{"daemon"} - if c.Opts.ServerTimeout != 0 { - args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String())) - } - c.Logger.Debug(fmt.Sprintf("starting turbod binary %v", c.Bin)) - cmd := exec.Command(c.Bin, args...) - // For the daemon to have its own process group id so that any attempts - // to kill it and its process tree don't kill this client. - cmd.SysProcAttr = getSysProcAttrs() - err := cmd.Start() - if err != nil { - return 0, err - } - return cmd.Process.Pid, nil -} diff --git a/cli/internal/daemon/connector/connector_test.go b/cli/internal/daemon/connector/connector_test.go deleted file mode 100644 index 62b4504..0000000 --- a/cli/internal/daemon/connector/connector_test.go +++ /dev/null @@ -1,256 +0,0 @@ -package connector - -import ( - "context" - "errors" - "net" - "os/exec" - "runtime" - "strconv" - "testing" - - "github.com/hashicorp/go-hclog" - "github.com/nightlyone/lockfile" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/turbodprotocol" - "github.com/vercel/turbo/cli/internal/turbopath" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" - "google.golang.org/grpc/test/bufconn" - "gotest.tools/v3/assert" -) - -// testBin returns a platform-appropriate executable to run node. -// Node works here as an arbitrary process to start, since it's -// required for turbo development. It will obviously not implement -// our grpc service, use a mockServer instance where that's needed. -func testBin() string { - if runtime.GOOS == "windows" { - return "node.exe" - } - return "node" -} - -func getUnixSocket(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - return dir.UntypedJoin("turbod-test.sock") -} - -func getPidFile(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - return dir.UntypedJoin("turbod-test.pid") -} - -func TestGetOrStartDaemonInvalidPIDFile(t *testing.T) { - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - pidPath := getPidFile(dirPath) - writeFileErr := pidPath.WriteFile(nil, 0777) - assert.NilError(t, writeFileErr, "WriteFile") - - c := &Connector{ - Logger: logger, - Opts: Opts{}, - PidPath: pidPath, - } - - pid, err := c.getOrStartDaemon() - assert.Equal(t, pid, 0) - assert.ErrorContains(t, err, "issue was encountered with the pid file") -} - -func TestConnectFailsWithoutGrpcServer(t *testing.T) { - // We aren't starting a server that is going to write - // to our socket file, so we should see a series of connection - // failures, followed by ErrTooManyAttempts - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - sockPath := getUnixSocket(dirPath) - pidPath := getPidFile(dirPath) - ctx := context.Background() - bin := testBin() - c := &Connector{ - Logger: logger, - Bin: bin, - Opts: Opts{}, - SockPath: sockPath, - PidPath: pidPath, - } - // Note that we expect ~3s here, for 3 attempts with a timeout of 1s - _, err := c.connectInternal(ctx) - assert.ErrorIs(t, err, ErrTooManyAttempts) -} - -func TestKillDeadServerNoPid(t *testing.T) { - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - sockPath := getUnixSocket(dirPath) - pidPath := getPidFile(dirPath) - c := &Connector{ - Logger: logger, - Bin: "nonexistent", - Opts: Opts{}, - SockPath: sockPath, - PidPath: pidPath, - } - - err := c.killDeadServer(99999) - assert.NilError(t, err, "killDeadServer") -} - -func TestKillDeadServerNoProcess(t *testing.T) { - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - sockPath := getUnixSocket(dirPath) - pidPath := getPidFile(dirPath) - // Simulate the socket already existing, with no live daemon - err := sockPath.WriteFile([]byte("junk"), 0644) - assert.NilError(t, err, "WriteFile") - err = pidPath.WriteFile([]byte("99999"), 0644) - assert.NilError(t, err, "WriteFile") - c := &Connector{ - Logger: logger, - Bin: "nonexistent", - Opts: Opts{}, - SockPath: sockPath, - PidPath: pidPath, - } - - err = c.killDeadServer(99999) - assert.ErrorIs(t, err, lockfile.ErrDeadOwner) - stillExists := pidPath.FileExists() - if !stillExists { - t.Error("pidPath should still exist, expected the user to clean it up") - } -} - -func TestKillDeadServerWithProcess(t *testing.T) { - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - sockPath := getUnixSocket(dirPath) - pidPath := getPidFile(dirPath) - // Simulate the socket already existing, with no live daemon - err := sockPath.WriteFile([]byte("junk"), 0644) - assert.NilError(t, err, "WriteFile") - bin := testBin() - cmd := exec.Command(bin) - err = cmd.Start() - assert.NilError(t, err, "cmd.Start") - pid := cmd.Process.Pid - if pid == 0 { - t.Fatalf("failed to start process %v", bin) - } - - err = pidPath.WriteFile([]byte(strconv.Itoa(pid)), 0644) - assert.NilError(t, err, "WriteFile") - c := &Connector{ - Logger: logger, - Bin: "nonexistent", - Opts: Opts{}, - SockPath: sockPath, - PidPath: pidPath, - } - - err = c.killDeadServer(pid) - assert.NilError(t, err, "killDeadServer") - stillExists := pidPath.FileExists() - if !stillExists { - t.Error("pidPath no longer exists, expected client to not clean it up") - } - err = cmd.Wait() - exitErr := &exec.ExitError{} - if !errors.As(err, &exitErr) { - t.Errorf("expected an exit error from %v, got %v", bin, err) - } -} - -type mockServer struct { - turbodprotocol.UnimplementedTurbodServer - helloErr error - shutdownResp *turbodprotocol.ShutdownResponse - pidFile turbopath.AbsoluteSystemPath -} - -// Simulates server exiting by cleaning up the pid file -func (s *mockServer) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) { - if err := s.pidFile.Remove(); err != nil { - return nil, err - } - return s.shutdownResp, nil -} - -func (s *mockServer) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) { - if req.Version == "" { - return nil, errors.New("missing version") - } - return nil, s.helloErr -} - -func TestKillLiveServer(t *testing.T) { - logger := hclog.Default() - dir := t.TempDir() - dirPath := fs.AbsoluteSystemPathFromUpstream(dir) - - sockPath := getUnixSocket(dirPath) - pidPath := getPidFile(dirPath) - err := pidPath.WriteFile([]byte("99999"), 0644) - assert.NilError(t, err, "WriteFile") - - ctx := context.Background() - c := &Connector{ - Logger: logger, - Bin: "nonexistent", - Opts: Opts{}, - SockPath: sockPath, - PidPath: pidPath, - TurboVersion: "some-version", - } - - st := status.New(codes.FailedPrecondition, "version mismatch") - mock := &mockServer{ - shutdownResp: &turbodprotocol.ShutdownResponse{}, - helloErr: st.Err(), - pidFile: pidPath, - } - lis := bufconn.Listen(1024 * 1024) - grpcServer := grpc.NewServer() - turbodprotocol.RegisterTurbodServer(grpcServer, mock) - go func(t *testing.T) { - if err := grpcServer.Serve(lis); err != nil { - t.Logf("server closed: %v", err) - } - }(t) - - conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { - return lis.Dial() - }), grpc.WithTransportCredentials(insecure.NewCredentials())) - assert.NilError(t, err, "DialContext") - turboClient := turbodprotocol.NewTurbodClient(conn) - client := &Client{ - TurbodClient: turboClient, - ClientConn: conn, - } - err = c.sendHello(ctx, client) - if !errors.Is(err, ErrVersionMismatch) { - t.Errorf("sendHello error got %v, want %v", err, ErrVersionMismatch) - } - err = c.killLiveServer(ctx, client, 99999) - assert.NilError(t, err, "killLiveServer") - // Expect the pid file and socket files to have been cleaned up - if pidPath.FileExists() { - t.Errorf("expected pid file to have been deleted: %v", pidPath) - } - if sockPath.FileExists() { - t.Errorf("expected socket file to have been deleted: %v", sockPath) - } -} diff --git a/cli/internal/daemon/connector/fork.go b/cli/internal/daemon/connector/fork.go deleted file mode 100644 index 8a6d01d..0000000 --- a/cli/internal/daemon/connector/fork.go +++ /dev/null @@ -1,15 +0,0 @@ -//go:build !windows -// +build !windows - -package connector - -import "syscall" - -// getSysProcAttrs returns the platform-specific attributes we want to -// use while forking the daemon process. Currently this is limited to -// forcing a new process group -func getSysProcAttrs() *syscall.SysProcAttr { - return &syscall.SysProcAttr{ - Setpgid: true, - } -} diff --git a/cli/internal/daemon/connector/fork_windows.go b/cli/internal/daemon/connector/fork_windows.go deleted file mode 100644 index b9d6e77..0000000 --- a/cli/internal/daemon/connector/fork_windows.go +++ /dev/null @@ -1,15 +0,0 @@ -//go:build windows -// +build windows - -package connector - -import "syscall" - -// getSysProcAttrs returns the platform-specific attributes we want to -// use while forking the daemon process. Currently this is limited to -// forcing a new process group -func getSysProcAttrs() *syscall.SysProcAttr { - return &syscall.SysProcAttr{ - CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, - } -} diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go deleted file mode 100644 index 81d5283..0000000 --- a/cli/internal/daemon/daemon.go +++ /dev/null @@ -1,307 +0,0 @@ -package daemon - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "fmt" - "io" - "net" - "os" - "path/filepath" - "strings" - "time" - - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - "github.com/hashicorp/go-hclog" - "github.com/nightlyone/lockfile" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/cmdutil" - "github.com/vercel/turbo/cli/internal/daemon/connector" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/server" - "github.com/vercel/turbo/cli/internal/signals" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/turbostate" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type daemon struct { - logger hclog.Logger - repoRoot turbopath.AbsoluteSystemPath - timeout time.Duration - reqCh chan struct{} - timedOutCh chan struct{} -} - -func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string { - pathHash := sha256.Sum256([]byte(repoRoot.ToString())) - // We grab a substring of the hash because there is a 108-character limit on the length - // of a filepath for unix domain socket. - return hex.EncodeToString(pathHash[:])[:16] -} - -func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - tempDir := fs.TempDir("turbod") - hexHash := getRepoHash(repoRoot) - return tempDir.UntypedJoin(hexHash) -} - -func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) { - hexHash := getRepoHash(repoRoot) - base := repoRoot.Base() - logFilename := fmt.Sprintf("%v-%v.log", hexHash, base) - - logsDir := fs.GetTurboDataDir().UntypedJoin("logs") - return logsDir.UntypedJoin(logFilename), nil -} - -func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - root := getDaemonFileRoot(repoRoot) - return root.UntypedJoin("turbod.sock") -} - -func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - root := getDaemonFileRoot(repoRoot) - return root.UntypedJoin("turbod.pid") -} - -// logError logs an error and outputs it to the UI. -func (d *daemon) logError(err error) { - d.logger.Error(fmt.Sprintf("error %v", err)) -} - -// we're only appending, and we're creating the file if it doesn't exist. -// we do not need to read the log file. -var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE - -// ExecuteDaemon executes the root daemon command -func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error { - base, err := helper.GetCmdBase(args) - if err != nil { - return err - } - if args.TestRun { - base.UI.Info("Daemon test run successful") - return nil - } - - idleTimeout := 4 * time.Hour - if args.Command.Daemon.IdleTimeout != "" { - idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout) - if err != nil { - return err - } - } - - logFilePath, err := getLogFilePath(base.RepoRoot) - if err != nil { - return err - } - if err := logFilePath.EnsureDir(); err != nil { - return err - } - logFile, err := logFilePath.OpenFile(_logFileFlags, 0644) - if err != nil { - return err - } - defer func() { _ = logFile.Close() }() - logger := hclog.New(&hclog.LoggerOptions{ - Output: io.MultiWriter(logFile, os.Stdout), - Level: hclog.Info, - Color: hclog.ColorOff, - Name: "turbod", - }) - - d := &daemon{ - logger: logger, - repoRoot: base.RepoRoot, - timeout: idleTimeout, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - serverName := getRepoHash(base.RepoRoot) - turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath) - if err != nil { - d.logError(err) - return err - } - defer func() { _ = turboServer.Close() }() - err = d.runTurboServer(ctx, turboServer, signalWatcher) - if err != nil { - d.logError(err) - return err - } - return nil -} - -var errInactivityTimeout = errors.New("turbod shut down from inactivity") - -// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path -// at a time. If this process fails to write its PID to the lockfile, it must exit. -func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) { - if err := pidPath.EnsureDir(); err != nil { - return "", err - } - lockFile, err := lockfile.New(pidPath.ToString()) - if err != nil { - // lockfile.New should only return an error if it wasn't given an absolute path. - // We are attempting to use the type system to enforce that we are passing an - // absolute path. An error here likely means a bug, and we should crash. - panic(err) - } - if err := lockFile.TryLock(); err != nil { - return "", err - } - return lockFile, nil -} - -type rpcServer interface { - Register(grpcServer server.GRPCServer) -} - -func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error { - ctx, cancel := context.WithCancel(parentContext) - defer cancel() - pidPath := getPidFile(d.repoRoot) - lock, err := tryAcquirePidfileLock(pidPath) - if err != nil { - return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock) - } - // When we're done serving, clean up the pid file. - // Also, if *this* goroutine panics, make sure we unlock the pid file. - defer func() { - if err := lock.Unlock(); err != nil { - d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error()) - } - }() - // This handler runs in request goroutines. If a request causes a panic, - // this handler will get called after a call to recover(), meaning we are - // no longer panicking. We return a server error and cancel our context, - // which triggers a shutdown of the server. - panicHandler := func(thePanic interface{}) error { - cancel() - d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic)) - return status.Error(codes.Internal, "server panicked") - } - - // If we have the lock, assume that we are the owners of the socket file, - // whether it already exists or not. That means we are free to remove it. - sockPath := getUnixSocket(d.repoRoot) - if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { - return err - } - d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath))) - lis, err := net.Listen("unix", sockPath.ToString()) - if err != nil { - return err - } - // We don't need to explicitly close 'lis', the grpc server will handle that - s := grpc.NewServer( - grpc.ChainUnaryInterceptor( - d.onRequest, - grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)), - ), - ) - go d.timeoutLoop(ctx) - - rpcServer.Register(s) - errCh := make(chan error) - go func(errCh chan<- error) { - if err := s.Serve(lis); err != nil { - errCh <- err - } - close(errCh) - }(errCh) - - // Note that we aren't deferring s.GracefulStop here because we also need - // to drain the error channel, which isn't guaranteed to happen until - // the server has stopped. That in turn may depend on GracefulStop being - // called. - // Future work could restructure this to make that simpler. - var exitErr error - select { - case err, ok := <-errCh: - // The server exited - if ok { - exitErr = err - } - case <-d.timedOutCh: - // This is the inactivity timeout case - exitErr = errInactivityTimeout - s.GracefulStop() - case <-ctx.Done(): - // If a request handler panics, it will cancel this context - s.GracefulStop() - case <-signalWatcher.Done(): - // This is fired if caught a signal - s.GracefulStop() - } - // Wait for the server to exit, if it hasn't already. - // When it does, this channel will close. We don't - // care about the error in this scenario because we've - // either requested a close via cancelling the context, - // an inactivity timeout, or caught a signal. - for range errCh { - } - return exitErr -} - -func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - d.reqCh <- struct{}{} - return handler(ctx, req) -} - -func (d *daemon) timeoutLoop(ctx context.Context) { - timeoutCh := time.After(d.timeout) -outer: - for { - select { - case <-d.reqCh: - timeoutCh = time.After(d.timeout) - case <-timeoutCh: - close(d.timedOutCh) - break outer - case <-ctx.Done(): - break outer - } - } -} - -// ClientOpts re-exports connector.Ops to encapsulate the connector package -type ClientOpts = connector.Opts - -// Client re-exports connector.Client to encapsulate the connector package -type Client = connector.Client - -// GetClient returns a client that can be used to interact with the daemon -func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) { - sockPath := getUnixSocket(repoRoot) - pidPath := getPidFile(repoRoot) - logPath, err := getLogFilePath(repoRoot) - if err != nil { - return nil, err - } - bin, err := os.Executable() - if err != nil { - return nil, err - } - // The Go binary can no longer be called directly, so we need to route back to the rust wrapper - if strings.HasSuffix(bin, "go-turbo") { - bin = filepath.Join(filepath.Dir(bin), "turbo") - } else if strings.HasSuffix(bin, "go-turbo.exe") { - bin = filepath.Join(filepath.Dir(bin), "turbo.exe") - } - c := &connector.Connector{ - Logger: logger.Named("TurbodClient"), - Bin: bin, - Opts: opts, - SockPath: sockPath, - PidPath: pidPath, - LogPath: logPath, - TurboVersion: turboVersion, - } - return c.Connect(ctx) -} diff --git a/cli/internal/daemon/daemon_test.go b/cli/internal/daemon/daemon_test.go deleted file mode 100644 index 66a714d..0000000 --- a/cli/internal/daemon/daemon_test.go +++ /dev/null @@ -1,262 +0,0 @@ -package daemon - -import ( - "context" - "errors" - "os/exec" - "runtime" - "strconv" - "sync" - "testing" - "time" - - "github.com/hashicorp/go-hclog" - "github.com/nightlyone/lockfile" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/server" - "github.com/vercel/turbo/cli/internal/signals" - "github.com/vercel/turbo/cli/internal/turbopath" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/test/grpc_testing" - "gotest.tools/v3/assert" -) - -// testBin returns a platform-appropriate node binary. -// We need some process to be running and findable by the -// lockfile library, and we don't particularly care what it is. -// Since node is required for turbo development, it makes a decent -// candidate. -func testBin() string { - if runtime.GOOS == "windows" { - return "node.exe" - } - return "node" -} - -func TestPidFileLock(t *testing.T) { - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - pidPath := getPidFile(repoRoot) - // the lockfile library handles removing pids from dead owners - _, err := tryAcquirePidfileLock(pidPath) - assert.NilError(t, err, "acquirePidLock") - - // Start up a node process and fake a pid file for it. - // Ensure that we can't start the daemon while the node process is live - bin := testBin() - node := exec.Command(bin) - err = node.Start() - assert.NilError(t, err, "Start") - stopNode := func() error { - if err := node.Process.Kill(); err != nil { - return err - } - // We expect an error from node, we just sent a kill signal - _ = node.Wait() - return nil - } - // In case we fail the test, still try to kill the node process - t.Cleanup(func() { _ = stopNode() }) - nodePid := node.Process.Pid - err = pidPath.WriteFile([]byte(strconv.Itoa(nodePid)), 0644) - assert.NilError(t, err, "WriteFile") - - _, err = tryAcquirePidfileLock(pidPath) - assert.ErrorIs(t, err, lockfile.ErrBusy) - - // Stop the node process, but leave the pid file there - // This simulates a crash - err = stopNode() - assert.NilError(t, err, "stopNode") - // the lockfile library handles removing pids from dead owners - _, err = tryAcquirePidfileLock(pidPath) - assert.NilError(t, err, "acquirePidLock") -} - -type testRPCServer struct { - grpc_testing.UnimplementedTestServiceServer - registered chan struct{} -} - -func (ts *testRPCServer) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) { - panic("intended to panic") -} - -func (ts *testRPCServer) Register(grpcServer server.GRPCServer) { - grpc_testing.RegisterTestServiceServer(grpcServer, ts) - ts.registered <- struct{}{} -} - -func newTestRPCServer() *testRPCServer { - return &testRPCServer{ - registered: make(chan struct{}, 1), - } -} - -func waitForFile(t *testing.T, filename turbopath.AbsoluteSystemPath, timeout time.Duration) { - t.Helper() - deadline := time.After(timeout) -outer: - for !filename.FileExists() { - select { - case <-deadline: - break outer - case <-time.After(10 * time.Millisecond): - } - } - if !filename.FileExists() { - t.Errorf("timed out waiting for %v to exist after %v", filename, timeout) - } -} - -func TestDaemonLifecycle(t *testing.T) { - logger := hclog.Default() - logger.SetLevel(hclog.Debug) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - ts := newTestRPCServer() - watcher := signals.NewWatcher() - ctx, cancel := context.WithCancel(context.Background()) - - d := &daemon{ - logger: logger, - repoRoot: repoRoot, - timeout: 10 * time.Second, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - - var serverErr error - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - serverErr = d.runTurboServer(ctx, ts, watcher) - wg.Done() - }() - - sockPath := getUnixSocket(repoRoot) - waitForFile(t, sockPath, 30*time.Second) - pidPath := getPidFile(repoRoot) - waitForFile(t, pidPath, 1*time.Second) - cancel() - wg.Wait() - assert.NilError(t, serverErr, "runTurboServer") - if sockPath.FileExists() { - t.Errorf("%v still exists, should have been cleaned up", sockPath) - } - if pidPath.FileExists() { - t.Errorf("%v still exists, should have been cleaned up", sockPath) - } -} - -func TestTimeout(t *testing.T) { - logger := hclog.Default() - logger.SetLevel(hclog.Debug) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - ts := newTestRPCServer() - watcher := signals.NewWatcher() - ctx := context.Background() - - d := &daemon{ - logger: logger, - repoRoot: repoRoot, - timeout: 5 * time.Millisecond, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - err := d.runTurboServer(ctx, ts, watcher) - if !errors.Is(err, errInactivityTimeout) { - t.Errorf("server error got %v, want %v", err, errInactivityTimeout) - } -} - -func TestCaughtSignal(t *testing.T) { - logger := hclog.Default() - logger.SetLevel(hclog.Debug) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - ts := newTestRPCServer() - watcher := signals.NewWatcher() - ctx := context.Background() - - d := &daemon{ - logger: logger, - repoRoot: repoRoot, - timeout: 5 * time.Second, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - errCh := make(chan error) - go func() { - err := d.runTurboServer(ctx, ts, watcher) - errCh <- err - }() - <-ts.registered - // grpc doesn't provide a signal to know when the server is serving. - // So while this call to Close can race with the call to grpc.Server.Serve, if we've - // registered with the turboserver, we've registered all of our - // signal handlers as well. We just may or may not be serving when Close() - // is called. It shouldn't matter for the purposes of this test: - // Either we are serving, and Serve will return with nil when GracefulStop is - // called, or we aren't serving yet, and the subsequent call to Serve will - // immediately return with grpc.ErrServerStopped. So, both nil and grpc.ErrServerStopped - // are acceptable outcomes for runTurboServer. Any other error, or a timeout, is a - // failure. - watcher.Close() - - err := <-errCh - pidPath := getPidFile(repoRoot) - if pidPath.FileExists() { - t.Errorf("expected to clean up %v, but it still exists", pidPath) - } - // We'll either get nil or ErrServerStopped, depending on whether - // or not we close the signal watcher before grpc.Server.Serve was - // called. - if err != nil && !errors.Is(err, grpc.ErrServerStopped) { - t.Errorf("runTurboServer got err %v, want nil or ErrServerStopped", err) - } -} - -func TestCleanupOnPanic(t *testing.T) { - logger := hclog.Default() - logger.SetLevel(hclog.Debug) - repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir()) - - ts := newTestRPCServer() - watcher := signals.NewWatcher() - ctx := context.Background() - - d := &daemon{ - logger: logger, - repoRoot: repoRoot, - timeout: 5 * time.Second, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - errCh := make(chan error) - go func() { - err := d.runTurboServer(ctx, ts, watcher) - errCh <- err - }() - <-ts.registered - - creds := insecure.NewCredentials() - sockFile := getUnixSocket(repoRoot) - conn, err := grpc.Dial("unix://"+sockFile.ToString(), grpc.WithTransportCredentials(creds)) - assert.NilError(t, err, "Dial") - - client := grpc_testing.NewTestServiceClient(conn) - _, err = client.EmptyCall(ctx, &grpc_testing.Empty{}) - if err == nil { - t.Error("nil error") - } - // wait for the server to finish - <-errCh - - pidPath := getPidFile(repoRoot) - if pidPath.FileExists() { - t.Errorf("expected to clean up %v, but it still exists", pidPath) - } -} |
