inmem: switch to a Sender/Receiver/Pipe interface similar to io.Pipe

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-05-09 20:55:39 -07:00
parent c0364720db
commit a1ed1ec1fa
2 changed files with 190 additions and 80 deletions

View File

@ -1,21 +1,19 @@
package inmem package inmem
import ( import (
"fmt"
"io" "io"
"sync" "sync"
) )
type Handle interface { type Sender interface {
Send(msg *Message, mode int) (Handle, error) Send(msg *Message, mode int) (Receiver, Sender, error)
Receive(mode int) (*Message, Handle, error) Close() error
CloseWrite() error
} }
const ( type Receiver interface {
R = 1 << (32 - 1 - iota) Receive(mode int) (*Message, Receiver, Sender, error)
W Close() error
) }
type Message struct { type Message struct {
Name string Name string
@ -23,66 +21,173 @@ type Message struct {
Data string Data string
} }
const (
R = 1 << (32 - 1 - iota)
W
)
func Pipe() (Handle, Handle) { func Pipe() (*PipeReceiver, *PipeSender) {
red := make(chan *pipeMessage) p := new(pipe)
black := make(chan *pipeMessage) p.rwait.L = &p.l
return &PipeHandle{r: red, w: black}, &PipeHandle{r: black, w: red} p.wwait.L = &p.l
r := &PipeReceiver{p}
w := &PipeSender{p}
return r, w
} }
type PipeHandle struct { type pipe struct {
sync.RWMutex ch chan *pipeMessage
r chan *pipeMessage rwait sync.Cond
w chan *pipeMessage wwait sync.Cond
} l sync.Mutex
rl sync.Mutex
func (h *PipeHandle) Send(msg *Message, mode int) (Handle, error) { wl sync.Mutex
h.RLock() rerr error // if reader closed, error to give writes
defer h.RUnlock() werr error // if writer closed, error to give reads
if h.w == nil { pmsg *pipeMessage
return nil, fmt.Errorf("closed pipe")
}
var (
rh Handle
lh Handle
)
if mode&(R|W) != 0 {
rh, lh = Pipe()
if mode&W == 0 {
lh.CloseWrite()
}
}
h.w <-&pipeMessage{msg, rh}
return lh, nil
}
func (h *PipeHandle) Receive(mode int) (*Message, Handle, error) {
pmsg, ok := <-h.r
if !ok {
return nil, nil, io.EOF
}
var handle Handle
if pmsg.handle != nil && mode&W == 0 {
pmsg.handle.CloseWrite()
}
if mode&(R|W) != 0 {
handle = pmsg.handle
}
return pmsg.payload, handle, nil
}
func (h *PipeHandle) CloseWrite() error {
h.Lock()
defer h.Unlock()
if h.w == nil {
return fmt.Errorf("already closed")
}
close(h.w)
h.w = nil
return nil
} }
type pipeMessage struct { type pipeMessage struct {
payload *Message msg *Message
handle Handle out *PipeSender
in *PipeReceiver
}
func (p *pipe) send(msg *Message, mode int) (in Receiver, out Sender, err error) {
// Prepare the message
pmsg := &pipeMessage{msg: msg}
if mode&R != 0 {
in, pmsg.out = Pipe()
defer func() {
if err != nil {
in.Close()
in = nil
pmsg.out.Close()
}
}()
}
if mode&W != 0 {
pmsg.in, out = Pipe()
defer func() {
if err != nil {
out.Close()
out = nil
pmsg.in.Close()
}
}()
}
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
p.pmsg = pmsg
p.rwait.Signal()
for {
if p.pmsg == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = io.ErrClosedPipe
}
p.wwait.Wait()
}
p.pmsg = nil // in case of rerr or werr
return
}
func (p *pipe) receive(mode int) (msg *Message, in Receiver, out Sender, err error) {
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return nil, nil, nil, io.ErrClosedPipe
}
if p.pmsg != nil {
break
}
if p.werr != nil {
return nil, nil, nil, p.werr
}
p.rwait.Wait()
}
pmsg := p.pmsg
if pmsg.out != nil && mode&W == 0 {
pmsg.out.Close()
}
if pmsg.in != nil && mode&R == 0 {
pmsg.in.Close()
}
p.pmsg = nil
msg = pmsg.msg
p.wwait.Signal()
return
}
func (p *pipe) rclose(err error) {
if err == nil {
err = io.ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err error) {
if err == nil {
err = io.EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// PipeReceiver
type PipeReceiver struct {
p *pipe
}
func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) {
return r.p.receive(mode)
}
func (r *PipeReceiver) Close() error {
return r.CloseWithError(nil)
}
func (r *PipeReceiver) CloseWithError(err error) error {
r.p.rclose(err)
return nil
}
// PipeSender
type PipeSender struct {
p *pipe
}
func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) {
return w.p.send(msg, mode)
}
func (w *PipeSender) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeSender) CloseWithError(err error) error {
w.p.wclose(err)
return nil
} }

View File

@ -6,13 +6,11 @@ import (
) )
func TestSimpleSend(t *testing.T) { func TestSimpleSend(t *testing.T) {
a, b := Pipe() r, w := Pipe()
defer a.CloseWrite()
defer b.CloseWrite()
onTimeout := time.After(100 * time.Millisecond) onTimeout := time.After(100 * time.Millisecond)
onRcv := make(chan bool) onRcv := make(chan bool)
go func() { go func() {
msg, h, err := b.Receive(0) msg, in, out, err := r.Receive(0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -25,20 +23,27 @@ func TestSimpleSend(t *testing.T) {
if len(msg.Args) != 0 { if len(msg.Args) != 0 {
t.Fatalf("%#v", *msg) t.Fatalf("%#v", *msg)
} }
if h != nil { if in != nil {
t.Fatalf("%#v", h) t.Fatalf("%#v", in)
}
if out != nil {
t.Fatalf("%#v", out)
} }
close(onRcv) close(onRcv)
}() }()
h, err := a.Send(&Message{Name:"print", Data: "hello world"}, 0) in, out, err := w.Send(&Message{Name: "print", Data: "hello world"}, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if h != nil { if in != nil {
t.Fatalf("%#v", h) t.Fatalf("%#v", in)
}
if out != nil {
t.Fatalf("%#v", out)
} }
select { select {
case <-onTimeout: t.Fatalf("timeout") case <-onTimeout:
t.Fatalf("timeout")
case <-onRcv: case <-onRcv:
} }
} }