aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/daemon
diff options
context:
space:
mode:
Diffstat (limited to 'cli/internal/daemon')
-rw-r--r--cli/internal/daemon/connector/connector.go391
-rw-r--r--cli/internal/daemon/connector/connector_test.go256
-rw-r--r--cli/internal/daemon/connector/fork.go15
-rw-r--r--cli/internal/daemon/connector/fork_windows.go15
-rw-r--r--cli/internal/daemon/daemon.go307
-rw-r--r--cli/internal/daemon/daemon_test.go262
6 files changed, 0 insertions, 1246 deletions
diff --git a/cli/internal/daemon/connector/connector.go b/cli/internal/daemon/connector/connector.go
deleted file mode 100644
index d05ef59..0000000
--- a/cli/internal/daemon/connector/connector.go
+++ /dev/null
@@ -1,391 +0,0 @@
-package connector
-
-import (
- "context"
- "fmt"
- "io/fs"
- "os"
- "os/exec"
- "time"
-
- "github.com/cenkalti/backoff/v4"
- "github.com/hashicorp/go-hclog"
- "github.com/nightlyone/lockfile"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/turbodprotocol"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/status"
-)
-
-var (
- // ErrFailedToStart is returned when the daemon process cannot be started
- ErrFailedToStart = errors.New("daemon could not be started")
- // ErrVersionMismatch is returned when the daemon process was spawned by a different version than the connecting client
- ErrVersionMismatch = errors.New("daemon version does not match client version")
- errConnectionFailure = errors.New("could not connect to daemon")
- // ErrTooManyAttempts is returned when the client fails to connect too many times
- ErrTooManyAttempts = errors.New("reached maximum number of attempts contacting daemon")
- // ErrDaemonNotRunning is returned when the client cannot contact the daemon and has
- // been instructed not to attempt to start a new daemon
- ErrDaemonNotRunning = errors.New("the daemon is not running")
-)
-
-// Opts is the set of configurable options for the client connection,
-// including some options to be passed through to the daemon process if
-// it needs to be started.
-type Opts struct {
- ServerTimeout time.Duration
- DontStart bool // if true, don't attempt to start the daemon
- DontKill bool // if true, don't attempt to kill the daemon
-}
-
-// Client represents a connection to the daemon process
-type Client struct {
- turbodprotocol.TurbodClient
- *grpc.ClientConn
- SockPath turbopath.AbsoluteSystemPath
- PidPath turbopath.AbsoluteSystemPath
- LogPath turbopath.AbsoluteSystemPath
-}
-
-// Connector instances are used to create a connection to turbo's daemon process
-// The daemon will be started , or killed and restarted, if necessary
-type Connector struct {
- Logger hclog.Logger
- Bin string
- Opts Opts
- SockPath turbopath.AbsoluteSystemPath
- PidPath turbopath.AbsoluteSystemPath
- LogPath turbopath.AbsoluteSystemPath
- TurboVersion string
-}
-
-// ConnectionError is returned in the error case from connect. It wraps the underlying
-// cause and adds a message with the relevant files for the user to check.
-type ConnectionError struct {
- SockPath turbopath.AbsoluteSystemPath
- PidPath turbopath.AbsoluteSystemPath
- LogPath turbopath.AbsoluteSystemPath
- cause error
-}
-
-func (ce *ConnectionError) Error() string {
- return fmt.Sprintf(`connection to turbo daemon process failed. Please ensure the following:
- - the process identified by the pid in the file at %v is not running, and remove %v
- - check the logs at %v
- - the unix domain socket at %v has been removed
- You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath)
-}
-
-// Unwrap allows a connection error to work with standard library "errors" and compatible packages
-func (ce *ConnectionError) Unwrap() error {
- return ce.cause
-}
-
-func (c *Connector) wrapConnectionError(err error) error {
- return &ConnectionError{
- SockPath: c.SockPath,
- PidPath: c.PidPath,
- LogPath: c.LogPath,
- cause: err,
- }
-}
-
-// lockFile returns a pointer to where a lockfile should be.
-// lockfile.New does not perform IO and the only error it produces
-// is in the case a non-absolute path was provided. We're guaranteeing an
-// turbopath.AbsoluteSystemPath, so an error here is an indication of a bug and
-// we should crash.
-func (c *Connector) lockFile() lockfile.Lockfile {
- lockFile, err := lockfile.New(c.PidPath.ToString())
- if err != nil {
- panic(err)
- }
- return lockFile
-}
-
-func (c *Connector) addr() string {
- // grpc special-cases parsing of unix:<path> urls
- // to avoid url.Parse. This lets us pass through our absolute
- // paths unmodified, even on windows.
- // See code here: https://github.com/grpc/grpc-go/blob/d83070ec0d9043f713b6a63e1963c593b447208c/internal/transport/http_util.go#L392
- return fmt.Sprintf("unix:%v", c.SockPath.ToString())
-}
-
-// We defer to the daemon's pid file as the locking mechanism.
-// If it doesn't exist, we will attempt to start the daemon.
-// If the daemon has a different version, ask it to shut down.
-// If the pid file exists but we can't connect, try to kill
-// the daemon.
-// If we can't cause the daemon to remove the pid file, report
-// an error to the user that includes the file location so that
-// they can resolve it.
-const (
- _maxAttempts = 3
- _shutdownTimeout = 1 * time.Second
- _socketPollTimeout = 1 * time.Second
-)
-
-// killLiveServer tells a running server to shut down. This method is also responsible
-// for closing this client connection.
-func (c *Connector) killLiveServer(ctx context.Context, client *Client, serverPid int) error {
- defer func() { _ = client.Close() }()
-
- _, err := client.Shutdown(ctx, &turbodprotocol.ShutdownRequest{})
- if err != nil {
- c.Logger.Error(fmt.Sprintf("failed to shutdown running daemon. attempting to force it closed: %v", err))
- return c.killDeadServer(serverPid)
- }
- // Wait for the server to gracefully exit
- err = backoff.Retry(func() error {
- lockFile := c.lockFile()
- owner, err := lockFile.GetOwner()
- if os.IsNotExist(err) {
- // If there is no pid more file, we can conclude that the daemon successfully
- // exited and cleaned up after itself.
- return nil
- } else if err != nil {
- // some other error occurred getting the lockfile owner
- return backoff.Permanent(err)
- } else if owner.Pid == serverPid {
- // // We're still waiting for the server to shut down
- return errNeedsRetry
- }
- // if there's no error and the lockfile has a new pid, someone else must've started a new daemon.
- // Consider the old one killed and move on.
- return nil
- }, backoffWithTimeout(_shutdownTimeout))
- if errors.Is(err, errNeedsRetry) {
- c.Logger.Error(fmt.Sprintf("daemon did not exit after %v, attempting to force it closed", _shutdownTimeout.String()))
- return c.killDeadServer(serverPid)
- } else if err != nil {
- return err
- }
- return nil
-}
-
-func (c *Connector) killDeadServer(pid int) error {
- // currently the only error that this constructor returns is
- // in the case that you don't provide an absolute path.
- // Given that we require an absolute path as input, this should
- // hopefully never happen.
- lockFile := c.lockFile()
- process, err := lockFile.GetOwner()
- if err == nil {
- // Check that this is the same process that we failed to connect to.
- // Otherwise, connectInternal will loop around again and start with whatever
- // new process has the pid file.
- if process.Pid == pid {
- // we have a process that we need to kill
- // TODO(gsoltis): graceful kill? the process is already not responding to requests,
- // but it could be in the middle of a graceful shutdown. Probably should let it clean
- // itself up, and report an error and defer to a force-kill by the user
- if err := process.Kill(); err != nil {
- return err
- }
- }
- return nil
- } else if errors.Is(err, os.ErrNotExist) {
- // There's no pid file. Someone else killed it. Returning no error will cause the
- // connectInternal to loop around and try the connection again.
- return nil
- }
- return err
-}
-
-// Connect attempts to create a connection to a turbo daemon.
-// Retries and daemon restarts are built in. If this fails,
-// it is unlikely to succeed after an automated retry.
-func (c *Connector) Connect(ctx context.Context) (*Client, error) {
- client, err := c.connectInternal(ctx)
- if err != nil {
- return nil, c.wrapConnectionError(err)
- }
- return client, nil
-}
-
-func (c *Connector) connectInternal(ctx context.Context) (*Client, error) {
- // for each attempt, we:
- // 1. try to find or start a daemon process, getting its pid
- // 2. wait for the unix domain socket file to appear
- // 3. connect to the unix domain socket. Note that this connection is not validated
- // 4. send a hello message. This validates the connection as a side effect of
- // negotiating versions, which currently requires exact match.
- // In the event of a live, but incompatible server, we attempt to shut it down and start
- // a new one. In the event of an unresponsive server, we attempt to kill the process
- // identified by the pid file, with the hope that it will clean up after itself.
- // Failures include details about where to find logs, the pid file, and the socket file.
- for i := 0; i < _maxAttempts; i++ {
- serverPid, err := c.getOrStartDaemon()
- if err != nil {
- // If we fail to even start the daemon process, return immediately, we're unlikely
- // to succeed without user intervention
- return nil, err
- }
- if err := c.waitForSocket(); errors.Is(err, ErrFailedToStart) {
- // If we didn't see the socket file, try again. It's possible that
- // the daemon encountered an transitory error
- continue
- } else if err != nil {
- return nil, err
- }
- client, err := c.getClientConn()
- if err != nil {
- return nil, err
- }
- if err := c.sendHello(ctx, client); err == nil {
- // We connected and negotiated a version, we're all set
- return client, nil
- } else if errors.Is(err, ErrVersionMismatch) {
- // We don't want to knock down a perfectly fine daemon in a status check.
- if c.Opts.DontKill {
- return nil, err
- }
-
- // We now know we aren't going to return this client,
- // but killLiveServer still needs it to send the Shutdown request.
- // killLiveServer will close the client when it is done with it.
- if err := c.killLiveServer(ctx, client, serverPid); err != nil {
- return nil, err
- }
- // Loops back around and tries again.
- } else if errors.Is(err, errConnectionFailure) {
- // close the client, see if we can kill the stale daemon
- _ = client.Close()
- if err := c.killDeadServer(serverPid); err != nil {
- return nil, err
- }
- // if we successfully killed the dead server, loop around and try again
- } else if err != nil {
- // Some other error occurred, close the client and
- // report the error to the user
- if closeErr := client.Close(); closeErr != nil {
- // In the event that we fail to close the client, bundle that error along also.
- // Keep the original error in the error chain, as it's more likely to be useful
- // or needed for matching on later.
- err = errors.Wrapf(err, "also failed to close client connection: %v", closeErr)
- }
- return nil, err
- }
- }
- return nil, ErrTooManyAttempts
-}
-
-// getOrStartDaemon returns the PID of the daemon process on success. It may start
-// the daemon if it doesn't find one running.
-func (c *Connector) getOrStartDaemon() (int, error) {
- lockFile := c.lockFile()
- daemonProcess, getDaemonProcessErr := lockFile.GetOwner()
- if getDaemonProcessErr != nil {
- // If we're in a clean state this isn't an "error" per se.
- // We attempt to start a daemon.
- if errors.Is(getDaemonProcessErr, fs.ErrNotExist) {
- if c.Opts.DontStart {
- return 0, ErrDaemonNotRunning
- }
- pid, startDaemonErr := c.startDaemon()
- if startDaemonErr != nil {
- return 0, startDaemonErr
- }
- return pid, nil
- }
-
- // We could have hit any number of errors.
- // - Failed to read the file for permission reasons.
- // - User emptied the file's contents.
- // - etc.
- return 0, errors.Wrapf(getDaemonProcessErr, "An issue was encountered with the pid file. Please remove it and try again: %v", c.PidPath)
- }
-
- return daemonProcess.Pid, nil
-}
-
-func (c *Connector) getClientConn() (*Client, error) {
- creds := insecure.NewCredentials()
- conn, err := grpc.Dial(c.addr(), grpc.WithTransportCredentials(creds))
- if err != nil {
- return nil, err
- }
- tc := turbodprotocol.NewTurbodClient(conn)
- return &Client{
- TurbodClient: tc,
- ClientConn: conn,
- SockPath: c.SockPath,
- PidPath: c.PidPath,
- LogPath: c.LogPath,
- }, nil
-}
-
-func (c *Connector) sendHello(ctx context.Context, client turbodprotocol.TurbodClient) error {
- _, err := client.Hello(ctx, &turbodprotocol.HelloRequest{
- Version: c.TurboVersion,
- // TODO: add session id
- })
- status := status.Convert(err)
- switch status.Code() {
- case codes.OK:
- return nil
- case codes.FailedPrecondition:
- return ErrVersionMismatch
- case codes.Unavailable:
- return errConnectionFailure
- default:
- return err
- }
-}
-
-var errNeedsRetry = errors.New("retry the operation")
-
-// backoffWithTimeout returns an exponential backoff, starting at 2ms and doubling until
-// the specific timeout has elapsed. Note that backoff instances are stateful, so we need
-// a new one each time we do a Retry.
-func backoffWithTimeout(timeout time.Duration) *backoff.ExponentialBackOff {
- return &backoff.ExponentialBackOff{
- Multiplier: 2,
- InitialInterval: 2 * time.Millisecond,
- MaxElapsedTime: timeout,
- Clock: backoff.SystemClock,
- Stop: backoff.Stop,
- }
-}
-
-// waitForSocket waits for the unix domain socket to appear
-func (c *Connector) waitForSocket() error {
- // Note that we don't care if this is our daemon
- // or not. We started a process, but someone else could beat
- // use to listening. That's fine, we'll check the version
- // later.
- err := backoff.Retry(func() error {
- if !c.SockPath.FileExists() {
- return errNeedsRetry
- }
- return nil
- }, backoffWithTimeout(_socketPollTimeout))
- if errors.Is(err, errNeedsRetry) {
- return ErrFailedToStart
- } else if err != nil {
- return err
- }
- return nil
-}
-
-// startDaemon starts the daemon and returns the pid for the new process
-func (c *Connector) startDaemon() (int, error) {
- args := []string{"daemon"}
- if c.Opts.ServerTimeout != 0 {
- args = append(args, fmt.Sprintf("--idle-time=%v", c.Opts.ServerTimeout.String()))
- }
- c.Logger.Debug(fmt.Sprintf("starting turbod binary %v", c.Bin))
- cmd := exec.Command(c.Bin, args...)
- // For the daemon to have its own process group id so that any attempts
- // to kill it and its process tree don't kill this client.
- cmd.SysProcAttr = getSysProcAttrs()
- err := cmd.Start()
- if err != nil {
- return 0, err
- }
- return cmd.Process.Pid, nil
-}
diff --git a/cli/internal/daemon/connector/connector_test.go b/cli/internal/daemon/connector/connector_test.go
deleted file mode 100644
index 62b4504..0000000
--- a/cli/internal/daemon/connector/connector_test.go
+++ /dev/null
@@ -1,256 +0,0 @@
-package connector
-
-import (
- "context"
- "errors"
- "net"
- "os/exec"
- "runtime"
- "strconv"
- "testing"
-
- "github.com/hashicorp/go-hclog"
- "github.com/nightlyone/lockfile"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/turbodprotocol"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/status"
- "google.golang.org/grpc/test/bufconn"
- "gotest.tools/v3/assert"
-)
-
-// testBin returns a platform-appropriate executable to run node.
-// Node works here as an arbitrary process to start, since it's
-// required for turbo development. It will obviously not implement
-// our grpc service, use a mockServer instance where that's needed.
-func testBin() string {
- if runtime.GOOS == "windows" {
- return "node.exe"
- }
- return "node"
-}
-
-func getUnixSocket(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- return dir.UntypedJoin("turbod-test.sock")
-}
-
-func getPidFile(dir turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- return dir.UntypedJoin("turbod-test.pid")
-}
-
-func TestGetOrStartDaemonInvalidPIDFile(t *testing.T) {
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- pidPath := getPidFile(dirPath)
- writeFileErr := pidPath.WriteFile(nil, 0777)
- assert.NilError(t, writeFileErr, "WriteFile")
-
- c := &Connector{
- Logger: logger,
- Opts: Opts{},
- PidPath: pidPath,
- }
-
- pid, err := c.getOrStartDaemon()
- assert.Equal(t, pid, 0)
- assert.ErrorContains(t, err, "issue was encountered with the pid file")
-}
-
-func TestConnectFailsWithoutGrpcServer(t *testing.T) {
- // We aren't starting a server that is going to write
- // to our socket file, so we should see a series of connection
- // failures, followed by ErrTooManyAttempts
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- sockPath := getUnixSocket(dirPath)
- pidPath := getPidFile(dirPath)
- ctx := context.Background()
- bin := testBin()
- c := &Connector{
- Logger: logger,
- Bin: bin,
- Opts: Opts{},
- SockPath: sockPath,
- PidPath: pidPath,
- }
- // Note that we expect ~3s here, for 3 attempts with a timeout of 1s
- _, err := c.connectInternal(ctx)
- assert.ErrorIs(t, err, ErrTooManyAttempts)
-}
-
-func TestKillDeadServerNoPid(t *testing.T) {
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- sockPath := getUnixSocket(dirPath)
- pidPath := getPidFile(dirPath)
- c := &Connector{
- Logger: logger,
- Bin: "nonexistent",
- Opts: Opts{},
- SockPath: sockPath,
- PidPath: pidPath,
- }
-
- err := c.killDeadServer(99999)
- assert.NilError(t, err, "killDeadServer")
-}
-
-func TestKillDeadServerNoProcess(t *testing.T) {
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- sockPath := getUnixSocket(dirPath)
- pidPath := getPidFile(dirPath)
- // Simulate the socket already existing, with no live daemon
- err := sockPath.WriteFile([]byte("junk"), 0644)
- assert.NilError(t, err, "WriteFile")
- err = pidPath.WriteFile([]byte("99999"), 0644)
- assert.NilError(t, err, "WriteFile")
- c := &Connector{
- Logger: logger,
- Bin: "nonexistent",
- Opts: Opts{},
- SockPath: sockPath,
- PidPath: pidPath,
- }
-
- err = c.killDeadServer(99999)
- assert.ErrorIs(t, err, lockfile.ErrDeadOwner)
- stillExists := pidPath.FileExists()
- if !stillExists {
- t.Error("pidPath should still exist, expected the user to clean it up")
- }
-}
-
-func TestKillDeadServerWithProcess(t *testing.T) {
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- sockPath := getUnixSocket(dirPath)
- pidPath := getPidFile(dirPath)
- // Simulate the socket already existing, with no live daemon
- err := sockPath.WriteFile([]byte("junk"), 0644)
- assert.NilError(t, err, "WriteFile")
- bin := testBin()
- cmd := exec.Command(bin)
- err = cmd.Start()
- assert.NilError(t, err, "cmd.Start")
- pid := cmd.Process.Pid
- if pid == 0 {
- t.Fatalf("failed to start process %v", bin)
- }
-
- err = pidPath.WriteFile([]byte(strconv.Itoa(pid)), 0644)
- assert.NilError(t, err, "WriteFile")
- c := &Connector{
- Logger: logger,
- Bin: "nonexistent",
- Opts: Opts{},
- SockPath: sockPath,
- PidPath: pidPath,
- }
-
- err = c.killDeadServer(pid)
- assert.NilError(t, err, "killDeadServer")
- stillExists := pidPath.FileExists()
- if !stillExists {
- t.Error("pidPath no longer exists, expected client to not clean it up")
- }
- err = cmd.Wait()
- exitErr := &exec.ExitError{}
- if !errors.As(err, &exitErr) {
- t.Errorf("expected an exit error from %v, got %v", bin, err)
- }
-}
-
-type mockServer struct {
- turbodprotocol.UnimplementedTurbodServer
- helloErr error
- shutdownResp *turbodprotocol.ShutdownResponse
- pidFile turbopath.AbsoluteSystemPath
-}
-
-// Simulates server exiting by cleaning up the pid file
-func (s *mockServer) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) {
- if err := s.pidFile.Remove(); err != nil {
- return nil, err
- }
- return s.shutdownResp, nil
-}
-
-func (s *mockServer) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) {
- if req.Version == "" {
- return nil, errors.New("missing version")
- }
- return nil, s.helloErr
-}
-
-func TestKillLiveServer(t *testing.T) {
- logger := hclog.Default()
- dir := t.TempDir()
- dirPath := fs.AbsoluteSystemPathFromUpstream(dir)
-
- sockPath := getUnixSocket(dirPath)
- pidPath := getPidFile(dirPath)
- err := pidPath.WriteFile([]byte("99999"), 0644)
- assert.NilError(t, err, "WriteFile")
-
- ctx := context.Background()
- c := &Connector{
- Logger: logger,
- Bin: "nonexistent",
- Opts: Opts{},
- SockPath: sockPath,
- PidPath: pidPath,
- TurboVersion: "some-version",
- }
-
- st := status.New(codes.FailedPrecondition, "version mismatch")
- mock := &mockServer{
- shutdownResp: &turbodprotocol.ShutdownResponse{},
- helloErr: st.Err(),
- pidFile: pidPath,
- }
- lis := bufconn.Listen(1024 * 1024)
- grpcServer := grpc.NewServer()
- turbodprotocol.RegisterTurbodServer(grpcServer, mock)
- go func(t *testing.T) {
- if err := grpcServer.Serve(lis); err != nil {
- t.Logf("server closed: %v", err)
- }
- }(t)
-
- conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
- return lis.Dial()
- }), grpc.WithTransportCredentials(insecure.NewCredentials()))
- assert.NilError(t, err, "DialContext")
- turboClient := turbodprotocol.NewTurbodClient(conn)
- client := &Client{
- TurbodClient: turboClient,
- ClientConn: conn,
- }
- err = c.sendHello(ctx, client)
- if !errors.Is(err, ErrVersionMismatch) {
- t.Errorf("sendHello error got %v, want %v", err, ErrVersionMismatch)
- }
- err = c.killLiveServer(ctx, client, 99999)
- assert.NilError(t, err, "killLiveServer")
- // Expect the pid file and socket files to have been cleaned up
- if pidPath.FileExists() {
- t.Errorf("expected pid file to have been deleted: %v", pidPath)
- }
- if sockPath.FileExists() {
- t.Errorf("expected socket file to have been deleted: %v", sockPath)
- }
-}
diff --git a/cli/internal/daemon/connector/fork.go b/cli/internal/daemon/connector/fork.go
deleted file mode 100644
index 8a6d01d..0000000
--- a/cli/internal/daemon/connector/fork.go
+++ /dev/null
@@ -1,15 +0,0 @@
-//go:build !windows
-// +build !windows
-
-package connector
-
-import "syscall"
-
-// getSysProcAttrs returns the platform-specific attributes we want to
-// use while forking the daemon process. Currently this is limited to
-// forcing a new process group
-func getSysProcAttrs() *syscall.SysProcAttr {
- return &syscall.SysProcAttr{
- Setpgid: true,
- }
-}
diff --git a/cli/internal/daemon/connector/fork_windows.go b/cli/internal/daemon/connector/fork_windows.go
deleted file mode 100644
index b9d6e77..0000000
--- a/cli/internal/daemon/connector/fork_windows.go
+++ /dev/null
@@ -1,15 +0,0 @@
-//go:build windows
-// +build windows
-
-package connector
-
-import "syscall"
-
-// getSysProcAttrs returns the platform-specific attributes we want to
-// use while forking the daemon process. Currently this is limited to
-// forcing a new process group
-func getSysProcAttrs() *syscall.SysProcAttr {
- return &syscall.SysProcAttr{
- CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
- }
-}
diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go
deleted file mode 100644
index 81d5283..0000000
--- a/cli/internal/daemon/daemon.go
+++ /dev/null
@@ -1,307 +0,0 @@
-package daemon
-
-import (
- "context"
- "crypto/sha256"
- "encoding/hex"
- "fmt"
- "io"
- "net"
- "os"
- "path/filepath"
- "strings"
- "time"
-
- grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
- "github.com/hashicorp/go-hclog"
- "github.com/nightlyone/lockfile"
- "github.com/pkg/errors"
- "github.com/vercel/turbo/cli/internal/cmdutil"
- "github.com/vercel/turbo/cli/internal/daemon/connector"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/server"
- "github.com/vercel/turbo/cli/internal/signals"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "github.com/vercel/turbo/cli/internal/turbostate"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-type daemon struct {
- logger hclog.Logger
- repoRoot turbopath.AbsoluteSystemPath
- timeout time.Duration
- reqCh chan struct{}
- timedOutCh chan struct{}
-}
-
-func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string {
- pathHash := sha256.Sum256([]byte(repoRoot.ToString()))
- // We grab a substring of the hash because there is a 108-character limit on the length
- // of a filepath for unix domain socket.
- return hex.EncodeToString(pathHash[:])[:16]
-}
-
-func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- tempDir := fs.TempDir("turbod")
- hexHash := getRepoHash(repoRoot)
- return tempDir.UntypedJoin(hexHash)
-}
-
-func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) {
- hexHash := getRepoHash(repoRoot)
- base := repoRoot.Base()
- logFilename := fmt.Sprintf("%v-%v.log", hexHash, base)
-
- logsDir := fs.GetTurboDataDir().UntypedJoin("logs")
- return logsDir.UntypedJoin(logFilename), nil
-}
-
-func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- root := getDaemonFileRoot(repoRoot)
- return root.UntypedJoin("turbod.sock")
-}
-
-func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath {
- root := getDaemonFileRoot(repoRoot)
- return root.UntypedJoin("turbod.pid")
-}
-
-// logError logs an error and outputs it to the UI.
-func (d *daemon) logError(err error) {
- d.logger.Error(fmt.Sprintf("error %v", err))
-}
-
-// we're only appending, and we're creating the file if it doesn't exist.
-// we do not need to read the log file.
-var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE
-
-// ExecuteDaemon executes the root daemon command
-func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error {
- base, err := helper.GetCmdBase(args)
- if err != nil {
- return err
- }
- if args.TestRun {
- base.UI.Info("Daemon test run successful")
- return nil
- }
-
- idleTimeout := 4 * time.Hour
- if args.Command.Daemon.IdleTimeout != "" {
- idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout)
- if err != nil {
- return err
- }
- }
-
- logFilePath, err := getLogFilePath(base.RepoRoot)
- if err != nil {
- return err
- }
- if err := logFilePath.EnsureDir(); err != nil {
- return err
- }
- logFile, err := logFilePath.OpenFile(_logFileFlags, 0644)
- if err != nil {
- return err
- }
- defer func() { _ = logFile.Close() }()
- logger := hclog.New(&hclog.LoggerOptions{
- Output: io.MultiWriter(logFile, os.Stdout),
- Level: hclog.Info,
- Color: hclog.ColorOff,
- Name: "turbod",
- })
-
- d := &daemon{
- logger: logger,
- repoRoot: base.RepoRoot,
- timeout: idleTimeout,
- reqCh: make(chan struct{}),
- timedOutCh: make(chan struct{}),
- }
- serverName := getRepoHash(base.RepoRoot)
- turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath)
- if err != nil {
- d.logError(err)
- return err
- }
- defer func() { _ = turboServer.Close() }()
- err = d.runTurboServer(ctx, turboServer, signalWatcher)
- if err != nil {
- d.logError(err)
- return err
- }
- return nil
-}
-
-var errInactivityTimeout = errors.New("turbod shut down from inactivity")
-
-// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path
-// at a time. If this process fails to write its PID to the lockfile, it must exit.
-func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) {
- if err := pidPath.EnsureDir(); err != nil {
- return "", err
- }
- lockFile, err := lockfile.New(pidPath.ToString())
- if err != nil {
- // lockfile.New should only return an error if it wasn't given an absolute path.
- // We are attempting to use the type system to enforce that we are passing an
- // absolute path. An error here likely means a bug, and we should crash.
- panic(err)
- }
- if err := lockFile.TryLock(); err != nil {
- return "", err
- }
- return lockFile, nil
-}
-
-type rpcServer interface {
- Register(grpcServer server.GRPCServer)
-}
-
-func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error {
- ctx, cancel := context.WithCancel(parentContext)
- defer cancel()
- pidPath := getPidFile(d.repoRoot)
- lock, err := tryAcquirePidfileLock(pidPath)
- if err != nil {
- return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock)
- }
- // When we're done serving, clean up the pid file.
- // Also, if *this* goroutine panics, make sure we unlock the pid file.
- defer func() {
- if err := lock.Unlock(); err != nil {
- d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error())
- }
- }()
- // This handler runs in request goroutines. If a request causes a panic,
- // this handler will get called after a call to recover(), meaning we are
- // no longer panicking. We return a server error and cancel our context,
- // which triggers a shutdown of the server.
- panicHandler := func(thePanic interface{}) error {
- cancel()
- d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic))
- return status.Error(codes.Internal, "server panicked")
- }
-
- // If we have the lock, assume that we are the owners of the socket file,
- // whether it already exists or not. That means we are free to remove it.
- sockPath := getUnixSocket(d.repoRoot)
- if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
- return err
- }
- d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath)))
- lis, err := net.Listen("unix", sockPath.ToString())
- if err != nil {
- return err
- }
- // We don't need to explicitly close 'lis', the grpc server will handle that
- s := grpc.NewServer(
- grpc.ChainUnaryInterceptor(
- d.onRequest,
- grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)),
- ),
- )
- go d.timeoutLoop(ctx)
-
- rpcServer.Register(s)
- errCh := make(chan error)
- go func(errCh chan<- error) {
- if err := s.Serve(lis); err != nil {
- errCh <- err
- }
- close(errCh)
- }(errCh)
-
- // Note that we aren't deferring s.GracefulStop here because we also need
- // to drain the error channel, which isn't guaranteed to happen until
- // the server has stopped. That in turn may depend on GracefulStop being
- // called.
- // Future work could restructure this to make that simpler.
- var exitErr error
- select {
- case err, ok := <-errCh:
- // The server exited
- if ok {
- exitErr = err
- }
- case <-d.timedOutCh:
- // This is the inactivity timeout case
- exitErr = errInactivityTimeout
- s.GracefulStop()
- case <-ctx.Done():
- // If a request handler panics, it will cancel this context
- s.GracefulStop()
- case <-signalWatcher.Done():
- // This is fired if caught a signal
- s.GracefulStop()
- }
- // Wait for the server to exit, if it hasn't already.
- // When it does, this channel will close. We don't
- // care about the error in this scenario because we've
- // either requested a close via cancelling the context,
- // an inactivity timeout, or caught a signal.
- for range errCh {
- }
- return exitErr
-}
-
-func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- d.reqCh <- struct{}{}
- return handler(ctx, req)
-}
-
-func (d *daemon) timeoutLoop(ctx context.Context) {
- timeoutCh := time.After(d.timeout)
-outer:
- for {
- select {
- case <-d.reqCh:
- timeoutCh = time.After(d.timeout)
- case <-timeoutCh:
- close(d.timedOutCh)
- break outer
- case <-ctx.Done():
- break outer
- }
- }
-}
-
-// ClientOpts re-exports connector.Ops to encapsulate the connector package
-type ClientOpts = connector.Opts
-
-// Client re-exports connector.Client to encapsulate the connector package
-type Client = connector.Client
-
-// GetClient returns a client that can be used to interact with the daemon
-func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) {
- sockPath := getUnixSocket(repoRoot)
- pidPath := getPidFile(repoRoot)
- logPath, err := getLogFilePath(repoRoot)
- if err != nil {
- return nil, err
- }
- bin, err := os.Executable()
- if err != nil {
- return nil, err
- }
- // The Go binary can no longer be called directly, so we need to route back to the rust wrapper
- if strings.HasSuffix(bin, "go-turbo") {
- bin = filepath.Join(filepath.Dir(bin), "turbo")
- } else if strings.HasSuffix(bin, "go-turbo.exe") {
- bin = filepath.Join(filepath.Dir(bin), "turbo.exe")
- }
- c := &connector.Connector{
- Logger: logger.Named("TurbodClient"),
- Bin: bin,
- Opts: opts,
- SockPath: sockPath,
- PidPath: pidPath,
- LogPath: logPath,
- TurboVersion: turboVersion,
- }
- return c.Connect(ctx)
-}
diff --git a/cli/internal/daemon/daemon_test.go b/cli/internal/daemon/daemon_test.go
deleted file mode 100644
index 66a714d..0000000
--- a/cli/internal/daemon/daemon_test.go
+++ /dev/null
@@ -1,262 +0,0 @@
-package daemon
-
-import (
- "context"
- "errors"
- "os/exec"
- "runtime"
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/hashicorp/go-hclog"
- "github.com/nightlyone/lockfile"
- "github.com/vercel/turbo/cli/internal/fs"
- "github.com/vercel/turbo/cli/internal/server"
- "github.com/vercel/turbo/cli/internal/signals"
- "github.com/vercel/turbo/cli/internal/turbopath"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/test/grpc_testing"
- "gotest.tools/v3/assert"
-)
-
-// testBin names the node executable for the current platform: "node.exe" on
-// Windows and "node" everywhere else.
-// The tests only need some live process that the lockfile library can find;
-// which one does not matter. Node is required for turbo development, so it
-// is a convenient choice.
-func testBin() string {
-	switch runtime.GOOS {
-	case "windows":
-		return "node.exe"
-	default:
-		return "node"
-	}
-}
-
-// TestPidFileLock exercises tryAcquirePidfileLock in three situations: a
-// fresh temp directory, a pid file naming a live process, and a pid file
-// left behind by a process that has since been killed.
-func TestPidFileLock(t *testing.T) {
-	root := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-	pidPath := getPidFile(root)
-
-	// First acquisition in a fresh temp directory is expected to succeed.
-	// The lockfile library handles removing pids from dead owners.
-	_, err := tryAcquirePidfileLock(pidPath)
-	assert.NilError(t, err, "acquirePidLock")
-
-	// Launch a node process and forge a pid file that names it. While that
-	// process is alive, taking the lock must be refused.
-	proc := exec.Command(testBin())
-	err = proc.Start()
-	assert.NilError(t, err, "Start")
-	killProc := func() error {
-		killErr := proc.Process.Kill()
-		if killErr != nil {
-			return killErr
-		}
-		// Wait is expected to report an error because we just killed it.
-		_ = proc.Wait()
-		return nil
-	}
-	// Make sure the process does not outlive a failed test.
-	t.Cleanup(func() { _ = killProc() })
-
-	livePid := proc.Process.Pid
-	err = pidPath.WriteFile([]byte(strconv.Itoa(livePid)), 0644)
-	assert.NilError(t, err, "WriteFile")
-
-	_, err = tryAcquirePidfileLock(pidPath)
-	assert.ErrorIs(t, err, lockfile.ErrBusy)
-
-	// Kill the process but leave its pid file in place to simulate a crash.
-	err = killProc()
-	assert.NilError(t, err, "stopNode")
-
-	// With the owner dead, the stale pid file must no longer block the lock.
-	_, err = tryAcquirePidfileLock(pidPath)
-	assert.NilError(t, err, "acquirePidLock")
-}
-
-// testRPCServer is the RPC server stub used by the daemon tests in this file.
-// It embeds grpc_testing.UnimplementedTestServiceServer and overrides
-// EmptyCall.
-type testRPCServer struct {
- grpc_testing.UnimplementedTestServiceServer
- // registered receives one value each time Register is called, letting a
- // test wait until the service has been attached to the gRPC server.
- registered chan struct{}
-}
-
-// EmptyCall always panics and never returns a response. TestCleanupOnPanic
-// calls it through a real client connection to check that the daemon still
-// cleans up its pid file when a handler panics.
-func (ts *testRPCServer) EmptyCall(ctx context.Context, req *grpc_testing.Empty) (*grpc_testing.Empty, error) {
- panic("intended to panic")
-}
-
-// Register attaches the test service to grpcServer and then sends on
-// ts.registered so that a waiting test knows registration has happened.
-func (ts *testRPCServer) Register(grpcServer server.GRPCServer) {
- grpc_testing.RegisterTestServiceServer(grpcServer, ts)
- // Signal only after the service is attached. newTestRPCServer gives the
- // channel a buffer of one, so the first send does not wait for a receiver.
- ts.registered <- struct{}{}
-}
-
-// newTestRPCServer allocates a testRPCServer whose registered channel has a
-// buffer of one, so a single Register call can signal without a receiver
-// being ready yet.
-func newTestRPCServer() *testRPCServer {
-	ts := new(testRPCServer)
-	ts.registered = make(chan struct{}, 1)
-	return ts
-}
-
-// waitForFile polls every 10ms until filename exists or timeout has elapsed.
-// It reports a test error, without stopping the test, if the file is still
-// missing once polling ends.
-func waitForFile(t *testing.T, filename turbopath.AbsoluteSystemPath, timeout time.Duration) {
-	t.Helper()
-	expired := time.After(timeout)
-	// polling flips to false when the deadline fires, ending the loop without
-	// another existence check inside the loop condition.
-	for polling := true; polling && !filename.FileExists(); {
-		select {
-		case <-expired:
-			polling = false
-		case <-time.After(10 * time.Millisecond):
-		}
-	}
-	if filename.FileExists() {
-		return
-	}
-	t.Errorf("timed out waiting for %v to exist after %v", filename, timeout)
-}
-
-// TestDaemonLifecycle runs the server in a background goroutine, waits for
-// its socket and pid files to appear, cancels the context, and then checks
-// that runTurboServer returned nil and that both files are gone.
-func TestDaemonLifecycle(t *testing.T) {
-	logger := hclog.Default()
-	logger.SetLevel(hclog.Debug)
-	repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
-	ts := newTestRPCServer()
-	watcher := signals.NewWatcher()
-	ctx, cancel := context.WithCancel(context.Background())
-
-	d := &daemon{
-		logger:     logger,
-		repoRoot:   repoRoot,
-		timeout:    10 * time.Second,
-		reqCh:      make(chan struct{}),
-		timedOutCh: make(chan struct{}),
-	}
-
-	// serverErr is written by the goroutine and read only after wg.Wait.
-	var serverErr error
-	wg := &sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		serverErr = d.runTurboServer(ctx, ts, watcher)
-		wg.Done()
-	}()
-
-	sockPath := getUnixSocket(repoRoot)
-	waitForFile(t, sockPath, 30*time.Second)
-	pidPath := getPidFile(repoRoot)
-	waitForFile(t, pidPath, 1*time.Second)
-
-	// Request shutdown and wait for the server goroutine to finish.
-	cancel()
-	wg.Wait()
-	assert.NilError(t, serverErr, "runTurboServer")
-	if sockPath.FileExists() {
-		t.Errorf("%v still exists, should have been cleaned up", sockPath)
-	}
-	if pidPath.FileExists() {
-		// Report the pid file here; the original printed sockPath by mistake.
-		t.Errorf("%v still exists, should have been cleaned up", pidPath)
-	}
-}
-
-// TestTimeout runs the server with a 5ms inactivity timeout and no requests,
-// and expects runTurboServer to return errInactivityTimeout.
-func TestTimeout(t *testing.T) {
-	log := hclog.Default()
-	log.SetLevel(hclog.Debug)
-	root := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
-	rpcServer := newTestRPCServer()
-	sigWatcher := signals.NewWatcher()
-	ctx := context.Background()
-
-	srv := &daemon{
-		logger:     log,
-		repoRoot:   root,
-		timeout:    5 * time.Millisecond,
-		reqCh:      make(chan struct{}),
-		timedOutCh: make(chan struct{}),
-	}
-	// The call runs synchronously; it returns once the timeout has fired.
-	got := srv.runTurboServer(ctx, rpcServer, sigWatcher)
-	if errors.Is(got, errInactivityTimeout) {
-		return
-	}
-	t.Errorf("server error got %v, want %v", got, errInactivityTimeout)
-}
-
-// TestCaughtSignal starts the server in a goroutine, waits until the test
-// service has been registered, closes the signal watcher, and then checks
-// that the pid file was removed and that runTurboServer returned either nil
-// or grpc.ErrServerStopped.
-func TestCaughtSignal(t *testing.T) {
- logger := hclog.Default()
- logger.SetLevel(hclog.Debug)
- repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
- ts := newTestRPCServer()
- watcher := signals.NewWatcher()
- ctx := context.Background()
-
- d := &daemon{
- logger: logger,
- repoRoot: repoRoot,
- timeout: 5 * time.Second,
- reqCh: make(chan struct{}),
- timedOutCh: make(chan struct{}),
- }
- // errCh carries runTurboServer's result back from the server goroutine.
- errCh := make(chan error)
- go func() {
- err := d.runTurboServer(ctx, ts, watcher)
- errCh <- err
- }()
- // Block until Register has run on the server side.
- <-ts.registered
- // grpc doesn't provide a signal to know when the server is serving.
- // So while this call to Close can race with the call to grpc.Server.Serve, if we've
- // registered with the turboserver, we've registered all of our
- // signal handlers as well. We just may or may not be serving when Close()
- // is called. It shouldn't matter for the purposes of this test:
- // Either we are serving, and Serve will return with nil when GracefulStop is
- // called, or we aren't serving yet, and the subsequent call to Serve will
- // immediately return with grpc.ErrServerStopped. So, both nil and grpc.ErrServerStopped
- // are acceptable outcomes for runTurboServer. Any other error, or a timeout, is a
- // failure.
- watcher.Close()
-
- // Wait for the server goroutine to return before inspecting the pid file.
- err := <-errCh
- pidPath := getPidFile(repoRoot)
- if pidPath.FileExists() {
- t.Errorf("expected to clean up %v, but it still exists", pidPath)
- }
- // We'll either get nil or ErrServerStopped, depending on whether
- // or not we close the signal watcher before grpc.Server.Serve was
- // called.
- if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
- t.Errorf("runTurboServer got err %v, want nil or ErrServerStopped", err)
- }
-}
-
-// TestCleanupOnPanic starts the server in a goroutine, calls the test
-// service's EmptyCall (which panics) over the daemon's unix socket, waits for
-// the server to return, and checks that the pid file was removed.
-func TestCleanupOnPanic(t *testing.T) {
-	logger := hclog.Default()
-	logger.SetLevel(hclog.Debug)
-	repoRoot := fs.AbsoluteSystemPathFromUpstream(t.TempDir())
-
-	ts := newTestRPCServer()
-	watcher := signals.NewWatcher()
-	ctx := context.Background()
-
-	d := &daemon{
-		logger:     logger,
-		repoRoot:   repoRoot,
-		timeout:    5 * time.Second,
-		reqCh:      make(chan struct{}),
-		timedOutCh: make(chan struct{}),
-	}
-	// errCh carries runTurboServer's result back from the server goroutine.
-	errCh := make(chan error)
-	go func() {
-		err := d.runTurboServer(ctx, ts, watcher)
-		errCh <- err
-	}()
-	// Block until Register has run on the server side.
-	<-ts.registered
-
-	creds := insecure.NewCredentials()
-	sockFile := getUnixSocket(repoRoot)
-	conn, err := grpc.Dial("unix://"+sockFile.ToString(), grpc.WithTransportCredentials(creds))
-	assert.NilError(t, err, "Dial")
-	// Release the client connection when the test ends; the original never
-	// closed it.
-	t.Cleanup(func() { _ = conn.Close() })
-
-	client := grpc_testing.NewTestServiceClient(conn)
-	_, err = client.EmptyCall(ctx, &grpc_testing.Empty{})
-	if err == nil {
-		t.Error("nil error")
-	}
-	// wait for the server to finish
-	<-errCh
-
-	pidPath := getPidFile(repoRoot)
-	if pidPath.FileExists() {
-		t.Errorf("expected to clean up %v, but it still exists", pidPath)
-	}
-}