package docker

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"mime"
	"mime/multipart"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"strings"
	"sync"

	"github.com/containers/image/v5/docker/reference"
	"github.com/containers/image/v5/internal/imagesource/impl"
	"github.com/containers/image/v5/internal/imagesource/stubs"
	"github.com/containers/image/v5/internal/iolimits"
	"github.com/containers/image/v5/internal/private"
	"github.com/containers/image/v5/internal/signature"
	"github.com/containers/image/v5/manifest"
	"github.com/containers/image/v5/pkg/blobinfocache/none"
	"github.com/containers/image/v5/pkg/sysregistriesv2"
	"github.com/containers/image/v5/types"
	"github.com/containers/storage/pkg/regexp"
	digest "github.com/opencontainers/go-digest"
	"github.com/sirupsen/logrus"
)

// maxLookasideSignatures is an arbitrary limit for the total number of signatures we would try to read from a lookaside server,
// even if it were broken or malicious and it continued serving an enormous number of items.
const maxLookasideSignatures = 128

// dockerImageSource reads an image (manifests, blobs and signatures) from a registry through a dockerClient.
// Instances are created by newImageSource, which picks the first reachable pull source (a mirror or the primary location).
type dockerImageSource struct {
	// Compat is set to impl.AddCompat(s) once the struct is built (see newImageSourceAttempt).
	impl.Compat
	// PropertyMethodsInitialize is initialized with HasThreadSafeGetBlob: true (see newImageSourceAttempt).
	impl.PropertyMethodsInitialize
	impl.DoesNotAffectLayerInfosForCopy
	stubs.ImplementsGetBlobAt

	logicalRef  dockerReference // The reference the user requested. This must satisfy !isUnknownDigest
	physicalRef dockerReference // The actual reference we are accessing (possibly a mirror). This must satisfy !isUnknownDigest
	c           *dockerClient   // Client bound to physicalRef's registry; closed by Close.
	// State
	cachedManifest         []byte // nil if not loaded yet; filled by ensureManifestIsLoaded
	cachedManifestMIMEType string // Only valid if cachedManifest != nil
}

// newImageSource creates a new ImageSource for the specified image reference.
// It tries each configured pull source in turn (mirrors first, the primary location last) and returns the
// first one for which newImageSourceAttempt succeeds, i.e. whose manifest could be loaded.
// The caller must call .Close() on the returned ImageSource.
// The caller must ensure !ref.isUnknownDigest.
func newImageSource(ctx context.Context, sys *types.SystemContext, ref dockerReference) (*dockerImageSource, error) { if ref.isUnknownDigest { return nil, fmt.Errorf("reading images from docker: reference %q without a tag or digest is not supported", ref.StringWithinTransport()) } registryConfig, err := loadRegistryConfiguration(sys) if err != nil { return nil, err } registry, err := sysregistriesv2.FindRegistry(sys, ref.ref.Name()) if err != nil { return nil, fmt.Errorf("loading registries configuration: %w", err) } if registry == nil { // No configuration was found for the provided reference, so use the // equivalent of a default configuration. registry = &sysregistriesv2.Registry{ Endpoint: sysregistriesv2.Endpoint{ Location: ref.ref.String(), }, Prefix: ref.ref.String(), } } // Check all endpoints for the manifest availability. If we find one that does // contain the image, it will be used for all future pull actions. Always try the // non-mirror original location last; this both transparently handles the case // of no mirrors configured, and ensures we return the error encountered when // accessing the upstream location if all endpoints fail. 
pullSources, err := registry.PullSourcesFromReference(ref.ref) if err != nil { return nil, err } type attempt struct { ref reference.Named err error } attempts := []attempt{} for _, pullSource := range pullSources { if sys != nil && sys.DockerLogMirrorChoice { logrus.Infof("Trying to access %q", pullSource.Reference) } else { logrus.Debugf("Trying to access %q", pullSource.Reference) } s, err := newImageSourceAttempt(ctx, sys, ref, pullSource, registryConfig) if err == nil { return s, nil } logrus.Debugf("Accessing %q failed: %v", pullSource.Reference, err) attempts = append(attempts, attempt{ ref: pullSource.Reference, err: err, }) } switch len(attempts) { case 0: return nil, errors.New("Internal error: newImageSource returned without trying any endpoint") case 1: return nil, attempts[0].err // If no mirrors are used, perfectly preserve the error type and add no noise. default: // Don’t just build a string, try to preserve the typed error. primary := &attempts[len(attempts)-1] extras := []string{} for _, attempt := range attempts[:len(attempts)-1] { // This is difficult to fit into a single-line string, when the error can contain arbitrary strings including any metacharacters we decide to use. // The paired [] at least have some chance of being unambiguous. extras = append(extras, fmt.Sprintf("[%s: %v]", attempt.ref.String(), attempt.err)) } return nil, fmt.Errorf("(Mirrors also failed: %s): %s: %w", strings.Join(extras, "\n"), primary.ref.String(), primary.err) } } // newImageSourceAttempt is an internal helper for newImageSource. Everyone else must call newImageSource. // Given a logicalReference and a pullSource, return a dockerImageSource if it is reachable. // The caller must call .Close() on the returned ImageSource. 
func newImageSourceAttempt(ctx context.Context, sys *types.SystemContext, logicalRef dockerReference, pullSource sysregistriesv2.PullSource, registryConfig *registryConfiguration) (*dockerImageSource, error) { physicalRef, err := newReference(pullSource.Reference, false) if err != nil { return nil, err } endpointSys := sys // sys.DockerAuthConfig does not explicitly specify a registry; we must not blindly send the credentials intended for the primary endpoint to mirrors. if endpointSys != nil && endpointSys.DockerAuthConfig != nil && reference.Domain(physicalRef.ref) != reference.Domain(logicalRef.ref) { copy := *endpointSys copy.DockerAuthConfig = nil copy.DockerBearerRegistryToken = "" endpointSys = © } client, err := newDockerClientFromRef(endpointSys, physicalRef, registryConfig, false, "pull") if err != nil { return nil, err } client.tlsClientConfig.InsecureSkipVerify = pullSource.Endpoint.Insecure s := &dockerImageSource{ PropertyMethodsInitialize: impl.PropertyMethods(impl.Properties{ HasThreadSafeGetBlob: true, }), logicalRef: logicalRef, physicalRef: physicalRef, c: client, } s.Compat = impl.AddCompat(s) if err := s.ensureManifestIsLoaded(ctx); err != nil { client.Close() return nil, err } if h, err := sysregistriesv2.AdditionalLayerStoreAuthHelper(endpointSys); err == nil && h != "" { acf := map[string]struct { Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` IdentityToken string `json:"identityToken,omitempty"` }{ physicalRef.ref.String(): { Username: client.auth.Username, Password: client.auth.Password, IdentityToken: client.auth.IdentityToken, }, } acfD, err := json.Marshal(acf) if err != nil { logrus.Warnf("failed to marshal auth config: %v", err) } else { cmd := exec.Command(h) cmd.Stdin = bytes.NewReader(acfD) if err := cmd.Run(); err != nil { var stderr string if ee, ok := err.(*exec.ExitError); ok { stderr = string(ee.Stderr) } logrus.Warnf("Failed to call additional-layer-store-auth-helper (stderr:%s): %v", 
stderr, err) } } } return s, nil } // Reference returns the reference used to set up this source, _as specified by the user_ // (not as the image itself, or its underlying storage, claims). This can be used e.g. to determine which public keys are trusted for this image. func (s *dockerImageSource) Reference() types.ImageReference { return s.logicalRef } // Close removes resources associated with an initialized ImageSource, if any. func (s *dockerImageSource) Close() error { return s.c.Close() } // simplifyContentType drops parameters from a HTTP media type (see https://tools.ietf.org/html/rfc7231#section-3.1.1.1) // Alternatively, an empty string is returned unchanged, and invalid values are "simplified" to an empty string. func simplifyContentType(contentType string) string { if contentType == "" { return contentType } mimeType, _, err := mime.ParseMediaType(contentType) if err != nil { return "" } return mimeType } // GetManifest returns the image's manifest along with its MIME type (which may be empty when it can't be determined but the manifest is available). // It may use a remote (= slow) service. // If instanceDigest is not nil, it contains a digest of the specific manifest instance to retrieve (when the primary manifest is a manifest list); // this never happens if the primary manifest is not a manifest list (e.g. if the source never returns manifest lists). func (s *dockerImageSource) GetManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) { if instanceDigest != nil { if err := instanceDigest.Validate(); err != nil { // Make sure instanceDigest.String() does not contain any unexpected characters return nil, "", err } return s.fetchManifest(ctx, instanceDigest.String()) } err := s.ensureManifestIsLoaded(ctx) if err != nil { return nil, "", err } return s.cachedManifest, s.cachedManifestMIMEType, nil } // fetchManifest fetches a manifest for tagOrDigest. 
// The caller is responsible for ensuring tagOrDigest uses the expected format. func (s *dockerImageSource) fetchManifest(ctx context.Context, tagOrDigest string) ([]byte, string, error) { return s.c.fetchManifest(ctx, s.physicalRef, tagOrDigest) } // ensureManifestIsLoaded sets s.cachedManifest and s.cachedManifestMIMEType // // ImageSource implementations are not required or expected to do any caching, // but because our signatures are “attached” to the manifest digest, // we need to ensure that the digest of the manifest returned by GetManifest(ctx, nil) // and used by GetSignatures(ctx, nil) are consistent, otherwise we would get spurious // signature verification failures when pulling while a tag is being updated. func (s *dockerImageSource) ensureManifestIsLoaded(ctx context.Context) error { if s.cachedManifest != nil { return nil } reference, err := s.physicalRef.tagOrDigest() if err != nil { return err } manblob, mt, err := s.fetchManifest(ctx, reference) if err != nil { return err } // We might validate manblob against the Docker-Content-Digest header here to protect against transport errors. 
s.cachedManifest = manblob s.cachedManifestMIMEType = mt return nil } // splitHTTP200ResponseToPartial splits a 200 response in multiple streams as specified by the chunks func splitHTTP200ResponseToPartial(streams chan io.ReadCloser, errs chan error, body io.ReadCloser, chunks []private.ImageSourceChunk) { defer close(streams) defer close(errs) currentOffset := uint64(0) body = makeBufferedNetworkReader(body, 64, 16384) defer body.Close() for _, c := range chunks { if c.Offset != currentOffset { if c.Offset < currentOffset { errs <- fmt.Errorf("invalid chunk offset specified %v (expected >= %v)", c.Offset, currentOffset) break } toSkip := c.Offset - currentOffset if _, err := io.Copy(io.Discard, io.LimitReader(body, int64(toSkip))); err != nil { errs <- err break } currentOffset += toSkip } var reader io.Reader if c.Length == math.MaxUint64 { reader = body } else { reader = io.LimitReader(body, int64(c.Length)) } s := signalCloseReader{ closed: make(chan struct{}), stream: io.NopCloser(reader), consumeStream: true, } streams <- s // Wait until the stream is closed before going to the next chunk <-s.closed currentOffset += c.Length } } // handle206Response reads a 206 response and send each part as a separate ReadCloser to the streams chan. 
func handle206Response(streams chan io.ReadCloser, errs chan error, body io.ReadCloser, chunks []private.ImageSourceChunk, mediaType string, params map[string]string) { defer close(streams) defer close(errs) if !strings.HasPrefix(mediaType, "multipart/") { streams <- body return } boundary, found := params["boundary"] if !found { errs <- errors.New("could not find boundary") body.Close() return } buffered := makeBufferedNetworkReader(body, 64, 16384) defer buffered.Close() mr := multipart.NewReader(buffered, boundary) parts := 0 for { p, err := mr.NextPart() if err != nil { if err != io.EOF { errs <- err } if parts != len(chunks) { errs <- errors.New("invalid number of chunks returned by the server") } return } if parts >= len(chunks) { errs <- errors.New("too many parts returned by the server") break } s := signalCloseReader{ closed: make(chan struct{}), stream: p, } streams <- s // NextPart() cannot be called while the current part // is being read, so wait until it is closed <-s.closed parts++ } } var multipartByteRangesRe = regexp.Delayed("multipart/byteranges; boundary=([A-Za-z-0-9:]+)") func parseMediaType(contentType string) (string, map[string]string, error) { mediaType, params, err := mime.ParseMediaType(contentType) if err != nil { if err == mime.ErrInvalidMediaParameter { // CloudFront returns an invalid MIME type, that contains an unquoted ":" in the boundary // param, let's handle it here. matches := multipartByteRangesRe.FindStringSubmatch(contentType) if len(matches) == 2 { mediaType = "multipart/byteranges" params = map[string]string{ "boundary": matches[1], } err = nil } } if err != nil { return "", nil, err } } return mediaType, params, err } // GetBlobAt returns a sequential channel of readers that contain data for the requested // blob chunks, and a channel that might get a single error value. // The specified chunks must be not overlapping and sorted by their offset. 
// The readers must be fully consumed, in the order they are returned, before blocking // to read the next chunk. // If the Length for the last chunk is set to math.MaxUint64, then it // fully fetches the remaining data from the offset to the end of the blob. func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { headers := make(map[string][]string) rangeVals := make([]string, 0, len(chunks)) lastFound := false for _, c := range chunks { if lastFound { return nil, nil, fmt.Errorf("internal error: another chunk requested after an util-EOF chunk") } // If the Length is set to -1, then request anything after the specified offset. if c.Length == math.MaxUint64 { lastFound = true rangeVals = append(rangeVals, fmt.Sprintf("%d-", c.Offset)) } else { rangeVals = append(rangeVals, fmt.Sprintf("%d-%d", c.Offset, c.Offset+c.Length-1)) } } headers["Range"] = []string{fmt.Sprintf("bytes=%s", strings.Join(rangeVals, ","))} if len(info.URLs) != 0 { return nil, nil, fmt.Errorf("external URLs not supported with GetBlobAt") } if err := info.Digest.Validate(); err != nil { // Make sure info.Digest.String() does not contain any unexpected characters return nil, nil, err } path := fmt.Sprintf(blobsPath, reference.Path(s.physicalRef.ref), info.Digest.String()) logrus.Debugf("Downloading %s", path) res, err := s.c.makeRequest(ctx, http.MethodGet, path, headers, nil, v2Auth, nil) if err != nil { return nil, nil, err } switch res.StatusCode { case http.StatusOK: // if the server replied with a 200 status code, convert the full body response to a series of // streams as it would have been done with 206. 
streams := make(chan io.ReadCloser) errs := make(chan error) go splitHTTP200ResponseToPartial(streams, errs, res.Body, chunks) return streams, errs, nil case http.StatusPartialContent: mediaType, params, err := parseMediaType(res.Header.Get("Content-Type")) if err != nil { return nil, nil, err } streams := make(chan io.ReadCloser) errs := make(chan error) go handle206Response(streams, errs, res.Body, chunks, mediaType, params) return streams, errs, nil case http.StatusBadRequest: res.Body.Close() return nil, nil, private.BadPartialRequestError{Status: res.Status} default: err := registryHTTPResponseToError(res) res.Body.Close() return nil, nil, fmt.Errorf("fetching partial blob: %w", err) } } // GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown). // The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided. // May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location. func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) { return s.c.getBlob(ctx, s.physicalRef, info, cache) } // GetSignaturesWithFormat returns the image's signatures. It may use a remote (= slow) service. // If instanceDigest is not nil, it contains a digest of the specific manifest instance to retrieve signatures for // (when the primary manifest is a manifest list); this never happens if the primary manifest is not a manifest list // (e.g. if the source never returns manifest lists). 
func (s *dockerImageSource) GetSignaturesWithFormat(ctx context.Context, instanceDigest *digest.Digest) ([]signature.Signature, error) { if err := s.c.detectProperties(ctx); err != nil { return nil, err } var res []signature.Signature switch { case s.c.supportsSignatures: if err := s.appendSignaturesFromAPIExtension(ctx, &res, instanceDigest); err != nil { return nil, err } case s.c.signatureBase != nil: if err := s.appendSignaturesFromLookaside(ctx, &res, instanceDigest); err != nil { return nil, err } default: return nil, errors.New("Internal error: X-Registry-Supports-Signatures extension not supported, and lookaside should not be empty configuration") } if err := s.appendSignaturesFromSigstoreAttachments(ctx, &res, instanceDigest); err != nil { return nil, err } return res, nil } // manifestDigest returns a digest of the manifest, from instanceDigest if non-nil; or from the supplied reference, // or finally, from a fetched manifest. func (s *dockerImageSource) manifestDigest(ctx context.Context, instanceDigest *digest.Digest) (digest.Digest, error) { if instanceDigest != nil { return *instanceDigest, nil } if digested, ok := s.physicalRef.ref.(reference.Digested); ok { d := digested.Digest() if d.Algorithm() == digest.Canonical { return d, nil } } if err := s.ensureManifestIsLoaded(ctx); err != nil { return "", err } return manifest.Digest(s.cachedManifest) } // appendSignaturesFromLookaside implements GetSignaturesWithFormat() from the lookaside location configured in s.c.signatureBase, // which is not nil, storing the signatures to *dest. // On error, the contents of *dest are undefined. func (s *dockerImageSource) appendSignaturesFromLookaside(ctx context.Context, dest *[]signature.Signature, instanceDigest *digest.Digest) error { manifestDigest, err := s.manifestDigest(ctx, instanceDigest) if err != nil { return err } // NOTE: Keep this in sync with docs/signature-protocols.md! 
for i := 0; ; i++ { if i >= maxLookasideSignatures { return fmt.Errorf("server provided %d signatures, assuming that's unreasonable and a server error", maxLookasideSignatures) } sigURL, err := lookasideStorageURL(s.c.signatureBase, manifestDigest, i) if err != nil { return err } signature, missing, err := s.getOneSignature(ctx, sigURL) if err != nil { return err } if missing { break } *dest = append(*dest, signature) } return nil } // getOneSignature downloads one signature from sigURL, and returns (signature, false, nil) // If it successfully determines that the signature does not exist, returns (nil, true, nil). // NOTE: Keep this in sync with docs/signature-protocols.md! func (s *dockerImageSource) getOneSignature(ctx context.Context, sigURL *url.URL) (signature.Signature, bool, error) { switch sigURL.Scheme { case "file": logrus.Debugf("Reading %s", sigURL.Path) sigBlob, err := os.ReadFile(sigURL.Path) if err != nil { if os.IsNotExist(err) { return nil, true, nil } return nil, false, err } sig, err := signature.FromBlob(sigBlob) if err != nil { return nil, false, fmt.Errorf("parsing signature %q: %w", sigURL.Path, err) } return sig, false, nil case "http", "https": logrus.Debugf("GET %s", sigURL.Redacted()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, sigURL.String(), nil) if err != nil { return nil, false, err } res, err := s.c.client.Do(req) if err != nil { return nil, false, err } defer res.Body.Close() if res.StatusCode == http.StatusNotFound { logrus.Debugf("... 
got status 404, as expected = end of signatures") return nil, true, nil } else if res.StatusCode != http.StatusOK { return nil, false, fmt.Errorf("reading signature from %s: %w", sigURL.Redacted(), newUnexpectedHTTPStatusError(res)) } contentType := res.Header.Get("Content-Type") if mimeType := simplifyContentType(contentType); mimeType == "text/html" { logrus.Warnf("Signature %q has Content-Type %q, unexpected for a signature", sigURL.Redacted(), contentType) // Don’t immediately fail; the lookaside spec does not place any requirements on Content-Type. // If the content really is HTML, it’s going to fail in signature.FromBlob. } sigBlob, err := iolimits.ReadAtMost(res.Body, iolimits.MaxSignatureBodySize) if err != nil { return nil, false, err } sig, err := signature.FromBlob(sigBlob) if err != nil { return nil, false, fmt.Errorf("parsing signature %s: %w", sigURL.Redacted(), err) } return sig, false, nil default: return nil, false, fmt.Errorf("Unsupported scheme when reading signature from %s", sigURL.Redacted()) } } // appendSignaturesFromAPIExtension implements GetSignaturesWithFormat() using the X-Registry-Supports-Signatures API extension, // storing the signatures to *dest. // On error, the contents of *dest are undefined. func (s *dockerImageSource) appendSignaturesFromAPIExtension(ctx context.Context, dest *[]signature.Signature, instanceDigest *digest.Digest) error { manifestDigest, err := s.manifestDigest(ctx, instanceDigest) if err != nil { return err } parsedBody, err := s.c.getExtensionsSignatures(ctx, s.physicalRef, manifestDigest) if err != nil { return err } for _, sig := range parsedBody.Signatures { if sig.Version == extensionSignatureSchemaVersion && sig.Type == extensionSignatureTypeAtomic { *dest = append(*dest, signature.SimpleSigningFromBlob(sig.Content)) } } return nil } // appendSignaturesFromSigstoreAttachments implements GetSignaturesWithFormat() using the sigstore tag convention, // storing the signatures to *dest. 
// On error, the contents of *dest are undefined. func (s *dockerImageSource) appendSignaturesFromSigstoreAttachments(ctx context.Context, dest *[]signature.Signature, instanceDigest *digest.Digest) error { if !s.c.useSigstoreAttachments { logrus.Debugf("Not looking for sigstore attachments: disabled by configuration") return nil } manifestDigest, err := s.manifestDigest(ctx, instanceDigest) if err != nil { return err } ociManifest, err := s.c.getSigstoreAttachmentManifest(ctx, s.physicalRef, manifestDigest) if err != nil { return err } if ociManifest == nil { return nil } logrus.Debugf("Found a sigstore attachment manifest with %d layers", len(ociManifest.Layers)) for layerIndex, layer := range ociManifest.Layers { // Note that this copies all kinds of attachments: attestations, and whatever else is there, // not just signatures. We leave the signature consumers to decide based on the MIME type. logrus.Debugf("Fetching sigstore attachment %d/%d: %s", layerIndex+1, len(ociManifest.Layers), layer.Digest.String()) // We don’t benefit from a real BlobInfoCache here because we never try to reuse/mount attachment payloads. // That might eventually need to change if payloads grow to be not just signatures, but something // significantly large. payload, err := s.c.getOCIDescriptorContents(ctx, s.physicalRef, layer, iolimits.MaxSignatureBodySize, none.NoCache) if err != nil { return err } *dest = append(*dest, signature.SigstoreFromComponents(layer.MediaType, payload, layer.Annotations)) } return nil } // deleteImage deletes the named image from the registry, if supported. func deleteImage(ctx context.Context, sys *types.SystemContext, ref dockerReference) error { if ref.isUnknownDigest { return fmt.Errorf("Docker reference without a tag or digest cannot be deleted") } registryConfig, err := loadRegistryConfiguration(sys) if err != nil { return err } // docker/distribution does not document what action should be used for deleting images. 
// // Current docker/distribution requires "pull" for reading the manifest and "delete" for deleting it. // quay.io requires "push" (an explicit "pull" is unnecessary), does not grant any token (fails parsing the request) if "delete" is included. // OpenShift ignores the action string (both the password and the token is an OpenShift API token identifying a user). // // We have to hard-code a single string, luckily both docker/distribution and quay.io support "*" to mean "everything". c, err := newDockerClientFromRef(sys, ref, registryConfig, true, "*") if err != nil { return err } defer c.Close() headers := map[string][]string{ "Accept": manifest.DefaultRequestedManifestMIMETypes, } refTail, err := ref.tagOrDigest() if err != nil { return err } getPath := fmt.Sprintf(manifestPath, reference.Path(ref.ref), refTail) get, err := c.makeRequest(ctx, http.MethodGet, getPath, headers, nil, v2Auth, nil) if err != nil { return err } defer get.Body.Close() switch get.StatusCode { case http.StatusOK: case http.StatusNotFound: return fmt.Errorf("Unable to delete %v. 
Image may not exist or is not stored with a v2 Schema in a v2 registry", ref.ref) default: return fmt.Errorf("deleting %v: %w", ref.ref, registryHTTPResponseToError(get)) } manifestBody, err := iolimits.ReadAtMost(get.Body, iolimits.MaxManifestBodySize) if err != nil { return err } manifestDigest, err := manifest.Digest(manifestBody) if err != nil { return fmt.Errorf("computing manifest digest: %w", err) } deletePath := fmt.Sprintf(manifestPath, reference.Path(ref.ref), manifestDigest) // When retrieving the digest from a registry >= 2.3 use the following header: // "Accept": "application/vnd.docker.distribution.manifest.v2+json" delete, err := c.makeRequest(ctx, http.MethodDelete, deletePath, headers, nil, v2Auth, nil) if err != nil { return err } defer delete.Body.Close() if delete.StatusCode != http.StatusAccepted { return fmt.Errorf("deleting %v: %w", ref.ref, registryHTTPResponseToError(delete)) } for i := 0; ; i++ { sigURL, err := lookasideStorageURL(c.signatureBase, manifestDigest, i) if err != nil { return err } missing, err := c.deleteOneSignature(sigURL) if err != nil { return err } if missing { break } } return nil } type bufferedNetworkReaderBuffer struct { data []byte len int consumed int err error } type bufferedNetworkReader struct { stream io.ReadCloser emptyBuffer chan *bufferedNetworkReaderBuffer readyBuffer chan *bufferedNetworkReaderBuffer terminate chan bool current *bufferedNetworkReaderBuffer mutex sync.Mutex gotEOF bool } // handleBufferedNetworkReader runs in a goroutine func handleBufferedNetworkReader(br *bufferedNetworkReader) { defer close(br.readyBuffer) for { select { case b := <-br.emptyBuffer: b.len, b.err = br.stream.Read(b.data) br.readyBuffer <- b if b.err != nil { return } case <-br.terminate: return } } } func (n *bufferedNetworkReader) Close() error { close(n.terminate) close(n.emptyBuffer) return n.stream.Close() } func (n *bufferedNetworkReader) read(p []byte) (int, error) { if n.current != nil { copied := copy(p, 
n.current.data[n.current.consumed:n.current.len]) n.current.consumed += copied if n.current.consumed == n.current.len { n.emptyBuffer <- n.current n.current = nil } if copied > 0 { return copied, nil } } if n.gotEOF { return 0, io.EOF } var b *bufferedNetworkReaderBuffer select { case b = <-n.readyBuffer: if b.err != nil { if b.err != io.EOF { return b.len, b.err } n.gotEOF = true } b.consumed = 0 n.current = b return n.read(p) case <-n.terminate: return 0, io.EOF } } func (n *bufferedNetworkReader) Read(p []byte) (int, error) { n.mutex.Lock() defer n.mutex.Unlock() return n.read(p) } func makeBufferedNetworkReader(stream io.ReadCloser, nBuffers, bufferSize uint) *bufferedNetworkReader { br := bufferedNetworkReader{ stream: stream, emptyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers), readyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers), terminate: make(chan bool), } go func() { handleBufferedNetworkReader(&br) }() for range nBuffers { b := bufferedNetworkReaderBuffer{ data: make([]byte, bufferSize), } br.emptyBuffer <- &b } return &br } type signalCloseReader struct { closed chan struct{} stream io.ReadCloser consumeStream bool } func (s signalCloseReader) Read(p []byte) (int, error) { return s.stream.Read(p) } func (s signalCloseReader) Close() error { defer close(s.closed) if s.consumeStream { if _, err := io.Copy(io.Discard, s.stream); err != nil { s.stream.Close() return err } } return s.stream.Close() }