beam: encode Verb in an enum instead of a string

Signed-off-bu: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-06-05 07:02:21 +00:00
parent 56ee97a572
commit d95b9728e8
13 changed files with 98 additions and 168 deletions

View File

@ -26,11 +26,11 @@ func Debug() beam.Sender {
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
instance := beam.NewServer()
instance.Catchall(beam.Handler(func(msg *beam.Message) error {
fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " "))
fmt.Printf("[DEBUG] %s %s\n", msg.Verb, strings.Join(msg.Args, " "))
ctx.Ret.Send(msg)
return nil
}))
_, err := ctx.Ret.Send(&beam.Message{Name: string(beam.Ack), Ret: instance})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return backend
@ -51,7 +51,7 @@ func FakeClient() beam.Sender {
o.Log("fake client heartbeat!")
}
})
_, err := ctx.Ret.Send(&beam.Message{Name: string(beam.Ack), Ret: instance})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return backend

View File

@ -18,7 +18,7 @@ func Simulator() beam.Sender {
}))
beam.Copy(s, in)
})
ctx.Ret.Send(&beam.Message{Name: "ack", Ret: instance})
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return nil
}))
return s

View File

@ -15,7 +15,7 @@ type Receiver interface {
}
type Message struct {
Name string
Verb Verb
Args []string
Att *os.File
Ret Sender
@ -63,4 +63,4 @@ func Repeater(payload *Message) Sender {
})
}
var NotImplemented = Repeater(&Message{Name: "error", Args: []string{"not implemented"}})
var NotImplemented = Repeater(&Message{Verb: Error, Args: []string{"not implemented"}})

View File

@ -14,7 +14,8 @@ func TestInmemRetPipe(t *testing.T) {
defer w.Close()
wait := make(chan struct{})
go func() {
ret, err := w.Send(&Message{Name: "hello", Ret: RetPipe})
defer close(wait)
ret, err := w.Send(&Message{Verb: Log, Args: []string{"hello"}, Ret: RetPipe})
if err != nil {
t.Fatal(err)
}
@ -22,16 +23,18 @@ func TestInmemRetPipe(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if msg.Name != "this better not crash" {
if msg.Verb != Ack {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "this better not crash" {
t.Fatalf("%#v", msg)
}
close(wait)
}()
msg, err := r.Receive(Ret)
if err != nil {
t.Fatal(err)
}
if _, err := msg.Ret.Send(&Message{Name: "this better not crash"}); err != nil {
if _, err := msg.Ret.Send(&Message{Verb: Ack, Args: []string{"this better not crash"}}); err != nil {
t.Fatal(err)
}
<-wait
@ -47,14 +50,14 @@ func TestSimpleSend(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if msg.Name != "print" {
if msg.Verb != Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
}()
if _, err := w.Send(&Message{Name: "print", Args: []string{"hello world"}}); err != nil {
if _, err := w.Send(&Message{Verb: Log, Args: []string{"hello world"}}); err != nil {
t.Fatal(err)
}
})
@ -116,7 +119,7 @@ func TestSendFile(t *testing.T) {
tmp.Seek(0, 0)
testutils.Timeout(t, func() {
go func() {
_, err := w.Send(&Message{Name: "file", Args: []string{"path=" + tmp.Name()}, Att: tmp})
_, err := w.Send(&Message{Verb: File, Args: []string{"path=" + tmp.Name()}, Att: tmp})
if err != nil {
t.Fatal(err)
}
@ -125,7 +128,7 @@ func TestSendFile(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if msg.Name != "file" {
if msg.Verb != File {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "path="+tmp.Name() {

View File

@ -19,12 +19,12 @@ func Obj(dst Sender) *Object {
}
func (o *Object) Log(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Name: "log", Args: []string{fmt.Sprintf(msg, args...)}})
_, err := o.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Ls() ([]string, error) {
ret, err := o.Send(&Message{Name: "ls", Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Ls, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -32,20 +32,20 @@ func (o *Object) Ls() ([]string, error) {
if err == io.EOF {
return nil, fmt.Errorf("unexpected EOF")
}
if msg.Name == "set" {
if msg.Verb == Set {
if err != nil {
return nil, err
}
return msg.Args, nil
}
if msg.Name == "error" {
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Name)
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
ret, err := o.Send(&Message{Name: "spawn", Args: cmd, Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -56,18 +56,18 @@ func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
if err != nil {
return nil, err
}
if msg.Name == "ack" {
if msg.Verb == Ack {
return &Object{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Name == "error" {
if msg.Verb == Error {
return nil, fmt.Errorf("%s", strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Name)
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
ret, err := o.Send(&Message{Name: "attach", Args: []string{name}, Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe})
if err != nil {
return nil, nil, err
}
@ -78,23 +78,23 @@ func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
if err != nil {
return nil, nil, err
}
if msg.Name == "ack" {
if msg.Verb == Ack {
return ret, &Object{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Name == "error" {
if msg.Verb == Error {
return nil, nil, fmt.Errorf("%s", strings.Join(msg.Args[:1], ""))
}
return nil, nil, fmt.Errorf("unexpected verb %v", msg.Name)
return nil, nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Error(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Name: "error", Args: []string{fmt.Sprintf(msg, args...)}})
_, err := o.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Connect() (net.Conn, error) {
ret, err := o.Send(&Message{Name: "connect", Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Connect, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -103,7 +103,7 @@ func (o *Object) Connect() (net.Conn, error) {
if err == io.EOF {
return nil, fmt.Errorf("unexpected EOF")
}
if msg.Name == "connect" {
if msg.Verb == Connect {
if msg.Att == nil {
return nil, fmt.Errorf("missing attachment")
}
@ -115,10 +115,10 @@ func (o *Object) Connect() (net.Conn, error) {
msg.Att.Close()
return conn, nil
}
if msg.Name == "error" {
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Name)
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) SetJson(val interface{}) error {
@ -130,12 +130,12 @@ func (o *Object) SetJson(val interface{}) error {
}
func (o *Object) Set(vals ...string) error {
_, err := o.Send(&Message{Name: "set", Args: vals})
_, err := o.Send(&Message{Verb: Set, Args: vals})
return err
}
func (o *Object) Get(key string) (string, error) {
ret, err := o.Send(&Message{Name: "get", Args: []string{key}, Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Get, Args: []string{key}, Ret: RetPipe})
if err != nil {
return "", err
}
@ -143,7 +143,7 @@ func (o *Object) Get(key string) (string, error) {
if err == io.EOF {
return "", fmt.Errorf("unexpected EOF")
}
if msg.Name == "set" {
if msg.Verb == Set {
if err != nil {
return "", err
}
@ -152,38 +152,38 @@ func (o *Object) Get(key string) (string, error) {
}
return msg.Args[0], nil
}
if msg.Name == "error" {
if msg.Verb == Error {
return "", fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return "", fmt.Errorf("unexpected verb %v", msg.Name)
return "", fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Watch() (Receiver, error) {
ret, err := o.Send(&Message{Name: "watch", Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Watch, Ret: RetPipe})
if err != nil {
return nil, err
}
msg, err := ret.Receive(0)
if msg.Name == "ok" {
if msg.Verb == Ack {
return ret, nil
}
if msg.Name == "error" {
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Name)
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Start() error {
ret, err := o.Send(&Message{Name: "start", Ret: RetPipe})
ret, err := o.Send(&Message{Verb: Start, Ret: RetPipe})
msg, err := ret.Receive(0)
if err == io.EOF {
return fmt.Errorf("unexpected EOF")
}
if Verb(msg.Name) == Ack {
if msg.Verb == Ack {
return nil
}
if msg.Name == "error" {
if msg.Verb == Error {
return fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return fmt.Errorf("unexpected verb %v", msg.Name)
return fmt.Errorf("unexpected verb %v", msg.Verb)
}

View File

@ -54,7 +54,7 @@ func (s *Server) OnLs(h Sender) *Server {
}
func (s *Server) Send(msg *Message) (Receiver, error) {
if h, exists := s.handlers[Verb(msg.Name)]; exists {
if h, exists := s.handlers[msg.Verb]; exists {
return h.Send(msg)
}
if s.catchall != nil {

View File

@ -12,7 +12,7 @@ func Task(f func(in Receiver, out Sender)) Sender {
outR, outW := Pipe()
obj := NewServer()
obj.OnAttach(Handler(func(msg *Message) error {
msg.Ret.Send(&Message{Name: "ack", Ret: inW})
msg.Ret.Send(&Message{Verb: Ack, Ret: inW})
fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret)
defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret)
Copy(msg.Ret, outR)
@ -29,7 +29,7 @@ func Task(f func(in Receiver, out Sender)) Sender {
go f(inR, outW)
running = true
l.Unlock()
msg.Ret.Send(&Message{Name: "ack"})
msg.Ret.Send(&Message{Verb: Ack})
return nil
}))
return obj

View File

@ -16,11 +16,11 @@ func NewTree() *Tree {
}
t.OnAttach(Handler(func(msg *Message) error {
if len(msg.Args) == 0 || msg.Args[0] == "" {
msg.Ret.Send(&Message{Name: "ack", Ret: t})
msg.Ret.Send(&Message{Verb: Ack, Ret: t})
return nil
}
if child, exists := t.children[msg.Args[0]]; exists {
msg.Ret.Send(&Message{Name: "ack", Ret: child})
msg.Ret.Send(&Message{Verb: Ack, Ret: child})
return nil
}
Obj(msg.Ret).Error("not found")

View File

@ -3,6 +3,7 @@ package unix
import (
"fmt"
"os"
"strconv"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
@ -54,7 +55,7 @@ func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented in unix transport")
}
parts := []string{msg.Name}
parts := []string{fmt.Sprintf("%d", msg.Verb)}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
@ -120,7 +121,11 @@ func (c *Conn) Receive(mode int) (*beam.Message, error) {
if len(parts) == 0 {
return nil, fmt.Errorf("malformed message")
}
msg := &beam.Message{Name: parts[0], Args: parts[1:]}
v, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return nil, err
}
msg := &beam.Message{Verb: beam.Verb(v), Args: parts[1:]}
// Apply mode mask
if fd != nil {

View File

@ -15,21 +15,18 @@ func TestPair(t *testing.T) {
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, in, out, err := r.Receive(0)
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "print" {
if msg.Verb != beam.Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
if in != nil && out != nil {
t.Fatal("Unexpected return value")
}
}()
_, _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
_, err := w.Send(&beam.Message{Verb: beam.Log, Args: []string{"hello world"}})
if err != nil {
t.Fatal(err)
}
@ -47,108 +44,31 @@ func TestSendReply(t *testing.T) {
// Send
go func() {
// Send a message with mode=R
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.R)
// Read for a reply
resp, _, _, err := in.Receive(0)
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if resp.Args[0] != "this is the reply" {
t.Fatalf("%#v", resp)
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=W
msg, in, out, err := r.Receive(beam.W)
msg, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.W)
// Send a reply
_, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
_, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendNested(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=W
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.W)
// Send a nested message
_, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil {
t.Fatal(err)
}
}()
// Receive a message with mode=R
msg, in, out, err := r.Receive(beam.R)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.R)
// Read for a nested message
nested, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if nested.Args[0] != "this is the nested message" {
t.Fatalf("%#v", nested)
}
})
}
// FIXME: duplicate from inmem/inmem_test.go
// assertMode verifies that the values of r and w match
// mode.
// If mode has the R bit set, r must be non-nil. Otherwise it must be nil.
// If mode has the W bit set, w must be non-nil. Otherwise it must be nil.
//
// If any of these conditions are not met, t.Fatal is called and the active
// test fails.
func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil
if mode&beam.R != 0 {
if r == nil {
t.Fatalf("should be non-nil: %#v", r)
}
// Otherwise it must be nil.
} else {
if r != nil {
t.Fatalf("should be nil: %#v", r)
}
}
// If mode has the W bit set, w must be non-nil
if mode&beam.W != 0 {
if w == nil {
t.Fatalf("should be non-nil: %#v", w)
}
// Otherwise it must be nil.
} else {
if w != nil {
t.Fatalf("should be nil: %#v", w)
}
}
}

View File

@ -11,7 +11,7 @@ func TestSendRet(t *testing.T) {
defer w.Close()
q := NewQueue(w, 1)
defer q.Close()
ret, err := q.Send(&beam.Message{Name: "ping", Ret: beam.RetPipe})
ret, err := q.Send(&beam.Message{Verb: beam.Log, Args: []string{"ping"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
@ -20,7 +20,7 @@ func TestSendRet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := ping.Ret.Send(&beam.Message{Name: "pong"}); err != nil {
if _, err := ping.Ret.Send(&beam.Message{Verb: beam.Log, Args: []string{"pong"}}); err != nil {
t.Fatal(err)
}
}()
@ -28,16 +28,16 @@ func TestSendRet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if pong.Name != "pong" {
if pong.Verb != beam.Log {
t.Fatal(err)
}
}
func TestSendClose(t *testing.T) {
q := NewQueue(beam.NopSender{}, 1)
q.Send(&beam.Message{Name: "hello"})
q.Send(&beam.Message{Verb: beam.Error, Args: []string{"hello"}})
q.Close()
if _, err := q.Send(&beam.Message{Name: "again"}); err == nil {
if _, err := q.Send(&beam.Message{Verb: beam.Ack, Args: []string{"again"}}); err == nil {
t.Fatal("send on closed queue should return an error")
}
}

View File

@ -20,14 +20,14 @@ func TestStackWithPipe(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}})
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
@ -49,14 +49,14 @@ func TestStackWithPair(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}})
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
@ -90,9 +90,9 @@ func TestStackAdd(t *testing.T) {
if s.Len() != 2 {
t.Fatalf("%#v", beforeA)
}
s.Send(&beam.Message{Name: "for b", Args: nil})
beforeB.Send(&beam.Message{Name: "for a", Args: nil})
beforeA.Send(&beam.Message{Name: "for nobody", Args: nil})
s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for b"}})
beforeB.Send(&beam.Message{Verb: beam.Log, Args: []string{"for a"}})
beforeA.Send(&beam.Message{Verb: beam.Log, Args: []string{"for nobody"}})
if len(a) != 1 {
t.Fatalf("%#v", a)
}
@ -112,7 +112,7 @@ func TestStackAddBad(t *testing.T) {
t.Fatalf("%#v", s)
}
r.Close()
if _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}); err != nil {
if _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for the buffer"}}); err != nil {
t.Fatal(err)
}
if s.Len() != 1 {
@ -121,7 +121,7 @@ func TestStackAddBad(t *testing.T) {
if len(buf) != 1 {
t.Fatalf("%#v", buf)
}
if buf[0].Name != "for the buffer" {
if buf[0].Args[0] != "for the buffer" {
t.Fatalf("%#v", buf)
}
}

View File

@ -1,17 +1,19 @@
package beam
type Verb string
type Verb uint32
var (
Ack Verb = "ack"
Log Verb = "log"
Start Verb = "start"
Stop Verb = "stop"
Attach Verb = "attach"
Spawn Verb = "spawn"
Set Verb = "set"
Get Verb = "get"
File Verb = "file"
Error Verb = "error"
Ls Verb = "ls"
const (
Ack Verb = iota
Attach
Connect
Error
File
Get
Log
Ls
Set
Spawn
Start
Stop
Watch
)