diff options
Diffstat (limited to 'cli/internal/analytics')
| -rw-r--r-- | cli/internal/analytics/analytics.go | 175 | ||||
| -rw-r--r-- | cli/internal/analytics/analytics_test.go | 192 |
2 files changed, 367 insertions, 0 deletions
diff --git a/cli/internal/analytics/analytics.go b/cli/internal/analytics/analytics.go new file mode 100644 index 0000000..8d9a3b6 --- /dev/null +++ b/cli/internal/analytics/analytics.go @@ -0,0 +1,175 @@ +package analytics + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/go-hclog" + "github.com/mitchellh/mapstructure" + "github.com/vercel/turbo/cli/internal/util" +) + +type Events = []map[string]interface{} + +type EventPayload = interface{} + +type Recorder interface { + LogEvent(payload EventPayload) +} + +type Client interface { + Recorder + Close() + CloseWithTimeout(timeout time.Duration) +} + +type Sink interface { + RecordAnalyticsEvents(events Events) error +} + +type nullSink struct{} + +func (n *nullSink) RecordAnalyticsEvents(events Events) error { + return nil +} + +// NullSink is an analytics sink to use in the event that we don't want to send +// analytics +var NullSink = &nullSink{} + +type client struct { + ch chan<- EventPayload + cancel func() + + worker *worker +} + +type worker struct { + buffer []EventPayload + ch <-chan EventPayload + ctx context.Context + doneSemaphore util.Semaphore + sessionID uuid.UUID + sink Sink + wg sync.WaitGroup + logger hclog.Logger +} + +const bufferThreshold = 10 +const eventTimeout = 200 * time.Millisecond +const noTimeout = 24 * time.Hour + +func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker { + buffer := []EventPayload{} + sessionID := uuid.New() + w := &worker{ + buffer: buffer, + ch: ch, + ctx: ctx, + doneSemaphore: util.NewSemaphore(1), + sessionID: sessionID, + sink: sink, + logger: logger, + } + w.doneSemaphore.Acquire() + go w.analyticsClient() + return w +} + +func NewClient(parent context.Context, sink Sink, logger hclog.Logger) Client { + ch := make(chan EventPayload) + ctx, cancel := context.WithCancel(parent) + // creates and starts the worker + worker := newWorker(ctx, ch, sink, logger) + s := &client{ + ch: ch, + cancel: cancel, + worker: worker, + } + return s +} + +func (s *client) LogEvent(event EventPayload) { + s.ch <- event +} + +func (s *client) Close() { + s.cancel() + s.worker.Wait() +} + +func (s *client) CloseWithTimeout(timeout time.Duration) { + ch := make(chan struct{}) + go func() { + s.Close() + close(ch) + }() + select { + case <-ch: + case <-time.After(timeout): + } +} + +func (w *worker) Wait() { + w.doneSemaphore.Acquire() + w.wg.Wait() +} + +func (w *worker) analyticsClient() { + timeout := time.After(noTimeout) + for { + select { + case e := <-w.ch: + w.buffer = append(w.buffer, e) + if len(w.buffer) == bufferThreshold { + w.flush() + timeout = time.After(noTimeout) + } else { + timeout = time.After(eventTimeout) + } + case <-timeout: + w.flush() + timeout = time.After(noTimeout) + case <-w.ctx.Done(): + w.flush() + w.doneSemaphore.Release() + return + } + } +} + +func (w *worker) flush() { + if len(w.buffer) > 0 { + w.sendEvents(w.buffer) + w.buffer = []EventPayload{} + } +} + +func (w *worker) sendEvents(events []EventPayload) { + w.wg.Add(1) + go func() { + payload, err := addSessionID(w.sessionID.String(), events) + if err != nil { + w.logger.Debug("failed to encode cache usage analytics", "error", err) + } + err = w.sink.RecordAnalyticsEvents(payload) + if err != nil { + w.logger.Debug("failed to record cache usage analytics", "error", err) + } + w.wg.Done() + }() +} + +func addSessionID(sessionID string, events []EventPayload) (Events, error) { + eventMaps := []map[string]interface{}{} + err := mapstructure.Decode(events, &eventMaps) + if err != nil { + return nil, err + } + for _, event := range eventMaps { + event["sessionId"] = sessionID + } + return eventMaps, nil +} diff --git a/cli/internal/analytics/analytics_test.go b/cli/internal/analytics/analytics_test.go new file mode 100644 index 0000000..0715fda --- /dev/null +++ b/cli/internal/analytics/analytics_test.go @@ -0,0 +1,192 @@ +package analytics + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/hashicorp/go-hclog" +) + +type dummySink struct { + events []*Events + err error + mu sync.Mutex + ch chan struct{} +} + +type evt struct { + I int +} + +func newDummySink() *dummySink { + return &dummySink{ + events: []*Events{}, + ch: make(chan struct{}, 1), + } +} + +func (d *dummySink) RecordAnalyticsEvents(events Events) error { + d.mu.Lock() + defer d.mu.Unlock() + // Make a copy in case a test is holding a copy too + eventsCopy := make([]*Events, len(d.events)) + copy(eventsCopy, d.events) + d.events = append(eventsCopy, &events) + d.ch <- struct{}{} + return d.err +} + +func (d *dummySink) Events() []*Events { + d.mu.Lock() + defer d.mu.Unlock() + return d.events +} + +func (d *dummySink) ExpectImmediateMessage(t *testing.T) { + select { + case <-time.After(150 * time.Millisecond): + t.Errorf("expected to not wait out the flush timeout") + case <-d.ch: + } +} + +func (d *dummySink) ExpectTimeoutThenMessage(t *testing.T) { + select { + case <-d.ch: + t.Errorf("Expected to wait out the flush timeout") + case <-time.After(150 * time.Millisecond): + } + <-d.ch +} + +func Test_batching(t *testing.T) { + d := newDummySink() + ctx := context.Background() + c := NewClient(ctx, d, hclog.Default()) + for i := 0; i < 2; i++ { + c.LogEvent(&evt{i}) + } + found := d.Events() + if len(found) != 0 { + t.Errorf("got %v events, want 0 due to batching", len(found)) + } + // Should timeout + d.ExpectTimeoutThenMessage(t) + found = d.Events() + if len(found) != 1 { + t.Errorf("got %v, want 1 batch to have been flushed", len(found)) + } + payloads := *found[0] + if len(payloads) != 2 { + t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads)) + } +} + +func Test_batchingAcrossTwoBatches(t *testing.T) { + d := newDummySink() + ctx := context.Background() + c := NewClient(ctx, d, hclog.Default()) + for i := 0; i < 12; i++ { + c.LogEvent(&evt{i}) + } + // We sent more than the batch size, expect a message immediately + d.ExpectImmediateMessage(t) + found := d.Events() + if len(found) != 1 { + t.Errorf("got %v, want 1 batch to have been flushed", len(found)) + } + payloads := *found[0] + if len(payloads) != 10 { + t.Errorf("got %v, want 10 payloads to have been flushed", len(payloads)) + } + // Should timeout second batch + d.ExpectTimeoutThenMessage(t) + found = d.Events() + if len(found) != 2 { + t.Errorf("got %v, want 2 batches to have been flushed", len(found)) + } + payloads = *found[1] + if len(payloads) != 2 { + t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads)) + } +} + +func Test_closing(t *testing.T) { + d := newDummySink() + ctx := context.Background() + c := NewClient(ctx, d, hclog.Default()) + for i := 0; i < 2; i++ { + c.LogEvent(&evt{i}) + } + found := d.Events() + if len(found) != 0 { + t.Errorf("got %v events, want 0 due to batching", len(found)) + } + c.Close() + found = d.Events() + if len(found) != 1 { + t.Errorf("got %v, want 1 batch to have been flushed", len(found)) + } + payloads := *found[0] + if len(payloads) != 2 { + t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads)) + } +} + +func Test_closingByContext(t *testing.T) { + d := newDummySink() + ctx, cancel := context.WithCancel(context.Background()) + c := NewClient(ctx, d, hclog.Default()) + for i := 0; i < 2; i++ { + c.LogEvent(&evt{i}) + } + found := d.Events() + if len(found) != 0 { + t.Errorf("got %v events, want 0 due to batching", len(found)) + } + cancel() + d.ExpectImmediateMessage(t) + found = d.Events() + if len(found) != 1 { + t.Errorf("got %v, want 1 batch to have been flushed", len(found)) + } + payloads := *found[0] + if len(payloads) != 2 { + t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads)) + } +} + +func Test_addSessionId(t *testing.T) { + events := []struct { + Foo string `mapstructure:"foo"` + }{ + { + Foo: "foo1", + }, + { + Foo: "foo2", + }, + } + arr := make([]interface{}, len(events)) + for i, event := range events { + arr[i] = event + } + sessionID := "my-uuid" + output, err := addSessionID(sessionID, arr) + if err != nil { + t.Errorf("failed to encode analytics events: %v", err) + } + if len(output) != 2 { + t.Errorf("len output got %v, want 2", len(output)) + } + if output[0]["foo"] != "foo1" { + t.Errorf("first event foo got %v, want foo1", output[0]["foo"]) + } + for i, event := range output { + if event["sessionId"] != "my-uuid" { + t.Errorf("event %v sessionId got %v, want %v", i, event["sessionId"], sessionID) + } + } +} |
