Beam in-mem: an in-memory implementation of Beam

This commit is contained in:
Solomon Hykes 2014-05-03 12:29:38 +00:00
parent 2c9e18009f
commit 3725628467
2 changed files with 133 additions and 0 deletions

89
inmem/inmem.go Normal file
View File

@ -0,0 +1,89 @@
package inmem
import (
"fmt"
"io"
"sync"
)
type Handle interface {
Send(msg *Message, mode int) (Handle, error)
Receive(mode int) (*Message, Handle, error)
CloseWrite() error
}
const (
R = 1 << (32 - 1 - iota)
W
)
type Message struct {
Target string
Name string
Args []string
Data string
}
func Pipe() (Handle, Handle) {
red := make(chan *pipeMessage)
black := make(chan *pipeMessage)
return &PipeHandle{r: red, w: black}, &PipeHandle{r: black, w: red}
}
type PipeHandle struct {
sync.RWMutex
r chan *pipeMessage
w chan *pipeMessage
}
func (h *PipeHandle) Send(msg *Message, mode int) (Handle, error) {
h.RLock()
defer h.RUnlock()
if h.w == nil {
return nil, fmt.Errorf("closed pipe")
}
var (
rh Handle
lh Handle
)
if mode&(R|W) != 0 {
rh, lh = Pipe()
if mode&W == 0 {
lh.CloseWrite()
}
}
h.w <-&pipeMessage{msg, rh}
return lh, nil
}
func (h *PipeHandle) Receive(mode int) (*Message, Handle, error) {
pmsg, ok := <-h.r
if !ok {
return nil, nil, io.EOF
}
var handle Handle
if pmsg.handle != nil && mode&W == 0 {
pmsg.handle.CloseWrite()
}
if mode&(R|W) != 0 {
handle = pmsg.handle
}
return pmsg.payload, handle, nil
}
func (h *PipeHandle) CloseWrite() error {
h.Lock()
defer h.Unlock()
if h.w == nil {
return fmt.Errorf("already closed")
}
close(h.w)
h.w = nil
return nil
}
type pipeMessage struct {
payload *Message
handle Handle
}

44
inmem/inmem_test.go Normal file
View File

@ -0,0 +1,44 @@
package inmem
import (
"testing"
"time"
)
func TestSimpleSend(t *testing.T) {
a, b := Pipe()
defer a.CloseWrite()
defer b.CloseWrite()
onTimeout := time.After(100 * time.Millisecond)
onRcv := make(chan bool)
go func() {
msg, h, err := b.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Data != "hello world" {
t.Fatalf("%#v", *msg)
}
if msg.Name != "print" {
t.Fatalf("%#v", *msg)
}
if len(msg.Args) != 0 {
t.Fatalf("%#v", *msg)
}
if h != nil {
t.Fatalf("%#v", h)
}
close(onRcv)
}()
h, err := a.Send(&Message{Name:"print", Data: "hello world"}, 0)
if err != nil {
t.Fatal(err)
}
if h != nil {
t.Fatalf("%#v", h)
}
select {
case <-onTimeout: t.Fatalf("timeout")
case <-onRcv:
}
}