From c7a78ae81e6c0ddcf6937aab5195c284c711dc99 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 2 Jun 2014 09:10:52 +0000 Subject: [PATCH] Simple fork-exec backend Signed-off-by: Solomon Hykes Signed-off-by: Ben Firshman Signed-off-by: Aanand Prasad --- backends/backends.go | 1 + backends/exec.go | 105 +++++++++++++++++++++++++++++++++++++++++++ swarmd/swarmd.go | 24 +++++----- 3 files changed, 118 insertions(+), 12 deletions(-) create mode 100644 backends/exec.go diff --git a/backends/backends.go b/backends/backends.go index 3358215b94..2f0f6dd7e0 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -16,5 +16,6 @@ func New() *beam.Object { backends.Bind("debug", Debug()) backends.Bind("fakeclient", FakeClient()) backends.Bind("forward", Forward()) + backends.Bind("exec", Exec()) return beam.Obj(backends) } diff --git a/backends/exec.go b/backends/exec.go new file mode 100644 index 0000000000..49251a39b3 --- /dev/null +++ b/backends/exec.go @@ -0,0 +1,105 @@ +package backends + +import ( + "fmt" + "io" + "os/exec" + "encoding/json" + "bufio" + "strings" + "sync" + + "github.com/docker/libswarm/beam" +) + +func Exec() beam.Sender { + e := beam.NewServer() + e.OnSpawn(beam.Handler(func(msg *beam.Message) error { + if len(msg.Args) < 1 { + return fmt.Errorf("usage: SPAWN exec|... ") + } + if msg.Args[0] != "exec" { + return fmt.Errorf("invalid command: %s", msg.Args[0]) + } + var config struct { + Path string + Args []string + } + if err := json.Unmarshal([]byte(msg.Args[1]), &config); err != nil { + config.Path = msg.Args[1] + config.Args = msg.Args[2:] + } + cmd := &command{ + Cmd: exec.Command(config.Path, config.Args...), + Server: beam.NewServer(), + } + cmd.OnAttach(beam.Handler(func(msg *beam.Message) error { + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stdin, err := cmd.StdinPipe() + if err != nil { + return err + } + inR, inW := beam.Pipe() + if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil { + return err + } + out := beam.Obj(msg.Ret) + go func() { + defer stdin.Close() + for { + msg, err := inR.Receive(0) + if err != nil { + return + } + if msg.Verb == beam.Log && len(msg.Args) > 0 { + fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n")) + } + } + }() + cmd.tasks.Add(1) + go func() { + defer cmd.tasks.Done() + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + if scanner.Err() != io.EOF && scanner.Err() != nil { + return + } + if err := out.Log(scanner.Text()); err != nil { + out.Error("%v", err) + return + } + } + }() + cmd.tasks.Wait() + return nil + })) + cmd.OnStart(beam.Handler(func(msg *beam.Message) error { + cmd.tasks.Add(1) + if err := cmd.Cmd.Start(); err != nil { + return err + } + go func() { + defer cmd.tasks.Done() + if err := cmd.Cmd.Wait(); err != nil { + beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err) + } + }() + msg.Ret.Send(&beam.Message{Verb: beam.Ack}) + return nil + })) + if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil { + return err + } + return nil + })) + return e +} + +type command struct { + *exec.Cmd + *beam.Server + tasks sync.WaitGroup +} diff --git a/swarmd/swarmd.go b/swarmd/swarmd.go index 79099d374c..dccecdb080 100644 --- a/swarmd/swarmd.go +++ b/swarmd/swarmd.go @@ -42,37 +42,37 @@ func cmdDaemon(c *cli.Context) { fmt.Println(strings.Join(names, "\n")) return } - var previousInstanceIn beam.Receiver - for _, backendArg := range c.Args() { + var previousInstanceR beam.Receiver + // FIXME: refactor into a Pipeline + for idx, backendArg := range c.Args() { bName, bArgs, err := parseCmd(backendArg) if err != nil { Fatalf("parse: %v", err) } - fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " ")) _, backend, err := back.Attach(bName) if err != nil { Fatalf("%s: %v\n", bName, err) } - fmt.Printf("---> Spawning\n") instance, err := backend.Spawn(bArgs...) if err != nil { Fatalf("spawn %s: %v\n", bName, err) } - fmt.Printf("---> Attaching\n") - instanceIn, instanceOut, err := instance.Attach("") + instanceR, instanceW, err := instance.Attach("") if err != nil { Fatalf("attach: %v", err) } - fmt.Printf("---> Starting\n") + go func(r beam.Receiver, w beam.Sender, idx int) { + if r != nil { + beam.Copy(w, r) + } + w.Close() + }(previousInstanceR, instanceW, idx) if err := instance.Start(); err != nil { Fatalf("start: %v", err) } - if previousInstanceIn != nil { - go beam.Copy(instanceOut, previousInstanceIn) - } - previousInstanceIn = instanceIn + previousInstanceR = instanceR } - _, err := beam.Copy(app, previousInstanceIn) + _, err := beam.Copy(app, previousInstanceR) if err != nil { Fatalf("copy: %v", err) }