Fix Debug for Libchan

This includes some minor cleanup among other things needed to make debug work with libchan (correctly).

Signed-off-by: John Hopper john.hopper@jpserver.net
This commit is contained in:
John Hopper 2014-06-19 11:34:27 -05:00 committed by Aaron Feng
parent 6486d7fc94
commit 62734b842f
1 changed files with 28 additions and 37 deletions

View File

@ -2,6 +2,7 @@ package debug
import ( import (
"fmt" "fmt"
"io"
"log" "log"
"github.com/docker/libswarm/beam" "github.com/docker/libswarm/beam"
@ -19,18 +20,9 @@ func Debug() beam.Sender {
return sender return sender
} }
// Function for forwarding a messgae including some minor error formatting
func forward(out beam.Sender, msg *beam.Message) (err error) {
if _, err := out.Send(msg); err != nil {
return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err)
}
return
}
// Debug service type // Debug service type
type debug struct { type debug struct {
service *beam.Server service *beam.Server
in beam.Receiver
out beam.Sender out beam.Sender
} }
@ -40,7 +32,6 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) {
// set to the services present before and after this one in the pipeline. // set to the services present before and after this one in the pipeline.
instance := beam.Task(func(in beam.Receiver, out beam.Sender) { instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
// Setup our channels // Setup our channels
dbg.in = in
dbg.out = out dbg.out = out
// Set up the debug interceptor // Set up the debug interceptor
@ -62,7 +53,7 @@ func (dbg *debug) spawn(msg *beam.Message) (err error) {
// Catches all messages sent to the service // Catches all messages sent to the service
func (dbg *debug) catchall(msg *beam.Message) (err error) { func (dbg *debug) catchall(msg *beam.Message) (err error) {
log.Printf("[debug] ---> Upstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args) log.Printf("[debug] ---> Outbound Message ---> { Verb: %s, Args: %v }\n", msg.Verb, msg.Args)
// If there's no output after us then we'll just reply with an error // If there's no output after us then we'll just reply with an error
// informing the receiver that the verb is not implemented. // informing the receiver that the verb is not implemented.
@ -70,39 +61,39 @@ func (dbg *debug) catchall(msg *beam.Message) (err error) {
return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb) return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb)
} }
// The forwarded message has the return channel set to a new replyHandler. The replyHandler is a small // We forward the message with a special Ret value of "beam.RetPipe" - this
// callback that allows for interception of downstream messages. // asks libchan to open a new pipe so that we can read replies from upstream
forwardedMessage := &beam.Message{ forwardedMsg := &beam.Message{
Verb: msg.Verb, Verb: msg.Verb,
Args: msg.Args, Args: msg.Args,
Att: msg.Att, Att: msg.Att,
Ret: &replyHandler{ Ret: beam.RetPipe,
Sender: msg.Ret,
},
} }
// Send the forwarded message // Send the forwarded message
if err := forward(dbg.out, forwardedMessage); err == nil { if inbound, err := dbg.out.Send(forwardedMsg); err != nil {
// Hijack the return channel so we can avoid any interference with things such as close return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err)
msg.Ret = beam.NopSender{} } else if inbound == nil {
return fmt.Errorf("[debug] Inbound channel nil.\n")
} else {
for {
// Relay all messages returned until the inbound channel is empty (EOF)
var reply *beam.Message
if reply, err = inbound.Receive(0); err != nil {
if err == io.EOF {
// EOF is expected
err = nil
}
break
}
// Forward the message back downstream
if _, err = msg.Ret.Send(reply); err != nil {
return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err)
}
log.Printf("[debug] <--- Inbound Message <--- { Verb: %s, Args: %v }\n", reply.Verb, reply.Args)
}
} }
return return
} }
// We use a replyHandler to provide context for relaying the return channel
// of the origin message.
type replyHandler struct {
beam.Sender
}
// Send a message using the out channel
func (rh *replyHandler) Send(msg *beam.Message) (receiver beam.Receiver, err error) {
log.Printf("[debug] <--- Downstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args)
return nil, forward(rh.Sender, msg)
}
func (rh *replyHandler) Close() (err error) {
// Since we don't allow the downstream handler to close the return channel, we do so here.
return rh.Sender.Close()
}