diff --git a/inmem/inmem.go b/inmem/inmem.go index 7ca887a016..0c7a7e90fe 100644 --- a/inmem/inmem.go +++ b/inmem/inmem.go @@ -161,8 +161,13 @@ func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sen return nil, nil, nil, err } var ( - in beam.Receiver - out beam.Sender + // Always return NopReceiver/NopSender instead of nil values, + // because: + // - if they were requested in the mode, they can safely be used + // - if they were not requested, they can safely be ignored (ie no leak if they + // aren't closed) + in beam.Receiver = beam.NopReceiver{} + out beam.Sender = beam.NopSender{} ) if pin != nil { in = pin diff --git a/inmem/inmem_test.go b/inmem/inmem_test.go index 69611637b4..4703ddb053 100644 --- a/inmem/inmem_test.go +++ b/inmem/inmem_test.go @@ -9,6 +9,20 @@ import ( "testing" ) +func TestReceiveW(t *testing.T) { + r, w := Pipe() + go func() { + w.Send(&beam.Message{Name: "hello"}, 0) + }() + _, _, ww, err := r.Receive(beam.W) + if err != nil { + t.Fatal(err) + } + if _, _, err := ww.Send(&beam.Message{Name: "this better not crash"}, 0); err != nil { + t.Fatal(err) + } +} + func TestSimpleSend(t *testing.T) { r, w := Pipe() defer r.Close() @@ -37,8 +51,8 @@ func TestSimpleSend(t *testing.T) { // assertMode verifies that the values of r and w match // mode. -// If mode has the R bit set, r must be non-nil. Otherwise it must be nil. -// If mode has the W bit set, w must be non-nil. Otherwise it must be nil. +// If mode has the R bit set, r must be non-nil. +// If mode has the W bit set, w must be non-nil. // // If any of these conditions are not met, t.Fatal is called and the active // test fails. @@ -49,10 +63,6 @@ func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { t.Fatalf("should be non-nil: %#v", r) } // Otherwise it must be nil. - } else { - if r != nil { - t.Fatalf("should be nil: %#v", r) - } } // If mode has the W bit set, w must be non-nil if mode&beam.W != 0 { @@ -60,10 +70,6 @@ func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { t.Fatalf("should be non-nil: %#v", w) } // Otherwise it must be nil. - } else { - if w != nil { - t.Fatalf("should be nil: %#v", w) - } } } diff --git a/nop.go b/nop.go new file mode 100644 index 0000000000..061e5f41a5 --- /dev/null +++ b/nop.go @@ -0,0 +1,21 @@ +package beam + +import ( + "io" +) + +type NopSender struct{} + +func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) { + return NopReceiver{}, NopSender{}, nil +} + +func (s NopSender) Close() error { + return nil +} + +type NopReceiver struct{} + +func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { + return nil, nil, nil, io.EOF +} diff --git a/utils/buf.go b/utils/buf.go index 514f9c8f0a..d4164a0220 100644 --- a/utils/buf.go +++ b/utils/buf.go @@ -8,7 +8,7 @@ type Buffer []*beam.Message func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { (*buf) = append(*buf, msg) - return NopReceiver{}, NopSender{}, nil + return beam.NopReceiver{}, beam.NopSender{}, nil } func (buf *Buffer) Close() error { diff --git a/utils/nop.go b/utils/nop.go deleted file mode 100644 index 943f0ce1c3..0000000000 --- a/utils/nop.go +++ /dev/null @@ -1,22 +0,0 @@ -package utils - -import ( - "github.com/docker/beam" - "io" -) - -type NopSender struct{} - -func (s NopSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { - return NopReceiver{}, NopSender{}, nil -} - -func (s NopSender) Close() error { - return nil -} - -type NopReceiver struct{} - -func (r NopReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { - return nil, nil, nil, io.EOF -} diff --git a/utils/stack.go b/utils/stack.go index a8b1f6baea..11fd678288 100644 --- a/utils/stack.go +++ b/utils/stack.go @@ -38,7 +38,7 @@ func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam return r, w, err } // Silently drop messages if no valid backend is available. - return NopSender{}.Send(msg, mode) + return beam.NopSender{}.Send(msg, mode) } func (s *StackSender) Add(dst beam.Sender) *StackSender {