inmem: complete Copy implementation

Copy can now handle arbitrary nesting of streams, without knowing the
underlying type of either the source or destination.

It does this by attempting `SendTo` and `ReceiveFrom`, then if that is
not supported, falling back to nested copies in separate goroutines.

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-05-11 23:49:07 +00:00
parent 1508c1d46f
commit b9c316df29
2 changed files with 92 additions and 36 deletions

View File

@ -1,7 +1,8 @@
package inmem
import (
"fmt"
"errors"
"sync"
)
type ReceiverFrom interface {
@ -12,15 +13,59 @@ type SenderTo interface {
SendTo(Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)
func Copy(dst Sender, src Receiver) (int, error) {
var tasks sync.WaitGroup
defer tasks.Wait()
if senderTo, ok := src.(SenderTo); ok {
return senderTo.SendTo(dst)
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
return n, err
}
}
if receiverFrom, ok := dst.(ReceiverFrom); ok {
return receiverFrom.ReceiveFrom(src)
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
return n, err
}
}
var (
n int
)
return n, fmt.Errorf("operation not supported")
copyAndClose := func(dst Sender, src Receiver) {
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}
for {
msg, rcvR, rcvW, err := src.Receive(R | W)
if err != nil {
return n, err
}
sndR, sndW, err := dst.Send(msg, R|W)
if err != nil {
if rcvW != nil {
rcvW.Close()
}
return n, err
}
tasks.Add(2)
go func() {
copyAndClose(rcvW, sndR)
tasks.Done()
}()
go func() {
copyAndClose(sndW, rcvR)
tasks.Done()
}()
n++
}
return n, nil
}

View File

@ -1,7 +1,6 @@
package inmem
import (
"fmt"
"io"
"sync"
)
@ -106,7 +105,7 @@ func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender,
return
}
func (p *pipe) receive(mode int) (msg *Message, in *PipeReceiver, out *PipeSender, err error) {
func (p *pipe) preceive() (*pipeMessage, error) {
p.rl.Lock()
defer p.rl.Unlock()
@ -114,17 +113,27 @@ func (p *pipe) receive(mode int) (msg *Message, in *PipeReceiver, out *PipeSende
defer p.l.Unlock()
for {
if p.rerr != nil {
return nil, nil, nil, io.ErrClosedPipe
return nil, io.ErrClosedPipe
}
if p.pmsg != nil {
break
}
if p.werr != nil {
return nil, nil, nil, p.werr
return nil, p.werr
}
p.rwait.Wait()
}
pmsg := p.pmsg
p.pmsg = nil
p.wwait.Signal()
return pmsg, nil
}
func (p *pipe) receive(mode int) (*Message, *PipeReceiver, *PipeSender, error) {
pmsg, err := p.preceive()
if err != nil {
return nil, nil, nil, err
}
if pmsg.out != nil && mode&W == 0 {
pmsg.out.Close()
pmsg.out = nil
@ -133,12 +142,7 @@ func (p *pipe) receive(mode int) (msg *Message, in *PipeReceiver, out *PipeSende
pmsg.in.Close()
pmsg.in = nil
}
p.pmsg = nil
msg = pmsg.msg
in = pmsg.in
out = pmsg.out
p.wwait.Signal()
return
return pmsg.msg, pmsg.in, pmsg.out, nil
}
func (p *pipe) rclose(err error) {
@ -187,6 +191,29 @@ func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) {
return msg, in, out, err
}
func (r *PipeReceiver) SendTo(dst Sender) (int, error) {
var n int
// If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender)
if !ok {
return 0, ErrIncompatibleSender
}
for {
pmsg, err := r.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := pdst.p.psend(pmsg); err != nil {
return n, err
}
}
n++
return n, nil
}
func (r *PipeReceiver) Close() error {
return r.CloseWithError(nil)
}
@ -219,35 +246,19 @@ func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) {
func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) {
var n int
// If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver)
if !ok {
return 0, ErrIncompatibleReceiver
}
for {
msg, msgr, msgw, err := src.Receive(R | W)
pmsg, err := psrc.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
pmsg := &pipeMessage{msg: msg}
if msgr != nil {
if pmsgr, ok := msgr.(*PipeReceiver); ok {
pmsg.in = pmsgr
} else {
// FIXME: if we're not receiving from a PipeReceiver,
// we need to create a new pipe and shuttle messages through it
// in a new goroutine.
return n, fmt.Errorf("operation not supported")
}
}
if msgw != nil {
if pmsgw, ok := msgw.(*PipeSender); ok {
pmsg.out = pmsgw
} else {
// FIXME: if we're not sending to a PipeSender,
// we need to create a new pipe and shuttle messages through it
// in a new goroutine.
return n, fmt.Errorf("operation not supported")
}
}
if err := w.p.psend(pmsg); err != nil {
return n, err
}