Merge pull request #100 from aanand/port-to-libchan-pt2

This commit is contained in:
Solomon Hykes 2014-07-01 19:00:26 -07:00
commit 379c579493
25 changed files with 358 additions and 346 deletions

View File

@ -1,7 +1,7 @@
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/flynn/go-shlex"
"fmt"
@ -9,11 +9,11 @@ import (
"sync"
)
func Aggregate() beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(func(cmd ...string) (beam.Sender, error) {
func Aggregate() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) {
allBackends := New()
instance := beam.NewServer()
instance := libswarm.NewServer()
a, err := newAggregator(allBackends, instance, cmd)
if err != nil {
@ -30,11 +30,11 @@ func Aggregate() beam.Sender {
}
type aggregator struct {
backends []*beam.Object
server *beam.Server
backends []*libswarm.Client
server *libswarm.Server
}
func newAggregator(allBackends *beam.Object, server *beam.Server, args []string) (*aggregator, error) {
func newAggregator(allBackends *libswarm.Client, server *libswarm.Server, args []string) (*aggregator, error) {
a := &aggregator{server: server}
for _, argString := range args {
@ -60,13 +60,13 @@ func newAggregator(allBackends *beam.Object, server *beam.Server, args []string)
return a, nil
}
func (a *aggregator) attach(name string, ret beam.Sender) error {
func (a *aggregator) attach(name string, ret libswarm.Sender) error {
if name != "" {
// TODO: implement this?
return fmt.Errorf("attaching to a child is not implemented")
}
if _, err := ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil {
if _, err := ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: a.server}); err != nil {
return err
}
@ -80,7 +80,7 @@ func (a *aggregator) attach(name string, ret beam.Sender) error {
copies.Add(1)
go func() {
log.Printf("copying output from %#v\n", b)
beam.Copy(ret, r)
libswarm.Copy(ret, r)
log.Printf("finished output from %#v\n", b)
copies.Done()
}()

View File

@ -1,8 +1,9 @@
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/debug"
"github.com/docker/libswarm/utils"
)
// New returns a new engine, with all backends
@ -11,8 +12,8 @@ import (
// engine, named after the desired backend.
//
// Example: `New().Job("debug").Run()`
func New() *beam.Object {
backends := beam.NewTree()
func New() *libswarm.Client {
backends := utils.NewTree()
backends.Bind("simulator", Simulator())
backends.Bind("debug", debug.Debug())
backends.Bind("fakeclient", FakeClient())
@ -24,5 +25,5 @@ func New() *beam.Object {
backends.Bind("shipyard", Shipyard())
backends.Bind("ec2", Ec2())
backends.Bind("tutum", Tutum())
return beam.Obj(backends)
return libswarm.AsClient(backends)
}

View File

@ -4,9 +4,10 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/utils"
"github.com/dotcloud/docker/engine"
"github.com/dotcloud/docker/utils"
dockerutils "github.com/dotcloud/docker/utils"
"io"
"io/ioutil"
"net"
@ -22,16 +23,16 @@ type DockerClientConfig struct {
TLSClientConfig *tls.Config
}
func DockerClient() beam.Sender {
func DockerClient() libswarm.Sender {
return DockerClientWithConfig(&DockerClientConfig{
Scheme: "http",
URLHost: "dummy.host",
})
}
func DockerClientWithConfig(config *DockerClientConfig) beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(func(cmd ...string) (beam.Sender, error) {
func DockerClientWithConfig(config *DockerClientConfig) libswarm.Sender {
backend := libswarm.NewServer()
backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) {
if len(cmd) != 1 {
return nil, fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(cmd))
}
@ -42,7 +43,7 @@ func DockerClientWithConfig(config *DockerClientConfig) beam.Sender {
client.setURL(cmd[0])
b := &dockerClientBackend{
client: client,
Server: beam.NewServer(),
Server: libswarm.NewServer(),
}
b.Server.OnAttach(b.attach)
b.Server.OnStart(b.start)
@ -55,12 +56,12 @@ func DockerClientWithConfig(config *DockerClientConfig) beam.Sender {
type dockerClientBackend struct {
client *client
*beam.Server
*libswarm.Server
}
func (b *dockerClientBackend) attach(name string, ret beam.Sender) error {
func (b *dockerClientBackend) attach(name string, ret libswarm.Sender) error {
if name == "" {
ret.Send(&beam.Message{Verb: beam.Ack, Ret: b.Server})
ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: b.Server})
<-make(chan struct{})
} else {
path := fmt.Sprintf("/containers/%s/json", name)
@ -76,7 +77,7 @@ func (b *dockerClientBackend) attach(name string, ret beam.Sender) error {
return fmt.Errorf("%s", respBody)
}
c := b.newContainer(name)
ret.Send(&beam.Message{Verb: beam.Ack, Ret: c})
ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c})
}
return nil
}
@ -106,7 +107,7 @@ func (b *dockerClientBackend) ls() ([]string, error) {
return names, nil
}
func (b *dockerClientBackend) spawn(cmd ...string) (beam.Sender, error) {
func (b *dockerClientBackend) spawn(cmd ...string) (libswarm.Sender, error) {
if len(cmd) != 1 {
return nil, fmt.Errorf("dockerclient: spawn takes exactly 1 argument, got %d", len(cmd))
}
@ -128,9 +129,9 @@ func (b *dockerClientBackend) spawn(cmd ...string) (beam.Sender, error) {
return b.newContainer(respJson.Id), nil
}
func (b *dockerClientBackend) newContainer(id string) beam.Sender {
func (b *dockerClientBackend) newContainer(id string) libswarm.Sender {
c := &container{backend: b, id: id}
instance := beam.NewServer()
instance := libswarm.NewServer()
instance.OnAttach(c.attach)
instance.OnStart(c.start)
instance.OnStop(c.stop)
@ -143,8 +144,8 @@ type container struct {
id string
}
func (c *container) attach(name string, ret beam.Sender) error {
if _, err := ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
func (c *container) attach(name string, ret libswarm.Sender) error {
if _, err := ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil {
return err
}
@ -152,8 +153,8 @@ func (c *container) attach(name string, ret beam.Sender) error {
stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe()
go beam.EncodeStream(ret, stdoutR, "stdout")
go beam.EncodeStream(ret, stderrR, "stderr")
go utils.EncodeStream(ret, stdoutR, "stdout")
go utils.EncodeStream(ret, stderrR, "stderr")
c.backend.client.hijack("POST", path, nil, stdoutW, stderrW)
return nil
@ -270,40 +271,40 @@ func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io
rwc, br := clientconn.Hijack()
defer rwc.Close()
receiveStdout := utils.Go(func() (err error) {
receiveStdout := dockerutils.Go(func() (err error) {
defer func() {
if in != nil {
in.Close()
}
}()
_, err = utils.StdCopy(stdout, stderr, br)
utils.Debugf("[hijack] End of stdout")
_, err = dockerutils.StdCopy(stdout, stderr, br)
dockerutils.Debugf("[hijack] End of stdout")
return err
})
sendStdin := utils.Go(func() error {
sendStdin := dockerutils.Go(func() error {
if in != nil {
io.Copy(rwc, in)
utils.Debugf("[hijack] End of stdin")
dockerutils.Debugf("[hijack] End of stdin")
}
if tcpc, ok := rwc.(*net.TCPConn); ok {
if err := tcpc.CloseWrite(); err != nil {
utils.Debugf("Couldn't send EOF: %s", err)
dockerutils.Debugf("Couldn't send EOF: %s", err)
}
} else if unixc, ok := rwc.(*net.UnixConn); ok {
if err := unixc.CloseWrite(); err != nil {
utils.Debugf("Couldn't send EOF: %s", err)
dockerutils.Debugf("Couldn't send EOF: %s", err)
}
}
// Discard errors due to pipe interruption
return nil
})
if err := <-receiveStdout; err != nil {
utils.Debugf("Error receiveStdout: %s", err)
dockerutils.Debugf("Error receiveStdout: %s", err)
return err
}
if err := <-sendStdin; err != nil {
utils.Debugf("Error sendStdin: %s", err)
dockerutils.Debugf("Error sendStdin: %s", err)
return err
}
return nil

View File

@ -1,7 +1,7 @@
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"fmt"
"io/ioutil"
@ -240,21 +240,21 @@ func (s *stubServer) AllSummaries() []string {
return summaries
}
func instance(t *testing.T, server *stubServer) *beam.Object {
func instance(t *testing.T, server *stubServer) *libswarm.Client {
url := "tcp://localhost:4243"
if server != nil {
url = strings.Replace(server.URL, "http://", "tcp://", 1)
}
backend := DockerClient()
instance, err := beam.Obj(backend).Spawn(url)
instance, err := libswarm.AsClient(backend).Spawn(url)
if err != nil {
t.Fatal(err)
}
return instance
}
func child(t *testing.T, server *stubServer, i *beam.Object, name string) *beam.Object {
func child(t *testing.T, server *stubServer, i *libswarm.Client, name string) *libswarm.Client {
_, child, err := i.Attach(name)
if err != nil {
t.Fatal(err)

View File

@ -3,11 +3,12 @@ package backends
import (
"encoding/json"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/utils"
"github.com/dotcloud/docker/api"
"github.com/dotcloud/docker/pkg/version"
dockerContainerConfig "github.com/dotcloud/docker/runconfig"
"github.com/dotcloud/docker/utils"
dockerutils "github.com/dotcloud/docker/utils"
"github.com/gorilla/mux"
"io"
"io/ioutil"
@ -20,10 +21,10 @@ import (
"time"
)
func DockerServer() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
func DockerServer() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) {
url := "tcp://localhost:4243"
if len(ctx.Args) > 0 {
url = ctx.Args[0]
@ -33,15 +34,15 @@ func DockerServer() beam.Sender {
fmt.Printf("listenAndServe: %v", err)
}
})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
_, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance})
return err
}))
return backend
}
type HttpApiFunc func(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error
type HttpApiFunc func(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error
func listenAndServe(urlStr string, out beam.Sender) error {
func listenAndServe(urlStr string, out libswarm.Sender) error {
fmt.Println("Starting Docker server...")
r, err := createRouter(out)
if err != nil {
@ -69,7 +70,7 @@ func listenAndServe(urlStr string, out beam.Sender) error {
return httpSrv.Serve(l)
}
func ping(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func ping(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
_, err := w.Write([]byte{'O', 'K'})
return err
}
@ -107,14 +108,14 @@ type containerJson struct {
VolumesRW map[string]bool
}
func getContainerJson(out beam.Sender, containerID string) (containerJson, error) {
o := beam.Obj(out)
func getContainerJson(out libswarm.Sender, containerID string) (containerJson, error) {
o := libswarm.AsClient(out)
_, containerOut, err := o.Attach(containerID)
if err != nil {
return containerJson{}, err
}
container := beam.Obj(containerOut)
container := libswarm.AsClient(containerOut)
responseJson, err := container.Get()
if err != nil {
return containerJson{}, err
@ -128,7 +129,7 @@ func getContainerJson(out beam.Sender, containerID string) (containerJson, error
return response, nil
}
func getContainerInfo(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func getContainerInfo(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
container, err := getContainerJson(out, vars["name"])
if err != nil {
return err
@ -136,12 +137,12 @@ func getContainerInfo(out beam.Sender, version version.Version, w http.ResponseW
return writeJSON(w, http.StatusOK, container)
}
func getContainersJSON(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func getContainersJSON(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := r.ParseForm(); err != nil {
return err
}
o := beam.Obj(out)
o := libswarm.AsClient(out)
names, err := o.Ls()
if err != nil {
return err
@ -214,7 +215,7 @@ func getContainersJSON(out beam.Sender, version version.Version, w http.Response
return writeJSON(w, http.StatusOK, responses)
}
func postContainersCreate(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func postContainersCreate(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := r.ParseForm(); err != nil {
return nil
}
@ -224,7 +225,7 @@ func postContainersCreate(out beam.Sender, version version.Version, w http.Respo
return err
}
container, err := beam.Obj(out).Spawn(string(body))
container, err := libswarm.AsClient(out).Spawn(string(body))
if err != nil {
return err
}
@ -241,7 +242,7 @@ func postContainersCreate(out beam.Sender, version version.Version, w http.Respo
return writeJSON(w, http.StatusCreated, response)
}
func postContainersStart(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func postContainersStart(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -249,8 +250,8 @@ func postContainersStart(out beam.Sender, version version.Version, w http.Respon
// TODO: r.Body
name := vars["name"]
_, containerOut, err := beam.Obj(out).Attach(name)
container := beam.Obj(containerOut)
_, containerOut, err := libswarm.AsClient(out).Attach(name)
container := libswarm.AsClient(containerOut)
if err != nil {
return err
}
@ -262,14 +263,14 @@ func postContainersStart(out beam.Sender, version version.Version, w http.Respon
return nil
}
func postContainersStop(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func postContainersStop(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
name := vars["name"]
_, containerOut, err := beam.Obj(out).Attach(name)
container := beam.Obj(containerOut)
_, containerOut, err := libswarm.AsClient(out).Attach(name)
container := libswarm.AsClient(containerOut)
if err != nil {
return err
}
@ -291,7 +292,7 @@ func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) {
return conn, conn, nil
}
func postContainersAttach(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func postContainersAttach(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := r.ParseForm(); err != nil {
return err
}
@ -321,20 +322,20 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo
fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
// TODO: if a TTY, then no multiplexing is done
errStream := utils.NewStdWriter(outStream, utils.Stderr)
outStream = utils.NewStdWriter(outStream, utils.Stdout)
errStream := dockerutils.NewStdWriter(outStream, dockerutils.Stderr)
outStream = dockerutils.NewStdWriter(outStream, dockerutils.Stdout)
_, containerOut, err := beam.Obj(out).Attach(vars["name"])
_, containerOut, err := libswarm.AsClient(out).Attach(vars["name"])
if err != nil {
return err
}
container := beam.Obj(containerOut)
container := libswarm.AsClient(containerOut)
containerR, _, err := container.Attach("")
var tasks sync.WaitGroup
go func() {
defer tasks.Done()
err := beam.DecodeStream(outStream, containerR, "stdout")
err := utils.DecodeStream(outStream, containerR, "stdout")
if err != nil {
fmt.Printf("decodestream: %v\n", err)
}
@ -342,7 +343,7 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo
tasks.Add(1)
go func() {
defer tasks.Done()
err := beam.DecodeStream(errStream, containerR, "stderr")
err := utils.DecodeStream(errStream, containerR, "stderr")
if err != nil {
fmt.Printf("decodestream: %v\n", err)
}
@ -353,7 +354,7 @@ func postContainersAttach(out beam.Sender, version version.Version, w http.Respo
return nil
}
func postContainersWait(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
func postContainersWait(out libswarm.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
@ -365,7 +366,7 @@ func postContainersWait(out beam.Sender, version version.Version, w http.Respons
})
}
func createRouter(out beam.Sender) (*mux.Router, error) {
func createRouter(out libswarm.Sender) (*mux.Router, error) {
r := mux.NewRouter()
m := map[string]map[string]HttpApiFunc{
"GET": {
@ -405,7 +406,7 @@ func createRouter(out beam.Sender) (*mux.Router, error) {
return r, nil
}
func makeHttpHandler(out beam.Sender, localMethod string, localRoute string, handlerFunc HttpApiFunc, dockerVersion version.Version) http.HandlerFunc {
func makeHttpHandler(out libswarm.Sender, localMethod string, localRoute string, handlerFunc HttpApiFunc, dockerVersion version.Version) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// log the request
fmt.Printf("Calling %s %s\n", localMethod, localRoute)

View File

@ -3,7 +3,7 @@ package backends
import (
"errors"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"net"
"net/http"
"os"
@ -36,22 +36,22 @@ type ec2Config struct {
type ec2Client struct {
config *ec2Config
ec2Conn *ec2.EC2
Server *beam.Server
Server *libswarm.Server
instance *ec2.Instance
sshTunnel *os.Process
dockerInstance *beam.Object
dockerInstance *libswarm.Client
}
func (c *ec2Client) get(ctx *beam.Message) error {
func (c *ec2Client) get(ctx *libswarm.Message) error {
output, err := c.dockerInstance.Get()
if err != nil {
return err
}
ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{output}})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{output}})
return nil
}
func (c *ec2Client) start(ctx *beam.Message) error {
func (c *ec2Client) start(ctx *libswarm.Message) error {
if instance, err := c.findInstance(); err != nil {
return err
} else if instance != nil {
@ -73,44 +73,44 @@ func (c *ec2Client) start(ctx *beam.Message) error {
c.waitForDockerDaemon()
fmt.Printf("ec2 service up and running: region: %s zone: %s\n",
c.config.region.Name, c.config.zone)
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server})
return nil
}
func (c *ec2Client) spawn(ctx *beam.Message) error {
func (c *ec2Client) spawn(ctx *libswarm.Message) error {
out, err := c.dockerInstance.Spawn(ctx.Args...)
if err != nil {
return err
}
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: out})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: out})
return nil
}
func (c *ec2Client) ls(ctx *beam.Message) error {
func (c *ec2Client) ls(ctx *libswarm.Message) error {
output, err := c.dockerInstance.Ls()
if err != nil {
return err
}
ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: output})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: output})
return nil
}
func (c *ec2Client) stop(ctx *beam.Message) error {
func (c *ec2Client) stop(ctx *libswarm.Message) error {
c.dockerInstance.Stop()
return nil
}
func (c *ec2Client) attach(ctx *beam.Message) error {
func (c *ec2Client) attach(ctx *libswarm.Message) error {
if ctx.Args[0] == "" {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server})
<-make(chan struct{})
} else {
_, out, err := c.dockerInstance.Attach(ctx.Args[0])
if err != nil {
return err
}
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: out})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: out})
}
return nil
@ -281,7 +281,7 @@ func (c *ec2Client) initDockerClientInstance(instance *ec2.Instance) error {
URLHost: "localhost",
})
dockerBackend := beam.Obj(dockerClient)
dockerBackend := libswarm.AsClient(dockerClient)
url := fmt.Sprintf("tcp://localhost:%s", c.config.sshLocalPort)
dockerInstance, err := dockerBackend.Spawn(url)
c.dockerInstance = dockerInstance
@ -341,9 +341,9 @@ func signalHandler(client *ec2Client) {
}()
}
func Ec2() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
func Ec2() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
var config, err = newConfig(ctx.Args)
if err != nil {
@ -355,16 +355,16 @@ func Ec2() beam.Sender {
return err
}
client := &ec2Client{config, ec2Conn, beam.NewServer(), nil, nil, nil}
client.Server.OnVerb(beam.Spawn, beam.Handler(client.spawn))
client.Server.OnVerb(beam.Start, beam.Handler(client.start))
client.Server.OnVerb(beam.Stop, beam.Handler(client.stop))
client.Server.OnVerb(beam.Attach, beam.Handler(client.attach))
client.Server.OnVerb(beam.Ls, beam.Handler(client.ls))
client.Server.OnVerb(beam.Get, beam.Handler(client.get))
client := &ec2Client{config, ec2Conn, libswarm.NewServer(), nil, nil, nil}
client.Server.OnVerb(libswarm.Spawn, libswarm.Handler(client.spawn))
client.Server.OnVerb(libswarm.Start, libswarm.Handler(client.start))
client.Server.OnVerb(libswarm.Stop, libswarm.Handler(client.stop))
client.Server.OnVerb(libswarm.Attach, libswarm.Handler(client.attach))
client.Server.OnVerb(libswarm.Ls, libswarm.Handler(client.ls))
client.Server.OnVerb(libswarm.Get, libswarm.Handler(client.get))
signalHandler(client)
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: client.Server})
_, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: client.Server})
return err
}))

View File

@ -9,12 +9,12 @@ import (
"strings"
"sync"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
)
func Exec() beam.Sender {
e := beam.NewServer()
e.OnVerb(beam.Spawn, beam.Handler(func(msg *beam.Message) error {
func Exec() libswarm.Sender {
e := libswarm.NewServer()
e.OnVerb(libswarm.Spawn, libswarm.Handler(func(msg *libswarm.Message) error {
if len(msg.Args) < 1 {
return fmt.Errorf("usage: SPAWN exec|... <config>")
}
@ -31,9 +31,9 @@ func Exec() beam.Sender {
}
cmd := &command{
Cmd: exec.Command(config.Path, config.Args...),
Server: beam.NewServer(),
Server: libswarm.NewServer(),
}
cmd.OnVerb(beam.Attach, beam.Handler(func(msg *beam.Message) error {
cmd.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error {
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
@ -42,11 +42,11 @@ func Exec() beam.Sender {
if err != nil {
return err
}
inR, inW := beam.Pipe()
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil {
inR, inW := libswarm.Pipe()
if _, err := msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: inW}); err != nil {
return err
}
out := beam.Obj(msg.Ret)
out := libswarm.AsClient(msg.Ret)
go func() {
defer stdin.Close()
for {
@ -54,7 +54,7 @@ func Exec() beam.Sender {
if err != nil {
return
}
if msg.Verb == beam.Log && len(msg.Args) > 0 {
if msg.Verb == libswarm.Log && len(msg.Args) > 0 {
fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n"))
}
}
@ -76,7 +76,7 @@ func Exec() beam.Sender {
cmd.tasks.Wait()
return nil
}))
cmd.OnVerb(beam.Start, beam.Handler(func(msg *beam.Message) error {
cmd.OnVerb(libswarm.Start, libswarm.Handler(func(msg *libswarm.Message) error {
cmd.tasks.Add(1)
if err := cmd.Cmd.Start(); err != nil {
return err
@ -84,13 +84,13 @@ func Exec() beam.Sender {
go func() {
defer cmd.tasks.Done()
if err := cmd.Cmd.Wait(); err != nil {
beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err)
libswarm.AsClient(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err)
}
}()
msg.Ret.Send(&beam.Message{Verb: beam.Ack})
msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack})
return nil
}))
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil {
if _, err := msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: cmd}); err != nil {
return err
}
return nil
@ -100,6 +100,6 @@ func Exec() beam.Sender {
type command struct {
*exec.Cmd
*beam.Server
*libswarm.Server
tasks sync.WaitGroup
}

View File

@ -4,17 +4,18 @@ import (
"fmt"
"time"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/utils"
)
func FakeClient() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
func FakeClient() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
// Instantiate a new fakeclient instance
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) {
fmt.Printf("fake client!\n")
defer fmt.Printf("end of fake client!\n")
o := beam.Obj(out)
o := libswarm.AsClient(out)
o.Log("fake client starting")
defer o.Log("fake client terminating")
for {
@ -22,7 +23,7 @@ func FakeClient() beam.Sender {
o.Log("fake client heartbeat!")
}
})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
_, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance})
return err
}))
return backend

View File

@ -1,7 +1,7 @@
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/orchardup/go-orchard/api"
"crypto/tls"
@ -11,9 +11,9 @@ import (
"os"
)
func Orchard() beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(func(cmd ...string) (beam.Sender, error) {
func Orchard() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnSpawn(func(cmd ...string) (libswarm.Sender, error) {
if len(cmd) != 2 {
return nil, fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host")
}
@ -40,7 +40,7 @@ func Orchard() beam.Sender {
URLHost: host.IPAddress,
TLSClientConfig: tlsConfig,
})
forwardBackend := beam.Obj(backend)
forwardBackend := libswarm.AsClient(backend)
forwardInstance, err := forwardBackend.Spawn(url)
if err != nil {
return nil, err

View File

@ -3,7 +3,7 @@ package backends
import (
"encoding/json"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"io/ioutil"
"net/http"
"net/url"
@ -11,29 +11,29 @@ import (
"time"
)
func Shipyard() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
func Shipyard() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
if len(ctx.Args) != 3 {
return fmt.Errorf("Shipyard: Usage <shipyard URL> <user> <pass>")
}
c := &shipyard{url: ctx.Args[0], user: ctx.Args[1], pass: ctx.Args[2]}
c.Server = beam.NewServer()
c.Server.OnVerb(beam.Attach, beam.Handler(c.attach))
c.Server.OnVerb(beam.Start, beam.Handler(c.start))
c.Server.OnVerb(beam.Ls, beam.Handler(c.containers))
c.OnVerb(beam.Get, beam.Handler(c.containerInspect))
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server})
c.Server = libswarm.NewServer()
c.Server.OnVerb(libswarm.Attach, libswarm.Handler(c.attach))
c.Server.OnVerb(libswarm.Start, libswarm.Handler(c.start))
c.Server.OnVerb(libswarm.Ls, libswarm.Handler(c.containers))
c.OnVerb(libswarm.Get, libswarm.Handler(c.containerInspect))
_, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server})
return err
}))
return backend
}
func (c *shipyard) attach(ctx *beam.Message) error {
func (c *shipyard) attach(ctx *libswarm.Message) error {
if ctx.Args[0] == "" {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c.Server})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c.Server})
for {
time.Sleep(1 * time.Second)
}
@ -41,17 +41,17 @@ func (c *shipyard) attach(ctx *beam.Message) error {
return nil
}
func (c *shipyard) start(ctx *beam.Message) error {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack})
func (c *shipyard) start(ctx *libswarm.Message) error {
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack})
return nil
}
type shipyard struct {
url, user, pass string
*beam.Server
*libswarm.Server
}
func (c *shipyard) containers(ctx *beam.Message) error {
func (c *shipyard) containers(ctx *libswarm.Message) error {
out, err := c.gateway("GET", "containers", "")
if err != nil {
return err
@ -62,7 +62,7 @@ func (c *shipyard) containers(ctx *beam.Message) error {
for _, c := range data.Objects {
ids = append(ids, c.Id)
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: ids}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: ids}); err != nil {
return err
}
return nil
@ -76,7 +76,7 @@ type shipyardObject struct {
Id string `json:"container_id"`
}
func (c *shipyard) containerInspect(ctx *beam.Message) error {
func (c *shipyard) containerInspect(ctx *libswarm.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("Expected 1 container id, got %s", len(ctx.Args))
}
@ -87,7 +87,7 @@ func (c *shipyard) containerInspect(ctx *beam.Message) error {
}
var data shipyardObject
json.Unmarshal(out, &data)
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"foo", "bar"}}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{"foo", "bar"}}); err != nil {
return err
}
return nil

View File

@ -1,24 +1,25 @@
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/utils"
)
func Simulator() beam.Sender {
s := beam.NewServer()
s.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
func Simulator() libswarm.Sender {
s := libswarm.NewServer()
s.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
containers := ctx.Args
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
beam.Obj(out).Log("[simulator] starting\n")
s := beam.NewServer()
s.OnVerb(beam.Ls, beam.Handler(func(msg *beam.Message) error {
beam.Obj(out).Log("[simulator] generating fake list of objects...\n")
beam.Obj(msg.Ret).Set(containers...)
instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) {
libswarm.AsClient(out).Log("[simulator] starting\n")
s := libswarm.NewServer()
s.OnVerb(libswarm.Ls, libswarm.Handler(func(msg *libswarm.Message) error {
libswarm.AsClient(out).Log("[simulator] generating fake list of objects...\n")
libswarm.AsClient(msg.Ret).Set(containers...)
return nil
}))
beam.Copy(s, in)
libswarm.Copy(s, in)
})
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: instance})
return nil
}))
return s

View File

@ -3,7 +3,7 @@ package backends
import (
"encoding/json"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/dotcloud/docker/engine"
"github.com/tutumcloud/go-tutum"
"io/ioutil"
@ -18,9 +18,9 @@ var (
tutumConnectorVersion = "v1.11"
)
func Tutum() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
func Tutum() libswarm.Sender {
backend := libswarm.NewServer()
backend.OnVerb(libswarm.Spawn, libswarm.Handler(func(ctx *libswarm.Message) error {
if len(ctx.Args) == 2 {
tutum.User = ctx.Args[0]
tutum.ApiKey = ctx.Args[1]
@ -34,13 +34,13 @@ func Tutum() beam.Sender {
}
t := &tutumBackend{
tutumDockerConnector: tutumDockerConnector,
Server: beam.NewServer(),
Server: libswarm.NewServer(),
}
t.Server.OnVerb(beam.Attach, beam.Handler(t.attach))
t.Server.OnVerb(beam.Start, beam.Handler(t.ack))
t.Server.OnVerb(beam.Ls, beam.Handler(t.ls))
t.Server.OnVerb(beam.Spawn, beam.Handler(t.spawn))
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server})
t.Server.OnVerb(libswarm.Attach, libswarm.Handler(t.attach))
t.Server.OnVerb(libswarm.Start, libswarm.Handler(t.ack))
t.Server.OnVerb(libswarm.Ls, libswarm.Handler(t.ls))
t.Server.OnVerb(libswarm.Spawn, libswarm.Handler(t.spawn))
_, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server})
return err
}))
return backend
@ -48,28 +48,28 @@ func Tutum() beam.Sender {
type tutumBackend struct {
tutumDockerConnector *tutumDockerConnector
*beam.Server
*libswarm.Server
}
func (t *tutumBackend) attach(ctx *beam.Message) error {
func (t *tutumBackend) attach(ctx *libswarm.Message) error {
if ctx.Args[0] == "" {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server})
for {
time.Sleep(1 * time.Second)
}
} else {
c := t.newContainer(ctx.Args[0])
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c})
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c})
}
return nil
}
func (t *tutumBackend) ack(ctx *beam.Message) error {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: t.Server})
func (t *tutumBackend) ack(ctx *libswarm.Message) error {
ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t.Server})
return nil
}
func (t *tutumBackend) ls(ctx *beam.Message) error {
func (t *tutumBackend) ls(ctx *libswarm.Message) error {
resp, err := t.tutumDockerConnector.call("GET", "/containers/json", "")
if err != nil {
return fmt.Errorf("%s: get: %v", t.tutumDockerConnector.URL.String(), err)
@ -86,13 +86,13 @@ func (t *tutumBackend) ls(ctx *beam.Message) error {
for _, env := range c.Data {
ids = append(ids, env.GetList("Id")[0])
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: ids}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: ids}); err != nil {
return fmt.Errorf("%s: send response: %v", t.tutumDockerConnector.URL.String(), err)
}
return nil
}
func (t *tutumBackend) spawn(ctx *beam.Message) error {
func (t *tutumBackend) spawn(ctx *libswarm.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("tutum: spawn takes exactly 1 argument, got %d", len(ctx.Args))
}
@ -112,18 +112,18 @@ func (t *tutumBackend) spawn(ctx *beam.Message) error {
return err
}
c := t.newContainer(respJson.Id)
if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil {
if _, err = ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: c}); err != nil {
return err
}
return nil
}
func (t *tutumBackend) newContainer(id string) beam.Sender {
func (t *tutumBackend) newContainer(id string) libswarm.Sender {
c := &tutumContainer{tutumBackend: t, id: id}
instance := beam.NewServer()
instance.OnVerb(beam.Get, beam.Handler(c.get))
instance.OnVerb(beam.Start, beam.Handler(c.start))
instance.OnVerb(beam.Stop, beam.Handler(c.stop))
instance := libswarm.NewServer()
instance.OnVerb(libswarm.Get, libswarm.Handler(c.get))
instance.OnVerb(libswarm.Start, libswarm.Handler(c.start))
instance.OnVerb(libswarm.Stop, libswarm.Handler(c.stop))
return instance
}
@ -132,7 +132,7 @@ type tutumContainer struct {
id string
}
func (c *tutumContainer) get(ctx *beam.Message) error {
func (c *tutumContainer) get(ctx *libswarm.Message) error {
path := fmt.Sprintf("/containers/%s/json", c.id)
resp, err := c.tutumBackend.tutumDockerConnector.call("GET", path, "")
if err != nil {
@ -146,13 +146,13 @@ func (c *tutumContainer) get(ctx *beam.Message) error {
if resp.StatusCode != 200 {
return fmt.Errorf("%s", respBody)
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{string(respBody)}}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Set, Args: []string{string(respBody)}}); err != nil {
return err
}
return nil
}
func (c *tutumContainer) start(ctx *beam.Message) error {
func (c *tutumContainer) start(ctx *libswarm.Message) error {
path := fmt.Sprintf("/containers/%s/start", c.id)
resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "")
if err != nil {
@ -165,13 +165,13 @@ func (c *tutumContainer) start(ctx *beam.Message) error {
if resp.StatusCode != 204 {
return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody)
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil {
return err
}
return nil
}
func (c *tutumContainer) stop(ctx *beam.Message) error {
func (c *tutumContainer) stop(ctx *libswarm.Message) error {
path := fmt.Sprintf("/containers/%s/stop", c.id)
resp, err := c.tutumBackend.tutumDockerConnector.call("POST", path, "")
if err != nil {
@ -184,7 +184,7 @@ func (c *tutumContainer) stop(ctx *beam.Message) error {
if resp.StatusCode != 204 {
return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody)
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
if _, err := ctx.Ret.Send(&libswarm.Message{Verb: libswarm.Ack}); err != nil {
return err
}
return nil

View File

@ -1,36 +0,0 @@
package beam
import (
"fmt"
"sync"
)
func Task(f func(in Receiver, out Sender)) Sender {
var running bool
var l sync.RWMutex
inR, inW := Pipe()
outR, outW := Pipe()
obj := NewServer()
obj.OnVerb(Attach, Handler(func(msg *Message) error {
msg.Ret.Send(&Message{Verb: Ack, Ret: inW})
fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret)
defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret)
Copy(msg.Ret, outR)
return nil
}))
obj.OnVerb(Start, Handler(func(msg *Message) error {
l.RLock()
r := running
l.RUnlock()
if r {
return fmt.Errorf("already running")
}
l.Lock()
go f(inR, outW)
running = true
l.Unlock()
msg.Ret.Send(&Message{Verb: Ack})
return nil
}))
return obj
}

View File

@ -1,44 +0,0 @@
package beam
import (
"sort"
)
type Tree struct {
*Server
children map[string]Sender
}
func NewTree() *Tree {
t := &Tree{
Server: NewServer(),
children: make(map[string]Sender),
}
t.OnVerb(Attach, Handler(func(msg *Message) error {
if len(msg.Args) == 0 || msg.Args[0] == "" {
msg.Ret.Send(&Message{Verb: Ack, Ret: t})
return nil
}
if child, exists := t.children[msg.Args[0]]; exists {
msg.Ret.Send(&Message{Verb: Ack, Ret: child})
return nil
}
Obj(msg.Ret).Error("not found")
return nil
}))
t.OnVerb(Ls, Handler(func(msg *Message) error {
names := make([]string, 0, len(t.children))
for name := range t.children {
names = append(names, name)
}
sort.Strings(names)
Obj(msg.Ret).Set(names...)
return nil
}))
return t
}
func (t *Tree) Bind(name string, dst Sender) *Tree {
t.children[name] = dst
return t
}

View File

@ -1,4 +1,4 @@
package beam
package libswarm
import (
"encoding/json"
@ -8,23 +8,21 @@ import (
"strings"
)
// FIXME: rename Object to Client
type Object struct {
type Client struct {
Sender
}
func Obj(dst Sender) *Object {
return &Object{dst}
func AsClient(dst Sender) *Client {
return &Client{dst}
}
func (o *Object) Log(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}})
func (c *Client) Log(msg string, args ...interface{}) error {
_, err := c.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Ls() ([]string, error) {
ret, err := o.Send(&Message{Verb: Ls, Ret: RetPipe})
func (c *Client) Ls() ([]string, error) {
ret, err := c.Send(&Message{Verb: Ls, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -44,8 +42,8 @@ func (o *Object) Ls() ([]string, error) {
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
ret, err := o.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe})
func (c *Client) Spawn(cmd ...string) (out *Client, err error) {
ret, err := c.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -57,7 +55,7 @@ func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
return nil, err
}
if msg.Verb == Ack {
return &Object{msg.Ret}, nil
return &Client{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Verb == Error {
@ -66,8 +64,8 @@ func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
ret, err := o.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe})
func (c *Client) Attach(name string) (in Receiver, out *Client, err error) {
ret, err := c.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe})
if err != nil {
return nil, nil, err
}
@ -79,7 +77,7 @@ func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
return nil, nil, err
}
if msg.Verb == Ack {
return ret, &Object{msg.Ret}, nil
return ret, &Client{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Verb == Error {
@ -88,13 +86,13 @@ func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
return nil, nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Error(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}})
func (c *Client) Error(msg string, args ...interface{}) error {
_, err := c.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Connect() (net.Conn, error) {
ret, err := o.Send(&Message{Verb: Connect, Ret: RetPipe})
func (c *Client) Connect() (net.Conn, error) {
ret, err := c.Send(&Message{Verb: Connect, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -121,21 +119,21 @@ func (o *Object) Connect() (net.Conn, error) {
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) SetJson(val interface{}) error {
func (c *Client) SetJson(val interface{}) error {
txt, err := json.Marshal(val)
if err != nil {
return err
}
return o.Set(string(txt))
return c.Set(string(txt))
}
func (o *Object) Set(vals ...string) error {
_, err := o.Send(&Message{Verb: Set, Args: vals})
func (c *Client) Set(vals ...string) error {
_, err := c.Send(&Message{Verb: Set, Args: vals})
return err
}
func (o *Object) Get() (string, error) {
ret, err := o.Send(&Message{Verb: Get, Ret: RetPipe})
func (c *Client) Get() (string, error) {
ret, err := c.Send(&Message{Verb: Get, Ret: RetPipe})
if err != nil {
return "", err
}
@ -158,8 +156,8 @@ func (o *Object) Get() (string, error) {
return "", fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Watch() (Receiver, error) {
ret, err := o.Send(&Message{Verb: Watch, Ret: RetPipe})
func (c *Client) Watch() (Receiver, error) {
ret, err := c.Send(&Message{Verb: Watch, Ret: RetPipe})
if err != nil {
return nil, err
}
@ -173,8 +171,8 @@ func (o *Object) Watch() (Receiver, error) {
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Start() error {
ret, err := o.Send(&Message{Verb: Start, Ret: RetPipe})
func (c *Client) Start() error {
ret, err := c.Send(&Message{Verb: Start, Ret: RetPipe})
msg, err := ret.Receive(0)
if err == io.EOF {
return fmt.Errorf("unexpected EOF")
@ -188,8 +186,8 @@ func (o *Object) Start() error {
return fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Stop() error {
ret, err := o.Send(&Message{Verb: Stop, Ret: RetPipe})
func (c *Client) Stop() error {
ret, err := c.Send(&Message{Verb: Stop, Ret: RetPipe})
msg, err := ret.Receive(0)
if err == io.EOF {
return fmt.Errorf("unexpected EOF")

View File

@ -5,46 +5,47 @@ import (
"io"
"log"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
"github.com/docker/libswarm/utils"
)
// The Debug service is an example of intercepting messages between a receiver and a sender.
// The service also exposes messages passing through it for debug purposes.
func Debug() beam.Sender {
func Debug() libswarm.Sender {
dbgInstance := &debug{
service: beam.NewServer(),
service: libswarm.NewServer(),
}
sender := beam.NewServer()
sender.OnVerb(beam.Spawn, beam.Handler(dbgInstance.spawn))
sender := libswarm.NewServer()
sender.OnVerb(libswarm.Spawn, libswarm.Handler(dbgInstance.spawn))
return sender
}
// Debug service type
type debug struct {
service *beam.Server
out beam.Sender
service *libswarm.Server
out libswarm.Sender
}
// Spawn will return a new instance as the Ret channel of the message sent back
func (dbg *debug) spawn(msg *beam.Message) (err error) {
// By sending back a task, beam will run the function with the in and out arguments
func (dbg *debug) spawn(msg *libswarm.Message) (err error) {
// By sending back a task, libswarm will run the function with the in and out arguments
// set to the services present before and after this one in the pipeline.
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
instance := utils.Task(func(in libswarm.Receiver, out libswarm.Sender) {
// Setup our channels
dbg.out = out
// Set up the debug interceptor
dbg.service.Catchall(beam.Handler(dbg.catchall))
dbg.service.Catchall(libswarm.Handler(dbg.catchall))
// Copy everything from the receiver to our service. By copying like this in the task
// we can use the catchall handler instead of handling the message here.
beam.Copy(dbg.service, in)
libswarm.Copy(dbg.service, in)
})
// Inform the system of our new instance
msg.Ret.Send(&beam.Message{
Verb: beam.Ack,
msg.Ret.Send(&libswarm.Message{
Verb: libswarm.Ack,
Ret: instance,
})
@ -52,7 +53,7 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) {
}
// Catches all messages sent to the service
func (dbg *debug) catchall(msg *beam.Message) (err error) {
func (dbg *debug) catchall(msg *libswarm.Message) (err error) {
log.Printf("[debug] ---> Outbound Message ---> { Verb: %s, Args: %v }\n", msg.Verb, msg.Args)
// If there's no output after us then we'll just reply with an error
@ -61,13 +62,13 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) {
return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb)
}
// We forward the message with a special Ret value of "beam.RetPipe" - this
// We forward the message with a special Ret value of "libswarm.RetPipe" - this
// asks libchan to open a new pipe so that we can read replies from upstream
forwardedMsg := &beam.Message{
forwardedMsg := &libswarm.Message{
Verb: msg.Verb,
Args: msg.Args,
Att: msg.Att,
Ret: beam.RetPipe,
Ret: libswarm.RetPipe,
}
// Send the forwarded message
@ -78,7 +79,7 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) {
} else {
for {
// Relay all messages returned until the inbound channel is empty (EOF)
var reply *beam.Message
var reply *libswarm.Message
if reply, err = inbound.Receive(0); err != nil {
if err == io.EOF {
// EOF is expected

View File

@ -1,4 +1,4 @@
package beam
package libswarm
import (
"github.com/docker/libchan"

View File

@ -1,4 +1,4 @@
package beam
package libswarm
import (
"io/ioutil"

View File

@ -1,4 +1,4 @@
package beam
package libswarm
import (
"github.com/docker/libchan"

View File

@ -4,7 +4,7 @@ import (
"fmt"
"github.com/codegangsta/cli"
"github.com/docker/libswarm/backends"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm"
_ "github.com/dotcloud/docker/api/server"
"github.com/flynn/go-shlex"
"io"
@ -24,7 +24,7 @@ func main() {
}
func cmdDaemon(c *cli.Context) {
app := beam.NewServer()
app := libswarm.NewServer()
app.OnLog(func(args ...string) error {
log.Printf("%s\n", strings.Join(args, " "))
return nil
@ -42,7 +42,7 @@ func cmdDaemon(c *cli.Context) {
fmt.Println(strings.Join(names, "\n"))
return
}
var previousInstanceR beam.Receiver
var previousInstanceR libswarm.Receiver
// FIXME: refactor into a Pipeline
for idx, backendArg := range c.Args() {
bName, bArgs, err := parseCmd(backendArg)
@ -61,9 +61,9 @@ func cmdDaemon(c *cli.Context) {
if err != nil {
Fatalf("attach: %v", err)
}
go func(r beam.Receiver, w beam.Sender, idx int) {
go func(r libswarm.Receiver, w libswarm.Sender, idx int) {
if r != nil {
beam.Copy(w, r)
libswarm.Copy(w, r)
}
w.Close()
}(previousInstanceR, instanceW, idx)
@ -72,7 +72,7 @@ func cmdDaemon(c *cli.Context) {
}
previousInstanceR = instanceR
}
_, err := beam.Copy(app, previousInstanceR)
_, err := libswarm.Copy(app, previousInstanceR)
if err != nil {
Fatalf("copy: %v", err)
}

View File

@ -1,13 +1,15 @@
package beam
package utils
import (
"github.com/docker/libchan"
"github.com/docker/libswarm"
"io"
)
type NopSender struct{}
func (s NopSender) Send(msg *Message) (Receiver, error) {
func (s NopSender) Send(msg *libswarm.Message) (libswarm.Receiver, error) {
return NopReceiver{}, nil
}
@ -21,7 +23,7 @@ func (s NopSender) Unwrap() libchan.Sender {
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, error) {
func (r NopReceiver) Receive(mode int) (*libswarm.Message, error) {
return nil, io.EOF
}

View File

@ -1,28 +1,30 @@
package beam
package utils
import (
"github.com/docker/libswarm"
"fmt"
"io"
)
func EncodeStream(sender Sender, reader io.Reader, tag string) {
func EncodeStream(sender libswarm.Sender, reader io.Reader, tag string) {
chunk := make([]byte, 4096)
for {
n, err := reader.Read(chunk)
if n > 0 {
sender.Send(&Message{Verb: Log, Args: []string{tag, string(chunk[0:n])}})
sender.Send(&libswarm.Message{Verb: libswarm.Log, Args: []string{tag, string(chunk[0:n])}})
}
if err != nil {
message := fmt.Sprintf("Error reading from stream: %v", err)
sender.Send(&Message{Verb: Error, Args: []string{message}})
sender.Send(&libswarm.Message{Verb: libswarm.Error, Args: []string{message}})
break
}
}
}
func DecodeStream(dst io.Writer, src Receiver, tag string) error {
func DecodeStream(dst io.Writer, src libswarm.Receiver, tag string) error {
for {
msg, err := src.Receive(Ret)
msg, err := src.Receive(libswarm.Ret)
if err == io.EOF {
return nil
}

38
utils/task.go Normal file
View File

@ -0,0 +1,38 @@
package utils
import (
"github.com/docker/libswarm"
"fmt"
"sync"
)
func Task(f func(in libswarm.Receiver, out libswarm.Sender)) libswarm.Sender {
var running bool
var l sync.RWMutex
inR, inW := libswarm.Pipe()
outR, outW := libswarm.Pipe()
obj := libswarm.NewServer()
obj.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error {
msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: inW})
fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret)
defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret)
libswarm.Copy(msg.Ret, outR)
return nil
}))
obj.OnVerb(libswarm.Start, libswarm.Handler(func(msg *libswarm.Message) error {
l.RLock()
r := running
l.RUnlock()
if r {
return fmt.Errorf("already running")
}
l.Lock()
go f(inR, outW)
running = true
l.Unlock()
msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack})
return nil
}))
return obj
}

46
utils/tree.go Normal file
View File

@ -0,0 +1,46 @@
package utils
import (
"github.com/docker/libswarm"
"sort"
)
type Tree struct {
*libswarm.Server
children map[string]libswarm.Sender
}
func NewTree() *Tree {
t := &Tree{
Server: libswarm.NewServer(),
children: make(map[string]libswarm.Sender),
}
t.OnVerb(libswarm.Attach, libswarm.Handler(func(msg *libswarm.Message) error {
if len(msg.Args) == 0 || msg.Args[0] == "" {
msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: t})
return nil
}
if child, exists := t.children[msg.Args[0]]; exists {
msg.Ret.Send(&libswarm.Message{Verb: libswarm.Ack, Ret: child})
return nil
}
libswarm.AsClient(msg.Ret).Error("not found")
return nil
}))
t.OnVerb(libswarm.Ls, libswarm.Handler(func(msg *libswarm.Message) error {
names := make([]string, 0, len(t.children))
for name := range t.children {
names = append(names, name)
}
sort.Strings(names)
libswarm.AsClient(msg.Ret).Set(names...)
return nil
}))
return t
}
func (t *Tree) Bind(name string, dst libswarm.Sender) *Tree {
t.children[name] = dst
return t
}

View File

@ -1,4 +1,4 @@
package beam
package libswarm
import (
"fmt"