about summary refs log tree commit diff stats homepage
path: root/cli/internal/analytics
diff options
context:
space:
mode:
author 简律纯 <hsiangnianian@outlook.com> 2023-04-28 01:36:44 +0800
committer 简律纯 <hsiangnianian@outlook.com> 2023-04-28 01:36:44 +0800
commit dd84b9d64fb98746a230cd24233ff50a562c39c9 (patch)
tree b583261ef00b3afe72ec4d6dacb31e57779a6faf /cli/internal/analytics
parent 0b46fcd72ac34382387b2bcf9095233efbcc52f4 (diff)
download HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.tar.gz
HydroRoll-dd84b9d64fb98746a230cd24233ff50a562c39c9.zip
Diffstat (limited to 'cli/internal/analytics')
-rw-r--r-- cli/internal/analytics/analytics.go 175
-rw-r--r-- cli/internal/analytics/analytics_test.go 192
2 files changed, 367 insertions, 0 deletions
diff --git a/cli/internal/analytics/analytics.go b/cli/internal/analytics/analytics.go
new file mode 100644
index 0000000..8d9a3b6
--- /dev/null
+++ b/cli/internal/analytics/analytics.go
@@ -0,0 +1,175 @@
+package analytics
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/hashicorp/go-hclog"
+ "github.com/mitchellh/mapstructure"
+ "github.com/vercel/turbo/cli/internal/util"
+)
+
+// Events is the batch format handed to a Sink: a slice holding one
+// string-keyed map per recorded event.
+type Events = []map[string]interface{}
+
+// EventPayload is a single event as supplied by callers. It is an alias for
+// interface{}; addSessionID decodes payloads into maps via mapstructure.
+type EventPayload = interface{}
+
+// Recorder is the write-only side of the analytics client.
+type Recorder interface {
+ // LogEvent submits one event payload for recording.
+ LogEvent(payload EventPayload)
+}
+
+// Client is a Recorder that can additionally be shut down.
+type Client interface {
+ Recorder
+ // Close shuts the client down.
+ Close()
+ // CloseWithTimeout shuts the client down, waiting at most timeout.
+ CloseWithTimeout(timeout time.Duration)
+}
+
+// Sink is the destination that batches of analytics events are delivered to.
+type Sink interface {
+ // RecordAnalyticsEvents receives one batch of events; a non-nil error
+ // reports that the batch could not be recorded.
+ RecordAnalyticsEvents(events Events) error
+}
+
+// nullSink is a Sink implementation that discards every batch.
+type nullSink struct{}
+
+// RecordAnalyticsEvents ignores events and always returns nil.
+func (n *nullSink) RecordAnalyticsEvents(events Events) error {
+ return nil
+}
+
+// NullSink is an analytics sink to use in the event that we don't want to send
+// analytics.
+var NullSink = &nullSink{}
+
+// client is the Client implementation returned by NewClient. It forwards
+// events to a background worker over a channel.
+type client struct {
+ // ch is the send side of the channel the worker receives events from.
+ ch chan<- EventPayload
+ // cancel cancels the context the worker was started with.
+ cancel func()
+
+ // worker is the background worker that batches and sends events.
+ worker *worker
+}
+
+// worker receives events from a channel, accumulates them in a buffer and
+// sends them to a Sink in batches.
+type worker struct {
+ // buffer holds events received since the last flush.
+ buffer []EventPayload
+ // ch is the receive side of the channel events arrive on.
+ ch <-chan EventPayload
+ // ctx stops the event loop when it is cancelled.
+ ctx context.Context
+ // doneSemaphore is acquired in newWorker, released by the event loop when
+ // it exits, and acquired again by Wait.
+ doneSemaphore util.Semaphore
+ // sessionID is generated once per worker and attached to every event sent.
+ sessionID uuid.UUID
+ // sink receives the batches.
+ sink Sink
+ // wg tracks the goroutines started by sendEvents.
+ wg sync.WaitGroup
+ // logger receives debug messages when encoding or recording fails.
+ logger hclog.Logger
+}
+
+const (
+	// bufferThreshold is the number of buffered events that triggers an
+	// immediate flush.
+	bufferThreshold = 10
+	// eventTimeout is how long the worker waits after the most recent event
+	// before flushing a partially filled buffer.
+	eventTimeout = 200 * time.Millisecond
+	// noTimeout is the long duration used when no flush timer should be
+	// pending.
+	noTimeout = 24 * time.Hour
+)
+
+// newWorker builds a worker that reads events from ch and delivers them to
+// sink, then starts its event loop on a new goroutine. The worker is given a
+// freshly generated session ID.
+func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker {
+	w := &worker{
+		buffer:        []EventPayload{},
+		ch:            ch,
+		ctx:           ctx,
+		doneSemaphore: util.NewSemaphore(1),
+		sessionID:     uuid.New(),
+		sink:          sink,
+		logger:        logger,
+	}
+	// Take the semaphore before the loop starts; the loop releases it on
+	// exit, which is what Wait acquires.
+	w.doneSemaphore.Acquire()
+	go w.analyticsClient()
+	return w
+}
+
+// NewClient returns a Client whose events are batched by a background worker
+// and delivered to sink. The worker runs under a context derived from parent,
+// so it stops when either parent is cancelled or the client is closed.
+func NewClient(parent context.Context, sink Sink, logger hclog.Logger) Client {
+	events := make(chan EventPayload)
+	workerCtx, stop := context.WithCancel(parent)
+	// newWorker also starts the worker's goroutine.
+	return &client{
+		ch:     events,
+		cancel: stop,
+		worker: newWorker(workerCtx, events, sink, logger),
+	}
+}
+
+// LogEvent hands event to the background worker. It blocks until the worker
+// has received the event. Once the worker's context has been cancelled (via
+// Close or the parent context) the event is dropped instead, because the
+// worker has stopped receiving and an unconditional send would block forever.
+func (s *client) LogEvent(event EventPayload) {
+	select {
+	case s.ch <- event:
+	case <-s.worker.ctx.Done():
+		// Worker is shutting down or gone; drop the event.
+	}
+}
+
+// Close cancels the worker's context and then blocks in worker.Wait until the
+// worker has finished.
+// NOTE(review): Wait acquires a semaphore that the worker appears to release
+// only once, so calling Close a second time would presumably block — confirm
+// against util.Semaphore.
+func (s *client) Close() {
+ s.cancel()
+ s.worker.Wait()
+}
+
+// CloseWithTimeout runs Close on a separate goroutine and returns when Close
+// completes or when timeout elapses, whichever happens first. If the timeout
+// wins, the goroutine running Close keeps going in the background.
+func (s *client) CloseWithTimeout(timeout time.Duration) {
+	done := make(chan struct{})
+	go func() {
+		s.Close()
+		close(done)
+	}()
+	// Use an explicit timer so it can be stopped as soon as Close finishes,
+	// rather than lingering until the timeout expires.
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	select {
+	case <-done:
+	case <-timer.C:
+	}
+}
+
+// Wait blocks until the event loop has released doneSemaphore (which it does
+// when its context is cancelled) and every goroutine started by sendEvents
+// has finished.
+func (w *worker) Wait() {
+ w.doneSemaphore.Acquire()
+ w.wg.Wait()
+}
+
+func (w *worker) analyticsClient() {
+ timeout := time.After(noTimeout)
+ for {
+ select {
+ case e := <-w.ch:
+ w.buffer = append(w.buffer, e)
+ if len(w.buffer) == bufferThreshold {
+ w.flush()
+ timeout = time.After(noTimeout)
+ } else {
+ timeout = time.After(eventTimeout)
+ }
+ case <-timeout:
+ w.flush()
+ timeout = time.After(noTimeout)
+ case <-w.ctx.Done():
+ w.flush()
+ w.doneSemaphore.Release()
+ return
+ }
+ }
+}
+
+// flush sends whatever is buffered and starts a fresh buffer. It does nothing
+// when the buffer is empty.
+func (w *worker) flush() {
+	if len(w.buffer) == 0 {
+		return
+	}
+	// The old slice is handed to the sending goroutine, so a new one is
+	// allocated rather than truncating in place.
+	pending := w.buffer
+	w.buffer = []EventPayload{}
+	w.sendEvents(pending)
+}
+
+// sendEvents encodes events with the worker's session ID and delivers them to
+// the sink on a new goroutine tracked by w.wg. Failures are logged at debug
+// level and otherwise ignored; a batch that cannot be encoded is not sent.
+func (w *worker) sendEvents(events []EventPayload) {
+	w.wg.Add(1)
+	go func() {
+		// Deferred so the WaitGroup is released on every exit path.
+		defer w.wg.Done()
+		payload, err := addSessionID(w.sessionID.String(), events)
+		if err != nil {
+			w.logger.Debug("failed to encode cache usage analytics", "error", err)
+			// payload is nil here; there is nothing valid to hand to the sink.
+			return
+		}
+		if err := w.sink.RecordAnalyticsEvents(payload); err != nil {
+			w.logger.Debug("failed to record cache usage analytics", "error", err)
+		}
+	}()
+}
+
+// addSessionID decodes events into string-keyed maps using mapstructure and
+// sets the "sessionId" key of each map to sessionID. It returns a nil slice
+// and the decode error if decoding fails.
+func addSessionID(sessionID string, events []EventPayload) (Events, error) {
+	decoded := []map[string]interface{}{}
+	if err := mapstructure.Decode(events, &decoded); err != nil {
+		return nil, err
+	}
+	for i := range decoded {
+		decoded[i]["sessionId"] = sessionID
+	}
+	return decoded, nil
+}
diff --git a/cli/internal/analytics/analytics_test.go b/cli/internal/analytics/analytics_test.go
new file mode 100644
index 0000000..0715fda
--- /dev/null
+++ b/cli/internal/analytics/analytics_test.go
@@ -0,0 +1,192 @@
+package analytics
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/hashicorp/go-hclog"
+)
+
+// dummySink is a test Sink that stores every batch it receives and signals
+// each delivery on a channel.
+type dummySink struct {
+ // events holds the recorded batches in arrival order; guarded by mu.
+ events []*Events
+ // err is returned from RecordAnalyticsEvents.
+ err error
+ mu sync.Mutex
+ // ch receives one value per recorded batch.
+ ch chan struct{}
+}
+
+// evt is a minimal event payload used by the tests.
+type evt struct {
+ I int
+}
+
+// newDummySink returns a dummySink with no recorded batches and a
+// notification channel with room for one pending signal.
+func newDummySink() *dummySink {
+	sink := new(dummySink)
+	sink.events = []*Events{}
+	sink.ch = make(chan struct{}, 1)
+	return sink
+}
+
+// RecordAnalyticsEvents appends the batch to the recorded list, signals the
+// delivery on d.ch and returns d.err. The send on d.ch happens while the
+// mutex is held.
+func (d *dummySink) RecordAnalyticsEvents(events Events) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	// Build the new list in fresh storage so a slice previously returned by
+	// Events() is never modified.
+	recorded := make([]*Events, 0, len(d.events)+1)
+	recorded = append(recorded, d.events...)
+	recorded = append(recorded, &events)
+	d.events = recorded
+	d.ch <- struct{}{}
+	return d.err
+}
+
+// Events returns the batches recorded so far, read under the mutex.
+func (d *dummySink) Events() []*Events {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.events
+}
+
+// ExpectImmediateMessage waits for a delivery signal and reports a test error
+// if none arrives within 150ms, i.e. before the worker's 200ms flush timeout
+// could have been responsible for it.
+func (d *dummySink) ExpectImmediateMessage(t *testing.T) {
+	// Attribute failures to the calling test rather than to this helper.
+	t.Helper()
+	select {
+	case <-d.ch:
+	case <-time.After(150 * time.Millisecond):
+		t.Errorf("expected to not wait out the flush timeout")
+	}
+}
+
+// ExpectTimeoutThenMessage asserts that no delivery signal arrives during the
+// first 150ms and then blocks until one does. If a signal arrives early, it
+// reports a test error and returns immediately: that signal has already been
+// consumed, so waiting for another one could block forever.
+func (d *dummySink) ExpectTimeoutThenMessage(t *testing.T) {
+	// Attribute failures to the calling test rather than to this helper.
+	t.Helper()
+	select {
+	case <-d.ch:
+		t.Errorf("Expected to wait out the flush timeout")
+		return
+	case <-time.After(150 * time.Millisecond):
+	}
+	<-d.ch
+}
+
+// Test_batching checks that fewer events than the batch threshold are not
+// sent immediately, and are flushed as a single batch after the flush timeout.
+func Test_batching(t *testing.T) {
+	d := newDummySink()
+	ctx := context.Background()
+	c := NewClient(ctx, d, hclog.Default())
+	for i := 0; i < 2; i++ {
+		c.LogEvent(&evt{i})
+	}
+	found := d.Events()
+	if len(found) != 0 {
+		t.Errorf("got %v events, want 0 due to batching", len(found))
+	}
+	// Should timeout
+	d.ExpectTimeoutThenMessage(t)
+	found = d.Events()
+	// Fatal: found[0] is indexed below and would panic on an empty slice.
+	if len(found) != 1 {
+		t.Fatalf("got %v, want 1 batch to have been flushed", len(found))
+	}
+	payloads := *found[0]
+	if len(payloads) != 2 {
+		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
+	}
+}
+
+// Test_batchingAcrossTwoBatches sends more events than the batch threshold
+// and checks that a full batch of 10 is flushed immediately while the
+// remaining 2 are flushed after the flush timeout.
+func Test_batchingAcrossTwoBatches(t *testing.T) {
+	d := newDummySink()
+	ctx := context.Background()
+	c := NewClient(ctx, d, hclog.Default())
+	for i := 0; i < 12; i++ {
+		c.LogEvent(&evt{i})
+	}
+	// We sent more than the batch size, expect a message immediately
+	d.ExpectImmediateMessage(t)
+	found := d.Events()
+	// Fatal: found[0] is indexed below and would panic on an empty slice.
+	if len(found) != 1 {
+		t.Fatalf("got %v, want 1 batch to have been flushed", len(found))
+	}
+	payloads := *found[0]
+	if len(payloads) != 10 {
+		t.Errorf("got %v, want 10 payloads to have been flushed", len(payloads))
+	}
+	// Should timeout second batch
+	d.ExpectTimeoutThenMessage(t)
+	found = d.Events()
+	// Fatal: found[1] is indexed below and would panic on a shorter slice.
+	if len(found) != 2 {
+		t.Fatalf("got %v, want 2 batches to have been flushed", len(found))
+	}
+	payloads = *found[1]
+	if len(payloads) != 2 {
+		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
+	}
+}
+
+// Test_closing checks that Close flushes buffered events as one batch before
+// it returns.
+func Test_closing(t *testing.T) {
+	d := newDummySink()
+	ctx := context.Background()
+	c := NewClient(ctx, d, hclog.Default())
+	for i := 0; i < 2; i++ {
+		c.LogEvent(&evt{i})
+	}
+	found := d.Events()
+	if len(found) != 0 {
+		t.Errorf("got %v events, want 0 due to batching", len(found))
+	}
+	c.Close()
+	found = d.Events()
+	// Fatal: found[0] is indexed below and would panic on an empty slice.
+	if len(found) != 1 {
+		t.Fatalf("got %v, want 1 batch to have been flushed", len(found))
+	}
+	payloads := *found[0]
+	if len(payloads) != 2 {
+		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
+	}
+}
+
+// Test_closingByContext checks that cancelling the parent context makes the
+// worker flush buffered events promptly as one batch.
+func Test_closingByContext(t *testing.T) {
+	d := newDummySink()
+	ctx, cancel := context.WithCancel(context.Background())
+	c := NewClient(ctx, d, hclog.Default())
+	for i := 0; i < 2; i++ {
+		c.LogEvent(&evt{i})
+	}
+	found := d.Events()
+	if len(found) != 0 {
+		t.Errorf("got %v events, want 0 due to batching", len(found))
+	}
+	cancel()
+	d.ExpectImmediateMessage(t)
+	found = d.Events()
+	// Fatal: found[0] is indexed below and would panic on an empty slice.
+	if len(found) != 1 {
+		t.Fatalf("got %v, want 1 batch to have been flushed", len(found))
+	}
+	payloads := *found[0]
+	if len(payloads) != 2 {
+		t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
+	}
+}
+
+// Test_addSessionId checks that addSessionID keeps each event's own fields
+// (keyed by their mapstructure tags) and adds the session ID to every event.
+func Test_addSessionId(t *testing.T) {
+	events := []struct {
+		Foo string `mapstructure:"foo"`
+	}{
+		{
+			Foo: "foo1",
+		},
+		{
+			Foo: "foo2",
+		},
+	}
+	arr := make([]interface{}, len(events))
+	for i, event := range events {
+		arr[i] = event
+	}
+	sessionID := "my-uuid"
+	output, err := addSessionID(sessionID, arr)
+	// Fatal: output is nil on error, and it is indexed below.
+	if err != nil {
+		t.Fatalf("failed to encode analytics events: %v", err)
+	}
+	// Fatal: output[0] is indexed below and would panic on an empty slice.
+	if len(output) != 2 {
+		t.Fatalf("len output got %v, want 2", len(output))
+	}
+	if output[0]["foo"] != "foo1" {
+		t.Errorf("first event foo got %v, want foo1", output[0]["foo"])
+	}
+	for i, event := range output {
+		if event["sessionId"] != "my-uuid" {
+			t.Errorf("event %v sessionId got %v, want %v", i, event["sessionId"], sessionID)
+		}
+	}
+}