Move utilities and interfaces out of `inmem`

* `utils.Buffer`: a Sender which stores messages in a slice
* `utils.Copy`: beam equivalent of `io.Copy`
* `utlis.NopSender`: a Sender which silently discards messages
* `utlis.NopReceiver`: a Receiver which always returns EOF
This commit is contained in:
Solomon Hykes 2014-05-19 13:45:33 -07:00
parent df3394c350
commit c6f0a70141
6 changed files with 28 additions and 27 deletions

17
beam.go
View File

@ -1,6 +1,8 @@
package beam package beam
import () import (
"errors"
)
type Sender interface { type Sender interface {
Send(msg *Message, mode int) (Receiver, Sender, error) Send(msg *Message, mode int) (Receiver, Sender, error)
@ -20,3 +22,16 @@ const (
R = 1 << (32 - 1 - iota) R = 1 << (32 - 1 - iota)
W W
) )
type ReceiverFrom interface {
ReceiveFrom(Receiver) (int, error)
}
type SenderTo interface {
SendTo(Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)

View File

@ -178,7 +178,7 @@ func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) {
// If the destination is a PipeSender, we can cheat // If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender) pdst, ok := dst.(*PipeSender)
if !ok { if !ok {
return 0, ErrIncompatibleSender return 0, beam.ErrIncompatibleSender
} }
for { for {
pmsg, err := r.p.preceive() pmsg, err := r.p.preceive()
@ -231,7 +231,7 @@ func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) {
// If the destination is a PipeReceiver, we can cheat // If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver) psrc, ok := src.(*PipeReceiver)
if !ok { if !ok {
return 0, ErrIncompatibleReceiver return 0, beam.ErrIncompatibleReceiver
} }
for { for {
pmsg, err := psrc.p.preceive() pmsg, err := psrc.p.preceive()

View File

@ -1,4 +1,4 @@
package inmem package utils
import ( import (
"github.com/docker/beam" "github.com/docker/beam"

View File

@ -1,34 +1,20 @@
package inmem package utils
import ( import (
"errors"
"github.com/docker/beam" "github.com/docker/beam"
"sync" "sync"
) )
type ReceiverFrom interface {
ReceiveFrom(beam.Receiver) (int, error)
}
type SenderTo interface {
SendTo(beam.Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)
func Copy(dst beam.Sender, src beam.Receiver) (int, error) { func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
var tasks sync.WaitGroup var tasks sync.WaitGroup
defer tasks.Wait() defer tasks.Wait()
if senderTo, ok := src.(SenderTo); ok { if senderTo, ok := src.(beam.SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender { if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender {
return n, err return n, err
} }
} }
if receiverFrom, ok := dst.(ReceiverFrom); ok { if receiverFrom, ok := dst.(beam.ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver { if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver {
return n, err return n, err
} }
} }

View File

@ -1,4 +1,4 @@
package inmem package utils
import ( import (
"github.com/docker/beam" "github.com/docker/beam"

View File

@ -73,7 +73,7 @@ func TestStackLen(t *testing.T) {
func TestStackAdd(t *testing.T) { func TestStackAdd(t *testing.T) {
s := NewStackSender() s := NewStackSender()
a := inmem.Buffer{} a := Buffer{}
beforeA := s.Add(&a) beforeA := s.Add(&a)
// Add on an empty StackSender should return an empty StackSender // Add on an empty StackSender should return an empty StackSender
if beforeA.Len() != 0 { if beforeA.Len() != 0 {
@ -83,7 +83,7 @@ func TestStackAdd(t *testing.T) {
t.Fatalf("%#v", beforeA) t.Fatalf("%#v", beforeA)
} }
// Add a 2nd element // Add a 2nd element
b := inmem.Buffer{} b := Buffer{}
beforeB := s.Add(&b) beforeB := s.Add(&b)
if beforeB.Len() != 1 { if beforeB.Len() != 1 {
t.Fatalf("%#v", beforeA) t.Fatalf("%#v", beforeA)
@ -105,7 +105,7 @@ func TestStackAdd(t *testing.T) {
// Misbehaving backends must be removed // Misbehaving backends must be removed
func TestStackAddBad(t *testing.T) { func TestStackAddBad(t *testing.T) {
s := NewStackSender() s := NewStackSender()
buf := inmem.Buffer{} buf := Buffer{}
s.Add(&buf) s.Add(&buf)
r, w := inmem.Pipe() r, w := inmem.Pipe()
s.Add(w) s.Add(w)