Add attach to dockerserver

- Move copyOutput helper from dockerclient to beam/stream.go
 - Implement /containers/<id>/attach and /containers/<id>/wait

Signed-off-by: Ben Firshman <ben@firshman.co.uk>
This commit is contained in:
Ben Firshman 2014-06-09 11:49:56 -07:00 committed by Aaron Feng
parent f1783fc527
commit 25d1250c2c
3 changed files with 132 additions and 20 deletions

View File

@ -161,28 +161,13 @@ func (c *container) attach(ctx *beam.Message) error {
stdoutR, stdoutW := io.Pipe() stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe() stderrR, stderrW := io.Pipe()
go copyOutput(ctx.Ret, stdoutR, "stdout") go beam.EncodeStream(ctx.Ret, stdoutR, "stdout")
go copyOutput(ctx.Ret, stderrR, "stderr") go beam.EncodeStream(ctx.Ret, stderrR, "stderr")
c.backend.client.hijack("POST", path, nil, stdoutW, stderrW) c.backend.client.hijack("POST", path, nil, stdoutW, stderrW)
return nil return nil
} }
func copyOutput(sender beam.Sender, reader io.Reader, tag string) {
chunk := make([]byte, 4096)
for {
n, err := reader.Read(chunk)
if n > 0 {
sender.Send(&beam.Message{Verb: beam.Log, Args: []string{tag, string(chunk[0:n])}})
}
if err != nil {
message := fmt.Sprintf("Error reading from stream: %v", err)
sender.Send(&beam.Message{Verb: beam.Error, Args: []string{message}})
break
}
}
}
func (c *container) start(ctx *beam.Message) error { func (c *container) start(ctx *beam.Message) error {
path := fmt.Sprintf("/containers/%s/start", c.id) path := fmt.Sprintf("/containers/%s/start", c.id)
resp, err := c.backend.client.call("POST", path, "{}") resp, err := c.backend.client.call("POST", path, "{}")

View File

@ -6,12 +6,15 @@ import (
"github.com/docker/libswarm/beam" "github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/api" "github.com/dotcloud/docker/api"
"github.com/dotcloud/docker/pkg/version" "github.com/dotcloud/docker/pkg/version"
"github.com/dotcloud/docker/utils"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"io"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"sync"
"time" "time"
"strconv" "strconv"
) )
@ -241,6 +244,90 @@ func postContainersStop(out beam.Sender, version version.Version, w http.Respons
return nil return nil
} }
func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) {
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
return nil, nil, err
}
// Flush the options to make sure the client sets the raw mode
conn.Write([]byte{})
return conn, conn, nil
}
func postContainersAttach(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := r.ParseForm(); err != nil {
return err
}
if vars == nil {
return fmt.Errorf("Missing parameter")
}
inStream, outStream, err := hijackServer(w)
if err != nil {
return err
}
defer func() {
if tcpc, ok := inStream.(*net.TCPConn); ok {
tcpc.CloseWrite()
} else {
inStream.Close()
}
}()
defer func() {
if tcpc, ok := outStream.(*net.TCPConn); ok {
tcpc.CloseWrite()
} else if closer, ok := outStream.(io.Closer); ok {
closer.Close()
}
}()
fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
// TODO: if a TTY, then no multiplexing is done
errStream := utils.NewStdWriter(outStream, utils.Stderr)
outStream = utils.NewStdWriter(outStream, utils.Stdout)
_, containerOut, err := beam.Obj(out).Attach(vars["name"])
if err != nil {
return err
}
container := beam.Obj(containerOut)
containerR, _, err := container.Attach("")
var tasks sync.WaitGroup
go func() {
defer tasks.Done()
err := beam.DecodeStream(outStream, containerR, "stdout")
if err != nil {
fmt.Printf("decodestream: %v\n", err)
}
}()
tasks.Add(1)
go func() {
defer tasks.Done()
err := beam.DecodeStream(errStream, containerR, "stderr")
if err != nil {
fmt.Printf("decodestream: %v\n", err)
}
}()
tasks.Add(1)
tasks.Wait()
return nil
}
func postContainersWait(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
// TODO: this should wait for container to end out output correct
// exit status
return writeJSON(w, http.StatusOK, map[string]interface{}{
"StatusCode": "0",
})
}
func createRouter(out beam.Sender) (*mux.Router, error) { func createRouter(out beam.Sender) (*mux.Router, error) {
r := mux.NewRouter() r := mux.NewRouter()
m := map[string]map[string]HttpApiFunc{ m := map[string]map[string]HttpApiFunc{
@ -249,9 +336,11 @@ func createRouter(out beam.Sender) (*mux.Router, error) {
"/containers/json": getContainersJSON, "/containers/json": getContainersJSON,
}, },
"POST": { "POST": {
"/containers/create": postContainersCreate, "/containers/create": postContainersCreate,
"/containers/{name:.*}/start": postContainersStart, "/containers/{name:.*}/attach": postContainersAttach,
"/containers/{name:.*}/stop": postContainersStop, "/containers/{name:.*}/start": postContainersStart,
"/containers/{name:.*}/stop": postContainersStop,
"/containers/{name:.*}/wait": postContainersWait,
}, },
"DELETE": {}, "DELETE": {},
"OPTIONS": {}, "OPTIONS": {},

38
beam/stream.go Normal file
View File

@ -0,0 +1,38 @@
package beam
import (
"fmt"
"io"
)
func EncodeStream(sender Sender, reader io.Reader, tag string) {
chunk := make([]byte, 4096)
for {
n, err := reader.Read(chunk)
if n > 0 {
sender.Send(&Message{Verb: Log, Args: []string{tag, string(chunk[0:n])}})
}
if err != nil {
message := fmt.Sprintf("Error reading from stream: %v", err)
sender.Send(&Message{Verb: Error, Args: []string{message}})
break
}
}
}
func DecodeStream(dst io.Writer, src Receiver, tag string) error {
for {
msg, err := src.Receive(Ret)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if tag == msg.Args[0] {
if _, err := dst.Write([]byte(msg.Args[1])); err != nil {
return err
}
}
}
}