diff --git a/backends/backends.go b/backends/backends.go index f3eaccf3c2..6a5b1517cf 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -15,7 +15,7 @@ func New() *beam.Object { backends.Bind("simulator", Simulator()) backends.Bind("debug", Debug()) backends.Bind("fakeclient", FakeClient()) - backends.Bind("forward", Forward()) + backends.Bind("dockerclient", DockerClient()) backends.Bind("exec", Exec()) backends.Bind("dockerserver", DockerServer()) backends.Bind("orchard", Orchard()) diff --git a/backends/forward.go b/backends/dockerclient.go similarity index 81% rename from backends/forward.go rename to backends/dockerclient.go index 046804c85d..38127c4474 100644 --- a/backends/forward.go +++ b/backends/dockerclient.go @@ -14,62 +14,58 @@ import ( "net/http/httputil" "net/url" "strings" - "time" ) -type ForwardConfig struct { +type DockerClientConfig struct { Scheme string URLHost string TLSClientConfig *tls.Config } -func Forward() beam.Sender { - return ForwardWithConfig(&ForwardConfig{ +func DockerClient() beam.Sender { + return DockerClientWithConfig(&DockerClientConfig{ Scheme: "http", URLHost: "dummy.host", }) } -func ForwardWithConfig(config *ForwardConfig) beam.Sender { +func DockerClientWithConfig(config *DockerClientConfig) beam.Sender { backend := beam.NewServer() backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error { if len(ctx.Args) != 1 { - return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args)) + return fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(ctx.Args)) } client := newClient() client.scheme = config.Scheme client.urlHost = config.URLHost client.transport.TLSClientConfig = config.TLSClientConfig client.setURL(ctx.Args[0]) - f := &forwarder{ + b := &dockerClientBackend{ client: client, Server: beam.NewServer(), } - f.Server.OnAttach(beam.Handler(f.attach)) - f.Server.OnStart(beam.Handler(f.start)) - f.Server.OnLs(beam.Handler(f.ls)) - f.Server.OnSpawn(beam.Handler(f.spawn)) - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server}) + b.Server.OnAttach(beam.Handler(b.attach)) + b.Server.OnStart(beam.Handler(b.start)) + b.Server.OnLs(beam.Handler(b.ls)) + b.Server.OnSpawn(beam.Handler(b.spawn)) + _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: b.Server}) return err })) return backend } -type forwarder struct { +type dockerClientBackend struct { client *client *beam.Server } -func (f *forwarder) attach(ctx *beam.Message) error { +func (b *dockerClientBackend) attach(ctx *beam.Message) error { if ctx.Args[0] == "" { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server}) - for { - time.Sleep(1 * time.Second) - (&beam.Object{ctx.Ret}).Log("forward: heartbeat") - } + ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: b.Server}) + <-make(chan struct{}) } else { path := fmt.Sprintf("/containers/%s/json", ctx.Args[0]) - resp, err := f.client.call("GET", path, "") + resp, err := b.client.call("GET", path, "") if err != nil { return err } @@ -80,19 +76,19 @@ func (f *forwarder) attach(ctx *beam.Message) error { if resp.StatusCode != 200 { return fmt.Errorf("%s", respBody) } - c := f.newContainer(ctx.Args[0]) + c := b.newContainer(ctx.Args[0]) ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}) } return nil } -func (f *forwarder) start(ctx *beam.Message) error { +func (b *dockerClientBackend) start(ctx *beam.Message) error { ctx.Ret.Send(&beam.Message{Verb: beam.Ack}) return nil } -func (f *forwarder) ls(ctx *beam.Message) error { - resp, err := f.client.call("GET", "/containers/json", "") +func (b *dockerClientBackend) ls(ctx *beam.Message) error { + resp, err := b.client.call("GET", "/containers/json", "") if err != nil { return fmt.Errorf("get: %v", err) } @@ -115,11 +111,11 @@ func (f *forwarder) ls(ctx *beam.Message) error { return nil } -func (f *forwarder) spawn(ctx *beam.Message) error { +func (b *dockerClientBackend) spawn(ctx *beam.Message) error { if len(ctx.Args) != 1 { - return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args)) + return fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(ctx.Args)) } - resp, err := f.client.call("POST", "/containers/create", ctx.Args[0]) + resp, err := b.client.call("POST", "/containers/create", ctx.Args[0]) if err != nil { return err } @@ -134,15 +130,15 @@ func (f *forwarder) spawn(ctx *beam.Message) error { if err = json.Unmarshal(respBody, &respJson); err != nil { return err } - c := f.newContainer(respJson.Id) + c := b.newContainer(respJson.Id) if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil { return err } return nil } -func (f *forwarder) newContainer(id string) beam.Sender { - c := &container{forwarder: f, id: id} +func (b *dockerClientBackend) newContainer(id string) beam.Sender { + c := &container{backend: b, id: id} instance := beam.NewServer() instance.OnAttach(beam.Handler(c.attach)) instance.OnStart(beam.Handler(c.start)) @@ -152,8 +148,8 @@ func (f *forwarder) newContainer(id string) beam.Sender { } type container struct { - forwarder *forwarder - id string + backend *dockerClientBackend + id string } func (c *container) attach(ctx *beam.Message) error { @@ -167,7 +163,7 @@ func (c *container) attach(ctx *beam.Message) error { stderrR, stderrW := io.Pipe() go copyOutput(ctx.Ret, stdoutR, "stdout") go copyOutput(ctx.Ret, stderrR, "stderr") - c.forwarder.client.hijack("POST", path, nil, stdoutW, stderrW) + c.backend.client.hijack("POST", path, nil, stdoutW, stderrW) return nil } @@ -189,7 +185,7 @@ func copyOutput(sender beam.Sender, reader io.Reader, tag string) { func (c *container) start(ctx *beam.Message) error { path := fmt.Sprintf("/containers/%s/start", c.id) - resp, err := c.forwarder.client.call("POST", path, "{}") + resp, err := c.backend.client.call("POST", path, "{}") if err != nil { return err } @@ -208,7 +204,7 @@ func (c *container) start(ctx *beam.Message) error { func (c *container) stop(ctx *beam.Message) error { path := fmt.Sprintf("/containers/%s/stop", c.id) - resp, err := c.forwarder.client.call("POST", path, "{}") + resp, err := c.backend.client.call("POST", path, "") if err != nil { return err } @@ -227,7 +223,7 @@ func (c *container) stop(ctx *beam.Message) error { func (c *container) get(ctx *beam.Message) error { path := fmt.Sprintf("/containers/%s/json", c.id) - resp, err := c.forwarder.client.call("GET", path, "") + resp, err := c.backend.client.call("GET", path, "") if err != nil { return err } diff --git a/backends/dockerclient_test.go b/backends/dockerclient_test.go new file mode 100644 index 0000000000..4c5948893a --- /dev/null +++ b/backends/dockerclient_test.go @@ -0,0 +1,186 @@ +package backends + +import ( + "github.com/docker/libswarm/beam" + + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "testing" +) + +type requestStub struct { + reqMethod string + reqPath string + reqBody string + + resStatus int + resBody string +} + +func TestBackendSpawn(t *testing.T) { + instance(t, nil) +} + +func TestAttachAndStart(t *testing.T) { + i := instance(t, nil) + _, _, err := i.Attach("") + if err != nil { + t.Fatal(err) + } + err = i.Start() + if err != nil { + t.Fatal(err) + } +} + +func TestLs(t *testing.T) { + server := makeServer(t, &requestStub{ + reqMethod: "GET", + reqPath: "/containers/json", + resBody: ` + [ + {"Names": ["/foo"]}, + {"Names": ["/bar"]} + ] + `, + }) + i := instance(t, server) + names, err := i.Ls() + if err != nil { + t.Fatal(err) + } + expected := []string{"foo", "bar"} + if !reflect.DeepEqual(names, expected) { + t.Fatalf("expected %#v, got %#v", expected, names) + } +} + +func TestSpawn(t *testing.T) { + server := makeServer(t, &requestStub{ + reqMethod: "POST", + reqPath: "/containers/create", + reqBody: "{}", + + resStatus: 201, + resBody: "{}", + }) + i := instance(t, server) + _, err := i.Spawn("{}") + if err != nil { + t.Fatal(err) + } +} + +func TestAttachToChild(t *testing.T) { + name := "foo" + server := makeServer(t, &requestStub{ + reqMethod: "GET", + reqPath: fmt.Sprintf("/containers/%s/json", name), + + resBody: "{}", + }) + i := instance(t, server) + child(t, server, i, name) +} + +func TestStartChild(t *testing.T) { + name := "foo" + server := makeServer(t, &requestStub{ + reqMethod: "GET", + reqPath: fmt.Sprintf("/containers/%s/json", name), + + resBody: "{}", + }, &requestStub{ + reqMethod: "POST", + reqPath: fmt.Sprintf("/containers/%s/start", name), + reqBody: "{}", + + resStatus: 204, + }) + i := instance(t, server) + c := child(t, server, i, name) + err := c.Start() + if err != nil { + t.Fatal(err) + } +} + +func TestStopChild(t *testing.T) { + name := "foo" + server := makeServer(t, &requestStub{ + reqMethod: "GET", + reqPath: fmt.Sprintf("/containers/%s/json", name), + + resBody: "{}", + }, &requestStub{ + reqMethod: "POST", + reqPath: fmt.Sprintf("/containers/%s/stop", name), + + resStatus: 204, + }) + i := instance(t, server) + c := child(t, server, i, name) + err := c.Stop() + if err != nil { + t.Fatal(err) + } +} + +func makeServer(t *testing.T, stubs ...*requestStub) *httptest.Server { + for _, stub := range stubs { + stub.reqPath = fmt.Sprintf("/v1.11%s", stub.reqPath) + } + + stubSummaries := []string{} + for _, stub := range stubs { + stubSummaries = append(stubSummaries, fmt.Sprintf("%s %s", stub.reqMethod, stub.reqPath)) + } + + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, stub := range stubs { + if r.Method == stub.reqMethod && r.URL.Path == stub.reqPath { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != stub.reqBody { + t.Fatalf("expected body: %#v, got body: %#v", stub.reqBody, string(body)) + } + + if stub.resStatus > 0 { + w.WriteHeader(stub.resStatus) + } + w.Write([]byte(stub.resBody)) + return + } + } + + t.Fatalf("Unexpected request: '%s %s'.\nStubs: %#v", r.Method, r.URL.Path, stubSummaries) + })) +} + +func instance(t *testing.T, server *httptest.Server) *beam.Object { + url := "tcp://localhost:4243" + if server != nil { + url = strings.Replace(server.URL, "http://", "tcp://", 1) + } + + backend := DockerClient() + instance, err := beam.Obj(backend).Spawn(url) + if err != nil { + t.Fatal(err) + } + return instance +} + +func child(t *testing.T, server *httptest.Server, i *beam.Object, name string) *beam.Object { + _, child, err := i.Attach(name) + if err != nil { + t.Fatal(err) + } + return child +} diff --git a/backends/orchard.go b/backends/orchard.go index 536a13241b..1b30f873b7 100644 --- a/backends/orchard.go +++ b/backends/orchard.go @@ -35,7 +35,7 @@ func Orchard() beam.Sender { return err } - backend := ForwardWithConfig(&ForwardConfig{ + backend := DockerClientWithConfig(&DockerClientConfig{ Scheme: "https", URLHost: host.IPAddress, TLSClientConfig: tlsConfig, diff --git a/dockerclient/dockerclient.go b/dockerclient/dockerclient.go deleted file mode 100644 index 2153e87f1c..0000000000 --- a/dockerclient/dockerclient.go +++ /dev/null @@ -1,159 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "github.com/codegangsta/cli" - "github.com/docker/libswarm/backends" - "github.com/docker/libswarm/beam" - "github.com/dotcloud/docker/runconfig" - "github.com/dotcloud/docker/utils" - "io" - "os" - "strings" -) - -func main() { - app := cli.NewApp() - app.Name = "swarmd" - app.Usage = "Control a heterogenous distributed system with the Docker API" - app.Version = "0.0.1" - app.Flags = []cli.Flag{ - cli.StringFlag{"backend", "debug", "load a backend"}, - } - app.Action = cmdDaemon - app.Run(os.Args) -} - -func cmdDaemon(c *cli.Context) { - app := beam.NewServer() - app.OnLog(beam.Handler(func(msg *beam.Message) error { - utils.Debugf("%s", strings.Join(msg.Args, " ")) - return nil - })) - app.OnError(beam.Handler(func(msg *beam.Message) error { - Fatalf("Fatal: %v", strings.Join(msg.Args[:1], "")) - return nil - })) - - backend := beam.Object{backends.Forward()} - - dockerHost := os.Getenv("DOCKER_HOST") - if dockerHost == "" { - dockerHost = "unix:///var/run/docker.sock" - } - - instance, err := backend.Spawn(dockerHost) - if err != nil { - Fatalf("spawn: %v\n", err) - } - - instanceR, instanceW, err := instance.Attach("") - if err != nil { - Fatalf("attach: %v", err) - } - defer instanceW.Close() - go beam.Copy(app, instanceR) - - if err := instance.Start(); err != nil { - Fatalf("start: %v", err) - } - - err = doCmd(instance, c.Args()) - if err != nil { - Fatalf("%v", err) - } -} - -func doCmd(instance *beam.Object, args []string) error { - if len(args) == 0 { - return fmt.Errorf("no command supplied") - } - if args[0] == "ps" { - if len(args) != 1 { - return fmt.Errorf("usage: ps") - } - names, err := instance.Ls() - if err != nil { - return err - } - fmt.Println(strings.Join(names, "\n")) - return nil - } - if args[0] == "run" { - if len(args) < 3 { - return fmt.Errorf("usage: run IMAGE COMMAND...") - } - containerJson, err := json.Marshal(&runconfig.Config{ - Image: args[1], - Cmd: args[2:], - AttachStdin: false, - AttachStdout: true, - AttachStderr: true, - }) - if err != nil { - return err - } - container, err := instance.Spawn(string(containerJson)) - if err != nil { - return fmt.Errorf("spawn: %v", err) - } - logs, _, err := container.Attach("") - if err != nil { - return fmt.Errorf("attach: %v", err) - } - if err = container.Start(); err != nil { - return fmt.Errorf("start: %v", err) - } - for { - msg, err := logs.Receive(beam.Ret) - if err != nil { - if err.Error() == "EOF" { - break - } - return fmt.Errorf("error reading from container: %v", err) - } - if msg.Verb != beam.Log { - return fmt.Errorf("unexpected message reading from container: %v", msg) - } - if len(msg.Args) != 2 { - return fmt.Errorf("expected exactly 2 args to log message, got %d", len(msg.Args)) - } - tag, chunk := msg.Args[0], msg.Args[1] - var stream io.Writer - if tag == "stdout" { - stream = os.Stdout - } else if tag == "stderr" { - stream = os.Stderr - } else { - return fmt.Errorf("unrecognised tag: %s", tag) - } - fmt.Fprint(stream, chunk) - } - return nil - } - if args[0] == "inspect" { - if len(args) != 2 { - return fmt.Errorf("usage: inspect CONTAINER") - } - _, container, err := instance.Attach(args[1]) - if err != nil { - return fmt.Errorf("attach: %v", err) - } - json, err := container.Get() - if err != nil { - return fmt.Errorf("get: %v", err) - } - fmt.Println(json) - return nil - } - return fmt.Errorf("unrecognised command: %s", args[0]) -} - -func Fatalf(msg string, args ...interface{}) { - if !strings.HasSuffix(msg, "\n") { - msg = msg + "\n" - } - fmt.Fprintf(os.Stderr, msg, args...) - os.Exit(1) -}