aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/analytics
diff options
context:
space:
mode:
author简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:55 +0800
committer简律纯 <hsiangnianian@outlook.com>2023-04-28 01:36:55 +0800
commitfc8c5fdce62fb229202659408798a7b6c98f6e8b (patch)
tree7554f80e50de4af6fd255afa7c21bcdd58a7af34 /cli/internal/analytics
parentdd84b9d64fb98746a230cd24233ff50a562c39c9 (diff)
downloadHydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.tar.gz
HydroRoll-fc8c5fdce62fb229202659408798a7b6c98f6e8b.zip
Diffstat (limited to 'cli/internal/analytics')
-rw-r--r--cli/internal/analytics/analytics.go175
-rw-r--r--cli/internal/analytics/analytics_test.go192
2 files changed, 0 insertions, 367 deletions
diff --git a/cli/internal/analytics/analytics.go b/cli/internal/analytics/analytics.go
deleted file mode 100644
index 8d9a3b6..0000000
--- a/cli/internal/analytics/analytics.go
+++ /dev/null
@@ -1,175 +0,0 @@
-package analytics
-
-import (
- "context"
- "sync"
- "time"
-
- "github.com/google/uuid"
- "github.com/hashicorp/go-hclog"
- "github.com/mitchellh/mapstructure"
- "github.com/vercel/turbo/cli/internal/util"
-)
-
-type Events = []map[string]interface{}
-
-type EventPayload = interface{}
-
-type Recorder interface {
- LogEvent(payload EventPayload)
-}
-
-type Client interface {
- Recorder
- Close()
- CloseWithTimeout(timeout time.Duration)
-}
-
-type Sink interface {
- RecordAnalyticsEvents(events Events) error
-}
-
-type nullSink struct{}
-
-func (n *nullSink) RecordAnalyticsEvents(events Events) error {
- return nil
-}
-
-// NullSink is an analytics sink to use in the event that we don't want to send
-// analytics
-var NullSink = &nullSink{}
-
-type client struct {
- ch chan<- EventPayload
- cancel func()
-
- worker *worker
-}
-
-type worker struct {
- buffer []EventPayload
- ch <-chan EventPayload
- ctx context.Context
- doneSemaphore util.Semaphore
- sessionID uuid.UUID
- sink Sink
- wg sync.WaitGroup
- logger hclog.Logger
-}
-
-const bufferThreshold = 10
-const eventTimeout = 200 * time.Millisecond
-const noTimeout = 24 * time.Hour
-
-func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker {
- buffer := []EventPayload{}
- sessionID := uuid.New()
- w := &worker{
- buffer: buffer,
- ch: ch,
- ctx: ctx,
- doneSemaphore: util.NewSemaphore(1),
- sessionID: sessionID,
- sink: sink,
- logger: logger,
- }
- w.doneSemaphore.Acquire()
- go w.analyticsClient()
- return w
-}
-
-func NewClient(parent context.Context, sink Sink, logger hclog.Logger) Client {
- ch := make(chan EventPayload)
- ctx, cancel := context.WithCancel(parent)
- // creates and starts the worker
- worker := newWorker(ctx, ch, sink, logger)
- s := &client{
- ch: ch,
- cancel: cancel,
- worker: worker,
- }
- return s
-}
-
-func (s *client) LogEvent(event EventPayload) {
- s.ch <- event
-}
-
-func (s *client) Close() {
- s.cancel()
- s.worker.Wait()
-}
-
-func (s *client) CloseWithTimeout(timeout time.Duration) {
- ch := make(chan struct{})
- go func() {
- s.Close()
- close(ch)
- }()
- select {
- case <-ch:
- case <-time.After(timeout):
- }
-}
-
-func (w *worker) Wait() {
- w.doneSemaphore.Acquire()
- w.wg.Wait()
-}
-
-func (w *worker) analyticsClient() {
- timeout := time.After(noTimeout)
- for {
- select {
- case e := <-w.ch:
- w.buffer = append(w.buffer, e)
- if len(w.buffer) == bufferThreshold {
- w.flush()
- timeout = time.After(noTimeout)
- } else {
- timeout = time.After(eventTimeout)
- }
- case <-timeout:
- w.flush()
- timeout = time.After(noTimeout)
- case <-w.ctx.Done():
- w.flush()
- w.doneSemaphore.Release()
- return
- }
- }
-}
-
-func (w *worker) flush() {
- if len(w.buffer) > 0 {
- w.sendEvents(w.buffer)
- w.buffer = []EventPayload{}
- }
-}
-
-func (w *worker) sendEvents(events []EventPayload) {
- w.wg.Add(1)
- go func() {
- payload, err := addSessionID(w.sessionID.String(), events)
- if err != nil {
- w.logger.Debug("failed to encode cache usage analytics", "error", err)
- }
- err = w.sink.RecordAnalyticsEvents(payload)
- if err != nil {
- w.logger.Debug("failed to record cache usage analytics", "error", err)
- }
- w.wg.Done()
- }()
-}
-
-func addSessionID(sessionID string, events []EventPayload) (Events, error) {
- eventMaps := []map[string]interface{}{}
- err := mapstructure.Decode(events, &eventMaps)
- if err != nil {
- return nil, err
- }
- for _, event := range eventMaps {
- event["sessionId"] = sessionID
- }
- return eventMaps, nil
-}
diff --git a/cli/internal/analytics/analytics_test.go b/cli/internal/analytics/analytics_test.go
deleted file mode 100644
index 0715fda..0000000
--- a/cli/internal/analytics/analytics_test.go
+++ /dev/null
@@ -1,192 +0,0 @@
-package analytics
-
-import (
- "context"
- "sync"
- "testing"
- "time"
-
- "github.com/hashicorp/go-hclog"
-)
-
-type dummySink struct {
- events []*Events
- err error
- mu sync.Mutex
- ch chan struct{}
-}
-
-type evt struct {
- I int
-}
-
-func newDummySink() *dummySink {
- return &dummySink{
- events: []*Events{},
- ch: make(chan struct{}, 1),
- }
-}
-
-func (d *dummySink) RecordAnalyticsEvents(events Events) error {
- d.mu.Lock()
- defer d.mu.Unlock()
- // Make a copy in case a test is holding a copy too
- eventsCopy := make([]*Events, len(d.events))
- copy(eventsCopy, d.events)
- d.events = append(eventsCopy, &events)
- d.ch <- struct{}{}
- return d.err
-}
-
-func (d *dummySink) Events() []*Events {
- d.mu.Lock()
- defer d.mu.Unlock()
- return d.events
-}
-
-func (d *dummySink) ExpectImmediateMessage(t *testing.T) {
- select {
- case <-time.After(150 * time.Millisecond):
- t.Errorf("expected to not wait out the flush timeout")
- case <-d.ch:
- }
-}
-
-func (d *dummySink) ExpectTimeoutThenMessage(t *testing.T) {
- select {
- case <-d.ch:
- t.Errorf("Expected to wait out the flush timeout")
- case <-time.After(150 * time.Millisecond):
- }
- <-d.ch
-}
-
-func Test_batching(t *testing.T) {
- d := newDummySink()
- ctx := context.Background()
- c := NewClient(ctx, d, hclog.Default())
- for i := 0; i < 2; i++ {
- c.LogEvent(&evt{i})
- }
- found := d.Events()
- if len(found) != 0 {
- t.Errorf("got %v events, want 0 due to batching", len(found))
- }
- // Should timeout
- d.ExpectTimeoutThenMessage(t)
- found = d.Events()
- if len(found) != 1 {
- t.Errorf("got %v, want 1 batch to have been flushed", len(found))
- }
- payloads := *found[0]
- if len(payloads) != 2 {
- t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
- }
-}
-
-func Test_batchingAcrossTwoBatches(t *testing.T) {
- d := newDummySink()
- ctx := context.Background()
- c := NewClient(ctx, d, hclog.Default())
- for i := 0; i < 12; i++ {
- c.LogEvent(&evt{i})
- }
- // We sent more than the batch size, expect a message immediately
- d.ExpectImmediateMessage(t)
- found := d.Events()
- if len(found) != 1 {
- t.Errorf("got %v, want 1 batch to have been flushed", len(found))
- }
- payloads := *found[0]
- if len(payloads) != 10 {
- t.Errorf("got %v, want 10 payloads to have been flushed", len(payloads))
- }
- // Should timeout second batch
- d.ExpectTimeoutThenMessage(t)
- found = d.Events()
- if len(found) != 2 {
- t.Errorf("got %v, want 2 batches to have been flushed", len(found))
- }
- payloads = *found[1]
- if len(payloads) != 2 {
- t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
- }
-}
-
-func Test_closing(t *testing.T) {
- d := newDummySink()
- ctx := context.Background()
- c := NewClient(ctx, d, hclog.Default())
- for i := 0; i < 2; i++ {
- c.LogEvent(&evt{i})
- }
- found := d.Events()
- if len(found) != 0 {
- t.Errorf("got %v events, want 0 due to batching", len(found))
- }
- c.Close()
- found = d.Events()
- if len(found) != 1 {
- t.Errorf("got %v, want 1 batch to have been flushed", len(found))
- }
- payloads := *found[0]
- if len(payloads) != 2 {
- t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
- }
-}
-
-func Test_closingByContext(t *testing.T) {
- d := newDummySink()
- ctx, cancel := context.WithCancel(context.Background())
- c := NewClient(ctx, d, hclog.Default())
- for i := 0; i < 2; i++ {
- c.LogEvent(&evt{i})
- }
- found := d.Events()
- if len(found) != 0 {
- t.Errorf("got %v events, want 0 due to batching", len(found))
- }
- cancel()
- d.ExpectImmediateMessage(t)
- found = d.Events()
- if len(found) != 1 {
- t.Errorf("got %v, want 1 batch to have been flushed", len(found))
- }
- payloads := *found[0]
- if len(payloads) != 2 {
- t.Errorf("got %v, want 2 payloads to have been flushed", len(payloads))
- }
-}
-
-func Test_addSessionId(t *testing.T) {
- events := []struct {
- Foo string `mapstructure:"foo"`
- }{
- {
- Foo: "foo1",
- },
- {
- Foo: "foo2",
- },
- }
- arr := make([]interface{}, len(events))
- for i, event := range events {
- arr[i] = event
- }
- sessionID := "my-uuid"
- output, err := addSessionID(sessionID, arr)
- if err != nil {
- t.Errorf("failed to encode analytics events: %v", err)
- }
- if len(output) != 2 {
- t.Errorf("len output got %v, want 2", len(output))
- }
- if output[0]["foo"] != "foo1" {
- t.Errorf("first event foo got %v, want foo1", output[0]["foo"])
- }
- for i, event := range output {
- if event["sessionId"] != "my-uuid" {
- t.Errorf("event %v sessionId got %v, want %v", i, event["sessionId"], sessionID)
- }
- }
-}