Merge pull request #5 from docker/unix

Expose the unix transport under the new interface
This commit is contained in:
Solomon Hykes 2014-05-18 17:48:14 -07:00
commit df3394c350
18 changed files with 597 additions and 639 deletions

160
beam.go
View File

@ -1,166 +1,22 @@
package beam package beam
import ( import ()
"fmt"
"io"
"os"
)
type Sender interface { type Sender interface {
Send([]byte, *os.File) error Send(msg *Message, mode int) (Receiver, Sender, error)
Close() error
} }
type Receiver interface { type Receiver interface {
Receive() ([]byte, *os.File, error) Receive(mode int) (*Message, Receiver, Sender, error)
} }
type ReceiveCloser interface { type Message struct {
Receiver Name string
Close() error Args []string
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
} }
const ( const (
R int = 1 << (32 - 1 - iota) R = 1 << (32 - 1 - iota)
W W
) )
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
var (
remote *os.File
local *os.File
)
if mode == R {
remote = r
local = w
} else if mode == W {
remote = w
local = r
}
if err := dst.Send(data, remote); err != nil {
local.Close()
remote.Close()
return nil, err
}
return local, nil
}
// SendRPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, R)
}
// SendWPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, W)
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

View File

@ -1,39 +1,17 @@
package beam package beam
import ( import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing" "testing"
) )
func TestSendConn(t *testing.T) { func TestModes(t *testing.T) {
a, b, err := USocketPair() if R == W {
if err != nil { t.Fatalf("0")
t.Fatal(err)
} }
defer a.Close() if R == 0 {
defer b.Close() t.Fatalf("0")
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
} }
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { if W == 0 {
t.Fatalf("%v != %v\n", val, "connection") t.Fatalf("0")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
} }
} }

View File

@ -1,10 +1,12 @@
package inmem package inmem
import () import (
"github.com/docker/beam"
)
type Buffer []*Message type Buffer []*beam.Message
func (buf *Buffer) Send(msg *Message, mode int) (Receiver, Sender, error) { func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
(*buf) = append(*buf, msg) (*buf) = append(*buf, msg)
return NopReceiver{}, NopSender{}, nil return NopReceiver{}, NopSender{}, nil
} }

View File

@ -2,15 +2,16 @@ package inmem
import ( import (
"errors" "errors"
"github.com/docker/beam"
"sync" "sync"
) )
type ReceiverFrom interface { type ReceiverFrom interface {
ReceiveFrom(Receiver) (int, error) ReceiveFrom(beam.Receiver) (int, error)
} }
type SenderTo interface { type SenderTo interface {
SendTo(Sender) (int, error) SendTo(beam.Sender) (int, error)
} }
var ( var (
@ -18,7 +19,7 @@ var (
ErrIncompatibleReceiver = errors.New("incompatible receiver") ErrIncompatibleReceiver = errors.New("incompatible receiver")
) )
func Copy(dst Sender, src Receiver) (int, error) { func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
var tasks sync.WaitGroup var tasks sync.WaitGroup
defer tasks.Wait() defer tasks.Wait()
if senderTo, ok := src.(SenderTo); ok { if senderTo, ok := src.(SenderTo); ok {
@ -34,7 +35,7 @@ func Copy(dst Sender, src Receiver) (int, error) {
var ( var (
n int n int
) )
copyAndClose := func(dst Sender, src Receiver) { copyAndClose := func(dst beam.Sender, src beam.Receiver) {
if dst == nil { if dst == nil {
return return
} }
@ -45,11 +46,11 @@ func Copy(dst Sender, src Receiver) (int, error) {
Copy(dst, src) Copy(dst, src)
} }
for { for {
msg, rcvR, rcvW, err := src.Receive(R | W) msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W)
if err != nil { if err != nil {
return n, err return n, err
} }
sndR, sndW, err := dst.Send(msg, R|W) sndR, sndW, err := dst.Send(msg, beam.R|beam.W)
if err != nil { if err != nil {
if rcvW != nil { if rcvW != nil {
rcvW.Close() rcvW.Close()

View File

@ -1,29 +1,11 @@
package inmem package inmem
import ( import (
"github.com/docker/beam"
"io" "io"
"sync" "sync"
) )
type Sender interface {
Send(msg *Message, mode int) (Receiver, Sender, error)
Close() error
}
type Receiver interface {
Receive(mode int) (*Message, Receiver, Sender, error)
}
type Message struct {
Name string
Args []string
}
const (
R = 1 << (32 - 1 - iota)
W
)
func Pipe() (*PipeReceiver, *PipeSender) { func Pipe() (*PipeReceiver, *PipeSender) {
p := new(pipe) p := new(pipe)
p.rwait.L = &p.l p.rwait.L = &p.l
@ -46,7 +28,7 @@ type pipe struct {
} }
type pipeMessage struct { type pipeMessage struct {
msg *Message msg *beam.Message
out *PipeSender out *PipeSender
in *PipeReceiver in *PipeReceiver
} }
@ -78,10 +60,10 @@ func (p *pipe) psend(pmsg *pipeMessage) error {
return err return err
} }
func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) {
// Prepare the message // Prepare the message
pmsg := &pipeMessage{msg: msg} pmsg := &pipeMessage{msg: msg}
if mode&R != 0 { if mode&beam.R != 0 {
in, pmsg.out = Pipe() in, pmsg.out = Pipe()
defer func() { defer func() {
if err != nil { if err != nil {
@ -91,7 +73,7 @@ func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender,
} }
}() }()
} }
if mode&W != 0 { if mode&beam.W != 0 {
pmsg.in, out = Pipe() pmsg.in, out = Pipe()
defer func() { defer func() {
if err != nil { if err != nil {
@ -129,16 +111,16 @@ func (p *pipe) preceive() (*pipeMessage, error) {
return pmsg, nil return pmsg, nil
} }
func (p *pipe) receive(mode int) (*Message, *PipeReceiver, *PipeSender, error) { func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) {
pmsg, err := p.preceive() pmsg, err := p.preceive()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
if pmsg.out != nil && mode&W == 0 { if pmsg.out != nil && mode&beam.W == 0 {
pmsg.out.Close() pmsg.out.Close()
pmsg.out = nil pmsg.out = nil
} }
if pmsg.in != nil && mode&R == 0 { if pmsg.in != nil && mode&beam.R == 0 {
pmsg.in.Close() pmsg.in.Close()
pmsg.in = nil pmsg.in = nil
} }
@ -173,14 +155,14 @@ type PipeReceiver struct {
p *pipe p *pipe
} }
func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
msg, pin, pout, err := r.p.receive(mode) msg, pin, pout, err := r.p.receive(mode)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
var ( var (
in Receiver in beam.Receiver
out Sender out beam.Sender
) )
if pin != nil { if pin != nil {
in = pin in = pin
@ -191,7 +173,7 @@ func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) {
return msg, in, out, err return msg, in, out, err
} }
func (r *PipeReceiver) SendTo(dst Sender) (int, error) { func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) {
var n int var n int
// If the destination is a PipeSender, we can cheat // If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender) pdst, ok := dst.(*PipeSender)
@ -229,11 +211,11 @@ type PipeSender struct {
p *pipe p *pipe
} }
func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) { func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
pin, pout, err := w.p.send(msg, mode) pin, pout, err := w.p.send(msg, mode)
var ( var (
in Receiver in beam.Receiver
out Sender out beam.Sender
) )
if pin != nil { if pin != nil {
in = pin in = pin
@ -244,7 +226,7 @@ func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) {
return in, out, err return in, out, err
} }
func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) { func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) {
var n int var n int
// If the destination is a PipeReceiver, we can cheat // If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver) psrc, ok := src.(*PipeReceiver)

View File

@ -1,22 +1,11 @@
package inmem package inmem
import ( import (
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils" "github.com/dotcloud/docker/pkg/testutils"
"testing" "testing"
) )
func TestModes(t *testing.T) {
if R == W {
t.Fatalf("0")
}
if R == 0 {
t.Fatalf("0")
}
if W == 0 {
t.Fatalf("0")
}
}
func TestSimpleSend(t *testing.T) { func TestSimpleSend(t *testing.T) {
r, w := Pipe() r, w := Pipe()
defer r.Close() defer r.Close()
@ -35,7 +24,7 @@ func TestSimpleSend(t *testing.T) {
} }
assertMode(t, in, out, 0) assertMode(t, in, out, 0)
}() }()
in, out, err := w.Send(&Message{Name: "print", Args: []string{"hello world"}}, 0) in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -50,9 +39,9 @@ func TestSimpleSend(t *testing.T) {
// //
// If any of these conditions are not met, t.Fatal is called and the active // If any of these conditions are not met, t.Fatal is called and the active
// test fails. // test fails.
func assertMode(t *testing.T, r Receiver, w Sender, mode int) { func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil // If mode has the R bit set, r must be non-nil
if mode&R != 0 { if mode&beam.R != 0 {
if r == nil { if r == nil {
t.Fatalf("should be non-nil: %#v", r) t.Fatalf("should be non-nil: %#v", r)
} }
@ -63,7 +52,7 @@ func assertMode(t *testing.T, r Receiver, w Sender, mode int) {
} }
} }
// If mode has the W bit set, w must be non-nil // If mode has the W bit set, w must be non-nil
if mode&W != 0 { if mode&beam.W != 0 {
if w == nil { if w == nil {
t.Fatalf("should be non-nil: %#v", w) t.Fatalf("should be non-nil: %#v", w)
} }
@ -83,11 +72,11 @@ func TestSendReply(t *testing.T) {
// Send // Send
go func() { go func() {
// Send a message with mode=R // Send a message with mode=R
in, out, err := w.Send(&Message{Args: []string{"this is the request"}}, R) in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
assertMode(t, in, out, R) assertMode(t, in, out, beam.R)
// Read for a reply // Read for a reply
resp, _, _, err := in.Receive(0) resp, _, _, err := in.Receive(0)
if err != nil { if err != nil {
@ -98,16 +87,16 @@ func TestSendReply(t *testing.T) {
} }
}() }()
// Receive a message with mode=W // Receive a message with mode=W
msg, in, out, err := r.Receive(W) msg, in, out, err := r.Receive(beam.W)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if msg.Args[0] != "this is the request" { if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg) t.Fatalf("%#v", msg)
} }
assertMode(t, in, out, W) assertMode(t, in, out, beam.W)
// Send a reply // Send a reply
_, _, err = out.Send(&Message{Args: []string{"this is the reply"}}, 0) _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -122,26 +111,26 @@ func TestSendNested(t *testing.T) {
// Send // Send
go func() { go func() {
// Send a message with mode=W // Send a message with mode=W
in, out, err := w.Send(&Message{Args: []string{"this is the request"}}, W) in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
assertMode(t, in, out, W) assertMode(t, in, out, beam.W)
// Send a nested message // Send a nested message
_, _, err = out.Send(&Message{Args: []string{"this is the nested message"}}, 0) _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}() }()
// Receive a message with mode=R // Receive a message with mode=R
msg, in, out, err := r.Receive(R) msg, in, out, err := r.Receive(beam.R)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if msg.Args[0] != "this is the request" { if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg) t.Fatalf("%#v", msg)
} }
assertMode(t, in, out, R) assertMode(t, in, out, beam.R)
// Read for a nested message // Read for a nested message
nested, _, _, err := in.Receive(0) nested, _, _, err := in.Receive(0)
if err != nil { if err != nil {

View File

@ -1,12 +1,13 @@
package inmem package inmem
import ( import (
"github.com/docker/beam"
"io" "io"
) )
type NopSender struct{} type NopSender struct{}
func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) { func (s NopSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
return NopReceiver{}, NopSender{}, nil return NopReceiver{}, NopSender{}, nil
} }
@ -16,6 +17,6 @@ func (s NopSender) Close() error {
type NopReceiver struct{} type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { func (r NopReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
return nil, nil, nil, io.EOF return nil, nil, nil, io.EOF
} }

184
router.go
View File

@ -1,184 +0,0 @@
package beam
import (
"fmt"
"github.com/dotcloud/docker/pkg/beam/data"
"io"
"os"
)
type Router struct {
routes []*Route
sink Sender
}
func NewRouter(sink Sender) *Router {
return &Router{sink: sink}
}
func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
//fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment))
defer func() {
//fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err)
}()
for _, route := range r.routes {
if route.Match(payload, attachment) {
return route.Handle(payload, attachment)
}
}
if r.sink != nil {
// fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink)
return r.sink.Send(payload, attachment)
}
//fmt.Printf("[Router.Send] no match. return error.\n")
return fmt.Errorf("no matching route")
}
func (r *Router) NewRoute() *Route {
route := &Route{}
r.routes = append(r.routes, route)
return route
}
type Route struct {
rules []func([]byte, *os.File) bool
handler func([]byte, *os.File) error
}
func (route *Route) Match(payload []byte, attachment *os.File) bool {
for _, rule := range route.rules {
if !rule(payload, attachment) {
return false
}
}
return true
}
func (route *Route) Handle(payload []byte, attachment *os.File) error {
if route.handler == nil {
return nil
}
return route.handler(payload, attachment)
}
func (r *Route) HasAttachment() *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return attachment != nil
})
return r
}
func (route *Route) Tee(dst Sender) *Route {
inner := route.handler
route.handler = func(payload []byte, attachment *os.File) error {
if inner == nil {
return nil
}
if attachment == nil {
return inner(payload, attachment)
}
// Setup the tee
w, err := SendRPipe(dst, payload)
if err != nil {
return err
}
teeR, teeW, err := os.Pipe()
if err != nil {
w.Close()
return err
}
go func() {
io.Copy(io.MultiWriter(teeW, w), attachment)
attachment.Close()
w.Close()
teeW.Close()
}()
return inner(payload, teeR)
}
return route
}
func (r *Route) Filter(f func([]byte, *os.File) bool) *Route {
r.rules = append(r.rules, f)
return r
}
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
values := data.Message(payload).Get(k)
if values == nil {
return false
}
if len(values) < len(beginning) {
return false
}
for i, v := range beginning {
if v != values[i] {
return false
}
}
return true
})
return r
}
func (r *Route) KeyEquals(k string, full ...string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
values := data.Message(payload).Get(k)
if len(values) != len(full) {
return false
}
for i, v := range full {
if v != values[i] {
return false
}
}
return true
})
return r
}
func (r *Route) KeyIncludes(k, v string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
for _, val := range data.Message(payload).Get(k) {
if val == v {
return true
}
}
return false
})
return r
}
func (r *Route) NoKey(k string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return len(data.Message(payload).Get(k)) == 0
})
return r
}
func (r *Route) KeyExists(k string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return data.Message(payload).Get(k) != nil
})
return r
}
func (r *Route) Passthrough(dst Sender) *Route {
r.handler = func(payload []byte, attachment *os.File) error {
return dst.Send(payload, attachment)
}
return r
}
func (r *Route) All() *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return true
})
return r
}
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
r.handler = h
return r
}

View File

@ -1,95 +0,0 @@
package beam
import (
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
)
type msg struct {
payload []byte
attachment *os.File
}
func (m msg) String() string {
return MsgDesc(m.payload, m.attachment)
}
type mockReceiver []msg
func (r *mockReceiver) Send(p []byte, a *os.File) error {
(*r) = append((*r), msg{p, a})
return nil
}
func TestSendNoSinkNoRoute(t *testing.T) {
r := NewRouter(nil)
if err := r.Send([]byte("hello"), nil); err == nil {
t.Fatalf("error expected")
}
a, b, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
if err := r.Send([]byte("foo bar baz"), a); err == nil {
t.Fatalf("error expected")
}
}
func TestSendSinkNoRoute(t *testing.T) {
var sink mockReceiver
r := NewRouter(&sink)
if err := r.Send([]byte("hello"), nil); err != nil {
t.Fatal(err)
}
a, b, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
if err := r.Send([]byte("world"), a); err != nil {
t.Fatal(err)
}
if len(sink) != 2 {
t.Fatalf("%#v\n", sink)
}
if string(sink[0].payload) != "hello" {
t.Fatalf("%#v\n", sink)
}
if sink[0].attachment != nil {
t.Fatalf("%#v\n", sink)
}
if string(sink[1].payload) != "world" {
t.Fatalf("%#v\n", sink)
}
if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 {
t.Fatalf("%v\n", sink)
}
var tasks sync.WaitGroup
tasks.Add(2)
go func() {
defer tasks.Done()
fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd())
data, err := ioutil.ReadAll(sink[1].attachment)
if err != nil {
t.Fatal(err)
}
if string(data) != "foo bar\n" {
t.Fatalf("%v\n", string(data))
}
}()
go func() {
defer tasks.Done()
fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd())
if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil {
t.Fatal(err)
}
b.Close()
}()
tasks.Wait()
}

View File

@ -1,85 +0,0 @@
package beam
import (
"net"
)
// Listen is a convenience interface for applications to create service endpoints
// which can be easily used with existing networking code.
//
// Listen registers a new service endpoint on the beam connection `conn`, using the
// service name `name`. It returns a listener which can be used in the usual
// way. Calling Accept() on the listener will block until a new connection is available
// on the service endpoint. The endpoint is then returned as a regular net.Conn and
// can be used as a regular network connection.
//
// Note that if the underlying file descriptor received in attachment is nil or does
// not point to a connection, that message will be skipped.
//
func Listen(conn Sender, name string) (net.Listener, error) {
endpoint, err := SendConn(conn, []byte(name))
if err != nil {
return nil, err
}
return &listener{
name: name,
endpoint: endpoint,
}, nil
}
func Connect(ctx *UnixConn, name string) (net.Conn, error) {
l, err := Listen(ctx, name)
if err != nil {
return nil, err
}
conn, err := l.Accept()
if err != nil {
return nil, err
}
return conn, nil
}
type listener struct {
name string
endpoint ReceiveCloser
}
func (l *listener) Accept() (net.Conn, error) {
for {
_, f, err := l.endpoint.Receive()
if err != nil {
return nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := net.FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return conn, nil
}
panic("impossibru!")
return nil, nil
}
func (l *listener) Close() error {
return l.endpoint.Close()
}
func (l *listener) Addr() net.Addr {
return addr(l.name)
}
type addr string
func (a addr) Network() string {
return "beam"
}
func (a addr) String() string {
return string(a)
}

166
unix/beam.go Normal file
View File

@ -0,0 +1,166 @@
package unix
import (
"fmt"
"io"
"os"
)
type Sender interface {
Send([]byte, *os.File) error
}
type Receiver interface {
Receive() ([]byte, *os.File, error)
}
type ReceiveCloser interface {
Receiver
Close() error
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
}
const (
R int = 1 << (32 - 1 - iota)
W
)
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
var (
remote *os.File
local *os.File
)
if mode == R {
remote = r
local = w
} else if mode == W {
remote = w
local = r
}
if err := dst.Send(data, remote); err != nil {
local.Close()
remote.Close()
return nil, err
}
return local, nil
}
// SendRPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, R)
}
// SendWPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, W)
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

39
unix/beam_test.go Normal file
View File

@ -0,0 +1,39 @@
package unix
import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing"
)
func TestSendConn(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
t.Fatalf("%v != %v\n", val, "connection")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
}
}

121
unix/conn.go Normal file
View File

@ -0,0 +1,121 @@
package unix
import (
"fmt"
"os"
"github.com/docker/beam"
"github.com/docker/beam/data"
)
func Pair() (*Conn, *Conn, error) {
c1, c2, err := USocketPair()
if err != nil {
return nil, nil, err
}
return &Conn{c1}, &Conn{c2}, nil
}
type Conn struct {
*UnixConn
}
func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) {
// Get 2 *os.File
local, remote, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
// Convert 1 to *net.UnixConn
conn, err = FileConn(local)
if err != nil {
return nil, nil, err
}
local.Close()
// Return the "mismatched" pair
return conn, remote, nil
}
// This implements beam.Sender.Close which *only closes the sender*.
// This is similar to the pattern of only closing go channels from
// the sender's side.
// If you want to close the entire connection, call Conn.UnixConn.Close.
func (c *Conn) Close() error {
return c.UnixConn.CloseWrite()
}
func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
parts := []string{msg.Name}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
var (
fd *os.File
r beam.Receiver
w beam.Sender
)
if mode&(beam.R|beam.W) != 0 {
local, remote, err := sendablePair()
if err != nil {
return nil, nil, err
}
fd = remote
if mode&beam.R != 0 {
r = &Conn{local}
}
if mode&beam.W != 0 {
w = &Conn{local}
} else {
local.CloseWrite()
}
}
c.UnixConn.Send(b, fd)
return r, w, nil
}
func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
b, fd, err := c.UnixConn.Receive()
if err != nil {
return nil, nil, nil, err
}
parts, n, err := data.DecodeList(string(b))
if err != nil {
return nil, nil, nil, err
}
if n != len(b) {
return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n])
}
if len(parts) == 0 {
return nil, nil, nil, fmt.Errorf("malformed message")
}
msg := &beam.Message{parts[0], parts[1:]}
// Setup nested streams
var (
r beam.Receiver
w beam.Sender
)
// Apply mode mask
if fd != nil {
subconn, err := FileConn(fd)
if err != nil {
return nil, nil, nil, err
}
fd.Close()
if mode&beam.R != 0 {
r = &Conn{subconn}
}
if mode&beam.W != 0 {
w = &Conn{subconn}
} else {
subconn.CloseWrite()
}
}
return msg, r, w, nil
}

154
unix/conn_test.go Normal file
View File

@ -0,0 +1,154 @@
package unix
import (
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestPair(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal("Unexpected error")
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, in, out, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "print" {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
if in != nil && out != nil {
t.Fatal("Unexpected return value")
}
}()
_, _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.R)
// Read for a reply
resp, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if resp.Args[0] != "this is the reply" {
t.Fatalf("%#v", resp)
}
}()
// Receive a message with mode=W
msg, in, out, err := r.Receive(beam.W)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.W)
// Send a reply
_, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestSendNested(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=W
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.W)
// Send a nested message
_, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil {
t.Fatal(err)
}
}()
// Receive a message with mode=R
msg, in, out, err := r.Receive(beam.R)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.R)
// Read for a nested message
nested, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if nested.Args[0] != "this is the nested message" {
t.Fatalf("%#v", nested)
}
})
}
// FIXME: duplicate from inmem/inmem_test.go
// assertMode verifies that the values of r and w match
// mode.
// If mode has the R bit set, r must be non-nil. Otherwise it must be nil.
// If mode has the W bit set, w must be non-nil. Otherwise it must be nil.
//
// If any of these conditions are not met, t.Fatal is called and the active
// test fails.
func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil
if mode&beam.R != 0 {
if r == nil {
t.Fatalf("should be non-nil: %#v", r)
}
// Otherwise it must be nil.
} else {
if r != nil {
t.Fatalf("should be nil: %#v", r)
}
}
// If mode has the W bit set, w must be non-nil
if mode&beam.W != 0 {
if w == nil {
t.Fatalf("should be non-nil: %#v", w)
}
// Otherwise it must be nil.
} else {
if w != nil {
t.Fatalf("should be nil: %#v", w)
}
}
}

View File

@ -1,4 +1,4 @@
package beam package unix
import ( import (
"bufio" "bufio"

View File

@ -1,4 +1,4 @@
package beam package unix
import ( import (
"fmt" "fmt"

View File

@ -1,8 +1,9 @@
package inmem package utils
import ( import (
"container/list" "container/list"
"fmt" "fmt"
"github.com/docker/beam"
"strings" "strings"
"sync" "sync"
) )
@ -23,8 +24,8 @@ func NewStackSender() *StackSender {
} }
} }
func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err error) { func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) {
completed := s.walk(func(h Sender) (ok bool) { completed := s.walk(func(h beam.Sender) (ok bool) {
r, w, err = h.Send(msg, mode) r, w, err = h.Send(msg, mode)
fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err)
if err == nil { if err == nil {
@ -40,7 +41,7 @@ func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err er
return nil, nil, nil return nil, nil, nil
} }
func (s *StackSender) Add(dst Sender) *StackSender { func (s *StackSender) Add(dst beam.Sender) *StackSender {
s.l.Lock() s.l.Lock()
defer s.l.Unlock() defer s.l.Unlock()
prev := &StackSender{ prev := &StackSender{
@ -53,7 +54,7 @@ func (s *StackSender) Add(dst Sender) *StackSender {
} }
func (s *StackSender) Close() error { func (s *StackSender) Close() error {
s.walk(func(h Sender) bool { s.walk(func(h beam.Sender) bool {
h.Close() h.Close()
// remove all handlers // remove all handlers
return false return false
@ -67,7 +68,7 @@ func (s *StackSender) _walk(f func(*list.Element) bool) bool {
e = s.stack.Front() e = s.stack.Front()
s.l.RUnlock() s.l.RUnlock()
for e != nil { for e != nil {
fmt.Printf("[StackSender.Walk] %v\n", e.Value.(Sender)) fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender))
s.l.RLock() s.l.RLock()
next := e.Next() next := e.Next()
s.l.RUnlock() s.l.RUnlock()
@ -80,9 +81,9 @@ func (s *StackSender) _walk(f func(*list.Element) bool) bool {
return true return true
} }
func (s *StackSender) walk(f func(Sender) bool) bool { func (s *StackSender) walk(f func(beam.Sender) bool) bool {
return s._walk(func(e *list.Element) bool { return s._walk(func(e *list.Element) bool {
ok := f(e.Value.(Sender)) ok := f(e.Value.(beam.Sender))
if ok { if ok {
// Found a valid handler. Stop walking. // Found a valid handler. Stop walking.
return false return false
@ -104,7 +105,7 @@ func (s *StackSender) Len() int {
func (s *StackSender) String() string { func (s *StackSender) String() string {
var parts []string var parts []string
s._walk(func(e *list.Element) bool { s._walk(func(e *list.Element) bool {
parts = append(parts, fmt.Sprintf("%v", e.Value.(Sender))) parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender)))
return true return true
}) })
return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->")) return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->"))

View File

@ -1,13 +1,16 @@
package inmem package utils
import ( import (
"github.com/docker/beam"
"github.com/docker/beam/inmem"
"github.com/docker/beam/unix"
"github.com/dotcloud/docker/pkg/testutils" "github.com/dotcloud/docker/pkg/testutils"
"strings" "strings"
"testing" "testing"
) )
func TestSendStack(t *testing.T) { func TestStackWithPipe(t *testing.T) {
r, w := Pipe() r, w := inmem.Pipe()
defer r.Close() defer r.Close()
defer w.Close() defer w.Close()
s := NewStackSender() s := NewStackSender()
@ -25,7 +28,36 @@ func TestSendStack(t *testing.T) {
t.Fatalf("%#v", msg) t.Fatalf("%#v", msg)
} }
}() }()
_, _, err := s.Send(&Message{"hello", []string{"wonderful", "world"}}, 0) _, _, err := s.Send(&beam.Message{"hello", []string{"wonderful", "world"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestStackWithPair(t *testing.T) {
r, w, err := unix.Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, _, _, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, _, err := s.Send(&beam.Message{"hello", []string{"wonderful", "world"}}, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -41,7 +73,7 @@ func TestStackLen(t *testing.T) {
func TestStackAdd(t *testing.T) { func TestStackAdd(t *testing.T) {
s := NewStackSender() s := NewStackSender()
a := Buffer{} a := inmem.Buffer{}
beforeA := s.Add(&a) beforeA := s.Add(&a)
// Add on an empty StackSender should return an empty StackSender // Add on an empty StackSender should return an empty StackSender
if beforeA.Len() != 0 { if beforeA.Len() != 0 {
@ -51,7 +83,7 @@ func TestStackAdd(t *testing.T) {
t.Fatalf("%#v", beforeA) t.Fatalf("%#v", beforeA)
} }
// Add a 2nd element // Add a 2nd element
b := Buffer{} b := inmem.Buffer{}
beforeB := s.Add(&b) beforeB := s.Add(&b)
if beforeB.Len() != 1 { if beforeB.Len() != 1 {
t.Fatalf("%#v", beforeA) t.Fatalf("%#v", beforeA)
@ -59,9 +91,9 @@ func TestStackAdd(t *testing.T) {
if s.Len() != 2 { if s.Len() != 2 {
t.Fatalf("%#v", beforeA) t.Fatalf("%#v", beforeA)
} }
s.Send(&Message{"for b", nil}, 0) s.Send(&beam.Message{"for b", nil}, 0)
beforeB.Send(&Message{"for a", nil}, 0) beforeB.Send(&beam.Message{"for a", nil}, 0)
beforeA.Send(&Message{"for nobody", nil}, 0) beforeA.Send(&beam.Message{"for nobody", nil}, 0)
if len(a) != 1 { if len(a) != 1 {
t.Fatalf("%#v", a) t.Fatalf("%#v", a)
} }
@ -73,15 +105,15 @@ func TestStackAdd(t *testing.T) {
// Misbehaving backends must be removed // Misbehaving backends must be removed
func TestStackAddBad(t *testing.T) { func TestStackAddBad(t *testing.T) {
s := NewStackSender() s := NewStackSender()
buf := Buffer{} buf := inmem.Buffer{}
s.Add(&buf) s.Add(&buf)
r, w := Pipe() r, w := inmem.Pipe()
s.Add(w) s.Add(w)
if s.Len() != 2 { if s.Len() != 2 {
t.Fatalf("%#v", s) t.Fatalf("%#v", s)
} }
r.Close() r.Close()
if _, _, err := s.Send(&Message{"for the buffer", nil}, 0); err != nil { if _, _, err := s.Send(&beam.Message{"for the buffer", nil}, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if s.Len() != 1 { if s.Len() != 1 {