WIP more beam conversion

This commit is contained in:
Solomon Hykes 2014-06-01 22:34:03 +00:00
parent 5b54be92ca
commit c90ce90113
2 changed files with 53 additions and 17 deletions

View File

@ -1,6 +1,7 @@
package backends package backends
import ( import (
"io"
"fmt" "fmt"
"github.com/docker/beam" "github.com/docker/beam"
beamutils "github.com/docker/beam/utils" beamutils "github.com/docker/beam/utils"
@ -17,18 +18,31 @@ import (
// Example: `New().Job("debug").Run()` // Example: `New().Job("debug").Run()`
func New() beam.Sender { func New() beam.Sender {
backends := beamutils.NewHub() backends := beamutils.NewHub()
backends.RegisterName("debug", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
backends.RegisterTask(func(r beam.Receiver, w beam.Sender) error { return false, fmt.Errorf("no such backend: %s\n", strings.Join(msg.Args, " "))
for { })
msg, msgr, msgw, err := r.Receive(beam.R | beam.W) backends.RegisterName("cd", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
if err != nil { if len(msg.Args) > 0 && msg.Args[0] == "debug" {
return err debugr, debugw, err := out.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
} if err != nil {
fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " ")) return false, err
// FIXME: goroutine?
splice(w, msg, msgr, msgw)
} }
}) go func() {
for {
msg, msgr, msgw, err := debugr.Receive(beam.R | beam.W)
if err == io.EOF {
return
}
if err != nil {
return
}
fmt.Printf("[DEBUG] %s %s\n", msg.Name, strings.Join(msg.Args, " "))
// FIXME: goroutine?
Splice(debugw, msg, msgr, msgw)
}
}()
return false, nil
}
return true, nil return true, nil
}) })
backends.RegisterName("fakeclient", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) { backends.RegisterName("fakeclient", func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (bool, error) {
@ -43,7 +57,7 @@ func New() beam.Sender {
if err != nil { if err != nil {
return err return err
} }
go beamutils.Copy(beamutils.NopSender{}, containersR) go beamutils.Copy(beam.NopSender{}, containersR)
} }
}) })
return true, nil return true, nil
@ -51,7 +65,7 @@ func New() beam.Sender {
return backends return backends
} }
func splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error { func Splice(dst beam.Sender, msg *beam.Message, r beam.Receiver, w beam.Sender) error {
dstR, dstW, err := dst.Send(msg, beam.R|beam.W) dstR, dstW, err := dst.Send(msg, beam.R|beam.W)
if err != nil { if err != nil {
return err return err

View File

@ -154,7 +154,7 @@ func cmdDaemon(c *cli.Context) {
} }
hub := beamutils.NewHub() hub := beamutils.NewHub()
backends := backends.New() back := backends.New()
// Load backends // Load backends
for _, cmd := range c.Args() { for _, cmd := range c.Args() {
bName, bArgs, err := parseCmd(cmd) bName, bArgs, err := parseCmd(cmd)
@ -162,12 +162,31 @@ func cmdDaemon(c *cli.Context) {
Fatalf("%v", err) Fatalf("%v", err)
} }
fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " ")) fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " "))
_, backend, err := backends.Send(&beam.Message{Name: bName, Args: bArgs}, beam.W) backendr, _, err := back.Send(&beam.Message{Name: "cd", Args: []string{bName}}, beam.R)
if err != nil { if err != nil {
Fatalf("%s: %v\n", bName, err) Fatalf("%s: %v\n", bName, err)
} }
if err := hub.Register(backend); err != nil { // backendr will return either 'error' or 'register'.
Fatalf("%v", err) for {
m, mr, mw, err := backendr.Receive(beam.R|beam.W)
if err != nil {
Fatalf("%v", err)
}
if m.Name == "error" {
Fatalf("%v", strings.Join(m.Args, " "))
}
if m.Name == "register" {
// FIXME: adapt the beam interface to allow the caller to
// (optionally) pass their own Sender/Receiver?
// Would make proxying/splicing easier.
hubr, hubw, err := hub.Send(m, beam.R|beam.W)
if err != nil {
Fatalf("%v", err)
}
fmt.Printf("successfully registered\n")
go beamutils.Copy(hubw, mr)
go beamutils.Copy(mw, hubr)
}
} }
} }
in, _, err := hub.Send(&beam.Message{Name: "start"}, beam.R) in, _, err := hub.Send(&beam.Message{Name: "start"}, beam.R)
@ -176,6 +195,9 @@ func cmdDaemon(c *cli.Context) {
} }
for { for {
msg, _, _, err := in.Receive(0) msg, _, _, err := in.Receive(0)
if err == io.EOF {
break
}
if err != nil { if err != nil {
Fatalf("%v", err) Fatalf("%v", err)
} }