mirror of https://github.com/docker/docs.git
Remove events package
Signed-off-by: Alexander Morozov <lk4d4@docker.com>
This commit is contained in:
parent
c9eb37f975
commit
d487ca03e6
231
events/events.go
231
events/events.go
|
@ -1,231 +0,0 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/engine"
|
||||
"github.com/docker/docker/pkg/jsonmessage"
|
||||
"github.com/docker/docker/pkg/parsers/filters"
|
||||
)
|
||||
|
||||
const eventsLimit = 64
|
||||
|
||||
type listener chan<- *jsonmessage.JSONMessage
|
||||
|
||||
type Events struct {
|
||||
mu sync.RWMutex
|
||||
events []*jsonmessage.JSONMessage
|
||||
subscribers []listener
|
||||
}
|
||||
|
||||
func New() *Events {
|
||||
return &Events{
|
||||
events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
|
||||
}
|
||||
}
|
||||
|
||||
// Install installs events public api in docker engine
|
||||
func (e *Events) Install(eng *engine.Engine) error {
|
||||
// Here you should describe public interface
|
||||
jobs := map[string]engine.Handler{
|
||||
"events": e.Get,
|
||||
"log": e.Log,
|
||||
"subscribers_count": e.SubscribersCount,
|
||||
}
|
||||
for name, job := range jobs {
|
||||
if err := eng.Register(name, job); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) Get(job *engine.Job) error {
|
||||
var (
|
||||
since = job.GetenvInt64("since")
|
||||
until = job.GetenvInt64("until")
|
||||
timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now()))
|
||||
)
|
||||
|
||||
eventFilters, err := filters.FromParam(job.Getenv("filters"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If no until, disable timeout
|
||||
if job.Getenv("until") == "" {
|
||||
timeout.Stop()
|
||||
}
|
||||
|
||||
listener := make(chan *jsonmessage.JSONMessage)
|
||||
e.subscribe(listener)
|
||||
defer e.unsubscribe(listener)
|
||||
|
||||
job.Stdout.Write(nil)
|
||||
|
||||
// Resend every event in the [since, until] time interval.
|
||||
if job.Getenv("since") != "" {
|
||||
if err := e.writeCurrent(job, since, until, eventFilters); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-listener:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if err := writeEvent(job, event, eventFilters); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-timeout.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Events) Log(job *engine.Job) error {
|
||||
if len(job.Args) != 3 {
|
||||
return fmt.Errorf("usage: %s ACTION ID FROM", job.Name)
|
||||
}
|
||||
// not waiting for receivers
|
||||
go e.log(job.Args[0], job.Args[1], job.Args[2])
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) SubscribersCount(job *engine.Job) error {
|
||||
ret := &engine.Env{}
|
||||
ret.SetInt("count", e.subscribersCount())
|
||||
ret.WriteTo(job.Stdout)
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeEvent(job *engine.Job, event *jsonmessage.JSONMessage, eventFilters filters.Args) error {
|
||||
isFiltered := func(field string, filter []string) bool {
|
||||
if len(filter) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, v := range filter {
|
||||
if v == field {
|
||||
return false
|
||||
}
|
||||
if strings.Contains(field, ":") {
|
||||
image := strings.Split(field, ":")
|
||||
if image[0] == v {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
//incoming container filter can be name,id or partial id, convert and replace as a full container id
|
||||
for i, cn := range eventFilters["container"] {
|
||||
eventFilters["container"][i] = GetContainerId(job.Eng, cn)
|
||||
}
|
||||
|
||||
if isFiltered(event.Status, eventFilters["event"]) || isFiltered(event.From, eventFilters["image"]) ||
|
||||
isFiltered(event.ID, eventFilters["container"]) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// When sending an event JSON serialization errors are ignored, but all
|
||||
// other errors lead to the eviction of the listener.
|
||||
if b, err := json.Marshal(event); err == nil {
|
||||
if _, err = job.Stdout.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) writeCurrent(job *engine.Job, since, until int64, eventFilters filters.Args) error {
|
||||
e.mu.RLock()
|
||||
for _, event := range e.events {
|
||||
if event.Time >= since && (event.Time <= until || until == 0) {
|
||||
if err := writeEvent(job, event, eventFilters); err != nil {
|
||||
e.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
e.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) subscribersCount() int {
|
||||
e.mu.RLock()
|
||||
c := len(e.subscribers)
|
||||
e.mu.RUnlock()
|
||||
return c
|
||||
}
|
||||
|
||||
func (e *Events) log(action, id, from string) {
|
||||
e.mu.Lock()
|
||||
now := time.Now().UTC().Unix()
|
||||
jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: now}
|
||||
if len(e.events) == cap(e.events) {
|
||||
// discard oldest event
|
||||
copy(e.events, e.events[1:])
|
||||
e.events[len(e.events)-1] = jm
|
||||
} else {
|
||||
e.events = append(e.events, jm)
|
||||
}
|
||||
for _, s := range e.subscribers {
|
||||
// We give each subscriber a 100ms time window to receive the event,
|
||||
// after which we move to the next.
|
||||
select {
|
||||
case s <- jm:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
func (e *Events) subscribe(l listener) {
|
||||
e.mu.Lock()
|
||||
e.subscribers = append(e.subscribers, l)
|
||||
e.mu.Unlock()
|
||||
}
|
||||
|
||||
// unsubscribe closes and removes the specified listener from the list of
|
||||
// previously registed ones.
|
||||
// It returns a boolean value indicating if the listener was successfully
|
||||
// found, closed and unregistered.
|
||||
func (e *Events) unsubscribe(l listener) bool {
|
||||
e.mu.Lock()
|
||||
for i, subscriber := range e.subscribers {
|
||||
if subscriber == l {
|
||||
close(l)
|
||||
e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...)
|
||||
e.mu.Unlock()
|
||||
return true
|
||||
}
|
||||
}
|
||||
e.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
func GetContainerId(eng *engine.Engine, name string) string {
|
||||
var buf bytes.Buffer
|
||||
job := eng.Job("container_inspect", name)
|
||||
|
||||
var outStream io.Writer
|
||||
|
||||
outStream = &buf
|
||||
job.Stdout.Set(outStream)
|
||||
|
||||
if err := job.Run(); err != nil {
|
||||
return ""
|
||||
}
|
||||
var out struct{ ID string }
|
||||
json.NewDecoder(&buf).Decode(&out)
|
||||
return out.ID
|
||||
}
|
|
@ -1,154 +0,0 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/engine"
|
||||
"github.com/docker/docker/pkg/jsonmessage"
|
||||
)
|
||||
|
||||
func TestEventsPublish(t *testing.T) {
|
||||
e := New()
|
||||
l1 := make(chan *jsonmessage.JSONMessage)
|
||||
l2 := make(chan *jsonmessage.JSONMessage)
|
||||
e.subscribe(l1)
|
||||
e.subscribe(l2)
|
||||
count := e.subscribersCount()
|
||||
if count != 2 {
|
||||
t.Fatalf("Must be 2 subscribers, got %d", count)
|
||||
}
|
||||
go e.log("test", "cont", "image")
|
||||
select {
|
||||
case msg := <-l1:
|
||||
if len(e.events) != 1 {
|
||||
t.Fatalf("Must be only one event, got %d", len(e.events))
|
||||
}
|
||||
if msg.Status != "test" {
|
||||
t.Fatalf("Status should be test, got %s", msg.Status)
|
||||
}
|
||||
if msg.ID != "cont" {
|
||||
t.Fatalf("ID should be cont, got %s", msg.ID)
|
||||
}
|
||||
if msg.From != "image" {
|
||||
t.Fatalf("From should be image, got %s", msg.From)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Timeout waiting for broadcasted message")
|
||||
}
|
||||
select {
|
||||
case msg := <-l2:
|
||||
if len(e.events) != 1 {
|
||||
t.Fatalf("Must be only one event, got %d", len(e.events))
|
||||
}
|
||||
if msg.Status != "test" {
|
||||
t.Fatalf("Status should be test, got %s", msg.Status)
|
||||
}
|
||||
if msg.ID != "cont" {
|
||||
t.Fatalf("ID should be cont, got %s", msg.ID)
|
||||
}
|
||||
if msg.From != "image" {
|
||||
t.Fatalf("From should be image, got %s", msg.From)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Timeout waiting for broadcasted message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventsPublishTimeout(t *testing.T) {
|
||||
e := New()
|
||||
l := make(chan *jsonmessage.JSONMessage)
|
||||
e.subscribe(l)
|
||||
|
||||
c := make(chan struct{})
|
||||
go func() {
|
||||
e.log("test", "cont", "image")
|
||||
close(c)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-c:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Timeout publishing message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLogEvents(t *testing.T) {
|
||||
e := New()
|
||||
eng := engine.New()
|
||||
if err := e.Install(eng); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i := 0; i < eventsLimit+16; i++ {
|
||||
action := fmt.Sprintf("action_%d", i)
|
||||
id := fmt.Sprintf("cont_%d", i)
|
||||
from := fmt.Sprintf("image_%d", i)
|
||||
job := eng.Job("log", action, id, from)
|
||||
if err := job.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if len(e.events) != eventsLimit {
|
||||
t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events))
|
||||
}
|
||||
|
||||
job := eng.Job("events")
|
||||
job.SetenvInt64("since", 1)
|
||||
job.SetenvInt64("until", time.Now().Unix())
|
||||
buf := bytes.NewBuffer(nil)
|
||||
job.Stdout.Add(buf)
|
||||
if err := job.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
buf = bytes.NewBuffer(buf.Bytes())
|
||||
dec := json.NewDecoder(buf)
|
||||
var msgs []jsonmessage.JSONMessage
|
||||
for {
|
||||
var jm jsonmessage.JSONMessage
|
||||
if err := dec.Decode(&jm); err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
msgs = append(msgs, jm)
|
||||
}
|
||||
if len(msgs) != eventsLimit {
|
||||
t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs))
|
||||
}
|
||||
first := msgs[0]
|
||||
if first.Status != "action_16" {
|
||||
t.Fatalf("First action is %s, must be action_15", first.Status)
|
||||
}
|
||||
last := msgs[len(msgs)-1]
|
||||
if last.Status != "action_79" {
|
||||
t.Fatalf("First action is %s, must be action_79", first.Status)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventsCountJob(t *testing.T) {
|
||||
e := New()
|
||||
eng := engine.New()
|
||||
if err := e.Install(eng); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
l1 := make(chan *jsonmessage.JSONMessage)
|
||||
l2 := make(chan *jsonmessage.JSONMessage)
|
||||
e.subscribe(l1)
|
||||
e.subscribe(l2)
|
||||
job := eng.Job("subscribers_count")
|
||||
env, _ := job.Stdout.AddEnv()
|
||||
if err := job.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
count := env.GetInt("count")
|
||||
if count != 2 {
|
||||
t.Fatalf("There must be 2 subscribers, got %d", count)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue