From 62734b842fac02ae0f442249859969cb07668bfe Mon Sep 17 00:00:00 2001 From: John Hopper Date: Thu, 19 Jun 2014 11:34:27 -0500 Subject: [PATCH] Fix Debug for Libchan This includes some minor cleanup among other things needed to make debug work with libchan (correctly). Signed-off-by: John Hopper john.hopper@jpserver.net --- debug/debug.go | 65 ++++++++++++++++++++++---------------------------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/debug/debug.go b/debug/debug.go index 686da4acd0..0e361e6db7 100644 --- a/debug/debug.go +++ b/debug/debug.go @@ -2,6 +2,7 @@ package debug import ( "fmt" + "io" "log" "github.com/docker/libswarm/beam" @@ -19,18 +20,9 @@ func Debug() beam.Sender { return sender } -// Function for forwarding a messgae including some minor error formatting -func forward(out beam.Sender, msg *beam.Message) (err error) { - if _, err := out.Send(msg); err != nil { - return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err) - } - return -} - // Debug service type type debug struct { service *beam.Server - in beam.Receiver out beam.Sender } @@ -40,7 +32,6 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) { // set to the services present before and after this one in the pipeline. instance := beam.Task(func(in beam.Receiver, out beam.Sender) { // Setup our channels - dbg.in = in dbg.out = out // Set up the debug interceptor @@ -62,7 +53,7 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) { // Catches all messages sent to the service func (dbg *debug) catchall(msg *beam.Message) (err error) { - log.Printf("[debug] ---> Upstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) + log.Printf("[debug] ---> Outbound Message ---> { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) // If there's no output after us then we'll just reply with an error // informing the receiver that the verb is not implemented. @@ -70,39 +61,39 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) { return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb) } - // The forwarded message has the return channel set to a new replyHandler. The replyHandler is a small - // callback that allows for interception of downstream messages. - forwardedMessage := &beam.Message{ + // We forward the message with a special Ret value of "beam.RetPipe" - this + // asks libchan to open a new pipe so that we can read replies from upstream + forwardedMsg := &beam.Message{ Verb: msg.Verb, Args: msg.Args, Att: msg.Att, - Ret: &replyHandler{ - Sender: msg.Ret, - }, + Ret: beam.RetPipe, } // Send the forwarded message - if err := forward(dbg.out, forwardedMessage); err == nil { - // Hijack the return channel so we can avoid any interference with things such as close - msg.Ret = beam.NopSender{} + if inbound, err := dbg.out.Send(forwardedMsg); err != nil { + return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err) + } else if inbound == nil { + return fmt.Errorf("[debug] Inbound channel nil.\n") + } else { + for { + // Relay all messages returned until the inbound channel is empty (EOF) + var reply *beam.Message + if reply, err = inbound.Receive(0); err != nil { + if err == io.EOF { + // EOF is expected + err = nil + } + break + } + + // Forward the message back downstream + if _, err = msg.Ret.Send(reply); err != nil { + return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err) + } + log.Printf("[debug] <--- Inbound Message <--- { Verb: %s, Args: %v }\n", reply.Verb, reply.Args) + } } return } - -// We use a replyHandler to provide context for relaying the return channel -// of the origin message. -type replyHandler struct { - beam.Sender -} - -// Send a message using the out channel -func (rh *replyHandler) Send(msg *beam.Message) (receiver beam.Receiver, err error) { - log.Printf("[debug] <--- Downstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) - return nil, forward(rh.Sender, msg) -} - -func (rh *replyHandler) Close() (err error) { - // Since we don't allow the downstream handler to close the return channel, we do so here. - return rh.Sender.Close() -}