diff --git a/backends/aggregate.go b/backends/aggregate.go index 158229869b..51750a169a 100644 --- a/backends/aggregate.go +++ b/backends/aggregate.go @@ -1,7 +1,7 @@ package backends import ( - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "github.com/flynn/go-shlex" "fmt" @@ -9,11 +9,11 @@ import ( "sync" ) -func Aggregate() beam.Sender { - backend := beam.NewServer() - backend.OnSpawn(func(cmd ...string) (beam.Sender, error) { +func Aggregate() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) { allBackends := New() - instance := beam.NewServer() + instance := libswarm.NewServer() a, err := newAggregator(allBackends, instance, cmd) if err != nil { @@ -30,11 +30,11 @@ func Aggregate() beam.Sender { } type aggregator struct { - backends []*beam.Object - server *beam.Server + backends []*libswarm.Client + server *libswarm.Server } -func newAggregator(allBackends *beam.Object, server *beam.Server, args []string) (*aggregator, error) { +func newAggregator(allBackends *libswarm.Client, server *libswarm.Server, args []string) (*aggregator, error) { a := &aggregator{server: server} for _, argString := range args { @@ -60,13 +60,13 @@ func newAggregator(allBackends *beam.Object, server *beam.Server, args []string) return a, nil } -func (a *aggregator) attach(name string, ret beam.Sender) error { +func (a *aggregator) attach(name string, ret libswarm.Sender) error { if name != "" { // TODO: implement this? return fmt.Errorf("attaching to a child is not implemented") } - if _, err := ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil { + if _, err := ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: a.server}); err != nil { return err } @@ -80,7 +80,7 @@ func (a *aggregator) attach(name string, ret beam.Sender) error { copies.Add(1) go func() { log.Printf("copying output from %#v\n", b) - beam.Copy(ret, r) + libswarm.Copy(ret, r) log.Printf("finished output from %#v\n", b) copies.Done() }() diff --git a/backends/backends.go b/backends/backends.go index 4257010a2e..5357fc161b 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -1,8 +1,9 @@ package backends import ( - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "github.com/docker/libswarm/debug" + "github.com/docker/libswarm/utils" ) // New returns a new engine, with all backends @@ -11,8 +12,8 @@ import ( // engine, named after the desired backend. // // Example: `New().Job("debug").Run()` -func New() *beam.Object { - backends := beam.NewTree() +func New() *libswarm.Client { + backends := utils.NewTree() backends.Bind("simulator", Simulator()) backends.Bind("debug", debug.Debug()) backends.Bind("fakeclient", FakeClient()) @@ -24,5 +25,5 @@ func New() *beam.Object { backends.Bind("shipyard", Shipyard()) backends.Bind("ec2", Ec2()) backends.Bind("tutum", Tutum()) - return beam.Obj(backends) + return libswarm.AsClient(backends) } diff --git a/backends/dockerclient.go b/backends/dockerclient.go index 50960a760d..912f05c0d9 100644 --- a/backends/dockerclient.go +++ b/backends/dockerclient.go @@ -4,9 +4,10 @@ import ( "crypto/tls" "encoding/json" "fmt" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" + "github.com/docker/libswarm/utils" "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/utils" + dockerutils "github.com/dotcloud/docker/utils" "io" "io/ioutil" "net" @@ -22,16 +23,16 @@ type DockerClientConfig struct { TLSClientConfig *tls.Config } -func DockerClient() beam.Sender { +func DockerClient() libswarm.Sender { return DockerClientWithConfig(&DockerClientConfig{ Scheme: "http", URLHost: "dummy.host", }) } -func DockerClientWithConfig(config *DockerClientConfig) beam.Sender { - backend := beam.NewServer() - backend.OnSpawn(func(cmd ...string) (beam.Sender, error) { +func DockerClientWithConfig(config *DockerClientConfig) libswarm.Sender { + backend := libswarm.NewServer() + backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) { if len(cmd) != 1 { return nil, fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(cmd)) } @@ -42,7 +43,7 @@ func DockerClientWithConfig(config *DockerClientConfig) beam.Sender { client.setURL(cmd[0]) b := &dockerClientBackend{ client: client, - Server: beam.NewServer(), + Server: libswarm.NewServer(), } b.Server.OnAttach(b.attach) b.Server.OnStart(b.start) @@ -55,12 +56,12 @@ func DockerClientWithConfig(config *DockerClientConfig) beam.Sender { type dockerClientBackend struct { client *client - *beam.Server + *libswarm.Server } -func (b *dockerClientBackend) attach(name string, ret beam.Sender) error { +func (b *dockerClientBackend) attach(name string, ret libswarm.Sender) error { if name == "" { - ret.Send(&beam.Message{Verb: beam.Ack, Ret: b.Server}) + ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: b.Server}) <-make(chan struct{}) } else { path := fmt.Sprintf("/containers/%s/json", name) @@ -76,7 +77,7 @@ func (b *dockerClientBackend) attach(name string, ret beam.Sender) error { return fmt.Errorf("%s", respBody) } c := b.newContainer(name) - ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}) + ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c}) } return nil } @@ -106,7 +107,7 @@ func (b *dockerClientBackend) ls() ([]string, error) { return names, nil } -func (b *dockerClientBackend) spawn(cmd ...string) (beam.Sender, error) { +func (b *dockerClientBackend) spawn(cmd ...string) (libswarm.Sender, error) { if len(cmd) != 1 { return nil, fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(cmd)) } @@ -128,9 +129,9 @@ func (b *dockerClientBackend) spawn(cmd ...string) (beam.Sender, error) { return b.newContainer(respJson.Id), nil } -func (b *dockerClientBackend) newContainer(id string) beam.Sender { +func (b *dockerClientBackend) newContainer(id string) libswarm.Sender { c := &container{backend: b, id: id} - instance := beam.NewServer() + instance := libswarm.NewServer() instance.OnAttach(c.attach) instance.OnStart(c.start) instance.OnStop(c.stop) @@ -143,8 +144,8 @@ type container struct { id string } -func (c *container) attach(name string, ret beam.Sender) error { - if _, err := ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { +func (c *container) attach(name string, ret libswarm.Sender) error { + if _, err := ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil { return err } @@ -152,8 +153,8 @@ func (c *container) attach(name string, ret beam.Sender) error { stdoutR, stdoutW := io.Pipe() stderrR, stderrW := io.Pipe() - go beam.EncodeStream(ret, stdoutR, "stdout") - go beam.EncodeStream(ret, stderrR, "stderr") + go utils.EncodeStream(ret, stdoutR, "stdout") + go utils.EncodeStream(ret, stderrR, "stderr") c.backend.client.hijack("POST", path, nil, stdoutW, stderrW) return nil @@ -270,40 +271,40 @@ func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io rwc, br := clientconn.Hijack() defer rwc.Close() - receiveStdout := utils.Go(func() (err error) { + receiveStdout := dockerutils.Go(func() (err error) { defer func() { if in != nil { in.Close() } }() - _, err = utils.StdCopy(stdout, stderr, br) - utils.Debugf("[hijack] End of stdout") + _, err = dockerutils.StdCopy(stdout, stderr, br) + dockerutils.Debugf("[hijack] End of stdout") return err }) - sendStdin := utils.Go(func() error { + sendStdin := dockerutils.Go(func() error { if in != nil { io.Copy(rwc, in) - utils.Debugf("[hijack] End of stdin") + dockerutils.Debugf("[hijack] End of stdin") } if tcpc, ok := rwc.(*net.TCPConn); ok { if err := tcpc.CloseWrite(); err != nil { - utils.Debugf("Couldn't send EOF: %s", err) + dockerutils.Debugf("Couldn't send EOF: %s", err) } } else if unixc, ok := rwc.(*net.UnixConn); ok { if err := unixc.CloseWrite(); err != nil { - utils.Debugf("Couldn't send EOF: %s", err) + dockerutils.Debugf("Couldn't send EOF: %s", err) } } // Discard errors due to pipe interruption return nil }) if err := <-receiveStdout; err != nil { - utils.Debugf("Error receiveStdout: %s", err) + dockerutils.Debugf("Error receiveStdout: %s", err) return err } if err := <-sendStdin; err != nil { - utils.Debugf("Error sendStdin: %s", err) + dockerutils.Debugf("Error sendStdin: %s", err) return err } return nil diff --git a/backends/dockerclient_test.go b/backends/dockerclient_test.go index 6322081111..ac9394b754 100644 --- a/backends/dockerclient_test.go +++ b/backends/dockerclient_test.go @@ -1,7 +1,7 @@ package backends import ( - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "fmt" "io/ioutil" @@ -240,21 +240,21 @@ func (s *stubServer) AllSummaries() []string { return summaries } -func instance(t *testing.T, server *stubServer) *beam.Object { +func instance(t *testing.T, server *stubServer) *libswarm.Client { url := "tcp://localhost:4243" if server != nil { url = strings.Replace(server.URL, "http://", "tcp://", 1) } backend := DockerClient() - instance, err := beam.Obj(backend).Spawn(url) + instance, err := libswarm.AsClient(backend).Spawn(url) if err != nil { t.Fatal(err) } return instance } -func child(t *testing.T, server *stubServer, i *beam.Object, name string) *beam.Object { +func child(t *testing.T, server *stubServer, i *libswarm.Client, name string) *libswarm.Client { _, child, err := i.Attach(name) if err != nil { t.Fatal(err) diff --git a/backends/dockerserver.go b/backends/dockerserver.go index 7a3c237ff2..3b9ae39912 100644 --- a/backends/dockerserver.go +++ b/backends/dockerserver.go @@ -3,11 +3,12 @@ package backends import ( "encoding/json" "fmt" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" + "github.com/docker/libswarm/utils" "github.com/dotcloud/docker/api" "github.com/dotcloud/docker/pkg/version" dockerContainerConfig "github.com/dotcloud/docker/runconfig" - "github.com/dotcloud/docker/utils" + dockerutils "github.com/dotcloud/docker/utils" "github.com/gorilla/mux" "io" "io/ioutil" @@ -20,10 +21,10 @@ import ( "time" ) -func DockerServer() beam.Sender { - backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { - instance := beam.Task(func(in beam.Receiver, out beam.Sender) { +func DockerServer() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { + instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) { url := "tcp://localhost:4243" if len(ctx.Args) > 0 { url = ctx.Args[0] @@ -33,15 +34,15 @@ func DockerServer() beam.Sender { fmt.Printf("listenAndServe: %v", err) } }) - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) + _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance}) return err })) return backend } -type HttpApiFunc func(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error +type HttpApiFunc func(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error -func listenAndServe(urlStr string, out beam.Sender) error { +func listenAndServe(urlStr string, out libswarm.Sender) error { fmt.Println("Starting Docker server...") r, err := createRouter(out) if err != nil { @@ -69,7 +70,7 @@ func listenAndServe(urlStr string, out beam.Sender) error { return httpSrv.Serve(l) } -func ping(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func ping(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { _, err := w.Write([]byte{'O', 'K'}) return err } @@ -107,14 +108,14 @@ type containerJson struct { VolumesRW map[string]bool } -func getContainerJson(out beam.Sender, containerID string) (containerJson, error) { - o := beam.Obj(out) +func getContainerJson(out libswarm.Sender, containerID string) (containerJson, error) { + o := libswarm.AsClient(out) _, containerOut, err := o.Attach(containerID) if err != nil { return containerJson{}, err } - container := beam.Obj(containerOut) + container := libswarm.AsClient(containerOut) responseJson, err := container.Get() if err != nil { return containerJson{}, err @@ -128,7 +129,7 @@ func getContainerJson(out beam.Sender, containerID string) (containerJson, error return response, nil } -func getContainerInfo(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func getContainerInfo(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { container, err := getContainerJson(out, vars["name"]) if err != nil { return err @@ -136,12 +137,12 @@ func getContainerInfo(out beam.Sender, version version.Version, w http.ResponseW return writeJSON(w, http.StatusOK, container) } -func getContainersJSON(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func getContainersJSON(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := r.ParseForm(); err != nil { return err } - o := beam.Obj(out) + o := libswarm.AsClient(out) names, err := o.Ls() if err != nil { return err @@ -214,7 +215,7 @@ func getContainersJSON(out beam.Sender, version version.Version, w http.Response return writeJSON(w, http.StatusOK, responses) } -func postContainersCreate(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func postContainersCreate(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := r.ParseForm(); err != nil { return nil } @@ -224,7 +225,7 @@ func postContainersCreate(out beam.Sender, version version.Version, w http.Respo return err } - container, err := beam.Obj(out).Spawn(string(body)) + container, err := libswarm.AsClient(out).Spawn(string(body)) if err != nil { return err } @@ -241,7 +242,7 @@ func postContainersCreate(out beam.Sender, version version.Version, w http.Respo return writeJSON(w, http.StatusCreated, response) } -func postContainersStart(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func postContainersStart(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if vars == nil { return fmt.Errorf("Missing parameter") } @@ -249,8 +250,8 @@ func postContainersStart(out beam.Sender, version version.Version, w http.Respon // TODO: r.Body name := vars["name"] - _, containerOut, err := beam.Obj(out).Attach(name) - container := beam.Obj(containerOut) + _, containerOut, err := libswarm.AsClient(out).Attach(name) + container := libswarm.AsClient(containerOut) if err != nil { return err } @@ -262,14 +263,14 @@ func postContainersStart(out beam.Sender, version version.Version, w http.Respon return nil } -func postContainersStop(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func postContainersStop(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if vars == nil { return fmt.Errorf("Missing parameter") } name := vars["name"] - _, containerOut, err := beam.Obj(out).Attach(name) - container := beam.Obj(containerOut) + _, containerOut, err := libswarm.AsClient(out).Attach(name) + container := libswarm.AsClient(containerOut) if err != nil { return err } @@ -291,7 +292,7 @@ func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) { return conn, conn, nil } -func postContainersAttach(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func postContainersAttach(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := r.ParseForm(); err != nil { return err } @@ -321,20 +322,20 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") // TODO: if a TTY, then no multiplexing is done - errStream := utils.NewStdWriter(outStream, utils.Stderr) - outStream = utils.NewStdWriter(outStream, utils.Stdout) + errStream := dockerutils.NewStdWriter(outStream, dockerutils.Stderr) + outStream = dockerutils.NewStdWriter(outStream, dockerutils.Stdout) - _, containerOut, err := beam.Obj(out).Attach(vars["name"]) + _, containerOut, err := libswarm.AsClient(out).Attach(vars["name"]) if err != nil { return err } - container := beam.Obj(containerOut) + container := libswarm.AsClient(containerOut) containerR, _, err := container.Attach("") var tasks sync.WaitGroup go func() { defer tasks.Done() - err := beam.DecodeStream(outStream, containerR, "stdout") + err := utils.DecodeStream(outStream, containerR, "stdout") if err != nil { fmt.Printf("decodestream: %v\n", err) } @@ -342,7 +343,7 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo tasks.Add(1) go func() { defer tasks.Done() - err := beam.DecodeStream(errStream, containerR, "stderr") + err := utils.DecodeStream(errStream, containerR, "stderr") if err != nil { fmt.Printf("decodestream: %v\n", err) } @@ -353,7 +354,7 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo return nil } -func postContainersWait(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { +func postContainersWait(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if vars == nil { return fmt.Errorf("Missing parameter") } @@ -365,7 +366,7 @@ func postContainersWait(out beam.Sender, version version.Version, w http.Respons }) } -func createRouter(out beam.Sender) (*mux.Router, error) { +func createRouter(out libswarm.Sender) (*mux.Router, error) { r := mux.NewRouter() m := map[string]map[string]HttpApiFunc{ "GET": { @@ -405,7 +406,7 @@ func createRouter(out beam.Sender) (*mux.Router, error) { return r, nil } -func makeHttpHandler(out beam.Sender, localMethod string, localRoute string, handlerFunc HttpApiFunc, dockerVersion version.Version) http.HandlerFunc { +func makeHttpHandler(out libswarm.Sender, localMethod string, localRoute string, handlerFunc HttpApiFunc, dockerVersion version.Version) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // log the request fmt.Printf("Calling %s %s\n", localMethod, localRoute) diff --git a/backends/ec2.go b/backends/ec2.go index 5848a2f4fc..af9ece3535 100644 --- a/backends/ec2.go +++ b/backends/ec2.go @@ -3,7 +3,7 @@ package backends import ( "errors" "fmt" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "net" "net/http" "os" @@ -36,22 +36,22 @@ type ec2Config struct { type ec2Client struct { config *ec2Config ec2Conn *ec2.EC2 - Server *beam.Server + Server *libswarm.Server instance *ec2.Instance sshTunnel *os.Process - dockerInstance *beam.Object + dockerInstance *libswarm.Client } -func (c *ec2Client) get(ctx *beam.Message) error { +func (c *ec2Client) get(ctx *libswarm.Message) error { output, err := c.dockerInstance.Get() if err != nil { return err } - ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{output}}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{output}}) return nil } -func (c *ec2Client) start(ctx *beam.Message) error { +func (c *ec2Client) start(ctx *libswarm.Message) error { if instance, err := c.findInstance(); err != nil { return err } else if instance != nil { @@ -73,44 +73,44 @@ func (c *ec2Client) start(ctx *beam.Message) error { c.waitForDockerDaemon() fmt.Printf("ec2 service up and running: region: %s zone: %s\n", c.config.region.Name, c.config.zone) - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server}) return nil } -func (c *ec2Client) spawn(ctx *beam.Message) error { +func (c *ec2Client) spawn(ctx *libswarm.Message) error { out, err := c.dockerInstance.Spawn(ctx.Args...) if err != nil { return err } - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: out}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: out}) return nil } -func (c *ec2Client) ls(ctx *beam.Message) error { +func (c *ec2Client) ls(ctx *libswarm.Message) error { output, err := c.dockerInstance.Ls() if err != nil { return err } - ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: output}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: output}) return nil } -func (c *ec2Client) stop(ctx *beam.Message) error { +func (c *ec2Client) stop(ctx *libswarm.Message) error { c.dockerInstance.Stop() return nil } -func (c *ec2Client) attach(ctx *beam.Message) error { +func (c *ec2Client) attach(ctx *libswarm.Message) error { if ctx.Args[0] == "" { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server}) <-make(chan struct{}) } else { _, out, err := c.dockerInstance.Attach(ctx.Args[0]) if err != nil { return err } - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: out}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: out}) } return nil @@ -281,7 +281,7 @@ func (c *ec2Client) initDockerClientInstance(instance *ec2.Instance) error { URLHost: "localhost", }) - dockerBackend := beam.Obj(dockerClient) + dockerBackend := libswarm.AsClient(dockerClient) url := fmt.Sprintf("tcp://localhost:%s", c.config.sshLocalPort) dockerInstance, err := dockerBackend.Spawn(url) c.dockerInstance = dockerInstance @@ -341,9 +341,9 @@ func signalHandler(client *ec2Client) { }() } -func Ec2() beam.Sender { - backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { +func Ec2() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { var config, err = newConfig(ctx.Args) if err != nil { @@ -355,16 +355,16 @@ func Ec2() beam.Sender { return err } - client := &ec2Client{config, ec2Conn, beam.NewServer(), nil, nil, nil} - client.Server.OnVerb(beam.Spawn, beam.Handler(client.spawn)) - client.Server.OnVerb(beam.Start, beam.Handler(client.start)) - client.Server.OnVerb(beam.Stop, beam.Handler(client.stop)) - client.Server.OnVerb(beam.Attach, beam.Handler(client.attach)) - client.Server.OnVerb(beam.Ls, beam.Handler(client.ls)) - client.Server.OnVerb(beam.Get, beam.Handler(client.get)) + client := &ec2Client{config, ec2Conn, libswarm.NewServer(), nil, nil, nil} + client.Server.OnVerb(libswarm.Spawn, libswarm.Handler(client.spawn)) + client.Server.OnVerb(libswarm.Start, libswarm.Handler(client.start)) + client.Server.OnVerb(libswarm.Stop, libswarm.Handler(client.stop)) + client.Server.OnVerb(libswarm.Attach, libswarm.Handler(client.attach)) + client.Server.OnVerb(libswarm.Ls, libswarm.Handler(client.ls)) + client.Server.OnVerb(libswarm.Get, libswarm.Handler(client.get)) signalHandler(client) - _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: client.Server}) + _, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: client.Server}) return err })) diff --git a/backends/exec.go b/backends/exec.go index edd949ff91..1335406219 100644 --- a/backends/exec.go +++ b/backends/exec.go @@ -9,12 +9,12 @@ import ( "strings" "sync" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" ) -func Exec() beam.Sender { - e := beam.NewServer() - e.OnVerb(beam.Spawn, beam.Handler(func(msg *beam.Message) error { +func Exec() libswarm.Sender { + e := libswarm.NewServer() + e.OnVerb(libswarm.Spawn, libswarm.Handler(func(msg *libswarm.Message) error { if len(msg.Args) < 1 { return fmt.Errorf("usage: SPAWN exec|... ") } @@ -31,9 +31,9 @@ func Exec() beam.Sender { } cmd := &command{ Cmd: exec.Command(config.Path, config.Args...), - Server: beam.NewServer(), + Server: libswarm.NewServer(), } - cmd.OnVerb(beam.Attach, beam.Handler(func(msg *beam.Message) error { + cmd.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error { stdout, err := cmd.StdoutPipe() if err != nil { return err @@ -42,11 +42,11 @@ func Exec() beam.Sender { if err != nil { return err } - inR, inW := beam.Pipe() - if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil { + inR, inW := libswarm.Pipe() + if _, err := msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: inW}); err != nil { return err } - out := beam.Obj(msg.Ret) + out := libswarm.AsClient(msg.Ret) go func() { defer stdin.Close() for { @@ -54,7 +54,7 @@ func Exec() beam.Sender { if err != nil { return } - if msg.Verb == beam.Log && len(msg.Args) > 0 { + if msg.Verb == libswarm.Log && len(msg.Args) > 0 { fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n")) } } @@ -76,7 +76,7 @@ func Exec() beam.Sender { cmd.tasks.Wait() return nil })) - cmd.OnVerb(beam.Start, beam.Handler(func(msg *beam.Message) error { + cmd.OnVerb(libswarm.Start, libswarm.Handler(func(msg *libswarm.Message) error { cmd.tasks.Add(1) if err := cmd.Cmd.Start(); err != nil { return err @@ -84,13 +84,13 @@ func Exec() beam.Sender { go func() { defer cmd.tasks.Done() if err := cmd.Cmd.Wait(); err != nil { - beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err) + libswarm.AsClient(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err) } }() - msg.Ret.Send(&beam.Message{Verb: beam.Ack}) + msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}) return nil })) - if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil { + if _, err := msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: cmd}); err != nil { return err } return nil @@ -100,6 +100,6 @@ func Exec() beam.Sender { type command struct { *exec.Cmd - *beam.Server + *libswarm.Server tasks sync.WaitGroup } diff --git a/backends/fakeclient.go b/backends/fakeclient.go index 157ab842b6..0f357ab62e 100644 --- a/backends/fakeclient.go +++ b/backends/fakeclient.go @@ -4,17 +4,18 @@ import ( "fmt" "time" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" + "github.com/docker/libswarm/utils" ) -func FakeClient() beam.Sender { - backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { +func FakeClient() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { // Instantiate a new fakeclient instance - instance := beam.Task(func(in beam.Receiver, out beam.Sender) { + instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) { fmt.Printf("fake client!\n") defer fmt.Printf("end of fake client!\n") - o := beam.Obj(out) + o := libswarm.AsClient(out) o.Log("fake client starting") defer o.Log("fake client terminating") for { @@ -22,7 +23,7 @@ func FakeClient() beam.Sender { o.Log("fake client heartbeat!") } }) - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) + _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance}) return err })) return backend diff --git a/backends/orchard.go b/backends/orchard.go index f16d5aebf3..cb15af29b5 100644 --- a/backends/orchard.go +++ b/backends/orchard.go @@ -1,7 +1,7 @@ package backends import ( - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "github.com/orchardup/go-orchard/api" "crypto/tls" @@ -11,9 +11,9 @@ import ( "os" ) -func Orchard() beam.Sender { - backend := beam.NewServer() - backend.OnSpawn(func(cmd ...string) (beam.Sender, error) { +func Orchard() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) { if len(cmd) != 2 { return nil, fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host") } @@ -40,7 +40,7 @@ func Orchard() beam.Sender { URLHost: host.IPAddress, TLSClientConfig: tlsConfig, }) - forwardBackend := beam.Obj(backend) + forwardBackend := libswarm.AsClient(backend) forwardInstance, err := forwardBackend.Spawn(url) if err != nil { return nil, err diff --git a/backends/shipyard.go b/backends/shipyard.go index d05f4f2bb7..cc5d42a746 100644 --- a/backends/shipyard.go +++ b/backends/shipyard.go @@ -3,7 +3,7 @@ package backends import ( "encoding/json" "fmt" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "io/ioutil" "net/http" "net/url" @@ -11,29 +11,29 @@ import ( "time" ) -func Shipyard() beam.Sender { - backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { +func Shipyard() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { if len(ctx.Args) != 3 { return fmt.Errorf("Shipyard: Usage ") } c := &shipyard{url: ctx.Args[0], user: ctx.Args[1], pass: ctx.Args[2]} - c.Server = beam.NewServer() - c.Server.OnVerb(beam.Attach, beam.Handler(c.attach)) - c.Server.OnVerb(beam.Start, beam.Handler(c.start)) - c.Server.OnVerb(beam.Ls, beam.Handler(c.containers)) - c.OnVerb(beam.Get, beam.Handler(c.containerInspect)) - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server}) + c.Server = libswarm.NewServer() + c.Server.OnVerb(libswarm.Attach, libswarm.Handler(c.attach)) + c.Server.OnVerb(libswarm.Start, libswarm.Handler(c.start)) + c.Server.OnVerb(libswarm.Ls, libswarm.Handler(c.containers)) + c.OnVerb(libswarm.Get, libswarm.Handler(c.containerInspect)) + _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server}) return err })) return backend } -func (c *shipyard) attach(ctx *beam.Message) error { +func (c *shipyard) attach(ctx *libswarm.Message) error { if ctx.Args[0] == "" { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server}) for { time.Sleep(1 * time.Second) } @@ -41,17 +41,17 @@ func (c *shipyard) attach(ctx *beam.Message) error { return nil } -func (c *shipyard) start(ctx *beam.Message) error { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack}) +func (c *shipyard) start(ctx *libswarm.Message) error { + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}) return nil } type shipyard struct { url, user, pass string - *beam.Server + *libswarm.Server } -func (c *shipyard) containers(ctx *beam.Message) error { +func (c *shipyard) containers(ctx *libswarm.Message) error { out, err := c.gateway("GET", "containers", "") if err != nil { return err @@ -62,7 +62,7 @@ func (c *shipyard) containers(ctx *beam.Message) error { for _, c := range data.Objects { ids = append(ids, c.Id) } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: ids}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: ids}); err != nil { return err } return nil @@ -76,7 +76,7 @@ type shipyardObject struct { Id string `json:"container_id"` } -func (c *shipyard) containerInspect(ctx *beam.Message) error { +func (c *shipyard) containerInspect(ctx *libswarm.Message) error { if len(ctx.Args) != 1 { return fmt.Errorf("Expected 1 container id, got %s", len(ctx.Args)) } @@ -87,7 +87,7 @@ func (c *shipyard) containerInspect(ctx *beam.Message) error { } var data shipyardObject json.Unmarshal(out, &data) - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"foo", "bar"}}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{"foo", "bar"}}); err != nil { return err } return nil diff --git a/backends/simulator.go b/backends/simulator.go index a2b81d957a..d992ae40b4 100644 --- a/backends/simulator.go +++ b/backends/simulator.go @@ -1,24 +1,25 @@ package backends import ( - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" + "github.com/docker/libswarm/utils" ) -func Simulator() beam.Sender { - s := beam.NewServer() - s.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { +func Simulator() libswarm.Sender { + s := libswarm.NewServer() + s.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { containers := ctx.Args - instance := beam.Task(func(in beam.Receiver, out beam.Sender) { - beam.Obj(out).Log("[simulator] starting\n") - s := beam.NewServer() - s.OnVerb(beam.Ls, beam.Handler(func(msg *beam.Message) error { - beam.Obj(out).Log("[simulator] generating fake list of objects...\n") - beam.Obj(msg.Ret).Set(containers...) + instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) { + libswarm.AsClient(out).Log("[simulator] starting\n") + s := libswarm.NewServer() + s.OnVerb(libswarm.Ls, libswarm.Handler(func(msg *libswarm.Message) error { + libswarm.AsClient(out).Log("[simulator] generating fake list of objects...\n") + libswarm.AsClient(msg.Ret).Set(containers...) return nil })) - beam.Copy(s, in) + libswarm.Copy(s, in) }) - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance}) return nil })) return s diff --git a/backends/tutum.go b/backends/tutum.go index d9a1d41643..fd3e042969 100644 --- a/backends/tutum.go +++ b/backends/tutum.go @@ -3,7 +3,7 @@ package backends import ( "encoding/json" "fmt" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" "github.com/dotcloud/docker/engine" "github.com/tutumcloud/go-tutum" "io/ioutil" @@ -18,9 +18,9 @@ var ( tutumConnectorVersion = "v1.11" ) -func Tutum() beam.Sender { - backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { +func Tutum() libswarm.Sender { + backend := libswarm.NewServer() + backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error { if len(ctx.Args) == 2 { tutum.User = ctx.Args[0] tutum.ApiKey = ctx.Args[1] @@ -34,13 +34,13 @@ func Tutum() beam.Sender { } t := &tutumBackend{ tutumDockerConnector: tutumDockerConnector, - Server: beam.NewServer(), + Server: libswarm.NewServer(), } - t.Server.OnVerb(beam.Attach, beam.Handler(t.attach)) - t.Server.OnVerb(beam.Start, beam.Handler(t.ack)) - t.Server.OnVerb(beam.Ls, beam.Handler(t.ls)) - t.Server.OnVerb(beam.Spawn, beam.Handler(t.spawn)) - _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) + t.Server.OnVerb(libswarm.Attach, libswarm.Handler(t.attach)) + t.Server.OnVerb(libswarm.Start, libswarm.Handler(t.ack)) + t.Server.OnVerb(libswarm.Ls, libswarm.Handler(t.ls)) + t.Server.OnVerb(libswarm.Spawn, libswarm.Handler(t.spawn)) + _, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server}) return err })) return backend @@ -48,28 +48,28 @@ func Tutum() beam.Sender { type tutumBackend struct { tutumDockerConnector *tutumDockerConnector - *beam.Server + *libswarm.Server } -func (t *tutumBackend) attach(ctx *beam.Message) error { +func (t *tutumBackend) attach(ctx *libswarm.Message) error { if ctx.Args[0] == "" { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server}) for { time.Sleep(1 * time.Second) } } else { c := t.newContainer(ctx.Args[0]) - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}) + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c}) } return nil } -func (t *tutumBackend) ack(ctx *beam.Message) error { - ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server}) +func (t *tutumBackend) ack(ctx *libswarm.Message) error { + ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server}) return nil } -func (t *tutumBackend) ls(ctx *beam.Message) error { +func (t *tutumBackend) ls(ctx *libswarm.Message) error { resp, err := t.tutumDockerConnector.call("GET", "/containers/json", "") if err != nil { return fmt.Errorf("%s: get: %v", t.tutumDockerConnector.URL.String(), err) @@ -86,13 +86,13 @@ func (t *tutumBackend) ls(ctx *beam.Message) error { for _, env := range c.Data { ids = append(ids, env.GetList("Id")[0]) } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: ids}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: ids}); err != nil { return fmt.Errorf("%s: send response: %v", t.tutumDockerConnector.URL.String(), err) } return nil } -func (t *tutumBackend) spawn(ctx *beam.Message) error { +func (t *tutumBackend) spawn(ctx *libswarm.Message) error { if len(ctx.Args) != 1 { return fmt.Errorf("tutum: spawn takes exactly 1 argument, got %d", len(ctx.Args)) } @@ -112,18 +112,18 @@ func (t *tutumBackend) spawn(ctx *beam.Message) error { return err } c := t.newContainer(respJson.Id) - if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil { + if _, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c}); err != nil { return err } return nil } -func (t *tutumBackend) newContainer(id string) beam.Sender { +func (t *tutumBackend) newContainer(id string) libswarm.Sender { c := &tutumContainer{tutumBackend: t, id: id} - instance := beam.NewServer() - instance.OnVerb(beam.Get, beam.Handler(c.get)) - instance.OnVerb(beam.Start, beam.Handler(c.start)) - instance.OnVerb(beam.Stop, beam.Handler(c.stop)) + instance := libswarm.NewServer() + instance.OnVerb(libswarm.Get, libswarm.Handler(c.get)) + instance.OnVerb(libswarm.Start, libswarm.Handler(c.start)) + instance.OnVerb(libswarm.Stop, libswarm.Handler(c.stop)) return instance } @@ -132,7 +132,7 @@ type tutumContainer struct { id string } -func (c *tutumContainer) get(ctx *beam.Message) error { +func (c *tutumContainer) get(ctx *libswarm.Message) error { path := fmt.Sprintf("/containers/%s/json", c.id) resp, err := c.tutumBackend.tutumDockerConnector.call("GET", path, "") if err != nil { @@ -146,13 +146,13 @@ func (c *tutumContainer) get(ctx *beam.Message) error { if resp.StatusCode != 200 { return fmt.Errorf("%s", respBody) } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{string(respBody)}}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{string(respBody)}}); err != nil { return err } return nil } -func (c *tutumContainer) start(ctx *beam.Message) error { +func (c *tutumContainer) start(ctx *libswarm.Message) error { path := fmt.Sprintf("/containers/%s/start", c.id) resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "") if err != nil { @@ -165,13 +165,13 @@ func (c *tutumContainer) start(ctx *beam.Message) error { if resp.StatusCode != 204 { return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody) } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil { return err } return nil } -func (c *tutumContainer) stop(ctx *beam.Message) error { +func (c *tutumContainer) stop(ctx *libswarm.Message) error { path := fmt.Sprintf("/containers/%s/stop", c.id) resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "") if err != nil { @@ -184,7 +184,7 @@ func (c *tutumContainer) stop(ctx *beam.Message) error { if resp.StatusCode != 204 { return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody) } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil { + if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil { return err } return nil diff --git a/beam/task.go b/beam/task.go deleted file mode 100644 index 4ee3f95584..0000000000 --- a/beam/task.go +++ /dev/null @@ -1,36 +0,0 @@ -package beam - -import ( - "fmt" - "sync" -) - -func Task(f func(in Receiver, out Sender)) Sender { - var running bool - var l sync.RWMutex - inR, inW := Pipe() - outR, outW := Pipe() - obj := NewServer() - obj.OnVerb(Attach, Handler(func(msg *Message) error { - msg.Ret.Send(&Message{Verb: Ack, Ret: inW}) - fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret) - defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret) - Copy(msg.Ret, outR) - return nil - })) - obj.OnVerb(Start, Handler(func(msg *Message) error { - l.RLock() - r := running - l.RUnlock() - if r { - return fmt.Errorf("already running") - } - l.Lock() - go f(inR, outW) - running = true - l.Unlock() - msg.Ret.Send(&Message{Verb: Ack}) - return nil - })) - return obj -} diff --git a/beam/tree.go b/beam/tree.go deleted file mode 100644 index 83c92dc1a0..0000000000 --- a/beam/tree.go +++ /dev/null @@ -1,44 +0,0 @@ -package beam - -import ( - "sort" -) - -type Tree struct { - *Server - children map[string]Sender -} - -func NewTree() *Tree { - t := &Tree{ - Server: NewServer(), - children: make(map[string]Sender), - } - t.OnVerb(Attach, Handler(func(msg *Message) error { - if len(msg.Args) == 0 || msg.Args[0] == "" { - msg.Ret.Send(&Message{Verb: Ack, Ret: t}) - return nil - } - if child, exists := t.children[msg.Args[0]]; exists { - msg.Ret.Send(&Message{Verb: Ack, Ret: child}) - return nil - } - Obj(msg.Ret).Error("not found") - return nil - })) - t.OnVerb(Ls, Handler(func(msg *Message) error { - names := make([]string, 0, len(t.children)) - for name := range t.children { - names = append(names, name) - } - sort.Strings(names) - Obj(msg.Ret).Set(names...) - return nil - })) - return t -} - -func (t *Tree) Bind(name string, dst Sender) *Tree { - t.children[name] = dst - return t -} diff --git a/beam/object.go b/client.go similarity index 68% rename from beam/object.go rename to client.go index eca3389c1f..518c42dccf 100644 --- a/beam/object.go +++ b/client.go @@ -1,4 +1,4 @@ -package beam +package libswarm import ( "encoding/json" @@ -8,23 +8,21 @@ import ( "strings" ) -// FIXME: rename Object to Client - -type Object struct { +type Client struct { Sender } -func Obj(dst Sender) *Object { - return &Object{dst} +func AsClient(dst Sender) *Client { + return &Client{dst} } -func (o *Object) Log(msg string, args ...interface{}) error { - _, err := o.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}}) +func (c *Client) Log(msg string, args ...interface{}) error { + _, err := c.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}}) return err } -func (o *Object) Ls() ([]string, error) { - ret, err := o.Send(&Message{Verb: Ls, Ret: RetPipe}) +func (c *Client) Ls() ([]string, error) { + ret, err := c.Send(&Message{Verb: Ls, Ret: RetPipe}) if err != nil { return nil, err } @@ -44,8 +42,8 @@ func (o *Object) Ls() ([]string, error) { return nil, fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Spawn(cmd ...string) (out *Object, err error) { - ret, err := o.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe}) +func (c *Client) Spawn(cmd ...string) (out *Client, err error) { + ret, err := c.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe}) if err != nil { return nil, err } @@ -57,7 +55,7 @@ func (o *Object) Spawn(cmd ...string) (out *Object, err error) { return nil, err } if msg.Verb == Ack { - return &Object{msg.Ret}, nil + return &Client{msg.Ret}, nil } msg.Ret.Close() if msg.Verb == Error { @@ -66,8 +64,8 @@ func (o *Object) Spawn(cmd ...string) (out *Object, err error) { return nil, fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Attach(name string) (in Receiver, out *Object, err error) { - ret, err := o.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe}) +func (c *Client) Attach(name string) (in Receiver, out *Client, err error) { + ret, err := c.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe}) if err != nil { return nil, nil, err } @@ -79,7 +77,7 @@ func (o *Object) Attach(name string) (in Receiver, out *Object, err error) { return nil, nil, err } if msg.Verb == Ack { - return ret, &Object{msg.Ret}, nil + return ret, &Client{msg.Ret}, nil } msg.Ret.Close() if msg.Verb == Error { @@ -88,13 +86,13 @@ func (o *Object) Attach(name string) (in Receiver, out *Object, err error) { return nil, nil, fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Error(msg string, args ...interface{}) error { - _, err := o.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}}) +func (c *Client) Error(msg string, args ...interface{}) error { + _, err := c.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}}) return err } -func (o *Object) Connect() (net.Conn, error) { - ret, err := o.Send(&Message{Verb: Connect, Ret: RetPipe}) +func (c *Client) Connect() (net.Conn, error) { + ret, err := c.Send(&Message{Verb: Connect, Ret: RetPipe}) if err != nil { return nil, err } @@ -121,21 +119,21 @@ func (o *Object) Connect() (net.Conn, error) { return nil, fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) SetJson(val interface{}) error { +func (c *Client) SetJson(val interface{}) error { txt, err := json.Marshal(val) if err != nil { return err } - return o.Set(string(txt)) + return c.Set(string(txt)) } -func (o *Object) Set(vals ...string) error { - _, err := o.Send(&Message{Verb: Set, Args: vals}) +func (c *Client) Set(vals ...string) error { + _, err := c.Send(&Message{Verb: Set, Args: vals}) return err } -func (o *Object) Get() (string, error) { - ret, err := o.Send(&Message{Verb: Get, Ret: RetPipe}) +func (c *Client) Get() (string, error) { + ret, err := c.Send(&Message{Verb: Get, Ret: RetPipe}) if err != nil { return "", err } @@ -158,8 +156,8 @@ func (o *Object) Get() (string, error) { return "", fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Watch() (Receiver, error) { - ret, err := o.Send(&Message{Verb: Watch, Ret: RetPipe}) +func (c *Client) Watch() (Receiver, error) { + ret, err := c.Send(&Message{Verb: Watch, Ret: RetPipe}) if err != nil { return nil, err } @@ -173,8 +171,8 @@ func (o *Object) Watch() (Receiver, error) { return nil, fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Start() error { - ret, err := o.Send(&Message{Verb: Start, Ret: RetPipe}) +func (c *Client) Start() error { + ret, err := c.Send(&Message{Verb: Start, Ret: RetPipe}) msg, err := ret.Receive(0) if err == io.EOF { return fmt.Errorf("unexpected EOF") @@ -188,8 +186,8 @@ func (o *Object) Start() error { return fmt.Errorf("unexpected verb %v", msg.Verb) } -func (o *Object) Stop() error { - ret, err := o.Send(&Message{Verb: Stop, Ret: RetPipe}) +func (c *Client) Stop() error { + ret, err := c.Send(&Message{Verb: Stop, Ret: RetPipe}) msg, err := ret.Receive(0) if err == io.EOF { return fmt.Errorf("unexpected EOF") diff --git a/debug/debug.go b/debug/debug.go index 0e361e6db7..de0d0a9bd3 100644 --- a/debug/debug.go +++ b/debug/debug.go @@ -5,46 +5,47 @@ import ( "io" "log" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" + "github.com/docker/libswarm/utils" ) // The Debug service is an example of intercepting messages between a receiver and a sender. // The service also exposes messages passing through it for debug purposes. -func Debug() beam.Sender { +func Debug() libswarm.Sender { dbgInstance := &debug{ - service: beam.NewServer(), + service: libswarm.NewServer(), } - sender := beam.NewServer() - sender.OnVerb(beam.Spawn, beam.Handler(dbgInstance.spawn)) + sender := libswarm.NewServer() + sender.OnVerb(libswarm.Spawn, libswarm.Handler(dbgInstance.spawn)) return sender } // Debug service type type debug struct { - service *beam.Server - out beam.Sender + service *libswarm.Server + out libswarm.Sender } // Spawn will return a new instance as the Ret channel of the message sent back -func (dbg *debug) spawn(msg *beam.Message) (err error) { - // By sending back a task, beam will run the function with the in and out arguments +func (dbg *debug) spawn(msg *libswarm.Message) (err error) { + // By sending back a task, libswarm will run the function with the in and out arguments // set to the services present before and after this one in the pipeline. - instance := beam.Task(func(in beam.Receiver, out beam.Sender) { + instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) { // Setup our channels dbg.out = out // Set up the debug interceptor - dbg.service.Catchall(beam.Handler(dbg.catchall)) + dbg.service.Catchall(libswarm.Handler(dbg.catchall)) // Copy everything from the receiver to our service. By copying like this in the task // we can use the catchall handler instead of handling the message here. - beam.Copy(dbg.service, in) + libswarm.Copy(dbg.service, in) }) // Inform the system of our new instance - msg.Ret.Send(&beam.Message{ - Verb: beam.Ack, + msg.Ret.Send(&libswarm.Message{ + Verb: libswarm.Ack, Ret: instance, }) @@ -52,7 +53,7 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) { } // Catches all messages sent to the service -func (dbg *debug) catchall(msg *beam.Message) (err error) { +func (dbg *debug) catchall(msg *libswarm.Message) (err error) { log.Printf("[debug] ---> Outbound Message ---> { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) // If there's no output after us then we'll just reply with an error @@ -61,13 +62,13 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) { return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb) } - // We forward the message with a special Ret value of "beam.RetPipe" - this + // We forward the message with a special Ret value of "libswarm.RetPipe" - this // asks libchan to open a new pipe so that we can read replies from upstream - forwardedMsg := &beam.Message{ + forwardedMsg := &libswarm.Message{ Verb: msg.Verb, Args: msg.Args, Att: msg.Att, - Ret: beam.RetPipe, + Ret: libswarm.RetPipe, } // Send the forwarded message @@ -78,7 +79,7 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) { } else { for { // Relay all messages returned until the inbound channel is empty (EOF) - var reply *beam.Message + var reply *libswarm.Message if reply, err = inbound.Receive(0); err != nil { if err == io.EOF { // EOF is expected diff --git a/beam/message.go b/message.go similarity index 99% rename from beam/message.go rename to message.go index eac5b23c76..0e06e95f8a 100644 --- a/beam/message.go +++ b/message.go @@ -1,4 +1,4 @@ -package beam +package libswarm import ( "github.com/docker/libchan" diff --git a/beam/message_test.go b/message_test.go similarity index 99% rename from beam/message_test.go rename to message_test.go index 8987cb199e..9d24177b01 100644 --- a/beam/message_test.go +++ b/message_test.go @@ -1,4 +1,4 @@ -package beam +package libswarm import ( "io/ioutil" diff --git a/beam/server.go b/server.go similarity index 99% rename from beam/server.go rename to server.go index 23a2e09baa..ee584d3380 100644 --- a/beam/server.go +++ b/server.go @@ -1,4 +1,4 @@ -package beam +package libswarm import ( "github.com/docker/libchan" diff --git a/swarmd/swarmd.go b/swarmd/swarmd.go index 864311d4b0..cb0e512319 100644 --- a/swarmd/swarmd.go +++ b/swarmd/swarmd.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/codegangsta/cli" "github.com/docker/libswarm/backends" - "github.com/docker/libswarm/beam" + "github.com/docker/libswarm" _ "github.com/dotcloud/docker/api/server" "github.com/flynn/go-shlex" "io" @@ -24,7 +24,7 @@ func main() { } func cmdDaemon(c *cli.Context) { - app := beam.NewServer() + app := libswarm.NewServer() app.OnLog(func(args ...string) error { log.Printf("%s\n", strings.Join(args, " ")) return nil @@ -42,7 +42,7 @@ func cmdDaemon(c *cli.Context) { fmt.Println(strings.Join(names, "\n")) return } - var previousInstanceR beam.Receiver + var previousInstanceR libswarm.Receiver // FIXME: refactor into a Pipeline for idx, backendArg := range c.Args() { bName, bArgs, err := parseCmd(backendArg) @@ -61,9 +61,9 @@ func cmdDaemon(c *cli.Context) { if err != nil { Fatalf("attach: %v", err) } - go func(r beam.Receiver, w beam.Sender, idx int) { + go func(r libswarm.Receiver, w libswarm.Sender, idx int) { if r != nil { - beam.Copy(w, r) + libswarm.Copy(w, r) } w.Close() }(previousInstanceR, instanceW, idx) @@ -72,7 +72,7 @@ func cmdDaemon(c *cli.Context) { } previousInstanceR = instanceR } - _, err := beam.Copy(app, previousInstanceR) + _, err := libswarm.Copy(app, previousInstanceR) if err != nil { Fatalf("copy: %v", err) } diff --git a/beam/nop.go b/utils/nop.go similarity index 65% rename from beam/nop.go rename to utils/nop.go index 9d3cabb392..6efe9d2a72 100644 --- a/beam/nop.go +++ b/utils/nop.go @@ -1,13 +1,15 @@ -package beam +package utils import ( "github.com/docker/libchan" + "github.com/docker/libswarm" + "io" ) type NopSender struct{} -func (s NopSender) Send(msg *Message) (Receiver, error) { +func (s NopSender) Send(msg *libswarm.Message) (libswarm.Receiver, error) { return NopReceiver{}, nil } @@ -21,7 +23,7 @@ func (s NopSender) Unwrap() libchan.Sender { type NopReceiver struct{} -func (r NopReceiver) Receive(mode int) (*Message, error) { +func (r NopReceiver) Receive(mode int) (*libswarm.Message, error) { return nil, io.EOF } diff --git a/beam/stream.go b/utils/stream.go similarity index 50% rename from beam/stream.go rename to utils/stream.go index beef7229c1..cec33002b4 100644 --- a/beam/stream.go +++ b/utils/stream.go @@ -1,28 +1,30 @@ -package beam +package utils import ( + "github.com/docker/libswarm" + "fmt" "io" ) -func EncodeStream(sender Sender, reader io.Reader, tag string) { +func EncodeStream(sender libswarm.Sender, reader io.Reader, tag string) { chunk := make([]byte, 4096) for { n, err := reader.Read(chunk) if n > 0 { - sender.Send(&Message{Verb: Log, Args: []string{tag, string(chunk[0:n])}}) + sender.Send(&libswarm.Message{Verb: libswarm.Log, Args: []string{tag, string(chunk[0:n])}}) } if err != nil { message := fmt.Sprintf("Error reading from stream: %v", err) - sender.Send(&Message{Verb: Error, Args: []string{message}}) + sender.Send(&libswarm.Message{Verb: libswarm.Error, Args: []string{message}}) break } } } -func DecodeStream(dst io.Writer, src Receiver, tag string) error { +func DecodeStream(dst io.Writer, src libswarm.Receiver, tag string) error { for { - msg, err := src.Receive(Ret) + msg, err := src.Receive(libswarm.Ret) if err == io.EOF { return nil } diff --git a/utils/task.go b/utils/task.go new file mode 100644 index 0000000000..e937dcb052 --- /dev/null +++ b/utils/task.go @@ -0,0 +1,38 @@ +package utils + +import ( + "github.com/docker/libswarm" + + "fmt" + "sync" +) + +func Task(f func(in libswarm.Receiver, out libswarm.Sender)) libswarm.Sender { + var running bool + var l sync.RWMutex + inR, inW := libswarm.Pipe() + outR, outW := libswarm.Pipe() + obj := libswarm.NewServer() + obj.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error { + msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: inW}) + fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret) + defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret) + libswarm.Copy(msg.Ret, outR) + return nil + })) + obj.OnVerb(libswarm.Start, libswarm.Handler(func(msg *libswarm.Message) error { + l.RLock() + r := running + l.RUnlock() + if r { + return fmt.Errorf("already running") + } + l.Lock() + go f(inR, outW) + running = true + l.Unlock() + msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}) + return nil + })) + return obj +} diff --git a/utils/tree.go b/utils/tree.go new file mode 100644 index 0000000000..169d1de846 --- /dev/null +++ b/utils/tree.go @@ -0,0 +1,46 @@ +package utils + +import ( + "github.com/docker/libswarm" + + "sort" +) + +type Tree struct { + *libswarm.Server + children map[string]libswarm.Sender +} + +func NewTree() *Tree { + t := &Tree{ + Server: libswarm.NewServer(), + children: make(map[string]libswarm.Sender), + } + t.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error { + if len(msg.Args) == 0 || msg.Args[0] == "" { + msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t}) + return nil + } + if child, exists := t.children[msg.Args[0]]; exists { + msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: child}) + return nil + } + libswarm.AsClient(msg.Ret).Error("not found") + return nil + })) + t.OnVerb(libswarm.Ls, libswarm.Handler(func(msg *libswarm.Message) error { + names := make([]string, 0, len(t.children)) + for name := range t.children { + names = append(names, name) + } + sort.Strings(names) + libswarm.AsClient(msg.Ret).Set(names...) + return nil + })) + return t +} + +func (t *Tree) Bind(name string, dst libswarm.Sender) *Tree { + t.children[name] = dst + return t +} diff --git a/beam/verbs.go b/verbs.go similarity index 98% rename from beam/verbs.go rename to verbs.go index ad10ca4a3b..460871ef41 100644 --- a/beam/verbs.go +++ b/verbs.go @@ -1,4 +1,4 @@ -package beam +package libswarm import ( "fmt"