diff --git a/backends/aggregate.go b/backends/aggregate.go index a836e235d6..158229869b 100644 --- a/backends/aggregate.go +++ b/backends/aggregate.go @@ -11,22 +11,21 @@ import ( func Aggregate() beam.Sender { backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { + backend.OnSpawn(func(cmd ...string) (beam.Sender, error) { allBackends := New() instance := beam.NewServer() - a, err := newAggregator(allBackends, instance, ctx.Args) + a, err := newAggregator(allBackends, instance, cmd) if err != nil { - return err + return nil, err } - instance.OnVerb(beam.Attach, beam.Handler(a.attach)) - instance.OnVerb(beam.Start, beam.Handler(a.start)) - instance.OnVerb(beam.Ls, beam.Handler(a.ls)) + instance.OnAttach(a.attach) + instance.OnStart(a.start) + instance.OnLs(a.ls) - _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) - return err - })) + return instance, nil + }) return backend } @@ -61,13 +60,13 @@ func newAggregator(allBackends *beam.Object, server *beam.Server, args []string) return a, nil } -func (a *aggregator) attach(ctx *beam.Message) error { - if ctx.Args[0] != "" { +func (a *aggregator) attach(name string, ret beam.Sender) error { + if name != "" { // TODO: implement this? return fmt.Errorf("attaching to a child is not implemented") } - if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil { + if _, err := ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil { return err } @@ -81,7 +80,7 @@ func (a *aggregator) attach(ctx *beam.Message) error { copies.Add(1) go func() { log.Printf("copying output from %#v\n", b) - beam.Copy(ctx.Ret, r) + beam.Copy(ret, r) log.Printf("finished output from %#v\n", b) copies.Done() }() @@ -91,29 +90,26 @@ func (a *aggregator) attach(ctx *beam.Message) error { return nil } -func (a *aggregator) start(ctx *beam.Message) error { +func (a *aggregator) start() error { for _, b := range a.backends { err := b.Start() if err != nil { return err } } - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}) - return err + return nil } -func (a *aggregator) ls(ctx *beam.Message) error { +func (a *aggregator) ls() ([]string, error) { var children []string for _, b := range a.backends { bChildren, err := b.Ls() if err != nil { - return err + return nil, err } children = append(children, bChildren...) } - ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: children}) - - return nil + return children, nil } diff --git a/backends/orchard.go b/backends/orchard.go index 3425c1e8bd..f16d5aebf3 100644 --- a/backends/orchard.go +++ b/backends/orchard.go @@ -13,11 +13,11 @@ import ( func Orchard() beam.Sender { backend := beam.NewServer() - backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error { - if len(ctx.Args) != 2 { - return fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host") + backend.OnSpawn(func(cmd ...string) (beam.Sender, error) { + if len(cmd) != 2 { + return nil, fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host") } - apiToken, hostName := ctx.Args[0], ctx.Args[1] + apiToken, hostName := cmd[0], cmd[1] apiClient := &api.HTTPClient{ BaseURL: "https://api.orchardup.com/v2", @@ -26,13 +26,13 @@ func Orchard() beam.Sender { host, err := apiClient.GetHost(hostName) if err != nil { - return err + return nil, err } url := fmt.Sprintf("tcp://%s:4243", host.IPAddress) tlsConfig, err := getTLSConfig([]byte(host.ClientCert), []byte(host.ClientKey)) if err != nil { - return err + return nil, err } backend := DockerClientWithConfig(&DockerClientConfig{ @@ -43,12 +43,11 @@ func Orchard() beam.Sender { forwardBackend := beam.Obj(backend) forwardInstance, err := forwardBackend.Spawn(url) if err != nil { - return err + return nil, err } - _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: forwardInstance}) - return err - })) + return forwardInstance, nil + }) return backend }