From 9dcbdbc4b1addb67c0fdcadab1c8f98f30e58b4c Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 22 Jan 2014 13:35:35 -0800 Subject: [PATCH 1/3] move pull and import to a job Docker-DCO-1.1-Signed-off-by: Victor Vieux (github: vieux) --- api.go | 38 +++++++++--------- buildfile.go | 7 +++- engine/engine.go | 7 +++- engine/job.go | 8 +++- integration/runtime_test.go | 4 +- integration/sorter_test.go | 7 ++-- server.go | 77 ++++++++++++++++++++++++++++--------- utils/streamformatter.go | 4 ++ 8 files changed, 107 insertions(+), 45 deletions(-) diff --git a/api.go b/api.go index 0591723ea0..eba1ec31e9 100644 --- a/api.go +++ b/api.go @@ -413,11 +413,11 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht return err } - src := r.Form.Get("fromSrc") - image := r.Form.Get("fromImage") - tag := r.Form.Get("tag") - repo := r.Form.Get("repo") - + var ( + image = r.Form.Get("fromImage") + tag = r.Form.Get("tag") + job *engine.Job + ) authEncoded := r.Header.Get("X-Registry-Auth") authConfig := &auth.AuthConfig{} if authEncoded != "" { @@ -431,7 +431,6 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht if version > 1.0 { w.Header().Set("Content-Type", "application/json") } - sf := utils.NewStreamFormatter(version > 1.0) if image != "" { //pull metaHeaders := map[string][]string{} for k, v := range r.Header { @@ -439,22 +438,25 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht metaHeaders[k] = v } } - if err := srv.ImagePull(image, tag, w, sf, authConfig, metaHeaders, version > 1.3); err != nil { - if sf.Used() { - w.Write(sf.FormatError(err)) - return nil - } - return err - } + job = srv.Eng.Job("pull", r.Form.Get("fromImage"), tag) + job.SetenvBool("parallel", version > 1.3) + job.SetenvJson("metaHeaders", metaHeaders) + job.SetenvJson("authConfig", authConfig) } else { //import - if err := srv.ImageImport(src, repo, tag, r.Body, w, sf); err != nil { - if sf.Used() { - w.Write(sf.FormatError(err)) - return nil - } + job = srv.Eng.Job("import", r.Form.Get("fromSrc"), r.Form.Get("repo"), tag) + job.Stdin.Add(r.Body) + } + + job.SetenvBool("json", version > 1.0) + job.Stdout.Add(w) + if err := job.Run(); err != nil { + if !job.Stdout.Used() { return err } + sf := utils.NewStreamFormatter(version > 1.0) + w.Write(sf.FormatError(err)) } + return nil } diff --git a/buildfile.go b/buildfile.go index fc8bfed5d3..89afccebbd 100644 --- a/buildfile.go +++ b/buildfile.go @@ -84,7 +84,12 @@ func (b *buildFile) CmdFrom(name string) error { resolvedAuth := b.configFile.ResolveAuthConfig(endpoint) pullRegistryAuth = &resolvedAuth } - if err := b.srv.ImagePull(remote, tag, b.outOld, b.sf, pullRegistryAuth, nil, true); err != nil { + job := b.srv.Eng.Job("pull", remote, tag) + job.SetenvBool("json", b.sf.Json()) + job.SetenvBool("parallel", true) + job.SetenvJson("authConfig", pullRegistryAuth) + job.Stdout.Add(b.outOld) + if err := job.Run(); err != nil { return err } image, err = b.runtime.repositories.LookupImage(name) diff --git a/engine/engine.go b/engine/engine.go index ff69dcd138..ec880b9c85 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -137,6 +137,9 @@ func (eng *Engine) Job(name string, args ...string) *Job { } func (eng *Engine) Logf(format string, args ...interface{}) (n int, err error) { - prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n")) - return fmt.Fprintf(eng.Stderr, prefixedFormat, args...) + if os.Getenv("TEST") == "" { + prefixedFormat := fmt.Sprintf("[%s] %s\n", eng, strings.TrimRight(format, "\n")) + return fmt.Fprintf(eng.Stderr, prefixedFormat, args...) + } + return 0, nil } diff --git a/engine/job.go b/engine/job.go index 68b1715d92..179b2ebdda 100644 --- a/engine/job.go +++ b/engine/job.go @@ -3,6 +3,7 @@ package engine import ( "fmt" "io" + "os" "strings" "time" ) @@ -176,8 +177,11 @@ func (job *Job) Environ() map[string]string { } func (job *Job) Logf(format string, args ...interface{}) (n int, err error) { - prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n")) - return fmt.Fprintf(job.Stderr, prefixedFormat, args...) + if os.Getenv("TEST") == "" { + prefixedFormat := fmt.Sprintf("[%s] %s\n", job, strings.TrimRight(format, "\n")) + return fmt.Fprintf(job.Stderr, prefixedFormat, args...) + } + return 0, nil } func (job *Job) Printf(format string, args ...interface{}) (n int, err error) { diff --git a/integration/runtime_test.go b/integration/runtime_test.go index f3d8384082..008be9ef38 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -137,7 +137,9 @@ func setupBaseImage() { // If the unit test is not found, try to download it. if img, err := srv.ImageInspect(unitTestImageName); err != nil || img.ID != unitTestImageID { // Retrieve the Image - if err := srv.ImagePull(unitTestImageName, "", os.Stdout, utils.NewStreamFormatter(false), nil, nil, true); err != nil { + job = eng.Job("pull", unitTestImageName) + job.Stdout.Add(utils.NopWriteCloser(os.Stdout)) + if err := job.Run(); err != nil { log.Fatalf("Unable to pull the test image: %s", err) } } diff --git a/integration/sorter_test.go b/integration/sorter_test.go index 02d08d3409..d193fca1f0 100644 --- a/integration/sorter_test.go +++ b/integration/sorter_test.go @@ -2,8 +2,6 @@ package docker import ( "github.com/dotcloud/docker" - "github.com/dotcloud/docker/utils" - "io/ioutil" "testing" "time" ) @@ -53,5 +51,8 @@ func generateImage(name string, srv *docker.Server) error { if err != nil { return err } - return srv.ImageImport("-", "repo", name, archive, ioutil.Discard, utils.NewStreamFormatter(true)) + job := srv.Eng.Job("import", "-", "repo", name) + job.Stdin.Add(archive) + job.SetenvBool("json", true) + return job.Run() } diff --git a/server.go b/server.go index c0b45feeb2..e672f6a69a 100644 --- a/server.go +++ b/server.go @@ -97,6 +97,8 @@ func jobInitApi(job *engine.Job) engine.Status { "top": srv.ContainerTop, "load": srv.ImageLoad, "build": srv.Build, + "pull": srv.ImagePull, + "import": srv.ImageImport, } { if err := job.Eng.Register(name, handler); err != nil { job.Error(err) @@ -1312,8 +1314,25 @@ func (srv *Server) poolRemove(kind, key string) error { return nil } -func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, metaHeaders map[string][]string, parallel bool) error { - out = utils.NewWriteFlusher(out) +func (srv *Server) ImagePull(job *engine.Job) engine.Status { + if n := len(job.Args); n != 1 && n != 2 { + job.Errorf("Usage: %s IMAGE [TAG]", job.Name) + return engine.StatusErr + } + var ( + localName = job.Args[0] + tag string + sf = utils.NewStreamFormatter(job.GetenvBool("json")) + out = utils.NewWriteFlusher(job.Stdout) + authConfig *auth.AuthConfig + metaHeaders map[string][]string + ) + if len(job.Args) > 1 { + tag = job.Args[1] + } + + job.GetenvJson("authConfig", authConfig) + job.GetenvJson("metaHeaders", metaHeaders) c, err := srv.poolAdd("pull", localName+":"+tag) if err != nil { @@ -1321,21 +1340,24 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut // Another pull of the same repository is already taking place; just wait for it to finish out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName)) <-c - return nil + return engine.StatusOK } - return err + job.Error(err) + return engine.StatusErr } defer srv.poolRemove("pull", localName+":"+tag) // Resolve the Repository name from fqn to endpoint + name endpoint, remoteName, err := registry.ResolveRepositoryName(localName) if err != nil { - return err + job.Error(err) + return engine.StatusErr } r, err := registry.NewRegistry(authConfig, srv.HTTPRequestFactory(metaHeaders), endpoint) if err != nil { - return err + job.Error(err) + return engine.StatusErr } if endpoint == auth.IndexServerAddress() { @@ -1343,11 +1365,12 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut localName = remoteName } - if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, parallel); err != nil { - return err + if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil { + job.Error(err) + return engine.StatusErr } - return nil + return engine.StatusOK } // Retrieve the all the images to be uploaded in the correct order @@ -1551,16 +1574,31 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo return nil } -func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Writer, sf *utils.StreamFormatter) error { - var archive io.Reader - var resp *http.Response +func (srv *Server) ImageImport(job *engine.Job) engine.Status { + if n := len(job.Args); n != 2 && n != 3 { + job.Errorf("Usage: %s SRC REPO [TAG]", job.Name) + return engine.StatusErr + } + var ( + src = job.Args[0] + repo = job.Args[1] + tag string + sf = utils.NewStreamFormatter(job.GetenvBool("json")) + out = utils.NewWriteFlusher(job.Stdout) + archive io.Reader + resp *http.Response + ) + if len(job.Args) > 2 { + tag = job.Args[2] + } if src == "-" { - archive = in + archive = job.Stdin } else { u, err := url.Parse(src) if err != nil { - return err + job.Error(err) + return engine.StatusErr } if u.Scheme == "" { u.Scheme = "http" @@ -1572,22 +1610,25 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write // If curl is not available, fallback to http.Get() resp, err = utils.Download(u.String()) if err != nil { - return err + job.Error(err) + return engine.StatusErr } archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing") } img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil) if err != nil { - return err + job.Error(err) + return engine.StatusErr } // Optionally register the image at REPO/TAG if repo != "" { if err := srv.runtime.repositories.Set(repo, tag, img.ID, true); err != nil { - return err + job.Error(err) + return engine.StatusErr } } out.Write(sf.FormatStatus("", img.ID)) - return nil + return engine.StatusOK } func (srv *Server) ContainerCreate(job *engine.Job) engine.Status { diff --git a/utils/streamformatter.go b/utils/streamformatter.go index 0c41d0bddd..9345c3cb16 100644 --- a/utils/streamformatter.go +++ b/utils/streamformatter.go @@ -82,3 +82,7 @@ func (sf *StreamFormatter) FormatProgress(id, action string, progress *JSONProgr func (sf *StreamFormatter) Used() bool { return sf.used } + +func (sf *StreamFormatter) Json() bool { + return sf.json +} From 90e9a2d85a6c981b137df0c22c31d6f32f4b6f66 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 22 Jan 2014 16:15:23 -0800 Subject: [PATCH 2/3] fix flush Docker-DCO-1.1-Signed-off-by: Victor Vieux (github: vieux) --- engine/streams.go | 12 ++++++++++++ server.go | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/engine/streams.go b/engine/streams.go index 4b1f172b49..07db058b2d 100644 --- a/engine/streams.go +++ b/engine/streams.go @@ -109,6 +109,18 @@ func (o *Output) Write(p []byte) (n int, err error) { return len(p), firstErr } +func (o *Output) Flush() { + o.Mutex.Lock() + defer o.Mutex.Unlock() + for _, dst := range o.dests { + if f, ok := dst.(interface { + Flush() + }); ok { + f.Flush() + } + } +} + // Close unregisters all destinations and waits for all background // AddTail and AddString tasks to complete. // The Close method of each destination is called if it exists. diff --git a/server.go b/server.go index e672f6a69a..3e4daad77f 100644 --- a/server.go +++ b/server.go @@ -1324,7 +1324,7 @@ func (srv *Server) ImagePull(job *engine.Job) engine.Status { tag string sf = utils.NewStreamFormatter(job.GetenvBool("json")) out = utils.NewWriteFlusher(job.Stdout) - authConfig *auth.AuthConfig + authConfig = &auth.AuthConfig{} metaHeaders map[string][]string ) if len(job.Args) > 1 { From 35641f0ec7ecae16f88ba9affe0aeea0ae864874 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 23 Jan 2014 16:00:07 -0800 Subject: [PATCH 3/3] remove useless flush method Docker-DCO-1.1-Signed-off-by: Victor Vieux (github: vieux) --- api.go | 2 +- engine/streams.go | 12 ------------ server.go | 12 +++++------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/api.go b/api.go index eba1ec31e9..13397e6b92 100644 --- a/api.go +++ b/api.go @@ -448,7 +448,7 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht } job.SetenvBool("json", version > 1.0) - job.Stdout.Add(w) + job.Stdout.Add(utils.NewWriteFlusher(w)) if err := job.Run(); err != nil { if !job.Stdout.Used() { return err diff --git a/engine/streams.go b/engine/streams.go index 07db058b2d..4b1f172b49 100644 --- a/engine/streams.go +++ b/engine/streams.go @@ -109,18 +109,6 @@ func (o *Output) Write(p []byte) (n int, err error) { return len(p), firstErr } -func (o *Output) Flush() { - o.Mutex.Lock() - defer o.Mutex.Unlock() - for _, dst := range o.dests { - if f, ok := dst.(interface { - Flush() - }); ok { - f.Flush() - } - } -} - // Close unregisters all destinations and waits for all background // AddTail and AddString tasks to complete. // The Close method of each destination is called if it exists. diff --git a/server.go b/server.go index 3e4daad77f..49beeb5fb4 100644 --- a/server.go +++ b/server.go @@ -1323,7 +1323,6 @@ func (srv *Server) ImagePull(job *engine.Job) engine.Status { localName = job.Args[0] tag string sf = utils.NewStreamFormatter(job.GetenvBool("json")) - out = utils.NewWriteFlusher(job.Stdout) authConfig = &auth.AuthConfig{} metaHeaders map[string][]string ) @@ -1338,7 +1337,7 @@ func (srv *Server) ImagePull(job *engine.Job) engine.Status { if err != nil { if c != nil { // Another pull of the same repository is already taking place; just wait for it to finish - out.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName)) + job.Stdout.Write(sf.FormatStatus("", "Repository %s already being pulled by another client. Waiting.", localName)) <-c return engine.StatusOK } @@ -1365,7 +1364,7 @@ func (srv *Server) ImagePull(job *engine.Job) engine.Status { localName = remoteName } - if err = srv.pullRepository(r, out, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil { + if err = srv.pullRepository(r, job.Stdout, localName, remoteName, tag, sf, job.GetenvBool("parallel")); err != nil { job.Error(err) return engine.StatusErr } @@ -1584,7 +1583,6 @@ func (srv *Server) ImageImport(job *engine.Job) engine.Status { repo = job.Args[1] tag string sf = utils.NewStreamFormatter(job.GetenvBool("json")) - out = utils.NewWriteFlusher(job.Stdout) archive io.Reader resp *http.Response ) @@ -1605,7 +1603,7 @@ func (srv *Server) ImageImport(job *engine.Job) engine.Status { u.Host = src u.Path = "" } - out.Write(sf.FormatStatus("", "Downloading from %s", u)) + job.Stdout.Write(sf.FormatStatus("", "Downloading from %s", u)) // Download with curl (pretty progress bar) // If curl is not available, fallback to http.Get() resp, err = utils.Download(u.String()) @@ -1613,7 +1611,7 @@ func (srv *Server) ImageImport(job *engine.Job) engine.Status { job.Error(err) return engine.StatusErr } - archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf, true, "", "Importing") + archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), job.Stdout, sf, true, "", "Importing") } img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil) if err != nil { @@ -1627,7 +1625,7 @@ func (srv *Server) ImageImport(job *engine.Job) engine.Status { return engine.StatusErr } } - out.Write(sf.FormatStatus("", img.ID)) + job.Stdout.Write(sf.FormatStatus("", img.ID)) return engine.StatusOK }