mirror of https://github.com/docker/docs.git
Merge pull request #8 from shykes/pr_out_hub_a_hub_for_self_registering_services_using_any_beam_transport
This commit is contained in:
commit
800bf713eb
17
beam.go
17
beam.go
|
@ -1,6 +1,8 @@
|
||||||
package beam
|
package beam
|
||||||
|
|
||||||
import ()
|
import (
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
type Sender interface {
|
type Sender interface {
|
||||||
Send(msg *Message, mode int) (Receiver, Sender, error)
|
Send(msg *Message, mode int) (Receiver, Sender, error)
|
||||||
|
@ -20,3 +22,16 @@ const (
|
||||||
R = 1 << (32 - 1 - iota)
|
R = 1 << (32 - 1 - iota)
|
||||||
W
|
W
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ReceiverFrom interface {
|
||||||
|
ReceiveFrom(Receiver) (int, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SenderTo interface {
|
||||||
|
SendTo(Sender) (int, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrIncompatibleSender = errors.New("incompatible sender")
|
||||||
|
ErrIncompatibleReceiver = errors.New("incompatible receiver")
|
||||||
|
)
|
||||||
|
|
|
@ -178,7 +178,7 @@ func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) {
|
||||||
// If the destination is a PipeSender, we can cheat
|
// If the destination is a PipeSender, we can cheat
|
||||||
pdst, ok := dst.(*PipeSender)
|
pdst, ok := dst.(*PipeSender)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, ErrIncompatibleSender
|
return 0, beam.ErrIncompatibleSender
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
pmsg, err := r.p.preceive()
|
pmsg, err := r.p.preceive()
|
||||||
|
@ -231,7 +231,7 @@ func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) {
|
||||||
// If the destination is a PipeReceiver, we can cheat
|
// If the destination is a PipeReceiver, we can cheat
|
||||||
psrc, ok := src.(*PipeReceiver)
|
psrc, ok := src.(*PipeReceiver)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, ErrIncompatibleReceiver
|
return 0, beam.ErrIncompatibleReceiver
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
pmsg, err := psrc.p.preceive()
|
pmsg, err := psrc.p.preceive()
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package inmem
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/docker/beam"
|
"github.com/docker/beam"
|
|
@ -1,34 +1,20 @@
|
||||||
package inmem
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"github.com/docker/beam"
|
"github.com/docker/beam"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ReceiverFrom interface {
|
|
||||||
ReceiveFrom(beam.Receiver) (int, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type SenderTo interface {
|
|
||||||
SendTo(beam.Sender) (int, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
ErrIncompatibleSender = errors.New("incompatible sender")
|
|
||||||
ErrIncompatibleReceiver = errors.New("incompatible receiver")
|
|
||||||
)
|
|
||||||
|
|
||||||
func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
|
func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
|
||||||
var tasks sync.WaitGroup
|
var tasks sync.WaitGroup
|
||||||
defer tasks.Wait()
|
defer tasks.Wait()
|
||||||
if senderTo, ok := src.(SenderTo); ok {
|
if senderTo, ok := src.(beam.SenderTo); ok {
|
||||||
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
|
if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if receiverFrom, ok := dst.(ReceiverFrom); ok {
|
if receiverFrom, ok := dst.(beam.ReceiverFrom); ok {
|
||||||
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
|
if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/docker/beam"
|
||||||
|
"github.com/docker/beam/inmem"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Hub passes messages to dynamically registered handlers.
|
||||||
|
type Hub struct {
|
||||||
|
handlers *StackSender
|
||||||
|
tasks sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHub() *Hub {
|
||||||
|
return &Hub{
|
||||||
|
handlers: NewStackSender(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
|
||||||
|
if msg.Name == "register" {
|
||||||
|
if mode&beam.R == 0 {
|
||||||
|
return nil, nil, fmt.Errorf("register: no return channel")
|
||||||
|
}
|
||||||
|
fmt.Printf("[hub] received %v\n", msg)
|
||||||
|
hYoutr, hYoutw := inmem.Pipe()
|
||||||
|
hYinr, hYinw := inmem.Pipe()
|
||||||
|
// Register the new handler on top of the others,
|
||||||
|
// and get a reference to the previous stack of handlers.
|
||||||
|
prevHandlers := hub.handlers.Add(hYinw)
|
||||||
|
// Pass requests from the new handler to the previous chain of handlers
|
||||||
|
// hYout -> hXin
|
||||||
|
hub.tasks.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer hub.tasks.Done()
|
||||||
|
Copy(prevHandlers, hYoutr)
|
||||||
|
hYoutr.Close()
|
||||||
|
}()
|
||||||
|
return hYinr, hYoutw, nil
|
||||||
|
}
|
||||||
|
fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len())
|
||||||
|
return hub.handlers.Send(msg, mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *Hub) Wait() {
|
||||||
|
hub.tasks.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hub *Hub) Close() error {
|
||||||
|
return hub.handlers.Close()
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/docker/beam"
|
||||||
|
"github.com/dotcloud/docker/pkg/testutils"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHubSendEmpty(t *testing.T) {
|
||||||
|
hub := NewHub()
|
||||||
|
// Send to empty hub should silently drop
|
||||||
|
if r, w, err := hub.Send(&beam.Message{"hello", nil}, beam.R|beam.W); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if r != nil {
|
||||||
|
t.Fatalf("%#v", r)
|
||||||
|
} else if w != nil {
|
||||||
|
t.Fatalf("%#v", w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type CountSender int
|
||||||
|
|
||||||
|
func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
|
||||||
|
(*s)++
|
||||||
|
return nil, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHubSendOneHandler(t *testing.T) {
|
||||||
|
hub := NewHub()
|
||||||
|
defer hub.Close()
|
||||||
|
testutils.Timeout(t, func() {
|
||||||
|
in, _, err := hub.Send(&beam.Message{"register", nil}, beam.R)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
if _, _, err := hub.Send(&beam.Message{"hello", nil}, 0); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
msg, _, _, err := in.Receive(0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if msg.Name != "hello" {
|
||||||
|
t.Fatalf("%#v", msg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package inmem
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/docker/beam"
|
"github.com/docker/beam"
|
|
@ -73,7 +73,7 @@ func TestStackLen(t *testing.T) {
|
||||||
|
|
||||||
func TestStackAdd(t *testing.T) {
|
func TestStackAdd(t *testing.T) {
|
||||||
s := NewStackSender()
|
s := NewStackSender()
|
||||||
a := inmem.Buffer{}
|
a := Buffer{}
|
||||||
beforeA := s.Add(&a)
|
beforeA := s.Add(&a)
|
||||||
// Add on an empty StackSender should return an empty StackSender
|
// Add on an empty StackSender should return an empty StackSender
|
||||||
if beforeA.Len() != 0 {
|
if beforeA.Len() != 0 {
|
||||||
|
@ -83,7 +83,7 @@ func TestStackAdd(t *testing.T) {
|
||||||
t.Fatalf("%#v", beforeA)
|
t.Fatalf("%#v", beforeA)
|
||||||
}
|
}
|
||||||
// Add a 2nd element
|
// Add a 2nd element
|
||||||
b := inmem.Buffer{}
|
b := Buffer{}
|
||||||
beforeB := s.Add(&b)
|
beforeB := s.Add(&b)
|
||||||
if beforeB.Len() != 1 {
|
if beforeB.Len() != 1 {
|
||||||
t.Fatalf("%#v", beforeA)
|
t.Fatalf("%#v", beforeA)
|
||||||
|
@ -105,7 +105,7 @@ func TestStackAdd(t *testing.T) {
|
||||||
// Misbehaving backends must be removed
|
// Misbehaving backends must be removed
|
||||||
func TestStackAddBad(t *testing.T) {
|
func TestStackAddBad(t *testing.T) {
|
||||||
s := NewStackSender()
|
s := NewStackSender()
|
||||||
buf := inmem.Buffer{}
|
buf := Buffer{}
|
||||||
s.Add(&buf)
|
s.Add(&buf)
|
||||||
r, w := inmem.Pipe()
|
r, w := inmem.Pipe()
|
||||||
s.Add(w)
|
s.Add(w)
|
||||||
|
|
Loading…
Reference in New Issue