Remove Hub

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-06-05 07:00:30 +00:00
parent 91c9eec591
commit 56ee97a572
2 changed files with 0 additions and 178 deletions

View File

@ -1,116 +0,0 @@
package utils
import (
"fmt"
"github.com/docker/libswarm/beam"
"io"
"strings"
"sync"
)
// Hub passes messages to dynamically registered handlers.
type Hub struct {
handlers *StackSender
tasks sync.WaitGroup
l sync.RWMutex
}
func NewHub() *Hub {
return &Hub{
handlers: NewStackSender(),
}
}
func (hub *Hub) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Name == "register" {
if msg.Ret == nil {
return nil, fmt.Errorf("register: no return channel")
}
fmt.Printf("[hub] received %v\n", msg)
hIn := msg.Ret
if hIn == beam.RetPipe {
ret, hIn = beam.Pipe()
}
// This queue guarantees that the first message received by the handler
// is the "register" response.
hIn = NewQueue(hIn, 1)
// Reply to the handler with a "register" call of our own,
// passing a reference to the previous handler stack.
// This allows the new handler to query previous handlers
// without creating loops.
hOut, err := hIn.Send(&beam.Message{Name: "register", Ret: beam.RetPipe})
if err != nil {
return nil, err
}
// Register the new handler on top of the others,
// and get a reference to the previous stack of handlers.
prevHandlers := hub.handlers.Add(hIn)
go beam.Copy(prevHandlers, hOut)
return ret, nil
}
fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len())
return hub.handlers.Send(msg)
}
func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error {
ret, err := hub.Send(&beam.Message{Name: "register", Ret: beam.RetPipe})
if err != nil {
return err
}
ack, err := ret.Receive(beam.Ret)
if err != nil {
return err
}
if ack.Name == "error" {
return fmt.Errorf(strings.Join(ack.Args, ", "))
}
if ack.Name != "register" {
return fmt.Errorf("invalid response: expected verb 'register', got '%v'", ack.Name)
}
go func() {
h(ret, ack.Ret)
ack.Ret.Close()
}()
return nil
}
type Handler func(msg *beam.Message, out beam.Sender) (pass bool, err error)
func (hub *Hub) RegisterName(name string, h Handler) error {
return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error {
for {
msg, err := in.Receive(beam.Ret)
if err == io.EOF {
break
}
if err != nil {
return err
}
var pass = true
if msg.Name == name || name == "" {
pass, err = h(msg, out)
if err != nil {
if _, err := msg.Ret.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}); err != nil {
return err
}
}
}
if pass {
if _, err := out.Send(msg); err != nil {
return err
}
} else {
msg.Ret.Close()
}
}
return nil
})
}
func (hub *Hub) Wait() {
hub.tasks.Wait()
}
func (hub *Hub) Close() error {
return hub.handlers.Close()
}

View File

@ -1,62 +0,0 @@
package utils
import (
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestHubSendEmpty(t *testing.T) {
hub := NewHub()
// Send to empty hub should silently drop
ret, err := hub.Send(&beam.Message{Name: "hello", Args: nil, Ret: beam.RetPipe})
// Send must not return an error
if err != nil {
t.Fatal(err)
}
// We set beam.R, so a valid return pipe must be returned
if ret == nil {
t.Fatalf("%#v", ret)
}
}
type CountSender int
func (s *CountSender) Send(msg *beam.Message) (beam.Receiver, error) {
(*s)++
return nil, nil
}
func TestHubSendOneHandler(t *testing.T) {
hub := NewHub()
defer hub.Close()
testutils.Timeout(t, func() {
handlerIn, err := hub.Send(&beam.Message{Name: "register", Args: nil, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
ack, err := handlerIn.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if ack.Name != "register" {
t.Fatalf("%#v", err)
}
handlerOut := ack.Ret
if handlerOut == nil {
t.Fatalf("nil handler out")
}
go func() {
if _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}); err != nil {
t.Fatal(err)
}
}()
msg, err := handlerIn.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
})
}