Merge branch 'beam' into master

This commit is contained in:
Solomon Hykes 2014-06-07 03:27:25 +00:00
commit 475dd0f459
63 changed files with 5281 additions and 155 deletions

161
backends/apiserver.go Normal file
View File

@ -0,0 +1,161 @@
package backends
import (
"encoding/json"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/api"
"github.com/dotcloud/docker/pkg/version"
"github.com/gorilla/mux"
"net"
"net/http"
)
func ApiServer() beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
err := listenAndServe("tcp", "0.0.0.0:4243", out)
if err != nil {
fmt.Printf("listenAndServe: %v", err)
}
})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return backend
}
type HttpApiFunc func(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error
func listenAndServe(proto, addr string, out beam.Sender) error {
fmt.Println("Starting API server...")
r, err := createRouter(out)
if err != nil {
return err
}
l, err := net.Listen(proto, addr)
if err != nil {
return err
}
httpSrv := http.Server{Addr: addr, Handler: r}
return httpSrv.Serve(l)
}
func ping(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
_, err := w.Write([]byte{'O', 'K'})
return err
}
func getContainersJSON(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := r.ParseForm(); err != nil {
return err
}
o := beam.Obj(out)
names, err := o.Ls()
if err != nil {
return err
}
return writeJSON(w, http.StatusOK, names)
}
func postContainersStart(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
// TODO: r.Body
name := vars["name"]
_, containerOut, err := beam.Obj(out).Attach(name)
container := beam.Obj(containerOut)
if err != nil {
return err
}
if err := container.Start(); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
return nil
}
func postContainersStop(out beam.Sender, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
name := vars["name"]
_, containerOut, err := beam.Obj(out).Attach(name)
container := beam.Obj(containerOut)
if err != nil {
return err
}
if err := container.Stop(); err != nil {
return err
}
w.WriteHeader(http.StatusNoContent)
return nil
}
func createRouter(out beam.Sender) (*mux.Router, error) {
r := mux.NewRouter()
m := map[string]map[string]HttpApiFunc{
"GET": {
"/_ping": ping,
"/containers/json": getContainersJSON,
},
"POST": {
"/containers/{name:.*}/start": postContainersStart,
"/containers/{name:.*}/stop": postContainersStop,
},
"DELETE": {},
"OPTIONS": {},
}
for method, routes := range m {
for route, fct := range routes {
localRoute := route
localFct := fct
localMethod := method
f := makeHttpHandler(out, localMethod, localRoute, localFct, version.Version("0.11.0"))
// add the new route
if localRoute == "" {
r.Methods(localMethod).HandlerFunc(f)
} else {
r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(f)
r.Path(localRoute).Methods(localMethod).HandlerFunc(f)
}
}
}
return r, nil
}
func makeHttpHandler(out beam.Sender, localMethod string, localRoute string, handlerFunc HttpApiFunc, dockerVersion version.Version) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// log the request
fmt.Printf("Calling %s %s\n", localMethod, localRoute)
version := version.Version(mux.Vars(r)["version"])
if version == "" {
version = api.APIVERSION
}
if err := handlerFunc(out, version, w, r, mux.Vars(r)); err != nil {
fmt.Printf("Error: %s", err)
}
}
}
func writeJSON(w http.ResponseWriter, code int, v interface{}) error {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
enc := json.NewEncoder(w)
return enc.Encode(v)
}

View File

@ -1,7 +1,7 @@
package backends package backends
import ( import (
"github.com/dotcloud/docker/engine" "github.com/docker/libswarm/beam"
) )
// New returns a new engine, with all backends // New returns a new engine, with all backends
@ -10,15 +10,13 @@ import (
// engine, named after the desired backend. // engine, named after the desired backend.
// //
// Example: `New().Job("debug").Run()` // Example: `New().Job("debug").Run()`
func New() *engine.Engine { func New() *beam.Object {
back := engine.New() backends := beam.NewTree()
back.Logging = false backends.Bind("simulator", Simulator())
// Register all backends here backends.Bind("debug", Debug())
Debug().Install(back) backends.Bind("fakeclient", FakeClient())
Simulator().Install(back) backends.Bind("forward", Forward())
Forward().Install(back) backends.Bind("exec", Exec())
CloudBackend().Install(back) backends.Bind("apiserver", ApiServer())
Tutum().Install(back) return beam.Obj(backends)
Shipyard().Install(back)
return back
} }

View File

@ -2,32 +2,30 @@ package backends
import ( import (
"fmt" "fmt"
"github.com/dotcloud/docker/engine"
"strings" "strings"
"github.com/docker/libswarm/beam"
) )
func Debug() engine.Installer { func Debug() beam.Sender {
return &debug{} backend := beam.NewServer()
} backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
type debug struct { for {
} msg, err := in.Receive(beam.Ret)
if err != nil {
func (d *debug) Install(eng *engine.Engine) error { fmt.Printf("debug receive: %v", err)
eng.Register("debug", func(job *engine.Job) engine.Status { return
job.Eng.RegisterCatchall(func(job *engine.Job) engine.Status { }
fmt.Printf("--> %s %s\n", job.Name, strings.Join(job.Args, " ")) fmt.Printf("[DEBUG] %s %s\n", msg.Verb, strings.Join(msg.Args, " "))
for k, v := range job.Env().Map() { if _, err := out.Send(msg); err != nil {
fmt.Printf(" %s=%s\n", k, v) fmt.Printf("debug send: %v", err)
return
}
} }
// This helps us detect the race condition if our time.Sleep
// missed it. (see comment in main)
if job.Name == "acceptconnections" {
panic("race condition in github.com/dotcloud/docker/api/server/ServeApi")
}
return engine.StatusOK
}) })
return engine.StatusOK _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
}) return err
return nil }))
return backend
} }

105
backends/exec.go Normal file
View File

@ -0,0 +1,105 @@
package backends
import (
"fmt"
"io"
"os/exec"
"encoding/json"
"bufio"
"strings"
"sync"
"github.com/docker/libswarm/beam"
)
func Exec() beam.Sender {
e := beam.NewServer()
e.OnSpawn(beam.Handler(func(msg *beam.Message) error {
if len(msg.Args) < 1 {
return fmt.Errorf("usage: SPAWN exec|... <config>")
}
if msg.Args[0] != "exec" {
return fmt.Errorf("invalid command: %s", msg.Args[0])
}
var config struct {
Path string
Args []string
}
if err := json.Unmarshal([]byte(msg.Args[1]), &config); err != nil {
config.Path = msg.Args[1]
config.Args = msg.Args[2:]
}
cmd := &command{
Cmd: exec.Command(config.Path, config.Args...),
Server: beam.NewServer(),
}
cmd.OnAttach(beam.Handler(func(msg *beam.Message) error {
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
inR, inW := beam.Pipe()
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: inW}); err != nil {
return err
}
out := beam.Obj(msg.Ret)
go func() {
defer stdin.Close()
for {
msg, err := inR.Receive(0)
if err != nil {
return
}
if msg.Verb == beam.Log && len(msg.Args) > 0 {
fmt.Fprintf(stdin, "%s\n", strings.TrimRight(msg.Args[0], "\r\n"))
}
}
}()
cmd.tasks.Add(1)
go func() {
defer cmd.tasks.Done()
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
if scanner.Err() != io.EOF && scanner.Err() != nil {
return
}
if err := out.Log(scanner.Text()); err != nil {
out.Error("%v", err)
return
}
}
}()
cmd.tasks.Wait()
return nil
}))
cmd.OnStart(beam.Handler(func(msg *beam.Message) error {
cmd.tasks.Add(1)
if err := cmd.Cmd.Start(); err != nil {
return err
}
go func() {
defer cmd.tasks.Done()
if err := cmd.Cmd.Wait(); err != nil {
beam.Obj(msg.Ret).Log("%s exited status=%v", cmd.Cmd.Path, err)
}
}()
msg.Ret.Send(&beam.Message{Verb: beam.Ack})
return nil
}))
if _, err := msg.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: cmd}); err != nil {
return err
}
return nil
}))
return e
}
type command struct {
*exec.Cmd
*beam.Server
tasks sync.WaitGroup
}

30
backends/fakeclient.go Normal file
View File

@ -0,0 +1,30 @@
package backends
import (
"fmt"
"time"
"github.com/docker/libswarm/beam"
)
func FakeClient() beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
// Instantiate a new fakeclient instance
instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
fmt.Printf("fake client!\n")
defer fmt.Printf("end of fake client!\n")
o := beam.Obj(out)
o.Log("fake client starting")
defer o.Log("fake client terminating")
for {
time.Sleep(1 * time.Second)
o.Log("fake client heartbeat!")
}
})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
return err
}))
return backend
}

View File

@ -1,58 +1,199 @@
package backends package backends
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/engine" "github.com/dotcloud/docker/engine"
"github.com/dotcloud/docker/utils"
"io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/http/httputil"
"net/url" "net/url"
"strings" "strings"
"time"
) )
func Forward() engine.Installer { func Forward() beam.Sender {
return &forwarder{} backend := beam.NewServer()
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args))
}
client, err := newClient(ctx.Args[0], "v1.11")
if err != nil {
return fmt.Errorf("%v", err)
}
f := &forwarder{
client: client,
Server: beam.NewServer(),
}
f.Server.OnAttach(beam.Handler(f.attach))
f.Server.OnStart(beam.Handler(f.start))
f.Server.OnLs(beam.Handler(f.ls))
f.Server.OnSpawn(beam.Handler(f.spawn))
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server})
return err
}))
return backend
} }
type forwarder struct { type forwarder struct {
client *client
*beam.Server
} }
func (f *forwarder) Install(eng *engine.Engine) error { func (f *forwarder) attach(ctx *beam.Message) error {
eng.Register("forward", func(job *engine.Job) engine.Status { if ctx.Args[0] == "" {
if len(job.Args) != 1 { ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server})
return job.Errorf("usage: %s <proto>://<addr>", job.Name) for {
time.Sleep(1 * time.Second)
(&beam.Object{ctx.Ret}).Log("forward: heartbeat")
}
} else {
c := f.newContainer(ctx.Args[0])
ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c})
}
return nil
}
func (f *forwarder) start(ctx *beam.Message) error {
ctx.Ret.Send(&beam.Message{Verb: beam.Ack})
return nil
}
func (f *forwarder) ls(ctx *beam.Message) error {
resp, err := f.client.call("GET", "/containers/json", "")
if err != nil {
return fmt.Errorf("%s: get: %v", f.client.URL.String(), err)
}
// FIXME: check for response error
c := engine.NewTable("Created", 0)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("%s: read body: %v", f.client.URL.String(), err)
}
if _, err := c.ReadListFrom(body); err != nil {
return fmt.Errorf("%s: readlist: %v", f.client.URL.String(), err)
}
names := []string{}
for _, env := range c.Data {
names = append(names, env.GetList("Names")[0][1:])
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: names}); err != nil {
return fmt.Errorf("%s: send response: %v", f.client.URL.String(), err)
}
return nil
}
func (f *forwarder) spawn(ctx *beam.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args))
}
resp, err := f.client.call("POST", "/containers/create", ctx.Args[0])
if err != nil {
return err
}
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 201 {
return fmt.Errorf("expected status code 201, got %d:\n%s", resp.StatusCode, respBody)
}
var respJson struct{ Id string }
if err = json.Unmarshal(respBody, &respJson); err != nil {
return err
}
c := f.newContainer(respJson.Id)
if _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: c}); err != nil {
return err
}
return nil
}
func (f *forwarder) newContainer(id string) beam.Sender {
c := &container{forwarder: f, id: id}
instance := beam.NewServer()
instance.OnAttach(beam.Handler(c.attach))
instance.OnStart(beam.Handler(c.start))
instance.OnStop(beam.Handler(c.stop))
return instance
}
type container struct {
forwarder *forwarder
id string
}
func (c *container) attach(ctx *beam.Message) error {
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
return err
}
path := fmt.Sprintf("/containers/%s/attach?stdout=1&stderr=1&stream=1", c.id)
stdoutR, stdoutW := io.Pipe()
stderrR, stderrW := io.Pipe()
go copyOutput(ctx.Ret, stdoutR, "stdout")
go copyOutput(ctx.Ret, stderrR, "stderr")
c.forwarder.client.hijack("POST", path, nil, stdoutW, stderrW)
return nil
}
func copyOutput(sender beam.Sender, reader io.Reader, tag string) {
chunk := make([]byte, 4096)
for {
n, err := reader.Read(chunk)
if n > 0 {
sender.Send(&beam.Message{Verb: beam.Log, Args: []string{tag, string(chunk[0:n])}})
} }
client, err := newClient(job.Args[0], "v0.10")
if err != nil { if err != nil {
return job.Errorf("%v", err) message := fmt.Sprintf("Error reading from stream: %v", err)
sender.Send(&beam.Message{Verb: beam.Error, Args: []string{message}})
break
} }
job.Eng.Register("containers", func(job *engine.Job) engine.Status { }
path := fmt.Sprintf( }
"/containers/json?all=%s&size=%s&since=%s&before=%s&limit=%s",
url.QueryEscape(job.Getenv("all")), func (c *container) start(ctx *beam.Message) error {
url.QueryEscape(job.Getenv("size")), path := fmt.Sprintf("/containers/%s/start", c.id)
url.QueryEscape(job.Getenv("since")), resp, err := c.forwarder.client.call("POST", path, "{}")
url.QueryEscape(job.Getenv("before")), if err != nil {
url.QueryEscape(job.Getenv("limit")), return err
) }
resp, err := client.call("GET", path, "") respBody, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return job.Errorf("%s: get: %v", client.URL.String(), err) return err
} }
// FIXME: check for response error if resp.StatusCode != 204 {
c := engine.NewTable("Created", 0) return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody)
body, err := ioutil.ReadAll(resp.Body) }
if err != nil { if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
return job.Errorf("%s: read body: %v", client.URL.String(), err) return err
} }
fmt.Printf("---> '%s'\n", body) return nil
if _, err := c.ReadListFrom(body); err != nil { }
return job.Errorf("%s: readlist: %v", client.URL.String(), err)
} func (c *container) stop(ctx *beam.Message) error {
c.WriteListTo(job.Stdout) path := fmt.Sprintf("/containers/%s/stop", c.id)
return engine.StatusOK resp, err := c.forwarder.client.call("POST", path, "{}")
}) if err != nil {
return engine.StatusOK return err
}) }
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 204 {
return fmt.Errorf("expected status code 204, got %d:\n%s", resp.StatusCode, respBody)
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack}); err != nil {
return err
}
return nil return nil
} }
@ -77,6 +218,7 @@ func newClient(peer, version string) (*client, error) {
} }
func (c *client) call(method, path, body string) (*http.Response, error) { func (c *client) call(method, path, body string) (*http.Response, error) {
path = fmt.Sprintf("/%s%s", c.version, path)
u, err := url.Parse(path) u, err := url.Parse(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -93,3 +235,61 @@ func (c *client) call(method, path, body string) (*http.Response, error) {
} }
return resp, nil return resp, nil
} }
func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io.Writer) error {
path = fmt.Sprintf("/%s%s", c.version, path)
dial, err := net.Dial("tcp", c.URL.Host)
if err != nil {
return err
}
req, err := http.NewRequest(method, path, nil)
if err != nil {
return err
}
clientconn := httputil.NewClientConn(dial, nil)
defer clientconn.Close()
clientconn.Do(req)
rwc, br := clientconn.Hijack()
defer rwc.Close()
receiveStdout := utils.Go(func() (err error) {
defer func() {
if in != nil {
in.Close()
}
}()
_, err = utils.StdCopy(stdout, stderr, br)
utils.Debugf("[hijack] End of stdout")
return err
})
sendStdin := utils.Go(func() error {
if in != nil {
io.Copy(rwc, in)
utils.Debugf("[hijack] End of stdin")
}
if tcpc, ok := rwc.(*net.TCPConn); ok {
if err := tcpc.CloseWrite(); err != nil {
utils.Debugf("Couldn't send EOF: %s", err)
}
} else if unixc, ok := rwc.(*net.UnixConn); ok {
if err := unixc.CloseWrite(); err != nil {
utils.Debugf("Couldn't send EOF: %s", err)
}
}
// Discard errors due to pipe interruption
return nil
})
if err := <-receiveStdout; err != nil {
utils.Debugf("Error receiveStdout: %s", err)
return err
}
if err := <-sendStdin; err != nil {
utils.Debugf("Error sendStdin: %s", err)
return err
}
return nil
}

View File

@ -1,32 +1,25 @@
package backends package backends
import ( import (
"github.com/dotcloud/docker/engine" "github.com/docker/libswarm/beam"
) )
func Simulator() engine.Installer { func Simulator() beam.Sender {
return &simulator{} s := beam.NewServer()
} s.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
containers := ctx.Args
type simulator struct { instance := beam.Task(func(in beam.Receiver, out beam.Sender) {
containers []string beam.Obj(out).Log("[simulator] starting\n")
} s := beam.NewServer()
s.OnLs(beam.Handler(func(msg *beam.Message) error {
func (s *simulator) Install(eng *engine.Engine) error { beam.Obj(out).Log("[simulator] generating fake list of objects...\n")
eng.Register("simulator", func(job *engine.Job) engine.Status { beam.Obj(msg.Ret).Set(containers...)
s.containers = job.Args return nil
job.Eng.Register("containers", func(job *engine.Job) engine.Status { }))
t := engine.NewTable("Id", len(s.containers)) beam.Copy(s, in)
for _, c := range s.containers {
e := &engine.Env{}
e.Set("Id", c)
e.Set("Image", "foobar")
t.Add(e)
}
t.WriteListTo(job.Stdout)
return engine.StatusOK
}) })
return engine.StatusOK ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: instance})
}) return nil
return nil }))
return s
} }

1
beam/AUTHORS Normal file
View File

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com>

1
beam/MAINTAINERS Normal file
View File

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com> (@shykes)

79
beam/README.md Normal file
View File

@ -0,0 +1,79 @@
# Beam
## A library to break down an application into loosely coupled micro-services
Beam is a library to turn your application into a collection of loosely coupled micro-services.
It implements an ultra-lightweight hub for the different components of an application
to discover and consume each other, either in-memory or across the network.
Beam can be embedded with very little overhead by using Go channels. It
also implements an efficient http2/tls transport which can be used to
securely expose and consume any micro-service across a distributed system.
Because remote Beam sessions are regular HTTP2 over TLS sessions, they can
be used in combination with any standard proxy or authentication
middleware. This means Beam, when configured propely, can be safely exposed
on the public Internet. It can also be embedded in an existing rest API
using an http1 and websocket fallback.
## How is it different from RPC or REST?
Modern micro-services are not a great fit for classical RPC or REST
protocols because they often rely heavily on events, bi-directional
communication, stream multiplexing, and some form of data synchronization.
Sometimes these services have a component which requires raw socket access,
either for performance (file transfer, event firehose, database access) or
simply because they have their own protocol (dns, smtp, sql, ssh,
zeromq, etc). These components typically need a separate set of tools
because they are outside the scope of the REST and RPC tools. If there is
also a websocket or ServerEvents transport, those require yet another layer
of tools.
Instead of a clunky patchwork of tools, Beam implements in a single
minimalistic library all the primitives needed by modern micro-services:
* Request/response with arbitrary structured data
* Asynchronous events flowing in real-time in both directions
* Requests and responses can flow in any direction, and can be arbitrarily
nested, for example to implement a self-registering worker model
* Any request or response can include any number of streams, multiplexed in
both directions on the same session.
* Any message serialization format can be plugged in: json, msgpack, xml,
protobuf.
* As an optional convenience a minimalist key-value format is implemented.
It is designed to be extremely fast to serialize and parse, dead-simple to
implement, and suitable for both one-time data copy, file storage, and
real-time synchronization.
* Raw file descriptors can be "attached" to any message, and passed under
the hood using the best method available to each transport. The Go channel
transport just passes os.File pointers around. The unix socket transport
uses fd passing which makes it suitable for high-performance IPC. The
tcp transport uses dedicated http2 streams. And as a bonus extension, a
built-in tcp gateway can be used to proxy raw network sockets without
extra overhead. That means Beam services can be used as smart gateways to a
sql database, ssh or file transfer service, with unified auth, discovery
and tooling and without performance penalty.
## Design philosophy
An explicit goal of Beam is simplicity of implementation and clarity of
spec. Porting it to any language should be as effortless as humanly
possible.
## Creators
**Solomon Hykes**
- <http://twitter.com/solomonstre>
- <http://github.com/shykes>
## Copyright and license
Code and documentation copyright 2013-2014 Docker, inc. Code released under the Apache 2.0 license.
Docs released under Creative commons.

66
beam/beam.go Normal file
View File

@ -0,0 +1,66 @@
package beam
import (
"errors"
"os"
)
type Sender interface {
Send(msg *Message) (Receiver, error)
Close() error
}
type Receiver interface {
Receive(mode int) (*Message, error)
}
type Message struct {
Verb Verb
Args []string
Att *os.File
Ret Sender
}
const (
Ret int = 1 << iota
// FIXME: use an `Att` flag to auto-close attachments by default
)
type ReceiverFrom interface {
ReceiveFrom(Receiver) (int, error)
}
type SenderTo interface {
SendTo(Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)
// RetPipe is a special value for `Message.Ret`.
// When a Message is sent with `Ret=SendPipe`, the transport must
// substitute it with the writing end of a new pipe, and return the
// other end as a return value.
type retPipe struct {
NopSender
}
var RetPipe = retPipe{}
func (r retPipe) Equals(val Sender) bool {
if rval, ok := val.(retPipe); ok {
return rval == r
}
return false
}
func Repeater(payload *Message) Sender {
return Handler(func(msg *Message) error {
msg.Ret.Send(payload)
return nil
})
}
var NotImplemented = Repeater(&Message{Verb: Error, Args: []string{"not implemented"}})

20
beam/beam_test.go Normal file
View File

@ -0,0 +1,20 @@
package beam
import (
"testing"
)
func TestModes(t *testing.T) {
if Ret == 0 {
t.Fatalf("0")
}
}
func TestRetPipe(t *testing.T) {
var (
shouldBeEqual = RetPipe
)
if RetPipe != shouldBeEqual {
t.Fatalf("%#v should equal %#v", RetPipe, shouldBeEqual)
}
}

38
beam/copy.go Normal file
View File

@ -0,0 +1,38 @@
package beam
import (
"io"
"sync"
)
func Copy(dst Sender, src Receiver) (int, error) {
var tasks sync.WaitGroup
defer tasks.Wait()
if senderTo, ok := src.(SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
return n, err
}
}
if receiverFrom, ok := dst.(ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
return n, err
}
}
var (
n int
)
for {
msg, err := src.Receive(Ret)
if err == io.EOF {
return n, nil
}
if err != nil {
return n, err
}
if _, err := dst.Send(msg); err != nil {
return n, err
}
n++
}
return n, nil
}

119
beam/data/data.go Normal file
View File

@ -0,0 +1,119 @@
package data
import (
"fmt"
"strconv"
"strings"
)
func Encode(obj map[string][]string) string {
var msg string
msg += encodeHeader(0)
for k, values := range obj {
msg += encodeNamedList(k, values)
}
return msg
}
func encodeHeader(msgtype int) string {
return fmt.Sprintf("%03.3d;", msgtype)
}
func encodeString(s string) string {
return fmt.Sprintf("%d:%s,", len(s), s)
}
var EncodeString = encodeString
var DecodeString = decodeString
var EncodeList = encodeList
func encodeList(l []string) string {
values := make([]string, 0, len(l))
for _, s := range l {
values = append(values, encodeString(s))
}
return encodeString(strings.Join(values, ""))
}
func encodeNamedList(name string, l []string) string {
return encodeString(name) + encodeList(l)
}
func Decode(msg string) (map[string][]string, error) {
msgtype, skip, err := decodeHeader(msg)
if err != nil {
return nil, err
}
if msgtype != 0 {
// FIXME: use special error type so the caller can easily ignore
return nil, fmt.Errorf("unknown message type: %d", msgtype)
}
msg = msg[skip:]
obj := make(map[string][]string)
for len(msg) > 0 {
k, skip, err := decodeString(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
values, skip, err := decodeList(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
obj[k] = values
}
return obj, nil
}
var DecodeList = decodeList
func decodeList(msg string) ([]string, int, error) {
blob, skip, err := decodeString(msg)
if err != nil {
return nil, 0, err
}
var l []string
for len(blob) > 0 {
v, skipv, err := decodeString(blob)
if err != nil {
return nil, 0, err
}
l = append(l, v)
blob = blob[skipv:]
}
return l, skip, nil
}
func decodeString(msg string) (string, int, error) {
parts := strings.SplitN(msg, ":", 2)
if len(parts) != 2 {
return "", 0, fmt.Errorf("invalid format: no column")
}
var length int
if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil {
return "", 0, err
} else {
length = int(l)
}
if len(parts[1]) < length+1 {
return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1)
}
payload := parts[1][:length+1]
if payload[length] != ',' {
return "", 0, fmt.Errorf("message is not comma-terminated")
}
return payload[:length], len(parts[0]) + 1 + length + 1, nil
}
func decodeHeader(msg string) (int, int, error) {
if len(msg) < 4 {
return 0, 0, fmt.Errorf("message too small")
}
msgtype, err := strconv.ParseInt(msg[:3], 10, 32)
if err != nil {
return 0, 0, err
}
return int(msgtype), 4, nil
}

129
beam/data/data_test.go Normal file
View File

@ -0,0 +1,129 @@
package data
import (
"strings"
"testing"
)
func TestEncodeHelloWorld(t *testing.T) {
input := "hello world!"
output := encodeString(input)
expectedOutput := "12:hello world!,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyString(t *testing.T) {
input := ""
output := encodeString(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyList(t *testing.T) {
input := []string{}
output := encodeList(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyMap(t *testing.T) {
input := make(map[string][]string)
output := Encode(input)
expectedOutput := "000;"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key1Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"world"}
output := Encode(input)
expectedOutput := "000;5:hello,8:5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key2Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"beautiful", "world"}
output := Encode(input)
expectedOutput := "000;5:hello,20:9:beautiful,5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyValue(t *testing.T) {
input := make(map[string][]string)
input["foo"] = []string{}
output := Encode(input)
expectedOutput := "000;3:foo,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryKey(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryValue(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestDecodeString(t *testing.T) {
validEncodedStrings := []struct {
input string
output string
skip int
}{
{"3:foo,", "foo", 6},
{"5:hello,", "hello", 8},
{"5:hello,5:world,", "hello", 8},
}
for _, sample := range validEncodedStrings {
output, skip, err := decodeString(sample.input)
if err != nil {
t.Fatalf("error decoding '%v': %v", sample.input, err)
}
if skip != sample.skip {
t.Fatalf("invalid skip: %v!=%v", skip, sample.skip)
}
if output != sample.output {
t.Fatalf("invalid output: %v!=%v", output, sample.output)
}
}
}
func TestDecode1Key1Value(t *testing.T) {
input := "000;3:foo,6:3:bar,,"
output, err := Decode(input)
if err != nil {
t.Fatal(err)
}
if v, exists := output["foo"]; !exists {
t.Fatalf("wrong output: %v\n", output)
} else if len(v) != 1 || strings.Join(v, "") != "bar" {
t.Fatalf("wrong output: %v\n", output)
}
}

93
beam/data/message.go Normal file
View File

@ -0,0 +1,93 @@
package data
import (
"fmt"
"strings"
)
type Message string
func Empty() Message {
return Message(Encode(nil))
}
func Parse(args []string) Message {
data := make(map[string][]string)
for _, word := range args {
if strings.Contains(word, "=") {
kv := strings.SplitN(word, "=", 2)
key := kv[0]
var val string
if len(kv) == 2 {
val = kv[1]
}
data[key] = []string{val}
}
}
return Message(Encode(data))
}
func (m Message) Add(k, v string) Message {
data, err := Decode(string(m))
if err != nil {
return m
}
if values, exists := data[k]; exists {
data[k] = append(values, v)
} else {
data[k] = []string{v}
}
return Message(Encode(data))
}
func (m Message) Set(k string, v ...string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
data[k] = v
return Message(Encode(data))
}
func (m Message) Del(k string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
delete(data, k)
return Message(Encode(data))
}
func (m Message) Get(k string) []string {
data, err := Decode(string(m))
if err != nil {
return nil
}
v, exists := data[k]
if !exists {
return nil
}
return v
}
func (m Message) Pretty() string {
data, err := Decode(string(m))
if err != nil {
return ""
}
entries := make([]string, 0, len(data))
for k, values := range data {
entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ",")))
}
return strings.Join(entries, " ")
}
func (m Message) String() string {
return string(m)
}
func (m Message) Bytes() []byte {
return []byte(m)
}

53
beam/data/message_test.go Normal file
View File

@ -0,0 +1,53 @@
package data
import (
"testing"
)
func TestEmptyMessage(t *testing.T) {
m := Empty()
if m.String() != Encode(nil) {
t.Fatalf("%v != %v", m.String(), Encode(nil))
}
}
func TestSetMessage(t *testing.T) {
m := Empty().Set("foo", "bar")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 1 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetMessageTwice(t *testing.T) {
m := Empty().Set("foo", "bar").Set("ga", "bu")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 2 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetDelMessage(t *testing.T) {
m := Empty().Set("foo", "bar").Del("foo")
output := m.String()
expectedOutput := Encode(nil)
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}

92
beam/data/netstring.txt Normal file
View File

@ -0,0 +1,92 @@
##
## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt
##
Netstrings
D. J. Bernstein, djb@pobox.com
19970201
1. Introduction
A netstring is a self-delimiting encoding of a string. Netstrings are
very easy to generate and to parse. Any string may be encoded as a
netstring; there are no restrictions on length or on allowed bytes.
Another virtue of a netstring is that it declares the string size up
front. Thus an application can check in advance whether it has enough
space to store the entire string.
Netstrings may be used as a basic building block for reliable network
protocols. Most high-level protocols, in effect, transmit a sequence
of strings; those strings may be encoded as netstrings and then
concatenated into a sequence of characters, which in turn may be
transmitted over a reliable stream protocol such as TCP.
Note that netstrings can be used recursively. The result of encoding
a sequence of strings is a single string. A series of those encoded
strings may in turn be encoded into a single string. And so on.
In this document, a string of 8-bit bytes may be written in two
different forms: as a series of hexadecimal numbers between angle
brackets, or as a sequence of ASCII characters between double quotes.
For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of
length 12; it is the same as the string "hello world!".
Although this document restricts attention to strings of 8-bit bytes,
netstrings could be used with any 6-bit-or-larger character set.
2. Definition
Any string of 8-bit bytes may be encoded as [len]":"[string]",".
Here [string] is the string and [len] is a nonempty sequence of ASCII
digits giving the length of [string] in decimal. The ASCII digits are
<30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros
at the front of [len] are prohibited: [len] begins with <30> exactly
when [string] is empty.
For example, the string "hello world!" is encoded as <31 32 3a 68
65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The
empty string is encoded as "0:,".
[len]":"[string]"," is called a netstring. [string] is called the
interpretation of the netstring.
3. Sample code
The following C code starts with a buffer buf of length len and
prints it as a netstring.
if (printf("%lu:",len) < 0) barf();
if (fwrite(buf,1,len,stdout) < len) barf();
if (putchar(',') < 0) barf();
The following C code reads a netstring and decodes it into a
dynamically allocated buffer buf of length len.
if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */
if (getchar() != ':') barf();
buf = malloc(len + 1); /* malloc(0) is not portable */
if (!buf) barf();
if (fread(buf,1,len,stdin) < len) barf();
if (getchar() != ',') barf();
Both of these code fragments assume that the local character set is
ASCII, and that the relevant stdio streams are in binary mode.
4. Security considerations
The famous Finger security hole may be blamed on Finger's use of the
CRLF encoding. In that encoding, each string is simply terminated by
CRLF. This encoding has several problems. Most importantly, it does
not declare the string size in advance. This means that a correct
CRLF parser must be prepared to ask for more and more memory as it is
reading the string. In the case of Finger, a lazy implementor found
this to be too much trouble; instead he simply declared a fixed-size
buffer and used C's gets() function. The rest is history.
In contrast, as the above sample code shows, it is very easy to
handle netstrings without risking buffer overflow. Thus widespread
use of netstrings may improve network security.

BIN
beam/examples/beamsh/beamsh Executable file

Binary file not shown.

View File

@ -0,0 +1,542 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/term"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"sync"
)
var rootPlugins = []string{
"stdio",
}
var (
flX bool
flPing bool
introspect beam.ReceiveSender = beam.Devnull()
)
func main() {
fd3 := os.NewFile(3, "beam-introspect")
if introsp, err := beam.FileConn(fd3); err == nil {
introspect = introsp
Logf("introspection enabled\n")
} else {
Logf("introspection disabled\n")
}
fd3.Close()
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
flag.Parse()
if flag.NArg() == 0 {
if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin)
for {
fmt.Printf("[%d] beamsh> ", os.Getpid())
if !input.Scan() {
break
}
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
continue
}
if err := executeRootScript(cmd); err != nil {
Fatal(err)
}
}
if err := input.Err(); err == io.EOF {
break
} else if err != nil {
Fatal(err)
}
}
} else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
} else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath)
if err != nil {
Fatal(err)
}
script, err := dockerscript.Parse(f)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
}
}
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
)
for _, plugin := range rootPlugins {
pluginCmd := &dockerscript.Command{
Args: []string{plugin},
}
if rootCmd == nil {
rootCmd = pluginCmd
} else {
lastCmd.Children = []*dockerscript.Command{pluginCmd}
}
lastCmd = pluginCmd
}
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
}
handlers, err := Handlers(introspect)
if err != nil {
return err
}
defer handlers.Close()
var tasks sync.WaitGroup
defer func() {
Debugf("Waiting for introspection...\n")
tasks.Wait()
Debugf("DONE Waiting for introspection\n")
}()
if introspect != nil {
tasks.Add(1)
go func() {
Debugf("starting introspection\n")
defer Debugf("done with introspection\n")
defer tasks.Done()
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
Debugf("XXX starting reading introspection messages\n")
r := beam.NewRouter(handlers)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
return handlers.Send(p, a)
})
n, err := beam.Copy(r, introspect)
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
}()
}
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
}
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
defer background.Wait()
for _, cmd := range script {
if cmd.Background {
background.Add(1)
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
background.Done()
}(out, cmd)
} else {
if err := executeCommand(out, cmd); err != nil {
return err
}
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
}
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
}
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
if err != nil {
return fmt.Errorf("%v\n", err)
}
var tasks sync.WaitGroup
tasks.Add(1)
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
}
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
}
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
}
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
r := beam.NewRouter(sink)
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
conn, err := beam.FileConn(attachment)
if err != nil {
attachment.Close()
return err
}
// attachment.Close()
tasks.Add(1)
go func() {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stderr.Close()
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}()
return nil
})
beam.Copy(r, priv)
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler {
if name == "logger" {
return CmdLogger
} else if name == "render" {
return CmdRender
} else if name == "devnull" {
return CmdDevnull
} else if name == "prompt" {
return CmdPrompt
} else if name == "stdio" {
return CmdStdio
} else if name == "echo" {
return CmdEcho
} else if name == "pass" {
return CmdPass
} else if name == "in" {
return CmdIn
} else if name == "exec" {
return CmdExec
} else if name == "trace" {
return CmdTrace
} else if name == "emit" {
return CmdEmit
} else if name == "print" {
return CmdPrint
} else if name == "multiprint" {
return CmdMultiprint
} else if name == "listen" {
return CmdListen
} else if name == "beamsend" {
return CmdBeamsend
} else if name == "beamreceive" {
return CmdBeamreceive
} else if name == "connect" {
return CmdConnect
} else if name == "openfile" {
return CmdOpenfile
} else if name == "spawn" {
return CmdSpawn
} else if name == "chdir" {
return CmdChdir
}
return nil
}
// VARIOUS HELPER FUNCTIONS:
func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface {
File() (*os.File, error)
}); !ok {
return nil, fmt.Errorf("no file descriptor available")
} else {
f, err = connWithFile.File()
if err != nil {
return nil, err
}
}
return f, err
}
type Msg struct {
payload []byte
attachment *os.File
}
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg = msg + "\n"
}
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
return fmt.Printf(msg, args...)
}
func Debugf(msg string, args ...interface{}) {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
}
}
func Fatalf(msg string, args ...interface{}) {
Logf(msg, args...)
os.Exit(1)
}
func Fatal(args ...interface{}) {
Fatalf("%v", args[0])
}
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <- conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections <- conn
}
}()
return connections, nil
}
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func msgDesc(payload []byte, attachment *os.File) string {
return beam.MsgDesc(payload, attachment)
}
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func() error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := dst.Send([]byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}

View File

@ -0,0 +1,441 @@
package main
import (
"bufio"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/term"
"github.com/dotcloud/docker/utils"
"io"
"net"
"net/url"
"os"
"os/exec"
"path"
"strings"
"sync"
"text/template"
)
func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if err := os.MkdirAll("logs", 0700); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
var tasks sync.WaitGroup
defer tasks.Wait()
var n int = 1
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func(n int) {
defer tasks.Done()
defer attachment.Close()
var streamname string
if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
streamname = "stdout"
} else {
streamname = cmd[1]
}
if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
}
logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
defer logfile.Close()
io.Copy(logfile, attachment)
logfile.Sync()
}(n)
n++
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
txt := args[1]
if !strings.HasSuffix(txt, "\n") {
txt += "\n"
}
t := template.Must(template.New("render").Parse(txt))
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
msg, err := data.Decode(string(payload))
if err != nil {
fmt.Fprintf(stderr, "decode error: %v\n")
}
if err := t.Execute(stdout, msg); err != nil {
fmt.Fprintf(stderr, "rendering error: %v\n", err)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
if err := out.Send(payload, attachment); err != nil {
return
}
}
}
func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
_, attachment, err := in.Receive()
if err != nil {
return
}
if attachment != nil {
attachment.Close()
}
}
}
func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
return
}
if !term.IsTerminal(0) {
fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
return
}
fmt.Printf("%s: ", strings.Join(args[1:], " "))
oldState, _ := term.SaveState(0)
term.DisableEcho(0, oldState)
line, _, err := bufio.NewReader(os.Stdin).ReadLine()
if err != nil {
fmt.Fprintln(stderr, err.Error())
return
}
val := string(line)
fmt.Printf("\n")
term.RestoreTerminal(0, oldState)
out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
}
func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer attachment.Close()
io.Copy(os.Stdout, attachment)
attachment.Close()
}()
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
Fatal(err)
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
fmt.Fprintln(stdout, strings.Join(args[1:], " "))
}
func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
if err := out.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return
}
}
}
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
c := exec.Command(utils.SelfPath())
r, w, err := os.Pipe()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
c.Stdin = r
c.Stdout = stdout
c.Stderr = stderr
go func() {
fmt.Fprintf(w, strings.Join(args[1:], " "))
w.Sync()
w.Close()
}()
if err := c.Run(); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
}
func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
cmd := exec.Command(args[1], args[2:]...)
cmd.Stdout = stdout
cmd.Stderr = stderr
//cmd.Stdin = os.Stdin
local, remote, err := beam.SocketPair()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
child, err := beam.FileConn(local)
if err != nil {
local.Close()
remote.Close()
fmt.Fprintf(stderr, "%v\n", err)
return
}
local.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
var tasks sync.WaitGroup
tasks.Add(1)
go func() {
defer Debugf("done copying to child\n")
defer tasks.Done()
defer child.CloseWrite()
beam.Copy(child, in)
}()
tasks.Add(1)
go func() {
defer Debugf("done copying from child %d\n")
defer tasks.Done()
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
})
beam.Copy(r, child)
}()
execErr := cmd.Run()
// We can close both ends of the socket without worrying about data stuck in the buffer,
// because unix socket writes are fully synchronous.
child.Close()
tasks.Wait()
var status string
if execErr != nil {
status = execErr.Error()
} else {
status = "ok"
}
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
}
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
var sfd string = "nil"
if attachment != nil {
sfd = fmt.Sprintf("%d", attachment.Fd())
}
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
out.Send(payload, attachment)
return nil
})
beam.Copy(r, in)
}
func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
out.Send(data.Parse(args[1:]).Bytes(), nil)
}
func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, a, err := in.Receive()
if err != nil {
return
}
// Skip commands
if a != nil && data.Message(payload).Get("cmd") == nil {
dup, err := beam.SendRPipe(out, payload)
if err != nil {
a.Close()
return
}
io.Copy(io.MultiWriter(os.Stdout, dup), a)
dup.Close()
} else {
if err := out.Send(payload, a); err != nil {
return
}
}
}
}
func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
multiprint := func(p []byte, a *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer a.Close()
msg := data.Message(string(p))
input := bufio.NewScanner(a)
for input.Scan() {
fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
}
}()
return nil
}
r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
beam.Copy(r, in)
}
func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
for {
conn, err := l.Accept()
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
f, err := connToFile(conn)
if err != nil {
conn.Close()
continue
}
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = dialer
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = listener
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
ReceiveFromConn(connections, out)
}
func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
_, attachment, err := in.Receive()
if err != nil {
break
}
if attachment == nil {
continue
}
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
return
}
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
go func(attachment *os.File, conn net.Conn) {
defer tasks.Done()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(attachment, conn)
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(conn, attachment)
}(attachment, conn)
iotasks.Wait()
conn.Close()
attachment.Close()
}(attachment, conn)
}
tasks.Wait()
}
func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
}
func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
}

View File

@ -0,0 +1,3 @@
#!/usr/bin/env beamsh
exec ls -l

View File

@ -0,0 +1,5 @@
#!/usr/bin/env beamsh
trace {
exec ls -l
}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
trace {
stdio {
exec ls -l
}
}

View File

@ -0,0 +1,10 @@
#!/usr/bin/env beamsh -x
trace outer {
# stdio fails
stdio {
trace inner {
exec ls -l
}
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
stdio {
trace {
stdio {
exec ls -l
}
}
}

View File

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
exec ls -l
}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
stdio {
trace {
echo hello
}
}

View File

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
echo hello world
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
devnull {
multiprint {
exec tail -f /var/log/system.log &
exec ls -l
exec ls ksdhfkjdshf jksdfhkjsdhf
}
}

View File

@ -0,0 +1,8 @@
#!/usr/bin/env beamsh
print {
trace {
emit msg=hello
emit msg=world
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
trace {
log {
exec ls -l
exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf
echo hello world
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
multiprint {
trace {
listen tcp://localhost:7676 &
listen tcp://localhost:8787 &
}
}

31
beam/handler.go Normal file
View File

@ -0,0 +1,31 @@
package beam
import (
"fmt"
)
type Handler func(msg *Message) error
func (h Handler) Send(msg *Message) (Receiver, error) {
var ret Receiver
if RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
go func() {
// Ret must always be a valid Sender, so handlers
// can safely send to it
if msg.Ret == nil {
msg.Ret = NopSender{}
}
err := h(msg)
if err != nil {
Obj(msg.Ret).Error("%v", err)
}
msg.Ret.Close()
}()
return ret, nil
}
func (h Handler) Close() error {
return fmt.Errorf("can't close")
}

8
beam/http2/README.md Normal file
View File

@ -0,0 +1,8 @@
This package defines a remote transport for Beam services using http2/spdy and tls.
Uses https://github.com/docker/spdystream
Pointers:
* Low-level protocol framer: http://code.google.com/p/go.net/spdy
* (incomplete) high-level server implementation: https://github.com/shykes/spdy-go

86
beam/http2/listener.go Normal file
View File

@ -0,0 +1,86 @@
package http2
import (
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"sync"
)
type ListenSession struct {
listener net.Listener
streamChan chan *spdystream.Stream
streamLock sync.RWMutex
subStreamChans map[string]chan *spdystream.Stream
auth Authenticator
}
func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) {
return &ListenSession{
listener: listener,
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
auth: auth,
}, nil
}
func (l *ListenSession) streamHandler(stream *spdystream.Stream) {
streamChan := l.getStreamChan(stream.Parent())
streamChan <- stream
}
func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
l.streamLock.Lock()
l.subStreamChans[stream.String()] = streamChan
l.streamLock.Unlock()
}
func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return l.streamChan
}
l.streamLock.RLock()
defer l.streamLock.RUnlock()
streamChan, ok := l.subStreamChans[stream.String()]
if ok {
return streamChan
}
return l.streamChan
}
func (l *ListenSession) Serve() {
for {
conn, err := l.listener.Accept()
if err != nil {
// TODO log
break
}
go func() {
authHandler, authErr := l.auth(conn)
if authErr != nil {
// TODO log
conn.Close()
return
}
spdyConn, spdyErr := spdystream.NewConnection(conn, true)
if spdyErr != nil {
// TODO log
conn.Close()
return
}
go spdyConn.Serve(l.streamHandler, authHandler)
}()
}
}
func (l *ListenSession) Shutdown() error {
return l.listener.Close()
}
func (l *ListenSession) Receive(mode int) (*beam.Message, error) {
stream := <-l.streamChan
return createStreamMessage(stream, mode, l, nil)
}

View File

@ -0,0 +1,83 @@
package http2
import (
"github.com/docker/libswarm/beam"
"io"
"net"
"testing"
)
func TestListenSession(t *testing.T) {
listen := "localhost:7743"
listener, listenErr := net.Listen("tcp", listen)
if listenErr != nil {
t.Fatalf("Error creating listener: %s", listenErr)
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
end := make(chan bool)
go exerciseServer(t, listen, end)
msg, msgErr := session.Receive(beam.Ret)
if msgErr != nil {
t.Fatalf("Error receiving message: %s", msgErr)
}
if msg.Att == nil {
t.Fatalf("Error message missing attachment")
}
if msg.Verb != beam.Attach {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach)
}
receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending return message: %s", sendErr)
}
_, ackErr := receiver.Receive(0)
if ackErr == nil {
t.Fatalf("No error receiving from message with no return pipe")
}
if ackErr != io.EOF {
t.Fatalf("Unexpected error receiving from message: %s", ackErr)
}
<-end
shutdownErr := session.Shutdown()
if shutdownErr != nil {
t.Fatalf("Error shutting down: %s", shutdownErr)
}
}
func exerciseServer(t *testing.T, server string, endChan chan bool) {
defer close(endChan)
conn, connErr := net.Dial("tcp", server)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
session, sessionErr := NewStreamSession(conn)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
receiver, sendErr := session.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending message: %s", sendErr)
}
msg, receiveErr := receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving message")
}
if msg.Verb != beam.Ack {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack)
}
}

109
beam/http2/spdy.go Normal file
View File

@ -0,0 +1,109 @@
package http2
import (
"encoding/base64"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
"github.com/docker/spdystream"
"io"
"net"
"net/http"
"os"
"syscall"
)
type Authenticator func(conn net.Conn) (spdystream.AuthHandler, error)
func NoAuthenticator(conn net.Conn) (spdystream.AuthHandler, error) {
return func(header http.Header, slot uint8, parent uint32) bool {
return true
}, nil
}
type streamChanProvider interface {
addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream)
getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream
}
func encodeArgs(args []string) string {
encoded := data.Encode(map[string][]string{"args": args})
return base64.URLEncoding.EncodeToString([]byte(encoded))
}
func decodeArgs(argString string) ([]string, error) {
decoded, decodeErr := base64.URLEncoding.DecodeString(argString)
if decodeErr != nil {
return []string{}, decodeErr
}
dataMap, dataErr := data.Decode(string(decoded))
if dataErr != nil {
return []string{}, dataErr
}
return dataMap["args"], nil
}
func createStreamMessage(stream *spdystream.Stream, mode int, streamChans streamChanProvider, ret beam.Sender) (*beam.Message, error) {
verbString := stream.Headers()["Verb"]
if len(verbString) != 1 {
if len(verbString) == 0 {
return nil, fmt.Errorf("Stream(%s) is missing verb header", stream)
} else {
return nil, fmt.Errorf("Stream(%s) has multiple verb headers", stream)
}
}
verb, verbOk := verbs[verbString[0]]
if !verbOk {
return nil, fmt.Errorf("Unknown verb: %s", verbString[0])
}
var args []string
argString := stream.Headers()["Args"]
if len(argString) > 1 {
return nil, fmt.Errorf("Stream(%s) has multiple args headers", stream)
}
if len(argString) == 1 {
var err error
args, err = decodeArgs(argString[0])
if err != nil {
return nil, err
}
}
var attach *os.File
if !stream.IsFinished() {
socketFds, socketErr := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
if socketErr != nil {
return nil, socketErr
}
attach = os.NewFile(uintptr(socketFds[0]), "")
conn, connErr := net.FileConn(os.NewFile(uintptr(socketFds[1]), ""))
if connErr != nil {
return nil, connErr
}
go func() {
io.Copy(conn, stream)
}()
go func() {
io.Copy(stream, conn)
}()
}
retSender := ret
if retSender == nil || beam.RetPipe.Equals(retSender) {
retSender = &StreamSender{stream: stream, streamChans: streamChans}
}
if mode&beam.Ret == 0 {
retSender.Close()
}
return &beam.Message{
Verb: verb,
Args: args,
Att: attach,
Ret: retSender,
}, nil
}

166
beam/http2/stream.go Normal file
View File

@ -0,0 +1,166 @@
package http2
import (
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"net/http"
"sync"
)
var verbs = map[string]beam.Verb{
"Ack": beam.Ack,
"Attach": beam.Attach,
"Connect": beam.Connect,
"Error": beam.Error,
"File": beam.File,
"Get": beam.Get,
"Log": beam.Log,
"Ls": beam.Ls,
"Set": beam.Set,
"Spawn": beam.Spawn,
"Start": beam.Start,
"Stop": beam.Stop,
"Watch": beam.Watch,
}
// Only allows sending, no parent stream
type StreamSession struct {
conn *spdystream.Connection
streamLock sync.Mutex
streamChan chan *spdystream.Stream
subStreamChans map[string]chan *spdystream.Stream
}
func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
s.subStreamChans[stream.String()] = streamChan
}
func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return s.streamChan
}
streamChan, ok := s.subStreamChans[stream.String()]
if ok {
return streamChan
}
return s.streamChan
}
func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
streamChan := s.getStreamChan(stream.Parent())
streamChan <- stream
}
func NewStreamSession(conn net.Conn) (*StreamSession, error) {
session := &StreamSession{
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
}
spdyConn, spdyErr := spdystream.NewConnection(conn, false)
if spdyErr != nil {
return nil, spdyErr
}
go spdyConn.Serve(session.newStreamHandler, spdystream.NoAuthHandler)
session.conn = spdyConn
return session, nil
}
func (s *StreamSession) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.conn.CreateStream(headers, nil, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.subStreamChans[stream.String()] = streamChan
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s}
} else {
ret = &beam.NopReceiver{}
}
return
}
func (s *StreamSession) Close() error {
return s.conn.Close()
}
type StreamReceiver struct {
stream *spdystream.Stream
streamChans streamChanProvider
ret beam.Sender
}
func (s *StreamReceiver) Receive(mode int) (*beam.Message, error) {
waitErr := s.stream.Wait()
if waitErr != nil {
return nil, waitErr
}
streamChan := s.streamChans.getStreamChan(s.stream)
stream := <-streamChan
return createStreamMessage(stream, mode, s.streamChans, s.ret)
}
type StreamSender struct {
stream *spdystream.Stream
streamChans streamChanProvider
}
func (s *StreamSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.stream.CreateSubStream(headers, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.streamChans.addStreamChan(stream, streamChan)
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s.streamChans}
} else {
ret = beam.NopReceiver{}
}
return
}
func (s *StreamSender) Close() error {
// TODO Remove stream from stream chans
return s.stream.Close()
}

159
beam/http2/stream_test.go Normal file
View File

@ -0,0 +1,159 @@
package http2
import (
//"bytes"
"github.com/docker/libswarm/beam"
//"github.com/docker/spdystream"
"io"
"net"
"testing"
)
func TestBeamSession(t *testing.T) {
end := make(chan bool)
listen := "localhost:7543"
server, serverErr := runServer(listen, t, end)
if serverErr != nil {
t.Fatalf("Error initializing server: %s", serverErr)
}
conn, connErr := net.Dial("tcp", listen)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
sender, senderErr := NewStreamSession(conn)
if senderErr != nil {
t.Fatalf("Error creating sender: %s", senderErr)
}
// Ls interaction
receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Ls, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr := receiver.Receive(0)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Set {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ls.String())
}
if len(message.Args) != 3 {
t.Fatalf("Unexpected args length\nActual: %d\nExpected: %d", len(message.Args), 3)
}
if message.Args[0] != "file1" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[0], "file1")
}
if message.Args[1] != "file2" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[1], "file2")
}
if message.Args[2] != string([]byte{0x00, 0x00, 0x00}) {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[2], []byte{0x00, 0x00, 0x00})
}
// Attach interactions
receiver, sendErr = sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr = receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Ack {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ack.String())
}
// TODO full connect interaction
//if message.Att == nil {
// t.Fatalf("Missing attachment on message")
//}
//testBytes := []byte("Hello")
//n, writeErr := message.Att.Write(testBytes)
//if writeErr != nil {
// t.Fatalf("Error writing bytes: %s", writeErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//buf := make([]byte, 10)
//n, readErr := message.Att.Read(buf)
//if readErr != nil {
// t.Fatalf("Error writing bytes: %s", readErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//if bytes.Compare(buf[:n], testBytes) != 0 {
// t.Fatalf("Did not receive expected message:\nActual: %s\nExpectd: %s", buf, testBytes)
//}
closeErr := server.Close()
if closeErr != nil {
t.Fatalf("Error closing server: %s", closeErr)
}
closeErr = sender.Close()
if closeErr != nil {
t.Fatalf("Error closing sender: %s", closeErr)
}
<-end
}
func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error) {
listener, lErr := net.Listen("tcp", listen)
if lErr != nil {
return nil, lErr
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
go func() {
defer close(endChan)
// Ls exchange
message, receiveErr := session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Ls {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr := message.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"file1", "file2", string([]byte{0x00, 0x00, 0x00})}})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
_, receiveErr = receiver.Receive(0)
if receiveErr == nil {
t.Fatalf("No error received from empty receiver")
}
if receiveErr != io.EOF {
t.Fatalf("Expected error from empty receiver: %s", receiveErr)
}
// Connect exchange
message, receiveErr = session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Attach {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr = message.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
// TODO full connect interaction
}()
return listener, nil
}

206
beam/inmem.go Normal file
View File

@ -0,0 +1,206 @@
package beam
import (
"io"
"sync"
)
func Pipe() (*PipeReceiver, *PipeSender) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReceiver{p}
w := &PipeSender{p}
return r, w
}
type pipe struct {
rwait sync.Cond
wwait sync.Cond
l sync.Mutex
rl sync.Mutex
wl sync.Mutex
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
msg *Message
}
func (p *pipe) psend(msg *Message) error {
var err error
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
p.msg = msg
p.rwait.Signal()
for {
if p.msg == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = io.ErrClosedPipe
}
p.wwait.Wait()
}
p.msg = nil // in case of rerr or werr
return err
}
func (p *pipe) send(msg *Message) (ret Receiver, err error) {
// Prepare nested Receiver if requested
if RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
err = p.psend(msg)
return
}
func (p *pipe) preceive() (*Message, error) {
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return nil, io.ErrClosedPipe
}
if p.msg != nil {
break
}
if p.werr != nil {
return nil, p.werr
}
p.rwait.Wait()
}
msg := p.msg
p.msg = nil
p.wwait.Signal()
return msg, nil
}
func (p *pipe) receive(mode int) (*Message, error) {
msg, err := p.preceive()
if err != nil {
return nil, err
}
if msg.Ret == nil {
msg.Ret = NopSender{}
}
if mode&Ret == 0 {
msg.Ret.Close()
}
return msg, nil
}
func (p *pipe) rclose(err error) {
if err == nil {
err = io.ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err error) {
if err == nil {
err = io.EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// PipeReceiver
type PipeReceiver struct {
p *pipe
}
func (r *PipeReceiver) Receive(mode int) (*Message, error) {
return r.p.receive(mode)
}
func (r *PipeReceiver) SendTo(dst Sender) (int, error) {
var n int
// If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender)
if !ok {
return 0, ErrIncompatibleSender
}
for {
pmsg, err := r.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := pdst.p.psend(pmsg); err != nil {
return n, err
}
}
n++
return n, nil
}
func (r *PipeReceiver) Close() error {
return r.CloseWithError(nil)
}
func (r *PipeReceiver) CloseWithError(err error) error {
r.p.rclose(err)
return nil
}
// PipeSender
type PipeSender struct {
p *pipe
}
func (w *PipeSender) Send(msg *Message) (Receiver, error) {
return w.p.send(msg)
}
func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) {
var n int
// If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver)
if !ok {
return 0, ErrIncompatibleReceiver
}
for {
pmsg, err := psrc.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := w.p.psend(pmsg); err != nil {
return n, err
}
n++
}
return n, nil
}
func (w *PipeSender) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeSender) CloseWithError(err error) error {
w.p.wclose(err)
return nil
}

145
beam/inmem_test.go Normal file
View File

@ -0,0 +1,145 @@
package beam
import (
"fmt"
"github.com/dotcloud/docker/pkg/testutils"
"io/ioutil"
"os"
"testing"
)
func TestInmemRetPipe(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
wait := make(chan struct{})
go func() {
defer close(wait)
ret, err := w.Send(&Message{Verb: Log, Args: []string{"hello"}, Ret: RetPipe})
if err != nil {
t.Fatal(err)
}
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != Ack {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "this better not crash" {
t.Fatalf("%#v", msg)
}
}()
msg, err := r.Receive(Ret)
if err != nil {
t.Fatal(err)
}
if _, err := msg.Ret.Send(&Message{Verb: Ack, Args: []string{"this better not crash"}}); err != nil {
t.Fatal(err)
}
<-wait
}
func TestSimpleSend(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
}()
if _, err := w.Send(&Message{Verb: Log, Args: []string{"hello world"}}); err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
ret, err := w.Send(&Message{Args: []string{"this is the request"}, Ret: RetPipe})
if err != nil {
t.Fatal(err)
}
if ret == nil {
t.Fatalf("ret = nil\n")
}
// Read for a reply
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=Ret
msg, err := r.Receive(Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
if msg.Ret == nil {
t.Fatalf("%#v", msg)
}
// Send a reply
_, err = msg.Ret.Send(&Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendFile(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
tmp, err := ioutil.TempFile("", "beam-test-")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp.Name())
fmt.Fprintf(tmp, "hello world\n")
tmp.Sync()
tmp.Seek(0, 0)
testutils.Timeout(t, func() {
go func() {
_, err := w.Send(&Message{Verb: File, Args: []string{"path=" + tmp.Name()}, Att: tmp})
if err != nil {
t.Fatal(err)
}
}()
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != File {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "path="+tmp.Name() {
t.Fatalf("%#v", msg)
}
txt, err := ioutil.ReadAll(msg.Att)
if err != nil {
t.Fatal(err)
}
if string(txt) != "hello world\n" {
t.Fatalf("%s\n", txt)
}
})
}

21
beam/nop.go Normal file
View File

@ -0,0 +1,21 @@
package beam
import (
"io"
)
type NopSender struct{}
func (s NopSender) Send(msg *Message) (Receiver, error) {
return NopReceiver{}, nil
}
func (s NopSender) Close() error {
return nil
}
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, error) {
return nil, io.EOF
}

204
beam/object.go Normal file
View File

@ -0,0 +1,204 @@
package beam
import (
"encoding/json"
"fmt"
"io"
"net"
"strings"
)
// FIXME: rename Object to Client
type Object struct {
Sender
}
func Obj(dst Sender) *Object {
return &Object{dst}
}
func (o *Object) Log(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Verb: Log, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Ls() ([]string, error) {
ret, err := o.Send(&Message{Verb: Ls, Ret: RetPipe})
if err != nil {
return nil, err
}
msg, err := ret.Receive(0)
if err == io.EOF {
return nil, fmt.Errorf("unexpected EOF")
}
if msg.Verb == Set {
if err != nil {
return nil, err
}
return msg.Args, nil
}
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Spawn(cmd ...string) (out *Object, err error) {
ret, err := o.Send(&Message{Verb: Spawn, Args: cmd, Ret: RetPipe})
if err != nil {
return nil, err
}
msg, err := ret.Receive(Ret)
if err == io.EOF {
return nil, fmt.Errorf("unexpected EOF")
}
if err != nil {
return nil, err
}
if msg.Verb == Ack {
return &Object{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Verb == Error {
return nil, fmt.Errorf("%s", strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Attach(name string) (in Receiver, out *Object, err error) {
ret, err := o.Send(&Message{Verb: Attach, Args: []string{name}, Ret: RetPipe})
if err != nil {
return nil, nil, err
}
msg, err := ret.Receive(Ret)
if err == io.EOF {
return nil, nil, fmt.Errorf("unexpected EOF")
}
if err != nil {
return nil, nil, err
}
if msg.Verb == Ack {
return ret, &Object{msg.Ret}, nil
}
msg.Ret.Close()
if msg.Verb == Error {
return nil, nil, fmt.Errorf("%s", strings.Join(msg.Args[:1], ""))
}
return nil, nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Error(msg string, args ...interface{}) error {
_, err := o.Send(&Message{Verb: Error, Args: []string{fmt.Sprintf(msg, args...)}})
return err
}
func (o *Object) Connect() (net.Conn, error) {
ret, err := o.Send(&Message{Verb: Connect, Ret: RetPipe})
if err != nil {
return nil, err
}
// FIXME: set Att
msg, err := ret.Receive(0)
if err == io.EOF {
return nil, fmt.Errorf("unexpected EOF")
}
if msg.Verb == Connect {
if msg.Att == nil {
return nil, fmt.Errorf("missing attachment")
}
conn, err := net.FileConn(msg.Att)
if err != nil {
msg.Att.Close()
return nil, err
}
msg.Att.Close()
return conn, nil
}
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) SetJson(val interface{}) error {
txt, err := json.Marshal(val)
if err != nil {
return err
}
return o.Set(string(txt))
}
func (o *Object) Set(vals ...string) error {
_, err := o.Send(&Message{Verb: Set, Args: vals})
return err
}
func (o *Object) Get(key string) (string, error) {
ret, err := o.Send(&Message{Verb: Get, Args: []string{key}, Ret: RetPipe})
if err != nil {
return "", err
}
msg, err := ret.Receive(0)
if err == io.EOF {
return "", fmt.Errorf("unexpected EOF")
}
if msg.Verb == Set {
if err != nil {
return "", err
}
if len(msg.Args) != 1 {
return "", fmt.Errorf("protocol error")
}
return msg.Args[0], nil
}
if msg.Verb == Error {
return "", fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return "", fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Watch() (Receiver, error) {
ret, err := o.Send(&Message{Verb: Watch, Ret: RetPipe})
if err != nil {
return nil, err
}
msg, err := ret.Receive(0)
if msg.Verb == Ack {
return ret, nil
}
if msg.Verb == Error {
return nil, fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return nil, fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Start() error {
ret, err := o.Send(&Message{Verb: Start, Ret: RetPipe})
msg, err := ret.Receive(0)
if err == io.EOF {
return fmt.Errorf("unexpected EOF")
}
if msg.Verb == Ack {
return nil
}
if msg.Verb == Error {
return fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return fmt.Errorf("unexpected verb %v", msg.Verb)
}
func (o *Object) Stop() error {
ret, err := o.Send(&Message{Verb: Stop, Ret: RetPipe})
msg, err := ret.Receive(0)
if err == io.EOF {
return fmt.Errorf("unexpected EOF")
}
if msg.Verb == Ack {
return nil
}
if msg.Verb == Error {
return fmt.Errorf(strings.Join(msg.Args[:1], ""))
}
return fmt.Errorf("unexpected verb %v", msg.Verb)
}

68
beam/server.go Normal file
View File

@ -0,0 +1,68 @@
package beam
import (
"fmt"
)
type Server struct {
handlers map[Verb]Sender
catchall Sender
}
func NewServer() *Server {
return &Server{
handlers: make(map[Verb]Sender),
}
}
func (s *Server) Catchall(h Sender) *Server {
s.catchall = h
return s
}
func (s *Server) OnVerb(v Verb, h Sender) *Server {
s.handlers[v] = h
return s
}
func (s *Server) OnSpawn(h Sender) *Server {
return s.OnVerb(Spawn, h)
}
func (s *Server) OnStart(h Sender) *Server {
return s.OnVerb(Start, h)
}
func (s *Server) OnStop(h Sender) *Server {
return s.OnVerb(Stop, h)
}
func (s *Server) OnAttach(h Sender) *Server {
return s.OnVerb(Attach, h)
}
func (s *Server) OnLog(h Sender) *Server {
return s.OnVerb(Log, h)
}
func (s *Server) OnError(h Sender) *Server {
return s.OnVerb(Error, h)
}
func (s *Server) OnLs(h Sender) *Server {
return s.OnVerb(Ls, h)
}
func (s *Server) Send(msg *Message) (Receiver, error) {
if h, exists := s.handlers[msg.Verb]; exists {
return h.Send(msg)
}
if s.catchall != nil {
return s.catchall.Send(msg)
}
return NotImplemented.Send(msg)
}
func (s *Server) Close() error {
return fmt.Errorf("can't close")
}

36
beam/task.go Normal file
View File

@ -0,0 +1,36 @@
package beam
import (
"fmt"
"sync"
)
func Task(f func(in Receiver, out Sender)) Sender {
var running bool
var l sync.RWMutex
inR, inW := Pipe()
outR, outW := Pipe()
obj := NewServer()
obj.OnAttach(Handler(func(msg *Message) error {
msg.Ret.Send(&Message{Verb: Ack, Ret: inW})
fmt.Printf("copying task output from %#v to %#v\n", outR, msg.Ret)
defer fmt.Printf("(DONE) copying task output from %#v to %#v\n", outR, msg.Ret)
Copy(msg.Ret, outR)
return nil
}))
obj.OnStart(Handler(func(msg *Message) error {
l.RLock()
r := running
l.RUnlock()
if r {
return fmt.Errorf("already running")
}
l.Lock()
go f(inR, outW)
running = true
l.Unlock()
msg.Ret.Send(&Message{Verb: Ack})
return nil
}))
return obj
}

44
beam/tree.go Normal file
View File

@ -0,0 +1,44 @@
package beam
import (
"sort"
)
type Tree struct {
*Server
children map[string]Sender
}
func NewTree() *Tree {
t := &Tree{
Server: NewServer(),
children: make(map[string]Sender),
}
t.OnAttach(Handler(func(msg *Message) error {
if len(msg.Args) == 0 || msg.Args[0] == "" {
msg.Ret.Send(&Message{Verb: Ack, Ret: t})
return nil
}
if child, exists := t.children[msg.Args[0]]; exists {
msg.Ret.Send(&Message{Verb: Ack, Ret: child})
return nil
}
Obj(msg.Ret).Error("not found")
return nil
}))
t.OnLs(Handler(func(msg *Message) error {
names := make([]string, 0, len(t.children))
for name := range t.children {
names = append(names, name)
}
sort.Strings(names)
Obj(msg.Ret).Set(names...)
return nil
}))
return t
}
func (t *Tree) Bind(name string, dst Sender) *Tree {
t.children[name] = dst
return t
}

166
beam/unix/beam.go Normal file
View File

@ -0,0 +1,166 @@
package unix
import (
"fmt"
"io"
"os"
)
type Sender interface {
Send([]byte, *os.File) error
}
type Receiver interface {
Receive() ([]byte, *os.File, error)
}
type ReceiveCloser interface {
Receiver
Close() error
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
}
const (
R int = 1 << (32 - 1 - iota)
W
)
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
var (
remote *os.File
local *os.File
)
if mode == R {
remote = r
local = w
} else if mode == W {
remote = w
local = r
}
if err := dst.Send(data, remote); err != nil {
local.Close()
remote.Close()
return nil, err
}
return local, nil
}
// SendRPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, R)
}
// SendWPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, W)
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

39
beam/unix/beam_test.go Normal file
View File

@ -0,0 +1,39 @@
package unix
import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing"
)
func TestSendConn(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
t.Fatalf("%v != %v\n", val, "connection")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
}
}

144
beam/unix/conn.go Normal file
View File

@ -0,0 +1,144 @@
package unix
import (
"fmt"
"os"
"strconv"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
)
func Pair() (*Conn, *Conn, error) {
c1, c2, err := USocketPair()
if err != nil {
return nil, nil, err
}
return &Conn{c1}, &Conn{c2}, nil
}
type Conn struct {
*UnixConn
}
func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) {
// Get 2 *os.File
local, remote, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
// Convert 1 to *net.UnixConn
conn, err = FileConn(local)
if err != nil {
return nil, nil, err
}
local.Close()
// Return the "mismatched" pair
return conn, remote, nil
}
// This implements beam.Sender.Close which *only closes the sender*.
// This is similar to the pattern of only closing go channels from
// the sender's side.
// If you want to close the entire connection, call Conn.UnixConn.Close.
func (c *Conn) Close() error {
return c.UnixConn.CloseWrite()
}
func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented in unix transport")
}
parts := []string{fmt.Sprintf("%d", msg.Verb)}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
var (
fd *os.File
ret beam.Receiver
err error
)
// Caller requested a return pipe
if beam.RetPipe.Equals(msg.Ret) {
local, remote, err := sendablePair()
if err != nil {
return nil, err
}
fd = remote
ret = &Conn{local}
// Caller specified its own return channel
} else if msg.Ret != nil {
// The specified return channel is a unix conn: engaging cheat mode!
if retConn, ok := msg.Ret.(*Conn); ok {
fd, err = retConn.UnixConn.File()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
// Close duplicate fd
retConn.UnixConn.Close()
// The specified return channel is an unknown type: proxy messages.
} else {
local, remote, err := sendablePair()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
fd = remote
// FIXME: do we need a reference no all these background tasks?
go func() {
// Copy messages from the remote return channel to the local return channel.
// When the remote return channel is closed, also close the local return channel.
localConn := &Conn{local}
beam.Copy(msg.Ret, localConn)
msg.Ret.Close()
localConn.Close()
}()
}
}
if err := c.UnixConn.Send(b, fd); err != nil {
return nil, err
}
return ret, nil
}
func (c *Conn) Receive(mode int) (*beam.Message, error) {
b, fd, err := c.UnixConn.Receive()
if err != nil {
return nil, err
}
parts, n, err := data.DecodeList(string(b))
if err != nil {
return nil, err
}
if n != len(b) {
return nil, fmt.Errorf("garbage data %#v", b[:n])
}
if len(parts) == 0 {
return nil, fmt.Errorf("malformed message")
}
v, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return nil, err
}
msg := &beam.Message{Verb: beam.Verb(v), Args: parts[1:]}
// Apply mode mask
if fd != nil {
subconn, err := FileConn(fd)
if err != nil {
return nil, err
}
fd.Close()
if mode&beam.Ret != 0 {
msg.Ret = &Conn{subconn}
} else {
subconn.CloseWrite()
}
}
return msg, nil
}

74
beam/unix/conn_test.go Normal file
View File

@ -0,0 +1,74 @@
package unix
import (
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestPair(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal("Unexpected error")
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
}()
_, err := w.Send(&beam.Message{Verb: beam.Log, Args: []string{"hello world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
// Read for a reply
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=W
msg, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
// Send a reply
_, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}

317
beam/unix/unix.go Normal file
View File

@ -0,0 +1,317 @@
package unix
import (
"bufio"
"fmt"
"net"
"os"
"syscall"
)
func debugCheckpoint(msg string, args ...interface{}) {
if os.Getenv("DEBUG") == "" {
return
}
os.Stdout.Sync()
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
fmt.Fprintf(tty, msg, args...)
bufio.NewScanner(tty).Scan()
tty.Close()
}
type UnixConn struct {
*net.UnixConn
fds []*os.File
}
// Framing:
// In order to handle framing in Send/Recieve, as these give frame
// boundaries we use a very simple 4 bytes header. It is a big endiand
// uint32 where the high bit is set if the message includes a file
// descriptor. The rest of the uint32 is the length of the next frame.
// We need the bit in order to be able to assign recieved fds to
// the right message, as multiple messages may be coalesced into
// a single recieve operation.
func makeHeader(data []byte, fds []int) ([]byte, error) {
header := make([]byte, 4)
length := uint32(len(data))
if length > 0x7fffffff {
return nil, fmt.Errorf("Data to large")
}
if len(fds) != 0 {
length = length | 0x80000000
}
header[0] = byte((length >> 24) & 0xff)
header[1] = byte((length >> 16) & 0xff)
header[2] = byte((length >> 8) & 0xff)
header[3] = byte((length >> 0) & 0xff)
return header, nil
}
func parseHeader(header []byte) (uint32, bool) {
length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
hasFd := length&0x80000000 != 0
length = length & ^uint32(0x80000000)
return length, hasFd
}
func FileConn(f *os.File) (*UnixConn, error) {
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
}
return &UnixConn{UnixConn: uconn}, nil
}
// Send sends a new message on conn with data and f as payload and
// attachment, respectively.
// On success, f is closed
func (conn *UnixConn) Send(data []byte, f *os.File) error {
{
var fd int = -1
if f != nil {
fd = int(f.Fd())
}
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
}
var fds []int
if f != nil {
fds = append(fds, int(f.Fd()))
}
if err := conn.sendUnix(data, fds...); err != nil {
return err
}
if f != nil {
f.Close()
}
return nil
}
// Receive waits for a new message on conn, and receives its payload
// and attachment, or an error if any.
//
// If more than 1 file descriptor is sent in the message, they are all
// closed except for the first, which is the attachment.
// It is legal for a message to have no attachment or an empty payload.
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
defer func() {
var fd int = -1
if rf != nil {
fd = int(rf.Fd())
}
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
}()
// Read header
header := make([]byte, 4)
nRead := uint32(0)
for nRead < 4 {
n, err := conn.receiveUnix(header[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
length, hasFd := parseHeader(header)
if hasFd {
if len(conn.fds) == 0 {
return nil, nil, fmt.Errorf("No expected file descriptor in message")
}
rf = conn.fds[0]
conn.fds = conn.fds[1:]
}
rdata = make([]byte, length)
nRead = 0
for nRead < length {
n, err := conn.receiveUnix(rdata[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
return
}
func (conn *UnixConn) receiveUnix(buf []byte) (int, error) {
oob := make([]byte, syscall.CmsgSpace(4))
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
if err != nil {
return 0, err
}
fd := extractFd(oob[:oobn])
if fd != -1 {
f := os.NewFile(uintptr(fd), "")
conn.fds = append(conn.fds, f)
}
return bufn, nil
}
func (conn *UnixConn) sendUnix(data []byte, fds ...int) error {
header, err := makeHeader(data, fds)
if err != nil {
return err
}
// There is a bug in conn.WriteMsgUnix where it doesn't correctly return
// the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645)
// So, we can't rely on the return value from it. However, we must use it to
// send the fds. In order to handle this we only write one byte using WriteMsgUnix
// (when we have to), as that can only ever block or fully suceed. We then write
// the rest with conn.Write()
// The reader side should not rely on this though, as hopefully this gets fixed
// in go later.
written := 0
if len(fds) != 0 {
oob := syscall.UnixRights(fds...)
wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil)
if err != nil {
return err
}
written = written + wrote
}
for written < len(header) {
wrote, err := conn.Write(header[written:])
if err != nil {
return err
}
written = written + wrote
}
written = 0
for written < len(data) {
wrote, err := conn.Write(data[written:])
if err != nil {
return err
}
written = written + wrote
}
return nil
}
func extractFd(oob []byte) int {
// Grab forklock to make sure no forks accidentally inherit the new
// fds before they are made CLOEXEC
// There is a slight race condition between ReadMsgUnix returns and
// when we grap the lock, so this is not perfect. Unfortunately
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
// way to implement non-blocking i/o in go, so this is hard to fix.
syscall.ForkLock.Lock()
defer syscall.ForkLock.Unlock()
scms, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return -1
}
foundFd := -1
for _, scm := range scms {
fds, err := syscall.ParseUnixRights(&scm)
if err != nil {
continue
}
for _, fd := range fds {
if foundFd == -1 {
syscall.CloseOnExec(fd)
foundFd = fd
} else {
syscall.Close(fd)
}
}
}
return foundFd
}
func socketpair() ([2]int, error) {
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
}
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
// not bound to the underlying filesystem.
// Messages sent on one end are received on the other, and vice-versa.
// It is the caller's responsibility to close both ends.
func SocketPair() (a *os.File, b *os.File, err error) {
defer func() {
var (
fdA int = -1
fdB int = -1
)
if a != nil {
fdA = int(a.Fd())
}
if b != nil {
fdB = int(b.Fd())
}
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
}()
pair, err := socketpair()
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
}
func USocketPair() (*UnixConn, *UnixConn, error) {
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
a, b, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer a.Close()
defer b.Close()
uA, err := FileConn(a)
if err != nil {
return nil, nil, err
}
uB, err := FileConn(b)
if err != nil {
uA.Close()
return nil, nil, err
}
return uA, uB, nil
}
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
// returns an error if the file descriptor does not point to a unix socket.
// This creates a duplicate file descriptor. It's the caller's responsibility
// to close both.
func FdConn(fd int) (n *net.UnixConn, err error) {
{
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
}
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", fd)
}
return uconn, nil
}

237
beam/unix/unix_test.go Normal file
View File

@ -0,0 +1,237 @@
package unix
import (
"fmt"
"io/ioutil"
"testing"
)
func TestSocketPair(t *testing.T) {
a, b, err := SocketPair()
if err != nil {
t.Fatal(err)
}
go func() {
a.Write([]byte("hello world!"))
fmt.Printf("done writing. closing\n")
a.Close()
fmt.Printf("done closing\n")
}()
data, err := ioutil.ReadAll(b)
if err != nil {
t.Fatal(err)
}
fmt.Printf("--> %s\n", data)
fmt.Printf("still open: %v\n", a.Fd())
}
func TestUSocketPair(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
data := "hello world!"
go func() {
a.Write([]byte(data))
a.Close()
}()
res := make([]byte, 1024)
size, err := b.Read(res)
if err != nil {
t.Fatal(err)
}
if size != len(data) {
t.Fatal("Unexpected size")
}
if string(res[0:size]) != data {
t.Fatal("Unexpected data")
}
}
func TestSendUnixSocket(t *testing.T) {
a1, a2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer a1.Close()
// defer a2.Close()
b1, b2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer b1.Close()
// defer b2.Close()
glueA, glueB, err := SocketPair()
if err != nil {
t.Fatal(err)
}
// defer glueA.Close()
// defer glueB.Close()
go func() {
err := b2.Send([]byte("a"), glueB)
if err != nil {
t.Fatal(err)
}
}()
go func() {
err := a2.Send([]byte("b"), glueA)
if err != nil {
t.Fatal(err)
}
}()
connAhdr, connA, err := a1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connAhdr) != "b" {
t.Fatalf("unexpected: %s", connAhdr)
}
connBhdr, connB, err := b1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connBhdr) != "a" {
t.Fatalf("unexpected: %s", connBhdr)
}
fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd())
go func() {
fmt.Printf("sending message on %v\n", connA.Fd())
connA.Write([]byte("hello world"))
connA.Sync()
fmt.Printf("closing %v\n", connA.Fd())
connA.Close()
}()
data, err := ioutil.ReadAll(connB)
if err != nil {
t.Fatal(err)
}
fmt.Printf("---> %s\n", data)
}
// Ensure we get proper segmenting of messages
func TestSendSegmenting(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
extrafd1, extrafd2, err := SocketPair()
if err != nil {
t.Fatal(err)
}
extrafd2.Close()
go func() {
a.Send([]byte("message 1"), nil)
a.Send([]byte("message 2"), extrafd1)
a.Send([]byte("message 3"), nil)
}()
msg1, file1, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg1) != "message 1" {
t.Fatal("unexpected msg1:", string(msg1))
}
if file1 != nil {
t.Fatal("unexpectedly got file1")
}
msg2, file2, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg2) != "message 2" {
t.Fatal("unexpected msg2:", string(msg2))
}
if file2 == nil {
t.Fatal("didn't get file2")
}
file2.Close()
msg3, file3, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg3) != "message 3" {
t.Fatal("unexpected msg3:", string(msg3))
}
if file3 != nil {
t.Fatal("unexpectedly got file3")
}
}
// Test sending a zero byte message
func TestSendEmpty(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send([]byte{}, nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if len(msg) != 0 {
t.Fatalf("unexpected non-empty message: %v", msg)
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}
func makeLarge(size int) []byte {
res := make([]byte, size)
for i := range res {
res[i] = byte(i % 255)
}
return res
}
func verifyLarge(data []byte, size int) bool {
if len(data) != size {
return false
}
for i := range data {
if data[i] != byte(i%255) {
return false
}
}
return true
}
// Test sending a large message
func TestSendLarge(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send(makeLarge(100000), nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if !verifyLarge(msg, 100000) {
t.Fatalf("unexpected message (size %d)", len(msg))
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}

17
beam/utils/buf.go Normal file
View File

@ -0,0 +1,17 @@
package utils
import (
"github.com/docker/libswarm/beam"
)
type Buffer []*beam.Message
func (buf *Buffer) Send(msg *beam.Message) (beam.Receiver, error) {
(*buf) = append(*buf, msg)
return beam.NopReceiver{}, nil
}
func (buf *Buffer) Close() error {
(*buf) = nil
return nil
}

41
beam/utils/queue.go Normal file
View File

@ -0,0 +1,41 @@
package utils
import (
"github.com/docker/libswarm/beam"
)
type Queue struct {
*beam.PipeSender
dst beam.Sender
ch chan *beam.Message
}
func NewQueue(dst beam.Sender, size int) *Queue {
r, w := beam.Pipe()
q := &Queue{
PipeSender: w,
dst: dst,
ch: make(chan *beam.Message, size),
}
go func() {
defer close(q.ch)
for {
msg, err := r.Receive(beam.Ret)
if err != nil {
r.Close()
return
}
q.ch <- msg
}
}()
go func() {
for msg := range q.ch {
_, err := dst.Send(msg)
if err != nil {
r.Close()
return
}
}
}()
return q
}

43
beam/utils/queue_test.go Normal file
View File

@ -0,0 +1,43 @@
package utils
import (
"github.com/docker/libswarm/beam"
"testing"
)
func TestSendRet(t *testing.T) {
r, w := beam.Pipe()
defer r.Close()
defer w.Close()
q := NewQueue(w, 1)
defer q.Close()
ret, err := q.Send(&beam.Message{Verb: beam.Log, Args: []string{"ping"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
go func() {
ping, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if _, err := ping.Ret.Send(&beam.Message{Verb: beam.Log, Args: []string{"pong"}}); err != nil {
t.Fatal(err)
}
}()
pong, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if pong.Verb != beam.Log {
t.Fatal(err)
}
}
func TestSendClose(t *testing.T) {
q := NewQueue(beam.NopSender{}, 1)
q.Send(&beam.Message{Verb: beam.Error, Args: []string{"hello"}})
q.Close()
if _, err := q.Send(&beam.Message{Verb: beam.Ack, Args: []string{"again"}}); err == nil {
t.Fatal("send on closed queue should return an error")
}
}

112
beam/utils/stack.go Normal file
View File

@ -0,0 +1,112 @@
package utils
import (
"container/list"
"fmt"
"github.com/docker/libswarm/beam"
"strings"
"sync"
)
// StackSender forwards beam messages to a dynamic list of backend receivers.
// New backends are stacked on top. When a message is sent, each backend is
// tried until one succeeds. Any failing backends encountered along the way
// are removed from the queue.
type StackSender struct {
stack *list.List
l sync.RWMutex
}
func NewStackSender() *StackSender {
stack := list.New()
return &StackSender{
stack: stack,
}
}
func (s *StackSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
completed := s.walk(func(h beam.Sender) (ok bool) {
ret, err = h.Send(msg)
fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err)
if err == nil {
return true
}
return false
})
// If walk was completed, it means we didn't find a valid handler
if !completed {
return ret, err
}
// Silently drop messages if no valid backend is available.
return beam.NopSender{}.Send(msg)
}
func (s *StackSender) Add(dst beam.Sender) *StackSender {
s.l.Lock()
defer s.l.Unlock()
prev := &StackSender{
stack: list.New(),
}
prev.stack.PushFrontList(s.stack)
fmt.Printf("[ADD] prev %#v\n", prev)
s.stack.PushFront(dst)
return prev
}
func (s *StackSender) Close() error {
s.walk(func(h beam.Sender) bool {
h.Close()
// remove all handlers
return false
})
return nil
}
func (s *StackSender) _walk(f func(*list.Element) bool) bool {
var e *list.Element
s.l.RLock()
e = s.stack.Front()
s.l.RUnlock()
for e != nil {
fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender))
s.l.RLock()
next := e.Next()
s.l.RUnlock()
cont := f(e)
if !cont {
return false
}
e = next
}
return true
}
func (s *StackSender) walk(f func(beam.Sender) bool) bool {
return s._walk(func(e *list.Element) bool {
ok := f(e.Value.(beam.Sender))
if ok {
// Found a valid handler. Stop walking.
return false
}
// Invalid handler: remove.
s.l.Lock()
s.stack.Remove(e)
s.l.Unlock()
return true
})
}
func (s *StackSender) Len() int {
s.l.RLock()
defer s.l.RUnlock()
return s.stack.Len()
}
func (s *StackSender) String() string {
var parts []string
s._walk(func(e *list.Element) bool {
parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender)))
return true
})
return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->"))
}

127
beam/utils/stack_test.go Normal file
View File

@ -0,0 +1,127 @@
package utils
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/unix"
"github.com/dotcloud/docker/pkg/testutils"
"strings"
"testing"
)
func TestStackWithPipe(t *testing.T) {
r, w := beam.Pipe()
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestStackWithPair(t *testing.T) {
r, w, err := unix.Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestStackLen(t *testing.T) {
s := NewStackSender()
if s.Len() != 0 {
t.Fatalf("empty StackSender has length %d", s.Len())
}
}
func TestStackAdd(t *testing.T) {
s := NewStackSender()
a := Buffer{}
beforeA := s.Add(&a)
// Add on an empty StackSender should return an empty StackSender
if beforeA.Len() != 0 {
t.Fatalf("%s has %d elements", beforeA, beforeA.Len())
}
if s.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
// Add a 2nd element
b := Buffer{}
beforeB := s.Add(&b)
if beforeB.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
if s.Len() != 2 {
t.Fatalf("%#v", beforeA)
}
s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for b"}})
beforeB.Send(&beam.Message{Verb: beam.Log, Args: []string{"for a"}})
beforeA.Send(&beam.Message{Verb: beam.Log, Args: []string{"for nobody"}})
if len(a) != 1 {
t.Fatalf("%#v", a)
}
if len(b) != 1 {
t.Fatalf("%#v", b)
}
}
// Misbehaving backends must be removed
func TestStackAddBad(t *testing.T) {
s := NewStackSender()
buf := Buffer{}
s.Add(&buf)
r, w := beam.Pipe()
s.Add(w)
if s.Len() != 2 {
t.Fatalf("%#v", s)
}
r.Close()
if _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for the buffer"}}); err != nil {
t.Fatal(err)
}
if s.Len() != 1 {
t.Fatalf("%#v")
}
if len(buf) != 1 {
t.Fatalf("%#v", buf)
}
if buf[0].Args[0] != "for the buffer" {
t.Fatalf("%#v", buf)
}
}

51
beam/verbs.go Normal file
View File

@ -0,0 +1,51 @@
package beam
type Verb uint32
const (
Ack Verb = iota
Attach
Connect
Error
File
Get
Log
Ls
Set
Spawn
Start
Stop
Watch
)
func (v Verb) String() string {
switch v {
case Ack:
return "Ack"
case Attach:
return "Attach"
case Connect:
return "Connect"
case Error:
return "Error"
case File:
return "File"
case Get:
return "Get"
case Log:
return "Log"
case Ls:
return "Ls"
case Set:
return "Set"
case Spawn:
return "Spawn"
case Start:
return "Start"
case Stop:
return "Stop"
case Watch:
return "Watch"
}
return ""
}

View File

@ -0,0 +1,144 @@
package main
import (
"encoding/json"
"fmt"
"github.com/codegangsta/cli"
"github.com/docker/libswarm/backends"
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/runconfig"
"github.com/dotcloud/docker/utils"
"io"
"os"
"strings"
)
func main() {
app := cli.NewApp()
app.Name = "swarmd"
app.Usage = "Control a heterogenous distributed system with the Docker API"
app.Version = "0.0.1"
app.Flags = []cli.Flag{
cli.StringFlag{"backend", "debug", "load a backend"},
}
app.Action = cmdDaemon
app.Run(os.Args)
}
func cmdDaemon(c *cli.Context) {
app := beam.NewServer()
app.OnLog(beam.Handler(func(msg *beam.Message) error {
utils.Debugf("%s", strings.Join(msg.Args, " "))
return nil
}))
app.OnError(beam.Handler(func(msg *beam.Message) error {
Fatalf("Fatal: %v", strings.Join(msg.Args[:1], ""))
return nil
}))
backend := beam.Object{backends.Forward()}
dockerHost := os.Getenv("DOCKER_HOST")
if dockerHost == "" {
dockerHost = "unix:///var/run/docker.sock"
}
instance, err := backend.Spawn(dockerHost)
if err != nil {
Fatalf("spawn: %v\n", err)
}
instanceR, instanceW, err := instance.Attach("")
if err != nil {
Fatalf("attach: %v", err)
}
defer instanceW.Close()
go beam.Copy(app, instanceR)
if err := instance.Start(); err != nil {
Fatalf("start: %v", err)
}
err = doCmd(instance, c.Args())
if err != nil {
Fatalf("%v", err)
}
}
func doCmd(instance *beam.Object, args []string) error {
if len(args) == 0 {
return fmt.Errorf("no command supplied")
}
if args[0] == "ps" {
if len(args) != 1 {
return fmt.Errorf("usage: ps")
}
names, err := instance.Ls()
if err != nil {
return err
}
fmt.Println(strings.Join(names, "\n"))
return nil
}
if args[0] == "run" {
if len(args) < 3 {
return fmt.Errorf("usage: run IMAGE COMMAND...")
}
containerJson, err := json.Marshal(&runconfig.Config{
Image: args[1],
Cmd: args[2:],
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
})
if err != nil {
return err
}
container, err := instance.Spawn(string(containerJson))
if err != nil {
return fmt.Errorf("spawn: %v", err)
}
logs, _, err := container.Attach("")
if err != nil {
return fmt.Errorf("attach: %v", err)
}
if err = container.Start(); err != nil {
return fmt.Errorf("start: %v", err)
}
for {
msg, err := logs.Receive(beam.Ret)
if err != nil {
if err.Error() == "EOF" {
break
}
return fmt.Errorf("error reading from container: %v", err)
}
if msg.Verb != beam.Log {
return fmt.Errorf("unexpected message reading from container: %v", msg)
}
if len(msg.Args) != 2 {
return fmt.Errorf("expected exactly 2 args to log message, got %d", len(msg.Args))
}
tag, chunk := msg.Args[0], msg.Args[1]
var stream io.Writer
if tag == "stdout" {
stream = os.Stdout
} else if tag == "stderr" {
stream = os.Stderr
} else {
return fmt.Errorf("unrecognised tag: %s", tag)
}
fmt.Fprint(stream, chunk)
}
return nil
}
return fmt.Errorf("unrecognised command: %s", args[0])
}
func Fatalf(msg string, args ...interface{}) {
if !strings.HasSuffix(msg, "\n") {
msg = msg + "\n"
}
fmt.Fprintf(os.Stderr, msg, args...)
os.Exit(1)
}

View File

@ -32,6 +32,10 @@
"Comment": "0-4-g2fb21b3", "Comment": "0-4-g2fb21b3",
"Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5" "Rev": "2fb21b34171f083d46d66195caa7ec121d941ec5"
}, },
{
"ImportPath": "github.com/docker/spdystream",
"Rev": "87969a9c4b79508e63004ac17a97eb9278c08b3c"
},
{ {
"ImportPath": "github.com/dotcloud/docker/api", "ImportPath": "github.com/dotcloud/docker/api",
"Comment": "v0.11.1-466-g77ae37a", "Comment": "v0.11.1-466-g77ae37a",

2
swarmd/TODO Normal file
View File

@ -0,0 +1,2 @@
- Hub us dropping messages since API refactor, probably because of Queue.

View File

@ -4,10 +4,11 @@ import (
"fmt" "fmt"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/docker/libswarm/backends" "github.com/docker/libswarm/backends"
"github.com/dotcloud/docker/api/server" "github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/engine" _ "github.com/dotcloud/docker/api/server"
"github.com/flynn/go-shlex" "github.com/flynn/go-shlex"
"io" "io"
"log"
"os" "os"
"strings" "strings"
) )
@ -15,72 +16,66 @@ import (
func main() { func main() {
app := cli.NewApp() app := cli.NewApp()
app.Name = "swarmd" app.Name = "swarmd"
app.Usage = "a minimalist toolkit to compose network services" app.Usage = "Compose distributed systems from lightweight services"
app.Version = "0.0.1" app.Version = "0.0.1"
app.Flags = []cli.Flag{ app.Flags = []cli.Flag{}
cli.StringFlag{"backend", "debug", "load a backend"},
}
app.Action = cmdDaemon app.Action = cmdDaemon
app.Run(os.Args) app.Run(os.Args)
} }
func cmdDaemon(c *cli.Context) { func cmdDaemon(c *cli.Context) {
if len(c.Args()) == 0 { app := beam.NewServer()
Fatalf("Usage: %s [OPTIONS] <proto>://<address> [<proto>://<address>]...\n", c.App.Name) app.OnLog(beam.Handler(func(msg *beam.Message) error {
} log.Printf("%s\n", strings.Join(msg.Args, " "))
return nil
// Load backend }))
// FIXME: allow for multiple backends to be loaded. app.OnError(beam.Handler(func(msg *beam.Message) error {
// This could be done by instantiating 1 engine per backend, Fatalf("Fatal: %v", strings.Join(msg.Args[:1], ""))
// installing each backend in its respective engine, return nil
// then registering a Catchall on the frontent engine which }))
// multiplexes across all backends (with routing / filtering
// logic along the way).
back := backends.New() back := backends.New()
bName, bArgs, err := parseCmd(c.String("backend")) if len(c.Args()) == 0 {
names, err := back.Ls()
if err != nil {
Fatalf("ls: %v", err)
}
fmt.Println(strings.Join(names, "\n"))
return
}
var previousInstanceR beam.Receiver
// FIXME: refactor into a Pipeline
for idx, backendArg := range c.Args() {
bName, bArgs, err := parseCmd(backendArg)
if err != nil {
Fatalf("parse: %v", err)
}
_, backend, err := back.Attach(bName)
if err != nil {
Fatalf("%s: %v\n", bName, err)
}
instance, err := backend.Spawn(bArgs...)
if err != nil {
Fatalf("spawn %s: %v\n", bName, err)
}
instanceR, instanceW, err := instance.Attach("")
if err != nil {
Fatalf("attach: %v", err)
}
go func(r beam.Receiver, w beam.Sender, idx int) {
if r != nil {
beam.Copy(w, r)
}
w.Close()
}(previousInstanceR, instanceW, idx)
if err := instance.Start(); err != nil {
Fatalf("start: %v", err)
}
previousInstanceR = instanceR
}
_, err := beam.Copy(app, previousInstanceR)
if err != nil { if err != nil {
Fatalf("%v", err) Fatalf("copy: %v", err)
} }
fmt.Printf("---> Loading backend '%s'\n", strings.Join(append([]string{bName}, bArgs...), " "))
if err := back.Job(bName, bArgs...).Run(); err != nil {
Fatalf("%s: %v\n", bName, err)
}
// Register the API entrypoint
// (we register it as `argv[0]` so we can print usage messages straight from the job
// stderr.
front := engine.New()
front.Logging = false
// FIXME: server should expose an engine.Installer
front.Register(c.App.Name, server.ServeApi)
front.Register("acceptconnections", server.AcceptConnections)
front.RegisterCatchall(func(job *engine.Job) engine.Status {
fw := back.Job(job.Name, job.Args...)
fw.Stdout.Add(job.Stdout)
fw.Stderr.Add(job.Stderr)
fw.Stdin.Add(job.Stdin)
for key, val := range job.Env().Map() {
fw.Setenv(key, val)
}
fw.Run()
return engine.Status(fw.StatusCode())
})
// Call the API entrypoint
go func() {
serve := front.Job(c.App.Name, c.Args()...)
serve.Stdout.Add(os.Stdout)
serve.Stderr.Add(os.Stderr)
if err := serve.Run(); err != nil {
Fatalf("serveapi: %v", err)
}
}()
// Notify that we're ready to receive connections
if err := front.Job("acceptconnections").Run(); err != nil {
Fatalf("acceptconnections: %v", err)
}
// Inifinite loop
<-make(chan struct{})
} }
func parseCmd(txt string) (string, []string, error) { func parseCmd(txt string) (string, []string, error) {