From d487ca03e6e897e4bb5f9ba28b268450f059fc0d Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 3 Apr 2015 15:18:12 -0700 Subject: [PATCH] Remove events package Signed-off-by: Alexander Morozov --- events/events.go | 231 ------------------------------------------ events/events_test.go | 154 ---------------------------- 2 files changed, 385 deletions(-) delete mode 100644 events/events.go delete mode 100644 events/events_test.go diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 93ea9a0391..0000000000 --- a/events/events.go +++ /dev/null @@ -1,231 +0,0 @@ -package events - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "strings" - "sync" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/docker/pkg/parsers/filters" -) - -const eventsLimit = 64 - -type listener chan<- *jsonmessage.JSONMessage - -type Events struct { - mu sync.RWMutex - events []*jsonmessage.JSONMessage - subscribers []listener -} - -func New() *Events { - return &Events{ - events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), - } -} - -// Install installs events public api in docker engine -func (e *Events) Install(eng *engine.Engine) error { - // Here you should describe public interface - jobs := map[string]engine.Handler{ - "events": e.Get, - "log": e.Log, - "subscribers_count": e.SubscribersCount, - } - for name, job := range jobs { - if err := eng.Register(name, job); err != nil { - return err - } - } - return nil -} - -func (e *Events) Get(job *engine.Job) error { - var ( - since = job.GetenvInt64("since") - until = job.GetenvInt64("until") - timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) - ) - - eventFilters, err := filters.FromParam(job.Getenv("filters")) - if err != nil { - return err - } - - // If no until, disable timeout - if job.Getenv("until") == "" { - timeout.Stop() - } - - listener := make(chan *jsonmessage.JSONMessage) - e.subscribe(listener) - defer e.unsubscribe(listener) - - job.Stdout.Write(nil) - - // Resend every event in the [since, until] time interval. - if job.Getenv("since") != "" { - if err := e.writeCurrent(job, since, until, eventFilters); err != nil { - return err - } - } - - for { - select { - case event, ok := <-listener: - if !ok { - return nil - } - if err := writeEvent(job, event, eventFilters); err != nil { - return err - } - case <-timeout.C: - return nil - } - } -} - -func (e *Events) Log(job *engine.Job) error { - if len(job.Args) != 3 { - return fmt.Errorf("usage: %s ACTION ID FROM", job.Name) - } - // not waiting for receivers - go e.log(job.Args[0], job.Args[1], job.Args[2]) - return nil -} - -func (e *Events) SubscribersCount(job *engine.Job) error { - ret := &engine.Env{} - ret.SetInt("count", e.subscribersCount()) - ret.WriteTo(job.Stdout) - return nil -} - -func writeEvent(job *engine.Job, event *jsonmessage.JSONMessage, eventFilters filters.Args) error { - isFiltered := func(field string, filter []string) bool { - if len(filter) == 0 { - return false - } - for _, v := range filter { - if v == field { - return false - } - if strings.Contains(field, ":") { - image := strings.Split(field, ":") - if image[0] == v { - return false - } - } - } - return true - } - - //incoming container filter can be name,id or partial id, convert and replace as a full container id - for i, cn := range eventFilters["container"] { - eventFilters["container"][i] = GetContainerId(job.Eng, cn) - } - - if isFiltered(event.Status, eventFilters["event"]) || isFiltered(event.From, eventFilters["image"]) || - isFiltered(event.ID, eventFilters["container"]) { - return nil - } - - // When sending an event JSON serialization errors are ignored, but all - // other errors lead to the eviction of the listener. - if b, err := json.Marshal(event); err == nil { - if _, err = job.Stdout.Write(b); err != nil { - return err - } - } - return nil -} - -func (e *Events) writeCurrent(job *engine.Job, since, until int64, eventFilters filters.Args) error { - e.mu.RLock() - for _, event := range e.events { - if event.Time >= since && (event.Time <= until || until == 0) { - if err := writeEvent(job, event, eventFilters); err != nil { - e.mu.RUnlock() - return err - } - } - } - e.mu.RUnlock() - return nil -} - -func (e *Events) subscribersCount() int { - e.mu.RLock() - c := len(e.subscribers) - e.mu.RUnlock() - return c -} - -func (e *Events) log(action, id, from string) { - e.mu.Lock() - now := time.Now().UTC().Unix() - jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: now} - if len(e.events) == cap(e.events) { - // discard oldest event - copy(e.events, e.events[1:]) - e.events[len(e.events)-1] = jm - } else { - e.events = append(e.events, jm) - } - for _, s := range e.subscribers { - // We give each subscriber a 100ms time window to receive the event, - // after which we move to the next. - select { - case s <- jm: - case <-time.After(100 * time.Millisecond): - } - } - e.mu.Unlock() -} - -func (e *Events) subscribe(l listener) { - e.mu.Lock() - e.subscribers = append(e.subscribers, l) - e.mu.Unlock() -} - -// unsubscribe closes and removes the specified listener from the list of -// previously registed ones. -// It returns a boolean value indicating if the listener was successfully -// found, closed and unregistered. -func (e *Events) unsubscribe(l listener) bool { - e.mu.Lock() - for i, subscriber := range e.subscribers { - if subscriber == l { - close(l) - e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...) - e.mu.Unlock() - return true - } - } - e.mu.Unlock() - return false -} - -func GetContainerId(eng *engine.Engine, name string) string { - var buf bytes.Buffer - job := eng.Job("container_inspect", name) - - var outStream io.Writer - - outStream = &buf - job.Stdout.Set(outStream) - - if err := job.Run(); err != nil { - return "" - } - var out struct{ ID string } - json.NewDecoder(&buf).Decode(&out) - return out.ID -} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index a232576fe5..0000000000 --- a/events/events_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package events - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "testing" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/pkg/jsonmessage" -) - -func TestEventsPublish(t *testing.T) { - e := New() - l1 := make(chan *jsonmessage.JSONMessage) - l2 := make(chan *jsonmessage.JSONMessage) - e.subscribe(l1) - e.subscribe(l2) - count := e.subscribersCount() - if count != 2 { - t.Fatalf("Must be 2 subscribers, got %d", count) - } - go e.log("test", "cont", "image") - select { - case msg := <-l1: - if len(e.events) != 1 { - t.Fatalf("Must be only one event, got %d", len(e.events)) - } - if msg.Status != "test" { - t.Fatalf("Status should be test, got %s", msg.Status) - } - if msg.ID != "cont" { - t.Fatalf("ID should be cont, got %s", msg.ID) - } - if msg.From != "image" { - t.Fatalf("From should be image, got %s", msg.From) - } - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for broadcasted message") - } - select { - case msg := <-l2: - if len(e.events) != 1 { - t.Fatalf("Must be only one event, got %d", len(e.events)) - } - if msg.Status != "test" { - t.Fatalf("Status should be test, got %s", msg.Status) - } - if msg.ID != "cont" { - t.Fatalf("ID should be cont, got %s", msg.ID) - } - if msg.From != "image" { - t.Fatalf("From should be image, got %s", msg.From) - } - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for broadcasted message") - } -} - -func TestEventsPublishTimeout(t *testing.T) { - e := New() - l := make(chan *jsonmessage.JSONMessage) - e.subscribe(l) - - c := make(chan struct{}) - go func() { - e.log("test", "cont", "image") - close(c) - }() - - select { - case <-c: - case <-time.After(time.Second): - t.Fatal("Timeout publishing message") - } -} - -func TestLogEvents(t *testing.T) { - e := New() - eng := engine.New() - if err := e.Install(eng); err != nil { - t.Fatal(err) - } - - for i := 0; i < eventsLimit+16; i++ { - action := fmt.Sprintf("action_%d", i) - id := fmt.Sprintf("cont_%d", i) - from := fmt.Sprintf("image_%d", i) - job := eng.Job("log", action, id, from) - if err := job.Run(); err != nil { - t.Fatal(err) - } - } - time.Sleep(50 * time.Millisecond) - if len(e.events) != eventsLimit { - t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) - } - - job := eng.Job("events") - job.SetenvInt64("since", 1) - job.SetenvInt64("until", time.Now().Unix()) - buf := bytes.NewBuffer(nil) - job.Stdout.Add(buf) - if err := job.Run(); err != nil { - t.Fatal(err) - } - buf = bytes.NewBuffer(buf.Bytes()) - dec := json.NewDecoder(buf) - var msgs []jsonmessage.JSONMessage - for { - var jm jsonmessage.JSONMessage - if err := dec.Decode(&jm); err != nil { - if err == io.EOF { - break - } - t.Fatal(err) - } - msgs = append(msgs, jm) - } - if len(msgs) != eventsLimit { - t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs)) - } - first := msgs[0] - if first.Status != "action_16" { - t.Fatalf("First action is %s, must be action_15", first.Status) - } - last := msgs[len(msgs)-1] - if last.Status != "action_79" { - t.Fatalf("First action is %s, must be action_79", first.Status) - } -} - -func TestEventsCountJob(t *testing.T) { - e := New() - eng := engine.New() - if err := e.Install(eng); err != nil { - t.Fatal(err) - } - l1 := make(chan *jsonmessage.JSONMessage) - l2 := make(chan *jsonmessage.JSONMessage) - e.subscribe(l1) - e.subscribe(l2) - job := eng.Job("subscribers_count") - env, _ := job.Stdout.AddEnv() - if err := job.Run(); err != nil { - t.Fatal(err) - } - count := env.GetInt("count") - if count != 2 { - t.Fatalf("There must be 2 subscribers, got %d", count) - } -}