remove uses of github.com/docker/docker/pkg/ioutils ReadCloserWrapper

It was the only utility we consumed from the package, and it's trivial
to implement, so let's create local copies of it.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2025-07-14 22:09:31 +02:00
parent c69d8bde4a
commit 3600ebca76
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
7 changed files with 41 additions and 293 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/docker/cli/cli/command" "github.com/docker/cli/cli/command"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/stdcopy"
"github.com/moby/term" "github.com/moby/term"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -19,6 +18,18 @@ import (
// TODO: This could be moved to `pkg/term`. // TODO: This could be moved to `pkg/term`.
var defaultEscapeKeys = []byte{16, 17} var defaultEscapeKeys = []byte{16, 17}
// readCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
// It calls the given callback function when closed.
type readCloserWrapper struct {
io.Reader
closer func() error
}
// Close calls back the passed closer function
func (r *readCloserWrapper) Close() error {
return r.closer()
}
// A hijackedIOStreamer handles copying input to and output from streams to the // A hijackedIOStreamer handles copying input to and output from streams to the
// connection. // connection.
type hijackedIOStreamer struct { type hijackedIOStreamer struct {
@ -100,7 +111,10 @@ func (h *hijackedIOStreamer) setupInput() (restore func(), err error) {
} }
} }
h.inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(h.inputStream, escapeKeys), h.inputStream.Close) h.inputStream = &readCloserWrapper{
Reader: term.NewEscapeProxy(h.inputStream, escapeKeys),
closer: h.inputStream.Close,
}
return restore, nil return restore, nil
} }

View File

@ -15,7 +15,6 @@ import (
"time" "time"
"github.com/docker/docker/builder/remotecontext/git" "github.com/docker/docker/builder/remotecontext/git"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/progress"
"github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
@ -108,7 +107,7 @@ func DetectArchiveReader(input io.ReadCloser) (rc io.ReadCloser, isArchive bool,
return nil, false, errors.Errorf("failed to peek context header from STDIN: %v", err) return nil, false, errors.Errorf("failed to peek context header from STDIN: %v", err)
} }
return ioutils.NewReadCloserWrapper(buf, func() error { return input.Close() }), IsArchive(magic), nil return newReadCloserWrapper(buf, func() error { return input.Close() }), IsArchive(magic), nil
} }
// WriteTempDockerfile writes a Dockerfile stream to a temporary file with a // WriteTempDockerfile writes a Dockerfile stream to a temporary file with a
@ -169,7 +168,7 @@ func GetContextFromReader(rc io.ReadCloser, dockerfileName string) (out io.ReadC
return nil, "", err return nil, "", err
} }
return ioutils.NewReadCloserWrapper(tarArchive, func() error { return newReadCloserWrapper(tarArchive, func() error {
err := tarArchive.Close() err := tarArchive.Close()
os.RemoveAll(dockerfileDir) os.RemoveAll(dockerfileDir)
return err return err
@ -227,7 +226,7 @@ func GetContextFromURL(out io.Writer, remoteURL, dockerfileName string) (io.Read
// Pass the response body through a progress reader. // Pass the response body through a progress reader.
progReader := progress.NewProgressReader(response.Body, progressOutput, response.ContentLength, "", "Downloading build context from remote url: "+remoteURL) progReader := progress.NewProgressReader(response.Body, progressOutput, response.ContentLength, "", "Downloading build context from remote url: "+remoteURL)
return GetContextFromReader(ioutils.NewReadCloserWrapper(progReader, func() error { return response.Body.Close() }), dockerfileName) return GetContextFromReader(newReadCloserWrapper(progReader, func() error { return response.Body.Close() }), dockerfileName)
} }
// getWithStatusError does an http.Get() and returns an error if the // getWithStatusError does an http.Get() and returns an error if the
@ -444,3 +443,25 @@ func Compress(buildCtx io.ReadCloser) (io.ReadCloser, error) {
return pipeReader, nil return pipeReader, nil
} }
// readCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
// It calls the given callback function when closed. It should be constructed
// with [newReadCloserWrapper].
type readCloserWrapper struct {
io.Reader
closer func() error
}
// Close calls back the passed closer function
func (r *readCloserWrapper) Close() error {
return r.closer()
}
// newReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser.
// It calls the given callback function when closed.
func newReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
return &readCloserWrapper{
Reader: r,
closer: closer,
}
}

View File

@ -1,44 +0,0 @@
package ioutils
import (
"io"
"os"
"github.com/moby/sys/atomicwriter"
)
// NewAtomicFileWriter returns WriteCloser so that writing to it writes to a
// temporary file and closing it atomically changes the temporary file to
// destination path. Writing and closing concurrently is not allowed.
// NOTE: umask is not considered for the file's permissions.
//
// Deprecated: use [atomicwriter.New] instead.
func NewAtomicFileWriter(filename string, perm os.FileMode) (io.WriteCloser, error) {
return atomicwriter.New(filename, perm)
}
// AtomicWriteFile atomically writes data to a file named by filename and with the specified permission bits.
// NOTE: umask is not considered for the file's permissions.
//
// Deprecated: use [atomicwriter.WriteFile] instead.
func AtomicWriteFile(filename string, data []byte, perm os.FileMode) error {
return atomicwriter.WriteFile(filename, data, perm)
}
// AtomicWriteSet is used to atomically write a set
// of files and ensure they are visible at the same time.
// Must be committed to a new directory.
//
// Deprecated: use [atomicwriter.WriteSet] instead.
type AtomicWriteSet = atomicwriter.WriteSet
// NewAtomicWriteSet creates a new atomic write set to
// atomically create a set of files. The given directory
// is used as the base directory for storing files before
// commit. If no temporary directory is given the system
// default is used.
//
// Deprecated: use [atomicwriter.NewWriteSet] instead.
func NewAtomicWriteSet(tmpDir string) (*atomicwriter.WriteSet, error) {
return atomicwriter.NewWriteSet(tmpDir)
}

View File

@ -1,118 +0,0 @@
package ioutils
import (
"context"
"io"
"runtime/debug"
"sync/atomic"
"github.com/containerd/log"
)
// readCloserWrapper wraps an io.Reader, and implements an io.ReadCloser
// It calls the given callback function when closed. It should be constructed
// with NewReadCloserWrapper
type readCloserWrapper struct {
io.Reader
closer func() error
closed atomic.Bool
}
// Close calls back the passed closer function
func (r *readCloserWrapper) Close() error {
if !r.closed.CompareAndSwap(false, true) {
subsequentCloseWarn("ReadCloserWrapper")
return nil
}
return r.closer()
}
// NewReadCloserWrapper wraps an io.Reader, and implements an io.ReadCloser.
// It calls the given callback function when closed.
func NewReadCloserWrapper(r io.Reader, closer func() error) io.ReadCloser {
return &readCloserWrapper{
Reader: r,
closer: closer,
}
}
// cancelReadCloser wraps an io.ReadCloser with a context for cancelling read
// operations.
type cancelReadCloser struct {
cancel func()
pR *io.PipeReader // Stream to read from
pW *io.PipeWriter
closed atomic.Bool
}
// NewCancelReadCloser creates a wrapper that closes the ReadCloser when the
// context is cancelled. The returned io.ReadCloser must be closed when it is
// no longer needed.
func NewCancelReadCloser(ctx context.Context, in io.ReadCloser) io.ReadCloser {
pR, pW := io.Pipe()
// Create a context used to signal when the pipe is closed
doneCtx, cancel := context.WithCancel(context.Background())
p := &cancelReadCloser{
cancel: cancel,
pR: pR,
pW: pW,
}
go func() {
_, err := io.Copy(pW, in)
select {
case <-ctx.Done():
// If the context was closed, p.closeWithError
// was already called. Calling it again would
// change the error that Read returns.
default:
p.closeWithError(err)
}
in.Close()
}()
go func() {
for {
select {
case <-ctx.Done():
p.closeWithError(ctx.Err())
case <-doneCtx.Done():
return
}
}
}()
return p
}
// Read wraps the Read method of the pipe that provides data from the wrapped
// ReadCloser.
func (p *cancelReadCloser) Read(buf []byte) (int, error) {
return p.pR.Read(buf)
}
// closeWithError closes the wrapper and its underlying reader. It will
// cause future calls to Read to return err.
func (p *cancelReadCloser) closeWithError(err error) {
_ = p.pW.CloseWithError(err)
p.cancel()
}
// Close closes the wrapper its underlying reader. It will cause
// future calls to Read to return io.EOF.
func (p *cancelReadCloser) Close() error {
if !p.closed.CompareAndSwap(false, true) {
subsequentCloseWarn("cancelReadCloser")
return nil
}
p.closeWithError(io.EOF)
return nil
}
func subsequentCloseWarn(name string) {
log.G(context.TODO()).Error("subsequent attempt to close " + name)
if log.GetLevel() >= log.DebugLevel {
log.G(context.TODO()).Errorf("stack trace: %s", string(debug.Stack()))
}
}

View File

@ -1,96 +0,0 @@
package ioutils
import (
"io"
"sync"
)
// WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct {
w io.Writer
flusher flusher
flushed chan struct{}
flushedOnce sync.Once
closed chan struct{}
closeLock sync.Mutex
}
type flusher interface {
Flush()
}
func (wf *WriteFlusher) Write(b []byte) (int, error) {
select {
case <-wf.closed:
return 0, io.EOF
default:
}
n, err := wf.w.Write(b)
wf.Flush() // every write is a flush.
return n, err
}
// Flush the stream immediately.
func (wf *WriteFlusher) Flush() {
select {
case <-wf.closed:
return
default:
}
wf.flushedOnce.Do(func() {
close(wf.flushed)
})
wf.flusher.Flush()
}
// Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false.
func (wf *WriteFlusher) Flushed() bool {
// BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
// be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
var flushed bool
select {
case <-wf.flushed:
flushed = true
default:
}
return flushed
}
// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.closeLock.Lock()
defer wf.closeLock.Unlock()
select {
case <-wf.closed:
return io.EOF
default:
close(wf.closed)
}
return nil
}
// nopFlusher represents a type which flush operation is nop.
type nopFlusher struct{}
// Flush is a nop operation.
func (f *nopFlusher) Flush() {}
// NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher {
var fl flusher
if f, ok := w.(flusher); ok {
fl = f
} else {
fl = &nopFlusher{}
}
return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})}
}

View File

@ -1,28 +0,0 @@
package ioutils
import (
"io"
"sync/atomic"
)
type writeCloserWrapper struct {
io.Writer
closer func() error
closed atomic.Bool
}
func (r *writeCloserWrapper) Close() error {
if !r.closed.CompareAndSwap(false, true) {
subsequentCloseWarn("WriteCloserWrapper")
return nil
}
return r.closer()
}
// NewWriteCloserWrapper returns a new io.WriteCloser.
func NewWriteCloserWrapper(r io.Writer, closer func() error) io.WriteCloser {
return &writeCloserWrapper{
Writer: r,
closer: closer,
}
}

1
vendor/modules.txt vendored
View File

@ -95,7 +95,6 @@ github.com/docker/docker/client
github.com/docker/docker/internal/lazyregexp github.com/docker/docker/internal/lazyregexp
github.com/docker/docker/internal/multierror github.com/docker/docker/internal/multierror
github.com/docker/docker/pkg/homedir github.com/docker/docker/pkg/homedir
github.com/docker/docker/pkg/ioutils
github.com/docker/docker/pkg/jsonmessage github.com/docker/docker/pkg/jsonmessage
github.com/docker/docker/pkg/longpath github.com/docker/docker/pkg/longpath
github.com/docker/docker/pkg/process github.com/docker/docker/pkg/process