Hub: convenience API to register jobs

* `Hub.Register`
* `Hub.RegisterTask`
* `Hub.RegisterName`
This commit is contained in:
Solomon Hykes 2014-05-12 04:58:28 +00:00
parent 74eaf8492f
commit e4beab1432
1 changed files with 75 additions and 0 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/docker/beam"
"github.com/docker/beam/inmem"
"io"
"sync"
)
@ -44,6 +45,80 @@ func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, e
return hub.handlers.Send(msg, mode)
}
func (hub *Hub) Register(dst beam.Sender) error {
in, _, err := hub.Send(&beam.Message{Name: "register"}, beam.R)
if err != nil {
return err
}
go Copy(dst, in)
return nil
}
func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error {
in, out, err := hub.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
if err != nil {
return err
}
go func() {
h(in, out)
out.Close()
}()
return nil
}
type Handler func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (pass bool, err error)
func (hub *Hub) RegisterName(name string, h Handler) error {
return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error {
var tasks sync.WaitGroup
copyTask := func(dst beam.Sender, src beam.Receiver) {
tasks.Add(1)
go func() {
defer tasks.Done()
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}()
}
for {
msg, msgin, msgout, err := in.Receive(beam.R | beam.W)
if err == io.EOF {
break
}
if err != nil {
return err
}
var pass = true
if msg.Name == name || name == "" {
pass, err = h(msg, msgin, msgout, out)
if err != nil {
if _, _, err := msgout.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0); err != nil {
return err
}
}
}
if pass {
nextin, nextout, err := out.Send(msg, beam.R|beam.W)
if err != nil {
return err
}
copyTask(nextout, msgin)
copyTask(msgout, nextin)
} else {
if msgout != nil {
msgout.Close()
}
}
}
return nil
})
}
func (hub *Hub) Wait() {
hub.tasks.Wait()
}