diff --git a/beam/utils/hub.go b/beam/utils/hub.go deleted file mode 100644 index 8c5f315089..0000000000 --- a/beam/utils/hub.go +++ /dev/null @@ -1,116 +0,0 @@ -package utils - -import ( - "fmt" - "github.com/docker/libswarm/beam" - "io" - "strings" - "sync" -) - -// Hub passes messages to dynamically registered handlers. -type Hub struct { - handlers *StackSender - tasks sync.WaitGroup - l sync.RWMutex -} - -func NewHub() *Hub { - return &Hub{ - handlers: NewStackSender(), - } -} - -func (hub *Hub) Send(msg *beam.Message) (ret beam.Receiver, err error) { - if msg.Name == "register" { - if msg.Ret == nil { - return nil, fmt.Errorf("register: no return channel") - } - fmt.Printf("[hub] received %v\n", msg) - hIn := msg.Ret - if hIn == beam.RetPipe { - ret, hIn = beam.Pipe() - } - // This queue guarantees that the first message received by the handler - // is the "register" response. - hIn = NewQueue(hIn, 1) - // Reply to the handler with a "register" call of our own, - // passing a reference to the previous handler stack. - // This allows the new handler to query previous handlers - // without creating loops. - hOut, err := hIn.Send(&beam.Message{Name: "register", Ret: beam.RetPipe}) - if err != nil { - return nil, err - } - // Register the new handler on top of the others, - // and get a reference to the previous stack of handlers. - prevHandlers := hub.handlers.Add(hIn) - go beam.Copy(prevHandlers, hOut) - return ret, nil - } - fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len()) - return hub.handlers.Send(msg) -} - -func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error { - ret, err := hub.Send(&beam.Message{Name: "register", Ret: beam.RetPipe}) - if err != nil { - return err - } - ack, err := ret.Receive(beam.Ret) - if err != nil { - return err - } - if ack.Name == "error" { - return fmt.Errorf(strings.Join(ack.Args, ", ")) - } - if ack.Name != "register" { - return fmt.Errorf("invalid response: expected verb 'register', got '%v'", ack.Name) - } - go func() { - h(ret, ack.Ret) - ack.Ret.Close() - }() - return nil -} - -type Handler func(msg *beam.Message, out beam.Sender) (pass bool, err error) - -func (hub *Hub) RegisterName(name string, h Handler) error { - return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error { - for { - msg, err := in.Receive(beam.Ret) - if err == io.EOF { - break - } - if err != nil { - return err - } - var pass = true - if msg.Name == name || name == "" { - pass, err = h(msg, out) - if err != nil { - if _, err := msg.Ret.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}); err != nil { - return err - } - } - } - if pass { - if _, err := out.Send(msg); err != nil { - return err - } - } else { - msg.Ret.Close() - } - } - return nil - }) -} - -func (hub *Hub) Wait() { - hub.tasks.Wait() -} - -func (hub *Hub) Close() error { - return hub.handlers.Close() -} diff --git a/beam/utils/hub_test.go b/beam/utils/hub_test.go deleted file mode 100644 index 925e2894bc..0000000000 --- a/beam/utils/hub_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package utils - -import ( - "github.com/docker/libswarm/beam" - "github.com/dotcloud/docker/pkg/testutils" - "testing" -) - -func TestHubSendEmpty(t *testing.T) { - hub := NewHub() - // Send to empty hub should silently drop - ret, err := hub.Send(&beam.Message{Name: "hello", Args: nil, Ret: beam.RetPipe}) - // Send must not return an error - if err != nil { - t.Fatal(err) - } - // We set beam.R, so a valid return pipe must be returned - if ret == nil { - t.Fatalf("%#v", ret) - } -} - -type CountSender int - -func (s *CountSender) Send(msg *beam.Message) (beam.Receiver, error) { - (*s)++ - return nil, nil -} - -func TestHubSendOneHandler(t *testing.T) { - hub := NewHub() - defer hub.Close() - testutils.Timeout(t, func() { - handlerIn, err := hub.Send(&beam.Message{Name: "register", Args: nil, Ret: beam.RetPipe}) - if err != nil { - t.Fatal(err) - } - ack, err := handlerIn.Receive(beam.Ret) - if err != nil { - t.Fatal(err) - } - if ack.Name != "register" { - t.Fatalf("%#v", err) - } - handlerOut := ack.Ret - if handlerOut == nil { - t.Fatalf("nil handler out") - } - go func() { - if _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}); err != nil { - t.Fatal(err) - } - }() - msg, err := handlerIn.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Name != "hello" { - t.Fatalf("%#v", msg) - } - }) -}