aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/cli/internal/cache/cache_http.go
diff options
context:
space:
mode:
Diffstat (limited to 'cli/internal/cache/cache_http.go')
-rw-r--r--cli/internal/cache/cache_http.go375
1 files changed, 0 insertions, 375 deletions
diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go
deleted file mode 100644
index 1d345bf..0000000
--- a/cli/internal/cache/cache_http.go
+++ /dev/null
@@ -1,375 +0,0 @@
-// Adapted from https://github.com/thought-machine/please
-// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-package cache
-
-import (
- "archive/tar"
- "bytes"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- log "log"
- "net/http"
- "os"
- "path/filepath"
- "strconv"
- "time"
-
- "github.com/DataDog/zstd"
-
- "github.com/vercel/turbo/cli/internal/analytics"
- "github.com/vercel/turbo/cli/internal/tarpatch"
- "github.com/vercel/turbo/cli/internal/turbopath"
-)
-
-type client interface {
- PutArtifact(hash string, body []byte, duration int, tag string) error
- FetchArtifact(hash string) (*http.Response, error)
- ArtifactExists(hash string) (*http.Response, error)
- GetTeamID() string
-}
-
-type httpCache struct {
- writable bool
- client client
- requestLimiter limiter
- recorder analytics.Recorder
- signerVerifier *ArtifactSignatureAuthentication
- repoRoot turbopath.AbsoluteSystemPath
-}
-
-type limiter chan struct{}
-
-func (l limiter) acquire() {
- l <- struct{}{}
-}
-
-func (l limiter) release() {
- <-l
-}
-
-// mtime is the time we attach for the modification time of all files.
-var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
-
-// nobody is the usual uid / gid of the 'nobody' user.
-const nobody = 65534
-
-func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error {
- // if cache.writable {
- cache.requestLimiter.acquire()
- defer cache.requestLimiter.release()
-
- r, w := io.Pipe()
- go cache.write(w, hash, files)
-
- // Read the entire artifact tar into memory so we can easily compute the signature.
- // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no
- // additional overhead by doing the ioutil.ReadAll here instead.
- artifactBody, err := ioutil.ReadAll(r)
- if err != nil {
- return fmt.Errorf("failed to store files in HTTP cache: %w", err)
- }
- tag := ""
- if cache.signerVerifier.isEnabled() {
- tag, err = cache.signerVerifier.generateTag(hash, artifactBody)
- if err != nil {
- return fmt.Errorf("failed to store files in HTTP cache: %w", err)
- }
- }
- return cache.client.PutArtifact(hash, artifactBody, duration, tag)
-}
-
-// write writes a series of files into the given Writer.
-func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) {
- defer w.Close()
- defer func() { _ = w.Close() }()
- zw := zstd.NewWriter(w)
- defer func() { _ = zw.Close() }()
- tw := tar.NewWriter(zw)
- defer func() { _ = tw.Close() }()
- for _, file := range files {
- // log.Printf("caching file %v", file)
- if err := cache.storeFile(tw, file); err != nil {
- log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err)
- // TODO(jaredpalmer): How can we cancel the request at this point?
- }
- }
-}
-
-func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error {
- absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot)
- info, err := absoluteFilePath.Lstat()
- if err != nil {
- return err
- }
- target := ""
- if info.Mode()&os.ModeSymlink != 0 {
- target, err = absoluteFilePath.Readlink()
- if err != nil {
- return err
- }
- }
- hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target))
- if err != nil {
- return err
- }
- // Ensure posix path for filename written in header.
- hdr.Name = repoRelativePath.ToUnixPath().ToString()
- // Zero out all timestamps.
- hdr.ModTime = mtime
- hdr.AccessTime = mtime
- hdr.ChangeTime = mtime
- // Strip user/group ids.
- hdr.Uid = nobody
- hdr.Gid = nobody
- hdr.Uname = "nobody"
- hdr.Gname = "nobody"
- if err := tw.WriteHeader(hdr); err != nil {
- return err
- } else if info.IsDir() || target != "" {
- return nil // nothing to write
- }
- f, err := absoluteFilePath.Open()
- if err != nil {
- return err
- }
- defer func() { _ = f.Close() }()
- _, err = io.Copy(tw, f)
- if errors.Is(err, tar.ErrWriteTooLong) {
- log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular())
- }
- return err
-}
-
-func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) {
- cache.requestLimiter.acquire()
- defer cache.requestLimiter.release()
- hit, files, duration, err := cache.retrieve(key)
- if err != nil {
- // TODO: analytics event?
- return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err)
- }
- cache.logFetch(hit, key, duration)
- return ItemStatus{Remote: hit}, files, duration, err
-}
-
-func (cache *httpCache) Exists(key string) ItemStatus {
- cache.requestLimiter.acquire()
- defer cache.requestLimiter.release()
- hit, err := cache.exists(key)
- if err != nil {
- return ItemStatus{Remote: false}
- }
- return ItemStatus{Remote: hit}
-}
-
-func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
- var event string
- if hit {
- event = CacheEventHit
- } else {
- event = CacheEventMiss
- }
- payload := &CacheEvent{
- Source: CacheSourceRemote,
- Event: event,
- Hash: hash,
- Duration: duration,
- }
- cache.recorder.LogEvent(payload)
-}
-
-func (cache *httpCache) exists(hash string) (bool, error) {
- resp, err := cache.client.ArtifactExists(hash)
- if err != nil {
- return false, nil
- }
-
- defer func() { err = resp.Body.Close() }()
-
- if resp.StatusCode == http.StatusNotFound {
- return false, nil
- } else if resp.StatusCode != http.StatusOK {
- return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
- }
- return true, err
-}
-
-func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) {
- resp, err := cache.client.FetchArtifact(hash)
- if err != nil {
- return false, nil, 0, err
- }
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusNotFound {
- return false, nil, 0, nil // doesn't exist - not an error
- } else if resp.StatusCode != http.StatusOK {
- b, _ := ioutil.ReadAll(resp.Body)
- return false, nil, 0, fmt.Errorf("%s", string(b))
- }
- // If present, extract the duration from the response.
- duration := 0
- if resp.Header.Get("x-artifact-duration") != "" {
- intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration"))
- if err != nil {
- return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err)
- }
- duration = intVar
- }
- var tarReader io.Reader
-
- defer func() { _ = resp.Body.Close() }()
- if cache.signerVerifier.isEnabled() {
- expectedTag := resp.Header.Get("x-artifact-tag")
- if expectedTag == "" {
- // If the verifier is enabled all incoming artifact downloads must have a signature
- return false, nil, 0, errors.New("artifact verification failed: Downloaded artifact is missing required x-artifact-tag header")
- }
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err)
- }
- isValid, err := cache.signerVerifier.validate(hash, b, expectedTag)
- if err != nil {
- return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err)
- }
- if !isValid {
- err = fmt.Errorf("artifact verification failed: artifact tag does not match expected tag %s", expectedTag)
- return false, nil, 0, err
- }
- // The artifact has been verified and the body can be read and untarred
- tarReader = bytes.NewReader(b)
- } else {
- tarReader = resp.Body
- }
- files, err := restoreTar(cache.repoRoot, tarReader)
- if err != nil {
- return false, nil, 0, err
- }
- return true, files, duration, nil
-}
-
-// restoreTar returns posix-style repo-relative paths of the files it
-// restored. In the future, these should likely be repo-relative system paths
-// so that they are suitable for being fed into cache.Put for other caches.
-// For now, I think this is working because windows also accepts /-delimited paths.
-func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) {
- files := []turbopath.AnchoredSystemPath{}
- missingLinks := []*tar.Header{}
- zr := zstd.NewReader(reader)
- var closeError error
- defer func() { closeError = zr.Close() }()
- tr := tar.NewReader(zr)
- for {
- hdr, err := tr.Next()
- if err != nil {
- if err == io.EOF {
- for _, link := range missingLinks {
- err := restoreSymlink(root, link, true)
- if err != nil {
- return nil, err
- }
- }
-
- return files, closeError
- }
- return nil, err
- }
- // hdr.Name is always a posix-style path
- // FIXME: THIS IS A BUG.
- restoredName := turbopath.AnchoredUnixPath(hdr.Name)
- files = append(files, restoredName.ToSystemPath())
- filename := restoredName.ToSystemPath().RestoreAnchor(root)
- if isChild, err := root.ContainsPath(filename); err != nil {
- return nil, err
- } else if !isChild {
- return nil, fmt.Errorf("cannot untar file to %v", filename)
- }
- switch hdr.Typeflag {
- case tar.TypeDir:
- if err := filename.MkdirAll(0775); err != nil {
- return nil, err
- }
- case tar.TypeReg:
- if dir := filename.Dir(); dir != "." {
- if err := dir.MkdirAll(0775); err != nil {
- return nil, err
- }
- }
- if f, err := filename.OpenFile(os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.FileMode(hdr.Mode)); err != nil {
- return nil, err
- } else if _, err := io.Copy(f, tr); err != nil {
- return nil, err
- } else if err := f.Close(); err != nil {
- return nil, err
- }
- case tar.TypeSymlink:
- if err := restoreSymlink(root, hdr, false); errors.Is(err, errNonexistentLinkTarget) {
- missingLinks = append(missingLinks, hdr)
- } else if err != nil {
- return nil, err
- }
- default:
- log.Printf("Unhandled file type %d for %s", hdr.Typeflag, hdr.Name)
- }
- }
-}
-
-var errNonexistentLinkTarget = errors.New("the link target does not exist")
-
-func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNonexistentTargets bool) error {
- // Note that hdr.Linkname is really the link target
- relativeLinkTarget := filepath.FromSlash(hdr.Linkname)
- linkFilename := root.UntypedJoin(hdr.Name)
- if err := linkFilename.EnsureDir(); err != nil {
- return err
- }
-
- // TODO: check if this is an absolute path, or if we even care
- linkTarget := linkFilename.Dir().UntypedJoin(relativeLinkTarget)
- if _, err := linkTarget.Lstat(); err != nil {
- if os.IsNotExist(err) {
- if !allowNonexistentTargets {
- return errNonexistentLinkTarget
- }
- // if we're allowing nonexistent link targets, proceed to creating the link
- } else {
- return err
- }
- }
- // Ensure that the link we're about to create doesn't already exist
- if err := linkFilename.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) {
- return err
- }
- if err := linkFilename.Symlink(relativeLinkTarget); err != nil {
- return err
- }
- return nil
-}
-
-func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) {
- // Not possible; this implementation can only clean for a hash.
-}
-
-func (cache *httpCache) CleanAll() {
- // Also not possible.
-}
-
-func (cache *httpCache) Shutdown() {}
-
-func newHTTPCache(opts Opts, client client, recorder analytics.Recorder) *httpCache {
- return &httpCache{
- writable: true,
- client: client,
- requestLimiter: make(limiter, 20),
- recorder: recorder,
- signerVerifier: &ArtifactSignatureAuthentication{
- // TODO(Gaspar): this should use RemoteCacheOptions.TeamId once we start
- // enforcing team restrictions for repositories.
- teamId: client.GetTeamID(),
- enabled: opts.RemoteCacheOpts.Signature,
- },
- }
-}