diff --git a/backends/backends.go b/backends/backends.go index 3358215b94..2f0f6dd7e0 100644 --- a/backends/backends.go +++ b/backends/backends.go @@ -16,5 +16,6 @@ func New() *beam.Object { backends.Bind("debug", Debug()) backends.Bind("fakeclient", FakeClient()) backends.Bind("forward", Forward()) + backends.Bind("exec", Exec()) return beam.Obj(backends) } diff --git a/backends/exec.go b/backends/exec.go new file mode 100644 index 0000000000..49251a39b3 --- /dev/null +++ b/backends/exec.go @@ -0,0 +1,105 @@ +package backends + +import ( + "fmt" + "io" + "os/exec" + "encoding/json" + "bufio" + "strings" + "sync" + + "github.com/docker/libswarm/beam" +) + +func Exec() beam.Sender { + e := beam.NewServer() + e.OnSpawn(beam.Handler(func(msg *beam.Message) error { + if len(msg.Args) < 1 { + return fmt.Errorf("usage: SPAWN exec|... ") + } + if msg.Args[0] != "exec" { + return fmt.Errorf("invalid command: %s", msg.Args[0]) + } + var config struct { + Path string + Args []string + } + if err := json.Unmarshal([]byte(msg.Args[1]), &config); err != nil { + config.Path = msg.Args[1] + config.Args = msg.Args[2:] + } + cmd := &command{ + Cmd: exec.Command(config.Path, config.Args...), + Server: beam.NewServer(), + } + cmd.OnAttach(beam.Handler(func(msg *beam.Message) error { + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stdin, err := cmd.StdinPipe() + if err != nil { + return err + } + inR, inW := beam.Pipe() + if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil { + return err + } + out := beam.Obj(msg.Ret) + go func() { + defer stdin.Close() + for { + msg, err := inR.Receive(0) + if err != nil { + return + } + if msg.Verb == beam.Log && len(msg.Args) > 0 { + fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n")) + } + } + }() + cmd.tasks.Add(1) + go func() { + defer cmd.tasks.Done() + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + if scanner.Err() != io.EOF && scanner.Err() != nil { + return + } + if err := out.Log(scanner.Text()); err != nil { + out.Error("%v", err) + return + } + } + }() + cmd.tasks.Wait() + return nil + })) + cmd.OnStart(beam.Handler(func(msg *beam.Message) error { + cmd.tasks.Add(1) + if err := cmd.Cmd.Start(); err != nil { + return err + } + go func() { + defer cmd.tasks.Done() + if err := cmd.Cmd.Wait(); err != nil { + beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err) + } + }() + msg.Ret.Send(&beam.Message{Verb: beam.Ack}) + return nil + })) + if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil { + return err + } + return nil + })) + return e +} + +type command struct { + *exec.Cmd + *beam.Server + tasks sync.WaitGroup +} diff --git a/beam/copy.go b/beam/copy.go index dd979dc55f..d62c03b8b4 100644 --- a/beam/copy.go +++ b/beam/copy.go @@ -1,6 +1,7 @@ package beam import ( + "io" "sync" ) @@ -22,6 +23,9 @@ func Copy(dst Sender, src Receiver) (int, error) { ) for { msg, err := src.Receive(Ret) + if err == io.EOF { + return n, nil + } if err != nil { return n, err } diff --git a/swarmd/swarmd.go b/swarmd/swarmd.go index 79099d374c..dccecdb080 100644 --- a/swarmd/swarmd.go +++ b/swarmd/swarmd.go @@ -42,37 +42,37 @@ func cmdDaemon(c *cli.Context) { fmt.Println(strings.Join(names, "\n")) return } - var previousInstanceIn beam.Receiver - for _, backendArg := range c.Args() { + var previousInstanceR beam.Receiver + // FIXME: refactor into a Pipeline + for idx, backendArg := range c.Args() { bName, bArgs, err := parseCmd(backendArg) if err != nil { Fatalf("parse: %v", err) } - fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " ")) _, backend, err := back.Attach(bName) if err != nil { Fatalf("%s: %v\n", bName, err) } - fmt.Printf("---> Spawning\n") instance, err := backend.Spawn(bArgs...) if err != nil { Fatalf("spawn %s: %v\n", bName, err) } - fmt.Printf("---> Attaching\n") - instanceIn, instanceOut, err := instance.Attach("") + instanceR, instanceW, err := instance.Attach("") if err != nil { Fatalf("attach: %v", err) } - fmt.Printf("---> Starting\n") + go func(r beam.Receiver, w beam.Sender, idx int) { + if r != nil { + beam.Copy(w, r) + } + w.Close() + }(previousInstanceR, instanceW, idx) if err := instance.Start(); err != nil { Fatalf("start: %v", err) } - if previousInstanceIn != nil { - go beam.Copy(instanceOut, previousInstanceIn) - } - previousInstanceIn = instanceIn + previousInstanceR = instanceR } - _, err := beam.Copy(app, previousInstanceIn) + _, err := beam.Copy(app, previousInstanceR) if err != nil { Fatalf("copy: %v", err) }