From c90ce90113e06b2a13c0658b2d720dc2c42df8ba Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Sun, 1 Jun 2014 22:34:03 +0000 Subject: [PATCH] WIP more beam conversion --- backends/backends.go | 40 +++++++++++++++++++++++++++------------- swarmd/swarmd.go | 30 ++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/backends/backends.go b/backends/backends.go index e363b10167..e7bbfbf392 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -1,6 +1,7 @@ package backends import ( + "io" "fmt" "github.com/docker/beam" beamutils "github.com/docker/beam/utils" @@ -17,18 +18,31 @@ import ( // Example: `New().Job("debug").Run()` func New() beam.Sender { backends := beamutils.NewHub() - backends.RegisterName("debug", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { - backends.RegisterTask(func(r beam.Receiver, w beam.Sender) error { - for { - msg, msgr, msgw, err := r.Receive(beam.R | beam.W) - if err != nil { - return err - } - fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " ")) - // FIXME: goroutine? - splice(w, msg, msgr, msgw) + backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { + return false, fmt.Errorf("no such backend: %s\n", strings.Join(msg.Args, " ")) + }) + backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { + if len(msg.Args) > 0 && msg.Args[0] == "debug" { + debugr, debugw, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W) + if err != nil { + return false, err } - }) + go func() { + for { + msg, msgr, msgw, err := debugr.Receive(beam.R | beam.W) + if err == io.EOF { + return + } + if err != nil { + return + } + fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " ")) + // FIXME: goroutine? + Splice(debugw, msg, msgr, msgw) + } + }() + return false, nil + } return true, nil }) backends.RegisterName("fakeclient", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { @@ -43,7 +57,7 @@ func New() beam.Sender { if err != nil { return err } - go beamutils.Copy(beamutils.NopSender{}, containersR) + go beamutils.Copy(beam.NopSender{}, containersR) } }) return true, nil @@ -51,7 +65,7 @@ func New() beam.Sender { return backends } -func splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error { +func Splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error { dstR, dstW, err := dst.Send(msg, beam.R|beam.W) if err != nil { return err diff --git a/swarmd/swarmd.go b/swarmd/swarmd.go index 90596d78d3..9369142ea1 100644 --- a/swarmd/swarmd.go +++ b/swarmd/swarmd.go @@ -154,7 +154,7 @@ func cmdDaemon(c *cli.Context) { } hub := beamutils.NewHub() - backends := backends.New() + back := backends.New() // Load backends for _, cmd := range c.Args() { bName, bArgs, err := parseCmd(cmd) @@ -162,12 +162,31 @@ func cmdDaemon(c *cli.Context) { Fatalf("%v", err) } fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " ")) - _, backend, err := backends.Send(&beam.Message{Name: bName, Args: bArgs}, beam.W) + backendr, _, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}}, beam.R) if err != nil { Fatalf("%s: %v\n", bName, err) } - if err := hub.Register(backend); err != nil { - Fatalf("%v", err) + // backendr will return either 'error' or 'register'. + for { + m, mr, mw, err := backendr.Receive(beam.R|beam.W) + if err != nil { + Fatalf("%v", err) + } + if m.Name == "error" { + Fatalf("%v", strings.Join(m.Args, " ")) + } + if m.Name == "register" { + // FIXME: adapt the beam interface to allow the caller to + // (optionally) pass their own Sender/Receiver? + // Would make proxying/splicing easier. + hubr, hubw, err := hub.Send(m, beam.R|beam.W) + if err != nil { + Fatalf("%v", err) + } + fmt.Printf("successfully registered\n") + go beamutils.Copy(hubw, mr) + go beamutils.Copy(mw, hubr) + } } } in, _, err := hub.Send(&beam.Message{Name: "start"}, beam.R) @@ -176,6 +195,9 @@ func cmdDaemon(c *cli.Context) { } for { msg, _, _, err := in.Receive(0) + if err == io.EOF { + break + } if err != nil { Fatalf("%v", err) }