diff --git a/beam.go b/beam.go index 88d3ea7c79..3403a1ec95 100644 --- a/beam.go +++ b/beam.go @@ -1,166 +1,22 @@ package beam -import ( - "fmt" - "io" - "os" -) +import () type Sender interface { - Send([]byte, *os.File) error + Send(msg *Message, mode int) (Receiver, Sender, error) + Close() error } type Receiver interface { - Receive() ([]byte, *os.File, error) + Receive(mode int) (*Message, Receiver, Sender, error) } -type ReceiveCloser interface { - Receiver - Close() error -} - -type SendCloser interface { - Sender - Close() error -} - -type ReceiveSender interface { - Receiver - Sender +type Message struct { + Name string + Args []string } const ( - R int = 1 << (32 - 1 - iota) + R = 1 << (32 - 1 - iota) W ) - -func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) { - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - var ( - remote *os.File - local *os.File - ) - if mode == R { - remote = r - local = w - } else if mode == W { - remote = w - local = r - } - if err := dst.Send(data, remote); err != nil { - local.Close() - remote.Close() - return nil, err - } - return local, nil - -} - -// SendRPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendRPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, R) -} - -// SendWPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendWPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, W) -} - -func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { - local, remote, err := SocketPair() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - local.Close() - remote.Close() - } - }() - conn, err = FileConn(local) - if err != nil { - return nil, err - } - local.Close() - if err := dst.Send(data, remote); err != nil { - return nil, err - } - return conn, nil -} - -func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { - for { - data, f, err := src.Receive() - if err != nil { - return nil, nil, err - } - if f == nil { - // Skip empty attachments - continue - } - conn, err := FileConn(f) - if err != nil { - // Skip beam attachments which are not connections - // (for example might be a regular file, directory etc) - continue - } - return data, conn, nil - } - panic("impossibru!") - return nil, nil, nil -} - -func Copy(dst Sender, src Receiver) (int, error) { - var n int - for { - payload, attachment, err := src.Receive() - if err == io.EOF { - return n, nil - } else if err != nil { - return n, err - } - if err := dst.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return n, err - } - n++ - } - panic("impossibru!") - return n, nil -} - -// MsgDesc returns a human readable description of a beam message, usually -// for debugging purposes. -func MsgDesc(payload []byte, attachment *os.File) string { - var filedesc string = "" - if attachment != nil { - filedesc = fmt.Sprintf("%d", attachment.Fd()) - } - return fmt.Sprintf("'%s'[%s]", payload, filedesc) -} - -type devnull struct{} - -func Devnull() ReceiveSender { - return devnull{} -} - -func (d devnull) Send(p []byte, a *os.File) error { - if a != nil { - a.Close() - } - return nil -} - -func (d devnull) Receive() ([]byte, *os.File, error) { - return nil, nil, io.EOF -} diff --git a/beam_test.go b/beam_test.go index 2822861a37..ee0096b189 100644 --- a/beam_test.go +++ b/beam_test.go @@ -1,39 +1,17 @@ package beam import ( - "github.com/dotcloud/docker/pkg/beam/data" "testing" ) -func TestSendConn(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) +func TestModes(t *testing.T) { + if R == W { + t.Fatalf("0") } - defer a.Close() - defer b.Close() - go func() { - conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) - if err != nil { - t.Fatal(err) - } - if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { - t.Fatal(err) - } - conn.CloseWrite() - }() - payload, conn, err := ReceiveConn(b) - if err != nil { - t.Fatal(err) + if R == 0 { + t.Fatalf("0") } - if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { - t.Fatalf("%v != %v\n", val, "connection") - } - msg, _, err := conn.Receive() - if err != nil { - t.Fatal(err) - } - if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { - t.Fatalf("%v != %v\n", val, "bar") + if W == 0 { + t.Fatalf("0") } } diff --git a/inmem/buf.go b/inmem/buf.go index 32243e0e2d..244953644b 100644 --- a/inmem/buf.go +++ b/inmem/buf.go @@ -1,10 +1,12 @@ package inmem -import () +import ( + "github.com/docker/beam" +) -type Buffer []*Message +type Buffer []*beam.Message -func (buf *Buffer) Send(msg *Message, mode int) (Receiver, Sender, error) { +func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { (*buf) = append(*buf, msg) return NopReceiver{}, NopSender{}, nil } diff --git a/inmem/copy.go b/inmem/copy.go index cc13eef5ef..8fe3e7b14e 100644 --- a/inmem/copy.go +++ b/inmem/copy.go @@ -2,15 +2,16 @@ package inmem import ( "errors" + "github.com/docker/beam" "sync" ) type ReceiverFrom interface { - ReceiveFrom(Receiver) (int, error) + ReceiveFrom(beam.Receiver) (int, error) } type SenderTo interface { - SendTo(Sender) (int, error) + SendTo(beam.Sender) (int, error) } var ( @@ -18,7 +19,7 @@ var ( ErrIncompatibleReceiver = errors.New("incompatible receiver") ) -func Copy(dst Sender, src Receiver) (int, error) { +func Copy(dst beam.Sender, src beam.Receiver) (int, error) { var tasks sync.WaitGroup defer tasks.Wait() if senderTo, ok := src.(SenderTo); ok { @@ -34,7 +35,7 @@ func Copy(dst Sender, src Receiver) (int, error) { var ( n int ) - copyAndClose := func(dst Sender, src Receiver) { + copyAndClose := func(dst beam.Sender, src beam.Receiver) { if dst == nil { return } @@ -45,11 +46,11 @@ func Copy(dst Sender, src Receiver) (int, error) { Copy(dst, src) } for { - msg, rcvR, rcvW, err := src.Receive(R | W) + msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W) if err != nil { return n, err } - sndR, sndW, err := dst.Send(msg, R|W) + sndR, sndW, err := dst.Send(msg, beam.R|beam.W) if err != nil { if rcvW != nil { rcvW.Close() diff --git a/inmem/inmem.go b/inmem/inmem.go index 67007867f8..f401f7ac19 100644 --- a/inmem/inmem.go +++ b/inmem/inmem.go @@ -1,29 +1,11 @@ package inmem import ( + "github.com/docker/beam" "io" "sync" ) -type Sender interface { - Send(msg *Message, mode int) (Receiver, Sender, error) - Close() error -} - -type Receiver interface { - Receive(mode int) (*Message, Receiver, Sender, error) -} - -type Message struct { - Name string - Args []string -} - -const ( - R = 1 << (32 - 1 - iota) - W -) - func Pipe() (*PipeReceiver, *PipeSender) { p := new(pipe) p.rwait.L = &p.l @@ -46,7 +28,7 @@ type pipe struct { } type pipeMessage struct { - msg *Message + msg *beam.Message out *PipeSender in *PipeReceiver } @@ -78,10 +60,10 @@ func (p *pipe) psend(pmsg *pipeMessage) error { return err } -func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { +func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { // Prepare the message pmsg := &pipeMessage{msg: msg} - if mode&R != 0 { + if mode&beam.R != 0 { in, pmsg.out = Pipe() defer func() { if err != nil { @@ -91,7 +73,7 @@ func (p *pipe) send(msg *Message, mode int) (in *PipeReceiver, out *PipeSender, } }() } - if mode&W != 0 { + if mode&beam.W != 0 { pmsg.in, out = Pipe() defer func() { if err != nil { @@ -129,16 +111,16 @@ func (p *pipe) preceive() (*pipeMessage, error) { return pmsg, nil } -func (p *pipe) receive(mode int) (*Message, *PipeReceiver, *PipeSender, error) { +func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) { pmsg, err := p.preceive() if err != nil { return nil, nil, nil, err } - if pmsg.out != nil && mode&W == 0 { + if pmsg.out != nil && mode&beam.W == 0 { pmsg.out.Close() pmsg.out = nil } - if pmsg.in != nil && mode&R == 0 { + if pmsg.in != nil && mode&beam.R == 0 { pmsg.in.Close() pmsg.in = nil } @@ -173,14 +155,14 @@ type PipeReceiver struct { p *pipe } -func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { +func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { msg, pin, pout, err := r.p.receive(mode) if err != nil { return nil, nil, nil, err } var ( - in Receiver - out Sender + in beam.Receiver + out beam.Sender ) if pin != nil { in = pin @@ -191,7 +173,7 @@ func (r *PipeReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { return msg, in, out, err } -func (r *PipeReceiver) SendTo(dst Sender) (int, error) { +func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) { var n int // If the destination is a PipeSender, we can cheat pdst, ok := dst.(*PipeSender) @@ -229,11 +211,11 @@ type PipeSender struct { p *pipe } -func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) { +func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { pin, pout, err := w.p.send(msg, mode) var ( - in Receiver - out Sender + in beam.Receiver + out beam.Sender ) if pin != nil { in = pin @@ -244,7 +226,7 @@ func (w *PipeSender) Send(msg *Message, mode int) (Receiver, Sender, error) { return in, out, err } -func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) { +func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) { var n int // If the destination is a PipeReceiver, we can cheat psrc, ok := src.(*PipeReceiver) diff --git a/inmem/inmem_test.go b/inmem/inmem_test.go index 28660422e7..f8c7cb7b67 100644 --- a/inmem/inmem_test.go +++ b/inmem/inmem_test.go @@ -1,22 +1,11 @@ package inmem import ( + "github.com/docker/beam" "github.com/dotcloud/docker/pkg/testutils" "testing" ) -func TestModes(t *testing.T) { - if R == W { - t.Fatalf("0") - } - if R == 0 { - t.Fatalf("0") - } - if W == 0 { - t.Fatalf("0") - } -} - func TestSimpleSend(t *testing.T) { r, w := Pipe() defer r.Close() @@ -35,7 +24,7 @@ func TestSimpleSend(t *testing.T) { } assertMode(t, in, out, 0) }() - in, out, err := w.Send(&Message{Name: "print", Args: []string{"hello world"}}, 0) + in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0) if err != nil { t.Fatal(err) } @@ -50,9 +39,9 @@ func TestSimpleSend(t *testing.T) { // // If any of these conditions are not met, t.Fatal is called and the active // test fails. -func assertMode(t *testing.T, r Receiver, w Sender, mode int) { +func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { // If mode has the R bit set, r must be non-nil - if mode&R != 0 { + if mode&beam.R != 0 { if r == nil { t.Fatalf("should be non-nil: %#v", r) } @@ -63,7 +52,7 @@ func assertMode(t *testing.T, r Receiver, w Sender, mode int) { } } // If mode has the W bit set, w must be non-nil - if mode&W != 0 { + if mode&beam.W != 0 { if w == nil { t.Fatalf("should be non-nil: %#v", w) } @@ -83,11 +72,11 @@ func TestSendReply(t *testing.T) { // Send go func() { // Send a message with mode=R - in, out, err := w.Send(&Message{Args: []string{"this is the request"}}, R) + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R) if err != nil { t.Fatal(err) } - assertMode(t, in, out, R) + assertMode(t, in, out, beam.R) // Read for a reply resp, _, _, err := in.Receive(0) if err != nil { @@ -98,16 +87,16 @@ func TestSendReply(t *testing.T) { } }() // Receive a message with mode=W - msg, in, out, err := r.Receive(W) + msg, in, out, err := r.Receive(beam.W) if err != nil { t.Fatal(err) } if msg.Args[0] != "this is the request" { t.Fatalf("%#v", msg) } - assertMode(t, in, out, W) + assertMode(t, in, out, beam.W) // Send a reply - _, _, err = out.Send(&Message{Args: []string{"this is the reply"}}, 0) + _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0) if err != nil { t.Fatal(err) } @@ -122,26 +111,26 @@ func TestSendNested(t *testing.T) { // Send go func() { // Send a message with mode=W - in, out, err := w.Send(&Message{Args: []string{"this is the request"}}, W) + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W) if err != nil { t.Fatal(err) } - assertMode(t, in, out, W) + assertMode(t, in, out, beam.W) // Send a nested message - _, _, err = out.Send(&Message{Args: []string{"this is the nested message"}}, 0) + _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0) if err != nil { t.Fatal(err) } }() // Receive a message with mode=R - msg, in, out, err := r.Receive(R) + msg, in, out, err := r.Receive(beam.R) if err != nil { t.Fatal(err) } if msg.Args[0] != "this is the request" { t.Fatalf("%#v", msg) } - assertMode(t, in, out, R) + assertMode(t, in, out, beam.R) // Read for a nested message nested, _, _, err := in.Receive(0) if err != nil { diff --git a/inmem/nop.go b/inmem/nop.go index d4042a7e5c..2d822fe340 100644 --- a/inmem/nop.go +++ b/inmem/nop.go @@ -1,12 +1,13 @@ package inmem import ( + "github.com/docker/beam" "io" ) type NopSender struct{} -func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) { +func (s NopSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { return NopReceiver{}, NopSender{}, nil } @@ -16,6 +17,6 @@ func (s NopSender) Close() error { type NopReceiver struct{} -func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { +func (r NopReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { return nil, nil, nil, io.EOF } diff --git a/router.go b/router.go deleted file mode 100644 index 15910e95b1..0000000000 --- a/router.go +++ /dev/null @@ -1,184 +0,0 @@ -package beam - -import ( - "fmt" - "github.com/dotcloud/docker/pkg/beam/data" - "io" - "os" -) - -type Router struct { - routes []*Route - sink Sender -} - -func NewRouter(sink Sender) *Router { - return &Router{sink: sink} -} - -func (r *Router) Send(payload []byte, attachment *os.File) (err error) { - //fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment)) - defer func() { - //fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err) - }() - for _, route := range r.routes { - if route.Match(payload, attachment) { - return route.Handle(payload, attachment) - } - } - if r.sink != nil { - // fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink) - return r.sink.Send(payload, attachment) - } - //fmt.Printf("[Router.Send] no match. return error.\n") - return fmt.Errorf("no matching route") -} - -func (r *Router) NewRoute() *Route { - route := &Route{} - r.routes = append(r.routes, route) - return route -} - -type Route struct { - rules []func([]byte, *os.File) bool - handler func([]byte, *os.File) error -} - -func (route *Route) Match(payload []byte, attachment *os.File) bool { - for _, rule := range route.rules { - if !rule(payload, attachment) { - return false - } - } - return true -} - -func (route *Route) Handle(payload []byte, attachment *os.File) error { - if route.handler == nil { - return nil - } - return route.handler(payload, attachment) -} - -func (r *Route) HasAttachment() *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return attachment != nil - }) - return r -} - -func (route *Route) Tee(dst Sender) *Route { - inner := route.handler - route.handler = func(payload []byte, attachment *os.File) error { - if inner == nil { - return nil - } - if attachment == nil { - return inner(payload, attachment) - } - // Setup the tee - w, err := SendRPipe(dst, payload) - if err != nil { - return err - } - teeR, teeW, err := os.Pipe() - if err != nil { - w.Close() - return err - } - go func() { - io.Copy(io.MultiWriter(teeW, w), attachment) - attachment.Close() - w.Close() - teeW.Close() - }() - return inner(payload, teeR) - } - return route -} - -func (r *Route) Filter(f func([]byte, *os.File) bool) *Route { - r.rules = append(r.rules, f) - return r -} - -func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - values := data.Message(payload).Get(k) - if values == nil { - return false - } - if len(values) < len(beginning) { - return false - } - for i, v := range beginning { - if v != values[i] { - return false - } - } - return true - }) - return r -} - -func (r *Route) KeyEquals(k string, full ...string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - values := data.Message(payload).Get(k) - if len(values) != len(full) { - return false - } - for i, v := range full { - if v != values[i] { - return false - } - } - return true - }) - return r -} - -func (r *Route) KeyIncludes(k, v string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - for _, val := range data.Message(payload).Get(k) { - if val == v { - return true - } - } - return false - }) - return r -} - -func (r *Route) NoKey(k string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return len(data.Message(payload).Get(k)) == 0 - }) - return r -} - -func (r *Route) KeyExists(k string) *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return data.Message(payload).Get(k) != nil - }) - return r -} - -func (r *Route) Passthrough(dst Sender) *Route { - r.handler = func(payload []byte, attachment *os.File) error { - return dst.Send(payload, attachment) - } - return r -} - -func (r *Route) All() *Route { - r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { - return true - }) - return r -} - -func (r *Route) Handler(h func([]byte, *os.File) error) *Route { - r.handler = h - return r -} diff --git a/router_test.go b/router_test.go deleted file mode 100644 index f7f7bf1d2d..0000000000 --- a/router_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package beam - -import ( - "fmt" - "io/ioutil" - "os" - "sync" - "testing" -) - -type msg struct { - payload []byte - attachment *os.File -} - -func (m msg) String() string { - return MsgDesc(m.payload, m.attachment) -} - -type mockReceiver []msg - -func (r *mockReceiver) Send(p []byte, a *os.File) error { - (*r) = append((*r), msg{p, a}) - return nil -} - -func TestSendNoSinkNoRoute(t *testing.T) { - r := NewRouter(nil) - if err := r.Send([]byte("hello"), nil); err == nil { - t.Fatalf("error expected") - } - a, b, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - if err := r.Send([]byte("foo bar baz"), a); err == nil { - t.Fatalf("error expected") - } -} - -func TestSendSinkNoRoute(t *testing.T) { - var sink mockReceiver - r := NewRouter(&sink) - if err := r.Send([]byte("hello"), nil); err != nil { - t.Fatal(err) - } - a, b, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - if err := r.Send([]byte("world"), a); err != nil { - t.Fatal(err) - } - if len(sink) != 2 { - t.Fatalf("%#v\n", sink) - } - if string(sink[0].payload) != "hello" { - t.Fatalf("%#v\n", sink) - } - if sink[0].attachment != nil { - t.Fatalf("%#v\n", sink) - } - if string(sink[1].payload) != "world" { - t.Fatalf("%#v\n", sink) - } - if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 { - t.Fatalf("%v\n", sink) - } - var tasks sync.WaitGroup - tasks.Add(2) - go func() { - defer tasks.Done() - fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd()) - data, err := ioutil.ReadAll(sink[1].attachment) - if err != nil { - t.Fatal(err) - } - if string(data) != "foo bar\n" { - t.Fatalf("%v\n", string(data)) - } - }() - go func() { - defer tasks.Done() - fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd()) - if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil { - t.Fatal(err) - } - b.Close() - }() - tasks.Wait() -} diff --git a/service.go b/service.go deleted file mode 100644 index 8e117059cb..0000000000 --- a/service.go +++ /dev/null @@ -1,85 +0,0 @@ -package beam - -import ( - "net" -) - -// Listen is a convenience interface for applications to create service endpoints -// which can be easily used with existing networking code. -// -// Listen registers a new service endpoint on the beam connection `conn`, using the -// service name `name`. It returns a listener which can be used in the usual -// way. Calling Accept() on the listener will block until a new connection is available -// on the service endpoint. The endpoint is then returned as a regular net.Conn and -// can be used as a regular network connection. -// -// Note that if the underlying file descriptor received in attachment is nil or does -// not point to a connection, that message will be skipped. -// -func Listen(conn Sender, name string) (net.Listener, error) { - endpoint, err := SendConn(conn, []byte(name)) - if err != nil { - return nil, err - } - return &listener{ - name: name, - endpoint: endpoint, - }, nil -} - -func Connect(ctx *UnixConn, name string) (net.Conn, error) { - l, err := Listen(ctx, name) - if err != nil { - return nil, err - } - conn, err := l.Accept() - if err != nil { - return nil, err - } - return conn, nil -} - -type listener struct { - name string - endpoint ReceiveCloser -} - -func (l *listener) Accept() (net.Conn, error) { - for { - _, f, err := l.endpoint.Receive() - if err != nil { - return nil, err - } - if f == nil { - // Skip empty attachments - continue - } - conn, err := net.FileConn(f) - if err != nil { - // Skip beam attachments which are not connections - // (for example might be a regular file, directory etc) - continue - } - return conn, nil - } - panic("impossibru!") - return nil, nil -} - -func (l *listener) Close() error { - return l.endpoint.Close() -} - -func (l *listener) Addr() net.Addr { - return addr(l.name) -} - -type addr string - -func (a addr) Network() string { - return "beam" -} - -func (a addr) String() string { - return string(a) -} diff --git a/unix/beam.go b/unix/beam.go new file mode 100644 index 0000000000..9e6dc90f1b --- /dev/null +++ b/unix/beam.go @@ -0,0 +1,166 @@ +package unix + +import ( + "fmt" + "io" + "os" +) + +type Sender interface { + Send([]byte, *os.File) error +} + +type Receiver interface { + Receive() ([]byte, *os.File, error) +} + +type ReceiveCloser interface { + Receiver + Close() error +} + +type SendCloser interface { + Sender + Close() error +} + +type ReceiveSender interface { + Receiver + Sender +} + +const ( + R int = 1 << (32 - 1 - iota) + W +) + +func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + var ( + remote *os.File + local *os.File + ) + if mode == R { + remote = r + local = w + } else if mode == W { + remote = w + local = r + } + if err := dst.Send(data, remote); err != nil { + local.Close() + remote.Close() + return nil, err + } + return local, nil + +} + +// SendRPipe create a pipe and sends its *read* end attached in a beam message +// to `dst`, with `data` as the message payload. +// It returns the *write* end of the pipe, or an error. +func SendRPipe(dst Sender, data []byte) (*os.File, error) { + return sendPipe(dst, data, R) +} + +// SendWPipe create a pipe and sends its *read* end attached in a beam message +// to `dst`, with `data` as the message payload. +// It returns the *write* end of the pipe, or an error. +func SendWPipe(dst Sender, data []byte) (*os.File, error) { + return sendPipe(dst, data, W) +} + +func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { + local, remote, err := SocketPair() + if err != nil { + return nil, err + } + defer func() { + if err != nil { + local.Close() + remote.Close() + } + }() + conn, err = FileConn(local) + if err != nil { + return nil, err + } + local.Close() + if err := dst.Send(data, remote); err != nil { + return nil, err + } + return conn, nil +} + +func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { + for { + data, f, err := src.Receive() + if err != nil { + return nil, nil, err + } + if f == nil { + // Skip empty attachments + continue + } + conn, err := FileConn(f) + if err != nil { + // Skip beam attachments which are not connections + // (for example might be a regular file, directory etc) + continue + } + return data, conn, nil + } + panic("impossibru!") + return nil, nil, nil +} + +func Copy(dst Sender, src Receiver) (int, error) { + var n int + for { + payload, attachment, err := src.Receive() + if err == io.EOF { + return n, nil + } else if err != nil { + return n, err + } + if err := dst.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return n, err + } + n++ + } + panic("impossibru!") + return n, nil +} + +// MsgDesc returns a human readable description of a beam message, usually +// for debugging purposes. +func MsgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) +} + +type devnull struct{} + +func Devnull() ReceiveSender { + return devnull{} +} + +func (d devnull) Send(p []byte, a *os.File) error { + if a != nil { + a.Close() + } + return nil +} + +func (d devnull) Receive() ([]byte, *os.File, error) { + return nil, nil, io.EOF +} diff --git a/unix/beam_test.go b/unix/beam_test.go new file mode 100644 index 0000000000..83bd91e0d3 --- /dev/null +++ b/unix/beam_test.go @@ -0,0 +1,39 @@ +package unix + +import ( + "github.com/dotcloud/docker/pkg/beam/data" + "testing" +) + +func TestSendConn(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + go func() { + conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) + if err != nil { + t.Fatal(err) + } + if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { + t.Fatal(err) + } + conn.CloseWrite() + }() + payload, conn, err := ReceiveConn(b) + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { + t.Fatalf("%v != %v\n", val, "connection") + } + msg, _, err := conn.Receive() + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { + t.Fatalf("%v != %v\n", val, "bar") + } +} diff --git a/unix/conn.go b/unix/conn.go new file mode 100644 index 0000000000..d76832c261 --- /dev/null +++ b/unix/conn.go @@ -0,0 +1,121 @@ +package unix + +import ( + "fmt" + "os" + + "github.com/docker/beam" + "github.com/docker/beam/data" +) + +func Pair() (*Conn, *Conn, error) { + c1, c2, err := USocketPair() + if err != nil { + return nil, nil, err + } + return &Conn{c1}, &Conn{c2}, nil +} + +type Conn struct { + *UnixConn +} + +func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) { + // Get 2 *os.File + local, remote, err := SocketPair() + if err != nil { + return nil, nil, err + } + defer func() { + if err != nil { + local.Close() + remote.Close() + } + }() + // Convert 1 to *net.UnixConn + conn, err = FileConn(local) + if err != nil { + return nil, nil, err + } + local.Close() + // Return the "mismatched" pair + return conn, remote, nil +} + +// This implements beam.Sender.Close which *only closes the sender*. +// This is similar to the pattern of only closing go channels from +// the sender's side. +// If you want to close the entire connection, call Conn.UnixConn.Close. +func (c *Conn) Close() error { + return c.UnixConn.CloseWrite() +} + +func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + parts := []string{msg.Name} + parts = append(parts, msg.Args...) + b := []byte(data.EncodeList(parts)) + // Setup nested streams + var ( + fd *os.File + r beam.Receiver + w beam.Sender + ) + if mode&(beam.R|beam.W) != 0 { + local, remote, err := sendablePair() + if err != nil { + return nil, nil, err + } + fd = remote + if mode&beam.R != 0 { + r = &Conn{local} + } + if mode&beam.W != 0 { + w = &Conn{local} + } else { + local.CloseWrite() + } + } + c.UnixConn.Send(b, fd) + return r, w, nil +} + +func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { + b, fd, err := c.UnixConn.Receive() + if err != nil { + return nil, nil, nil, err + } + parts, n, err := data.DecodeList(string(b)) + if err != nil { + return nil, nil, nil, err + } + if n != len(b) { + return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n]) + } + if len(parts) == 0 { + return nil, nil, nil, fmt.Errorf("malformed message") + } + msg := &beam.Message{parts[0], parts[1:]} + + // Setup nested streams + var ( + r beam.Receiver + w beam.Sender + ) + // Apply mode mask + if fd != nil { + subconn, err := FileConn(fd) + if err != nil { + return nil, nil, nil, err + } + fd.Close() + if mode&beam.R != 0 { + r = &Conn{subconn} + } + if mode&beam.W != 0 { + w = &Conn{subconn} + } else { + subconn.CloseWrite() + } + } + return msg, r, w, nil +} diff --git a/unix/conn_test.go b/unix/conn_test.go new file mode 100644 index 0000000000..aa5be778e2 --- /dev/null +++ b/unix/conn_test.go @@ -0,0 +1,154 @@ +package unix + +import ( + "github.com/docker/beam" + "github.com/dotcloud/docker/pkg/testutils" + "testing" +) + +func TestPair(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal("Unexpected error") + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + go func() { + msg, in, out, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "print" { + t.Fatalf("%#v", *msg) + } + if msg.Args[0] != "hello world" { + t.Fatalf("%#v", *msg) + } + if in != nil && out != nil { + t.Fatal("Unexpected return value") + } + }() + _, _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestSendReply(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=R + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.R) + // Read for a reply + resp, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if resp.Args[0] != "this is the reply" { + t.Fatalf("%#v", resp) + } + }() + // Receive a message with mode=W + msg, in, out, err := r.Receive(beam.W) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.W) + // Send a reply + _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestSendNested(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=W + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.W) + // Send a nested message + _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0) + if err != nil { + t.Fatal(err) + } + }() + // Receive a message with mode=R + msg, in, out, err := r.Receive(beam.R) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.R) + // Read for a nested message + nested, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if nested.Args[0] != "this is the nested message" { + t.Fatalf("%#v", nested) + } + }) +} + +// FIXME: duplicate from inmem/inmem_test.go +// assertMode verifies that the values of r and w match +// mode. +// If mode has the R bit set, r must be non-nil. Otherwise it must be nil. +// If mode has the W bit set, w must be non-nil. Otherwise it must be nil. +// +// If any of these conditions are not met, t.Fatal is called and the active +// test fails. +func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { + // If mode has the R bit set, r must be non-nil + if mode&beam.R != 0 { + if r == nil { + t.Fatalf("should be non-nil: %#v", r) + } + // Otherwise it must be nil. + } else { + if r != nil { + t.Fatalf("should be nil: %#v", r) + } + } + // If mode has the W bit set, w must be non-nil + if mode&beam.W != 0 { + if w == nil { + t.Fatalf("should be non-nil: %#v", w) + } + // Otherwise it must be nil. + } else { + if w != nil { + t.Fatalf("should be nil: %#v", w) + } + } +} diff --git a/unix.go b/unix/unix.go similarity index 99% rename from unix.go rename to unix/unix.go index b2d0d94150..594eb21b10 100644 --- a/unix.go +++ b/unix/unix.go @@ -1,4 +1,4 @@ -package beam +package unix import ( "bufio" diff --git a/unix_test.go b/unix/unix_test.go similarity index 99% rename from unix_test.go rename to unix/unix_test.go index 976f089c23..7f947760b3 100644 --- a/unix_test.go +++ b/unix/unix_test.go @@ -1,4 +1,4 @@ -package beam +package unix import ( "fmt" diff --git a/inmem/stack.go b/utils/stack.go similarity index 78% rename from inmem/stack.go rename to utils/stack.go index 282b8eaf03..1309402b83 100644 --- a/inmem/stack.go +++ b/utils/stack.go @@ -1,8 +1,9 @@ -package inmem +package utils import ( "container/list" "fmt" + "github.com/docker/beam" "strings" "sync" ) @@ -23,8 +24,8 @@ func NewStackSender() *StackSender { } } -func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err error) { - completed := s.walk(func(h Sender) (ok bool) { +func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) { + completed := s.walk(func(h beam.Sender) (ok bool) { r, w, err = h.Send(msg, mode) fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) if err == nil { @@ -40,7 +41,7 @@ func (s *StackSender) Send(msg *Message, mode int) (r Receiver, w Sender, err er return nil, nil, nil } -func (s *StackSender) Add(dst Sender) *StackSender { +func (s *StackSender) Add(dst beam.Sender) *StackSender { s.l.Lock() defer s.l.Unlock() prev := &StackSender{ @@ -53,7 +54,7 @@ func (s *StackSender) Add(dst Sender) *StackSender { } func (s *StackSender) Close() error { - s.walk(func(h Sender) bool { + s.walk(func(h beam.Sender) bool { h.Close() // remove all handlers return false @@ -67,7 +68,7 @@ func (s *StackSender) _walk(f func(*list.Element) bool) bool { e = s.stack.Front() s.l.RUnlock() for e != nil { - fmt.Printf("[StackSender.Walk] %v\n", e.Value.(Sender)) + fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender)) s.l.RLock() next := e.Next() s.l.RUnlock() @@ -80,9 +81,9 @@ func (s *StackSender) _walk(f func(*list.Element) bool) bool { return true } -func (s *StackSender) walk(f func(Sender) bool) bool { +func (s *StackSender) walk(f func(beam.Sender) bool) bool { return s._walk(func(e *list.Element) bool { - ok := f(e.Value.(Sender)) + ok := f(e.Value.(beam.Sender)) if ok { // Found a valid handler. Stop walking. return false @@ -104,7 +105,7 @@ func (s *StackSender) Len() int { func (s *StackSender) String() string { var parts []string s._walk(func(e *list.Element) bool { - parts = append(parts, fmt.Sprintf("%v", e.Value.(Sender))) + parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender))) return true }) return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->")) diff --git a/inmem/stack_test.go b/utils/stack_test.go similarity index 56% rename from inmem/stack_test.go rename to utils/stack_test.go index 44425ce38a..90ed590694 100644 --- a/inmem/stack_test.go +++ b/utils/stack_test.go @@ -1,13 +1,16 @@ -package inmem +package utils import ( + "github.com/docker/beam" + "github.com/docker/beam/inmem" + "github.com/docker/beam/unix" "github.com/dotcloud/docker/pkg/testutils" "strings" "testing" ) -func TestSendStack(t *testing.T) { - r, w := Pipe() +func TestStackWithPipe(t *testing.T) { + r, w := inmem.Pipe() defer r.Close() defer w.Close() s := NewStackSender() @@ -25,7 +28,36 @@ func TestSendStack(t *testing.T) { t.Fatalf("%#v", msg) } }() - _, _, err := s.Send(&Message{"hello", []string{"wonderful", "world"}}, 0) + _, _, err := s.Send(&beam.Message{"hello", []string{"wonderful", "world"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestStackWithPair(t *testing.T) { + r, w, err := unix.Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + s := NewStackSender() + s.Add(w) + testutils.Timeout(t, func() { + go func() { + msg, _, _, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "hello" { + t.Fatalf("%#v", msg) + } + if strings.Join(msg.Args, " ") != "wonderful world" { + t.Fatalf("%#v", msg) + } + }() + _, _, err := s.Send(&beam.Message{"hello", []string{"wonderful", "world"}}, 0) if err != nil { t.Fatal(err) } @@ -41,7 +73,7 @@ func TestStackLen(t *testing.T) { func TestStackAdd(t *testing.T) { s := NewStackSender() - a := Buffer{} + a := inmem.Buffer{} beforeA := s.Add(&a) // Add on an empty StackSender should return an empty StackSender if beforeA.Len() != 0 { @@ -51,7 +83,7 @@ func TestStackAdd(t *testing.T) { t.Fatalf("%#v", beforeA) } // Add a 2nd element - b := Buffer{} + b := inmem.Buffer{} beforeB := s.Add(&b) if beforeB.Len() != 1 { t.Fatalf("%#v", beforeA) @@ -59,9 +91,9 @@ func TestStackAdd(t *testing.T) { if s.Len() != 2 { t.Fatalf("%#v", beforeA) } - s.Send(&Message{"for b", nil}, 0) - beforeB.Send(&Message{"for a", nil}, 0) - beforeA.Send(&Message{"for nobody", nil}, 0) + s.Send(&beam.Message{"for b", nil}, 0) + beforeB.Send(&beam.Message{"for a", nil}, 0) + beforeA.Send(&beam.Message{"for nobody", nil}, 0) if len(a) != 1 { t.Fatalf("%#v", a) } @@ -73,15 +105,15 @@ func TestStackAdd(t *testing.T) { // Misbehaving backends must be removed func TestStackAddBad(t *testing.T) { s := NewStackSender() - buf := Buffer{} + buf := inmem.Buffer{} s.Add(&buf) - r, w := Pipe() + r, w := inmem.Pipe() s.Add(w) if s.Len() != 2 { t.Fatalf("%#v", s) } r.Close() - if _, _, err := s.Send(&Message{"for the buffer", nil}, 0); err != nil { + if _, _, err := s.Send(&beam.Message{"for the buffer", nil}, 0); err != nil { t.Fatal(err) } if s.Len() != 1 {