jsonstream WIP

This commit is contained in:
Victor Vieux 2013-05-23 15:16:35 +00:00
parent 422edd513a
commit cf35e8ed81
4 changed files with 35 additions and 19 deletions

1
api.go
View File

@ -288,6 +288,7 @@ func postImagesCreate(srv *Server, w http.ResponseWriter, r *http.Request, vars
if image != "" { //pull if image != "" { //pull
registry := r.Form.Get("registry") registry := r.Form.Get("registry")
w.Header().Set("Content-Type", "application/json")
if err := srv.ImagePull(image, tag, registry, w); err != nil { if err := srv.ImagePull(image, tag, registry, w); err != nil {
return err return err
} }

View File

@ -1223,8 +1223,31 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e
return fmt.Errorf("error: %s", body) return fmt.Errorf("error: %s", body)
} }
if _, err := io.Copy(out, resp.Body); err != nil { if resp.Header.Get("Content-Type") == "application/json" {
return err
type Message struct {
Status string `json:"status,omitempty"`
Progress string `json:"progress,omitempty"`
}
dec := json.NewDecoder(resp.Body)
for {
var m Message
if err := dec.Decode(&m); err == io.EOF {
break
} else if err != nil {
return err
}
if m.Status != "" {
fmt.Fprintf(out, "%s\n", m.Status)
} else if m.Progress != "" {
fmt.Fprintf(out, "Downloading... %s\r", m.Progress)
}
}
fmt.Fprintf(out, "\n")
} else {
if _, err := io.Copy(out, resp.Body); err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -292,17 +292,15 @@ func (srv *Server) ContainerTag(name, repo, tag string, force bool) error {
} }
func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []string) error { func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []string) error {
out = utils.NewWriteFlusher(out)
history, err := srv.registry.GetRemoteHistory(imgId, registry, token) history, err := srv.registry.GetRemoteHistory(imgId, registry, token)
if err != nil { if err != nil {
return err return err
} }
// FIXME: Try to stream the images? // FIXME: Try to stream the images?
// FIXME: Launch the getRemoteImage() in goroutines // FIXME: Launch the getRemoteImage() in goroutines
for _, id := range history { for _, id := range history {
if !srv.runtime.graph.Exists(id) { if !srv.runtime.graph.Exists(id) {
fmt.Fprintf(out, "Pulling %s metadata\r\n", id) fmt.Fprintf(out, "{\"status\" :\"Pulling %s metadata\"}", id)
imgJson, err := srv.registry.GetRemoteImageJson(id, registry, token) imgJson, err := srv.registry.GetRemoteImageJson(id, registry, token)
if err != nil { if err != nil {
// FIXME: Keep goging in case of error? // FIXME: Keep goging in case of error?
@ -314,12 +312,12 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri
} }
// Get the layer // Get the layer
fmt.Fprintf(out, "Pulling %s fs layer\r\n", img.Id) fmt.Fprintf(out, "{\"status\" :\"Pulling %s fs layer\"}", img.Id)
layer, contentLength, err := srv.registry.GetRemoteImageLayer(img.Id, registry, token) layer, contentLength, err := srv.registry.GetRemoteImageLayer(img.Id, registry, token)
if err != nil { if err != nil {
return err return err
} }
if err := srv.runtime.graph.Register(utils.ProgressReader(layer, contentLength, out, "Downloading %v/%v (%v)"), false, img); err != nil { if err := srv.runtime.graph.Register(utils.ProgressReader(layer, contentLength, out, ""), false, img); err != nil {
return err return err
} }
} }
@ -328,8 +326,7 @@ func (srv *Server) pullImage(out io.Writer, imgId, registry string, token []stri
} }
func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error { func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error {
out = utils.NewWriteFlusher(out) fmt.Fprintf(out, "{\"status\":\"Pulling repository %s from %s\"}", remote, auth.IndexServerAddress())
fmt.Fprintf(out, "Pulling repository %s from %s\r\n", remote, auth.IndexServerAddress())
repoData, err := srv.registry.GetRepositoryData(remote) repoData, err := srv.registry.GetRepositoryData(remote)
if err != nil { if err != nil {
return err return err
@ -366,7 +363,7 @@ func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error
utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.Id) utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.Id)
continue continue
} }
fmt.Fprintf(out, "Pulling image %s (%s) from %s\n", img.Id, img.Tag, remote) fmt.Fprintf(out, "{\"status\":\"Pulling image %s (%s) from %s\"}", img.Id, img.Tag, remote)
success := false success := false
for _, ep := range repoData.Endpoints { for _, ep := range repoData.Endpoints {
if err := srv.pullImage(out, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil { if err := srv.pullImage(out, img.Id, "https://"+ep+"/v1", repoData.Tokens); err != nil {
@ -396,6 +393,7 @@ func (srv *Server) pullRepository(out io.Writer, remote, askedTag string) error
} }
func (srv *Server) ImagePull(name, tag, registry string, out io.Writer) error { func (srv *Server) ImagePull(name, tag, registry string, out io.Writer) error {
out = utils.NewWriteFlusher(out)
if registry != "" { if registry != "" {
if err := srv.pullImage(out, name, registry, nil); err != nil { if err := srv.pullImage(out, name, registry, nil); err != nil {
return err return err
@ -406,7 +404,6 @@ func (srv *Server) ImagePull(name, tag, registry string, out io.Writer) error {
if err := srv.pullRepository(out, name, tag); err != nil { if err := srv.pullRepository(out, name, tag); err != nil {
return err return err
} }
return nil return nil
} }

View File

@ -84,17 +84,12 @@ func (r *progressReader) Read(p []byte) (n int, err error) {
} }
if r.readProgress-r.lastUpdate > updateEvery || err != nil { if r.readProgress-r.lastUpdate > updateEvery || err != nil {
if r.readTotal > 0 { if r.readTotal > 0 {
fmt.Fprintf(r.output, r.template+"\r", r.readProgress, r.readTotal, fmt.Sprintf("%.0f%%", float64(r.readProgress)/float64(r.readTotal)*100)) fmt.Fprintf(r.output, r.template, r.readProgress, r.readTotal)
} else { } else {
fmt.Fprintf(r.output, r.template+"\r", r.readProgress, "?", "n/a") fmt.Fprintf(r.output, r.template, r.readProgress, "?")
} }
r.lastUpdate = r.readProgress r.lastUpdate = r.readProgress
} }
// Send newline when complete
if err != nil {
fmt.Fprintf(r.output, "\n")
}
return read, err return read, err
} }
func (r *progressReader) Close() error { func (r *progressReader) Close() error {
@ -102,7 +97,7 @@ func (r *progressReader) Close() error {
} }
func ProgressReader(r io.ReadCloser, size int, output io.Writer, template string) *progressReader { func ProgressReader(r io.ReadCloser, size int, output io.Writer, template string) *progressReader {
if template == "" { if template == "" {
template = "%v/%v (%v)" template = "{\"progress\":\"%v/%v\"}"
} }
return &progressReader{r, NewWriteFlusher(output), size, 0, 0, template} return &progressReader{r, NewWriteFlusher(output), size, 0, 0, template}
} }