From fc8c5fdce62fb229202659408798a7b6c98f6e8b Mon Sep 17 00:00:00 2001 From: 简律纯 Date: Fri, 28 Apr 2023 01:36:55 +0800 Subject: --- cli/internal/daemon/daemon.go | 307 ------------------------------------------ 1 file changed, 307 deletions(-) delete mode 100644 cli/internal/daemon/daemon.go (limited to 'cli/internal/daemon/daemon.go') diff --git a/cli/internal/daemon/daemon.go b/cli/internal/daemon/daemon.go deleted file mode 100644 index 81d5283..0000000 --- a/cli/internal/daemon/daemon.go +++ /dev/null @@ -1,307 +0,0 @@ -package daemon - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "fmt" - "io" - "net" - "os" - "path/filepath" - "strings" - "time" - - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - "github.com/hashicorp/go-hclog" - "github.com/nightlyone/lockfile" - "github.com/pkg/errors" - "github.com/vercel/turbo/cli/internal/cmdutil" - "github.com/vercel/turbo/cli/internal/daemon/connector" - "github.com/vercel/turbo/cli/internal/fs" - "github.com/vercel/turbo/cli/internal/server" - "github.com/vercel/turbo/cli/internal/signals" - "github.com/vercel/turbo/cli/internal/turbopath" - "github.com/vercel/turbo/cli/internal/turbostate" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -type daemon struct { - logger hclog.Logger - repoRoot turbopath.AbsoluteSystemPath - timeout time.Duration - reqCh chan struct{} - timedOutCh chan struct{} -} - -func getRepoHash(repoRoot turbopath.AbsoluteSystemPath) string { - pathHash := sha256.Sum256([]byte(repoRoot.ToString())) - // We grab a substring of the hash because there is a 108-character limit on the length - // of a filepath for unix domain socket. - return hex.EncodeToString(pathHash[:])[:16] -} - -func getDaemonFileRoot(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - tempDir := fs.TempDir("turbod") - hexHash := getRepoHash(repoRoot) - return tempDir.UntypedJoin(hexHash) -} - -func getLogFilePath(repoRoot turbopath.AbsoluteSystemPath) (turbopath.AbsoluteSystemPath, error) { - hexHash := getRepoHash(repoRoot) - base := repoRoot.Base() - logFilename := fmt.Sprintf("%v-%v.log", hexHash, base) - - logsDir := fs.GetTurboDataDir().UntypedJoin("logs") - return logsDir.UntypedJoin(logFilename), nil -} - -func getUnixSocket(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - root := getDaemonFileRoot(repoRoot) - return root.UntypedJoin("turbod.sock") -} - -func getPidFile(repoRoot turbopath.AbsoluteSystemPath) turbopath.AbsoluteSystemPath { - root := getDaemonFileRoot(repoRoot) - return root.UntypedJoin("turbod.pid") -} - -// logError logs an error and outputs it to the UI. -func (d *daemon) logError(err error) { - d.logger.Error(fmt.Sprintf("error %v", err)) -} - -// we're only appending, and we're creating the file if it doesn't exist. -// we do not need to read the log file. -var _logFileFlags = os.O_WRONLY | os.O_APPEND | os.O_CREATE - -// ExecuteDaemon executes the root daemon command -func ExecuteDaemon(ctx context.Context, helper *cmdutil.Helper, signalWatcher *signals.Watcher, args *turbostate.ParsedArgsFromRust) error { - base, err := helper.GetCmdBase(args) - if err != nil { - return err - } - if args.TestRun { - base.UI.Info("Daemon test run successful") - return nil - } - - idleTimeout := 4 * time.Hour - if args.Command.Daemon.IdleTimeout != "" { - idleTimeout, err = time.ParseDuration(args.Command.Daemon.IdleTimeout) - if err != nil { - return err - } - } - - logFilePath, err := getLogFilePath(base.RepoRoot) - if err != nil { - return err - } - if err := logFilePath.EnsureDir(); err != nil { - return err - } - logFile, err := logFilePath.OpenFile(_logFileFlags, 0644) - if err != nil { - return err - } - defer func() { _ = logFile.Close() }() - logger := hclog.New(&hclog.LoggerOptions{ - Output: io.MultiWriter(logFile, os.Stdout), - Level: hclog.Info, - Color: hclog.ColorOff, - Name: "turbod", - }) - - d := &daemon{ - logger: logger, - repoRoot: base.RepoRoot, - timeout: idleTimeout, - reqCh: make(chan struct{}), - timedOutCh: make(chan struct{}), - } - serverName := getRepoHash(base.RepoRoot) - turboServer, err := server.New(serverName, d.logger.Named("rpc server"), base.RepoRoot, base.TurboVersion, logFilePath) - if err != nil { - d.logError(err) - return err - } - defer func() { _ = turboServer.Close() }() - err = d.runTurboServer(ctx, turboServer, signalWatcher) - if err != nil { - d.logError(err) - return err - } - return nil -} - -var errInactivityTimeout = errors.New("turbod shut down from inactivity") - -// tryAcquirePidfileLock attempts to ensure that only one daemon is running from the given pid file path -// at a time. If this process fails to write its PID to the lockfile, it must exit. -func tryAcquirePidfileLock(pidPath turbopath.AbsoluteSystemPath) (lockfile.Lockfile, error) { - if err := pidPath.EnsureDir(); err != nil { - return "", err - } - lockFile, err := lockfile.New(pidPath.ToString()) - if err != nil { - // lockfile.New should only return an error if it wasn't given an absolute path. - // We are attempting to use the type system to enforce that we are passing an - // absolute path. An error here likely means a bug, and we should crash. - panic(err) - } - if err := lockFile.TryLock(); err != nil { - return "", err - } - return lockFile, nil -} - -type rpcServer interface { - Register(grpcServer server.GRPCServer) -} - -func (d *daemon) runTurboServer(parentContext context.Context, rpcServer rpcServer, signalWatcher *signals.Watcher) error { - ctx, cancel := context.WithCancel(parentContext) - defer cancel() - pidPath := getPidFile(d.repoRoot) - lock, err := tryAcquirePidfileLock(pidPath) - if err != nil { - return errors.Wrapf(err, "failed to lock the pid file at %v. Is another turbo daemon running?", lock) - } - // When we're done serving, clean up the pid file. - // Also, if *this* goroutine panics, make sure we unlock the pid file. - defer func() { - if err := lock.Unlock(); err != nil { - d.logger.Error(errors.Wrapf(err, "failed unlocking pid file at %v", lock).Error()) - } - }() - // This handler runs in request goroutines. If a request causes a panic, - // this handler will get called after a call to recover(), meaning we are - // no longer panicking. We return a server error and cancel our context, - // which triggers a shutdown of the server. - panicHandler := func(thePanic interface{}) error { - cancel() - d.logger.Error(fmt.Sprintf("Caught panic %v", thePanic)) - return status.Error(codes.Internal, "server panicked") - } - - // If we have the lock, assume that we are the owners of the socket file, - // whether it already exists or not. That means we are free to remove it. - sockPath := getUnixSocket(d.repoRoot) - if err := sockPath.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { - return err - } - d.logger.Debug(fmt.Sprintf("Using socket path %v (%v)\n", sockPath, len(sockPath))) - lis, err := net.Listen("unix", sockPath.ToString()) - if err != nil { - return err - } - // We don't need to explicitly close 'lis', the grpc server will handle that - s := grpc.NewServer( - grpc.ChainUnaryInterceptor( - d.onRequest, - grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(panicHandler)), - ), - ) - go d.timeoutLoop(ctx) - - rpcServer.Register(s) - errCh := make(chan error) - go func(errCh chan<- error) { - if err := s.Serve(lis); err != nil { - errCh <- err - } - close(errCh) - }(errCh) - - // Note that we aren't deferring s.GracefulStop here because we also need - // to drain the error channel, which isn't guaranteed to happen until - // the server has stopped. That in turn may depend on GracefulStop being - // called. - // Future work could restructure this to make that simpler. - var exitErr error - select { - case err, ok := <-errCh: - // The server exited - if ok { - exitErr = err - } - case <-d.timedOutCh: - // This is the inactivity timeout case - exitErr = errInactivityTimeout - s.GracefulStop() - case <-ctx.Done(): - // If a request handler panics, it will cancel this context - s.GracefulStop() - case <-signalWatcher.Done(): - // This is fired if caught a signal - s.GracefulStop() - } - // Wait for the server to exit, if it hasn't already. - // When it does, this channel will close. We don't - // care about the error in this scenario because we've - // either requested a close via cancelling the context, - // an inactivity timeout, or caught a signal. - for range errCh { - } - return exitErr -} - -func (d *daemon) onRequest(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - d.reqCh <- struct{}{} - return handler(ctx, req) -} - -func (d *daemon) timeoutLoop(ctx context.Context) { - timeoutCh := time.After(d.timeout) -outer: - for { - select { - case <-d.reqCh: - timeoutCh = time.After(d.timeout) - case <-timeoutCh: - close(d.timedOutCh) - break outer - case <-ctx.Done(): - break outer - } - } -} - -// ClientOpts re-exports connector.Ops to encapsulate the connector package -type ClientOpts = connector.Opts - -// Client re-exports connector.Client to encapsulate the connector package -type Client = connector.Client - -// GetClient returns a client that can be used to interact with the daemon -func GetClient(ctx context.Context, repoRoot turbopath.AbsoluteSystemPath, logger hclog.Logger, turboVersion string, opts ClientOpts) (*Client, error) { - sockPath := getUnixSocket(repoRoot) - pidPath := getPidFile(repoRoot) - logPath, err := getLogFilePath(repoRoot) - if err != nil { - return nil, err - } - bin, err := os.Executable() - if err != nil { - return nil, err - } - // The Go binary can no longer be called directly, so we need to route back to the rust wrapper - if strings.HasSuffix(bin, "go-turbo") { - bin = filepath.Join(filepath.Dir(bin), "turbo") - } else if strings.HasSuffix(bin, "go-turbo.exe") { - bin = filepath.Join(filepath.Dir(bin), "turbo.exe") - } - c := &connector.Connector{ - Logger: logger.Named("TurbodClient"), - Bin: bin, - Opts: opts, - SockPath: sockPath, - PidPath: pidPath, - LogPath: logPath, - TurboVersion: turboVersion, - } - return c.Connect(ctx) -} -- cgit v1.2.3-70-g09d2