diff --git a/inmem/inmem.go b/inmem/inmem.go new file mode 100644 index 0000000000..d5c6645b1d --- /dev/null +++ b/inmem/inmem.go @@ -0,0 +1,89 @@ +package inmem + +import ( + "fmt" + "io" + "sync" +) + +type Handle interface { + Send(msg *Message, mode int) (Handle, error) + Receive(mode int) (*Message, Handle, error) + CloseWrite() error +} + +const ( + R = 1 << (32 - 1 - iota) + W +) + +type Message struct { + Target string + Name string + Args []string + Data string +} + + +func Pipe() (Handle, Handle) { + red := make(chan *pipeMessage) + black := make(chan *pipeMessage) + return &PipeHandle{r: red, w: black}, &PipeHandle{r: black, w: red} +} + +type PipeHandle struct { + sync.RWMutex + r chan *pipeMessage + w chan *pipeMessage +} + +func (h *PipeHandle) Send(msg *Message, mode int) (Handle, error) { + h.RLock() + defer h.RUnlock() + if h.w == nil { + return nil, fmt.Errorf("closed pipe") + } + var ( + rh Handle + lh Handle + ) + if mode&(R|W) != 0 { + rh, lh = Pipe() + if mode&W == 0 { + lh.CloseWrite() + } + } + h.w <-&pipeMessage{msg, rh} + return lh, nil +} + +func (h *PipeHandle) Receive(mode int) (*Message, Handle, error) { + pmsg, ok := <-h.r + if !ok { + return nil, nil, io.EOF + } + var handle Handle + if pmsg.handle != nil && mode&W == 0 { + pmsg.handle.CloseWrite() + } + if mode&(R|W) != 0 { + handle = pmsg.handle + } + return pmsg.payload, handle, nil +} + +func (h *PipeHandle) CloseWrite() error { + h.Lock() + defer h.Unlock() + if h.w == nil { + return fmt.Errorf("already closed") + } + close(h.w) + h.w = nil + return nil +} + +type pipeMessage struct { + payload *Message + handle Handle +} diff --git a/inmem/inmem_test.go b/inmem/inmem_test.go new file mode 100644 index 0000000000..4bac748dd4 --- /dev/null +++ b/inmem/inmem_test.go @@ -0,0 +1,44 @@ +package inmem + +import ( + "testing" + "time" +) + +func TestSimpleSend(t *testing.T) { + a, b := Pipe() + defer a.CloseWrite() + defer b.CloseWrite() + onTimeout := time.After(100 * time.Millisecond) + onRcv := make(chan bool) + go func() { + msg, h, err := b.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Data != "hello world" { + t.Fatalf("%#v", *msg) + } + if msg.Name != "print" { + t.Fatalf("%#v", *msg) + } + if len(msg.Args) != 0 { + t.Fatalf("%#v", *msg) + } + if h != nil { + t.Fatalf("%#v", h) + } + close(onRcv) + }() + h, err := a.Send(&Message{Name:"print", Data: "hello world"}, 0) + if err != nil { + t.Fatal(err) + } + if h != nil { + t.Fatalf("%#v", h) + } + select { + case <-onTimeout: t.Fatalf("timeout") + case <-onRcv: + } +}