diff options
Diffstat (limited to 'cli/internal/analytics/analytics.go')
| -rw-r--r-- | cli/internal/analytics/analytics.go | 175 |
1 files changed, 0 insertions, 175 deletions
diff --git a/cli/internal/analytics/analytics.go b/cli/internal/analytics/analytics.go deleted file mode 100644 index 8d9a3b6..0000000 --- a/cli/internal/analytics/analytics.go +++ /dev/null @@ -1,175 +0,0 @@ -package analytics - -import ( - "context" - "sync" - "time" - - "github.com/google/uuid" - "github.com/hashicorp/go-hclog" - "github.com/mitchellh/mapstructure" - "github.com/vercel/turbo/cli/internal/util" -) - -type Events = []map[string]interface{} - -type EventPayload = interface{} - -type Recorder interface { - LogEvent(payload EventPayload) -} - -type Client interface { - Recorder - Close() - CloseWithTimeout(timeout time.Duration) -} - -type Sink interface { - RecordAnalyticsEvents(events Events) error -} - -type nullSink struct{} - -func (n *nullSink) RecordAnalyticsEvents(events Events) error { - return nil -} - -// NullSink is an analytics sink to use in the event that we don't want to send -// analytics -var NullSink = &nullSink{} - -type client struct { - ch chan<- EventPayload - cancel func() - - worker *worker -} - -type worker struct { - buffer []EventPayload - ch <-chan EventPayload - ctx context.Context - doneSemaphore util.Semaphore - sessionID uuid.UUID - sink Sink - wg sync.WaitGroup - logger hclog.Logger -} - -const bufferThreshold = 10 -const eventTimeout = 200 * time.Millisecond -const noTimeout = 24 * time.Hour - -func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker { - buffer := []EventPayload{} - sessionID := uuid.New() - w := &worker{ - buffer: buffer, - ch: ch, - ctx: ctx, - doneSemaphore: util.NewSemaphore(1), - sessionID: sessionID, - sink: sink, - logger: logger, - } - w.doneSemaphore.Acquire() - go w.analyticsClient() - return w -} - -func NewClient(parent context.Context, sink Sink, logger hclog.Logger) Client { - ch := make(chan EventPayload) - ctx, cancel := context.WithCancel(parent) - // creates and starts the worker - worker := newWorker(ctx, ch, sink, logger) - s := &client{ - ch: ch, - cancel: cancel, - worker: worker, - } - return s -} - -func (s *client) LogEvent(event EventPayload) { - s.ch <- event -} - -func (s *client) Close() { - s.cancel() - s.worker.Wait() -} - -func (s *client) CloseWithTimeout(timeout time.Duration) { - ch := make(chan struct{}) - go func() { - s.Close() - close(ch) - }() - select { - case <-ch: - case <-time.After(timeout): - } -} - -func (w *worker) Wait() { - w.doneSemaphore.Acquire() - w.wg.Wait() -} - -func (w *worker) analyticsClient() { - timeout := time.After(noTimeout) - for { - select { - case e := <-w.ch: - w.buffer = append(w.buffer, e) - if len(w.buffer) == bufferThreshold { - w.flush() - timeout = time.After(noTimeout) - } else { - timeout = time.After(eventTimeout) - } - case <-timeout: - w.flush() - timeout = time.After(noTimeout) - case <-w.ctx.Done(): - w.flush() - w.doneSemaphore.Release() - return - } - } -} - -func (w *worker) flush() { - if len(w.buffer) > 0 { - w.sendEvents(w.buffer) - w.buffer = []EventPayload{} - } -} - -func (w *worker) sendEvents(events []EventPayload) { - w.wg.Add(1) - go func() { - payload, err := addSessionID(w.sessionID.String(), events) - if err != nil { - w.logger.Debug("failed to encode cache usage analytics", "error", err) - } - err = w.sink.RecordAnalyticsEvents(payload) - if err != nil { - w.logger.Debug("failed to record cache usage analytics", "error", err) - } - w.wg.Done() - }() -} - -func addSessionID(sessionID string, events []EventPayload) (Events, error) { - eventMaps := []map[string]interface{}{} - err := mapstructure.Decode(events, &eventMaps) - if err != nil { - return nil, err - } - for _, event := range eventMaps { - event["sessionId"] = sessionID - } - return eventMaps, nil -} |
