From 597e0e69b4c8521f39691d0a07d1f31b7116a337 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 28 Nov 2013 12:16:57 -0800 Subject: [PATCH] split in 3 files --- commands.go | 2 +- graph.go | 2 +- server.go | 34 +++---- utils/jsonmessage.go | 118 ++++++++++++++++++++++++ utils/progressreader.go | 55 +++++++++++ utils/streamformatter.go | 68 ++++++++++++++ utils/utils.go | 192 --------------------------------------- 7 files changed, 260 insertions(+), 211 deletions(-) create mode 100644 utils/jsonmessage.go create mode 100644 utils/progressreader.go create mode 100644 utils/streamformatter.go diff --git a/commands.go b/commands.go index d992db2e6c..f2e8d0de2a 100644 --- a/commands.go +++ b/commands.go @@ -202,7 +202,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error { // FIXME: ProgressReader shouldn't be this annoying to use if context != nil { sf := utils.NewStreamFormatter(false) - body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("", "Uploading context", "%v bytes%0.0s%0.0s"), sf, true) + body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf, true, "", "Uploading context") } // Upload the build context v := &url.Values{} diff --git a/graph.go b/graph.go index 04d77b9146..19c668830e 100644 --- a/graph.go +++ b/graph.go @@ -205,7 +205,7 @@ func (graph *Graph) TempLayerArchive(id string, compression archive.Compression, if err != nil { return nil, err } - return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf.FormatProgress("", "Buffering to disk", "%v/%v (%v)"), sf, true), tmp) + return archive.NewTempArchive(utils.ProgressReader(ioutil.NopCloser(a), 0, output, sf, true, "", "Buffering to disk"), tmp) } // Mktemp creates a temporary sub-directory inside the graph's filesystem. diff --git a/server.go b/server.go index 3783c7bded..bd4dc22740 100644 --- a/server.go +++ b/server.go @@ -451,7 +451,7 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils. return err } - if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("", "Downloading", "%8v/%v (%v)"), sf, false), path); err != nil { + if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf, false, "", "Downloading"), path); err != nil { return err } // FIXME: Handle custom repo, tag comment, author @@ -761,7 +761,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin if err != nil { return err } - out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling", "dependent layers")) + out.Write(sf.FormatProgress(utils.TruncateID(imgID), "Pulling dependent layers", nil)) // FIXME: Try to stream the images? // FIXME: Launch the getRemoteImage() in goroutines @@ -776,33 +776,33 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin defer srv.poolRemove("pull", "layer:"+id) if !srv.runtime.graph.Exists(id) { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "metadata")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling metadata", nil)) imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token) if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) // FIXME: Keep going in case of error? return err } img, err := NewImgJSON(imgJSON) if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) return fmt.Errorf("Failed to parse json: %s", err) } // Get the layer - out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling", "fs layer")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Pulling fs layer", nil)) layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token) if err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "pulling dependent layers")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error pulling dependent layers", nil)) return err } defer layer.Close() - if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf, false), img); err != nil { - out.Write(sf.FormatProgress(utils.TruncateID(id), "Error", "downloading dependent layers")) + if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf, false, utils.TruncateID(id), "Downloading"), img); err != nil { + out.Write(sf.FormatProgress(utils.TruncateID(id), "Error downloading dependent layers", nil)) return err } } - out.Write(sf.FormatProgress(utils.TruncateID(id), "Download", "complete")) + out.Write(sf.FormatProgress(utils.TruncateID(id), "Download complete", nil)) } return nil @@ -875,29 +875,29 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName } defer srv.poolRemove("pull", "img:"+img.ID) - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s", img.Tag, localName))) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, localName), nil)) success := false var lastErr error for _, ep := range repoData.Endpoints { - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s", img.Tag, localName, ep))) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, localName, ep), nil)) if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil { // Its not ideal that only the last error is returned, it would be better to concatenate the errors. // As the error is also given to the output stream the user will see the error. lastErr = err - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err))) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, localName, ep, err), nil)) continue } success = true break } if !success { - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Error pulling", fmt.Sprintf("image (%s) from %s, %s", img.Tag, localName, lastErr))) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, %s", img.Tag, localName, lastErr), nil)) if parallel { errors <- fmt.Errorf("Could not find repository on any of the indexed registries.") return } } - out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download", "complete")) + out.Write(sf.FormatProgress(utils.TruncateID(img.ID), "Download complete", nil)) if parallel { errors <- nil @@ -1171,7 +1171,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID, defer os.RemoveAll(layerData.Name()) // Send the layer - checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf, false), ep, token, jsonRaw) + checksum, err = r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf, false, "", "Pushing"), ep, token, jsonRaw) if err != nil { return "", err } @@ -1251,7 +1251,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write if err != nil { return err } - archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("", "Importing", "%8v/%v (%v)"), sf, true) + archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing") } img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil) if err != nil { diff --git a/utils/jsonmessage.go b/utils/jsonmessage.go new file mode 100644 index 0000000000..93c40116e4 --- /dev/null +++ b/utils/jsonmessage.go @@ -0,0 +1,118 @@ +package utils + +import ( + "encoding/json" + "fmt" + "io" + "time" +) + +type JSONError struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + +func (e *JSONError) Error() string { + return e.Message +} + +type JSONProgress struct { + Current int `json:"current,omitempty"` + Total int `json:"total,omitempty"` +} + +func (p *JSONProgress) String() string { + if p.Current == 0 && p.Total == 0 { + return "" + } + current := HumanSize(int64(p.Current)) + if p.Total == 0 { + return fmt.Sprintf("%8v/?", current) + } + total := HumanSize(int64(p.Total)) + percentage := float64(p.Current) / float64(p.Total) * 100 + return fmt.Sprintf("%8v/%v (%.0f%%)", current, total, percentage) +} + +type JSONMessage struct { + Status string `json:"status,omitempty"` + Progress *JSONProgress `json:"progressDetail,omitempty"` + ProgressMessage string `json:"progress,omitempty"` //deprecated + ID string `json:"id,omitempty"` + From string `json:"from,omitempty"` + Time int64 `json:"time,omitempty"` + Error *JSONError `json:"errorDetail,omitempty"` + ErrorMessage string `json:"error,omitempty"` //deprecated +} + +func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { + if jm.Error != nil { + if jm.Error.Code == 401 { + return fmt.Errorf("Authentication is required.") + } + return jm.Error + } + endl := "" + if isTerminal { + // [2K = erase entire current line + fmt.Fprintf(out, "%c[2K\r", 27) + endl = "\r" + } + if jm.Time != 0 { + fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0)) + } + if jm.ID != "" { + fmt.Fprintf(out, "%s: ", jm.ID) + } + if jm.From != "" { + fmt.Fprintf(out, "(from %s) ", jm.From) + } + if jm.Progress != nil { + fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress.String(), endl) + } else if jm.ProgressMessage != "" { //deprecated + fmt.Fprintf(out, "%s %s%s", jm.Status, jm.ProgressMessage, endl) + } else { + fmt.Fprintf(out, "%s%s\n", jm.Status, endl) + } + return nil +} + +func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error { + dec := json.NewDecoder(in) + ids := make(map[string]int) + diff := 0 + for { + jm := JSONMessage{} + if err := dec.Decode(&jm); err == io.EOF { + break + } else if err != nil { + return err + } + if (jm.Progress != nil || jm.ProgressMessage != "") && jm.ID != "" { + line, ok := ids[jm.ID] + if !ok { + line = len(ids) + ids[jm.ID] = line + fmt.Fprintf(out, "\n") + diff = 0 + } else { + diff = len(ids) - line + } + if isTerminal { + // [{diff}A = move cursor up diff rows + fmt.Fprintf(out, "%c[%dA", 27, diff) + } + } + err := jm.Display(out, isTerminal) + if jm.ID != "" { + if isTerminal { + // [{diff}B = move cursor down diff rows + fmt.Fprintf(out, "%c[%dB", 27, diff) + } + } + if err != nil { + return err + } + } + return nil +} diff --git a/utils/progressreader.go b/utils/progressreader.go new file mode 100644 index 0000000000..db332a6d1c --- /dev/null +++ b/utils/progressreader.go @@ -0,0 +1,55 @@ +package utils + +import ( + "io" +) + +// Reader with progress bar +type progressReader struct { + reader io.ReadCloser // Stream to read from + output io.Writer // Where to send progress bar to + progress JSONProgress + // readTotal int // Expected stream length (bytes) + // readProgress int // How much has been read so far (bytes) + lastUpdate int // How many bytes read at least update + ID string + action string + // template string // Template to print. Default "%v/%v (%v)" + sf *StreamFormatter + newLine bool +} + +func (r *progressReader) Read(p []byte) (n int, err error) { + read, err := io.ReadCloser(r.reader).Read(p) + r.progress.Current += read + updateEvery := 1024 * 512 //512kB + if r.progress.Total > 0 { + // Update progress for every 1% read if 1% < 512kB + if increment := int(0.01 * float64(r.progress.Total)); increment < updateEvery { + updateEvery = increment + } + } + if r.progress.Current-r.lastUpdate > updateEvery || err != nil { + r.output.Write(r.sf.FormatProgress(r.ID, r.action, &r.progress)) + r.lastUpdate = r.progress.Current + } + // Send newline when complete + if r.newLine && err != nil { + r.output.Write(r.sf.FormatStatus("", "")) + } + return read, err +} +func (r *progressReader) Close() error { + return io.ReadCloser(r.reader).Close() +} +func ProgressReader(r io.ReadCloser, size int, output io.Writer, sf *StreamFormatter, newline bool, ID, action string) *progressReader { + return &progressReader{ + reader: r, + output: NewWriteFlusher(output), + ID: ID, + action: action, + progress: JSONProgress{Total: size}, + sf: sf, + newLine: newline, + } +} diff --git a/utils/streamformatter.go b/utils/streamformatter.go new file mode 100644 index 0000000000..a28cd5050e --- /dev/null +++ b/utils/streamformatter.go @@ -0,0 +1,68 @@ +package utils + +import ( + "encoding/json" + "fmt" +) + +type StreamFormatter struct { + json bool + used bool +} + +func NewStreamFormatter(json bool) *StreamFormatter { + return &StreamFormatter{json, false} +} + +func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte { + sf.used = true + str := fmt.Sprintf(format, a...) + if sf.json { + b, err := json.Marshal(&JSONMessage{ID: id, Status: str}) + if err != nil { + return sf.FormatError(err) + } + return b + } + return []byte(str + "\r\n") +} + +func (sf *StreamFormatter) FormatError(err error) []byte { + sf.used = true + if sf.json { + jsonError, ok := err.(*JSONError) + if !ok { + jsonError = &JSONError{Message: err.Error()} + } + if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil { + return b + } + return []byte("{\"error\":\"format error\"}") + } + return []byte("Error: " + err.Error() + "\r\n") +} + +func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgress) []byte { + if progress == nil { + progress = &JSONProgress{} + } + sf.used = true + if sf.json { + + b, err := json.Marshal(&JSONMessage{ + Status: action, + ProgressMessage: progress.String(), + Progress: progress, + ID: id, + }) + if err != nil { + return nil + } + return b + } + return []byte(action + " " + progress.String() + "\r") +} + +func (sf *StreamFormatter) Used() bool { + return sf.used +} diff --git a/utils/utils.go b/utils/utils.go index cfdc73bb2e..0ab2245940 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -94,56 +94,6 @@ func Errorf(format string, a ...interface{}) { logf("error", format, a...) } -// Reader with progress bar -type progressReader struct { - reader io.ReadCloser // Stream to read from - output io.Writer // Where to send progress bar to - readTotal int // Expected stream length (bytes) - readProgress int // How much has been read so far (bytes) - lastUpdate int // How many bytes read at least update - template string // Template to print. Default "%v/%v (%v)" - sf *StreamFormatter - newLine bool -} - -func (r *progressReader) Read(p []byte) (n int, err error) { - read, err := io.ReadCloser(r.reader).Read(p) - r.readProgress += read - updateEvery := 1024 * 512 //512kB - if r.readTotal > 0 { - // Update progress for every 1% read if 1% < 512kB - if increment := int(0.01 * float64(r.readTotal)); increment < updateEvery { - updateEvery = increment - } - } - if r.readProgress-r.lastUpdate > updateEvery || err != nil { - if r.readTotal > 0 { - fmt.Fprintf(r.output, r.template, HumanSize(int64(r.readProgress)), HumanSize(int64(r.readTotal)), fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) - } else { - fmt.Fprintf(r.output, r.template, r.readProgress, "?", "n/a") - } - r.lastUpdate = r.readProgress - } - // Send newline when complete - if r.newLine && err != nil { - r.output.Write(r.sf.FormatStatus("", "")) - } - return read, err -} -func (r *progressReader) Close() error { - return io.ReadCloser(r.reader).Close() -} -func ProgressReader(r io.ReadCloser, size int, output io.Writer, tpl []byte, sf *StreamFormatter, newline bool) *progressReader { - return &progressReader{ - reader: r, - output: NewWriteFlusher(output), - readTotal: size, - template: string(tpl), - sf: sf, - newLine: newline, - } -} - // HumanDuration returns a human-readable approximation of a duration // (eg. "About a minute", "4 hours ago", etc.) func HumanDuration(d time.Duration) string { @@ -754,25 +704,6 @@ func NewWriteFlusher(w io.Writer) *WriteFlusher { return &WriteFlusher{w: w, flusher: flusher} } -type JSONError struct { - Code int `json:"code,omitempty"` - Message string `json:"message,omitempty"` -} - -type JSONMessage struct { - Status string `json:"status,omitempty"` - Progress string `json:"progress,omitempty"` - ErrorMessage string `json:"error,omitempty"` //deprecated - ID string `json:"id,omitempty"` - From string `json:"from,omitempty"` - Time int64 `json:"time,omitempty"` - Error *JSONError `json:"errorDetail,omitempty"` -} - -func (e *JSONError) Error() string { - return e.Message -} - func NewHTTPRequestError(msg string, res *http.Response) error { return &JSONError{ Message: msg, @@ -780,129 +711,6 @@ func NewHTTPRequestError(msg string, res *http.Response) error { } } -func (jm *JSONMessage) Display(out io.Writer, isTerminal bool) error { - if jm.Error != nil { - if jm.Error.Code == 401 { - return fmt.Errorf("Authentication is required.") - } - return jm.Error - } - endl := "" - if isTerminal { - // [2K = erase entire current line - fmt.Fprintf(out, "%c[2K\r", 27) - endl = "\r" - } - if jm.Time != 0 { - fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0)) - } - if jm.ID != "" { - fmt.Fprintf(out, "%s: ", jm.ID) - } - if jm.From != "" { - fmt.Fprintf(out, "(from %s) ", jm.From) - } - if jm.Progress != "" { - fmt.Fprintf(out, "%s %s%s", jm.Status, jm.Progress, endl) - } else { - fmt.Fprintf(out, "%s%s\n", jm.Status, endl) - } - return nil -} - -func DisplayJSONMessagesStream(in io.Reader, out io.Writer, isTerminal bool) error { - dec := json.NewDecoder(in) - ids := make(map[string]int) - diff := 0 - for { - jm := JSONMessage{} - if err := dec.Decode(&jm); err == io.EOF { - break - } else if err != nil { - return err - } - if jm.Progress != "" && jm.ID != "" { - line, ok := ids[jm.ID] - if !ok { - line = len(ids) - ids[jm.ID] = line - fmt.Fprintf(out, "\n") - diff = 0 - } else { - diff = len(ids) - line - } - if isTerminal { - // [{diff}A = move cursor up diff rows - fmt.Fprintf(out, "%c[%dA", 27, diff) - } - } - err := jm.Display(out, isTerminal) - if jm.ID != "" { - if isTerminal { - // [{diff}B = move cursor down diff rows - fmt.Fprintf(out, "%c[%dB", 27, diff) - } - } - if err != nil { - return err - } - } - return nil -} - -type StreamFormatter struct { - json bool - used bool -} - -func NewStreamFormatter(json bool) *StreamFormatter { - return &StreamFormatter{json, false} -} - -func (sf *StreamFormatter) FormatStatus(id, format string, a ...interface{}) []byte { - sf.used = true - str := fmt.Sprintf(format, a...) - if sf.json { - b, err := json.Marshal(&JSONMessage{ID: id, Status: str}) - if err != nil { - return sf.FormatError(err) - } - return b - } - return []byte(str + "\r\n") -} - -func (sf *StreamFormatter) FormatError(err error) []byte { - sf.used = true - if sf.json { - jsonError, ok := err.(*JSONError) - if !ok { - jsonError = &JSONError{Message: err.Error()} - } - if b, err := json.Marshal(&JSONMessage{Error: jsonError, ErrorMessage: err.Error()}); err == nil { - return b - } - return []byte("{\"error\":\"format error\"}") - } - return []byte("Error: " + err.Error() + "\r\n") -} - -func (sf *StreamFormatter) FormatProgress(id, action, progress string) []byte { - sf.used = true - if sf.json { - b, err := json.Marshal(&JSONMessage{Status: action, Progress: progress, ID: id}) - if err != nil { - return nil - } - return b - } - return []byte(action + " " + progress + "\r") -} - -func (sf *StreamFormatter) Used() bool { - return sf.used -} - func IsURL(str string) bool { return strings.HasPrefix(str, "http://") || strings.HasPrefix(str, "https://") }