diff --git a/backends/backends.go b/backends/backends.go index 6a5b1517cf..05b6675d1c 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -21,5 +21,6 @@ func New() *beam.Object { backends.Bind("orchard", Orchard()) backends.Bind("aggregate", Aggregate()) backends.Bind("shipyard", Shipyard()) + backends.Bind("tutum", Tutum()) return beam.Obj(backends) } diff --git a/backends/tutum.go b/backends/tutum.go index 0f2800b27c..dc83e56633 100644 --- a/backends/tutum.go +++ b/backends/tutum.go @@ -3,223 +3,218 @@ package backends import ( "encoding/json" "fmt" + "github.com/docker/libswarm/beam" "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/runconfig" "github.com/tutumcloud/go-tutum" - "io" "io/ioutil" - "log" "net/http" "net/url" "strings" + "time" ) var ( - tutumConnector = "docker.tutum.co:49460" + tutumConnectorHost = "https://docker.tutum.co:49460" tutumConnectorVersion = "v1.11" ) -func Tutum() engine.Installer { - return &tutumBackend{} +func Tutum() beam.Sender { + backend := beam.NewServer() + backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error { + if len(ctx.Args) == 2 { + tutum.User = ctx.Args[0] + tutum.ApiKey = ctx.Args[1] + } + if !tutum.IsAuthenticated() { + return fmt.Errorf("You need to provide your Tutum credentials in ~/.tutum or environment variables TUTUM_USER and TUTUM_APIKEY") + } + tutumDockerConnector, err := newConnector(tutumConnectorHost, tutumConnectorVersion) + if err != nil { + return fmt.Errorf("%v", err) + } + t := &tutumBackend{ + tutumDockerConnector: tutumDockerConnector, + Server: beam.NewServer(), + } + t.Server.OnAttach(beam.Handler(t.attach)) + t.Server.OnStart(beam.Handler(t.ack)) + t.Server.OnLs(beam.Handler(t.ls)) + t.Server.OnSpawn(beam.Handler(t.spawn)) + _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) + return err + })) + return backend } type tutumBackend struct { + tutumDockerConnector *tutumDockerConnector + *beam.Server } -func (t *tutumBackend) Install(eng *engine.Engine) error { - eng.Register("tutum", func(job *engine.Job) engine.Status { - if len(job.Args) == 1 { - tutumConnector = job.Args[0] +func (t *tutumBackend) attach(ctx *beam.Message) error { + if ctx.Args[0] == "" { + ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) + for { + time.Sleep(1 * time.Second) } - if !tutum.IsAuthenticated() { - return job.Errorf("You need to provide your Tutum credentials in ~/.tutum or environment variables TUTUM_USER and TUTUM_APIKEY") - } - job.Eng.Register("containers", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/json?all=%s&limit=%s", - url.QueryEscape(job.Getenv("all")), - url.QueryEscape(job.Getenv("limit")), - ) - resp, err := tutumConnectorCall("GET", path, "") - if err != nil { - return job.Errorf("%s: get: %v", path, err) - } - c := engine.NewTable("Created", 0) - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return job.Errorf("%s: read body: %v", path, err) - } - if _, err := c.ReadListFrom(body); err != nil { - return job.Errorf("%s: readlist: %v", path, err) - } - c.WriteListTo(job.Stdout) - return engine.StatusOK - }) - job.Eng.Register("create", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/create", - ) - config := runconfig.ContainerConfigFromJob(job) - data, err := json.Marshal(config) - if err != nil { - return job.Errorf("%s: json marshal: %v", path, err) - } - resp, err := tutumConnectorCall("POST", path, string(data)) - if err != nil { - return job.Errorf("%s: post: %v", path, err) - } - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return job.Errorf("%s: read body: %#v", path, err) - } - var containerOut struct { - Id string - Warnings []string - } - err = json.Unmarshal([]byte(body), &containerOut) - _, err = job.Printf("%s\n", containerOut.Id) - if err != nil { - return job.Errorf("%s: write body: %#v", path, err) - } - log.Printf("%s", string(body)) - return engine.StatusOK - }) - job.Eng.Register("container_delete", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s?force=%s", - job.Args[0], - url.QueryEscape(job.Getenv("forceRemove")), - ) - _, err := tutumConnectorCall("DELETE", path, "") - if err != nil { - return job.Errorf("%s: delete: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("start", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf("/containers/%s/start", job.Args[0]) - config := runconfig.ContainerConfigFromJob(job) - data, err := json.Marshal(config) - if err != nil { - return job.Errorf("%s: json marshal: %v", path, err) - } - _, err = tutumConnectorCall("POST", path, string(data)) - if err != nil { - return job.Errorf("%s: post: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("stop", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s/stop?t=%s", - job.Args[0], - url.QueryEscape(job.Getenv("t")), - ) - _, err := tutumConnectorCall("POST", path, "") - if err != nil { - return job.Errorf("%s: post: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("kill", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s/kill?signal=%s", - job.Args[0], - job.Args[1], - ) - _, err := tutumConnectorCall("POST", path, "") - if err != nil { - return job.Errorf("%s: post: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("restart", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s/restart?t=%s", - job.Args[0], - url.QueryEscape(job.Getenv("t")), - ) - _, err := tutumConnectorCall("POST", path, "") - if err != nil { - return job.Errorf("%s: post: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("inspect", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s/json", - job.Args[0], - ) - resp, err := tutumConnectorCall("GET", path, "") - if err != nil { - return job.Errorf("%s: get: %v", path, err) - } - _, err = io.Copy(job.Stdout, resp.Body) - if err != nil { - return job.Errorf("%s: copy stream: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("logs", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := fmt.Sprintf( - "/containers/%s/logs?stdout=%s&stderr=%s", - job.Args[0], - url.QueryEscape(job.Getenv("stdout")), - url.QueryEscape(job.Getenv("stderr")), - ) - resp, err := tutumConnectorCall("GET", path, "") - if err != nil { - return job.Errorf("%s: get: %v", path, err) - } - _, err = io.Copy(job.Stdout, resp.Body) - if err != nil { - return job.Errorf("%s: copy stream: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.Register("version", func(job *engine.Job) engine.Status { - log.Printf("Received '%s' operation....", job.Name) - path := "/version" - resp, err := tutumConnectorCall("GET", path, "") - if err != nil { - return job.Errorf("%s: get: %v", path, err) - } - _, err = io.Copy(job.Stdout, resp.Body) - if err != nil { - return job.Errorf("%s: copy stream: %v", path, err) - } - return engine.StatusOK - }) - job.Eng.RegisterCatchall(func(job *engine.Job) engine.Status { - return job.Errorf("Operation not yet supported: %s", job.Name) - }) - return engine.StatusOK - }) + } else { + c := t.newContainer(ctx.Args[0]) + ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}) + } return nil } -func tutumConnectorCall(method, path, body string) (*http.Response, error) { - apiPath := fmt.Sprintf( - "/%s%s", - tutumConnectorVersion, - path, - ) - u, err := url.Parse(apiPath) +func (t *tutumBackend) ack(ctx *beam.Message) error { + ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) + return nil +} + +func (t *tutumBackend) ls(ctx *beam.Message) error { + resp, err := t.tutumDockerConnector.call("GET", "/containers/json", "") + if err != nil { + return fmt.Errorf("%s: get: %v", t.tutumDockerConnector.URL.String(), err) + } + c := engine.NewTable("Created", 0) + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("%s: read body: %v", t.tutumDockerConnector.URL.String(), err) + } + if _, err := c.ReadListFrom(body); err != nil { + return fmt.Errorf("%s: readlist: %v", t.tutumDockerConnector.URL.String(), err) + } + ids := []string{} + for _, env := range c.Data { + ids = append(ids, env.GetList("Id")[0]) + } + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: ids}); err != nil { + return fmt.Errorf("%s: send response: %v", t.tutumDockerConnector.URL.String(), err) + } + return nil +} + +func (t *tutumBackend) spawn(ctx *beam.Message) error { + if len(ctx.Args) != 1 { + return fmt.Errorf("tutum: spawn takes exactly 1 argument, got %d", len(ctx.Args)) + } + resp, err := t.tutumDockerConnector.call("POST", "/containers/create", ctx.Args[0]) + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 201 { + return fmt.Errorf("expected status code 201, got %d:\n%s", resp.StatusCode, respBody) + } + var respJson struct{ Id string } + if err = json.Unmarshal(respBody, &respJson); err != nil { + return err + } + c := t.newContainer(respJson.Id) + if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil { + return err + } + return nil +} + +func (t *tutumBackend) newContainer(id string) beam.Sender { + c := &tutumContainer{tutumBackend: t, id: id} + instance := beam.NewServer() + instance.OnGet(beam.Handler(c.get)) + instance.OnStart(beam.Handler(c.start)) + instance.OnStop(beam.Handler(c.stop)) + return instance +} + +type tutumContainer struct { + tutumBackend *tutumBackend + id string +} + +func (c *tutumContainer) get(ctx *beam.Message) error { + path := fmt.Sprintf("/containers/%s/json", c.id) + resp, err := c.tutumBackend.tutumDockerConnector.call("GET", path, "") + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + fmt.Printf("%s", respBody) + if err != nil { + return err + } + if resp.StatusCode != 200 { + return fmt.Errorf("%s", respBody) + } + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{string(respBody)}}); err != nil { + return err + } + return nil +} + +func (c *tutumContainer) start(ctx *beam.Message) error { + path := fmt.Sprintf("/containers/%s/start", c.id) + resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "") + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 204 { + return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody) + } + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + return err + } + return nil +} + +func (c *tutumContainer) stop(ctx *beam.Message) error { + path := fmt.Sprintf("/containers/%s/stop", c.id) + resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "") + if err != nil { + return err + } + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 204 { + return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody) + } + if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + return err + } + return nil +} + +type tutumDockerConnector struct { + URL *url.URL + version string +} + +func newConnector(peer, version string) (*tutumDockerConnector, error) { + u, err := url.Parse(peer) if err != nil { return nil, err } - u.Host = tutumConnector - u.Scheme = "https" - log.Printf("[tutum] >> Calling connector: %s %s", method, path) + c := &tutumDockerConnector{ + URL: u, + version: version, + } + return c, nil +} + +func (c *tutumDockerConnector) call(method, path, body string) (*http.Response, error) { + path = fmt.Sprintf("/%s%s", c.version, path) + u, err := url.Parse(path) + if err != nil { + return nil, err + } + u.Host = c.URL.Host + u.Scheme = c.URL.Scheme req, err := http.NewRequest(method, u.String(), strings.NewReader(body)) if err != nil { return nil, err @@ -230,7 +225,6 @@ func tutumConnectorCall(method, path, body string) (*http.Response, error) { if err != nil { return nil, err } - log.Printf("[tutum] << Response: %d", resp.StatusCode) if resp.StatusCode >= 400 { body, err := ioutil.ReadAll(resp.Body) if err != nil { diff --git a/swarmd/Godeps/Godeps.json b/swarmd/Godeps/Godeps.json index 22c8bc02f3..513fdff52b 100644 --- a/swarmd/Godeps/Godeps.json +++ b/swarmd/Godeps/Godeps.json @@ -150,7 +150,7 @@ }, { "ImportPath": "github.com/tutumcloud/go-tutum", - "Rev": "d826286d2e5882428c8163ab44261987bea85a44" + "Rev": "c07f413c4e14bcc40f5bb9c6dbe40ec2289fa234" }, { "ImportPath": "github.com/rackspace/gophercloud",