diff --git a/inmem/copy.go b/inmem/copy.go new file mode 100644 index 0000000000..4f655f46d0 --- /dev/null +++ b/inmem/copy.go @@ -0,0 +1,53 @@ +package inmem + +import ( + "io" + "fmt" +) + +type ReceiverFrom interface { + ReceiveFrom(Receiver) (int, error) +} + +type SenderTo interface { + SendTo(Sender) (int, error) +} + +func Copy(dst Sender, src Receiver) (int, error) { + if senderTo, ok := src.(SenderTo); ok { + return senderTo.SendTo(dst) + } + if receiverFrom, ok := dst.(ReceiverFrom); ok { + return receiverFrom.ReceiveFrom(src) + } + var ( + n int + ) + for { + msg, r, w, err := src.Receive(R|W) + if err == io.EOF { + break + } + if r != nil { + // FIXME: spawn goroutines to shuttle messages for each + // level of nested sender/receiver. + r.Close() + return n, fmt.Errorf("operation not supported") + } + if w != nil { + // FIXME: spawn goroutines to shuttle messages for each + // level of nested sender/receiver. + w.Close() + return n, fmt.Errorf("operation not supported") + } + if err != nil { + return n, err + } + if _, _, err := dst.Send(msg, 0); err != nil { + return n, err + } + n++ + } + return n, nil +} + diff --git a/inmem/inmem.go b/inmem/inmem.go index f94ac9480c..e5369a0660 100644 --- a/inmem/inmem.go +++ b/inmem/inmem.go @@ -1,6 +1,7 @@ package inmem import ( + "fmt" "io" "sync" ) @@ -18,7 +19,7 @@ type Receiver interface { type Message struct { Name string Args []string - Data string + Data []byte } const ( @@ -53,6 +54,33 @@ type pipeMessage struct { in *PipeReceiver } +func (p *pipe) psend(pmsg *pipeMessage) error { + var err error + // One writer at a time. + p.wl.Lock() + defer p.wl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + p.pmsg = pmsg + p.rwait.Signal() + for { + if p.pmsg == nil { + break + } + if p.rerr != nil { + err = p.rerr + break + } + if p.werr != nil { + err = io.ErrClosedPipe + } + p.wwait.Wait() + } + p.pmsg = nil // in case of rerr or werr + return err +} + func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { // Prepare the message pmsg := &pipeMessage{msg: msg} @@ -76,28 +104,7 @@ func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender, } }() } - // One writer at a time. - p.wl.Lock() - defer p.wl.Unlock() - - p.l.Lock() - defer p.l.Unlock() - p.pmsg = pmsg - p.rwait.Signal() - for { - if p.pmsg == nil { - break - } - if p.rerr != nil { - err = p.rerr - break - } - if p.werr != nil { - err = io.ErrClosedPipe - } - p.wwait.Wait() - } - p.pmsg = nil // in case of rerr or werr + err = p.psend(pmsg) return } @@ -164,7 +171,7 @@ type PipeReceiver struct { p *pipe } -func (r *PipeReceiver) Receive(mode int) (*Message, *PipeReceiver, *PipeSender, error) { +func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { return r.p.receive(mode) } @@ -183,10 +190,49 @@ type PipeSender struct { p *pipe } -func (w *PipeSender) Send(msg *Message, mode int) (*PipeReceiver, *PipeSender, error) { +func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) { return w.p.send(msg, mode) } +func (w *PipeSender) ReceiveFrom(src Receiver, mode int) (int, error) { + var n int + for { + msg, msgr, msgw, err := src.Receive(mode) + if err == io.EOF { + break + } + if err != nil { + return n, err + } + pmsg := &pipeMessage{msg: msg} + if msgr != nil { + if pmsgr, ok := msgr.(*PipeReceiver); ok { + pmsg.in = pmsgr + } else { + // FIXME: if we're not receiving from a PipeReceiver, + // we need to create a new pipe and shuttle messages through it + // in a new goroutine. + return n, fmt.Errorf("operation not supported") + } + } + if msgw != nil { + if pmsgw, ok := msgw.(*PipeSender); ok { + pmsg.out = pmsgw + } else { + // FIXME: if we're not sending to a PipeSender, + // we need to create a new pipe and shuttle messages through it + // in a new goroutine. + return n, fmt.Errorf("operation not supported") + } + } + if err := w.p.psend(pmsg); err != nil { + return n, err + } + n++ + } + return n, nil +} + func (w *PipeSender) Close() error { return w.CloseWithError(nil) }