diff --git a/server/server.go b/server/server.go index 76a51e796f..620c8b20b8 100644 --- a/server/server.go +++ b/server/server.go @@ -248,85 +248,63 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status { return engine.StatusOK } -func (srv *Server) EvictListener(from int64) { - srv.Lock() - if old, ok := srv.listeners[from]; ok { - delete(srv.listeners, from) - close(old) - } - srv.Unlock() -} - func (srv *Server) Events(job *engine.Job) engine.Status { if len(job.Args) != 0 { return job.Errorf("Usage: %s", job.Name) } var ( - from = time.Now().UTC().UnixNano() since = job.GetenvInt64("since") until = job.GetenvInt64("until") timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) ) - sendEvent := func(event *utils.JSONMessage) error { - b, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("JSON error") - } - _, err = job.Stdout.Write(b) - return err + + // If no until, disable timeout + if until == 0 { + timeout.Stop() } listener := make(chan utils.JSONMessage) - srv.Lock() - if old, ok := srv.listeners[from]; ok { - delete(srv.listeners, from) - close(old) + srv.eventPublisher.Subscribe(listener) + defer srv.eventPublisher.Unsubscribe(listener) + + // When sending an event JSON serialization errors are ignored, but all + // other errors lead to the eviction of the listener. + sendEvent := func(event *utils.JSONMessage) error { + if b, err := json.Marshal(event); err == nil { + if _, err = job.Stdout.Write(b); err != nil { + return err + } + } + return nil } - srv.listeners[from] = listener - srv.Unlock() - job.Stdout.Write(nil) // flush + + job.Stdout.Write(nil) + + // Resend every event in the [since, until] time interval. if since != 0 { - // If since, send previous events that happened after the timestamp and until timestamp for _, event := range srv.GetEvents() { if event.Time >= since && (event.Time <= until || until == 0) { - err := sendEvent(&event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - // On error, evict the listener - srv.EvictListener(from) + if err := sendEvent(&event); err != nil { return job.Error(err) } } } } - // If no until, disable timeout - if until == 0 { - timeout.Stop() - } for { select { case event, ok := <-listener: - if !ok { // Channel is closed: listener was evicted + if !ok { return engine.StatusOK } - err := sendEvent(&event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - // On error, evict the listener - srv.EvictListener(from) + if err := sendEvent(&event); err != nil { return job.Error(err) } case <-timeout.C: return engine.StatusOK } } - return engine.StatusOK } func (srv *Server) ContainerExport(job *engine.Job) engine.Status { @@ -797,7 +775,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { v.SetInt("NFd", utils.GetTotalUsedFds()) v.SetInt("NGoroutines", runtime.NumGoroutine()) v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name()) - v.SetInt("NEventsListener", len(srv.listeners)) + v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount()) v.Set("KernelVersion", kernelVersion) v.Set("IndexServerAddress", registry.IndexServerAddress()) v.Set("InitSha1", dockerversion.INITSHA1) @@ -2387,12 +2365,12 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) return nil, err } srv := &Server{ - Eng: eng, - daemon: daemon, - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), - events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events - listeners: make(map[int64]chan utils.JSONMessage), + Eng: eng, + daemon: daemon, + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), + events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events + eventPublisher: utils.NewJSONMessagePublisher(), } daemon.SetServer(srv) return srv, nil @@ -2402,14 +2380,7 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { now := time.Now().UTC().Unix() jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now} srv.AddEvent(jm) - srv.Lock() - for _, c := range srv.listeners { - select { // non blocking channel - case c <- jm: - default: - } - } - srv.Unlock() + srv.eventPublisher.Publish(jm) return &jm } @@ -2461,12 +2432,12 @@ func (srv *Server) Close() error { type Server struct { sync.RWMutex - daemon *daemon.Daemon - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} - events []utils.JSONMessage - listeners map[int64]chan utils.JSONMessage - Eng *engine.Engine - running bool - tasks sync.WaitGroup + daemon *daemon.Daemon + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} + events []utils.JSONMessage + eventPublisher *utils.JSONMessagePublisher + Eng *engine.Engine + running bool + tasks sync.WaitGroup } diff --git a/server/server_unit_test.go b/server/server_unit_test.go index 47e4be8280..e6c5d49b82 100644 --- a/server/server_unit_test.go +++ b/server/server_unit_test.go @@ -47,16 +47,14 @@ func TestPools(t *testing.T) { func TestLogEvent(t *testing.T) { srv := &Server{ - events: make([]utils.JSONMessage, 0, 64), - listeners: make(map[int64]chan utils.JSONMessage), + events: make([]utils.JSONMessage, 0, 64), + eventPublisher: utils.NewJSONMessagePublisher(), } srv.LogEvent("fakeaction", "fakeid", "fakeimage") listener := make(chan utils.JSONMessage) - srv.Lock() - srv.listeners[1337] = listener - srv.Unlock() + srv.eventPublisher.Subscribe(listener) srv.LogEvent("fakeaction2", "fakeid", "fakeimage") diff --git a/utils/jsonmessagepublisher.go b/utils/jsonmessagepublisher.go new file mode 100644 index 0000000000..659e6c8304 --- /dev/null +++ b/utils/jsonmessagepublisher.go @@ -0,0 +1,61 @@ +package utils + +import ( + "sync" + "time" +) + +func NewJSONMessagePublisher() *JSONMessagePublisher { + return &JSONMessagePublisher{} +} + +type JSONMessageListener chan<- JSONMessage + +type JSONMessagePublisher struct { + m sync.RWMutex + subscribers []JSONMessageListener +} + +func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) { + p.m.Lock() + p.subscribers = append(p.subscribers, l) + p.m.Unlock() +} + +func (p *JSONMessagePublisher) SubscribersCount() int { + p.m.RLock() + count := len(p.subscribers) + p.m.RUnlock() + return count +} + +// Unsubscribe closes and removes the specified listener from the list of +// previously registed ones. +// It returns a boolean value indicating if the listener was successfully +// found, closed and unregistered. +func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool { + p.m.Lock() + defer p.m.Unlock() + + for i, subscriber := range p.subscribers { + if subscriber == l { + close(l) + p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) + return true + } + } + return false +} + +func (p *JSONMessagePublisher) Publish(m JSONMessage) { + p.m.RLock() + for _, subscriber := range p.subscribers { + // We give each subscriber a 100ms time window to receive the event, + // after which we move to the next. + select { + case subscriber <- m: + case <-time.After(100 * time.Millisecond): + } + } + p.m.RUnlock() +} diff --git a/utils/jsonmessagepublisher_test.go b/utils/jsonmessagepublisher_test.go new file mode 100644 index 0000000000..2e1a820ca3 --- /dev/null +++ b/utils/jsonmessagepublisher_test.go @@ -0,0 +1,73 @@ +package utils + +import ( + "testing" + "time" +) + +func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) { + if q.SubscribersCount() != expected { + t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount()) + } +} + +func TestJSONMessagePublisherSubscription(t *testing.T) { + q := NewJSONMessagePublisher() + l1 := make(chan JSONMessage) + l2 := make(chan JSONMessage) + + assertSubscribersCount(t, q, 0) + q.Subscribe(l1) + assertSubscribersCount(t, q, 1) + q.Subscribe(l2) + assertSubscribersCount(t, q, 2) + + q.Unsubscribe(l1) + q.Unsubscribe(l2) + assertSubscribersCount(t, q, 0) +} + +func TestJSONMessagePublisherPublish(t *testing.T) { + q := NewJSONMessagePublisher() + l1 := make(chan JSONMessage) + l2 := make(chan JSONMessage) + + go func() { + for { + select { + case <-l1: + close(l1) + l1 = nil + case <-l2: + close(l2) + l2 = nil + case <-time.After(1 * time.Second): + q.Unsubscribe(l1) + q.Unsubscribe(l2) + t.Fatal("Timeout waiting for broadcasted message") + } + } + }() + + q.Subscribe(l1) + q.Subscribe(l2) + q.Publish(JSONMessage{}) +} + +func TestJSONMessagePublishTimeout(t *testing.T) { + q := NewJSONMessagePublisher() + l := make(chan JSONMessage) + q.Subscribe(l) + + c := make(chan struct{}) + go func() { + q.Publish(JSONMessage{}) + close(c) + }() + + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout publishing message") + } +}