diff --git a/backends/backends.go b/backends/backends.go index 05b6675d1c..ab1423402b 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -2,6 +2,7 @@ package backends import ( "github.com/docker/libswarm/beam" + "github.com/docker/libswarm/debug" ) // New returns a new engine, with all backends @@ -13,7 +14,7 @@ import ( func New() *beam.Object { backends := beam.NewTree() backends.Bind("simulator", Simulator()) - backends.Bind("debug", Debug()) + backends.Bind("debug", debug.Debug()) backends.Bind("fakeclient", FakeClient()) backends.Bind("dockerclient", DockerClient()) backends.Bind("exec", Exec()) diff --git a/backends/debug.go b/backends/debug.go deleted file mode 100644 index fa8020676f..0000000000 --- a/backends/debug.go +++ /dev/null @@ -1,31 +0,0 @@ -package backends - -import ( - "fmt" - "strings" - - "github.com/docker/libswarm/beam" -) - -func Debug() beam.Sender { - backend := beam.NewServer() - backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error { - instance := beam.Task(func(in beam.Receiver, out beam.Sender) { - for { - msg, err := in.Receive(beam.Ret) - if err != nil { - fmt.Printf("debug receive: %v", err) - return - } - fmt.Printf("[DEBUG] %s %s\n", msg.Verb, strings.Join(msg.Args, " ")) - if _, err := out.Send(msg); err != nil { - fmt.Printf("debug send: %v", err) - return - } - } - }) - _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance}) - return err - })) - return backend -} diff --git a/debug/debug.go b/debug/debug.go new file mode 100644 index 0000000000..c287fc4bd8 --- /dev/null +++ b/debug/debug.go @@ -0,0 +1,108 @@ +package debug + +import ( + "fmt" + "log" + + "github.com/docker/libswarm/beam" +) + +// The Debug service is an example of intercepting messages between a receiver and a sender. +// The service also exposes messages passing through it for debug purposes. +func Debug() beam.Sender { + dbgInstance := &debug{ + service: beam.NewServer(), + } + + sender := beam.NewServer() + sender.OnSpawn(beam.Handler(dbgInstance.spawn)) + return sender +} + +// Function for forwarding a messgae including some minor error formatting +func forward(out beam.Sender, msg *beam.Message) (err error) { + if _, err := out.Send(msg); err != nil { + return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err) + } + return +} + +// Debug service type +type debug struct { + service *beam.Server + in beam.Receiver + out beam.Sender +} + +// Spawn will return a new instance as the Ret channel of the message sent back +func (dbg *debug) spawn(msg *beam.Message) (err error) { + // By sending back a task, beam will run the function with the in and out arguments + // set to the services present before and after this one in the pipeline. + instance := beam.Task(func(in beam.Receiver, out beam.Sender) { + // Setup our channels + dbg.in = in + dbg.out = out + + // Set up the debug interceptor + dbg.service.Catchall(beam.Handler(dbg.catchall)) + + // Copy everything from the receiver to our service. By copying like this in the task + // we can use the catchall handler instead of handling the message here. + beam.Copy(dbg.service, in) + }) + + // Inform the system of our new instance + msg.Ret.Send(&beam.Message{ + Verb: beam.Ack, + Ret: instance, + }) + + return +} + +// Catches all messages sent to the service +func (dbg *debug) catchall(msg *beam.Message) (err error) { + log.Printf("[debug] ---> Upstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) + + // If there's no output after us then we'll just reply with an error + // informing the receiver that the verb is not implemented. + if dbg.out == nil { + return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb) + } + + // The forwarded message has the return channel set to a new replyHandler. The replyHandler is a small + // callback that allows for interception of downstream messages. + forwardedMessage := &beam.Message{ + Verb: msg.Verb, + Args: msg.Args, + Att: msg.Att, + Ret: &replyHandler{ + out: msg.Ret, + }, + } + + // Send the forwarded message + if err := forward(dbg.out, forwardedMessage); err == nil { + // Hijack the return channel so we can avoid any interference with things such as close + msg.Ret = beam.NopSender{} + } + + return +} + +// We use a replyHandler to provide context for relaying the return channel +// of the origin message. +type replyHandler struct { + out beam.Sender +} + +// Send a message using the out channel +func (rh *replyHandler) Send(msg *beam.Message) (receiver beam.Receiver, err error) { + log.Printf("[debug] <--- Downstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) + return nil, forward(rh.out, msg) +} + +func (rh *replyHandler) Close() (err error) { + // Since we don't allow the downstream handler to close the return channel, we do so here. + return rh.out.Close() +}