Merge pull request #49 from smarterclayton/add_websocket_server

This commit is contained in:
Solomon Hykes 2014-06-07 21:02:54 -07:00
commit f67c1b7e4b
3 changed files with 211 additions and 0 deletions

66
beam/http2/server.go Normal file
View File

@ -0,0 +1,66 @@
package http2
import (
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"sync"
)
// Serve a Beam endpoint over a single HTTP2 connection
type Server struct {
conn *spdystream.Connection
streamChan chan *spdystream.Stream
streamLock sync.RWMutex
subStreamChans map[string]chan *spdystream.Stream
}
// Create a Beam receiver from a net.Conn
func NewServer(conn net.Conn) (*Server, error) {
spdyConn, err := spdystream.NewConnection(conn, true)
if err != nil {
return nil, err
}
s := &Server{
conn: spdyConn,
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
}
go s.conn.Serve(s.streamHandler, spdystream.NoAuthHandler)
return s, nil
}
func (s *Server) Close() error {
return s.conn.Close()
}
func (s *Server) Receive(mode int) (*beam.Message, error) {
stream := <-s.streamChan
return createStreamMessage(stream, mode, s, nil)
}
func (s *Server) streamHandler(stream *spdystream.Stream) {
streamChan := s.getStreamChan(stream.Parent())
streamChan <- stream
}
func (s *Server) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
s.streamLock.Lock()
s.subStreamChans[stream.String()] = streamChan
s.streamLock.Unlock()
}
func (s *Server) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return s.streamChan
}
s.streamLock.RLock()
defer s.streamLock.RUnlock()
streamChan, ok := s.subStreamChans[stream.String()]
if ok {
return streamChan
}
return s.streamChan
}

72
beam/ws/ws.go Normal file
View File

@ -0,0 +1,72 @@
package ws
import (
"errors"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/http2"
"github.com/docker/spdystream/ws"
"github.com/gorilla/websocket"
"net/http"
)
// Connect to a Beam server over a Websocket connection as a client
func NewSender(wsConn *websocket.Conn) (beam.Sender, error) {
session, err := http2.NewStreamSession(ws.NewConnection(wsConn))
if err != nil {
return nil, err
}
return session, nil
}
// Upgrade an HTTP connection to a Beam over HTTP2 over
// Websockets connection.
type Upgrader struct {
Upgrader websocket.Upgrader
}
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*http2.Server, error) {
wsConn, err := u.Upgrader.Upgrade(w, r, responseHeader)
if err != nil {
return nil, err
}
netConn := ws.NewConnection(wsConn)
server, err := http2.NewServer(netConn)
if err != nil {
netConn.Close()
return nil, err
}
return server, nil
}
// Returns true if a handshake error occured in websockets, which means
// a response has already been written to the stream.
func IsHandshakeError(err error) bool {
_, ok := err.(websocket.HandshakeError)
return ok
}
type BeamFunc func(beam.Receiver)
// Handler function for serving Beam over HTTP. Will invoke f and
// then close the server's Beam endpoint after f returns.
func Serve(u *Upgrader, f BeamFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
u.Upgrader.Error(w, r, http.StatusMethodNotAllowed, errors.New("Method not allowed"))
return
}
server, err := u.Upgrade(w, r, nil)
if err != nil {
if !IsHandshakeError(err) {
u.Upgrader.Error(w, r, http.StatusInternalServerError, errors.New("Unable to open an HTTP2 connection over Websockets"))
}
return
}
defer server.Close()
f(server)
}
}

73
beam/ws/ws_test.go Normal file
View File

@ -0,0 +1,73 @@
package ws
import (
"github.com/docker/libswarm/beam"
"github.com/gorilla/websocket"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestServe(t *testing.T) {
gotAck := make(chan bool)
u := &Upgrader{}
server := httptest.NewServer(Serve(u, func(r beam.Receiver) {
msg, msgErr := r.Receive(beam.Ret)
if msgErr != nil {
t.Fatalf("Error receiving message: %s", msgErr)
}
if msg.Att == nil {
t.Fatalf("Error message missing attachment")
}
if msg.Verb != beam.Attach {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach)
}
receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending return message: %s", sendErr)
}
_, ackErr := receiver.Receive(0)
if ackErr == nil {
t.Fatalf("No error receiving from message with no return pipe")
}
if ackErr != io.EOF {
t.Fatalf("Unexpected error receiving from message: %s", ackErr)
}
<-gotAck
}))
wsConn, _, err := websocket.DefaultDialer.Dial(strings.Replace(server.URL, "http://", "ws://", 1), http.Header{"Origin": {server.URL}})
if err != nil {
t.Fatal(err)
}
sender, senderErr := NewSender(wsConn)
if senderErr != nil {
t.Fatalf("Error creating sender: %s", senderErr)
}
receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending message: %s", sendErr)
}
msg, receiveErr := receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving message")
}
if msg.Verb != beam.Ack {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack)
}
gotAck <- true
shutdownErr := sender.Close()
if shutdownErr != nil && !strings.Contains(shutdownErr.Error(), "broken pipe") {
t.Fatalf("Error closing: %s", shutdownErr)
}
}