1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
package server
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/pkg/errors"
"github.com/vercel/turbo/cli/internal/filewatcher"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/globwatcher"
"github.com/vercel/turbo/cli/internal/turbodprotocol"
"github.com/vercel/turbo/cli/internal/turbopath"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// Server implements the GRPC serverside of TurbodServer.
//
// Note for the future: we don't yet make use of turbo.json
// or the package graph in the server. Once we do, we may need a
// layer of indirection between "the thing that responds to grpc requests"
// and "the thing that holds our persistent data structures" to handle
// changes in the underlying configuration.
type Server struct {
	turbodprotocol.UnimplementedTurbodServer
	// watcher is the file watcher that New starts and Close shuts down.
	watcher *filewatcher.FileWatcher
	// globWatcher backs the NotifyOutputsWritten and GetChangedOutputs rpcs.
	globWatcher *globwatcher.GlobWatcher
	// turboVersion is the version Hello compares the client's version against.
	turboVersion string
	// started is set to the current time in New; Status derives uptime from it.
	started time.Time
	// logFilePath is reported back to clients by Status.
	logFilePath turbopath.AbsoluteSystemPath
	// repoRoot is the path whose deletion makes OnFileWatchEvent shut the
	// server down.
	repoRoot turbopath.AbsoluteSystemPath
	// closerMu guards closer.
	closerMu sync.Mutex
	// closer is nil until Register has been called.
	closer *closer
}
// GRPCServer is the interface that the turbo server needs to the underlying
// GRPC server. This lets the turbo server register itself, as well as provides
// a hook for shutting down the server.
type GRPCServer interface {
	// ServiceRegistrar is what Register uses to attach the turbod service.
	grpc.ServiceRegistrar
	// GracefulStop is the shutdown hook; it is invoked by closer.close.
	GracefulStop()
}
// closer pairs a GRPCServer with a sync.Once so that a graceful stop of
// that server is requested at most one time, however often close is called.
type closer struct {
	// grpcServer is the server that close asks to stop.
	grpcServer GRPCServer
	// once makes repeated close calls a no-op after the first.
	once sync.Once
}
// close requests a graceful stop of the wrapped GRPC server. Only the first
// call has any effect; later calls return immediately.
func (c *closer) close() {
	// The stop is launched on its own goroutine because close may be invoked
	// from inside a request handler (Shutdown). GracefulStop blocks until
	// every in-flight handler has returned, so calling it synchronously from
	// a handler would deadlock.
	stopInBackground := func() {
		go func() {
			c.grpcServer.GracefulStop()
		}()
	}
	c.once.Do(stopInBackground)
}
// _defaultCookieTimeout is the timeout New hands to filewatcher.NewCookieJar.
var _defaultCookieTimeout = 500 * time.Millisecond
// New builds a Server for repoRoot and starts its file watching.
//
// A cookie jar is created in the "cookies"/serverName directory under the
// turbo data dir, and the cookie jar, the glob watcher and the server itself
// are all registered as clients of the file watcher before it is started.
// The cookie directory is then added as an extra watch root.
//
// An error is returned if the cookie jar or the platform watcher backend
// cannot be created, if the watcher fails to start, or if the cookie
// directory cannot be watched (in which case the watcher is closed again).
func New(serverName string, logger hclog.Logger, repoRoot turbopath.AbsoluteSystemPath, turboVersion string, logFilePath turbopath.AbsoluteSystemPath) (*Server, error) {
	cookieDir := fs.GetTurboDataDir().UntypedJoin("cookies", serverName)

	jar, err := filewatcher.NewCookieJar(cookieDir, _defaultCookieTimeout)
	if err != nil {
		return nil, err
	}

	backend, err := filewatcher.GetPlatformSpecificBackend(logger)
	if err != nil {
		return nil, err
	}

	s := &Server{
		watcher:      filewatcher.New(logger.Named("FileWatcher"), repoRoot, backend),
		globWatcher:  globwatcher.New(logger.Named("GlobWatcher"), repoRoot, jar),
		turboVersion: turboVersion,
		started:      time.Now(),
		logFilePath:  logFilePath,
		repoRoot:     repoRoot,
	}

	// Clients are attached before Start is called.
	s.watcher.AddClient(jar)
	s.watcher.AddClient(s.globWatcher)
	s.watcher.AddClient(s)

	err = s.watcher.Start()
	if err != nil {
		return nil, errors.Wrapf(err, "watching %v", repoRoot)
	}

	err = s.watcher.AddRoot(cookieDir)
	if err != nil {
		// Best-effort cleanup; the AddRoot failure is the error worth reporting.
		_ = s.watcher.Close()
		return nil, errors.Wrapf(err, "failed to watch cookie directory: %v", cookieDir)
	}

	return s, nil
}
// tryClose asks the registered closer, if any, to shut the GRPC server down.
// It reports false when no closer has been set (Register not yet called) and
// true otherwise.
func (s *Server) tryClose() bool {
	s.closerMu.Lock()
	defer s.closerMu.Unlock()

	if s.closer == nil {
		return false
	}
	s.closer.close()
	return true
}
// OnFileWatchEvent implements filewatcher.FileWatchClient.OnFileWatchEvent.
// The only event acted upon is deletion of the monorepo root, which triggers
// a server shutdown; every other event is ignored.
func (s *Server) OnFileWatchEvent(ev filewatcher.Event) {
	if ev.EventType != filewatcher.FileDeleted {
		return
	}
	if ev.Path != s.repoRoot {
		return
	}
	// Whether a closer was registered yet is irrelevant here.
	_ = s.tryClose()
}
// OnFileWatchError implements filewatcher.FileWatchClient.OnFileWatchError.
// It is a no-op: the server takes no action on watcher errors.
func (s *Server) OnFileWatchError(err error) {}
// OnFileWatchClosed implements filewatcher.FileWatchClient.OnFileWatchClosed.
// It is a no-op: the server takes no action when the watcher closes.
func (s *Server) OnFileWatchClosed() {}
// Close shuts down this copy of the server by closing its file watcher,
// returning whatever error the watcher reports.
func (s *Server) Close() error {
	err := s.watcher.Close()
	return err
}
// Register attaches this server to grpcServer so it responds to GRPC
// requests, and records grpcServer as the target for later shutdowns.
func (s *Server) Register(grpcServer GRPCServer) {
	shutdownHook := &closer{grpcServer: grpcServer}

	// Install the shutdown hook before the service is registered.
	s.closerMu.Lock()
	s.closer = shutdownHook
	s.closerMu.Unlock()

	turbodprotocol.RegisterTurbodServer(grpcServer, s)
}
// NotifyOutputsWritten implements the NotifyOutputsWritten rpc from turbo.proto.
// It asks the glob watcher to watch the request's inclusion and exclusion
// globs under the request's hash, returning any error from the glob watcher
// unchanged.
func (s *Server) NotifyOutputsWritten(ctx context.Context, req *turbodprotocol.NotifyOutputsWrittenRequest) (*turbodprotocol.NotifyOutputsWrittenResponse, error) {
	if err := s.globWatcher.WatchGlobs(req.Hash, fs.TaskOutputs{
		Inclusions: req.OutputGlobs,
		Exclusions: req.OutputExclusionGlobs,
	}); err != nil {
		return nil, err
	}
	return &turbodprotocol.NotifyOutputsWrittenResponse{}, nil
}
// GetChangedOutputs implements the GetChangedOutputs rpc from turbo.proto.
// It forwards the request's hash and output globs to the glob watcher and
// returns the globs it reports as changed, or the glob watcher's error.
func (s *Server) GetChangedOutputs(ctx context.Context, req *turbodprotocol.GetChangedOutputsRequest) (*turbodprotocol.GetChangedOutputsResponse, error) {
	changed, err := s.globWatcher.GetChangedGlobs(req.Hash, req.OutputGlobs)
	if err != nil {
		return nil, err
	}

	resp := &turbodprotocol.GetChangedOutputsResponse{ChangedOutputGlobs: changed}
	return resp, nil
}
// Hello implements the Hello rpc from turbo.proto.
// It succeeds only when the client's version equals the server's version;
// otherwise it returns a FailedPrecondition status naming both versions.
func (s *Server) Hello(ctx context.Context, req *turbodprotocol.HelloRequest) (*turbodprotocol.HelloResponse, error) {
	theirs := req.Version
	if theirs == s.turboVersion {
		return &turbodprotocol.HelloResponse{}, nil
	}
	return nil, status.Errorf(codes.FailedPrecondition, "version mismatch. Client %v Server %v", theirs, s.turboVersion)
}
// Shutdown implements the Shutdown rpc from turbo.proto.
// It triggers the server's shutdown hook and returns an empty response, or a
// NotFound status when no shutdown hook has been registered.
func (s *Server) Shutdown(ctx context.Context, req *turbodprotocol.ShutdownRequest) (*turbodprotocol.ShutdownResponse, error) {
	if !s.tryClose() {
		return nil, status.Error(codes.NotFound, "shutdown mechanism not found")
	}
	return &turbodprotocol.ShutdownResponse{}, nil
}
// Status implements the Status rpc from turbo.proto.
// The response carries the daemon's log file path and the time elapsed since
// the server was constructed, in milliseconds.
func (s *Server) Status(ctx context.Context, req *turbodprotocol.StatusRequest) (*turbodprotocol.StatusResponse, error) {
	elapsed := time.Since(s.started)

	daemon := &turbodprotocol.DaemonStatus{
		LogFile:    s.logFilePath.ToString(),
		UptimeMsec: uint64(elapsed.Milliseconds()),
	}
	return &turbodprotocol.StatusResponse{DaemonStatus: daemon}, nil
}
|