mirror of https://github.com/docker/docs.git
Merge pull request #91
This commit is contained in:
commit
b59b88ebd3
|
@ -2,6 +2,7 @@ package backends
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/docker/libswarm/beam"
|
"github.com/docker/libswarm/beam"
|
||||||
|
"github.com/docker/libswarm/debug"
|
||||||
)
|
)
|
||||||
|
|
||||||
// New returns a new engine, with all backends
|
// New returns a new engine, with all backends
|
||||||
|
@ -13,7 +14,7 @@ import (
|
||||||
func New() *beam.Object {
|
func New() *beam.Object {
|
||||||
backends := beam.NewTree()
|
backends := beam.NewTree()
|
||||||
backends.Bind("simulator", Simulator())
|
backends.Bind("simulator", Simulator())
|
||||||
backends.Bind("debug", Debug())
|
backends.Bind("debug", debug.Debug())
|
||||||
backends.Bind("fakeclient", FakeClient())
|
backends.Bind("fakeclient", FakeClient())
|
||||||
backends.Bind("dockerclient", DockerClient())
|
backends.Bind("dockerclient", DockerClient())
|
||||||
backends.Bind("exec", Exec())
|
backends.Bind("exec", Exec())
|
||||||
|
|
|
@ -1,31 +0,0 @@
|
||||||
package backends
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/docker/libswarm/beam"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Debug() beam.Sender {
|
|
||||||
backend := beam.NewServer()
|
|
||||||
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
|
|
||||||
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
|
|
||||||
for {
|
|
||||||
msg, err := in.Receive(beam.Ret)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("debug receive: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
fmt.Printf("[DEBUG] %s %s\n", msg.Verb, strings.Join(msg.Args, " "))
|
|
||||||
if _, err := out.Send(msg); err != nil {
|
|
||||||
fmt.Printf("debug send: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
|
|
||||||
return err
|
|
||||||
}))
|
|
||||||
return backend
|
|
||||||
}
|
|
|
@ -0,0 +1,108 @@
|
||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/docker/libswarm/beam"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The Debug service is an example of intercepting messages between a receiver and a sender.
|
||||||
|
// The service also exposes messages passing through it for debug purposes.
|
||||||
|
func Debug() beam.Sender {
|
||||||
|
dbgInstance := &debug{
|
||||||
|
service: beam.NewServer(),
|
||||||
|
}
|
||||||
|
|
||||||
|
sender := beam.NewServer()
|
||||||
|
sender.OnSpawn(beam.Handler(dbgInstance.spawn))
|
||||||
|
return sender
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function for forwarding a messgae including some minor error formatting
|
||||||
|
func forward(out beam.Sender, msg *beam.Message) (err error) {
|
||||||
|
if _, err := out.Send(msg); err != nil {
|
||||||
|
return fmt.Errorf("[debug] Failed to forward msg. Reason: %v\n", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debug service type
|
||||||
|
type debug struct {
|
||||||
|
service *beam.Server
|
||||||
|
in beam.Receiver
|
||||||
|
out beam.Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn will return a new instance as the Ret channel of the message sent back
|
||||||
|
func (dbg *debug) spawn(msg *beam.Message) (err error) {
|
||||||
|
// By sending back a task, beam will run the function with the in and out arguments
|
||||||
|
// set to the services present before and after this one in the pipeline.
|
||||||
|
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
|
||||||
|
// Setup our channels
|
||||||
|
dbg.in = in
|
||||||
|
dbg.out = out
|
||||||
|
|
||||||
|
// Set up the debug interceptor
|
||||||
|
dbg.service.Catchall(beam.Handler(dbg.catchall))
|
||||||
|
|
||||||
|
// Copy everything from the receiver to our service. By copying like this in the task
|
||||||
|
// we can use the catchall handler instead of handling the message here.
|
||||||
|
beam.Copy(dbg.service, in)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Inform the system of our new instance
|
||||||
|
msg.Ret.Send(&beam.Message{
|
||||||
|
Verb: beam.Ack,
|
||||||
|
Ret: instance,
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Catches all messages sent to the service
|
||||||
|
func (dbg *debug) catchall(msg *beam.Message) (err error) {
|
||||||
|
log.Printf("[debug] ---> Upstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args)
|
||||||
|
|
||||||
|
// If there's no output after us then we'll just reply with an error
|
||||||
|
// informing the receiver that the verb is not implemented.
|
||||||
|
if dbg.out == nil {
|
||||||
|
return fmt.Errorf("[debug] Verb: %s is not implemented.", msg.Verb)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The forwarded message has the return channel set to a new replyHandler. The replyHandler is a small
|
||||||
|
// callback that allows for interception of downstream messages.
|
||||||
|
forwardedMessage := &beam.Message{
|
||||||
|
Verb: msg.Verb,
|
||||||
|
Args: msg.Args,
|
||||||
|
Att: msg.Att,
|
||||||
|
Ret: &replyHandler{
|
||||||
|
out: msg.Ret,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the forwarded message
|
||||||
|
if err := forward(dbg.out, forwardedMessage); err == nil {
|
||||||
|
// Hijack the return channel so we can avoid any interference with things such as close
|
||||||
|
msg.Ret = beam.NopSender{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use a replyHandler to provide context for relaying the return channel
|
||||||
|
// of the origin message.
|
||||||
|
type replyHandler struct {
|
||||||
|
out beam.Sender
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a message using the out channel
|
||||||
|
func (rh *replyHandler) Send(msg *beam.Message) (receiver beam.Receiver, err error) {
|
||||||
|
log.Printf("[debug] <--- Downstream Message { Verb: %s, Args: %v }\n", msg.Verb, msg.Args)
|
||||||
|
return nil, forward(rh.out, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rh *replyHandler) Close() (err error) {
|
||||||
|
// Since we don't allow the downstream handler to close the return channel, we do so here.
|
||||||
|
return rh.out.Close()
|
||||||
|
}
|
Loading…
Reference in New Issue