Diffstat (limited to 'cli/internal/analytics')
-rw-r--r--   cli/internal/analytics/analytics.go        175
-rw-r--r--   cli/internal/analytics/analytics_test.go   192
2 files changed, 0 insertions, 367 deletions
diff --git a/cli/internal/analytics/analytics.go b/cli/internal/analytics/analytics.go
deleted file mode 100644
index 8d9a3b6..0000000
--- a/cli/internal/analytics/analytics.go
+++ /dev/null
@@ -1,175 +0,0 @@
-package analytics
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"github.com/google/uuid"
-	"github.com/hashicorp/go-hclog"
-	"github.com/mitchellh/mapstructure"
-	"github.com/vercel/turbo/cli/internal/util"
-)
-
-type Events = []map[string]interface{}
-
-type EventPayload = interface{}
-
-type Recorder interface {
-	LogEvent(payload EventPayload)
-}
-
-type Client interface {
-	Recorder
-	Close()
-	CloseWithTimeout(timeout time.Duration)
-}
-
-type Sink interface {
-	RecordAnalyticsEvents(events Events) error
-}
-
-type nullSink struct{}
-
-func (n *nullSink) RecordAnalyticsEvents(events Events) error {
-	return nil
-}
-
-// NullSink is an analytics sink to use in the event that we don't want to send
-// analytics
-var NullSink = &nullSink{}
-
-type client struct {
-	ch     chan<- EventPayload
-	cancel func()
-
-	worker *worker
-}
-
-type worker struct {
-	buffer        []EventPayload
-	ch            <-chan EventPayload
-	ctx           context.Context
-	doneSemaphore util.Semaphore
-	sessionID     uuid.UUID
-	sink          Sink
-	wg            sync.WaitGroup
-	logger        hclog.Logger
-}
-
-const bufferThreshold = 10
-const eventTimeout = 200 * time.Millisecond
-const noTimeout = 24 * time.Hour
-
-func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker {
-	buffer := []EventPayload{}
-	sessionID := uuid.New()
-	w := &worker{
-		buffer:        buffer,
-		ch:            ch,
-		ctx:           ctx,
-		doneSemaphore: util.NewSemaphore(1),
-		sessionID:     sessionID,
-		sink:          sink,
-		logger:        logger,
-	}
-	w.doneSemaphore.Acquire()
-	go w.analyticsClient()
-	return w
-}
-
-func NewClient(parent context.Context, sink Sink, logger hclog.Logger) Client {
-	ch := make(chan EventPayload)
-	ctx, cancel := context.WithCancel(parent)
-	// creates and starts the worker
-	worker := newWorker(ctx, ch, sink, logger)
-	s := &client{
-		ch:     ch,
-		cancel: cancel,
-		worker: worker,
-	}
-	return s
-}
-
-func (s *client) LogEvent(event EventPayload) {
-	s.ch <- event
-}
-
-func (s *client) Close() {
-	s.cancel()
-	s.worker.Wait()
-}
-
-func (s *client) CloseWithTimeout(timeout time.Duration) {
-	ch := make(chan struct{})
-	go func() {
-		s.Close()
-		close(ch)
-	}()
-	select {
-	case <-ch:
-	case <-time.After(timeout):
-	}
-}
-
-func (w *worker) Wait() {
-	w.doneSemaphore.Acquire()
-	w.wg.Wait()
-}
-
-func (w *worker) analyticsClient() {
-	timeout := time.After(noTimeout)
-	for {
-		select {
-		case e := <-w.ch:
-			w.buffer = append(w.buffer, e)
-			if len(w.buffer) == bufferThreshold {
-				w.flush()
-				timeout = time.After(noTimeout)
-			} else {
-				timeout = time.After(eventTimeout)
-			}
-		case <-timeout:
-			w.flush()
-			timeout = time.After(noTimeout)
-		case <-w.ctx.Done():
-			w.flush()
-			w.doneSemaphore.Release()
-			return
-		}
-	}
-}
-
-func (w *worker) flush() {
-	if len(w.buffer) > 0 {
-		w.sendEvents(w.buffer)
-		w.buffer = []EventPayload{}
-	}
-}
-
-func (w *worker) sendEvents(events []EventPayload) {
-	w.wg.Add(1)
-	go func() {
-		payload, err := addSessionID(w.sessionID.String(), events)
-		if err != nil {
-			w.logger.Debug("failed to encode cache usage analytics", "error", err)
-		}
-		err = w.sink.RecordAnalyticsEvents(payload)
-		if err != nil {
-			w.logger.Debug("failed to record cache usage analytics", "error", err)
-		}
-		w.wg.Done()
-	}()
-}
-
-func addSessionID(sessionID string, events []EventPayload) (Events, error) {
-	eventMaps := []map[string]interface{}{}
-	err := mapstructure.Decode(events, &eventMaps)
-	if err != nil {
-		return nil, err
-	}
-	for _, event := range eventMaps {
-		event["sessionId"] = sessionID
-	}
-	return eventMaps, nil
-}
diff --git a/cli/internal/analytics/analytics_test.go b/cli/internal/analytics/analytics_test.go
deleted file mode 100644
index 0715fda..0000000
--- a/cli/internal/analytics/analytics_test.go
+++ /dev/null
@@ -1,192 +0,0 @@
-package analytics
-
-import (
-	"context"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/hashicorp/go-hclog"
-)
-
-type dummySink struct {
-	events []*Events
-	err    error
-	mu     sync.Mutex
-	ch     chan struct{}
-}
-
-type evt struct {
-	I int
-}
-
-func newDummySink() *dummySink {
-	return &dummySink{
-		events: []*Events{},
-		ch:     make(chan struct{}, 1),
-	}
-}
-
-func (d *dummySink) RecordAnalyticsEvents(events Events) error {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	// Make a copy in case a test is holding a copy too
-	eventsCopy := make([]*Events, len(d.events))
-	copy(eventsCopy, d.events)
-	d.events = append(eventsCopy, &events)
-	d.ch <- struct{}{}
-	return d.err
-}
-
-func (d *dummySink) Events() []*Events {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-	return d.events
-}
-
-func (d *dummySink) ExpectImmediateMessage(t *testing.T) {
-	select {
-	case <-time.After(150 * time.Millisecond):
-		t.Errorf("expected to not wait out the flush timeout")
-	case <-d.ch:
-	}
-}
-
-func (d *dummySink) ExpectTimeoutThenMessage(t *testing.T) {
-	select {
-	case <-d.ch:
-		t.Errorf("Expected to wait out the flush timeout")
-	case <-time.After(150 * time.Millisecond):
-	}
-	<-d.ch
-}
-
-func Test_batching(t *testing.T) {
-	d := newDummySink()
-	ctx := context.Background()
-	c := NewClient(ctx, d, hclog.Default())
-	for i := 0; i < 2; i++ {
-		c.LogEvent(&evt{i})
-	}
-	found := d.Events()
-	if len(found) != 0 {
-		t.Errorf("got %v events, want 0 due to batching", len(found))
-	}
-	// Should timeout
-	d.ExpectTimeoutThenMessage(t)
-	found = d.Events()
-	if len(found) != 1 {
-		t.Errorf("got %v, want 1 batch to have been flushed", len(found))
-	}
-	payloads := *found[0]
-	if len(payloads) != 2 {
-		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
-	}
-}
-
-func Test_batchingAcrossTwoBatches(t *testing.T) {
-	d := newDummySink()
-	ctx := context.Background()
-	c := NewClient(ctx, d, hclog.Default())
-	for i := 0; i < 12; i++ {
-		c.LogEvent(&evt{i})
-	}
-	// We sent more than the batch size, expect a message immediately
-	d.ExpectImmediateMessage(t)
-	found := d.Events()
-	if len(found) != 1 {
-		t.Errorf("got %v, want 1 batch to have been flushed", len(found))
-	}
-	payloads := *found[0]
-	if len(payloads) != 10 {
-		t.Errorf("got %v, want 10 payloads to have been flushed", len(payloads))
-	}
-	// Should timeout second batch
-	d.ExpectTimeoutThenMessage(t)
-	found = d.Events()
-	if len(found) != 2 {
-		t.Errorf("got %v, want 2 batches to have been flushed", len(found))
-	}
-	payloads = *found[1]
-	if len(payloads) != 2 {
-		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
-	}
-}
-
-func Test_closing(t *testing.T) {
-	d := newDummySink()
-	ctx := context.Background()
-	c := NewClient(ctx, d, hclog.Default())
-	for i := 0; i < 2; i++ {
-		c.LogEvent(&evt{i})
-	}
-	found := d.Events()
-	if len(found) != 0 {
-		t.Errorf("got %v events, want 0 due to batching", len(found))
-	}
-	c.Close()
-	found = d.Events()
-	if len(found) != 1 {
-		t.Errorf("got %v, want 1 batch to have been flushed", len(found))
-	}
-	payloads := *found[0]
-	if len(payloads) != 2 {
-		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
-	}
-}
-
-func Test_closingByContext(t *testing.T) {
-	d := newDummySink()
-	ctx, cancel := context.WithCancel(context.Background())
-	c := NewClient(ctx, d, hclog.Default())
-	for i := 0; i < 2; i++ {
-		c.LogEvent(&evt{i})
-	}
-	found := d.Events()
-	if len(found) != 0 {
-		t.Errorf("got %v events, want 0 due to batching", len(found))
-	}
-	cancel()
-	d.ExpectImmediateMessage(t)
-	found = d.Events()
-	if len(found) != 1 {
-		t.Errorf("got %v, want 1 batch to have been flushed", len(found))
-	}
-	payloads := *found[0]
-	if len(payloads) != 2 {
-		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
-	}
-}
-
-func Test_addSessionId(t *testing.T) {
-	events := []struct {
-		Foo string `mapstructure:"foo"`
-	}{
-		{
-			Foo: "foo1",
-		},
-		{
-			Foo: "foo2",
-		},
-	}
-	arr := make([]interface{}, len(events))
-	for i, event := range events {
-		arr[i] = event
-	}
-	sessionID := "my-uuid"
-	output, err := addSessionID(sessionID, arr)
-	if err != nil {
-		t.Errorf("failed to encode analytics events: %v", err)
-	}
-	if len(output) != 2 {
-		t.Errorf("len output got %v, want 2", len(output))
-	}
-	if output[0]["foo"] != "foo1" {
-		t.Errorf("first event foo got %v, want foo1", output[0]["foo"])
-	}
-	for i, event := range output {
-		if event["sessionId"] != "my-uuid" {
-			t.Errorf("event %v sessionId got %v, want %v", i, event["sessionId"], sessionID)
-		}
-	}
-}
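
For reference (not part of the diff above): a minimal sketch of how a caller could have driven the removed client, based only on the API shown in the deleted analytics.go and treating the internal package as importable for illustration. The payload contents and the 500ms timeout are illustrative assumptions; NullSink simply discards events, so a real caller would supply a sink that records them.

package main

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/vercel/turbo/cli/internal/analytics"
)

func main() {
	// NullSink discards events; a real sink's RecordAnalyticsEvents would
	// forward each flushed batch somewhere.
	c := analytics.NewClient(context.Background(), analytics.NullSink, hclog.Default())

	// Payload shape is illustrative: anything mapstructure can decode into a
	// map[string]interface{} works, and the worker adds a "sessionId" key.
	c.LogEvent(map[string]interface{}{"example": true})

	// The worker flushes after 10 buffered events or 200ms of inactivity.
	// CloseWithTimeout cancels it, forces a final flush, and stops waiting
	// for in-flight sends after the supplied duration (500ms is arbitrary).
	c.CloseWithTimeout(500 * time.Millisecond)
}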
