diff --git a/beam/http2/server.go b/beam/http2/server.go new file mode 100644 index 0000000000..4ec20d3272 --- /dev/null +++ b/beam/http2/server.go @@ -0,0 +1,66 @@ +package http2 + +import ( + "github.com/docker/libswarm/beam" + "github.com/docker/spdystream" + "net" + "sync" +) + +// Serve a Beam endpoint over a single HTTP2 connection +type Server struct { + conn *spdystream.Connection + streamChan chan *spdystream.Stream + streamLock sync.RWMutex + subStreamChans map[string]chan *spdystream.Stream +} + +// Create a Beam receiver from a net.Conn +func NewServer(conn net.Conn) (*Server, error) { + spdyConn, err := spdystream.NewConnection(conn, true) + if err != nil { + return nil, err + } + + s := &Server{ + conn: spdyConn, + streamChan: make(chan *spdystream.Stream), + subStreamChans: make(map[string]chan *spdystream.Stream), + } + go s.conn.Serve(s.streamHandler, spdystream.NoAuthHandler) + + return s, nil +} + +func (s *Server) Close() error { + return s.conn.Close() +} + +func (s *Server) Receive(mode int) (*beam.Message, error) { + stream := <-s.streamChan + return createStreamMessage(stream, mode, s, nil) +} + +func (s *Server) streamHandler(stream *spdystream.Stream) { + streamChan := s.getStreamChan(stream.Parent()) + streamChan <- stream +} + +func (s *Server) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { + s.streamLock.Lock() + s.subStreamChans[stream.String()] = streamChan + s.streamLock.Unlock() +} + +func (s *Server) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { + if stream == nil { + return s.streamChan + } + s.streamLock.RLock() + defer s.streamLock.RUnlock() + streamChan, ok := s.subStreamChans[stream.String()] + if ok { + return streamChan + } + return s.streamChan +} diff --git a/beam/ws/ws.go b/beam/ws/ws.go new file mode 100644 index 0000000000..f6c03a9424 --- /dev/null +++ b/beam/ws/ws.go @@ -0,0 +1,72 @@ +package ws + +import ( + "errors" + "github.com/docker/libswarm/beam" + "github.com/docker/libswarm/beam/http2" + "github.com/docker/spdystream/ws" + "github.com/gorilla/websocket" + "net/http" +) + +// Connect to a Beam server over a Websocket connection as a client +func NewSender(wsConn *websocket.Conn) (beam.Sender, error) { + session, err := http2.NewStreamSession(ws.NewConnection(wsConn)) + if err != nil { + return nil, err + } + return session, nil +} + +// Upgrade an HTTP connection to a Beam over HTTP2 over +// Websockets connection. +type Upgrader struct { + Upgrader websocket.Upgrader +} + +func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*http2.Server, error) { + wsConn, err := u.Upgrader.Upgrade(w, r, responseHeader) + if err != nil { + return nil, err + } + + netConn := ws.NewConnection(wsConn) + server, err := http2.NewServer(netConn) + if err != nil { + netConn.Close() + return nil, err + } + + return server, nil +} + +// Returns true if a handshake error occured in websockets, which means +// a response has already been written to the stream. +func IsHandshakeError(err error) bool { + _, ok := err.(websocket.HandshakeError) + return ok +} + +type BeamFunc func(beam.Receiver) + +// Handler function for serving Beam over HTTP. Will invoke f and +// then close the server's Beam endpoint after f returns. +func Serve(u *Upgrader, f BeamFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + u.Upgrader.Error(w, r, http.StatusMethodNotAllowed, errors.New("Method not allowed")) + return + } + + server, err := u.Upgrade(w, r, nil) + if err != nil { + if !IsHandshakeError(err) { + u.Upgrader.Error(w, r, http.StatusInternalServerError, errors.New("Unable to open an HTTP2 connection over Websockets")) + } + return + } + defer server.Close() + + f(server) + } +} diff --git a/beam/ws/ws_test.go b/beam/ws/ws_test.go new file mode 100644 index 0000000000..6dd96a6645 --- /dev/null +++ b/beam/ws/ws_test.go @@ -0,0 +1,73 @@ +package ws + +import ( + "github.com/docker/libswarm/beam" + "github.com/gorilla/websocket" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestServe(t *testing.T) { + gotAck := make(chan bool) + u := &Upgrader{} + server := httptest.NewServer(Serve(u, func(r beam.Receiver) { + msg, msgErr := r.Receive(beam.Ret) + if msgErr != nil { + t.Fatalf("Error receiving message: %s", msgErr) + } + if msg.Att == nil { + t.Fatalf("Error message missing attachment") + } + if msg.Verb != beam.Attach { + t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach) + } + + receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack}) + if sendErr != nil { + t.Fatalf("Error sending return message: %s", sendErr) + } + + _, ackErr := receiver.Receive(0) + if ackErr == nil { + t.Fatalf("No error receiving from message with no return pipe") + } + if ackErr != io.EOF { + t.Fatalf("Unexpected error receiving from message: %s", ackErr) + } + + <-gotAck + })) + + wsConn, _, err := websocket.DefaultDialer.Dial(strings.Replace(server.URL, "http://", "ws://", 1), http.Header{"Origin": {server.URL}}) + if err != nil { + t.Fatal(err) + } + sender, senderErr := NewSender(wsConn) + if senderErr != nil { + t.Fatalf("Error creating sender: %s", senderErr) + } + + receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) + if sendErr != nil { + t.Fatalf("Error sending message: %s", sendErr) + } + + msg, receiveErr := receiver.Receive(beam.Ret) + if receiveErr != nil { + t.Fatalf("Error receiving message") + } + + if msg.Verb != beam.Ack { + t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack) + } + + gotAck <- true + + shutdownErr := sender.Close() + if shutdownErr != nil && !strings.Contains(shutdownErr.Error(), "broken pipe") { + t.Fatalf("Error closing: %s", shutdownErr) + } +}