diff options
Diffstat (limited to 'cli/internal/cache/cache_http.go')
| -rw-r--r-- | cli/internal/cache/cache_http.go | 375 |
1 files changed, 0 insertions, 375 deletions
diff --git a/cli/internal/cache/cache_http.go b/cli/internal/cache/cache_http.go deleted file mode 100644 index 1d345bf..0000000 --- a/cli/internal/cache/cache_http.go +++ /dev/null @@ -1,375 +0,0 @@ -// Adapted from https://github.com/thought-machine/please -// Copyright Thought Machine, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 -package cache - -import ( - "archive/tar" - "bytes" - "errors" - "fmt" - "io" - "io/ioutil" - log "log" - "net/http" - "os" - "path/filepath" - "strconv" - "time" - - "github.com/DataDog/zstd" - - "github.com/vercel/turbo/cli/internal/analytics" - "github.com/vercel/turbo/cli/internal/tarpatch" - "github.com/vercel/turbo/cli/internal/turbopath" -) - -type client interface { - PutArtifact(hash string, body []byte, duration int, tag string) error - FetchArtifact(hash string) (*http.Response, error) - ArtifactExists(hash string) (*http.Response, error) - GetTeamID() string -} - -type httpCache struct { - writable bool - client client - requestLimiter limiter - recorder analytics.Recorder - signerVerifier *ArtifactSignatureAuthentication - repoRoot turbopath.AbsoluteSystemPath -} - -type limiter chan struct{} - -func (l limiter) acquire() { - l <- struct{}{} -} - -func (l limiter) release() { - <-l -} - -// mtime is the time we attach for the modification time of all files. -var mtime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - -// nobody is the usual uid / gid of the 'nobody' user. -const nobody = 65534 - -func (cache *httpCache) Put(_ turbopath.AbsoluteSystemPath, hash string, duration int, files []turbopath.AnchoredSystemPath) error { - // if cache.writable { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - - r, w := io.Pipe() - go cache.write(w, hash, files) - - // Read the entire artifact tar into memory so we can easily compute the signature. - // Note: retryablehttp.NewRequest reads the files into memory anyways so there's no - // additional overhead by doing the ioutil.ReadAll here instead. - artifactBody, err := ioutil.ReadAll(r) - if err != nil { - return fmt.Errorf("failed to store files in HTTP cache: %w", err) - } - tag := "" - if cache.signerVerifier.isEnabled() { - tag, err = cache.signerVerifier.generateTag(hash, artifactBody) - if err != nil { - return fmt.Errorf("failed to store files in HTTP cache: %w", err) - } - } - return cache.client.PutArtifact(hash, artifactBody, duration, tag) -} - -// write writes a series of files into the given Writer. -func (cache *httpCache) write(w io.WriteCloser, hash string, files []turbopath.AnchoredSystemPath) { - defer w.Close() - defer func() { _ = w.Close() }() - zw := zstd.NewWriter(w) - defer func() { _ = zw.Close() }() - tw := tar.NewWriter(zw) - defer func() { _ = tw.Close() }() - for _, file := range files { - // log.Printf("caching file %v", file) - if err := cache.storeFile(tw, file); err != nil { - log.Printf("[ERROR] Error uploading artifact %s to HTTP cache due to: %s", file, err) - // TODO(jaredpalmer): How can we cancel the request at this point? - } - } -} - -func (cache *httpCache) storeFile(tw *tar.Writer, repoRelativePath turbopath.AnchoredSystemPath) error { - absoluteFilePath := repoRelativePath.RestoreAnchor(cache.repoRoot) - info, err := absoluteFilePath.Lstat() - if err != nil { - return err - } - target := "" - if info.Mode()&os.ModeSymlink != 0 { - target, err = absoluteFilePath.Readlink() - if err != nil { - return err - } - } - hdr, err := tarpatch.FileInfoHeader(repoRelativePath.ToUnixPath(), info, filepath.ToSlash(target)) - if err != nil { - return err - } - // Ensure posix path for filename written in header. - hdr.Name = repoRelativePath.ToUnixPath().ToString() - // Zero out all timestamps. - hdr.ModTime = mtime - hdr.AccessTime = mtime - hdr.ChangeTime = mtime - // Strip user/group ids. - hdr.Uid = nobody - hdr.Gid = nobody - hdr.Uname = "nobody" - hdr.Gname = "nobody" - if err := tw.WriteHeader(hdr); err != nil { - return err - } else if info.IsDir() || target != "" { - return nil // nothing to write - } - f, err := absoluteFilePath.Open() - if err != nil { - return err - } - defer func() { _ = f.Close() }() - _, err = io.Copy(tw, f) - if errors.Is(err, tar.ErrWriteTooLong) { - log.Printf("Error writing %v to tar file, info: %v, mode: %v, is regular: %v", repoRelativePath, info, info.Mode(), info.Mode().IsRegular()) - } - return err -} - -func (cache *httpCache) Fetch(_ turbopath.AbsoluteSystemPath, key string, _ []string) (ItemStatus, []turbopath.AnchoredSystemPath, int, error) { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - hit, files, duration, err := cache.retrieve(key) - if err != nil { - // TODO: analytics event? - return ItemStatus{Remote: false}, files, duration, fmt.Errorf("failed to retrieve files from HTTP cache: %w", err) - } - cache.logFetch(hit, key, duration) - return ItemStatus{Remote: hit}, files, duration, err -} - -func (cache *httpCache) Exists(key string) ItemStatus { - cache.requestLimiter.acquire() - defer cache.requestLimiter.release() - hit, err := cache.exists(key) - if err != nil { - return ItemStatus{Remote: false} - } - return ItemStatus{Remote: hit} -} - -func (cache *httpCache) logFetch(hit bool, hash string, duration int) { - var event string - if hit { - event = CacheEventHit - } else { - event = CacheEventMiss - } - payload := &CacheEvent{ - Source: CacheSourceRemote, - Event: event, - Hash: hash, - Duration: duration, - } - cache.recorder.LogEvent(payload) -} - -func (cache *httpCache) exists(hash string) (bool, error) { - resp, err := cache.client.ArtifactExists(hash) - if err != nil { - return false, nil - } - - defer func() { err = resp.Body.Close() }() - - if resp.StatusCode == http.StatusNotFound { - return false, nil - } else if resp.StatusCode != http.StatusOK { - return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode)) - } - return true, err -} - -func (cache *httpCache) retrieve(hash string) (bool, []turbopath.AnchoredSystemPath, int, error) { - resp, err := cache.client.FetchArtifact(hash) - if err != nil { - return false, nil, 0, err - } - defer resp.Body.Close() - if resp.StatusCode == http.StatusNotFound { - return false, nil, 0, nil // doesn't exist - not an error - } else if resp.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - return false, nil, 0, fmt.Errorf("%s", string(b)) - } - // If present, extract the duration from the response. - duration := 0 - if resp.Header.Get("x-artifact-duration") != "" { - intVar, err := strconv.Atoi(resp.Header.Get("x-artifact-duration")) - if err != nil { - return false, nil, 0, fmt.Errorf("invalid x-artifact-duration header: %w", err) - } - duration = intVar - } - var tarReader io.Reader - - defer func() { _ = resp.Body.Close() }() - if cache.signerVerifier.isEnabled() { - expectedTag := resp.Header.Get("x-artifact-tag") - if expectedTag == "" { - // If the verifier is enabled all incoming artifact downloads must have a signature - return false, nil, 0, errors.New("artifact verification failed: Downloaded artifact is missing required x-artifact-tag header") - } - b, err := ioutil.ReadAll(resp.Body) - if err != nil { - return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) - } - isValid, err := cache.signerVerifier.validate(hash, b, expectedTag) - if err != nil { - return false, nil, 0, fmt.Errorf("artifact verification failed: %w", err) - } - if !isValid { - err = fmt.Errorf("artifact verification failed: artifact tag does not match expected tag %s", expectedTag) - return false, nil, 0, err - } - // The artifact has been verified and the body can be read and untarred - tarReader = bytes.NewReader(b) - } else { - tarReader = resp.Body - } - files, err := restoreTar(cache.repoRoot, tarReader) - if err != nil { - return false, nil, 0, err - } - return true, files, duration, nil -} - -// restoreTar returns posix-style repo-relative paths of the files it -// restored. In the future, these should likely be repo-relative system paths -// so that they are suitable for being fed into cache.Put for other caches. -// For now, I think this is working because windows also accepts /-delimited paths. -func restoreTar(root turbopath.AbsoluteSystemPath, reader io.Reader) ([]turbopath.AnchoredSystemPath, error) { - files := []turbopath.AnchoredSystemPath{} - missingLinks := []*tar.Header{} - zr := zstd.NewReader(reader) - var closeError error - defer func() { closeError = zr.Close() }() - tr := tar.NewReader(zr) - for { - hdr, err := tr.Next() - if err != nil { - if err == io.EOF { - for _, link := range missingLinks { - err := restoreSymlink(root, link, true) - if err != nil { - return nil, err - } - } - - return files, closeError - } - return nil, err - } - // hdr.Name is always a posix-style path - // FIXME: THIS IS A BUG. - restoredName := turbopath.AnchoredUnixPath(hdr.Name) - files = append(files, restoredName.ToSystemPath()) - filename := restoredName.ToSystemPath().RestoreAnchor(root) - if isChild, err := root.ContainsPath(filename); err != nil { - return nil, err - } else if !isChild { - return nil, fmt.Errorf("cannot untar file to %v", filename) - } - switch hdr.Typeflag { - case tar.TypeDir: - if err := filename.MkdirAll(0775); err != nil { - return nil, err - } - case tar.TypeReg: - if dir := filename.Dir(); dir != "." { - if err := dir.MkdirAll(0775); err != nil { - return nil, err - } - } - if f, err := filename.OpenFile(os.O_WRONLY|os.O_TRUNC|os.O_CREATE, os.FileMode(hdr.Mode)); err != nil { - return nil, err - } else if _, err := io.Copy(f, tr); err != nil { - return nil, err - } else if err := f.Close(); err != nil { - return nil, err - } - case tar.TypeSymlink: - if err := restoreSymlink(root, hdr, false); errors.Is(err, errNonexistentLinkTarget) { - missingLinks = append(missingLinks, hdr) - } else if err != nil { - return nil, err - } - default: - log.Printf("Unhandled file type %d for %s", hdr.Typeflag, hdr.Name) - } - } -} - -var errNonexistentLinkTarget = errors.New("the link target does not exist") - -func restoreSymlink(root turbopath.AbsoluteSystemPath, hdr *tar.Header, allowNonexistentTargets bool) error { - // Note that hdr.Linkname is really the link target - relativeLinkTarget := filepath.FromSlash(hdr.Linkname) - linkFilename := root.UntypedJoin(hdr.Name) - if err := linkFilename.EnsureDir(); err != nil { - return err - } - - // TODO: check if this is an absolute path, or if we even care - linkTarget := linkFilename.Dir().UntypedJoin(relativeLinkTarget) - if _, err := linkTarget.Lstat(); err != nil { - if os.IsNotExist(err) { - if !allowNonexistentTargets { - return errNonexistentLinkTarget - } - // if we're allowing nonexistent link targets, proceed to creating the link - } else { - return err - } - } - // Ensure that the link we're about to create doesn't already exist - if err := linkFilename.Remove(); err != nil && !errors.Is(err, os.ErrNotExist) { - return err - } - if err := linkFilename.Symlink(relativeLinkTarget); err != nil { - return err - } - return nil -} - -func (cache *httpCache) Clean(_ turbopath.AbsoluteSystemPath) { - // Not possible; this implementation can only clean for a hash. -} - -func (cache *httpCache) CleanAll() { - // Also not possible. -} - -func (cache *httpCache) Shutdown() {} - -func newHTTPCache(opts Opts, client client, recorder analytics.Recorder) *httpCache { - return &httpCache{ - writable: true, - client: client, - requestLimiter: make(limiter, 20), - recorder: recorder, - signerVerifier: &ArtifactSignatureAuthentication{ - // TODO(Gaspar): this should use RemoteCacheOptions.TeamId once we start - // enforcing team restrictions for repositories. - teamId: client.GetTeamID(), - enabled: opts.RemoteCacheOpts.Signature, - }, - } -} |
