From 25d1250c2c3d66b5cc595911b32e9ed57d460fc3 Mon Sep 17 00:00:00 2001 From: Ben Firshman Date: Mon, 9 Jun 2014 11:49:56 -0700 Subject: [PATCH] Add attach to dockerserver - Move copyOutput helper from dockerclient to beam/stream.go - Implement /containers//attach and /containers//wait Signed-off-by: Ben Firshman --- backends/dockerclient.go | 19 +------- backends/dockerserver.go | 95 ++++++++++++++++++++++++++++++++++++++-- beam/stream.go | 38 ++++++++++++++++ 3 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 beam/stream.go diff --git a/backends/dockerclient.go b/backends/dockerclient.go index 38127c4474..e2b0f50113 100644 --- a/backends/dockerclient.go +++ b/backends/dockerclient.go @@ -161,28 +161,13 @@ func (c *container) attach(ctx *beam.Message) error { stdoutR, stdoutW := io.Pipe() stderrR, stderrW := io.Pipe() - go copyOutput(ctx.Ret, stdoutR, "stdout") - go copyOutput(ctx.Ret, stderrR, "stderr") + go beam.EncodeStream(ctx.Ret, stdoutR, "stdout") + go beam.EncodeStream(ctx.Ret, stderrR, "stderr") c.backend.client.hijack("POST", path, nil, stdoutW, stderrW) return nil } -func copyOutput(sender beam.Sender, reader io.Reader, tag string) { - chunk := make([]byte, 4096) - for { - n, err := reader.Read(chunk) - if n > 0 { - sender.Send(&beam.Message{Verb: beam.Log, Args: []string{tag, string(chunk[0:n])}}) - } - if err != nil { - message := fmt.Sprintf("Error reading from stream: %v", err) - sender.Send(&beam.Message{Verb: beam.Error, Args: []string{message}}) - break - } - } -} - func (c *container) start(ctx *beam.Message) error { path := fmt.Sprintf("/containers/%s/start", c.id) resp, err := c.backend.client.call("POST", path, "{}") diff --git a/backends/dockerserver.go b/backends/dockerserver.go index af14aa0ca1..d6fe22d8fd 100644 --- a/backends/dockerserver.go +++ b/backends/dockerserver.go @@ -6,12 +6,15 @@ import ( "github.com/docker/libswarm/beam" "github.com/dotcloud/docker/api" "github.com/dotcloud/docker/pkg/version" + "github.com/dotcloud/docker/utils" "github.com/gorilla/mux" + "io" "io/ioutil" "net" "net/http" "net/url" "strings" + "sync" "time" "strconv" ) @@ -241,6 +244,90 @@ func postContainersStop(out beam.Sender, version version.Version, w http.Respons return nil } +func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) { + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + return nil, nil, err + } + // Flush the options to make sure the client sets the raw mode + conn.Write([]byte{}) + return conn, conn, nil +} + +func postContainersAttach(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := r.ParseForm(); err != nil { + return err + } + if vars == nil { + return fmt.Errorf("Missing parameter") + } + + inStream, outStream, err := hijackServer(w) + if err != nil { + return err + } + defer func() { + if tcpc, ok := inStream.(*net.TCPConn); ok { + tcpc.CloseWrite() + } else { + inStream.Close() + } + }() + defer func() { + if tcpc, ok := outStream.(*net.TCPConn); ok { + tcpc.CloseWrite() + } else if closer, ok := outStream.(io.Closer); ok { + closer.Close() + } + }() + + fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") + + // TODO: if a TTY, then no multiplexing is done + errStream := utils.NewStdWriter(outStream, utils.Stderr) + outStream = utils.NewStdWriter(outStream, utils.Stdout) + + _, containerOut, err := beam.Obj(out).Attach(vars["name"]) + if err != nil { + return err + } + container := beam.Obj(containerOut) + + containerR, _, err := container.Attach("") + var tasks sync.WaitGroup + go func() { + defer tasks.Done() + err := beam.DecodeStream(outStream, containerR, "stdout") + if err != nil { + fmt.Printf("decodestream: %v\n", err) + } + }() + tasks.Add(1) + go func() { + defer tasks.Done() + err := beam.DecodeStream(errStream, containerR, "stderr") + if err != nil { + fmt.Printf("decodestream: %v\n", err) + } + }() + tasks.Add(1) + tasks.Wait() + + return nil +} + +func postContainersWait(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if vars == nil { + return fmt.Errorf("Missing parameter") + } + + // TODO: this should wait for container to end out output correct + // exit status + return writeJSON(w, http.StatusOK, map[string]interface{}{ + "StatusCode": "0", + }) +} + func createRouter(out beam.Sender) (*mux.Router, error) { r := mux.NewRouter() m := map[string]map[string]HttpApiFunc{ @@ -249,9 +336,11 @@ func createRouter(out beam.Sender) (*mux.Router, error) { "/containers/json": getContainersJSON, }, "POST": { - "/containers/create": postContainersCreate, - "/containers/{name:.*}/start": postContainersStart, - "/containers/{name:.*}/stop": postContainersStop, + "/containers/create": postContainersCreate, + "/containers/{name:.*}/attach": postContainersAttach, + "/containers/{name:.*}/start": postContainersStart, + "/containers/{name:.*}/stop": postContainersStop, + "/containers/{name:.*}/wait": postContainersWait, }, "DELETE": {}, "OPTIONS": {}, diff --git a/beam/stream.go b/beam/stream.go new file mode 100644 index 0000000000..beef7229c1 --- /dev/null +++ b/beam/stream.go @@ -0,0 +1,38 @@ +package beam + +import ( + "fmt" + "io" +) + +func EncodeStream(sender Sender, reader io.Reader, tag string) { + chunk := make([]byte, 4096) + for { + n, err := reader.Read(chunk) + if n > 0 { + sender.Send(&Message{Verb: Log, Args: []string{tag, string(chunk[0:n])}}) + } + if err != nil { + message := fmt.Sprintf("Error reading from stream: %v", err) + sender.Send(&Message{Verb: Error, Args: []string{message}}) + break + } + } +} + +func DecodeStream(dst io.Writer, src Receiver, tag string) error { + for { + msg, err := src.Receive(Ret) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + if tag == msg.Args[0] { + if _, err := dst.Write([]byte(msg.Args[1])); err != nil { + return err + } + } + } +}