From 58f7d34f0feff966f92b82ac6d806ae328b56a05 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 2 Jun 2014 23:55:15 +0000 Subject: [PATCH] Move Pipe() into the beam package Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- beam/{inmem => }/inmem.go | 31 +++++++++++++++---------------- beam/{inmem => }/inmem_test.go | 21 ++++++++++----------- beam/utils/hub.go | 3 +-- beam/utils/queue.go | 5 ++--- beam/utils/queue_test.go | 3 +-- beam/utils/stack_test.go | 5 ++--- 6 files changed, 31 insertions(+), 37 deletions(-) rename beam/{inmem => }/inmem.go (78%) rename beam/{inmem => }/inmem_test.go (76%) diff --git a/beam/inmem/inmem.go b/beam/inmem.go similarity index 78% rename from beam/inmem/inmem.go rename to beam/inmem.go index 6d6a2a9d53..20593fb715 100644 --- a/beam/inmem/inmem.go +++ b/beam/inmem.go @@ -1,7 +1,6 @@ -package inmem +package beam import ( - "github.com/docker/libswarm/beam" "io" "sync" ) @@ -23,10 +22,10 @@ type pipe struct { wl sync.Mutex rerr error // if reader closed, error to give writes werr error // if writer closed, error to give reads - msg *beam.Message + msg *Message } -func (p *pipe) psend(msg *beam.Message) error { +func (p *pipe) psend(msg *Message) error { var err error // One writer at a time. p.wl.Lock() @@ -53,16 +52,16 @@ func (p *pipe) psend(msg *beam.Message) error { return err } -func (p *pipe) send(msg *beam.Message) (ret beam.Receiver, err error) { +func (p *pipe) send(msg *Message) (ret Receiver, err error) { // Prepare nested Receiver if requested - if beam.RetPipe.Equals(msg.Ret) { + if RetPipe.Equals(msg.Ret) { ret, msg.Ret = Pipe() } err = p.psend(msg) return } -func (p *pipe) preceive() (*beam.Message, error) { +func (p *pipe) preceive() (*Message, error) { p.rl.Lock() defer p.rl.Unlock() @@ -86,15 +85,15 @@ func (p *pipe) preceive() (*beam.Message, error) { return msg, nil } -func (p *pipe) receive(mode int) (*beam.Message, error) { +func (p *pipe) receive(mode int) (*Message, error) { msg, err := p.preceive() if err != nil { return nil, err } if msg.Ret == nil { - msg.Ret = beam.NopSender{} + msg.Ret = NopSender{} } - if mode&beam.Ret == 0 { + if mode&Ret == 0 { msg.Ret.Close() } return msg, nil @@ -128,16 +127,16 @@ type PipeReceiver struct { p *pipe } -func (r *PipeReceiver) Receive(mode int) (*beam.Message, error) { +func (r *PipeReceiver) Receive(mode int) (*Message, error) { return r.p.receive(mode) } -func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) { +func (r *PipeReceiver) SendTo(dst Sender) (int, error) { var n int // If the destination is a PipeSender, we can cheat pdst, ok := dst.(*PipeSender) if !ok { - return 0, beam.ErrIncompatibleSender + return 0, ErrIncompatibleSender } for { pmsg, err := r.p.preceive() @@ -170,16 +169,16 @@ type PipeSender struct { p *pipe } -func (w *PipeSender) Send(msg *beam.Message) (beam.Receiver, error) { +func (w *PipeSender) Send(msg *Message) (Receiver, error) { return w.p.send(msg) } -func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) { +func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) { var n int // If the destination is a PipeReceiver, we can cheat psrc, ok := src.(*PipeReceiver) if !ok { - return 0, beam.ErrIncompatibleReceiver + return 0, ErrIncompatibleReceiver } for { pmsg, err := psrc.p.preceive() diff --git a/beam/inmem/inmem_test.go b/beam/inmem_test.go similarity index 76% rename from beam/inmem/inmem_test.go rename to beam/inmem_test.go index 9a67acab4b..d1574c37ea 100644 --- a/beam/inmem/inmem_test.go +++ b/beam/inmem_test.go @@ -1,21 +1,20 @@ -package inmem +package beam import ( "fmt" - "github.com/docker/libswarm/beam" "github.com/dotcloud/docker/pkg/testutils" "io/ioutil" "os" "testing" ) -func TestRetPipe(t *testing.T) { +func TestInmemRetPipe(t *testing.T) { r, w := Pipe() defer r.Close() defer w.Close() wait := make(chan struct{}) go func() { - ret, err := w.Send(&beam.Message{Name: "hello", Ret: beam.RetPipe}) + ret, err := w.Send(&Message{Name: "hello", Ret: RetPipe}) if err != nil { t.Fatal(err) } @@ -28,11 +27,11 @@ func TestRetPipe(t *testing.T) { } close(wait) }() - msg, err := r.Receive(beam.Ret) + msg, err := r.Receive(Ret) if err != nil { t.Fatal(err) } - if _, err := msg.Ret.Send(&beam.Message{Name: "this better not crash"}); err != nil { + if _, err := msg.Ret.Send(&Message{Name: "this better not crash"}); err != nil { t.Fatal(err) } <-wait @@ -55,7 +54,7 @@ func TestSimpleSend(t *testing.T) { t.Fatalf("%#v", *msg) } }() - if _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}); err != nil { + if _, err := w.Send(&Message{Name: "print", Args: []string{"hello world"}}); err != nil { t.Fatal(err) } }) @@ -68,7 +67,7 @@ func TestSendReply(t *testing.T) { testutils.Timeout(t, func() { // Send go func() { - ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe}) + ret, err := w.Send(&Message{Args: []string{"this is the request"}, Ret: RetPipe}) if err != nil { t.Fatal(err) } @@ -85,7 +84,7 @@ func TestSendReply(t *testing.T) { } }() // Receive a message with mode=Ret - msg, err := r.Receive(beam.Ret) + msg, err := r.Receive(Ret) if err != nil { t.Fatal(err) } @@ -96,7 +95,7 @@ func TestSendReply(t *testing.T) { t.Fatalf("%#v", msg) } // Send a reply - _, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}}) + _, err = msg.Ret.Send(&Message{Args: []string{"this is the reply"}}) if err != nil { t.Fatal(err) } @@ -117,7 +116,7 @@ func TestSendFile(t *testing.T) { tmp.Seek(0, 0) testutils.Timeout(t, func() { go func() { - _, err := w.Send(&beam.Message{Name: "file", Args: []string{"path=" + tmp.Name()}, Att: tmp}) + _, err := w.Send(&Message{Name: "file", Args: []string{"path=" + tmp.Name()}, Att: tmp}) if err != nil { t.Fatal(err) } diff --git a/beam/utils/hub.go b/beam/utils/hub.go index 42cc75cf8a..8c5f315089 100644 --- a/beam/utils/hub.go +++ b/beam/utils/hub.go @@ -3,7 +3,6 @@ package utils import ( "fmt" "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/inmem" "io" "strings" "sync" @@ -30,7 +29,7 @@ func (hub *Hub) Send(msg *beam.Message) (ret beam.Receiver, err error) { fmt.Printf("[hub] received %v\n", msg) hIn := msg.Ret if hIn == beam.RetPipe { - ret, hIn = inmem.Pipe() + ret, hIn = beam.Pipe() } // This queue guarantees that the first message received by the handler // is the "register" response. diff --git a/beam/utils/queue.go b/beam/utils/queue.go index 335bde1fe9..9da2fad953 100644 --- a/beam/utils/queue.go +++ b/beam/utils/queue.go @@ -2,17 +2,16 @@ package utils import ( "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/inmem" ) type Queue struct { - *inmem.PipeSender + *beam.PipeSender dst beam.Sender ch chan *beam.Message } func NewQueue(dst beam.Sender, size int) *Queue { - r, w := inmem.Pipe() + r, w := beam.Pipe() q := &Queue{ PipeSender: w, dst: dst, diff --git a/beam/utils/queue_test.go b/beam/utils/queue_test.go index f7ed252edd..dfc75a19e0 100644 --- a/beam/utils/queue_test.go +++ b/beam/utils/queue_test.go @@ -2,12 +2,11 @@ package utils import ( "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/inmem" "testing" ) func TestSendRet(t *testing.T) { - r, w := inmem.Pipe() + r, w := beam.Pipe() defer r.Close() defer w.Close() q := NewQueue(w, 1) diff --git a/beam/utils/stack_test.go b/beam/utils/stack_test.go index 7cc53eb466..03189a9f4b 100644 --- a/beam/utils/stack_test.go +++ b/beam/utils/stack_test.go @@ -2,7 +2,6 @@ package utils import ( "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/inmem" "github.com/docker/libswarm/beam/unix" "github.com/dotcloud/docker/pkg/testutils" "strings" @@ -10,7 +9,7 @@ import ( ) func TestStackWithPipe(t *testing.T) { - r, w := inmem.Pipe() + r, w := beam.Pipe() defer r.Close() defer w.Close() s := NewStackSender() @@ -107,7 +106,7 @@ func TestStackAddBad(t *testing.T) { s := NewStackSender() buf := Buffer{} s.Add(&buf) - r, w := inmem.Pipe() + r, w := beam.Pipe() s.Add(w) if s.Len() != 2 { t.Fatalf("%#v", s)