From c9199cef5a014468fb8195b31da1b146be2a2e1b Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 6 May 2015 23:38:19 -0700 Subject: [PATCH] godep: update docker/docker/pkg Signed-off-by: Andrea Luzzardi --- Godeps/Godeps.json | 12 +- .../docker/docker/pkg/ioutils/readers.go | 129 +++++++++++- .../docker/docker/pkg/ioutils/readers_test.go | 183 ++++++++++++++++++ .../docker/docker/pkg/ioutils/writers.go | 21 ++ .../docker/docker/pkg/ioutils/writers_test.go | 65 +++++++ .../docker/pkg/parsers/filters/parse.go | 35 +++- .../docker/docker/pkg/units/MAINTAINERS | 2 - .../docker/docker/pkg/units/duration.go | 3 +- .../docker/docker/pkg/units/duration_test.go | 4 +- .../docker/docker/pkg/units/size.go | 24 +-- 10 files changed, 446 insertions(+), 32 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers_test.go delete mode 100644 Godeps/_workspace/src/github.com/docker/docker/pkg/units/MAINTAINERS diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 1c20966395..51d647a50e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -32,18 +32,18 @@ }, { "ImportPath": "github.com/docker/docker/pkg/ioutils", - "Comment": "v1.4.1-216-g12fef2d", - "Rev": "12fef2d8df4e5da024418e324f7e3c3e82220a27" + "Comment": "v1.4.1-3245-g443437f", + "Rev": "443437f5ea04da9d62bf3e05d7951f7d30e77d96" }, { "ImportPath": "github.com/docker/docker/pkg/parsers/filters", - "Comment": "v1.4.1-216-g12fef2d", - "Rev": "12fef2d8df4e5da024418e324f7e3c3e82220a27" + "Comment": "v1.4.1-3245-g443437f", + "Rev": "443437f5ea04da9d62bf3e05d7951f7d30e77d96" }, { "ImportPath": "github.com/docker/docker/pkg/units", - "Comment": "v1.4.1-216-g12fef2d", - "Rev": "12fef2d8df4e5da024418e324f7e3c3e82220a27" + "Comment": "v1.4.1-3245-g443437f", + "Rev": "443437f5ea04da9d62bf3e05d7951f7d30e77d96" }, { "ImportPath": "github.com/gorilla/context", diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers.go index 22f46fbd92..0e542cbad3 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers.go @@ -2,8 +2,13 @@ package ioutils import ( "bytes" + "crypto/rand" + "crypto/sha256" + "encoding/hex" "io" + "math/big" "sync" + "time" ) type readCloserWrapper struct { @@ -42,20 +47,40 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { } } +// bufReader allows the underlying reader to continue to produce +// output by pre-emptively reading from the wrapped reader. +// This is achieved by buffering this data in bufReader's +// expanding buffer. 
type bufReader struct { sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond - drainBuf []byte + buf *bytes.Buffer + reader io.Reader + err error + wait sync.Cond + drainBuf []byte + reuseBuf []byte + maxReuse int64 + resetTimeout time.Duration + bufLenResetThreshold int64 + maxReadDataReset int64 } func NewBufReader(r io.Reader) *bufReader { + var timeout int + if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil { + timeout = int(randVal.Int64()) + 180 + } else { + timeout = 300 + } reader := &bufReader{ - buf: &bytes.Buffer{}, - drainBuf: make([]byte, 1024), - reader: r, + buf: &bytes.Buffer{}, + drainBuf: make([]byte, 1024), + reuseBuf: make([]byte, 4096), + maxReuse: 1000, + resetTimeout: time.Second * time.Duration(timeout), + bufLenResetThreshold: 100 * 1024, + maxReadDataReset: 10 * 1024 * 1024, + reader: r, } reader.wait.L = &reader.Mutex go reader.drain() @@ -74,14 +99,94 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer * } func (r *bufReader) drain() { + var ( + duration time.Duration + lastReset time.Time + now time.Time + reset bool + bufLen int64 + dataSinceReset int64 + maxBufLen int64 + reuseBufLen int64 + reuseCount int64 + ) + reuseBufLen = int64(len(r.reuseBuf)) + lastReset = time.Now() for { n, err := r.reader.Read(r.drainBuf) + dataSinceReset += int64(n) r.Lock() + bufLen = int64(r.buf.Len()) + if bufLen > maxBufLen { + maxBufLen = bufLen + } + + // Avoid unbounded growth of the buffer over time. + // This has been discovered to be the only non-intrusive + // solution to the unbounded growth of the buffer. + // Alternative solutions such as compression, multiple + // buffers, channels and other similar pieces of code + // were reducing throughput, overall Docker performance + // or simply crashed Docker. + // This solution releases the buffer when specific + // conditions are met to avoid the continuous resizing + // of the buffer for long lived containers. + // + // Move data to the front of the buffer if it's + // smaller than what reuseBuf can store + if bufLen > 0 && reuseBufLen >= bufLen { + n, _ := r.buf.Read(r.reuseBuf) + r.buf.Write(r.reuseBuf[0:n]) + // Take action if the buffer has been reused too many + // times and if there's data in the buffer. + // The timeout is also used as means to avoid doing + // these operations more often or less often than + // required. + // The various conditions try to detect heavy activity + // in the buffer which might be indicators of heavy + // growth of the buffer. + } else if reuseCount >= r.maxReuse && bufLen > 0 { + now = time.Now() + duration = now.Sub(lastReset) + timeoutReached := duration >= r.resetTimeout + + // The timeout has been reached and the + // buffered data couldn't be moved to the front + // of the buffer, so the buffer gets reset. + if timeoutReached && bufLen > reuseBufLen { + reset = true + } + // The amount of buffered data is too high now, + // reset the buffer. + if timeoutReached && maxBufLen >= r.bufLenResetThreshold { + reset = true + } + // Reset the buffer if a certain amount of + // data has gone through the buffer since the + // last reset. + if timeoutReached && dataSinceReset >= r.maxReadDataReset { + reset = true + } + // The buffered data is moved to a fresh buffer, + // swap the old buffer with the new one and + // reset all counters. 
+ if reset { + newbuf := &bytes.Buffer{} + newbuf.ReadFrom(r.buf) + r.buf = newbuf + lastReset = now + reset = false + dataSinceReset = 0 + maxBufLen = 0 + reuseCount = 0 + } + } if err != nil { r.err = err } else { r.buf.Write(r.drainBuf[0:n]) } + reuseCount++ r.wait.Signal() r.Unlock() if err != nil { @@ -112,3 +217,11 @@ func (r *bufReader) Close() error { } return closer.Close() } + +func HashData(src io.Reader) (string, error) { + h := sha256.New() + if _, err := io.Copy(h, src); err != nil { + return "", err + } + return "sha256:" + hex.EncodeToString(h.Sum(nil)), nil +} diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers_test.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers_test.go index a7a2dad176..b4cbfd95f6 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers_test.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/readers_test.go @@ -2,11 +2,92 @@ package ioutils import ( "bytes" + "fmt" "io" "io/ioutil" + "strings" "testing" ) +// Implement io.Reader +type errorReader struct{} + +func (r *errorReader) Read(p []byte) (int, error) { + return 0, fmt.Errorf("Error reader always fail.") +} + +func TestReadCloserWrapperClose(t *testing.T) { + reader := strings.NewReader("A string reader") + wrapper := NewReadCloserWrapper(reader, func() error { + return fmt.Errorf("This will be called when closing") + }) + err := wrapper.Close() + if err == nil || !strings.Contains(err.Error(), "This will be called when closing") { + t.Fatalf("readCloserWrapper should have call the anonymous func and thus, fail.") + } +} + +func TestReaderErrWrapperReadOnError(t *testing.T) { + called := false + reader := &errorReader{} + wrapper := NewReaderErrWrapper(reader, func() { + called = true + }) + _, err := wrapper.Read([]byte{}) + if err == nil || !strings.Contains(err.Error(), "Error reader always fail.") { + t.Fatalf("readErrWrapper should returned an error") + } + if !called { + t.Fatalf("readErrWrapper should have call the anonymous function on failure") + } +} + +func TestReaderErrWrapperRead(t *testing.T) { + called := false + reader := strings.NewReader("a string reader.") + wrapper := NewReaderErrWrapper(reader, func() { + called = true // Should not be called + }) + // Read 20 byte (should be ok with the string above) + num, err := wrapper.Read(make([]byte, 20)) + if err != nil { + t.Fatal(err) + } + if num != 16 { + t.Fatalf("readerErrWrapper should have read 16 byte, but read %d", num) + } +} + +func TestNewBufReaderWithDrainbufAndBuffer(t *testing.T) { + reader, writer := io.Pipe() + + drainBuffer := make([]byte, 1024) + buffer := bytes.Buffer{} + bufreader := NewBufReaderWithDrainbufAndBuffer(reader, drainBuffer, &buffer) + + // Write everything down to a Pipe + // Usually, a pipe should block but because of the buffered reader, + // the writes will go through + done := make(chan bool) + go func() { + writer.Write([]byte("hello world")) + writer.Close() + done <- true + }() + + // Drain the reader *after* everything has been written, just to verify + // it is indeed buffering + <-done + + output, err := ioutil.ReadAll(bufreader) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(output, []byte("hello world")) { + t.Error(string(output)) + } +} + func TestBufReader(t *testing.T) { reader, writer := io.Pipe() bufreader := NewBufReader(reader) @@ -32,3 +113,105 @@ func TestBufReader(t *testing.T) { t.Error(string(output)) } } + +func TestBufReaderCloseWithNonReaderCloser(t *testing.T) { + reader := 
strings.NewReader("buffer") + bufreader := NewBufReader(reader) + + if err := bufreader.Close(); err != nil { + t.Fatal(err) + } + +} + +// implements io.ReadCloser +type simpleReaderCloser struct{} + +func (r *simpleReaderCloser) Read(p []byte) (n int, err error) { + return 0, nil +} + +func (r *simpleReaderCloser) Close() error { + return nil +} + +func TestBufReaderCloseWithReaderCloser(t *testing.T) { + reader := &simpleReaderCloser{} + bufreader := NewBufReader(reader) + + err := bufreader.Close() + if err != nil { + t.Fatal(err) + } + +} + +func TestHashData(t *testing.T) { + reader := strings.NewReader("hash-me") + actual, err := HashData(reader) + if err != nil { + t.Fatal(err) + } + expected := "sha256:4d11186aed035cc624d553e10db358492c84a7cd6b9670d92123c144930450aa" + if actual != expected { + t.Fatalf("Expecting %s, got %s", expected, actual) + } +} + +type repeatedReader struct { + readCount int + maxReads int + data []byte +} + +func newRepeatedReader(max int, data []byte) *repeatedReader { + return &repeatedReader{0, max, data} +} + +func (r *repeatedReader) Read(p []byte) (int, error) { + if r.readCount >= r.maxReads { + return 0, io.EOF + } + r.readCount++ + n := copy(p, r.data) + return n, nil +} + +func testWithData(data []byte, reads int) { + reader := newRepeatedReader(reads, data) + bufReader := NewBufReader(reader) + io.Copy(ioutil.Discard, bufReader) +} + +func Benchmark1M10BytesReads(b *testing.B) { + reads := 1000000 + readSize := int64(10) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} + +func Benchmark1M1024BytesReads(b *testing.B) { + reads := 1000000 + readSize := int64(1024) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} + +func Benchmark10k32KBytesReads(b *testing.B) { + reads := 10000 + readSize := int64(32 * 1024) + data := make([]byte, readSize) + b.SetBytes(readSize * int64(reads)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + testWithData(data, reads) + } +} diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers.go index c0b3608fe6..43fdc44ea9 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers.go @@ -37,3 +37,24 @@ func NewWriteCloserWrapper(r io.Writer, closer func() error) io.WriteCloser { closer: closer, } } + +// Wrap a concrete io.Writer and hold a count of the number +// of bytes written to the writer during a "session". 
+// This can be convenient when write return is masked +// (e.g., json.Encoder.Encode()) +type WriteCounter struct { + Count int64 + Writer io.Writer +} + +func NewWriteCounter(w io.Writer) *WriteCounter { + return &WriteCounter{ + Writer: w, + } +} + +func (wc *WriteCounter) Write(p []byte) (count int, err error) { + count, err = wc.Writer.Write(p) + wc.Count += int64(count) + return +} diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers_test.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers_test.go new file mode 100644 index 0000000000..564b1cd4f5 --- /dev/null +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writers_test.go @@ -0,0 +1,65 @@ +package ioutils + +import ( + "bytes" + "strings" + "testing" +) + +func TestWriteCloserWrapperClose(t *testing.T) { + called := false + writer := bytes.NewBuffer([]byte{}) + wrapper := NewWriteCloserWrapper(writer, func() error { + called = true + return nil + }) + if err := wrapper.Close(); err != nil { + t.Fatal(err) + } + if !called { + t.Fatalf("writeCloserWrapper should have call the anonymous function.") + } +} + +func TestNopWriteCloser(t *testing.T) { + writer := bytes.NewBuffer([]byte{}) + wrapper := NopWriteCloser(writer) + if err := wrapper.Close(); err != nil { + t.Fatal("NopWriteCloser always return nil on Close.") + } + +} + +func TestNopWriter(t *testing.T) { + nw := &NopWriter{} + l, err := nw.Write([]byte{'c'}) + if err != nil { + t.Fatal(err) + } + if l != 1 { + t.Fatalf("Expected 1 got %d", l) + } +} + +func TestWriteCounter(t *testing.T) { + dummy1 := "This is a dummy string." + dummy2 := "This is another dummy string." + totalLength := int64(len(dummy1) + len(dummy2)) + + reader1 := strings.NewReader(dummy1) + reader2 := strings.NewReader(dummy2) + + var buffer bytes.Buffer + wc := NewWriteCounter(&buffer) + + reader1.WriteTo(wc) + reader2.WriteTo(wc) + + if wc.Count != totalLength { + t.Errorf("Wrong count: %d vs. 
%d", wc.Count, totalLength) + } + + if buffer.String() != dummy1+dummy2 { + t.Error("Wrong message written") + } +} diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/parsers/filters/parse.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/parsers/filters/parse.go index 8b045a3098..df5486d515 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/parsers/filters/parse.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/parsers/filters/parse.go @@ -58,13 +58,44 @@ func FromParam(p string) (Args, error) { if len(p) == 0 { return args, nil } - err := json.Unmarshal([]byte(p), &args) - if err != nil { + if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil { return nil, err } return args, nil } +func (filters Args) MatchKVList(field string, sources map[string]string) bool { + fieldValues := filters[field] + + //do not filter if there is no filter set or cannot determine filter + if len(fieldValues) == 0 { + return true + } + + if sources == nil || len(sources) == 0 { + return false + } + +outer: + for _, name2match := range fieldValues { + testKV := strings.SplitN(name2match, "=", 2) + + for k, v := range sources { + if len(testKV) == 1 { + if k == testKV[0] { + continue outer + } + } else if k == testKV[0] && v == testKV[1] { + continue outer + } + } + + return false + } + + return true +} + func (filters Args) Match(field, source string) bool { fieldValues := filters[field] diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/MAINTAINERS b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/MAINTAINERS deleted file mode 100644 index 96abeae570..0000000000 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/MAINTAINERS +++ /dev/null @@ -1,2 +0,0 @@ -Victor Vieux (@vieux) -Jessie Frazelle (@jfrazelle) diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration.go index cd33121496..a31f780e3b 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration.go @@ -26,6 +26,7 @@ func HumanDuration(d time.Duration) string { return fmt.Sprintf("%d weeks", hours/24/7) } else if hours < 24*365*2 { return fmt.Sprintf("%d months", hours/24/30) + } else { + return fmt.Sprintf("%d years", hours/24/365) } - return fmt.Sprintf("%f years", d.Hours()/24/365) } diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration_test.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration_test.go index a22947402b..fcfb6b7bbd 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration_test.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/duration_test.go @@ -41,6 +41,6 @@ func TestHumanDuration(t *testing.T) { assertEquals(t, "13 months", HumanDuration(13*month)) assertEquals(t, "23 months", HumanDuration(23*month)) assertEquals(t, "24 months", HumanDuration(24*month)) - assertEquals(t, "2.010959 years", HumanDuration(24*month+2*week)) - assertEquals(t, "3.164384 years", HumanDuration(3*year+2*month)) + assertEquals(t, "2 years", HumanDuration(24*month+2*week)) + assertEquals(t, "3 years", HumanDuration(3*year+2*month)) } diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/size.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/size.go index 7cfb57ba51..d7850ad0b0 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/units/size.go +++ 
b/Godeps/_workspace/src/github.com/docker/docker/pkg/units/size.go @@ -37,23 +37,25 @@ var ( var decimapAbbrs = []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"} var binaryAbbrs = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"} +// CustomSize returns a human-readable approximation of a size +// using custom format +func CustomSize(format string, size float64, base float64, _map []string) string { + i := 0 + for size >= base { + size = size / base + i++ + } + return fmt.Sprintf(format, size, _map[i]) +} + // HumanSize returns a human-readable approximation of a size // using SI standard (eg. "44kB", "17MB") func HumanSize(size float64) string { - return intToString(float64(size), 1000.0, decimapAbbrs) + return CustomSize("%.4g %s", float64(size), 1000.0, decimapAbbrs) } func BytesSize(size float64) string { - return intToString(size, 1024.0, binaryAbbrs) -} - -func intToString(size, unit float64, _map []string) string { - i := 0 - for size >= unit { - size = size / unit - i++ - } - return fmt.Sprintf("%.4g %s", size, _map[i]) + return CustomSize("%.4g %s", size, 1024.0, binaryAbbrs) } // FromHumanSize returns an integer from a human-readable specification of a
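
As a quick illustration of the refactored units helpers in the hunks above, here is a minimal usage sketch. It assumes the package is imported from the vendored path github.com/docker/docker/pkg/units; the expected outputs in the comments follow from the CustomSize/HumanSize/BytesSize and HumanDuration changes shown in this patch and are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/docker/docker/pkg/units"
)

func main() {
	// HumanSize formats with SI units (base 1000); BytesSize with binary units (base 1024).
	// After this patch both delegate to the exported CustomSize helper.
	fmt.Println(units.HumanSize(2 * 1000 * 1000)) // "2 MB"
	fmt.Println(units.BytesSize(2 * 1024 * 1024)) // "2 MiB"

	// CustomSize exposes the shared scaling loop with a caller-supplied
	// format string and unit table.
	fmt.Println(units.CustomSize("%.2f%s", 123456789, 1000.0, []string{"B", "kB", "MB", "GB"})) // "123.46MB"

	// HumanDuration now reports whole years instead of a fractional count.
	fmt.Println(units.HumanDuration(3 * 365 * 24 * time.Hour)) // "3 years"
}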