Convert 'forward' backend to beam

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com> (github: aanand)
This commit is contained in:
Aanand Prasad 2014-06-04 16:11:29 -07:00 committed by Solomon Hykes
parent 3634a1c9ee
commit 188d75e0ab
3 changed files with 151 additions and 39 deletions

View File

@ -18,6 +18,7 @@ func New() *beam.Object {
backends.Bind("simulator", Simulator())
backends.Bind("debug", Debug())
backends.Bind("fakeclient", FakeClient())
backends.Bind("forward", Forward())
return beam.Obj(backends)
}

View File

@ -2,57 +2,75 @@ package backends
import (
"fmt"
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/engine"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
)
func Forward() engine.Installer {
return &forwarder{}
func Forward() beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args))
}
client, err := newClient(ctx.Args[0], "v0.10")
if err != nil {
return fmt.Errorf("%v", err)
}
f := &forwarder{client: client}
instance := beam.NewServer()
instance.OnAttach(beam.Handler(f.attach))
instance.OnStart(beam.Handler(f.start))
instance.OnLs(beam.Handler(f.ls))
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return backend
}
type forwarder struct {
client *client
}
func (f *forwarder) Install(eng *engine.Engine) error {
eng.Register("forward", func(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("usage: %s <proto>://<addr>", job.Name)
func (f *forwarder) attach(ctx *beam.Message) error {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack})
for {
time.Sleep(1 * time.Second)
(&beam.Object{ctx.Ret}).Log("forward: heartbeat")
}
client, err := newClient(job.Args[0], "v0.10")
return nil
}
func (f *forwarder) start(ctx *beam.Message) error {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack})
return nil
}
func (f *forwarder) ls(ctx *beam.Message) error {
resp, err := f.client.call("GET", "/containers/json", "")
if err != nil {
return job.Errorf("%v", err)
}
job.Eng.Register("containers", func(job *engine.Job) engine.Status {
path := fmt.Sprintf(
"/containers/json?all=%s&size=%s&since=%s&before=%s&limit=%s",
url.QueryEscape(job.Getenv("all")),
url.QueryEscape(job.Getenv("size")),
url.QueryEscape(job.Getenv("since")),
url.QueryEscape(job.Getenv("before")),
url.QueryEscape(job.Getenv("limit")),
)
resp, err := client.call("GET", path, "")
if err != nil {
return job.Errorf("%s: get: %v", client.URL.String(), err)
return fmt.Errorf("%s: get: %v", f.client.URL.String(), err)
}
// FIXME: check for response error
c := engine.NewTable("Created", 0)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return job.Errorf("%s: read body: %v", client.URL.String(), err)
return fmt.Errorf("%s: read body: %v", f.client.URL.String(), err)
}
fmt.Printf("---> '%s'\n", body)
if _, err := c.ReadListFrom(body); err != nil {
return job.Errorf("%s: readlist: %v", client.URL.String(), err)
return fmt.Errorf("%s: readlist: %v", f.client.URL.String(), err)
}
names := []string{}
for _, env := range c.Data {
names = append(names, env.GetList("Names")[0][1:])
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: names}); err != nil {
return fmt.Errorf("%s: send response: %v", f.client.URL.String(), err)
}
c.WriteListTo(job.Stdout)
return engine.StatusOK
})
return engine.StatusOK
})
return nil
}

View File

@ -0,0 +1,93 @@
package main
import (
"fmt"
"github.com/codegangsta/cli"
"github.com/docker/libswarm/backends"
"github.com/docker/libswarm/beam"
"log"
"os"
"strings"
)
func main() {
app := cli.NewApp()
app.Name = "swarmd"
app.Usage = "Control a heterogenous distributed system with the Docker API"
app.Version = "0.0.1"
app.Flags = []cli.Flag{
cli.StringFlag{"backend", "debug", "load a backend"},
}
app.Action = cmdDaemon
app.Run(os.Args)
}
func cmdDaemon(c *cli.Context) {
app := beam.NewServer()
app.OnLog(beam.Handler(func(msg *beam.Message) error {
log.Printf("%s\n", strings.Join(msg.Args, " "))
return nil
}))
app.OnError(beam.Handler(func(msg *beam.Message) error {
Fatalf("Fatal: %v", strings.Join(msg.Args[:1], ""))
return nil
}))
backend := beam.Object{backends.Forward()}
dockerHost := os.Getenv("DOCKER_HOST")
if dockerHost == "" {
dockerHost = "unix:///var/run/docker.sock"
}
log.Printf("---> Spawning\n")
instance, err := backend.Spawn(dockerHost)
if err != nil {
Fatalf("spawn: %v\n", err)
}
log.Printf("---> Attaching\n")
instanceIn, instanceOut, err := instance.Attach("")
if err != nil {
Fatalf("attach: %v", err)
}
defer instanceOut.Close()
go beam.Copy(app, instanceIn)
log.Printf("---> Starting\n")
if err := instance.Start(); err != nil {
Fatalf("start: %v", err)
}
err = doCmd(instance, c.Args())
if err != nil {
Fatalf("%v", err)
}
}
func doCmd(instance *beam.Object, args []string) error {
if len(args) == 0 {
return fmt.Errorf("no command supplied")
}
log.Printf("---> %s\n", args[0])
if args[0] == "ps" {
if len(args) != 1 {
return fmt.Errorf("usage: ps")
}
names, err := instance.Ls()
if err != nil {
return err
}
fmt.Println(strings.Join(names, "\n"))
return nil
}
return fmt.Errorf("unrecognised command: %s", args[0])
}
func Fatalf(msg string, args ...interface{}) {
if !strings.HasSuffix(msg, "\n") {
msg = msg + "\n"
}
fmt.Fprintf(os.Stderr, msg, args...)
os.Exit(1)
}