From fc41d09026e6215eff9ea24eb65b3f6e1d6c55f1 Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Thu, 5 Jun 2014 14:14:58 -0700 Subject: [PATCH] Rough implementation of 'docker run' Signed-off-by: Aanand Prasad (github: aanand) --- backends/forward.go | 164 +++++++++++++++++++++++++++++++++++ dockerclient/dockerclient.go | 30 +++++++ 2 files changed, 194 insertions(+) diff --git a/backends/forward.go b/backends/forward.go index 3e75d25d8b..0992235651 100644 --- a/backends/forward.go +++ b/backends/forward.go @@ -1,11 +1,18 @@ package backends import ( + "encoding/json" "fmt" "github.com/docker/libswarm/beam" "github.com/dotcloud/docker/engine" + "github.com/dotcloud/docker/runconfig" + "github.com/dotcloud/docker/utils" + "io" "io/ioutil" + "log" + "net" "net/http" + "net/http/httputil" "net/url" "strings" "time" @@ -26,6 +33,7 @@ func Forward() beam.Sender { instance.OnAttach(beam.Handler(f.attach)) instance.OnStart(beam.Handler(f.start)) instance.OnLs(beam.Handler(f.ls)) + instance.OnSpawn(beam.Handler(f.spawn)) _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) return err })) @@ -74,6 +82,104 @@ func (f *forwarder) ls(ctx *beam.Message) error { return nil } +func (f *forwarder) spawn(ctx *beam.Message) error { + if len(ctx.Args) < 1 { + return fmt.Errorf("forward: spawn takes at least 1 argument, got %d", len(ctx.Args)) + } + body, err := json.Marshal(&runconfig.Config{ + Image: ctx.Args[0], + Cmd: ctx.Args[1:], + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + }) + if err != nil { + return err + } + resp, err := f.client.call("POST", "/containers/create", string(body)) + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 201 { + return fmt.Errorf("expected status code 201, got %d:\n%s", resp.StatusCode, respBody) + } + var respJson struct{ Id string } + if err = json.Unmarshal(respBody, &respJson); err != nil { + return err + } + c := f.newContainer(respJson.Id) + if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil { + return err + } + return nil +} + +func (f *forwarder) newContainer(id string) beam.Sender { + c := &container{forwarder: f, id: id} + instance := beam.NewServer() + instance.OnAttach(beam.Handler(c.attach)) + instance.OnStart(beam.Handler(c.start)) + return instance +} + +type container struct { + forwarder *forwarder + id string +} + +func (c *container) attach(ctx *beam.Message) error { + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + return err + } + + path := fmt.Sprintf("/containers/%s/attach?stdout=1&stderr=0&stream=1&logs=1", c.id) + + stdoutR, stdoutW := io.Pipe() + _, stderrW := io.Pipe() + go copyOutput(ctx.Ret, stdoutR) + c.forwarder.client.hijack("POST", path, nil, stdoutW, stderrW) + + return nil +} + +func copyOutput(sender beam.Sender, reader io.Reader) { + chunk := make([]byte, 4096) + for { + n, err := reader.Read(chunk) + if n > 0 { + sender.Send(&beam.Message{Verb: beam.Log, Args: []string{string(chunk[0:n])}}) + } + if err != nil { + message := fmt.Sprintf("Error reading from stream: %v", err) + sender.Send(&beam.Message{Verb: beam.Error, Args: []string{message}}) + break + } + } +} + +func (c *container) start(ctx *beam.Message) error { + path := fmt.Sprintf("/containers/%s/start", c.id) + resp, err := c.forwarder.client.call("POST", path, "{}") + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 204 { + return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody) + } + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + return err + } + return nil +} + type client struct { URL *url.URL proto string @@ -111,3 +217,61 @@ func (c *client) call(method, path, body string) (*http.Response, error) { } return resp, nil } + +func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io.Writer) error { + dial, err := net.Dial("tcp", c.URL.Host) + if err != nil { + return err + } + req, err := http.NewRequest(method, path, nil) + if err != nil { + return err + } + + clientconn := httputil.NewClientConn(dial, nil) + defer clientconn.Close() + + clientconn.Do(req) + + rwc, br := clientconn.Hijack() + defer rwc.Close() + + receiveStdout := utils.Go(func() (err error) { + defer func() { + if in != nil { + in.Close() + } + }() + // _, err = utils.StdCopy(stdout, stderr, br) + _, err = io.Copy(stdout, br) + log.Println("[hijack] End of stdout") + return err + }) + sendStdin := utils.Go(func() error { + if in != nil { + io.Copy(rwc, in) + log.Println("[hijack] End of stdin") + } + if tcpc, ok := rwc.(*net.TCPConn); ok { + if err := tcpc.CloseWrite(); err != nil { + log.Printf("Couldn't send EOF: %s\n", err) + } + } else if unixc, ok := rwc.(*net.UnixConn); ok { + if err := unixc.CloseWrite(); err != nil { + log.Printf("Couldn't send EOF: %s\n", err) + } + } + // Discard errors due to pipe interruption + return nil + }) + if err := <-receiveStdout; err != nil { + log.Printf("Error receiveStdout: %s\n", err) + return err + } + + if err := <-sendStdin; err != nil { + log.Printf("Error sendStdin: %s\n", err) + return err + } + return nil +} diff --git a/dockerclient/dockerclient.go b/dockerclient/dockerclient.go index b4af69a4a7..c0d250f37a 100644 --- a/dockerclient/dockerclient.go +++ b/dockerclient/dockerclient.go @@ -81,6 +81,36 @@ func doCmd(instance *beam.Object, args []string) error { fmt.Println(strings.Join(names, "\n")) return nil } + if args[0] == "run" { + if len(args) < 3 { + return fmt.Errorf("usage: run IMAGE COMMAND...") + } + container, err := instance.Spawn(args[1:]...) + if err != nil { + return fmt.Errorf("spawn: %v", err) + } + logs, _, err := container.Attach("") + if err != nil { + return fmt.Errorf("attach: %v", err) + } + if err = container.Start(); err != nil { + return fmt.Errorf("start: %v", err) + } + for { + msg, err := logs.Receive(beam.Ret) + if err != nil { + if err.Error() == "EOF" { + break + } + return fmt.Errorf("error reading from container: %v", err) + } + if msg.Verb != beam.Log { + return fmt.Errorf("unexpected message reading from container: %v", msg) + } + fmt.Print(msg.Args[0]) + } + return nil + } return fmt.Errorf("unrecognised command: %s", args[0]) }