Merge pull request #36 from shykes/beam-exec

This commit is contained in:
Solomon Hykes 2014-06-05 19:47:52 -07:00
commit c1e2ff7e89
4 changed files with 122 additions and 12 deletions

View File

@ -16,5 +16,6 @@ func New() *beam.Object {
backends.Bind("debug", Debug())
backends.Bind("fakeclient", FakeClient())
backends.Bind("forward", Forward())
backends.Bind("exec", Exec())
return beam.Obj(backends)
}

105
backends/exec.go Normal file
View File

@ -0,0 +1,105 @@
package backends
import (
"fmt"
"io"
"os/exec"
"encoding/json"
"bufio"
"strings"
"sync"
"github.com/docker/libswarm/beam"
)
func Exec() beam.Sender {
e := beam.NewServer()
e.OnSpawn(beam.Handler(func(msg *beam.Message) error {
if len(msg.Args) < 1 {
return fmt.Errorf("usage: SPAWN exec|... <config>")
}
if msg.Args[0] != "exec" {
return fmt.Errorf("invalid command: %s", msg.Args[0])
}
var config struct {
Path string
Args []string
}
if err := json.Unmarshal([]byte(msg.Args[1]), &config); err != nil {
config.Path = msg.Args[1]
config.Args = msg.Args[2:]
}
cmd := &command{
Cmd: exec.Command(config.Path, config.Args...),
Server: beam.NewServer(),
}
cmd.OnAttach(beam.Handler(func(msg *beam.Message) error {
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
inR, inW := beam.Pipe()
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil {
return err
}
out := beam.Obj(msg.Ret)
go func() {
defer stdin.Close()
for {
msg, err := inR.Receive(0)
if err != nil {
return
}
if msg.Verb == beam.Log && len(msg.Args) > 0 {
fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n"))
}
}
}()
cmd.tasks.Add(1)
go func() {
defer cmd.tasks.Done()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
if scanner.Err() != io.EOF && scanner.Err() != nil {
return
}
if err := out.Log(scanner.Text()); err != nil {
out.Error("%v", err)
return
}
}
}()
cmd.tasks.Wait()
return nil
}))
cmd.OnStart(beam.Handler(func(msg *beam.Message) error {
cmd.tasks.Add(1)
if err := cmd.Cmd.Start(); err != nil {
return err
}
go func() {
defer cmd.tasks.Done()
if err := cmd.Cmd.Wait(); err != nil {
beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err)
}
}()
msg.Ret.Send(&beam.Message{Verb: beam.Ack})
return nil
}))
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil {
return err
}
return nil
}))
return e
}
type command struct {
*exec.Cmd
*beam.Server
tasks sync.WaitGroup
}

View File

@ -1,6 +1,7 @@
package beam
import (
"io"
"sync"
)
@ -22,6 +23,9 @@ func Copy(dst Sender, src Receiver) (int, error) {
)
for {
msg, err := src.Receive(Ret)
if err == io.EOF {
return n, nil
}
if err != nil {
return n, err
}

View File

@ -42,37 +42,37 @@ func cmdDaemon(c *cli.Context) {
fmt.Println(strings.Join(names, "\n"))
return
}
var previousInstanceIn beam.Receiver
for _, backendArg := range c.Args() {
var previousInstanceR beam.Receiver
// FIXME: refactor into a Pipeline
for idx, backendArg := range c.Args() {
bName, bArgs, err := parseCmd(backendArg)
if err != nil {
Fatalf("parse: %v", err)
}
fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " "))
_, backend, err := back.Attach(bName)
if err != nil {
Fatalf("%s: %v\n", bName, err)
}
fmt.Printf("---> Spawning\n")
instance, err := backend.Spawn(bArgs...)
if err != nil {
Fatalf("spawn %s: %v\n", bName, err)
}
fmt.Printf("---> Attaching\n")
instanceIn, instanceOut, err := instance.Attach("")
instanceR, instanceW, err := instance.Attach("")
if err != nil {
Fatalf("attach: %v", err)
}
fmt.Printf("---> Starting\n")
go func(r beam.Receiver, w beam.Sender, idx int) {
if r != nil {
beam.Copy(w, r)
}
w.Close()
}(previousInstanceR, instanceW, idx)
if err := instance.Start(); err != nil {
Fatalf("start: %v", err)
}
if previousInstanceIn != nil {
go beam.Copy(instanceOut, previousInstanceIn)
previousInstanceR = instanceR
}
previousInstanceIn = instanceIn
}
_, err := beam.Copy(app, previousInstanceIn)
_, err := beam.Copy(app, previousInstanceR)
if err != nil {
Fatalf("copy: %v", err)
}