From c6f0a701418a0d587ade82fe3fc65654b282f63f Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 19 May 2014 13:45:33 -0700 Subject: [PATCH 1/2] Move utilities and interfaces out of `inmem` * `utils.Buffer`: a Sender which stores messages in a slice * `utils.Copy`: beam equivalent of `io.Copy` * `utlis.NopSender`: a Sender which silently discards messages * `utlis.NopReceiver`: a Receiver which always returns EOF --- beam.go | 17 ++++++++++++++++- inmem/inmem.go | 4 ++-- {inmem => utils}/buf.go | 2 +- {inmem => utils}/copy.go | 24 +++++------------------- {inmem => utils}/nop.go | 2 +- utils/stack_test.go | 6 +++--- 6 files changed, 28 insertions(+), 27 deletions(-) rename {inmem => utils}/buf.go (95%) rename {inmem => utils}/copy.go (59%) rename {inmem => utils}/nop.go (96%) diff --git a/beam.go b/beam.go index 3403a1ec95..c0f7b4be9e 100644 --- a/beam.go +++ b/beam.go @@ -1,6 +1,8 @@ package beam -import () +import ( + "errors" +) type Sender interface { Send(msg *Message, mode int) (Receiver, Sender, error) @@ -20,3 +22,16 @@ const ( R = 1 << (32 - 1 - iota) W ) + +type ReceiverFrom interface { + ReceiveFrom(Receiver) (int, error) +} + +type SenderTo interface { + SendTo(Sender) (int, error) +} + +var ( + ErrIncompatibleSender = errors.New("incompatible sender") + ErrIncompatibleReceiver = errors.New("incompatible receiver") +) diff --git a/inmem/inmem.go b/inmem/inmem.go index f401f7ac19..7ca887a016 100644 --- a/inmem/inmem.go +++ b/inmem/inmem.go @@ -178,7 +178,7 @@ func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) { // If the destination is a PipeSender, we can cheat pdst, ok := dst.(*PipeSender) if !ok { - return 0, ErrIncompatibleSender + return 0, beam.ErrIncompatibleSender } for { pmsg, err := r.p.preceive() @@ -231,7 +231,7 @@ func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) { // If the destination is a PipeReceiver, we can cheat psrc, ok := src.(*PipeReceiver) if !ok { - return 0, ErrIncompatibleReceiver + return 0, beam.ErrIncompatibleReceiver } for { pmsg, err := psrc.p.preceive() diff --git a/inmem/buf.go b/utils/buf.go similarity index 95% rename from inmem/buf.go rename to utils/buf.go index 244953644b..514f9c8f0a 100644 --- a/inmem/buf.go +++ b/utils/buf.go @@ -1,4 +1,4 @@ -package inmem +package utils import ( "github.com/docker/beam" diff --git a/inmem/copy.go b/utils/copy.go similarity index 59% rename from inmem/copy.go rename to utils/copy.go index 8fe3e7b14e..ad41758d46 100644 --- a/inmem/copy.go +++ b/utils/copy.go @@ -1,34 +1,20 @@ -package inmem +package utils import ( - "errors" "github.com/docker/beam" "sync" ) -type ReceiverFrom interface { - ReceiveFrom(beam.Receiver) (int, error) -} - -type SenderTo interface { - SendTo(beam.Sender) (int, error) -} - -var ( - ErrIncompatibleSender = errors.New("incompatible sender") - ErrIncompatibleReceiver = errors.New("incompatible receiver") -) - func Copy(dst beam.Sender, src beam.Receiver) (int, error) { var tasks sync.WaitGroup defer tasks.Wait() - if senderTo, ok := src.(SenderTo); ok { - if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender { + if senderTo, ok := src.(beam.SenderTo); ok { + if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender { return n, err } } - if receiverFrom, ok := dst.(ReceiverFrom); ok { - if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver { + if receiverFrom, ok := dst.(beam.ReceiverFrom); ok { + if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver { return n, err } } diff --git a/inmem/nop.go b/utils/nop.go similarity index 96% rename from inmem/nop.go rename to utils/nop.go index 2d822fe340..943f0ce1c3 100644 --- a/inmem/nop.go +++ b/utils/nop.go @@ -1,4 +1,4 @@ -package inmem +package utils import ( "github.com/docker/beam" diff --git a/utils/stack_test.go b/utils/stack_test.go index 90ed590694..dd1aa9ae9e 100644 --- a/utils/stack_test.go +++ b/utils/stack_test.go @@ -73,7 +73,7 @@ func TestStackLen(t *testing.T) { func TestStackAdd(t *testing.T) { s := NewStackSender() - a := inmem.Buffer{} + a := Buffer{} beforeA := s.Add(&a) // Add on an empty StackSender should return an empty StackSender if beforeA.Len() != 0 { @@ -83,7 +83,7 @@ func TestStackAdd(t *testing.T) { t.Fatalf("%#v", beforeA) } // Add a 2nd element - b := inmem.Buffer{} + b := Buffer{} beforeB := s.Add(&b) if beforeB.Len() != 1 { t.Fatalf("%#v", beforeA) @@ -105,7 +105,7 @@ func TestStackAdd(t *testing.T) { // Misbehaving backends must be removed func TestStackAddBad(t *testing.T) { s := NewStackSender() - buf := inmem.Buffer{} + buf := Buffer{} s.Add(&buf) r, w := inmem.Pipe() s.Add(w) From e7e31cf71bac8e434a84591fbf4836b8a00db8f0 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 19 May 2014 13:48:49 -0700 Subject: [PATCH 2/2] Hub: a hub for self-registering services using any Beam transport --- utils/hub.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++ utils/hub_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 utils/hub.go create mode 100644 utils/hub_test.go diff --git a/utils/hub.go b/utils/hub.go new file mode 100644 index 0000000000..ce3f09e623 --- /dev/null +++ b/utils/hub.go @@ -0,0 +1,53 @@ +package utils + +import ( + "fmt" + "github.com/docker/beam" + "github.com/docker/beam/inmem" + "sync" +) + +// Hub passes messages to dynamically registered handlers. +type Hub struct { + handlers *StackSender + tasks sync.WaitGroup +} + +func NewHub() *Hub { + return &Hub{ + handlers: NewStackSender(), + } +} + +func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + if msg.Name == "register" { + if mode&beam.R == 0 { + return nil, nil, fmt.Errorf("register: no return channel") + } + fmt.Printf("[hub] received %v\n", msg) + hYoutr, hYoutw := inmem.Pipe() + hYinr, hYinw := inmem.Pipe() + // Register the new handler on top of the others, + // and get a reference to the previous stack of handlers. + prevHandlers := hub.handlers.Add(hYinw) + // Pass requests from the new handler to the previous chain of handlers + // hYout -> hXin + hub.tasks.Add(1) + go func() { + defer hub.tasks.Done() + Copy(prevHandlers, hYoutr) + hYoutr.Close() + }() + return hYinr, hYoutw, nil + } + fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len()) + return hub.handlers.Send(msg, mode) +} + +func (hub *Hub) Wait() { + hub.tasks.Wait() +} + +func (hub *Hub) Close() error { + return hub.handlers.Close() +} diff --git a/utils/hub_test.go b/utils/hub_test.go new file mode 100644 index 0000000000..466e7c0cfc --- /dev/null +++ b/utils/hub_test.go @@ -0,0 +1,49 @@ +package utils + +import ( + "github.com/docker/beam" + "github.com/dotcloud/docker/pkg/testutils" + "testing" +) + +func TestHubSendEmpty(t *testing.T) { + hub := NewHub() + // Send to empty hub should silently drop + if r, w, err := hub.Send(&beam.Message{"hello", nil}, beam.R|beam.W); err != nil { + t.Fatal(err) + } else if r != nil { + t.Fatalf("%#v", r) + } else if w != nil { + t.Fatalf("%#v", w) + } +} + +type CountSender int + +func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + (*s)++ + return nil, nil, nil +} + +func TestHubSendOneHandler(t *testing.T) { + hub := NewHub() + defer hub.Close() + testutils.Timeout(t, func() { + in, _, err := hub.Send(&beam.Message{"register", nil}, beam.R) + if err != nil { + t.Fatal(err) + } + go func() { + if _, _, err := hub.Send(&beam.Message{"hello", nil}, 0); err != nil { + t.Fatal(err) + } + }() + msg, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "hello" { + t.Fatalf("%#v", msg) + } + }) +}