diff --git a/api.go b/api.go index b6ab7badfa..9d0348b608 100644 --- a/api.go +++ b/api.go @@ -217,6 +217,31 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques return nil } +func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + events := make(chan utils.JSONMessage) + srv.Lock() + srv.events[r.RemoteAddr] = events + srv.Unlock() + w.Header().Set("Content-Type", "application/json") + wf := utils.NewWriteFlusher(w) + for { + event := <-events + b, err := json.Marshal(event) + if err != nil { + continue + } + _, err = wf.Write(b) + if err != nil { + utils.Debugf("%s", err) + srv.Lock() + delete(srv.events, r.RemoteAddr) + srv.Unlock() + return err + } + } + return nil +} + func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if vars == nil { return fmt.Errorf("Missing parameter") @@ -855,8 +880,9 @@ func createRouter(srv *Server, logging bool) (*mux.Router, error) { m := map[string]map[string]func(*Server, float64, http.ResponseWriter, *http.Request, map[string]string) error{ "GET": { "/auth": getAuth, - "/version": getVersion, + "/events": getEvents, "/info": getInfo, + "/version": getVersion, "/images/json": getImagesJSON, "/images/viz": getImagesViz, "/images/search": getImagesSearch, diff --git a/api_params.go b/api_params.go index b371ca314f..26d70711a1 100644 --- a/api_params.go +++ b/api_params.go @@ -17,13 +17,14 @@ type APIImages struct { } type APIInfo struct { - Debug bool - Containers int - Images int - NFd int `json:",omitempty"` - NGoroutines int `json:",omitempty"` - MemoryLimit bool `json:",omitempty"` - SwapLimit bool `json:",omitempty"` + Debug bool + Containers int + Images int + NFd int `json:",omitempty"` + NGoroutines int `json:",omitempty"` + MemoryLimit bool `json:",omitempty"` + SwapLimit bool `json:",omitempty"` + NEventsListener int `json:",omitempty"` } type APITop struct { diff --git a/commands.go b/commands.go index b0e32162e6..12647feead 100644 --- a/commands.go +++ b/commands.go @@ -78,6 +78,7 @@ func (cli *DockerCli) CmdHelp(args ...string) error { {"build", "Build a container from a Dockerfile"}, {"commit", "Create a new image from a container's changes"}, {"diff", "Inspect changes on a container's filesystem"}, + {"events", "Get real time events from the server"}, {"export", "Stream the contents of a container as a tar archive"}, {"history", "Show the history of an image"}, {"images", "List images"}, @@ -466,6 +467,7 @@ func (cli *DockerCli) CmdInfo(args ...string) error { fmt.Fprintf(cli.out, "Debug mode (client): %v\n", os.Getenv("DEBUG") != "") fmt.Fprintf(cli.out, "Fds: %d\n", out.NFd) fmt.Fprintf(cli.out, "Goroutines: %d\n", out.NGoroutines) + fmt.Fprintf(cli.out, "EventsListeners: %d\n", out.NEventsListener) } if !out.MemoryLimit { fmt.Fprintf(cli.err, "WARNING: No memory limit support\n") @@ -1055,6 +1057,23 @@ func (cli *DockerCli) CmdCommit(args ...string) error { return nil } +func (cli *DockerCli) CmdEvents(args ...string) error { + cmd := Subcmd("events", "", "Get real time events from the server") + if err := cmd.Parse(args); err != nil { + return nil + } + + if cmd.NArg() != 0 { + cmd.Usage() + return nil + } + + if err := cli.stream("GET", "/events", nil, cli.out); err != nil { + return err + } + return nil +} + func (cli *DockerCli) CmdExport(args ...string) error { cmd := Subcmd("export", "CONTAINER", "Export the contents of a filesystem as a tar archive") if err := cmd.Parse(args); err != nil { @@ -1509,19 +1528,13 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e if resp.Header.Get("Content-Type") == "application/json" { dec := json.NewDecoder(resp.Body) for { - var m utils.JSONMessage - if err := dec.Decode(&m); err == io.EOF { + var jm utils.JSONMessage + if err := dec.Decode(&jm); err == io.EOF { break } else if err != nil { return err } - if m.Progress != "" { - fmt.Fprintf(out, "%s %s\r", m.Status, m.Progress) - } else if m.Error != "" { - return fmt.Errorf(m.Error) - } else { - fmt.Fprintf(out, "%s\n", m.Status) - } + jm.Display(out) } } else { if _, err := io.Copy(out, resp.Body); err != nil { diff --git a/runtime_test.go b/runtime_test.go index 66d92c8100..807097404d 100644 --- a/runtime_test.go +++ b/runtime_test.go @@ -17,12 +17,12 @@ import ( ) const ( - unitTestImageName = "docker-test-image" - unitTestImageID = "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0 - unitTestNetworkBridge = "testdockbr0" - unitTestStoreBase = "/var/lib/docker/unit-tests" - testDaemonAddr = "127.0.0.1:4270" - testDaemonProto = "tcp" + unitTestImageName = "docker-test-image" + unitTestImageID = "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0 + unitTestNetworkBridge = "testdockbr0" + unitTestStoreBase = "/var/lib/docker/unit-tests" + testDaemonAddr = "127.0.0.1:4270" + testDaemonProto = "tcp" ) var globalRuntime *Runtime diff --git a/server.go b/server.go index b92ed8fd73..2499d64397 100644 --- a/server.go +++ b/server.go @@ -32,8 +32,9 @@ func (srv *Server) DockerVersion() APIVersion { func (srv *Server) ContainerKill(name string) error { if container := srv.runtime.Get(name); container != nil { if err := container.Kill(); err != nil { - return fmt.Errorf("Error restarting container %s: %s", name, err) + return fmt.Errorf("Error killing container %s: %s", name, err) } + srv.SendEvent("kill", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -52,6 +53,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error { if _, err := io.Copy(out, data); err != nil { return err } + srv.SendEvent("export", name) return nil } return fmt.Errorf("No such container: %s", name) @@ -209,13 +211,14 @@ func (srv *Server) DockerInfo() *APIInfo { imgcount = len(images) } return &APIInfo{ - Containers: len(srv.runtime.List()), - Images: imgcount, - MemoryLimit: srv.runtime.capabilities.MemoryLimit, - SwapLimit: srv.runtime.capabilities.SwapLimit, - Debug: os.Getenv("DEBUG") != "", - NFd: utils.GetTotalUsedFds(), - NGoroutines: runtime.NumGoroutine(), + Containers: len(srv.runtime.List()), + Images: imgcount, + MemoryLimit: srv.runtime.capabilities.MemoryLimit, + SwapLimit: srv.runtime.capabilities.SwapLimit, + Debug: os.Getenv("DEBUG") != "", + NFd: utils.GetTotalUsedFds(), + NGoroutines: runtime.NumGoroutine(), + NEventsListener: len(srv.events), } } @@ -810,6 +813,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) { } return "", err } + srv.SendEvent("create", container.ShortID()) return container.ShortID(), nil } @@ -818,6 +822,7 @@ func (srv *Server) ContainerRestart(name string, t int) error { if err := container.Restart(t); err != nil { return fmt.Errorf("Error restarting container %s: %s", name, err) } + srv.SendEvent("restart", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -837,6 +842,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error { if err := srv.runtime.Destroy(container); err != nil { return fmt.Errorf("Error destroying container %s: %s", name, err) } + srv.SendEvent("destroy", name) if removeVolume { // Retrieve all volumes from all remaining containers @@ -903,6 +909,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error { return err } *imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)}) + srv.SendEvent("delete", utils.TruncateID(id)) return nil } return nil @@ -946,6 +953,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro } if tagDeleted { imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) + srv.SendEvent("untagged", img.ShortID()) } if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { @@ -1018,6 +1026,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error { if err := container.Start(hostConfig); err != nil { return fmt.Errorf("Error starting container %s: %s", name, err) } + srv.SendEvent("start", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1029,6 +1038,7 @@ func (srv *Server) ContainerStop(name string, t int) error { if err := container.Stop(t); err != nil { return fmt.Errorf("Error stopping container %s: %s", name, err) } + srv.SendEvent("stop", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1162,15 +1172,23 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) ( enableCors: enableCors, pullingPool: make(map[string]struct{}), pushingPool: make(map[string]struct{}), + events: make(map[string]chan utils.JSONMessage), } runtime.srv = srv return srv, nil } +func (srv *Server) SendEvent(action, id string) { + for _, c := range srv.events { + c <- utils.JSONMessage{Status: action, ID: id} + } +} + type Server struct { sync.Mutex runtime *Runtime enableCors bool pullingPool map[string]struct{} pushingPool map[string]struct{} + events map[string]chan utils.JSONMessage } diff --git a/utils/utils.go b/utils/utils.go index 77b3f879cd..1523835f99 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -611,8 +611,23 @@ type JSONMessage struct { Status string `json:"status,omitempty"` Progress string `json:"progress,omitempty"` Error string `json:"error,omitempty"` + ID string `json:"id,omitempty"` } +func (jm *JSONMessage) Display(out io.Writer) (error) { + if jm.Progress != "" { + fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress) + } else if jm.Error != "" { + return fmt.Errorf(jm.Error) + } else if jm.ID != "" { + fmt.Fprintf(out, "%s: %s\n", jm.ID, jm.Status) + } else { + fmt.Fprintf(out, "%s\n", jm.Status) + } + return nil +} + + type StreamFormatter struct { json bool used bool