Merge beam into libswarm

Signed-off-by: Solomon Hykes <solomon@docker.com>
This commit is contained in:
Solomon Hykes 2014-06-01 23:49:08 +00:00
commit 91ae1d7353
41 changed files with 3711 additions and 0 deletions

1
beam/AUTHORS Normal file
View File

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com>

1
beam/MAINTAINERS Normal file
View File

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com> (@shykes)

79
beam/README.md Normal file
View File

@ -0,0 +1,79 @@
# Beam
## A library to break down an application into loosely coupled micro-services
Beam is a library to turn your application into a collection of loosely coupled micro-services.
It implements an ultra-lightweight hub for the different components of an application
to discover and consume each other, either in-memory or across the network.
Beam can be embedded with very little overhead by using Go channels. It
also implements an efficient http2/tls transport which can be used to
securely expose and consume any micro-service across a distributed system.
Because remote Beam sessions are regular HTTP2 over TLS sessions, they can
be used in combination with any standard proxy or authentication
middleware. This means Beam, when configured propely, can be safely exposed
on the public Internet. It can also be embedded in an existing rest API
using an http1 and websocket fallback.
## How is it different from RPC or REST?
Modern micro-services are not a great fit for classical RPC or REST
protocols because they often rely heavily on events, bi-directional
communication, stream multiplexing, and some form of data synchronization.
Sometimes these services have a component which requires raw socket access,
either for performance (file transfer, event firehose, database access) or
simply because they have their own protocol (dns, smtp, sql, ssh,
zeromq, etc). These components typically need a separate set of tools
because they are outside the scope of the REST and RPC tools. If there is
also a websocket or ServerEvents transport, those require yet another layer
of tools.
Instead of a clunky patchwork of tools, Beam implements in a single
minimalistic library all the primitives needed by modern micro-services:
* Request/response with arbitrary structured data
* Asynchronous events flowing in real-time in both directions
* Requests and responses can flow in any direction, and can be arbitrarily
nested, for example to implement a self-registering worker model
* Any request or response can include any number of streams, multiplexed in
both directions on the same session.
* Any message serialization format can be plugged in: json, msgpack, xml,
protobuf.
* As an optional convenience a minimalist key-value format is implemented.
It is designed to be extremely fast to serialize and parse, dead-simple to
implement, and suitable for both one-time data copy, file storage, and
real-time synchronization.
* Raw file descriptors can be "attached" to any message, and passed under
the hood using the best method available to each transport. The Go channel
transport just passes os.File pointers around. The unix socket transport
uses fd passing which makes it suitable for high-performance IPC. The
tcp transport uses dedicated http2 streams. And as a bonus extension, a
built-in tcp gateway can be used to proxy raw network sockets without
extra overhead. That means Beam services can be used as smart gateways to a
sql database, ssh or file transfer service, with unified auth, discovery
and tooling and without performance penalty.
## Design philosophy
An explicit goal of Beam is simplicity of implementation and clarity of
spec. Porting it to any language should be as effortless as humanly
possible.
## Creators
**Solomon Hykes**
- <http://twitter.com/solomonstre>
- <http://github.com/shykes>
## Copyright and license
Code and documentation copyright 2013-2014 Docker, inc. Code released under the Apache 2.0 license.
Docs released under Creative commons.

39
beam/beam.go Normal file
View File

@ -0,0 +1,39 @@
package beam
import (
"errors"
"os"
)
type Sender interface {
Send(msg *Message, mode int) (Receiver, Sender, error)
Close() error
}
type Receiver interface {
Receive(mode int) (*Message, Receiver, Sender, error)
}
type Message struct {
Name string
Args []string
Att *os.File
}
const (
R = 1 << (32 - 1 - iota)
W
)
type ReceiverFrom interface {
ReceiveFrom(Receiver) (int, error)
}
type SenderTo interface {
SendTo(Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)

17
beam/beam_test.go Normal file
View File

@ -0,0 +1,17 @@
package beam
import (
"testing"
)
func TestModes(t *testing.T) {
if R == W {
t.Fatalf("0")
}
if R == 0 {
t.Fatalf("0")
}
if W == 0 {
t.Fatalf("0")
}
}

119
beam/data/data.go Normal file
View File

@ -0,0 +1,119 @@
package data
import (
"fmt"
"strconv"
"strings"
)
func Encode(obj map[string][]string) string {
var msg string
msg += encodeHeader(0)
for k, values := range obj {
msg += encodeNamedList(k, values)
}
return msg
}
func encodeHeader(msgtype int) string {
return fmt.Sprintf("%03.3d;", msgtype)
}
func encodeString(s string) string {
return fmt.Sprintf("%d:%s,", len(s), s)
}
var EncodeString = encodeString
var DecodeString = decodeString
var EncodeList = encodeList
func encodeList(l []string) string {
values := make([]string, 0, len(l))
for _, s := range l {
values = append(values, encodeString(s))
}
return encodeString(strings.Join(values, ""))
}
func encodeNamedList(name string, l []string) string {
return encodeString(name) + encodeList(l)
}
func Decode(msg string) (map[string][]string, error) {
msgtype, skip, err := decodeHeader(msg)
if err != nil {
return nil, err
}
if msgtype != 0 {
// FIXME: use special error type so the caller can easily ignore
return nil, fmt.Errorf("unknown message type: %d", msgtype)
}
msg = msg[skip:]
obj := make(map[string][]string)
for len(msg) > 0 {
k, skip, err := decodeString(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
values, skip, err := decodeList(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
obj[k] = values
}
return obj, nil
}
var DecodeList = decodeList
func decodeList(msg string) ([]string, int, error) {
blob, skip, err := decodeString(msg)
if err != nil {
return nil, 0, err
}
var l []string
for len(blob) > 0 {
v, skipv, err := decodeString(blob)
if err != nil {
return nil, 0, err
}
l = append(l, v)
blob = blob[skipv:]
}
return l, skip, nil
}
func decodeString(msg string) (string, int, error) {
parts := strings.SplitN(msg, ":", 2)
if len(parts) != 2 {
return "", 0, fmt.Errorf("invalid format: no column")
}
var length int
if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil {
return "", 0, err
} else {
length = int(l)
}
if len(parts[1]) < length+1 {
return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1)
}
payload := parts[1][:length+1]
if payload[length] != ',' {
return "", 0, fmt.Errorf("message is not comma-terminated")
}
return payload[:length], len(parts[0]) + 1 + length + 1, nil
}
func decodeHeader(msg string) (int, int, error) {
if len(msg) < 4 {
return 0, 0, fmt.Errorf("message too small")
}
msgtype, err := strconv.ParseInt(msg[:3], 10, 32)
if err != nil {
return 0, 0, err
}
return int(msgtype), 4, nil
}

129
beam/data/data_test.go Normal file
View File

@ -0,0 +1,129 @@
package data
import (
"strings"
"testing"
)
func TestEncodeHelloWorld(t *testing.T) {
input := "hello world!"
output := encodeString(input)
expectedOutput := "12:hello world!,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyString(t *testing.T) {
input := ""
output := encodeString(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyList(t *testing.T) {
input := []string{}
output := encodeList(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyMap(t *testing.T) {
input := make(map[string][]string)
output := Encode(input)
expectedOutput := "000;"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key1Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"world"}
output := Encode(input)
expectedOutput := "000;5:hello,8:5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key2Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"beautiful", "world"}
output := Encode(input)
expectedOutput := "000;5:hello,20:9:beautiful,5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyValue(t *testing.T) {
input := make(map[string][]string)
input["foo"] = []string{}
output := Encode(input)
expectedOutput := "000;3:foo,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryKey(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryValue(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestDecodeString(t *testing.T) {
validEncodedStrings := []struct {
input string
output string
skip int
}{
{"3:foo,", "foo", 6},
{"5:hello,", "hello", 8},
{"5:hello,5:world,", "hello", 8},
}
for _, sample := range validEncodedStrings {
output, skip, err := decodeString(sample.input)
if err != nil {
t.Fatalf("error decoding '%v': %v", sample.input, err)
}
if skip != sample.skip {
t.Fatalf("invalid skip: %v!=%v", skip, sample.skip)
}
if output != sample.output {
t.Fatalf("invalid output: %v!=%v", output, sample.output)
}
}
}
func TestDecode1Key1Value(t *testing.T) {
input := "000;3:foo,6:3:bar,,"
output, err := Decode(input)
if err != nil {
t.Fatal(err)
}
if v, exists := output["foo"]; !exists {
t.Fatalf("wrong output: %v\n", output)
} else if len(v) != 1 || strings.Join(v, "") != "bar" {
t.Fatalf("wrong output: %v\n", output)
}
}

93
beam/data/message.go Normal file
View File

@ -0,0 +1,93 @@
package data
import (
"fmt"
"strings"
)
type Message string
func Empty() Message {
return Message(Encode(nil))
}
func Parse(args []string) Message {
data := make(map[string][]string)
for _, word := range args {
if strings.Contains(word, "=") {
kv := strings.SplitN(word, "=", 2)
key := kv[0]
var val string
if len(kv) == 2 {
val = kv[1]
}
data[key] = []string{val}
}
}
return Message(Encode(data))
}
func (m Message) Add(k, v string) Message {
data, err := Decode(string(m))
if err != nil {
return m
}
if values, exists := data[k]; exists {
data[k] = append(values, v)
} else {
data[k] = []string{v}
}
return Message(Encode(data))
}
func (m Message) Set(k string, v ...string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
data[k] = v
return Message(Encode(data))
}
func (m Message) Del(k string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
delete(data, k)
return Message(Encode(data))
}
func (m Message) Get(k string) []string {
data, err := Decode(string(m))
if err != nil {
return nil
}
v, exists := data[k]
if !exists {
return nil
}
return v
}
func (m Message) Pretty() string {
data, err := Decode(string(m))
if err != nil {
return ""
}
entries := make([]string, 0, len(data))
for k, values := range data {
entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ",")))
}
return strings.Join(entries, " ")
}
func (m Message) String() string {
return string(m)
}
func (m Message) Bytes() []byte {
return []byte(m)
}

53
beam/data/message_test.go Normal file
View File

@ -0,0 +1,53 @@
package data
import (
"testing"
)
func TestEmptyMessage(t *testing.T) {
m := Empty()
if m.String() != Encode(nil) {
t.Fatalf("%v != %v", m.String(), Encode(nil))
}
}
func TestSetMessage(t *testing.T) {
m := Empty().Set("foo", "bar")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 1 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetMessageTwice(t *testing.T) {
m := Empty().Set("foo", "bar").Set("ga", "bu")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 2 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetDelMessage(t *testing.T) {
m := Empty().Set("foo", "bar").Del("foo")
output := m.String()
expectedOutput := Encode(nil)
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}

92
beam/data/netstring.txt Normal file
View File

@ -0,0 +1,92 @@
##
## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt
##
Netstrings
D. J. Bernstein, djb@pobox.com
19970201
1. Introduction
A netstring is a self-delimiting encoding of a string. Netstrings are
very easy to generate and to parse. Any string may be encoded as a
netstring; there are no restrictions on length or on allowed bytes.
Another virtue of a netstring is that it declares the string size up
front. Thus an application can check in advance whether it has enough
space to store the entire string.
Netstrings may be used as a basic building block for reliable network
protocols. Most high-level protocols, in effect, transmit a sequence
of strings; those strings may be encoded as netstrings and then
concatenated into a sequence of characters, which in turn may be
transmitted over a reliable stream protocol such as TCP.
Note that netstrings can be used recursively. The result of encoding
a sequence of strings is a single string. A series of those encoded
strings may in turn be encoded into a single string. And so on.
In this document, a string of 8-bit bytes may be written in two
different forms: as a series of hexadecimal numbers between angle
brackets, or as a sequence of ASCII characters between double quotes.
For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of
length 12; it is the same as the string "hello world!".
Although this document restricts attention to strings of 8-bit bytes,
netstrings could be used with any 6-bit-or-larger character set.
2. Definition
Any string of 8-bit bytes may be encoded as [len]":"[string]",".
Here [string] is the string and [len] is a nonempty sequence of ASCII
digits giving the length of [string] in decimal. The ASCII digits are
<30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros
at the front of [len] are prohibited: [len] begins with <30> exactly
when [string] is empty.
For example, the string "hello world!" is encoded as <31 32 3a 68
65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The
empty string is encoded as "0:,".
[len]":"[string]"," is called a netstring. [string] is called the
interpretation of the netstring.
3. Sample code
The following C code starts with a buffer buf of length len and
prints it as a netstring.
if (printf("%lu:",len) < 0) barf();
if (fwrite(buf,1,len,stdout) < len) barf();
if (putchar(',') < 0) barf();
The following C code reads a netstring and decodes it into a
dynamically allocated buffer buf of length len.
if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */
if (getchar() != ':') barf();
buf = malloc(len + 1); /* malloc(0) is not portable */
if (!buf) barf();
if (fread(buf,1,len,stdin) < len) barf();
if (getchar() != ',') barf();
Both of these code fragments assume that the local character set is
ASCII, and that the relevant stdio streams are in binary mode.
4. Security considerations
The famous Finger security hole may be blamed on Finger's use of the
CRLF encoding. In that encoding, each string is simply terminated by
CRLF. This encoding has several problems. Most importantly, it does
not declare the string size in advance. This means that a correct
CRLF parser must be prepared to ask for more and more memory as it is
reading the string. In the case of Finger, a lazy implementor found
this to be too much trouble; instead he simply declared a fixed-size
buffer and used C's gets() function. The rest is history.
In contrast, as the above sample code shows, it is very easy to
handle netstrings without risking buffer overflow. Thus widespread
use of netstrings may improve network security.

BIN
beam/examples/beamsh/beamsh Executable file

Binary file not shown.

View File

@ -0,0 +1,542 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/term"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"sync"
)
var rootPlugins = []string{
"stdio",
}
var (
flX bool
flPing bool
introspect beam.ReceiveSender = beam.Devnull()
)
func main() {
fd3 := os.NewFile(3, "beam-introspect")
if introsp, err := beam.FileConn(fd3); err == nil {
introspect = introsp
Logf("introspection enabled\n")
} else {
Logf("introspection disabled\n")
}
fd3.Close()
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
flag.Parse()
if flag.NArg() == 0 {
if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin)
for {
fmt.Printf("[%d] beamsh> ", os.Getpid())
if !input.Scan() {
break
}
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
continue
}
if err := executeRootScript(cmd); err != nil {
Fatal(err)
}
}
if err := input.Err(); err == io.EOF {
break
} else if err != nil {
Fatal(err)
}
}
} else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
} else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath)
if err != nil {
Fatal(err)
}
script, err := dockerscript.Parse(f)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
}
}
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
)
for _, plugin := range rootPlugins {
pluginCmd := &dockerscript.Command{
Args: []string{plugin},
}
if rootCmd == nil {
rootCmd = pluginCmd
} else {
lastCmd.Children = []*dockerscript.Command{pluginCmd}
}
lastCmd = pluginCmd
}
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
}
handlers, err := Handlers(introspect)
if err != nil {
return err
}
defer handlers.Close()
var tasks sync.WaitGroup
defer func() {
Debugf("Waiting for introspection...\n")
tasks.Wait()
Debugf("DONE Waiting for introspection\n")
}()
if introspect != nil {
tasks.Add(1)
go func() {
Debugf("starting introspection\n")
defer Debugf("done with introspection\n")
defer tasks.Done()
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
Debugf("XXX starting reading introspection messages\n")
r := beam.NewRouter(handlers)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
return handlers.Send(p, a)
})
n, err := beam.Copy(r, introspect)
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
}()
}
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
}
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
defer background.Wait()
for _, cmd := range script {
if cmd.Background {
background.Add(1)
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
background.Done()
}(out, cmd)
} else {
if err := executeCommand(out, cmd); err != nil {
return err
}
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
}
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
}
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
if err != nil {
return fmt.Errorf("%v\n", err)
}
var tasks sync.WaitGroup
tasks.Add(1)
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
}
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
}
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
}
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
r := beam.NewRouter(sink)
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
conn, err := beam.FileConn(attachment)
if err != nil {
attachment.Close()
return err
}
// attachment.Close()
tasks.Add(1)
go func() {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stderr.Close()
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}()
return nil
})
beam.Copy(r, priv)
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler {
if name == "logger" {
return CmdLogger
} else if name == "render" {
return CmdRender
} else if name == "devnull" {
return CmdDevnull
} else if name == "prompt" {
return CmdPrompt
} else if name == "stdio" {
return CmdStdio
} else if name == "echo" {
return CmdEcho
} else if name == "pass" {
return CmdPass
} else if name == "in" {
return CmdIn
} else if name == "exec" {
return CmdExec
} else if name == "trace" {
return CmdTrace
} else if name == "emit" {
return CmdEmit
} else if name == "print" {
return CmdPrint
} else if name == "multiprint" {
return CmdMultiprint
} else if name == "listen" {
return CmdListen
} else if name == "beamsend" {
return CmdBeamsend
} else if name == "beamreceive" {
return CmdBeamreceive
} else if name == "connect" {
return CmdConnect
} else if name == "openfile" {
return CmdOpenfile
} else if name == "spawn" {
return CmdSpawn
} else if name == "chdir" {
return CmdChdir
}
return nil
}
// VARIOUS HELPER FUNCTIONS:
func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface {
File() (*os.File, error)
}); !ok {
return nil, fmt.Errorf("no file descriptor available")
} else {
f, err = connWithFile.File()
if err != nil {
return nil, err
}
}
return f, err
}
type Msg struct {
payload []byte
attachment *os.File
}
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg = msg + "\n"
}
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
return fmt.Printf(msg, args...)
}
func Debugf(msg string, args ...interface{}) {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
}
}
func Fatalf(msg string, args ...interface{}) {
Logf(msg, args...)
os.Exit(1)
}
func Fatal(args ...interface{}) {
Fatalf("%v", args[0])
}
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <- conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections <- conn
}
}()
return connections, nil
}
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func msgDesc(payload []byte, attachment *os.File) string {
return beam.MsgDesc(payload, attachment)
}
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func() error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := dst.Send([]byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}

View File

@ -0,0 +1,441 @@
package main
import (
"bufio"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/term"
"github.com/dotcloud/docker/utils"
"io"
"net"
"net/url"
"os"
"os/exec"
"path"
"strings"
"sync"
"text/template"
)
func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if err := os.MkdirAll("logs", 0700); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
var tasks sync.WaitGroup
defer tasks.Wait()
var n int = 1
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func(n int) {
defer tasks.Done()
defer attachment.Close()
var streamname string
if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
streamname = "stdout"
} else {
streamname = cmd[1]
}
if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
}
logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
defer logfile.Close()
io.Copy(logfile, attachment)
logfile.Sync()
}(n)
n++
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
txt := args[1]
if !strings.HasSuffix(txt, "\n") {
txt += "\n"
}
t := template.Must(template.New("render").Parse(txt))
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
msg, err := data.Decode(string(payload))
if err != nil {
fmt.Fprintf(stderr, "decode error: %v\n")
}
if err := t.Execute(stdout, msg); err != nil {
fmt.Fprintf(stderr, "rendering error: %v\n", err)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
if err := out.Send(payload, attachment); err != nil {
return
}
}
}
func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
_, attachment, err := in.Receive()
if err != nil {
return
}
if attachment != nil {
attachment.Close()
}
}
}
func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
return
}
if !term.IsTerminal(0) {
fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
return
}
fmt.Printf("%s: ", strings.Join(args[1:], " "))
oldState, _ := term.SaveState(0)
term.DisableEcho(0, oldState)
line, _, err := bufio.NewReader(os.Stdin).ReadLine()
if err != nil {
fmt.Fprintln(stderr, err.Error())
return
}
val := string(line)
fmt.Printf("\n")
term.RestoreTerminal(0, oldState)
out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
}
func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer attachment.Close()
io.Copy(os.Stdout, attachment)
attachment.Close()
}()
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
Fatal(err)
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
fmt.Fprintln(stdout, strings.Join(args[1:], " "))
}
func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
if err := out.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return
}
}
}
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
c := exec.Command(utils.SelfPath())
r, w, err := os.Pipe()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
c.Stdin = r
c.Stdout = stdout
c.Stderr = stderr
go func() {
fmt.Fprintf(w, strings.Join(args[1:], " "))
w.Sync()
w.Close()
}()
if err := c.Run(); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
}
func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
cmd := exec.Command(args[1], args[2:]...)
cmd.Stdout = stdout
cmd.Stderr = stderr
//cmd.Stdin = os.Stdin
local, remote, err := beam.SocketPair()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
child, err := beam.FileConn(local)
if err != nil {
local.Close()
remote.Close()
fmt.Fprintf(stderr, "%v\n", err)
return
}
local.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
var tasks sync.WaitGroup
tasks.Add(1)
go func() {
defer Debugf("done copying to child\n")
defer tasks.Done()
defer child.CloseWrite()
beam.Copy(child, in)
}()
tasks.Add(1)
go func() {
defer Debugf("done copying from child %d\n")
defer tasks.Done()
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
})
beam.Copy(r, child)
}()
execErr := cmd.Run()
// We can close both ends of the socket without worrying about data stuck in the buffer,
// because unix socket writes are fully synchronous.
child.Close()
tasks.Wait()
var status string
if execErr != nil {
status = execErr.Error()
} else {
status = "ok"
}
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
}
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
var sfd string = "nil"
if attachment != nil {
sfd = fmt.Sprintf("%d", attachment.Fd())
}
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
out.Send(payload, attachment)
return nil
})
beam.Copy(r, in)
}
func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
out.Send(data.Parse(args[1:]).Bytes(), nil)
}
func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, a, err := in.Receive()
if err != nil {
return
}
// Skip commands
if a != nil && data.Message(payload).Get("cmd") == nil {
dup, err := beam.SendRPipe(out, payload)
if err != nil {
a.Close()
return
}
io.Copy(io.MultiWriter(os.Stdout, dup), a)
dup.Close()
} else {
if err := out.Send(payload, a); err != nil {
return
}
}
}
}
func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
multiprint := func(p []byte, a *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer a.Close()
msg := data.Message(string(p))
input := bufio.NewScanner(a)
for input.Scan() {
fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
}
}()
return nil
}
r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
beam.Copy(r, in)
}
func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
for {
conn, err := l.Accept()
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
f, err := connToFile(conn)
if err != nil {
conn.Close()
continue
}
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = dialer
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = listener
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
ReceiveFromConn(connections, out)
}
func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
_, attachment, err := in.Receive()
if err != nil {
break
}
if attachment == nil {
continue
}
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
return
}
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
go func(attachment *os.File, conn net.Conn) {
defer tasks.Done()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(attachment, conn)
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(conn, attachment)
}(attachment, conn)
iotasks.Wait()
conn.Close()
attachment.Close()
}(attachment, conn)
}
tasks.Wait()
}
func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
}
func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
}

View File

@ -0,0 +1,3 @@
#!/usr/bin/env beamsh
exec ls -l

View File

@ -0,0 +1,5 @@
#!/usr/bin/env beamsh
trace {
exec ls -l
}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
trace {
stdio {
exec ls -l
}
}

View File

@ -0,0 +1,10 @@
#!/usr/bin/env beamsh -x
trace outer {
# stdio fails
stdio {
trace inner {
exec ls -l
}
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
stdio {
trace {
stdio {
exec ls -l
}
}
}

View File

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
exec ls -l
}

View File

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
stdio {
trace {
echo hello
}
}

View File

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
echo hello world
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
devnull {
multiprint {
exec tail -f /var/log/system.log &
exec ls -l
exec ls ksdhfkjdshf jksdfhkjsdhf
}
}

View File

@ -0,0 +1,8 @@
#!/usr/bin/env beamsh
print {
trace {
emit msg=hello
emit msg=world
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
trace {
log {
exec ls -l
exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf
echo hello world
}
}

View File

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
multiprint {
trace {
listen tcp://localhost:7676 &
listen tcp://localhost:8787 &
}
}

6
beam/http2/README.md Normal file
View File

@ -0,0 +1,6 @@
This package defines a remote transport for Beam services using http2/spdy and tls.
Pointers:
* Low-level protocol framer: http://code.google.com/p/go.net/spdy
* (incomplete) high-level server implementation: https://github.com/shykes/spdy-go

264
beam/inmem/inmem.go Normal file
View File

@ -0,0 +1,264 @@
package inmem
import (
"github.com/docker/beam"
"io"
"sync"
)
func Pipe() (*PipeReceiver, *PipeSender) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReceiver{p}
w := &PipeSender{p}
return r, w
}
type pipe struct {
ch chan *pipeMessage
rwait sync.Cond
wwait sync.Cond
l sync.Mutex
rl sync.Mutex
wl sync.Mutex
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
pmsg *pipeMessage
}
type pipeMessage struct {
msg *beam.Message
out *PipeSender
in *PipeReceiver
}
func (p *pipe) psend(pmsg *pipeMessage) error {
var err error
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
p.pmsg = pmsg
p.rwait.Signal()
for {
if p.pmsg == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = io.ErrClosedPipe
}
p.wwait.Wait()
}
p.pmsg = nil // in case of rerr or werr
return err
}
func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) {
// Prepare the message
pmsg := &pipeMessage{msg: msg}
if mode&beam.R != 0 {
in, pmsg.out = Pipe()
defer func() {
if err != nil {
in.Close()
in = nil
pmsg.out.Close()
}
}()
}
if mode&beam.W != 0 {
pmsg.in, out = Pipe()
defer func() {
if err != nil {
out.Close()
out = nil
pmsg.in.Close()
}
}()
}
err = p.psend(pmsg)
return
}
func (p *pipe) preceive() (*pipeMessage, error) {
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return nil, io.ErrClosedPipe
}
if p.pmsg != nil {
break
}
if p.werr != nil {
return nil, p.werr
}
p.rwait.Wait()
}
pmsg := p.pmsg
p.pmsg = nil
p.wwait.Signal()
return pmsg, nil
}
func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) {
pmsg, err := p.preceive()
if err != nil {
return nil, nil, nil, err
}
if pmsg.out != nil && mode&beam.W == 0 {
pmsg.out.Close()
pmsg.out = nil
}
if pmsg.in != nil && mode&beam.R == 0 {
pmsg.in.Close()
pmsg.in = nil
}
return pmsg.msg, pmsg.in, pmsg.out, nil
}
func (p *pipe) rclose(err error) {
if err == nil {
err = io.ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err error) {
if err == nil {
err = io.EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// PipeReceiver
type PipeReceiver struct {
p *pipe
}
func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
msg, pin, pout, err := r.p.receive(mode)
if err != nil {
return nil, nil, nil, err
}
var (
// Always return NopReceiver/NopSender instead of nil values,
// because:
// - if they were requested in the mode, they can safely be used
// - if they were not requested, they can safely be ignored (ie no leak if they
// aren't closed)
in beam.Receiver = beam.NopReceiver{}
out beam.Sender = beam.NopSender{}
)
if pin != nil {
in = pin
}
if pout != nil {
out = pout
}
return msg, in, out, err
}
func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) {
var n int
// If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender)
if !ok {
return 0, beam.ErrIncompatibleSender
}
for {
pmsg, err := r.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := pdst.p.psend(pmsg); err != nil {
return n, err
}
}
n++
return n, nil
}
func (r *PipeReceiver) Close() error {
return r.CloseWithError(nil)
}
func (r *PipeReceiver) CloseWithError(err error) error {
r.p.rclose(err)
return nil
}
// PipeSender
type PipeSender struct {
p *pipe
}
func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
pin, pout, err := w.p.send(msg, mode)
var (
in beam.Receiver
out beam.Sender
)
if pin != nil {
in = pin
}
if pout != nil {
out = pout
}
return in, out, err
}
func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) {
var n int
// If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver)
if !ok {
return 0, beam.ErrIncompatibleReceiver
}
for {
pmsg, err := psrc.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := w.p.psend(pmsg); err != nil {
return n, err
}
n++
}
return n, nil
}
func (w *PipeSender) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeSender) CloseWithError(err error) error {
w.p.wclose(err)
return nil
}

191
beam/inmem/inmem_test.go Normal file
View File

@ -0,0 +1,191 @@
package inmem
import (
"fmt"
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils"
"io/ioutil"
"os"
"testing"
)
func TestReceiveW(t *testing.T) {
r, w := Pipe()
go func() {
w.Send(&beam.Message{Name: "hello"}, 0)
}()
_, _, ww, err := r.Receive(beam.W)
if err != nil {
t.Fatal(err)
}
if _, _, err := ww.Send(&beam.Message{Name: "this better not crash"}, 0); err != nil {
t.Fatal(err)
}
}
func TestSimpleSend(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, in, out, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "print" {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
assertMode(t, in, out, 0)
}()
in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, 0)
})
}
// assertMode verifies that the values of r and w match
// mode.
// If mode has the R bit set, r must be non-nil.
// If mode has the W bit set, w must be non-nil.
//
// If any of these conditions are not met, t.Fatal is called and the active
// test fails.
func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil
if mode&beam.R != 0 {
if r == nil {
t.Fatalf("should be non-nil: %#v", r)
}
// Otherwise it must be nil.
}
// If mode has the W bit set, w must be non-nil
if mode&beam.W != 0 {
if w == nil {
t.Fatalf("should be non-nil: %#v", w)
}
// Otherwise it must be nil.
}
}
func TestSendReply(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.R)
// Read for a reply
resp, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if resp.Args[0] != "this is the reply" {
t.Fatalf("%#v", resp)
}
}()
// Receive a message with mode=W
msg, in, out, err := r.Receive(beam.W)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.W)
// Send a reply
_, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestSendNested(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=W
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.W)
// Send a nested message
_, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil {
t.Fatal(err)
}
}()
// Receive a message with mode=R
msg, in, out, err := r.Receive(beam.R)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.R)
// Read for a nested message
nested, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if nested.Args[0] != "this is the nested message" {
t.Fatalf("%#v", nested)
}
})
}
func TestSendFile(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
tmp, err := ioutil.TempFile("", "beam-test-")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp.Name())
fmt.Fprintf(tmp, "hello world\n")
tmp.Sync()
tmp.Seek(0, 0)
testutils.Timeout(t, func() {
go func() {
_, _, err := w.Send(&beam.Message{"file", []string{"path=" + tmp.Name()}, tmp}, 0)
if err != nil {
t.Fatal(err)
}
}()
msg, _, _, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "file" {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "path="+tmp.Name() {
t.Fatalf("%#v", msg)
}
txt, err := ioutil.ReadAll(msg.Att)
if err != nil {
t.Fatal(err)
}
if string(txt) != "hello world\n" {
t.Fatalf("%s\n", txt)
}
})
}

21
beam/nop.go Normal file
View File

@ -0,0 +1,21 @@
package beam
import (
"io"
)
type NopSender struct{}
func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) {
return NopReceiver{}, NopSender{}, nil
}
func (s NopSender) Close() error {
return nil
}
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) {
return nil, nil, nil, io.EOF
}

166
beam/unix/beam.go Normal file
View File

@ -0,0 +1,166 @@
package unix
import (
"fmt"
"io"
"os"
)
type Sender interface {
Send([]byte, *os.File) error
}
type Receiver interface {
Receive() ([]byte, *os.File, error)
}
type ReceiveCloser interface {
Receiver
Close() error
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
}
const (
R int = 1 << (32 - 1 - iota)
W
)
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
var (
remote *os.File
local *os.File
)
if mode == R {
remote = r
local = w
} else if mode == W {
remote = w
local = r
}
if err := dst.Send(data, remote); err != nil {
local.Close()
remote.Close()
return nil, err
}
return local, nil
}
// SendRPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, R)
}
// SendWPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, W)
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

39
beam/unix/beam_test.go Normal file
View File

@ -0,0 +1,39 @@
package unix
import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing"
)
func TestSendConn(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
t.Fatalf("%v != %v\n", val, "connection")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
}
}

124
beam/unix/conn.go Normal file
View File

@ -0,0 +1,124 @@
package unix
import (
"fmt"
"os"
"github.com/docker/beam"
"github.com/docker/beam/data"
)
func Pair() (*Conn, *Conn, error) {
c1, c2, err := USocketPair()
if err != nil {
return nil, nil, err
}
return &Conn{c1}, &Conn{c2}, nil
}
type Conn struct {
*UnixConn
}
func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) {
// Get 2 *os.File
local, remote, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
// Convert 1 to *net.UnixConn
conn, err = FileConn(local)
if err != nil {
return nil, nil, err
}
local.Close()
// Return the "mismatched" pair
return conn, remote, nil
}
// This implements beam.Sender.Close which *only closes the sender*.
// This is similar to the pattern of only closing go channels from
// the sender's side.
// If you want to close the entire connection, call Conn.UnixConn.Close.
func (c *Conn) Close() error {
return c.UnixConn.CloseWrite()
}
func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
if msg.Att != nil {
return nil, nil, fmt.Errorf("file attachment not yet implemented in unix transport")
}
parts := []string{msg.Name}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
var (
fd *os.File
r beam.Receiver
w beam.Sender
)
if mode&(beam.R|beam.W) != 0 {
local, remote, err := sendablePair()
if err != nil {
return nil, nil, err
}
fd = remote
if mode&beam.R != 0 {
r = &Conn{local}
}
if mode&beam.W != 0 {
w = &Conn{local}
} else {
local.CloseWrite()
}
}
c.UnixConn.Send(b, fd)
return r, w, nil
}
func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) {
b, fd, err := c.UnixConn.Receive()
if err != nil {
return nil, nil, nil, err
}
parts, n, err := data.DecodeList(string(b))
if err != nil {
return nil, nil, nil, err
}
if n != len(b) {
return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n])
}
if len(parts) == 0 {
return nil, nil, nil, fmt.Errorf("malformed message")
}
msg := &beam.Message{Name: parts[0], Args: parts[1:]}
// Setup nested streams
var (
r beam.Receiver
w beam.Sender
)
// Apply mode mask
if fd != nil {
subconn, err := FileConn(fd)
if err != nil {
return nil, nil, nil, err
}
fd.Close()
if mode&beam.R != 0 {
r = &Conn{subconn}
}
if mode&beam.W != 0 {
w = &Conn{subconn}
} else {
subconn.CloseWrite()
}
}
return msg, r, w, nil
}

154
beam/unix/conn_test.go Normal file
View File

@ -0,0 +1,154 @@
package unix
import (
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestPair(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal("Unexpected error")
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, in, out, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "print" {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
if in != nil && out != nil {
t.Fatal("Unexpected return value")
}
}()
_, _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.R)
// Read for a reply
resp, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if resp.Args[0] != "this is the reply" {
t.Fatalf("%#v", resp)
}
}()
// Receive a message with mode=W
msg, in, out, err := r.Receive(beam.W)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.W)
// Send a reply
_, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestSendNested(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=W
in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W)
if err != nil {
t.Fatal(err)
}
assertMode(t, in, out, beam.W)
// Send a nested message
_, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0)
if err != nil {
t.Fatal(err)
}
}()
// Receive a message with mode=R
msg, in, out, err := r.Receive(beam.R)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
assertMode(t, in, out, beam.R)
// Read for a nested message
nested, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if nested.Args[0] != "this is the nested message" {
t.Fatalf("%#v", nested)
}
})
}
// FIXME: duplicate from inmem/inmem_test.go
// assertMode verifies that the values of r and w match
// mode.
// If mode has the R bit set, r must be non-nil. Otherwise it must be nil.
// If mode has the W bit set, w must be non-nil. Otherwise it must be nil.
//
// If any of these conditions are not met, t.Fatal is called and the active
// test fails.
func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) {
// If mode has the R bit set, r must be non-nil
if mode&beam.R != 0 {
if r == nil {
t.Fatalf("should be non-nil: %#v", r)
}
// Otherwise it must be nil.
} else {
if r != nil {
t.Fatalf("should be nil: %#v", r)
}
}
// If mode has the W bit set, w must be non-nil
if mode&beam.W != 0 {
if w == nil {
t.Fatalf("should be non-nil: %#v", w)
}
// Otherwise it must be nil.
} else {
if w != nil {
t.Fatalf("should be nil: %#v", w)
}
}
}

317
beam/unix/unix.go Normal file
View File

@ -0,0 +1,317 @@
package unix
import (
"bufio"
"fmt"
"net"
"os"
"syscall"
)
func debugCheckpoint(msg string, args ...interface{}) {
if os.Getenv("DEBUG") == "" {
return
}
os.Stdout.Sync()
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
fmt.Fprintf(tty, msg, args...)
bufio.NewScanner(tty).Scan()
tty.Close()
}
type UnixConn struct {
*net.UnixConn
fds []*os.File
}
// Framing:
// In order to handle framing in Send/Recieve, as these give frame
// boundaries we use a very simple 4 bytes header. It is a big endiand
// uint32 where the high bit is set if the message includes a file
// descriptor. The rest of the uint32 is the length of the next frame.
// We need the bit in order to be able to assign recieved fds to
// the right message, as multiple messages may be coalesced into
// a single recieve operation.
func makeHeader(data []byte, fds []int) ([]byte, error) {
header := make([]byte, 4)
length := uint32(len(data))
if length > 0x7fffffff {
return nil, fmt.Errorf("Data to large")
}
if len(fds) != 0 {
length = length | 0x80000000
}
header[0] = byte((length >> 24) & 0xff)
header[1] = byte((length >> 16) & 0xff)
header[2] = byte((length >> 8) & 0xff)
header[3] = byte((length >> 0) & 0xff)
return header, nil
}
func parseHeader(header []byte) (uint32, bool) {
length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
hasFd := length&0x80000000 != 0
length = length & ^uint32(0x80000000)
return length, hasFd
}
func FileConn(f *os.File) (*UnixConn, error) {
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
}
return &UnixConn{UnixConn: uconn}, nil
}
// Send sends a new message on conn with data and f as payload and
// attachment, respectively.
// On success, f is closed
func (conn *UnixConn) Send(data []byte, f *os.File) error {
{
var fd int = -1
if f != nil {
fd = int(f.Fd())
}
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
}
var fds []int
if f != nil {
fds = append(fds, int(f.Fd()))
}
if err := conn.sendUnix(data, fds...); err != nil {
return err
}
if f != nil {
f.Close()
}
return nil
}
// Receive waits for a new message on conn, and receives its payload
// and attachment, or an error if any.
//
// If more than 1 file descriptor is sent in the message, they are all
// closed except for the first, which is the attachment.
// It is legal for a message to have no attachment or an empty payload.
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
defer func() {
var fd int = -1
if rf != nil {
fd = int(rf.Fd())
}
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
}()
// Read header
header := make([]byte, 4)
nRead := uint32(0)
for nRead < 4 {
n, err := conn.receiveUnix(header[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
length, hasFd := parseHeader(header)
if hasFd {
if len(conn.fds) == 0 {
return nil, nil, fmt.Errorf("No expected file descriptor in message")
}
rf = conn.fds[0]
conn.fds = conn.fds[1:]
}
rdata = make([]byte, length)
nRead = 0
for nRead < length {
n, err := conn.receiveUnix(rdata[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
return
}
func (conn *UnixConn) receiveUnix(buf []byte) (int, error) {
oob := make([]byte, syscall.CmsgSpace(4))
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
if err != nil {
return 0, err
}
fd := extractFd(oob[:oobn])
if fd != -1 {
f := os.NewFile(uintptr(fd), "")
conn.fds = append(conn.fds, f)
}
return bufn, nil
}
func (conn *UnixConn) sendUnix(data []byte, fds ...int) error {
header, err := makeHeader(data, fds)
if err != nil {
return err
}
// There is a bug in conn.WriteMsgUnix where it doesn't correctly return
// the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645)
// So, we can't rely on the return value from it. However, we must use it to
// send the fds. In order to handle this we only write one byte using WriteMsgUnix
// (when we have to), as that can only ever block or fully suceed. We then write
// the rest with conn.Write()
// The reader side should not rely on this though, as hopefully this gets fixed
// in go later.
written := 0
if len(fds) != 0 {
oob := syscall.UnixRights(fds...)
wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil)
if err != nil {
return err
}
written = written + wrote
}
for written < len(header) {
wrote, err := conn.Write(header[written:])
if err != nil {
return err
}
written = written + wrote
}
written = 0
for written < len(data) {
wrote, err := conn.Write(data[written:])
if err != nil {
return err
}
written = written + wrote
}
return nil
}
func extractFd(oob []byte) int {
// Grab forklock to make sure no forks accidentally inherit the new
// fds before they are made CLOEXEC
// There is a slight race condition between ReadMsgUnix returns and
// when we grap the lock, so this is not perfect. Unfortunately
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
// way to implement non-blocking i/o in go, so this is hard to fix.
syscall.ForkLock.Lock()
defer syscall.ForkLock.Unlock()
scms, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return -1
}
foundFd := -1
for _, scm := range scms {
fds, err := syscall.ParseUnixRights(&scm)
if err != nil {
continue
}
for _, fd := range fds {
if foundFd == -1 {
syscall.CloseOnExec(fd)
foundFd = fd
} else {
syscall.Close(fd)
}
}
}
return foundFd
}
func socketpair() ([2]int, error) {
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
}
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
// not bound to the underlying filesystem.
// Messages sent on one end are received on the other, and vice-versa.
// It is the caller's responsibility to close both ends.
func SocketPair() (a *os.File, b *os.File, err error) {
defer func() {
var (
fdA int = -1
fdB int = -1
)
if a != nil {
fdA = int(a.Fd())
}
if b != nil {
fdB = int(b.Fd())
}
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
}()
pair, err := socketpair()
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
}
func USocketPair() (*UnixConn, *UnixConn, error) {
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
a, b, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer a.Close()
defer b.Close()
uA, err := FileConn(a)
if err != nil {
return nil, nil, err
}
uB, err := FileConn(b)
if err != nil {
uA.Close()
return nil, nil, err
}
return uA, uB, nil
}
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
// returns an error if the file descriptor does not point to a unix socket.
// This creates a duplicate file descriptor. It's the caller's responsibility
// to close both.
func FdConn(fd int) (n *net.UnixConn, err error) {
{
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
}
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", fd)
}
return uconn, nil
}

237
beam/unix/unix_test.go Normal file
View File

@ -0,0 +1,237 @@
package unix
import (
"fmt"
"io/ioutil"
"testing"
)
func TestSocketPair(t *testing.T) {
a, b, err := SocketPair()
if err != nil {
t.Fatal(err)
}
go func() {
a.Write([]byte("hello world!"))
fmt.Printf("done writing. closing\n")
a.Close()
fmt.Printf("done closing\n")
}()
data, err := ioutil.ReadAll(b)
if err != nil {
t.Fatal(err)
}
fmt.Printf("--> %s\n", data)
fmt.Printf("still open: %v\n", a.Fd())
}
func TestUSocketPair(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
data := "hello world!"
go func() {
a.Write([]byte(data))
a.Close()
}()
res := make([]byte, 1024)
size, err := b.Read(res)
if err != nil {
t.Fatal(err)
}
if size != len(data) {
t.Fatal("Unexpected size")
}
if string(res[0:size]) != data {
t.Fatal("Unexpected data")
}
}
func TestSendUnixSocket(t *testing.T) {
a1, a2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer a1.Close()
// defer a2.Close()
b1, b2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer b1.Close()
// defer b2.Close()
glueA, glueB, err := SocketPair()
if err != nil {
t.Fatal(err)
}
// defer glueA.Close()
// defer glueB.Close()
go func() {
err := b2.Send([]byte("a"), glueB)
if err != nil {
t.Fatal(err)
}
}()
go func() {
err := a2.Send([]byte("b"), glueA)
if err != nil {
t.Fatal(err)
}
}()
connAhdr, connA, err := a1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connAhdr) != "b" {
t.Fatalf("unexpected: %s", connAhdr)
}
connBhdr, connB, err := b1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connBhdr) != "a" {
t.Fatalf("unexpected: %s", connBhdr)
}
fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd())
go func() {
fmt.Printf("sending message on %v\n", connA.Fd())
connA.Write([]byte("hello world"))
connA.Sync()
fmt.Printf("closing %v\n", connA.Fd())
connA.Close()
}()
data, err := ioutil.ReadAll(connB)
if err != nil {
t.Fatal(err)
}
fmt.Printf("---> %s\n", data)
}
// Ensure we get proper segmenting of messages
func TestSendSegmenting(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
extrafd1, extrafd2, err := SocketPair()
if err != nil {
t.Fatal(err)
}
extrafd2.Close()
go func() {
a.Send([]byte("message 1"), nil)
a.Send([]byte("message 2"), extrafd1)
a.Send([]byte("message 3"), nil)
}()
msg1, file1, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg1) != "message 1" {
t.Fatal("unexpected msg1:", string(msg1))
}
if file1 != nil {
t.Fatal("unexpectedly got file1")
}
msg2, file2, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg2) != "message 2" {
t.Fatal("unexpected msg2:", string(msg2))
}
if file2 == nil {
t.Fatal("didn't get file2")
}
file2.Close()
msg3, file3, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg3) != "message 3" {
t.Fatal("unexpected msg3:", string(msg3))
}
if file3 != nil {
t.Fatal("unexpectedly got file3")
}
}
// Test sending a zero byte message
func TestSendEmpty(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send([]byte{}, nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if len(msg) != 0 {
t.Fatalf("unexpected non-empty message: %v", msg)
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}
func makeLarge(size int) []byte {
res := make([]byte, size)
for i := range res {
res[i] = byte(i % 255)
}
return res
}
func verifyLarge(data []byte, size int) bool {
if len(data) != size {
return false
}
for i := range data {
if data[i] != byte(i%255) {
return false
}
}
return true
}
// Test sending a large message
func TestSendLarge(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send(makeLarge(100000), nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if !verifyLarge(msg, 100000) {
t.Fatalf("unexpected message (size %d)", len(msg))
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}

17
beam/utils/buf.go Normal file
View File

@ -0,0 +1,17 @@
package utils
import (
"github.com/docker/beam"
)
type Buffer []*beam.Message
func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
(*buf) = append(*buf, msg)
return beam.NopReceiver{}, beam.NopSender{}, nil
}
func (buf *Buffer) Close() error {
(*buf) = nil
return nil
}

58
beam/utils/copy.go Normal file
View File

@ -0,0 +1,58 @@
package utils
import (
"github.com/docker/beam"
"sync"
)
func Copy(dst beam.Sender, src beam.Receiver) (int, error) {
var tasks sync.WaitGroup
defer tasks.Wait()
if senderTo, ok := src.(beam.SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender {
return n, err
}
}
if receiverFrom, ok := dst.(beam.ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver {
return n, err
}
}
var (
n int
)
copyAndClose := func(dst beam.Sender, src beam.Receiver) {
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}
for {
msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W)
if err != nil {
return n, err
}
sndR, sndW, err := dst.Send(msg, beam.R|beam.W)
if err != nil {
if rcvW != nil {
rcvW.Close()
}
return n, err
}
tasks.Add(2)
go func() {
copyAndClose(rcvW, sndR)
tasks.Done()
}()
go func() {
copyAndClose(sndW, rcvR)
tasks.Done()
}()
n++
}
return n, nil
}

128
beam/utils/hub.go Normal file
View File

@ -0,0 +1,128 @@
package utils
import (
"fmt"
"github.com/docker/beam"
"github.com/docker/beam/inmem"
"io"
"sync"
)
// Hub passes messages to dynamically registered handlers.
type Hub struct {
handlers *StackSender
tasks sync.WaitGroup
}
func NewHub() *Hub {
return &Hub{
handlers: NewStackSender(),
}
}
func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
if msg.Name == "register" {
if mode&beam.R == 0 {
return nil, nil, fmt.Errorf("register: no return channel")
}
fmt.Printf("[hub] received %v\n", msg)
hYoutr, hYoutw := inmem.Pipe()
hYinr, hYinw := inmem.Pipe()
// Register the new handler on top of the others,
// and get a reference to the previous stack of handlers.
prevHandlers := hub.handlers.Add(hYinw)
// Pass requests from the new handler to the previous chain of handlers
// hYout -> hXin
hub.tasks.Add(1)
go func() {
defer hub.tasks.Done()
Copy(prevHandlers, hYoutr)
hYoutr.Close()
}()
return hYinr, hYoutw, nil
}
fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len())
return hub.handlers.Send(msg, mode)
}
func (hub *Hub) Register(dst beam.Sender) error {
in, _, err := hub.Send(&beam.Message{Name: "register"}, beam.R)
if err != nil {
return err
}
go Copy(dst, in)
return nil
}
func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error {
in, out, err := hub.Send(&beam.Message{Name: "register"}, beam.R|beam.W)
if err != nil {
return err
}
go func() {
h(in, out)
out.Close()
}()
return nil
}
type Handler func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (pass bool, err error)
func (hub *Hub) RegisterName(name string, h Handler) error {
return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error {
var tasks sync.WaitGroup
copyTask := func(dst beam.Sender, src beam.Receiver) {
tasks.Add(1)
go func() {
defer tasks.Done()
if dst == nil {
return
}
defer dst.Close()
if src == nil {
return
}
Copy(dst, src)
}()
}
for {
msg, msgin, msgout, err := in.Receive(beam.R | beam.W)
if err == io.EOF {
break
}
if err != nil {
return err
}
var pass = true
if msg.Name == name || name == "" {
pass, err = h(msg, msgin, msgout, out)
if err != nil {
if _, _, err := msgout.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0); err != nil {
return err
}
}
}
if pass {
nextin, nextout, err := out.Send(msg, beam.R|beam.W)
if err != nil {
return err
}
copyTask(nextout, msgin)
copyTask(msgout, nextin)
} else {
if msgout != nil {
msgout.Close()
}
}
}
return nil
})
}
func (hub *Hub) Wait() {
hub.tasks.Wait()
}
func (hub *Hub) Close() error {
return hub.handlers.Close()
}

55
beam/utils/hub_test.go Normal file
View File

@ -0,0 +1,55 @@
package utils
import (
"github.com/docker/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestHubSendEmpty(t *testing.T) {
hub := NewHub()
// Send to empty hub should silently drop
r, w, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, beam.R|beam.W)
// Send must not return an error
if err != nil {
t.Fatal(err)
}
// We set beam.R, so a valid receiver must be returned
if r == nil {
t.Fatalf("%#v", r)
}
// We set beam.W, so a valid receiver must be returned
if w == nil {
t.Fatalf("%#v", w)
}
}
type CountSender int
func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) {
(*s)++
return nil, nil, nil
}
func TestHubSendOneHandler(t *testing.T) {
hub := NewHub()
defer hub.Close()
testutils.Timeout(t, func() {
in, _, err := hub.Send(&beam.Message{Name: "register", Args: nil}, beam.R)
if err != nil {
t.Fatal(err)
}
go func() {
if _, _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, 0); err != nil {
t.Fatal(err)
}
}()
msg, _, _, err := in.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
})
}

112
beam/utils/stack.go Normal file
View File

@ -0,0 +1,112 @@
package utils
import (
"container/list"
"fmt"
"github.com/docker/beam"
"strings"
"sync"
)
// StackSender forwards beam messages to a dynamic list of backend receivers.
// New backends are stacked on top. When a message is sent, each backend is
// tried until one succeeds. Any failing backends encountered along the way
// are removed from the queue.
type StackSender struct {
stack *list.List
l sync.RWMutex
}
func NewStackSender() *StackSender {
stack := list.New()
return &StackSender{
stack: stack,
}
}
func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) {
completed := s.walk(func(h beam.Sender) (ok bool) {
r, w, err = h.Send(msg, mode)
fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err)
if err == nil {
return true
}
return false
})
// If walk was completed, it means we didn't find a valid handler
if !completed {
return r, w, err
}
// Silently drop messages if no valid backend is available.
return beam.NopSender{}.Send(msg, mode)
}
func (s *StackSender) Add(dst beam.Sender) *StackSender {
s.l.Lock()
defer s.l.Unlock()
prev := &StackSender{
stack: list.New(),
}
prev.stack.PushFrontList(s.stack)
fmt.Printf("[ADD] prev %#v\n", prev)
s.stack.PushFront(dst)
return prev
}
func (s *StackSender) Close() error {
s.walk(func(h beam.Sender) bool {
h.Close()
// remove all handlers
return false
})
return nil
}
func (s *StackSender) _walk(f func(*list.Element) bool) bool {
var e *list.Element
s.l.RLock()
e = s.stack.Front()
s.l.RUnlock()
for e != nil {
fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender))
s.l.RLock()
next := e.Next()
s.l.RUnlock()
cont := f(e)
if !cont {
return false
}
e = next
}
return true
}
func (s *StackSender) walk(f func(beam.Sender) bool) bool {
return s._walk(func(e *list.Element) bool {
ok := f(e.Value.(beam.Sender))
if ok {
// Found a valid handler. Stop walking.
return false
}
// Invalid handler: remove.
s.l.Lock()
s.stack.Remove(e)
s.l.Unlock()
return true
})
}
func (s *StackSender) Len() int {
s.l.RLock()
defer s.l.RUnlock()
return s.stack.Len()
}
func (s *StackSender) String() string {
var parts []string
s._walk(func(e *list.Element) bool {
parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender)))
return true
})
return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->"))
}

128
beam/utils/stack_test.go Normal file
View File

@ -0,0 +1,128 @@
package utils
import (
"github.com/docker/beam"
"github.com/docker/beam/inmem"
"github.com/docker/beam/unix"
"github.com/dotcloud/docker/pkg/testutils"
"strings"
"testing"
)
func TestStackWithPipe(t *testing.T) {
r, w := inmem.Pipe()
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, _, _, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestStackWithPair(t *testing.T) {
r, w, err := unix.Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, _, _, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Name != "hello" {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0)
if err != nil {
t.Fatal(err)
}
})
}
func TestStackLen(t *testing.T) {
s := NewStackSender()
if s.Len() != 0 {
t.Fatalf("empty StackSender has length %d", s.Len())
}
}
func TestStackAdd(t *testing.T) {
s := NewStackSender()
a := Buffer{}
beforeA := s.Add(&a)
// Add on an empty StackSender should return an empty StackSender
if beforeA.Len() != 0 {
t.Fatalf("%s has %d elements", beforeA, beforeA.Len())
}
if s.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
// Add a 2nd element
b := Buffer{}
beforeB := s.Add(&b)
if beforeB.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
if s.Len() != 2 {
t.Fatalf("%#v", beforeA)
}
s.Send(&beam.Message{Name: "for b", Args: nil}, 0)
beforeB.Send(&beam.Message{Name: "for a", Args: nil}, 0)
beforeA.Send(&beam.Message{Name: "for nobody", Args: nil}, 0)
if len(a) != 1 {
t.Fatalf("%#v", a)
}
if len(b) != 1 {
t.Fatalf("%#v", b)
}
}
// Misbehaving backends must be removed
func TestStackAddBad(t *testing.T) {
s := NewStackSender()
buf := Buffer{}
s.Add(&buf)
r, w := inmem.Pipe()
s.Add(w)
if s.Len() != 2 {
t.Fatalf("%#v", s)
}
r.Close()
if _, _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}, 0); err != nil {
t.Fatal(err)
}
if s.Len() != 1 {
t.Fatalf("%#v")
}
if len(buf) != 1 {
t.Fatalf("%#v", buf)
}
if buf[0].Name != "for the buffer" {
t.Fatalf("%#v", buf)
}
}