Hub: a hub for self-registering services using any Beam transport

This commit is contained in:
Solomon Hykes 2014-05-19 13:48:49 -07:00
parent c6f0a70141
commit e7e31cf71b
2 changed files with 102 additions and 0 deletions

53
utils/hub.go Normal file
View File

@ -0,0 +1,53 @@
package utils
import (
"fmt"
"github.com/docker/beam"
"github.com/docker/beam/inmem"
"sync"
)
// Hub passes messages to dynamically registered handlers.
type Hub struct {
handlers *StackSender
tasks sync.WaitGroup
}
func NewHub() *Hub {
return &Hub{
handlers: NewStackSender(),
}
}
func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
if msg.Name == "register" {
if mode&beam.R == 0 {
return nil, nil, fmt.Errorf("register: no return channel")
}
fmt.Printf("[hub] received %v\n", msg)
hYoutr, hYoutw := inmem.Pipe()
hYinr, hYinw := inmem.Pipe()
// Register the new handler on top of the others,
// and get a reference to the previous stack of handlers.
prevHandlers := hub.handlers.Add(hYinw)
// Pass requests from the new handler to the previous chain of handlers
// hYout -> hXin
hub.tasks.Add(1)
go func() {
defer hub.tasks.Done()
Copy(prevHandlers, hYoutr)
hYoutr.Close()
}()
return hYinr, hYoutw, nil
}
fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len())
return hub.handlers.Send(msg, mode)
}
func (hub *Hub) Wait() {
hub.tasks.Wait()
}
func (hub *Hub) Close() error {
return hub.handlers.Close()
}

49
utils/hub_test.go Normal file
View File

@ -0,0 +1,49 @@
package utils
import (
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestHubSendEmpty(t *testing.T) {
hub := NewHub()
// Send to empty hub should silently drop
if r, w, err := hub.Send(&beam.Message{"hello", nil}, beam.R|beam.W); err != nil {
t.Fatal(err)
} else if r != nil {
t.Fatalf("%#v", r)
} else if w != nil {
t.Fatalf("%#v", w)
}
}
type CountSender int
func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
(*s)++
return nil, nil, nil
}
func TestHubSendOneHandler(t *testing.T) {
hub := NewHub()
defer hub.Close()
testutils.Timeout(t, func() {
in, _, err := hub.Send(&beam.Message{"register", nil}, beam.R)
if err != nil {
t.Fatal(err)
}
go func() {
if _, _, err := hub.Send(&beam.Message{"hello", nil}, 0); err != nil {
t.Fatal(err)
}
}()
msg, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
})
}