From 5cc6312bfc4e511784693d02b9bb8e8d9d1c04b0 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 23 Jan 2014 11:12:17 -0800 Subject: [PATCH] move events to job Docker-DCO-1.1-Signed-off-by: Victor Vieux (github: vieux) --- api.go | 56 +++++---------------------------------------------- server.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 51 deletions(-) diff --git a/api.go b/api.go index 864a577fd1..73933e5398 100644 --- a/api.go +++ b/api.go @@ -236,61 +236,15 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques } func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) error { - b, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("JSON error") - } - _, err = wf.Write(b) - if err != nil { - // On error, evict the listener - utils.Errorf("%s", err) - srv.Lock() - delete(srv.listeners, r.RemoteAddr) - srv.Unlock() - return err - } - return nil - } - if err := parseForm(r); err != nil { return err } - listener := make(chan utils.JSONMessage) - srv.Lock() - srv.listeners[r.RemoteAddr] = listener - srv.Unlock() - since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0) - if err != nil { - since = 0 - } + w.Header().Set("Content-Type", "application/json") - wf := utils.NewWriteFlusher(w) - wf.Flush() - if since != 0 { - // If since, send previous events that happened after the timestamp - for _, event := range srv.GetEvents() { - if event.Time >= since { - err := sendEvent(wf, &event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - return err - } - } - } - } - for event := range listener { - err := sendEvent(wf, &event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - return err - } - } - return nil + var job = srv.Eng.Job("events", r.RemoteAddr) + job.Stdout.Add(utils.NewWriteFlusher(w)) + job.Setenv("since", r.Form.Get("since")) + return job.Run() } func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/server.go b/server.go index f7aaf90812..21ce15419f 100644 --- a/server.go +++ b/server.go @@ -101,6 +101,7 @@ func jobInitApi(job *engine.Job) engine.Status { "import": srv.ImageImport, "image_delete": srv.ImageDelete, "inspect": srv.JobInspect, + "events": srv.Events, } { if err := job.Eng.Register(name, handler); err != nil { job.Error(err) @@ -240,6 +241,65 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status { } return engine.StatusOK } +func (srv *Server) Events(job *engine.Job) engine.Status { + if len(job.Args) != 1 { + job.Errorf("Usage: %s FROM", job.Name) + return engine.StatusErr + } + + var ( + from = job.Args[0] + since = job.GetenvInt64("since") + ) + sendEvent := func(event *utils.JSONMessage) error { + b, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("JSON error") + } + _, err = job.Stdout.Write(b) + if err != nil { + // On error, evict the listener + utils.Errorf("%s", err) + srv.Lock() + delete(srv.listeners, from) + srv.Unlock() + return err + } + return nil + } + + listener := make(chan utils.JSONMessage) + srv.Lock() + srv.listeners[from] = listener + srv.Unlock() + job.Stdout.Write(nil) // flush + if since != 0 { + // If since, send previous events that happened after the timestamp + for _, event := range srv.GetEvents() { + if event.Time >= since { + err := sendEvent(&event) + if err != nil && err.Error() == "JSON error" { + continue + } + if err != nil { + job.Error(err) + return engine.StatusErr + } + } + } + } + for event := range listener { + err := sendEvent(&event) + if err != nil && err.Error() == "JSON error" { + continue + } + if err != nil { + job.Error(err) + return engine.StatusErr + } + } + return engine.StatusOK +} func (srv *Server) ContainerExport(job *engine.Job) engine.Status { if len(job.Args) != 1 {