Merge pull request #41 from dmcgowan/beam_add_http2

This commit is contained in:
Solomon Hykes 2014-06-06 17:50:25 -07:00
commit 337fb233b4
7 changed files with 609 additions and 0 deletions

View File

@ -1,5 +1,7 @@
This package defines a remote transport for Beam services using http2/spdy and tls.
Uses https://github.com/docker/spdystream
Pointers:
* Low-level protocol framer: http://code.google.com/p/go.net/spdy

86
beam/http2/listener.go Normal file
View File

@ -0,0 +1,86 @@
package http2
import (
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"sync"
)
type ListenSession struct {
listener net.Listener
streamChan chan *spdystream.Stream
streamLock sync.RWMutex
subStreamChans map[string]chan *spdystream.Stream
auth Authenticator
}
func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) {
return &ListenSession{
listener: listener,
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
auth: auth,
}, nil
}
func (l *ListenSession) streamHandler(stream *spdystream.Stream) {
streamChan := l.getStreamChan(stream.Parent())
streamChan <- stream
}
func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
l.streamLock.Lock()
l.subStreamChans[stream.String()] = streamChan
l.streamLock.Unlock()
}
func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return l.streamChan
}
l.streamLock.RLock()
defer l.streamLock.RUnlock()
streamChan, ok := l.subStreamChans[stream.String()]
if ok {
return streamChan
}
return l.streamChan
}
func (l *ListenSession) Serve() {
for {
conn, err := l.listener.Accept()
if err != nil {
// TODO log
break
}
go func() {
authHandler, authErr := l.auth(conn)
if authErr != nil {
// TODO log
conn.Close()
return
}
spdyConn, spdyErr := spdystream.NewConnection(conn, true)
if spdyErr != nil {
// TODO log
conn.Close()
return
}
go spdyConn.Serve(l.streamHandler, authHandler)
}()
}
}
func (l *ListenSession) Shutdown() error {
return l.listener.Close()
}
func (l *ListenSession) Receive(mode int) (*beam.Message, error) {
stream := <-l.streamChan
return createStreamMessage(stream, mode, l, nil)
}

View File

@ -0,0 +1,83 @@
package http2
import (
"github.com/docker/libswarm/beam"
"io"
"net"
"testing"
)
func TestListenSession(t *testing.T) {
listen := "localhost:7743"
listener, listenErr := net.Listen("tcp", listen)
if listenErr != nil {
t.Fatalf("Error creating listener: %s", listenErr)
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
end := make(chan bool)
go exerciseServer(t, listen, end)
msg, msgErr := session.Receive(beam.Ret)
if msgErr != nil {
t.Fatalf("Error receiving message: %s", msgErr)
}
if msg.Att == nil {
t.Fatalf("Error message missing attachment")
}
if msg.Verb != beam.Attach {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach)
}
receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending return message: %s", sendErr)
}
_, ackErr := receiver.Receive(0)
if ackErr == nil {
t.Fatalf("No error receiving from message with no return pipe")
}
if ackErr != io.EOF {
t.Fatalf("Unexpected error receiving from message: %s", ackErr)
}
<-end
shutdownErr := session.Shutdown()
if shutdownErr != nil {
t.Fatalf("Error shutting down: %s", shutdownErr)
}
}
func exerciseServer(t *testing.T, server string, endChan chan bool) {
defer close(endChan)
conn, connErr := net.Dial("tcp", server)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
session, sessionErr := NewStreamSession(conn)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
receiver, sendErr := session.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending message: %s", sendErr)
}
msg, receiveErr := receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving message")
}
if msg.Verb != beam.Ack {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack)
}
}

109
beam/http2/spdy.go Normal file
View File

@ -0,0 +1,109 @@
package http2
import (
"encoding/base64"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
"github.com/docker/spdystream"
"io"
"net"
"net/http"
"os"
"syscall"
)
type Authenticator func(conn net.Conn) (spdystream.AuthHandler, error)
func NoAuthenticator(conn net.Conn) (spdystream.AuthHandler, error) {
return func(header http.Header, slot uint8, parent uint32) bool {
return true
}, nil
}
type streamChanProvider interface {
addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream)
getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream
}
func encodeArgs(args []string) string {
encoded := data.Encode(map[string][]string{"args": args})
return base64.URLEncoding.EncodeToString([]byte(encoded))
}
func decodeArgs(argString string) ([]string, error) {
decoded, decodeErr := base64.URLEncoding.DecodeString(argString)
if decodeErr != nil {
return []string{}, decodeErr
}
dataMap, dataErr := data.Decode(string(decoded))
if dataErr != nil {
return []string{}, dataErr
}
return dataMap["args"], nil
}
func createStreamMessage(stream *spdystream.Stream, mode int, streamChans streamChanProvider, ret beam.Sender) (*beam.Message, error) {
verbString := stream.Headers()["Verb"]
if len(verbString) != 1 {
if len(verbString) == 0 {
return nil, fmt.Errorf("Stream(%s) is missing verb header", stream)
} else {
return nil, fmt.Errorf("Stream(%s) has multiple verb headers", stream)
}
}
verb, verbOk := verbs[verbString[0]]
if !verbOk {
return nil, fmt.Errorf("Unknown verb: %s", verbString[0])
}
var args []string
argString := stream.Headers()["Args"]
if len(argString) > 1 {
return nil, fmt.Errorf("Stream(%s) has multiple args headers", stream)
}
if len(argString) == 1 {
var err error
args, err = decodeArgs(argString[0])
if err != nil {
return nil, err
}
}
var attach *os.File
if !stream.IsFinished() {
socketFds, socketErr := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
if socketErr != nil {
return nil, socketErr
}
attach = os.NewFile(uintptr(socketFds[0]), "")
conn, connErr := net.FileConn(os.NewFile(uintptr(socketFds[1]), ""))
if connErr != nil {
return nil, connErr
}
go func() {
io.Copy(conn, stream)
}()
go func() {
io.Copy(stream, conn)
}()
}
retSender := ret
if retSender == nil || beam.RetPipe.Equals(retSender) {
retSender = &StreamSender{stream: stream, streamChans: streamChans}
}
if mode&beam.Ret == 0 {
retSender.Close()
}
return &beam.Message{
Verb: verb,
Args: args,
Att: attach,
Ret: retSender,
}, nil
}

166
beam/http2/stream.go Normal file
View File

@ -0,0 +1,166 @@
package http2
import (
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"net/http"
"sync"
)
var verbs = map[string]beam.Verb{
"Ack": beam.Ack,
"Attach": beam.Attach,
"Connect": beam.Connect,
"Error": beam.Error,
"File": beam.File,
"Get": beam.Get,
"Log": beam.Log,
"Ls": beam.Ls,
"Set": beam.Set,
"Spawn": beam.Spawn,
"Start": beam.Start,
"Stop": beam.Stop,
"Watch": beam.Watch,
}
// Only allows sending, no parent stream
type StreamSession struct {
conn *spdystream.Connection
streamLock sync.Mutex
streamChan chan *spdystream.Stream
subStreamChans map[string]chan *spdystream.Stream
}
func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
s.subStreamChans[stream.String()] = streamChan
}
func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return s.streamChan
}
streamChan, ok := s.subStreamChans[stream.String()]
if ok {
return streamChan
}
return s.streamChan
}
func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
streamChan := s.getStreamChan(stream.Parent())
streamChan <- stream
}
func NewStreamSession(conn net.Conn) (*StreamSession, error) {
session := &StreamSession{
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
}
spdyConn, spdyErr := spdystream.NewConnection(conn, false)
if spdyErr != nil {
return nil, spdyErr
}
go spdyConn.Serve(session.newStreamHandler, spdystream.NoAuthHandler)
session.conn = spdyConn
return session, nil
}
func (s *StreamSession) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.conn.CreateStream(headers, nil, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.subStreamChans[stream.String()] = streamChan
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s}
} else {
ret = &beam.NopReceiver{}
}
return
}
func (s *StreamSession) Close() error {
return s.conn.Close()
}
type StreamReceiver struct {
stream *spdystream.Stream
streamChans streamChanProvider
ret beam.Sender
}
func (s *StreamReceiver) Receive(mode int) (*beam.Message, error) {
waitErr := s.stream.Wait()
if waitErr != nil {
return nil, waitErr
}
streamChan := s.streamChans.getStreamChan(s.stream)
stream := <-streamChan
return createStreamMessage(stream, mode, s.streamChans, s.ret)
}
type StreamSender struct {
stream *spdystream.Stream
streamChans streamChanProvider
}
func (s *StreamSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.stream.CreateSubStream(headers, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.streamChans.addStreamChan(stream, streamChan)
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s.streamChans}
} else {
ret = beam.NopReceiver{}
}
return
}
func (s *StreamSender) Close() error {
// TODO Remove stream from stream chans
return s.stream.Close()
}

159
beam/http2/stream_test.go Normal file
View File

@ -0,0 +1,159 @@
package http2
import (
//"bytes"
"github.com/docker/libswarm/beam"
//"github.com/docker/spdystream"
"io"
"net"
"testing"
)
func TestBeamSession(t *testing.T) {
end := make(chan bool)
listen := "localhost:7543"
server, serverErr := runServer(listen, t, end)
if serverErr != nil {
t.Fatalf("Error initializing server: %s", serverErr)
}
conn, connErr := net.Dial("tcp", listen)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
sender, senderErr := NewStreamSession(conn)
if senderErr != nil {
t.Fatalf("Error creating sender: %s", senderErr)
}
// Ls interaction
receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Ls, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr := receiver.Receive(0)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Set {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ls.String())
}
if len(message.Args) != 3 {
t.Fatalf("Unexpected args length\nActual: %d\nExpected: %d", len(message.Args), 3)
}
if message.Args[0] != "file1" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[0], "file1")
}
if message.Args[1] != "file2" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[1], "file2")
}
if message.Args[2] != string([]byte{0x00, 0x00, 0x00}) {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[2], []byte{0x00, 0x00, 0x00})
}
// Attach interactions
receiver, sendErr = sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr = receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Ack {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ack.String())
}
// TODO full connect interaction
//if message.Att == nil {
// t.Fatalf("Missing attachment on message")
//}
//testBytes := []byte("Hello")
//n, writeErr := message.Att.Write(testBytes)
//if writeErr != nil {
// t.Fatalf("Error writing bytes: %s", writeErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//buf := make([]byte, 10)
//n, readErr := message.Att.Read(buf)
//if readErr != nil {
// t.Fatalf("Error writing bytes: %s", readErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//if bytes.Compare(buf[:n], testBytes) != 0 {
// t.Fatalf("Did not receive expected message:\nActual: %s\nExpectd: %s", buf, testBytes)
//}
closeErr := server.Close()
if closeErr != nil {
t.Fatalf("Error closing server: %s", closeErr)
}
closeErr = sender.Close()
if closeErr != nil {
t.Fatalf("Error closing sender: %s", closeErr)
}
<-end
}
func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error) {
listener, lErr := net.Listen("tcp", listen)
if lErr != nil {
return nil, lErr
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
go func() {
defer close(endChan)
// Ls exchange
message, receiveErr := session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Ls {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr := message.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"file1", "file2", string([]byte{0x00, 0x00, 0x00})}})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
_, receiveErr = receiver.Receive(0)
if receiveErr == nil {
t.Fatalf("No error received from empty receiver")
}
if receiveErr != io.EOF {
t.Fatalf("Expected error from empty receiver: %s", receiveErr)
}
// Connect exchange
message, receiveErr = session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Attach {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr = message.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
// TODO full connect interaction
}()
return listener, nil
}

View File

@ -32,6 +32,10 @@
"Comment": "0-4-g2fb21b3",
"Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5"
},
{
"ImportPath": "github.com/docker/spdystream",
"Rev": "87969a9c4b79508e63004ac17a97eb9278c08b3c"
},
{
"ImportPath": "github.com/dotcloud/docker/api",
"Comment": "v0.11.1-466-g77ae37a",