diff --git a/beam/http2/README.md b/beam/http2/README.md index d9af9bc043..92ea4c8cdf 100644 --- a/beam/http2/README.md +++ b/beam/http2/README.md @@ -1,5 +1,7 @@ This package defines a remote transport for Beam services using http2/spdy and tls. +Uses https://github.com/docker/spdystream + Pointers: * Low-level protocol framer: http://code.google.com/p/go.net/spdy diff --git a/beam/http2/listener.go b/beam/http2/listener.go new file mode 100644 index 0000000000..8b17bf2b08 --- /dev/null +++ b/beam/http2/listener.go @@ -0,0 +1,86 @@ +package http2 + +import ( + "github.com/docker/libswarm/beam" + "github.com/docker/spdystream" + "net" + "sync" +) + +type ListenSession struct { + listener net.Listener + streamChan chan *spdystream.Stream + streamLock sync.RWMutex + subStreamChans map[string]chan *spdystream.Stream + auth Authenticator +} + +func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) { + return &ListenSession{ + listener: listener, + streamChan: make(chan *spdystream.Stream), + subStreamChans: make(map[string]chan *spdystream.Stream), + auth: auth, + }, nil +} + +func (l *ListenSession) streamHandler(stream *spdystream.Stream) { + streamChan := l.getStreamChan(stream.Parent()) + streamChan <- stream +} + +func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { + l.streamLock.Lock() + l.subStreamChans[stream.String()] = streamChan + l.streamLock.Unlock() +} + +func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { + if stream == nil { + return l.streamChan + } + l.streamLock.RLock() + defer l.streamLock.RUnlock() + streamChan, ok := l.subStreamChans[stream.String()] + if ok { + return streamChan + } + return l.streamChan +} + +func (l *ListenSession) Serve() { + for { + conn, err := l.listener.Accept() + if err != nil { + // TODO log + break + } + + go func() { + authHandler, authErr := l.auth(conn) + if authErr != nil { + // TODO log + conn.Close() + return + } + + spdyConn, spdyErr := spdystream.NewConnection(conn, true) + if spdyErr != nil { + // TODO log + conn.Close() + return + } + + go spdyConn.Serve(l.streamHandler, authHandler) + }() + } +} + +func (l *ListenSession) Shutdown() error { + return l.listener.Close() +} + +func (l *ListenSession) Receive(mode int) (*beam.Message, error) { + stream := <-l.streamChan + return createStreamMessage(stream, mode, l, nil) +} diff --git a/beam/http2/listener_test.go b/beam/http2/listener_test.go new file mode 100644 index 0000000000..2d4a8a9432 --- /dev/null +++ b/beam/http2/listener_test.go @@ -0,0 +1,83 @@ +package http2 + +import ( + "github.com/docker/libswarm/beam" + "io" + "net" + "testing" +) + +func TestListenSession(t *testing.T) { + listen := "localhost:7743" + listener, listenErr := net.Listen("tcp", listen) + if listenErr != nil { + t.Fatalf("Error creating listener: %s", listenErr) + } + + session, sessionErr := NewListenSession(listener, NoAuthenticator) + if sessionErr != nil { + t.Fatalf("Error creating session: %s", sessionErr) + } + + go session.Serve() + + end := make(chan bool) + go exerciseServer(t, listen, end) + + msg, msgErr := session.Receive(beam.Ret) + if msgErr != nil { + t.Fatalf("Error receiving message: %s", msgErr) + } + if msg.Att == nil { + t.Fatalf("Error message missing attachment") + } + if msg.Verb != beam.Attach { + t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach) + } + + receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack}) + if sendErr != nil { + t.Fatalf("Error sending return message: %s", sendErr) + } + _, ackErr := receiver.Receive(0) + if ackErr == nil { + t.Fatalf("No error receiving from message with no return pipe") + } + if ackErr != io.EOF { + t.Fatalf("Unexpected error receiving from message: %s", ackErr) + } + + <-end + shutdownErr := session.Shutdown() + if shutdownErr != nil { + t.Fatalf("Error shutting down: %s", shutdownErr) + } +} + +func exerciseServer(t *testing.T, server string, endChan chan bool) { + defer close(endChan) + + conn, connErr := net.Dial("tcp", server) + if connErr != nil { + t.Fatalf("Error dialing server: %s", connErr) + } + + session, sessionErr := NewStreamSession(conn) + if sessionErr != nil { + t.Fatalf("Error creating session: %s", sessionErr) + } + + receiver, sendErr := session.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) + if sendErr != nil { + t.Fatalf("Error sending message: %s", sendErr) + } + + msg, receiveErr := receiver.Receive(beam.Ret) + if receiveErr != nil { + t.Fatalf("Error receiving message") + } + + if msg.Verb != beam.Ack { + t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack) + } +} diff --git a/beam/http2/spdy.go b/beam/http2/spdy.go new file mode 100644 index 0000000000..293cc229ab --- /dev/null +++ b/beam/http2/spdy.go @@ -0,0 +1,109 @@ +package http2 + +import ( + "encoding/base64" + "fmt" + "github.com/docker/libswarm/beam" + "github.com/docker/libswarm/beam/data" + "github.com/docker/spdystream" + "io" + "net" + "net/http" + "os" + "syscall" +) + +type Authenticator func(conn net.Conn) (spdystream.AuthHandler, error) + +func NoAuthenticator(conn net.Conn) (spdystream.AuthHandler, error) { + return func(header http.Header, slot uint8, parent uint32) bool { + return true + }, nil +} + +type streamChanProvider interface { + addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) + getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream +} + +func encodeArgs(args []string) string { + encoded := data.Encode(map[string][]string{"args": args}) + return base64.URLEncoding.EncodeToString([]byte(encoded)) +} + +func decodeArgs(argString string) ([]string, error) { + decoded, decodeErr := base64.URLEncoding.DecodeString(argString) + if decodeErr != nil { + return []string{}, decodeErr + } + dataMap, dataErr := data.Decode(string(decoded)) + if dataErr != nil { + return []string{}, dataErr + } + return dataMap["args"], nil +} + +func createStreamMessage(stream *spdystream.Stream, mode int, streamChans streamChanProvider, ret beam.Sender) (*beam.Message, error) { + verbString := stream.Headers()["Verb"] + if len(verbString) != 1 { + if len(verbString) == 0 { + return nil, fmt.Errorf("Stream(%s) is missing verb header", stream) + } else { + return nil, fmt.Errorf("Stream(%s) has multiple verb headers", stream) + } + + } + verb, verbOk := verbs[verbString[0]] + if !verbOk { + return nil, fmt.Errorf("Unknown verb: %s", verbString[0]) + } + + var args []string + argString := stream.Headers()["Args"] + if len(argString) > 1 { + return nil, fmt.Errorf("Stream(%s) has multiple args headers", stream) + } + if len(argString) == 1 { + var err error + args, err = decodeArgs(argString[0]) + if err != nil { + return nil, err + } + } + + var attach *os.File + if !stream.IsFinished() { + socketFds, socketErr := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) + if socketErr != nil { + return nil, socketErr + } + attach = os.NewFile(uintptr(socketFds[0]), "") + conn, connErr := net.FileConn(os.NewFile(uintptr(socketFds[1]), "")) + if connErr != nil { + return nil, connErr + } + + go func() { + io.Copy(conn, stream) + }() + go func() { + io.Copy(stream, conn) + }() + } + + retSender := ret + if retSender == nil || beam.RetPipe.Equals(retSender) { + retSender = &StreamSender{stream: stream, streamChans: streamChans} + } + + if mode&beam.Ret == 0 { + retSender.Close() + } + + return &beam.Message{ + Verb: verb, + Args: args, + Att: attach, + Ret: retSender, + }, nil +} diff --git a/beam/http2/stream.go b/beam/http2/stream.go new file mode 100644 index 0000000000..ac5c67364d --- /dev/null +++ b/beam/http2/stream.go @@ -0,0 +1,166 @@ +package http2 + +import ( + "fmt" + "github.com/docker/libswarm/beam" + "github.com/docker/spdystream" + "net" + "net/http" + "sync" +) + +var verbs = map[string]beam.Verb{ + "Ack": beam.Ack, + "Attach": beam.Attach, + "Connect": beam.Connect, + "Error": beam.Error, + "File": beam.File, + "Get": beam.Get, + "Log": beam.Log, + "Ls": beam.Ls, + "Set": beam.Set, + "Spawn": beam.Spawn, + "Start": beam.Start, + "Stop": beam.Stop, + "Watch": beam.Watch, +} + +// Only allows sending, no parent stream +type StreamSession struct { + conn *spdystream.Connection + + streamLock sync.Mutex + streamChan chan *spdystream.Stream + subStreamChans map[string]chan *spdystream.Stream +} + +func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { + s.subStreamChans[stream.String()] = streamChan +} + +func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { + if stream == nil { + return s.streamChan + } + streamChan, ok := s.subStreamChans[stream.String()] + if ok { + return streamChan + } + return s.streamChan +} + +func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) { + streamChan := s.getStreamChan(stream.Parent()) + streamChan <- stream +} + +func NewStreamSession(conn net.Conn) (*StreamSession, error) { + session := &StreamSession{ + streamChan: make(chan *spdystream.Stream), + subStreamChans: make(map[string]chan *spdystream.Stream), + } + + spdyConn, spdyErr := spdystream.NewConnection(conn, false) + if spdyErr != nil { + return nil, spdyErr + } + go spdyConn.Serve(session.newStreamHandler, spdystream.NoAuthHandler) + + session.conn = spdyConn + + return session, nil +} + +func (s *StreamSession) Send(msg *beam.Message) (ret beam.Receiver, err error) { + if msg.Att != nil { + return nil, fmt.Errorf("file attachment not yet implemented for spdy transport") + } + + var fin bool + if beam.RetPipe.Equals(msg.Ret) { + fin = false + } else { + fin = true + } + headers := http.Header{ + "Verb": []string{msg.Verb.String()}, + "Args": []string{encodeArgs(msg.Args)}, + } + stream, streamErr := s.conn.CreateStream(headers, nil, fin) + if streamErr != nil { + return nil, streamErr + } + + streamChan := make(chan *spdystream.Stream) + s.subStreamChans[stream.String()] = streamChan + + if beam.RetPipe.Equals(msg.Ret) { + ret = &StreamReceiver{stream: stream, streamChans: s} + } else { + ret = &beam.NopReceiver{} + } + return +} + +func (s *StreamSession) Close() error { + return s.conn.Close() +} + +type StreamReceiver struct { + stream *spdystream.Stream + streamChans streamChanProvider + ret beam.Sender +} + +func (s *StreamReceiver) Receive(mode int) (*beam.Message, error) { + waitErr := s.stream.Wait() + if waitErr != nil { + return nil, waitErr + } + streamChan := s.streamChans.getStreamChan(s.stream) + stream := <-streamChan + return createStreamMessage(stream, mode, s.streamChans, s.ret) +} + +type StreamSender struct { + stream *spdystream.Stream + streamChans streamChanProvider +} + +func (s *StreamSender) Send(msg *beam.Message) (ret beam.Receiver, err error) { + if msg.Att != nil { + return nil, fmt.Errorf("file attachment not yet implemented for spdy transport") + } + + var fin bool + if beam.RetPipe.Equals(msg.Ret) { + fin = false + } else { + fin = true + } + headers := http.Header{ + "Verb": []string{msg.Verb.String()}, + "Args": []string{encodeArgs(msg.Args)}, + } + + stream, streamErr := s.stream.CreateSubStream(headers, fin) + if streamErr != nil { + return nil, streamErr + } + + streamChan := make(chan *spdystream.Stream) + s.streamChans.addStreamChan(stream, streamChan) + + if beam.RetPipe.Equals(msg.Ret) { + ret = &StreamReceiver{stream: stream, streamChans: s.streamChans} + } else { + ret = beam.NopReceiver{} + } + + return +} + +func (s *StreamSender) Close() error { + // TODO Remove stream from stream chans + return s.stream.Close() +} diff --git a/beam/http2/stream_test.go b/beam/http2/stream_test.go new file mode 100644 index 0000000000..8ae9d27a2b --- /dev/null +++ b/beam/http2/stream_test.go @@ -0,0 +1,159 @@ +package http2 + +import ( + //"bytes" + "github.com/docker/libswarm/beam" + //"github.com/docker/spdystream" + "io" + "net" + "testing" +) + +func TestBeamSession(t *testing.T) { + end := make(chan bool) + listen := "localhost:7543" + server, serverErr := runServer(listen, t, end) + if serverErr != nil { + t.Fatalf("Error initializing server: %s", serverErr) + } + + conn, connErr := net.Dial("tcp", listen) + if connErr != nil { + t.Fatalf("Error dialing server: %s", connErr) + } + + sender, senderErr := NewStreamSession(conn) + if senderErr != nil { + t.Fatalf("Error creating sender: %s", senderErr) + } + + // Ls interaction + receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Ls, Ret: beam.RetPipe}) + if sendErr != nil { + t.Fatalf("Error sending beam message: %s", sendErr) + } + message, receiveErr := receiver.Receive(0) + if receiveErr != nil { + t.Fatalf("Error receiving beam message: %s", receiveErr) + } + if message.Verb != beam.Set { + t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ls.String()) + } + if len(message.Args) != 3 { + t.Fatalf("Unexpected args length\nActual: %d\nExpected: %d", len(message.Args), 3) + } + if message.Args[0] != "file1" { + t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[0], "file1") + } + if message.Args[1] != "file2" { + t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[1], "file2") + } + if message.Args[2] != string([]byte{0x00, 0x00, 0x00}) { + t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[2], []byte{0x00, 0x00, 0x00}) + } + + // Attach interactions + receiver, sendErr = sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) + if sendErr != nil { + t.Fatalf("Error sending beam message: %s", sendErr) + } + message, receiveErr = receiver.Receive(beam.Ret) + if receiveErr != nil { + t.Fatalf("Error receiving beam message: %s", receiveErr) + } + if message.Verb != beam.Ack { + t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ack.String()) + } + + // TODO full connect interaction + //if message.Att == nil { + // t.Fatalf("Missing attachment on message") + //} + + //testBytes := []byte("Hello") + //n, writeErr := message.Att.Write(testBytes) + //if writeErr != nil { + // t.Fatalf("Error writing bytes: %s", writeErr) + //} + //if n != 5 { + // t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n) + //} + + //buf := make([]byte, 10) + //n, readErr := message.Att.Read(buf) + //if readErr != nil { + // t.Fatalf("Error writing bytes: %s", readErr) + //} + //if n != 5 { + // t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n) + //} + //if bytes.Compare(buf[:n], testBytes) != 0 { + // t.Fatalf("Did not receive expected message:\nActual: %s\nExpectd: %s", buf, testBytes) + //} + + closeErr := server.Close() + if closeErr != nil { + t.Fatalf("Error closing server: %s", closeErr) + } + + closeErr = sender.Close() + if closeErr != nil { + t.Fatalf("Error closing sender: %s", closeErr) + } + <-end +} + +func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error) { + listener, lErr := net.Listen("tcp", listen) + if lErr != nil { + return nil, lErr + } + + session, sessionErr := NewListenSession(listener, NoAuthenticator) + if sessionErr != nil { + t.Fatalf("Error creating session: %s", sessionErr) + } + + go session.Serve() + + go func() { + defer close(endChan) + // Ls exchange + message, receiveErr := session.Receive(beam.Ret) + if receiveErr != nil { + t.Fatalf("Error receiving on server: %s", receiveErr) + } + if message.Verb != beam.Ls { + t.Fatalf("Unexpected verb: %s", message.Verb) + } + receiver, sendErr := message.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"file1", "file2", string([]byte{0x00, 0x00, 0x00})}}) + if sendErr != nil { + t.Fatalf("Error sending set message: %s", sendErr) + } + _, receiveErr = receiver.Receive(0) + if receiveErr == nil { + t.Fatalf("No error received from empty receiver") + } + if receiveErr != io.EOF { + t.Fatalf("Expected error from empty receiver: %s", receiveErr) + } + + // Connect exchange + message, receiveErr = session.Receive(beam.Ret) + if receiveErr != nil { + t.Fatalf("Error receiving on server: %s", receiveErr) + } + if message.Verb != beam.Attach { + t.Fatalf("Unexpected verb: %s", message.Verb) + } + receiver, sendErr = message.Ret.Send(&beam.Message{Verb: beam.Ack}) + if sendErr != nil { + t.Fatalf("Error sending set message: %s", sendErr) + } + + // TODO full connect interaction + + }() + + return listener, nil +} diff --git a/swarmd/Godeps/Godeps.json b/swarmd/Godeps/Godeps.json index 905d163b4a..bdccac656c 100644 --- a/swarmd/Godeps/Godeps.json +++ b/swarmd/Godeps/Godeps.json @@ -32,6 +32,10 @@ "Comment": "0-4-g2fb21b3", "Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5" }, + { + "ImportPath": "github.com/docker/spdystream", + "Rev": "87969a9c4b79508e63004ac17a97eb9278c08b3c" + }, { "ImportPath": "github.com/dotcloud/docker/api", "Comment": "v0.11.1-466-g77ae37a",