swarmd: remove unused engine/beam adapters.

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-06-02 06:09:44 +00:00
parent a1dcc4aceb
commit f556ed19f7
2 changed files with 0 additions and 123 deletions

View File

@ -1,21 +1,17 @@
package main
import (
"bufio"
"fmt"
"log"
"github.com/codegangsta/cli"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/inmem"
beamutils "github.com/docker/libswarm/beam/utils"
"github.com/docker/libswarm/backends"
_ "github.com/dotcloud/docker/api/server"
"github.com/dotcloud/docker/engine"
"github.com/flynn/go-shlex"
"io"
"os"
"strings"
"sync"
)
func main() {
@ -30,125 +26,6 @@ func main() {
app.Run(os.Args)
}
func EngineAsSender(eng *engine.Engine) beam.Sender {
r, w := inmem.Pipe()
go func() {
for {
msg, msgr, msgw, err := r.Receive(beam.R | beam.W)
if err != nil {
return
}
go func(msg *beam.Message, in beam.Receiver, out beam.Sender) {
job := eng.Job(msg.Name, msg.Args...)
stdout, _ := job.Stdout.AddPipe() // can't fail
stderr, _ := job.Stderr.AddPipe() // can't fail
stdinR, stdinW := io.Pipe()
defer stdinR.Close()
defer stdinW.Close()
job.Stdin.Add(stdinR)
log := func(src io.Reader) {
scanner := bufio.NewScanner(src)
for scanner.Scan() {
if scanner.Err() != nil {
return
}
if _, _, err := out.Send(&beam.Message{Name: "log", Args: []string{scanner.Text()}}, 0); err != nil {
return
}
}
}
var tasks sync.WaitGroup
tasks.Add(3)
go func() {
// Read from stdout, send "log" events
defer tasks.Done()
log(stdout)
}()
go func() {
// Read from stderr, send "log" events
// FIXME: how to differentiate stderr/stdout logs?
defer tasks.Done()
log(stderr)
}()
go func() {
// Receive events, send "log" events to stdin
defer tasks.Done()
for {
m, _, _, err := in.Receive(0)
if err != nil {
return
}
if m.Name == "log" {
if len(m.Args) < 1 {
continue
}
fmt.Fprintf(stdinW, "%s\n", strings.TrimRight(m.Args[0], "\r\n"))
}
}
}()
err := job.Run()
if err != nil {
out.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0)
}
}(msg, msgr, msgw)
}
}()
return w
}
func SenderAsEngine(s beam.Sender) *engine.Engine {
eng := engine.New()
eng.RegisterCatchall(func(job *engine.Job) engine.Status {
msg := &beam.Message{
Name: job.Name,
Args: job.Args,
}
// FIXME: serialize job.Env into a trailing argument
r, w, err := s.Send(msg, beam.R|beam.W)
if err != nil {
return job.Errorf("beam send: %v", err)
}
var tasks sync.WaitGroup
tasks.Add(1)
go func() {
defer tasks.Done()
in := bufio.NewScanner(job.Stdin)
for in.Scan() {
_, _, err := w.Send(&beam.Message{Name: "log", Args: []string{in.Text()}}, 0)
if err != nil {
return
}
}
}()
tasks.Add(1)
var status engine.Status = engine.StatusOK
go func() {
defer tasks.Done()
for {
msg, _, _, err := r.Receive(0)
if err != nil {
return
}
if msg.Name == "log" {
if len(msg.Args) < 1 {
continue
}
fmt.Fprintf(job.Stdout, "%s\n", strings.TrimRight(msg.Args[0], "\r\n"))
} else if msg.Name == "error" {
status = engine.StatusErr
if len(msg.Args) < 1 {
continue
}
fmt.Fprintf(job.Stderr, "%s\n", strings.TrimRight(msg.Args[0], "\r\n"))
}
}
}()
tasks.Wait()
return status
})
return eng
}
func cmdDaemon(c *cli.Context) {
if len(c.Args()) == 0 {
Fatalf("Usage: %s <proto>://<address> [<proto>://<address>]...\n", c.App.Name)