Convert aggregate and orchard to new server DSL

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2014-06-19 15:15:34 +01:00 committed by Aaron Feng
parent 97a0b78d35
commit 6486d7fc94
2 changed files with 26 additions and 31 deletions

View File

@ -11,22 +11,21 @@ import (
func Aggregate() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
backend.OnSpawn(func(cmd ...string) (beam.Sender, error) {
allBackends := New()
instance := beam.NewServer()
a, err := newAggregator(allBackends, instance, ctx.Args)
a, err := newAggregator(allBackends, instance, cmd)
if err != nil {
return err
return nil, err
}
instance.OnVerb(beam.Attach, beam.Handler(a.attach))
instance.OnVerb(beam.Start, beam.Handler(a.start))
instance.OnVerb(beam.Ls, beam.Handler(a.ls))
instance.OnAttach(a.attach)
instance.OnStart(a.start)
instance.OnLs(a.ls)
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return instance, nil
})
return backend
}
@ -61,13 +60,13 @@ func newAggregator(allBackends *beam.Object, server *beam.Server, args []string)
return a, nil
}
func (a *aggregator) attach(ctx *beam.Message) error {
if ctx.Args[0] != "" {
func (a *aggregator) attach(name string, ret beam.Sender) error {
if name != "" {
// TODO: implement this?
return fmt.Errorf("attaching to a child is not implemented")
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil {
if _, err := ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil {
return err
}
@ -81,7 +80,7 @@ func (a *aggregator) attach(ctx *beam.Message) error {
copies.Add(1)
go func() {
log.Printf("copying output from %#v\n", b)
beam.Copy(ctx.Ret, r)
beam.Copy(ret, r)
log.Printf("finished output from %#v\n", b)
copies.Done()
}()
@ -91,29 +90,26 @@ func (a *aggregator) attach(ctx *beam.Message) error {
return nil
}
func (a *aggregator) start(ctx *beam.Message) error {
func (a *aggregator) start() error {
for _, b := range a.backends {
err := b.Start()
if err != nil {
return err
}
}
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack})
return err
return nil
}
func (a *aggregator) ls(ctx *beam.Message) error {
func (a *aggregator) ls() ([]string, error) {
var children []string
for _, b := range a.backends {
bChildren, err := b.Ls()
if err != nil {
return err
return nil, err
}
children = append(children, bChildren...)
}
ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: children})
return nil
return children, nil
}

View File

@ -13,11 +13,11 @@ import (
func Orchard() beam.Sender {
backend := beam.NewServer()
backend.OnVerb(beam.Spawn, beam.Handler(func(ctx *beam.Message) error {
if len(ctx.Args) != 2 {
return fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host")
backend.OnSpawn(func(cmd ...string) (beam.Sender, error) {
if len(cmd) != 2 {
return nil, fmt.Errorf("orchard: spawn expects 2 arguments: API token and name of host")
}
apiToken, hostName := ctx.Args[0], ctx.Args[1]
apiToken, hostName := cmd[0], cmd[1]
apiClient := &api.HTTPClient{
BaseURL: "https://api.orchardup.com/v2",
@ -26,13 +26,13 @@ func Orchard() beam.Sender {
host, err := apiClient.GetHost(hostName)
if err != nil {
return err
return nil, err
}
url := fmt.Sprintf("tcp://%s:4243", host.IPAddress)
tlsConfig, err := getTLSConfig([]byte(host.ClientCert), []byte(host.ClientKey))
if err != nil {
return err
return nil, err
}
backend := DockerClientWithConfig(&DockerClientConfig{
@ -43,12 +43,11 @@ func Orchard() beam.Sender {
forwardBackend := beam.Obj(backend)
forwardInstance, err := forwardBackend.Spawn(url)
if err != nil {
return err
return nil, err
}
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: forwardInstance})
return err
}))
return forwardInstance, nil
})
return backend
}