Port to libchan, pt. 1

This is a minimal effort to get things running. Only the `beam` package
has changed - backend code is untouched.

- The `beam` package is still around, but functions as a
  libswarm-specific wrapper around libchan messages. It'll either be
  renamed or moved to the top-level namespace.

- We have our own Message type which serializes/deserializes to libchan
  messages, and our own versions of Sender and Receiver which speak
  libswarm. WrapSender and WrapReceiver promote libchan objects to
  their libswarm equivalents.

- We also have our own wrappers around libchan utilities: RetPipe,
  Handler, Pipe, Copy.

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2014-06-13 17:20:03 +01:00 committed by Aaron Feng
parent fea75e7e4c
commit 1e15a54d0e
53 changed files with 211 additions and 4229 deletions

View File

@ -1 +0,0 @@
Solomon Hykes <solomon@docker.com>

View File

@ -1 +0,0 @@
Solomon Hykes <solomon@docker.com> (@shykes)

View File

@ -1,4 +0,0 @@
# Beam
**WARNING:** This has been renamed [libchan](https://github.com/docker/libchan). The code in this directory is an old version of libchan which will be removed once libswarm has been ported over to libchan.

View File

@ -1,66 +0,0 @@
package beam
import (
"errors"
"os"
)
type Sender interface {
Send(msg *Message) (Receiver, error)
Close() error
}
type Receiver interface {
Receive(mode int) (*Message, error)
}
type Message struct {
Verb Verb
Args []string
Att *os.File
Ret Sender
}
const (
Ret int = 1 << iota
// FIXME: use an `Att` flag to auto-close attachments by default
)
type ReceiverFrom interface {
ReceiveFrom(Receiver) (int, error)
}
type SenderTo interface {
SendTo(Sender) (int, error)
}
var (
ErrIncompatibleSender = errors.New("incompatible sender")
ErrIncompatibleReceiver = errors.New("incompatible receiver")
)
// RetPipe is a special value for `Message.Ret`.
// When a Message is sent with `Ret=SendPipe`, the transport must
// substitute it with the writing end of a new pipe, and return the
// other end as a return value.
type retPipe struct {
NopSender
}
var RetPipe = retPipe{}
func (r retPipe) Equals(val Sender) bool {
if rval, ok := val.(retPipe); ok {
return rval == r
}
return false
}
func Repeater(payload *Message) Sender {
return Handler(func(msg *Message) error {
msg.Ret.Send(payload)
return nil
})
}
var NotImplemented = Repeater(&Message{Verb: Error, Args: []string{"not implemented"}})

View File

@ -1,20 +0,0 @@
package beam
import (
"testing"
)
func TestModes(t *testing.T) {
if Ret == 0 {
t.Fatalf("0")
}
}
func TestRetPipe(t *testing.T) {
var (
shouldBeEqual = RetPipe
)
if RetPipe != shouldBeEqual {
t.Fatalf("%#v should equal %#v", RetPipe, shouldBeEqual)
}
}

View File

@ -1,38 +0,0 @@
package beam
import (
"io"
"sync"
)
func Copy(dst Sender, src Receiver) (int, error) {
var tasks sync.WaitGroup
defer tasks.Wait()
if senderTo, ok := src.(SenderTo); ok {
if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender {
return n, err
}
}
if receiverFrom, ok := dst.(ReceiverFrom); ok {
if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver {
return n, err
}
}
var (
n int
)
for {
msg, err := src.Receive(Ret)
if err == io.EOF {
return n, nil
}
if err != nil {
return n, err
}
if _, err := dst.Send(msg); err != nil {
return n, err
}
n++
}
return n, nil
}

View File

@ -1,119 +0,0 @@
package data
import (
"fmt"
"strconv"
"strings"
)
func Encode(obj map[string][]string) string {
var msg string
msg += encodeHeader(0)
for k, values := range obj {
msg += encodeNamedList(k, values)
}
return msg
}
func encodeHeader(msgtype int) string {
return fmt.Sprintf("%03.3d;", msgtype)
}
func encodeString(s string) string {
return fmt.Sprintf("%d:%s,", len(s), s)
}
var EncodeString = encodeString
var DecodeString = decodeString
var EncodeList = encodeList
func encodeList(l []string) string {
values := make([]string, 0, len(l))
for _, s := range l {
values = append(values, encodeString(s))
}
return encodeString(strings.Join(values, ""))
}
func encodeNamedList(name string, l []string) string {
return encodeString(name) + encodeList(l)
}
func Decode(msg string) (map[string][]string, error) {
msgtype, skip, err := decodeHeader(msg)
if err != nil {
return nil, err
}
if msgtype != 0 {
// FIXME: use special error type so the caller can easily ignore
return nil, fmt.Errorf("unknown message type: %d", msgtype)
}
msg = msg[skip:]
obj := make(map[string][]string)
for len(msg) > 0 {
k, skip, err := decodeString(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
values, skip, err := decodeList(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
obj[k] = values
}
return obj, nil
}
var DecodeList = decodeList
func decodeList(msg string) ([]string, int, error) {
blob, skip, err := decodeString(msg)
if err != nil {
return nil, 0, err
}
var l []string
for len(blob) > 0 {
v, skipv, err := decodeString(blob)
if err != nil {
return nil, 0, err
}
l = append(l, v)
blob = blob[skipv:]
}
return l, skip, nil
}
func decodeString(msg string) (string, int, error) {
parts := strings.SplitN(msg, ":", 2)
if len(parts) != 2 {
return "", 0, fmt.Errorf("invalid format: no column")
}
var length int
if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil {
return "", 0, err
} else {
length = int(l)
}
if len(parts[1]) < length+1 {
return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1)
}
payload := parts[1][:length+1]
if payload[length] != ',' {
return "", 0, fmt.Errorf("message is not comma-terminated")
}
return payload[:length], len(parts[0]) + 1 + length + 1, nil
}
func decodeHeader(msg string) (int, int, error) {
if len(msg) < 4 {
return 0, 0, fmt.Errorf("message too small")
}
msgtype, err := strconv.ParseInt(msg[:3], 10, 32)
if err != nil {
return 0, 0, err
}
return int(msgtype), 4, nil
}

View File

@ -1,129 +0,0 @@
package data
import (
"strings"
"testing"
)
func TestEncodeHelloWorld(t *testing.T) {
input := "hello world!"
output := encodeString(input)
expectedOutput := "12:hello world!,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyString(t *testing.T) {
input := ""
output := encodeString(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyList(t *testing.T) {
input := []string{}
output := encodeList(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyMap(t *testing.T) {
input := make(map[string][]string)
output := Encode(input)
expectedOutput := "000;"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key1Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"world"}
output := Encode(input)
expectedOutput := "000;5:hello,8:5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key2Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"beautiful", "world"}
output := Encode(input)
expectedOutput := "000;5:hello,20:9:beautiful,5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyValue(t *testing.T) {
input := make(map[string][]string)
input["foo"] = []string{}
output := Encode(input)
expectedOutput := "000;3:foo,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryKey(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryValue(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestDecodeString(t *testing.T) {
validEncodedStrings := []struct {
input string
output string
skip int
}{
{"3:foo,", "foo", 6},
{"5:hello,", "hello", 8},
{"5:hello,5:world,", "hello", 8},
}
for _, sample := range validEncodedStrings {
output, skip, err := decodeString(sample.input)
if err != nil {
t.Fatalf("error decoding '%v': %v", sample.input, err)
}
if skip != sample.skip {
t.Fatalf("invalid skip: %v!=%v", skip, sample.skip)
}
if output != sample.output {
t.Fatalf("invalid output: %v!=%v", output, sample.output)
}
}
}
func TestDecode1Key1Value(t *testing.T) {
input := "000;3:foo,6:3:bar,,"
output, err := Decode(input)
if err != nil {
t.Fatal(err)
}
if v, exists := output["foo"]; !exists {
t.Fatalf("wrong output: %v\n", output)
} else if len(v) != 1 || strings.Join(v, "") != "bar" {
t.Fatalf("wrong output: %v\n", output)
}
}

View File

@ -1,93 +0,0 @@
package data
import (
"fmt"
"strings"
)
type Message string
func Empty() Message {
return Message(Encode(nil))
}
func Parse(args []string) Message {
data := make(map[string][]string)
for _, word := range args {
if strings.Contains(word, "=") {
kv := strings.SplitN(word, "=", 2)
key := kv[0]
var val string
if len(kv) == 2 {
val = kv[1]
}
data[key] = []string{val}
}
}
return Message(Encode(data))
}
func (m Message) Add(k, v string) Message {
data, err := Decode(string(m))
if err != nil {
return m
}
if values, exists := data[k]; exists {
data[k] = append(values, v)
} else {
data[k] = []string{v}
}
return Message(Encode(data))
}
func (m Message) Set(k string, v ...string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
data[k] = v
return Message(Encode(data))
}
func (m Message) Del(k string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
delete(data, k)
return Message(Encode(data))
}
func (m Message) Get(k string) []string {
data, err := Decode(string(m))
if err != nil {
return nil
}
v, exists := data[k]
if !exists {
return nil
}
return v
}
func (m Message) Pretty() string {
data, err := Decode(string(m))
if err != nil {
return ""
}
entries := make([]string, 0, len(data))
for k, values := range data {
entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ",")))
}
return strings.Join(entries, " ")
}
func (m Message) String() string {
return string(m)
}
func (m Message) Bytes() []byte {
return []byte(m)
}

View File

@ -1,53 +0,0 @@
package data
import (
"testing"
)
func TestEmptyMessage(t *testing.T) {
m := Empty()
if m.String() != Encode(nil) {
t.Fatalf("%v != %v", m.String(), Encode(nil))
}
}
func TestSetMessage(t *testing.T) {
m := Empty().Set("foo", "bar")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 1 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetMessageTwice(t *testing.T) {
m := Empty().Set("foo", "bar").Set("ga", "bu")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 2 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetDelMessage(t *testing.T) {
m := Empty().Set("foo", "bar").Del("foo")
output := m.String()
expectedOutput := Encode(nil)
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}

View File

@ -1,92 +0,0 @@
##
## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt
##
Netstrings
D. J. Bernstein, djb@pobox.com
19970201
1. Introduction
A netstring is a self-delimiting encoding of a string. Netstrings are
very easy to generate and to parse. Any string may be encoded as a
netstring; there are no restrictions on length or on allowed bytes.
Another virtue of a netstring is that it declares the string size up
front. Thus an application can check in advance whether it has enough
space to store the entire string.
Netstrings may be used as a basic building block for reliable network
protocols. Most high-level protocols, in effect, transmit a sequence
of strings; those strings may be encoded as netstrings and then
concatenated into a sequence of characters, which in turn may be
transmitted over a reliable stream protocol such as TCP.
Note that netstrings can be used recursively. The result of encoding
a sequence of strings is a single string. A series of those encoded
strings may in turn be encoded into a single string. And so on.
In this document, a string of 8-bit bytes may be written in two
different forms: as a series of hexadecimal numbers between angle
brackets, or as a sequence of ASCII characters between double quotes.
For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of
length 12; it is the same as the string "hello world!".
Although this document restricts attention to strings of 8-bit bytes,
netstrings could be used with any 6-bit-or-larger character set.
2. Definition
Any string of 8-bit bytes may be encoded as [len]":"[string]",".
Here [string] is the string and [len] is a nonempty sequence of ASCII
digits giving the length of [string] in decimal. The ASCII digits are
<30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros
at the front of [len] are prohibited: [len] begins with <30> exactly
when [string] is empty.
For example, the string "hello world!" is encoded as <31 32 3a 68
65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The
empty string is encoded as "0:,".
[len]":"[string]"," is called a netstring. [string] is called the
interpretation of the netstring.
3. Sample code
The following C code starts with a buffer buf of length len and
prints it as a netstring.
if (printf("%lu:",len) < 0) barf();
if (fwrite(buf,1,len,stdout) < len) barf();
if (putchar(',') < 0) barf();
The following C code reads a netstring and decodes it into a
dynamically allocated buffer buf of length len.
if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */
if (getchar() != ':') barf();
buf = malloc(len + 1); /* malloc(0) is not portable */
if (!buf) barf();
if (fread(buf,1,len,stdin) < len) barf();
if (getchar() != ',') barf();
Both of these code fragments assume that the local character set is
ASCII, and that the relevant stdio streams are in binary mode.
4. Security considerations
The famous Finger security hole may be blamed on Finger's use of the
CRLF encoding. In that encoding, each string is simply terminated by
CRLF. This encoding has several problems. Most importantly, it does
not declare the string size in advance. This means that a correct
CRLF parser must be prepared to ask for more and more memory as it is
reading the string. In the case of Finger, a lazy implementor found
this to be too much trouble; instead he simply declared a fixed-size
buffer and used C's gets() function. The rest is history.
In contrast, as the above sample code shows, it is very easy to
handle netstrings without risking buffer overflow. Thus widespread
use of netstrings may improve network security.

Binary file not shown.

View File

@ -1,542 +0,0 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/term"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"sync"
)
var rootPlugins = []string{
"stdio",
}
var (
flX bool
flPing bool
introspect beam.ReceiveSender = beam.Devnull()
)
func main() {
fd3 := os.NewFile(3, "beam-introspect")
if introsp, err := beam.FileConn(fd3); err == nil {
introspect = introsp
Logf("introspection enabled\n")
} else {
Logf("introspection disabled\n")
}
fd3.Close()
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
flag.Parse()
if flag.NArg() == 0 {
if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin)
for {
fmt.Printf("[%d] beamsh> ", os.Getpid())
if !input.Scan() {
break
}
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
continue
}
if err := executeRootScript(cmd); err != nil {
Fatal(err)
}
}
if err := input.Err(); err == io.EOF {
break
} else if err != nil {
Fatal(err)
}
}
} else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
} else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath)
if err != nil {
Fatal(err)
}
script, err := dockerscript.Parse(f)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
}
}
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
)
for _, plugin := range rootPlugins {
pluginCmd := &dockerscript.Command{
Args: []string{plugin},
}
if rootCmd == nil {
rootCmd = pluginCmd
} else {
lastCmd.Children = []*dockerscript.Command{pluginCmd}
}
lastCmd = pluginCmd
}
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
}
handlers, err := Handlers(introspect)
if err != nil {
return err
}
defer handlers.Close()
var tasks sync.WaitGroup
defer func() {
Debugf("Waiting for introspection...\n")
tasks.Wait()
Debugf("DONE Waiting for introspection\n")
}()
if introspect != nil {
tasks.Add(1)
go func() {
Debugf("starting introspection\n")
defer Debugf("done with introspection\n")
defer tasks.Done()
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
Debugf("XXX starting reading introspection messages\n")
r := beam.NewRouter(handlers)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
return handlers.Send(p, a)
})
n, err := beam.Copy(r, introspect)
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
}()
}
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
}
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
defer background.Wait()
for _, cmd := range script {
if cmd.Background {
background.Add(1)
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
background.Done()
}(out, cmd)
} else {
if err := executeCommand(out, cmd); err != nil {
return err
}
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
}
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
}
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
if err != nil {
return fmt.Errorf("%v\n", err)
}
var tasks sync.WaitGroup
tasks.Add(1)
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
}
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
}
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
}
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
r := beam.NewRouter(sink)
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
conn, err := beam.FileConn(attachment)
if err != nil {
attachment.Close()
return err
}
// attachment.Close()
tasks.Add(1)
go func() {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stderr.Close()
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}()
return nil
})
beam.Copy(r, priv)
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler {
if name == "logger" {
return CmdLogger
} else if name == "render" {
return CmdRender
} else if name == "devnull" {
return CmdDevnull
} else if name == "prompt" {
return CmdPrompt
} else if name == "stdio" {
return CmdStdio
} else if name == "echo" {
return CmdEcho
} else if name == "pass" {
return CmdPass
} else if name == "in" {
return CmdIn
} else if name == "exec" {
return CmdExec
} else if name == "trace" {
return CmdTrace
} else if name == "emit" {
return CmdEmit
} else if name == "print" {
return CmdPrint
} else if name == "multiprint" {
return CmdMultiprint
} else if name == "listen" {
return CmdListen
} else if name == "beamsend" {
return CmdBeamsend
} else if name == "beamreceive" {
return CmdBeamreceive
} else if name == "connect" {
return CmdConnect
} else if name == "openfile" {
return CmdOpenfile
} else if name == "spawn" {
return CmdSpawn
} else if name == "chdir" {
return CmdChdir
}
return nil
}
// VARIOUS HELPER FUNCTIONS:
func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface {
File() (*os.File, error)
}); !ok {
return nil, fmt.Errorf("no file descriptor available")
} else {
f, err = connWithFile.File()
if err != nil {
return nil, err
}
}
return f, err
}
type Msg struct {
payload []byte
attachment *os.File
}
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg = msg + "\n"
}
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
return fmt.Printf(msg, args...)
}
func Debugf(msg string, args ...interface{}) {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
}
}
func Fatalf(msg string, args ...interface{}) {
Logf(msg, args...)
os.Exit(1)
}
func Fatal(args ...interface{}) {
Fatalf("%v", args[0])
}
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <- conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections <- conn
}
}()
return connections, nil
}
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func msgDesc(payload []byte, attachment *os.File) string {
return beam.MsgDesc(payload, attachment)
}
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func() error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := dst.Send([]byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}

View File

@ -1,441 +0,0 @@
package main
import (
"bufio"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/term"
"github.com/dotcloud/docker/utils"
"io"
"net"
"net/url"
"os"
"os/exec"
"path"
"strings"
"sync"
"text/template"
)
func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if err := os.MkdirAll("logs", 0700); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
var tasks sync.WaitGroup
defer tasks.Wait()
var n int = 1
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func(n int) {
defer tasks.Done()
defer attachment.Close()
var streamname string
if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
streamname = "stdout"
} else {
streamname = cmd[1]
}
if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
}
logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
defer logfile.Close()
io.Copy(logfile, attachment)
logfile.Sync()
}(n)
n++
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
txt := args[1]
if !strings.HasSuffix(txt, "\n") {
txt += "\n"
}
t := template.Must(template.New("render").Parse(txt))
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
msg, err := data.Decode(string(payload))
if err != nil {
fmt.Fprintf(stderr, "decode error: %v\n")
}
if err := t.Execute(stdout, msg); err != nil {
fmt.Fprintf(stderr, "rendering error: %v\n", err)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
if err := out.Send(payload, attachment); err != nil {
return
}
}
}
func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
_, attachment, err := in.Receive()
if err != nil {
return
}
if attachment != nil {
attachment.Close()
}
}
}
func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
return
}
if !term.IsTerminal(0) {
fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
return
}
fmt.Printf("%s: ", strings.Join(args[1:], " "))
oldState, _ := term.SaveState(0)
term.DisableEcho(0, oldState)
line, _, err := bufio.NewReader(os.Stdin).ReadLine()
if err != nil {
fmt.Fprintln(stderr, err.Error())
return
}
val := string(line)
fmt.Printf("\n")
term.RestoreTerminal(0, oldState)
out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
}
func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer attachment.Close()
io.Copy(os.Stdout, attachment)
attachment.Close()
}()
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
Fatal(err)
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
fmt.Fprintln(stdout, strings.Join(args[1:], " "))
}
func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
if err := out.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return
}
}
}
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
c := exec.Command(utils.SelfPath())
r, w, err := os.Pipe()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
c.Stdin = r
c.Stdout = stdout
c.Stderr = stderr
go func() {
fmt.Fprintf(w, strings.Join(args[1:], " "))
w.Sync()
w.Close()
}()
if err := c.Run(); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
}
func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
cmd := exec.Command(args[1], args[2:]...)
cmd.Stdout = stdout
cmd.Stderr = stderr
//cmd.Stdin = os.Stdin
local, remote, err := beam.SocketPair()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
child, err := beam.FileConn(local)
if err != nil {
local.Close()
remote.Close()
fmt.Fprintf(stderr, "%v\n", err)
return
}
local.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
var tasks sync.WaitGroup
tasks.Add(1)
go func() {
defer Debugf("done copying to child\n")
defer tasks.Done()
defer child.CloseWrite()
beam.Copy(child, in)
}()
tasks.Add(1)
go func() {
defer Debugf("done copying from child %d\n")
defer tasks.Done()
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
})
beam.Copy(r, child)
}()
execErr := cmd.Run()
// We can close both ends of the socket without worrying about data stuck in the buffer,
// because unix socket writes are fully synchronous.
child.Close()
tasks.Wait()
var status string
if execErr != nil {
status = execErr.Error()
} else {
status = "ok"
}
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
}
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
var sfd string = "nil"
if attachment != nil {
sfd = fmt.Sprintf("%d", attachment.Fd())
}
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
out.Send(payload, attachment)
return nil
})
beam.Copy(r, in)
}
func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
out.Send(data.Parse(args[1:]).Bytes(), nil)
}
func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, a, err := in.Receive()
if err != nil {
return
}
// Skip commands
if a != nil && data.Message(payload).Get("cmd") == nil {
dup, err := beam.SendRPipe(out, payload)
if err != nil {
a.Close()
return
}
io.Copy(io.MultiWriter(os.Stdout, dup), a)
dup.Close()
} else {
if err := out.Send(payload, a); err != nil {
return
}
}
}
}
func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
multiprint := func(p []byte, a *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer a.Close()
msg := data.Message(string(p))
input := bufio.NewScanner(a)
for input.Scan() {
fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
}
}()
return nil
}
r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
beam.Copy(r, in)
}
func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
for {
conn, err := l.Accept()
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
f, err := connToFile(conn)
if err != nil {
conn.Close()
continue
}
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = dialer
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = listener
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
ReceiveFromConn(connections, out)
}
func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
_, attachment, err := in.Receive()
if err != nil {
break
}
if attachment == nil {
continue
}
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
return
}
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
go func(attachment *os.File, conn net.Conn) {
defer tasks.Done()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(attachment, conn)
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(conn, attachment)
}(attachment, conn)
iotasks.Wait()
conn.Close()
attachment.Close()
}(attachment, conn)
}
tasks.Wait()
}
func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
}
func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
}

View File

@ -1,3 +0,0 @@
#!/usr/bin/env beamsh
exec ls -l

View File

@ -1,5 +0,0 @@
#!/usr/bin/env beamsh
trace {
exec ls -l
}

View File

@ -1,7 +0,0 @@
#!/usr/bin/env beamsh
trace {
stdio {
exec ls -l
}
}

View File

@ -1,10 +0,0 @@
#!/usr/bin/env beamsh -x
trace outer {
# stdio fails
stdio {
trace inner {
exec ls -l
}
}
}

View File

@ -1,9 +0,0 @@
#!/usr/bin/env beamsh
stdio {
trace {
stdio {
exec ls -l
}
}
}

View File

@ -1,6 +0,0 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
exec ls -l
}

View File

@ -1,7 +0,0 @@
#!/usr/bin/env beamsh
stdio {
trace {
echo hello
}
}

View File

@ -1,6 +0,0 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
echo hello world
}

View File

@ -1,9 +0,0 @@
#!/usr/bin/env beamsh
devnull {
multiprint {
exec tail -f /var/log/system.log &
exec ls -l
exec ls ksdhfkjdshf jksdfhkjsdhf
}
}

View File

@ -1,8 +0,0 @@
#!/usr/bin/env beamsh
print {
trace {
emit msg=hello
emit msg=world
}
}

View File

@ -1,9 +0,0 @@
#!/usr/bin/env beamsh
trace {
log {
exec ls -l
exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf
echo hello world
}
}

View File

@ -1,9 +0,0 @@
#!/usr/bin/env beamsh
multiprint {
trace {
listen tcp://localhost:7676 &
listen tcp://localhost:8787 &
}
}

View File

@ -1,31 +0,0 @@
package beam
import (
"fmt"
)
type Handler func(msg *Message) error
func (h Handler) Send(msg *Message) (Receiver, error) {
var ret Receiver
if RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
go func() {
// Ret must always be a valid Sender, so handlers
// can safely send to it
if msg.Ret == nil {
msg.Ret = NopSender{}
}
err := h(msg)
if err != nil {
Obj(msg.Ret).Error("%v", err)
}
msg.Ret.Close()
}()
return ret, nil
}
func (h Handler) Close() error {
return fmt.Errorf("can't close")
}

View File

@ -1,8 +0,0 @@
This package defines a remote transport for Beam services using http2/spdy and tls.
Uses https://github.com/docker/spdystream
Pointers:
* Low-level protocol framer: http://code.google.com/p/go.net/spdy
* (incomplete) high-level server implementation: https://github.com/shykes/spdy-go

View File

@ -1,86 +0,0 @@
package http2
import (
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"sync"
)
type ListenSession struct {
listener net.Listener
streamChan chan *spdystream.Stream
streamLock sync.RWMutex
subStreamChans map[string]chan *spdystream.Stream
auth Authenticator
}
func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) {
return &ListenSession{
listener: listener,
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
auth: auth,
}, nil
}
func (l *ListenSession) streamHandler(stream *spdystream.Stream) {
streamChan := l.getStreamChan(stream.Parent())
streamChan <- stream
}
func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
l.streamLock.Lock()
l.subStreamChans[stream.String()] = streamChan
l.streamLock.Unlock()
}
func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return l.streamChan
}
l.streamLock.RLock()
defer l.streamLock.RUnlock()
streamChan, ok := l.subStreamChans[stream.String()]
if ok {
return streamChan
}
return l.streamChan
}
func (l *ListenSession) Serve() {
for {
conn, err := l.listener.Accept()
if err != nil {
// TODO log
break
}
go func() {
authHandler, authErr := l.auth(conn)
if authErr != nil {
// TODO log
conn.Close()
return
}
spdyConn, spdyErr := spdystream.NewConnection(conn, true)
if spdyErr != nil {
// TODO log
conn.Close()
return
}
go spdyConn.Serve(l.streamHandler, authHandler)
}()
}
}
func (l *ListenSession) Shutdown() error {
return l.listener.Close()
}
func (l *ListenSession) Receive(mode int) (*beam.Message, error) {
stream := <-l.streamChan
return createStreamMessage(stream, mode, l, nil)
}

View File

@ -1,83 +0,0 @@
package http2
import (
"github.com/docker/libswarm/beam"
"io"
"net"
"testing"
)
func TestListenSession(t *testing.T) {
listen := "localhost:7743"
listener, listenErr := net.Listen("tcp", listen)
if listenErr != nil {
t.Fatalf("Error creating listener: %s", listenErr)
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
end := make(chan bool)
go exerciseServer(t, listen, end)
msg, msgErr := session.Receive(beam.Ret)
if msgErr != nil {
t.Fatalf("Error receiving message: %s", msgErr)
}
if msg.Att == nil {
t.Fatalf("Error message missing attachment")
}
if msg.Verb != beam.Attach {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach)
}
receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending return message: %s", sendErr)
}
_, ackErr := receiver.Receive(0)
if ackErr == nil {
t.Fatalf("No error receiving from message with no return pipe")
}
if ackErr != io.EOF {
t.Fatalf("Unexpected error receiving from message: %s", ackErr)
}
<-end
shutdownErr := session.Shutdown()
if shutdownErr != nil {
t.Fatalf("Error shutting down: %s", shutdownErr)
}
}
func exerciseServer(t *testing.T, server string, endChan chan bool) {
defer close(endChan)
conn, connErr := net.Dial("tcp", server)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
session, sessionErr := NewStreamSession(conn)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
receiver, sendErr := session.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending message: %s", sendErr)
}
msg, receiveErr := receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving message")
}
if msg.Verb != beam.Ack {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack)
}
}

View File

@ -1,66 +0,0 @@
package http2
import (
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"sync"
)
// Serve a Beam endpoint over a single HTTP2 connection
type Server struct {
conn *spdystream.Connection
streamChan chan *spdystream.Stream
streamLock sync.RWMutex
subStreamChans map[string]chan *spdystream.Stream
}
// Create a Beam receiver from a net.Conn
func NewServer(conn net.Conn) (*Server, error) {
spdyConn, err := spdystream.NewConnection(conn, true)
if err != nil {
return nil, err
}
s := &Server{
conn: spdyConn,
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
}
go s.conn.Serve(s.streamHandler, spdystream.NoAuthHandler)
return s, nil
}
func (s *Server) Close() error {
return s.conn.Close()
}
func (s *Server) Receive(mode int) (*beam.Message, error) {
stream := <-s.streamChan
return createStreamMessage(stream, mode, s, nil)
}
func (s *Server) streamHandler(stream *spdystream.Stream) {
streamChan := s.getStreamChan(stream.Parent())
streamChan <- stream
}
func (s *Server) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
s.streamLock.Lock()
s.subStreamChans[stream.String()] = streamChan
s.streamLock.Unlock()
}
func (s *Server) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return s.streamChan
}
s.streamLock.RLock()
defer s.streamLock.RUnlock()
streamChan, ok := s.subStreamChans[stream.String()]
if ok {
return streamChan
}
return s.streamChan
}

View File

@ -1,109 +0,0 @@
package http2
import (
"encoding/base64"
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
"github.com/docker/spdystream"
"io"
"net"
"net/http"
"os"
"syscall"
)
type Authenticator func(conn net.Conn) (spdystream.AuthHandler, error)
func NoAuthenticator(conn net.Conn) (spdystream.AuthHandler, error) {
return func(header http.Header, slot uint8, parent uint32) bool {
return true
}, nil
}
type streamChanProvider interface {
addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream)
getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream
}
func encodeArgs(args []string) string {
encoded := data.Encode(map[string][]string{"args": args})
return base64.URLEncoding.EncodeToString([]byte(encoded))
}
func decodeArgs(argString string) ([]string, error) {
decoded, decodeErr := base64.URLEncoding.DecodeString(argString)
if decodeErr != nil {
return []string{}, decodeErr
}
dataMap, dataErr := data.Decode(string(decoded))
if dataErr != nil {
return []string{}, dataErr
}
return dataMap["args"], nil
}
func createStreamMessage(stream *spdystream.Stream, mode int, streamChans streamChanProvider, ret beam.Sender) (*beam.Message, error) {
verbString := stream.Headers()["Verb"]
if len(verbString) != 1 {
if len(verbString) == 0 {
return nil, fmt.Errorf("Stream(%s) is missing verb header", stream)
} else {
return nil, fmt.Errorf("Stream(%s) has multiple verb headers", stream)
}
}
verb, verbOk := verbs[verbString[0]]
if !verbOk {
return nil, fmt.Errorf("Unknown verb: %s", verbString[0])
}
var args []string
argString := stream.Headers()["Args"]
if len(argString) > 1 {
return nil, fmt.Errorf("Stream(%s) has multiple args headers", stream)
}
if len(argString) == 1 {
var err error
args, err = decodeArgs(argString[0])
if err != nil {
return nil, err
}
}
var attach *os.File
if !stream.IsFinished() {
socketFds, socketErr := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
if socketErr != nil {
return nil, socketErr
}
attach = os.NewFile(uintptr(socketFds[0]), "")
conn, connErr := net.FileConn(os.NewFile(uintptr(socketFds[1]), ""))
if connErr != nil {
return nil, connErr
}
go func() {
io.Copy(conn, stream)
}()
go func() {
io.Copy(stream, conn)
}()
}
retSender := ret
if retSender == nil || beam.RetPipe.Equals(retSender) {
retSender = &StreamSender{stream: stream, streamChans: streamChans}
}
if mode&beam.Ret == 0 {
retSender.Close()
}
return &beam.Message{
Verb: verb,
Args: args,
Att: attach,
Ret: retSender,
}, nil
}

View File

@ -1,166 +0,0 @@
package http2
import (
"fmt"
"github.com/docker/libswarm/beam"
"github.com/docker/spdystream"
"net"
"net/http"
"sync"
)
var verbs = map[string]beam.Verb{
"Ack": beam.Ack,
"Attach": beam.Attach,
"Connect": beam.Connect,
"Error": beam.Error,
"File": beam.File,
"Get": beam.Get,
"Log": beam.Log,
"Ls": beam.Ls,
"Set": beam.Set,
"Spawn": beam.Spawn,
"Start": beam.Start,
"Stop": beam.Stop,
"Watch": beam.Watch,
}
// Only allows sending, no parent stream
type StreamSession struct {
conn *spdystream.Connection
streamLock sync.Mutex
streamChan chan *spdystream.Stream
subStreamChans map[string]chan *spdystream.Stream
}
func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) {
s.subStreamChans[stream.String()] = streamChan
}
func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream {
if stream == nil {
return s.streamChan
}
streamChan, ok := s.subStreamChans[stream.String()]
if ok {
return streamChan
}
return s.streamChan
}
func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) {
streamChan := s.getStreamChan(stream.Parent())
streamChan <- stream
}
func NewStreamSession(conn net.Conn) (*StreamSession, error) {
session := &StreamSession{
streamChan: make(chan *spdystream.Stream),
subStreamChans: make(map[string]chan *spdystream.Stream),
}
spdyConn, spdyErr := spdystream.NewConnection(conn, false)
if spdyErr != nil {
return nil, spdyErr
}
go spdyConn.Serve(session.newStreamHandler, spdystream.NoAuthHandler)
session.conn = spdyConn
return session, nil
}
func (s *StreamSession) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.conn.CreateStream(headers, nil, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.subStreamChans[stream.String()] = streamChan
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s}
} else {
ret = &beam.NopReceiver{}
}
return
}
func (s *StreamSession) Close() error {
return s.conn.Close()
}
type StreamReceiver struct {
stream *spdystream.Stream
streamChans streamChanProvider
ret beam.Sender
}
func (s *StreamReceiver) Receive(mode int) (*beam.Message, error) {
waitErr := s.stream.Wait()
if waitErr != nil {
return nil, waitErr
}
streamChan := s.streamChans.getStreamChan(s.stream)
stream := <-streamChan
return createStreamMessage(stream, mode, s.streamChans, s.ret)
}
type StreamSender struct {
stream *spdystream.Stream
streamChans streamChanProvider
}
func (s *StreamSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented for spdy transport")
}
var fin bool
if beam.RetPipe.Equals(msg.Ret) {
fin = false
} else {
fin = true
}
headers := http.Header{
"Verb": []string{msg.Verb.String()},
"Args": []string{encodeArgs(msg.Args)},
}
stream, streamErr := s.stream.CreateSubStream(headers, fin)
if streamErr != nil {
return nil, streamErr
}
streamChan := make(chan *spdystream.Stream)
s.streamChans.addStreamChan(stream, streamChan)
if beam.RetPipe.Equals(msg.Ret) {
ret = &StreamReceiver{stream: stream, streamChans: s.streamChans}
} else {
ret = beam.NopReceiver{}
}
return
}
func (s *StreamSender) Close() error {
// TODO Remove stream from stream chans
return s.stream.Close()
}

View File

@ -1,159 +0,0 @@
package http2
import (
//"bytes"
"github.com/docker/libswarm/beam"
//"github.com/docker/spdystream"
"io"
"net"
"testing"
)
func TestBeamSession(t *testing.T) {
end := make(chan bool)
listen := "localhost:7543"
server, serverErr := runServer(listen, t, end)
if serverErr != nil {
t.Fatalf("Error initializing server: %s", serverErr)
}
conn, connErr := net.Dial("tcp", listen)
if connErr != nil {
t.Fatalf("Error dialing server: %s", connErr)
}
sender, senderErr := NewStreamSession(conn)
if senderErr != nil {
t.Fatalf("Error creating sender: %s", senderErr)
}
// Ls interaction
receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Ls, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr := receiver.Receive(0)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Set {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ls.String())
}
if len(message.Args) != 3 {
t.Fatalf("Unexpected args length\nActual: %d\nExpected: %d", len(message.Args), 3)
}
if message.Args[0] != "file1" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[0], "file1")
}
if message.Args[1] != "file2" {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[1], "file2")
}
if message.Args[2] != string([]byte{0x00, 0x00, 0x00}) {
t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[2], []byte{0x00, 0x00, 0x00})
}
// Attach interactions
receiver, sendErr = sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending beam message: %s", sendErr)
}
message, receiveErr = receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving beam message: %s", receiveErr)
}
if message.Verb != beam.Ack {
t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ack.String())
}
// TODO full connect interaction
//if message.Att == nil {
// t.Fatalf("Missing attachment on message")
//}
//testBytes := []byte("Hello")
//n, writeErr := message.Att.Write(testBytes)
//if writeErr != nil {
// t.Fatalf("Error writing bytes: %s", writeErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//buf := make([]byte, 10)
//n, readErr := message.Att.Read(buf)
//if readErr != nil {
// t.Fatalf("Error writing bytes: %s", readErr)
//}
//if n != 5 {
// t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n)
//}
//if bytes.Compare(buf[:n], testBytes) != 0 {
// t.Fatalf("Did not receive expected message:\nActual: %s\nExpectd: %s", buf, testBytes)
//}
closeErr := server.Close()
if closeErr != nil {
t.Fatalf("Error closing server: %s", closeErr)
}
closeErr = sender.Close()
if closeErr != nil {
t.Fatalf("Error closing sender: %s", closeErr)
}
<-end
}
func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error) {
listener, lErr := net.Listen("tcp", listen)
if lErr != nil {
return nil, lErr
}
session, sessionErr := NewListenSession(listener, NoAuthenticator)
if sessionErr != nil {
t.Fatalf("Error creating session: %s", sessionErr)
}
go session.Serve()
go func() {
defer close(endChan)
// Ls exchange
message, receiveErr := session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Ls {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr := message.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"file1", "file2", string([]byte{0x00, 0x00, 0x00})}})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
_, receiveErr = receiver.Receive(0)
if receiveErr == nil {
t.Fatalf("No error received from empty receiver")
}
if receiveErr != io.EOF {
t.Fatalf("Expected error from empty receiver: %s", receiveErr)
}
// Connect exchange
message, receiveErr = session.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving on server: %s", receiveErr)
}
if message.Verb != beam.Attach {
t.Fatalf("Unexpected verb: %s", message.Verb)
}
receiver, sendErr = message.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending set message: %s", sendErr)
}
// TODO full connect interaction
}()
return listener, nil
}

View File

@ -1,206 +0,0 @@
package beam
import (
"io"
"sync"
)
func Pipe() (*PipeReceiver, *PipeSender) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReceiver{p}
w := &PipeSender{p}
return r, w
}
type pipe struct {
rwait sync.Cond
wwait sync.Cond
l sync.Mutex
rl sync.Mutex
wl sync.Mutex
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
msg *Message
}
func (p *pipe) psend(msg *Message) error {
var err error
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
p.msg = msg
p.rwait.Signal()
for {
if p.msg == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = io.ErrClosedPipe
}
p.wwait.Wait()
}
p.msg = nil // in case of rerr or werr
return err
}
func (p *pipe) send(msg *Message) (ret Receiver, err error) {
// Prepare nested Receiver if requested
if RetPipe.Equals(msg.Ret) {
ret, msg.Ret = Pipe()
}
err = p.psend(msg)
return
}
func (p *pipe) preceive() (*Message, error) {
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return nil, io.ErrClosedPipe
}
if p.msg != nil {
break
}
if p.werr != nil {
return nil, p.werr
}
p.rwait.Wait()
}
msg := p.msg
p.msg = nil
p.wwait.Signal()
return msg, nil
}
func (p *pipe) receive(mode int) (*Message, error) {
msg, err := p.preceive()
if err != nil {
return nil, err
}
if msg.Ret == nil {
msg.Ret = NopSender{}
}
if mode&Ret == 0 {
msg.Ret.Close()
}
return msg, nil
}
func (p *pipe) rclose(err error) {
if err == nil {
err = io.ErrClosedPipe
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err error) {
if err == nil {
err = io.EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// PipeReceiver
type PipeReceiver struct {
p *pipe
}
func (r *PipeReceiver) Receive(mode int) (*Message, error) {
return r.p.receive(mode)
}
func (r *PipeReceiver) SendTo(dst Sender) (int, error) {
var n int
// If the destination is a PipeSender, we can cheat
pdst, ok := dst.(*PipeSender)
if !ok {
return 0, ErrIncompatibleSender
}
for {
pmsg, err := r.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := pdst.p.psend(pmsg); err != nil {
return n, err
}
}
n++
return n, nil
}
func (r *PipeReceiver) Close() error {
return r.CloseWithError(nil)
}
func (r *PipeReceiver) CloseWithError(err error) error {
r.p.rclose(err)
return nil
}
// PipeSender
type PipeSender struct {
p *pipe
}
func (w *PipeSender) Send(msg *Message) (Receiver, error) {
return w.p.send(msg)
}
func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) {
var n int
// If the destination is a PipeReceiver, we can cheat
psrc, ok := src.(*PipeReceiver)
if !ok {
return 0, ErrIncompatibleReceiver
}
for {
pmsg, err := psrc.p.preceive()
if err == io.EOF {
break
}
if err != nil {
return n, err
}
if err := w.p.psend(pmsg); err != nil {
return n, err
}
n++
}
return n, nil
}
func (w *PipeSender) Close() error {
return w.CloseWithError(nil)
}
func (w *PipeSender) CloseWithError(err error) error {
w.p.wclose(err)
return nil
}

View File

@ -1,145 +0,0 @@
package beam
import (
"fmt"
"github.com/dotcloud/docker/pkg/testutils"
"io/ioutil"
"os"
"testing"
)
func TestInmemRetPipe(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
wait := make(chan struct{})
go func() {
defer close(wait)
ret, err := w.Send(&Message{Verb: Log, Args: []string{"hello"}, Ret: RetPipe})
if err != nil {
t.Fatal(err)
}
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != Ack {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "this better not crash" {
t.Fatalf("%#v", msg)
}
}()
msg, err := r.Receive(Ret)
if err != nil {
t.Fatal(err)
}
if _, err := msg.Ret.Send(&Message{Verb: Ack, Args: []string{"this better not crash"}}); err != nil {
t.Fatal(err)
}
<-wait
}
func TestSimpleSend(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
}()
if _, err := w.Send(&Message{Verb: Log, Args: []string{"hello world"}}); err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
ret, err := w.Send(&Message{Args: []string{"this is the request"}, Ret: RetPipe})
if err != nil {
t.Fatal(err)
}
if ret == nil {
t.Fatalf("ret = nil\n")
}
// Read for a reply
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=Ret
msg, err := r.Receive(Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
if msg.Ret == nil {
t.Fatalf("%#v", msg)
}
// Send a reply
_, err = msg.Ret.Send(&Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendFile(t *testing.T) {
r, w := Pipe()
defer r.Close()
defer w.Close()
tmp, err := ioutil.TempFile("", "beam-test-")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmp.Name())
fmt.Fprintf(tmp, "hello world\n")
tmp.Sync()
tmp.Seek(0, 0)
testutils.Timeout(t, func() {
go func() {
_, err := w.Send(&Message{Verb: File, Args: []string{"path=" + tmp.Name()}, Att: tmp})
if err != nil {
t.Fatal(err)
}
}()
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != File {
t.Fatalf("%#v", msg)
}
if msg.Args[0] != "path="+tmp.Name() {
t.Fatalf("%#v", msg)
}
txt, err := ioutil.ReadAll(msg.Att)
if err != nil {
t.Fatal(err)
}
if string(txt) != "hello world\n" {
t.Fatalf("%s\n", txt)
}
})
}

169
beam/message.go Normal file
View File

@ -0,0 +1,169 @@
package beam
import (
"github.com/docker/libchan"
"github.com/docker/libchan/data"
"fmt"
"os"
)
type Message struct {
Verb
Args []string
Ret Sender
Att *os.File
}
type Sender interface {
Send(msg *Message) (Receiver, error)
Close() error
Unwrap() libchan.Sender
}
type Receiver interface {
Receive(mode int) (*Message, error)
Unwrap() libchan.Receiver
}
type senderWrapper struct {
libchan.Sender
}
func WrapSender(s libchan.Sender) Sender {
return &senderWrapper{s}
}
func (s *senderWrapper) Send(msg *Message) (Receiver, error) {
recv, err := s.Sender.Send(msg.LibchanMessage())
if err != nil {
return nil, err
}
return WrapReceiver(recv), err
}
func (s *senderWrapper) Unwrap() libchan.Sender {
return s.Sender
}
type receiverWrapper struct {
libchan.Receiver
}
func WrapReceiver(r libchan.Receiver) Receiver {
return &receiverWrapper{r}
}
func (r *receiverWrapper) Receive(mode int) (*Message, error) {
lcm, err := r.Receiver.Receive(mode)
if err != nil {
return nil, err
}
return DecodeLibchanMessage(lcm)
}
func (r *receiverWrapper) Unwrap() libchan.Receiver {
return r.Receiver
}
type senderUnwrapper struct {
Sender
}
func (su *senderUnwrapper) Send(lcm *libchan.Message) (libchan.Receiver, error) {
msg, err := DecodeLibchanMessage(lcm)
if err != nil {
return nil, err
}
recv, err := su.Sender.Send(msg)
if err != nil {
return nil, err
}
return &receiverUnwrapper{recv}, nil
}
type receiverUnwrapper struct {
Receiver
}
func (ru *receiverUnwrapper) Receive(mode int) (*libchan.Message, error) {
msg, err := ru.Receiver.Receive(mode)
if err != nil {
return nil, err
}
return msg.LibchanMessage(), nil
}
func Pipe() (Receiver, Sender) {
r, s := libchan.Pipe()
return WrapReceiver(r), WrapSender(s)
}
func Copy(s Sender, r Receiver) (int, error) {
return libchan.Copy(s.Unwrap(), r.Unwrap())
}
func Handler(h func(msg *Message) error) Sender {
lch := libchan.Handler(func(lcm *libchan.Message) {
ret := WrapSender(lcm.Ret)
msg, err := DecodeLibchanMessage(lcm)
if err != nil {
ret.Send(&Message{Verb: Error, Args: []string{err.Error()}})
}
if err = h(msg); err != nil {
ret.Send(&Message{Verb: Error, Args: []string{err.Error()}})
}
})
return WrapSender(lch)
}
var RetPipe = WrapSender(libchan.RetPipe)
var Ret = libchan.Ret
var notImplementedMsg = &Message{Verb: Error, Args: []string{"not implemented"}}
var NotImplemented = WrapSender(libchan.Repeater(notImplementedMsg.LibchanMessage()))
func DecodeLibchanMessage(lcm *libchan.Message) (*Message, error) {
decoded, err := data.Decode(string(lcm.Data))
if err != nil {
return nil, err
}
verbList, exists := decoded["verb"]
if !exists {
return nil, fmt.Errorf("No 'verb' key found in message data: %s", lcm.Data)
}
if len(verbList) != 1 {
return nil, fmt.Errorf("Expected exactly one verb, got %d: %#v", len(verbList), verbList)
}
verb, err := VerbFromString(verbList[0])
if err != nil {
return nil, err
}
args, exists := decoded["args"]
if !exists {
return nil, fmt.Errorf("No 'args' key found in message data: %s", lcm.Data)
}
return &Message{
Verb: verb,
Args: args,
Ret: WrapSender(lcm.Ret),
Att: lcm.Fd,
}, nil
}
func (m *Message) LibchanMessage() *libchan.Message {
encoded := data.Empty().
Set("verb", m.Verb.String()).
Set("args", m.Args...)
var ret libchan.Sender
if m.Ret != nil {
ret = m.Ret.Unwrap()
}
return &libchan.Message{
Data: []byte(encoded),
Ret: ret,
Fd: m.Att,
}
}

View File

@ -1,21 +0,0 @@
package beam
import (
"io"
)
type NopSender struct{}
func (s NopSender) Send(msg *Message) (Receiver, error) {
return NopReceiver{}, nil
}
func (s NopSender) Close() error {
return nil
}
type NopReceiver struct{}
func (r NopReceiver) Receive(mode int) (*Message, error) {
return nil, io.EOF
}

View File

@ -1,6 +1,8 @@
package beam
import (
"github.com/docker/libchan"
"fmt"
)
@ -70,3 +72,7 @@ func (s *Server) Send(msg *Message) (Receiver, error) {
func (s *Server) Close() error {
return fmt.Errorf("can't close")
}
func (s *Server) Unwrap() libchan.Sender {
return &senderUnwrapper{s}
}

View File

@ -1,166 +0,0 @@
package unix
import (
"fmt"
"io"
"os"
)
type Sender interface {
Send([]byte, *os.File) error
}
type Receiver interface {
Receive() ([]byte, *os.File, error)
}
type ReceiveCloser interface {
Receiver
Close() error
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
}
const (
R int = 1 << (32 - 1 - iota)
W
)
func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
var (
remote *os.File
local *os.File
)
if mode == R {
remote = r
local = w
} else if mode == W {
remote = w
local = r
}
if err := dst.Send(data, remote); err != nil {
local.Close()
remote.Close()
return nil, err
}
return local, nil
}
// SendRPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendRPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, R)
}
// SendWPipe create a pipe and sends its *read* end attached in a beam message
// to `dst`, with `data` as the message payload.
// It returns the *write* end of the pipe, or an error.
func SendWPipe(dst Sender, data []byte) (*os.File, error) {
return sendPipe(dst, data, W)
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

View File

@ -1,39 +0,0 @@
package unix
import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing"
)
func TestSendConn(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
t.Fatalf("%v != %v\n", val, "connection")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
}
}

View File

@ -1,144 +0,0 @@
package unix
import (
"fmt"
"os"
"strconv"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/data"
)
func Pair() (*Conn, *Conn, error) {
c1, c2, err := USocketPair()
if err != nil {
return nil, nil, err
}
return &Conn{c1}, &Conn{c2}, nil
}
type Conn struct {
*UnixConn
}
func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) {
// Get 2 *os.File
local, remote, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
// Convert 1 to *net.UnixConn
conn, err = FileConn(local)
if err != nil {
return nil, nil, err
}
local.Close()
// Return the "mismatched" pair
return conn, remote, nil
}
// This implements beam.Sender.Close which *only closes the sender*.
// This is similar to the pattern of only closing go channels from
// the sender's side.
// If you want to close the entire connection, call Conn.UnixConn.Close.
func (c *Conn) Close() error {
return c.UnixConn.CloseWrite()
}
func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) {
if msg.Att != nil {
return nil, fmt.Errorf("file attachment not yet implemented in unix transport")
}
parts := []string{fmt.Sprintf("%d", msg.Verb)}
parts = append(parts, msg.Args...)
b := []byte(data.EncodeList(parts))
// Setup nested streams
var (
fd *os.File
ret beam.Receiver
err error
)
// Caller requested a return pipe
if beam.RetPipe.Equals(msg.Ret) {
local, remote, err := sendablePair()
if err != nil {
return nil, err
}
fd = remote
ret = &Conn{local}
// Caller specified its own return channel
} else if msg.Ret != nil {
// The specified return channel is a unix conn: engaging cheat mode!
if retConn, ok := msg.Ret.(*Conn); ok {
fd, err = retConn.UnixConn.File()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
// Close duplicate fd
retConn.UnixConn.Close()
// The specified return channel is an unknown type: proxy messages.
} else {
local, remote, err := sendablePair()
if err != nil {
return nil, fmt.Errorf("error passing return channel: %v", err)
}
fd = remote
// FIXME: do we need a reference no all these background tasks?
go func() {
// Copy messages from the remote return channel to the local return channel.
// When the remote return channel is closed, also close the local return channel.
localConn := &Conn{local}
beam.Copy(msg.Ret, localConn)
msg.Ret.Close()
localConn.Close()
}()
}
}
if err := c.UnixConn.Send(b, fd); err != nil {
return nil, err
}
return ret, nil
}
func (c *Conn) Receive(mode int) (*beam.Message, error) {
b, fd, err := c.UnixConn.Receive()
if err != nil {
return nil, err
}
parts, n, err := data.DecodeList(string(b))
if err != nil {
return nil, err
}
if n != len(b) {
return nil, fmt.Errorf("garbage data %#v", b[:n])
}
if len(parts) == 0 {
return nil, fmt.Errorf("malformed message")
}
v, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
return nil, err
}
msg := &beam.Message{Verb: beam.Verb(v), Args: parts[1:]}
// Apply mode mask
if fd != nil {
subconn, err := FileConn(fd)
if err != nil {
return nil, err
}
fd.Close()
if mode&beam.Ret != 0 {
msg.Ret = &Conn{subconn}
} else {
subconn.CloseWrite()
}
}
return msg, nil
}

View File

@ -1,74 +0,0 @@
package unix
import (
"github.com/docker/libswarm/beam"
"github.com/dotcloud/docker/pkg/testutils"
"testing"
)
func TestPair(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal("Unexpected error")
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", *msg)
}
if msg.Args[0] != "hello world" {
t.Fatalf("%#v", *msg)
}
}()
_, err := w.Send(&beam.Message{Verb: beam.Log, Args: []string{"hello world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestSendReply(t *testing.T) {
r, w, err := Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
testutils.Timeout(t, func() {
// Send
go func() {
// Send a message with mode=R
ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
// Read for a reply
msg, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the reply" {
t.Fatalf("%#v", msg)
}
}()
// Receive a message with mode=W
msg, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if msg.Args[0] != "this is the request" {
t.Fatalf("%#v", msg)
}
// Send a reply
_, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}})
if err != nil {
t.Fatal(err)
}
})
}

View File

@ -1,317 +0,0 @@
package unix
import (
"bufio"
"fmt"
"net"
"os"
"syscall"
)
func debugCheckpoint(msg string, args ...interface{}) {
if os.Getenv("DEBUG") == "" {
return
}
os.Stdout.Sync()
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
fmt.Fprintf(tty, msg, args...)
bufio.NewScanner(tty).Scan()
tty.Close()
}
type UnixConn struct {
*net.UnixConn
fds []*os.File
}
// Framing:
// In order to handle framing in Send/Recieve, as these give frame
// boundaries we use a very simple 4 bytes header. It is a big endiand
// uint32 where the high bit is set if the message includes a file
// descriptor. The rest of the uint32 is the length of the next frame.
// We need the bit in order to be able to assign recieved fds to
// the right message, as multiple messages may be coalesced into
// a single recieve operation.
func makeHeader(data []byte, fds []int) ([]byte, error) {
header := make([]byte, 4)
length := uint32(len(data))
if length > 0x7fffffff {
return nil, fmt.Errorf("Data to large")
}
if len(fds) != 0 {
length = length | 0x80000000
}
header[0] = byte((length >> 24) & 0xff)
header[1] = byte((length >> 16) & 0xff)
header[2] = byte((length >> 8) & 0xff)
header[3] = byte((length >> 0) & 0xff)
return header, nil
}
func parseHeader(header []byte) (uint32, bool) {
length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
hasFd := length&0x80000000 != 0
length = length & ^uint32(0x80000000)
return length, hasFd
}
func FileConn(f *os.File) (*UnixConn, error) {
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
}
return &UnixConn{UnixConn: uconn}, nil
}
// Send sends a new message on conn with data and f as payload and
// attachment, respectively.
// On success, f is closed
func (conn *UnixConn) Send(data []byte, f *os.File) error {
{
var fd int = -1
if f != nil {
fd = int(f.Fd())
}
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
}
var fds []int
if f != nil {
fds = append(fds, int(f.Fd()))
}
if err := conn.sendUnix(data, fds...); err != nil {
return err
}
if f != nil {
f.Close()
}
return nil
}
// Receive waits for a new message on conn, and receives its payload
// and attachment, or an error if any.
//
// If more than 1 file descriptor is sent in the message, they are all
// closed except for the first, which is the attachment.
// It is legal for a message to have no attachment or an empty payload.
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
defer func() {
var fd int = -1
if rf != nil {
fd = int(rf.Fd())
}
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
}()
// Read header
header := make([]byte, 4)
nRead := uint32(0)
for nRead < 4 {
n, err := conn.receiveUnix(header[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
length, hasFd := parseHeader(header)
if hasFd {
if len(conn.fds) == 0 {
return nil, nil, fmt.Errorf("No expected file descriptor in message")
}
rf = conn.fds[0]
conn.fds = conn.fds[1:]
}
rdata = make([]byte, length)
nRead = 0
for nRead < length {
n, err := conn.receiveUnix(rdata[nRead:])
if err != nil {
return nil, nil, err
}
nRead = nRead + uint32(n)
}
return
}
func (conn *UnixConn) receiveUnix(buf []byte) (int, error) {
oob := make([]byte, syscall.CmsgSpace(4))
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
if err != nil {
return 0, err
}
fd := extractFd(oob[:oobn])
if fd != -1 {
f := os.NewFile(uintptr(fd), "")
conn.fds = append(conn.fds, f)
}
return bufn, nil
}
func (conn *UnixConn) sendUnix(data []byte, fds ...int) error {
header, err := makeHeader(data, fds)
if err != nil {
return err
}
// There is a bug in conn.WriteMsgUnix where it doesn't correctly return
// the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645)
// So, we can't rely on the return value from it. However, we must use it to
// send the fds. In order to handle this we only write one byte using WriteMsgUnix
// (when we have to), as that can only ever block or fully suceed. We then write
// the rest with conn.Write()
// The reader side should not rely on this though, as hopefully this gets fixed
// in go later.
written := 0
if len(fds) != 0 {
oob := syscall.UnixRights(fds...)
wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil)
if err != nil {
return err
}
written = written + wrote
}
for written < len(header) {
wrote, err := conn.Write(header[written:])
if err != nil {
return err
}
written = written + wrote
}
written = 0
for written < len(data) {
wrote, err := conn.Write(data[written:])
if err != nil {
return err
}
written = written + wrote
}
return nil
}
func extractFd(oob []byte) int {
// Grab forklock to make sure no forks accidentally inherit the new
// fds before they are made CLOEXEC
// There is a slight race condition between ReadMsgUnix returns and
// when we grap the lock, so this is not perfect. Unfortunately
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
// way to implement non-blocking i/o in go, so this is hard to fix.
syscall.ForkLock.Lock()
defer syscall.ForkLock.Unlock()
scms, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return -1
}
foundFd := -1
for _, scm := range scms {
fds, err := syscall.ParseUnixRights(&scm)
if err != nil {
continue
}
for _, fd := range fds {
if foundFd == -1 {
syscall.CloseOnExec(fd)
foundFd = fd
} else {
syscall.Close(fd)
}
}
}
return foundFd
}
func socketpair() ([2]int, error) {
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
}
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
// not bound to the underlying filesystem.
// Messages sent on one end are received on the other, and vice-versa.
// It is the caller's responsibility to close both ends.
func SocketPair() (a *os.File, b *os.File, err error) {
defer func() {
var (
fdA int = -1
fdB int = -1
)
if a != nil {
fdA = int(a.Fd())
}
if b != nil {
fdB = int(b.Fd())
}
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
}()
pair, err := socketpair()
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
}
func USocketPair() (*UnixConn, *UnixConn, error) {
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
a, b, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer a.Close()
defer b.Close()
uA, err := FileConn(a)
if err != nil {
return nil, nil, err
}
uB, err := FileConn(b)
if err != nil {
uA.Close()
return nil, nil, err
}
return uA, uB, nil
}
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
// returns an error if the file descriptor does not point to a unix socket.
// This creates a duplicate file descriptor. It's the caller's responsibility
// to close both.
func FdConn(fd int) (n *net.UnixConn, err error) {
{
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
}
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", fd)
}
return uconn, nil
}

View File

@ -1,237 +0,0 @@
package unix
import (
"fmt"
"io/ioutil"
"testing"
)
func TestSocketPair(t *testing.T) {
a, b, err := SocketPair()
if err != nil {
t.Fatal(err)
}
go func() {
a.Write([]byte("hello world!"))
fmt.Printf("done writing. closing\n")
a.Close()
fmt.Printf("done closing\n")
}()
data, err := ioutil.ReadAll(b)
if err != nil {
t.Fatal(err)
}
fmt.Printf("--> %s\n", data)
fmt.Printf("still open: %v\n", a.Fd())
}
func TestUSocketPair(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
data := "hello world!"
go func() {
a.Write([]byte(data))
a.Close()
}()
res := make([]byte, 1024)
size, err := b.Read(res)
if err != nil {
t.Fatal(err)
}
if size != len(data) {
t.Fatal("Unexpected size")
}
if string(res[0:size]) != data {
t.Fatal("Unexpected data")
}
}
func TestSendUnixSocket(t *testing.T) {
a1, a2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer a1.Close()
// defer a2.Close()
b1, b2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer b1.Close()
// defer b2.Close()
glueA, glueB, err := SocketPair()
if err != nil {
t.Fatal(err)
}
// defer glueA.Close()
// defer glueB.Close()
go func() {
err := b2.Send([]byte("a"), glueB)
if err != nil {
t.Fatal(err)
}
}()
go func() {
err := a2.Send([]byte("b"), glueA)
if err != nil {
t.Fatal(err)
}
}()
connAhdr, connA, err := a1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connAhdr) != "b" {
t.Fatalf("unexpected: %s", connAhdr)
}
connBhdr, connB, err := b1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connBhdr) != "a" {
t.Fatalf("unexpected: %s", connBhdr)
}
fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd())
go func() {
fmt.Printf("sending message on %v\n", connA.Fd())
connA.Write([]byte("hello world"))
connA.Sync()
fmt.Printf("closing %v\n", connA.Fd())
connA.Close()
}()
data, err := ioutil.ReadAll(connB)
if err != nil {
t.Fatal(err)
}
fmt.Printf("---> %s\n", data)
}
// Ensure we get proper segmenting of messages
func TestSendSegmenting(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
extrafd1, extrafd2, err := SocketPair()
if err != nil {
t.Fatal(err)
}
extrafd2.Close()
go func() {
a.Send([]byte("message 1"), nil)
a.Send([]byte("message 2"), extrafd1)
a.Send([]byte("message 3"), nil)
}()
msg1, file1, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg1) != "message 1" {
t.Fatal("unexpected msg1:", string(msg1))
}
if file1 != nil {
t.Fatal("unexpectedly got file1")
}
msg2, file2, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg2) != "message 2" {
t.Fatal("unexpected msg2:", string(msg2))
}
if file2 == nil {
t.Fatal("didn't get file2")
}
file2.Close()
msg3, file3, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if string(msg3) != "message 3" {
t.Fatal("unexpected msg3:", string(msg3))
}
if file3 != nil {
t.Fatal("unexpectedly got file3")
}
}
// Test sending a zero byte message
func TestSendEmpty(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send([]byte{}, nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if len(msg) != 0 {
t.Fatalf("unexpected non-empty message: %v", msg)
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}
func makeLarge(size int) []byte {
res := make([]byte, size)
for i := range res {
res[i] = byte(i % 255)
}
return res
}
func verifyLarge(data []byte, size int) bool {
if len(data) != size {
return false
}
for i := range data {
if data[i] != byte(i%255) {
return false
}
}
return true
}
// Test sending a large message
func TestSendLarge(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
a.Send(makeLarge(100000), nil)
}()
msg, file, err := b.Receive()
if err != nil {
t.Fatal(err)
}
if !verifyLarge(msg, 100000) {
t.Fatalf("unexpected message (size %d)", len(msg))
}
if file != nil {
t.Fatal("unexpectedly got file")
}
}

View File

@ -1,17 +0,0 @@
package utils
import (
"github.com/docker/libswarm/beam"
)
type Buffer []*beam.Message
func (buf *Buffer) Send(msg *beam.Message) (beam.Receiver, error) {
(*buf) = append(*buf, msg)
return beam.NopReceiver{}, nil
}
func (buf *Buffer) Close() error {
(*buf) = nil
return nil
}

View File

@ -1,41 +0,0 @@
package utils
import (
"github.com/docker/libswarm/beam"
)
type Queue struct {
*beam.PipeSender
dst beam.Sender
ch chan *beam.Message
}
func NewQueue(dst beam.Sender, size int) *Queue {
r, w := beam.Pipe()
q := &Queue{
PipeSender: w,
dst: dst,
ch: make(chan *beam.Message, size),
}
go func() {
defer close(q.ch)
for {
msg, err := r.Receive(beam.Ret)
if err != nil {
r.Close()
return
}
q.ch <- msg
}
}()
go func() {
for msg := range q.ch {
_, err := dst.Send(msg)
if err != nil {
r.Close()
return
}
}
}()
return q
}

View File

@ -1,43 +0,0 @@
package utils
import (
"github.com/docker/libswarm/beam"
"testing"
)
func TestSendRet(t *testing.T) {
r, w := beam.Pipe()
defer r.Close()
defer w.Close()
q := NewQueue(w, 1)
defer q.Close()
ret, err := q.Send(&beam.Message{Verb: beam.Log, Args: []string{"ping"}, Ret: beam.RetPipe})
if err != nil {
t.Fatal(err)
}
go func() {
ping, err := r.Receive(beam.Ret)
if err != nil {
t.Fatal(err)
}
if _, err := ping.Ret.Send(&beam.Message{Verb: beam.Log, Args: []string{"pong"}}); err != nil {
t.Fatal(err)
}
}()
pong, err := ret.Receive(0)
if err != nil {
t.Fatal(err)
}
if pong.Verb != beam.Log {
t.Fatal(err)
}
}
func TestSendClose(t *testing.T) {
q := NewQueue(beam.NopSender{}, 1)
q.Send(&beam.Message{Verb: beam.Error, Args: []string{"hello"}})
q.Close()
if _, err := q.Send(&beam.Message{Verb: beam.Ack, Args: []string{"again"}}); err == nil {
t.Fatal("send on closed queue should return an error")
}
}

View File

@ -1,112 +0,0 @@
package utils
import (
"container/list"
"fmt"
"github.com/docker/libswarm/beam"
"strings"
"sync"
)
// StackSender forwards beam messages to a dynamic list of backend receivers.
// New backends are stacked on top. When a message is sent, each backend is
// tried until one succeeds. Any failing backends encountered along the way
// are removed from the queue.
type StackSender struct {
stack *list.List
l sync.RWMutex
}
func NewStackSender() *StackSender {
stack := list.New()
return &StackSender{
stack: stack,
}
}
func (s *StackSender) Send(msg *beam.Message) (ret beam.Receiver, err error) {
completed := s.walk(func(h beam.Sender) (ok bool) {
ret, err = h.Send(msg)
fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err)
if err == nil {
return true
}
return false
})
// If walk was completed, it means we didn't find a valid handler
if !completed {
return ret, err
}
// Silently drop messages if no valid backend is available.
return beam.NopSender{}.Send(msg)
}
func (s *StackSender) Add(dst beam.Sender) *StackSender {
s.l.Lock()
defer s.l.Unlock()
prev := &StackSender{
stack: list.New(),
}
prev.stack.PushFrontList(s.stack)
fmt.Printf("[ADD] prev %#v\n", prev)
s.stack.PushFront(dst)
return prev
}
func (s *StackSender) Close() error {
s.walk(func(h beam.Sender) bool {
h.Close()
// remove all handlers
return false
})
return nil
}
func (s *StackSender) _walk(f func(*list.Element) bool) bool {
var e *list.Element
s.l.RLock()
e = s.stack.Front()
s.l.RUnlock()
for e != nil {
fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender))
s.l.RLock()
next := e.Next()
s.l.RUnlock()
cont := f(e)
if !cont {
return false
}
e = next
}
return true
}
func (s *StackSender) walk(f func(beam.Sender) bool) bool {
return s._walk(func(e *list.Element) bool {
ok := f(e.Value.(beam.Sender))
if ok {
// Found a valid handler. Stop walking.
return false
}
// Invalid handler: remove.
s.l.Lock()
s.stack.Remove(e)
s.l.Unlock()
return true
})
}
func (s *StackSender) Len() int {
s.l.RLock()
defer s.l.RUnlock()
return s.stack.Len()
}
func (s *StackSender) String() string {
var parts []string
s._walk(func(e *list.Element) bool {
parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender)))
return true
})
return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->"))
}

View File

@ -1,127 +0,0 @@
package utils
import (
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/unix"
"github.com/dotcloud/docker/pkg/testutils"
"strings"
"testing"
)
func TestStackWithPipe(t *testing.T) {
r, w := beam.Pipe()
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestStackWithPair(t *testing.T) {
r, w, err := unix.Pair()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
s := NewStackSender()
s.Add(w)
testutils.Timeout(t, func() {
go func() {
msg, err := r.Receive(0)
if err != nil {
t.Fatal(err)
}
if msg.Verb != beam.Log {
t.Fatalf("%#v", msg)
}
if strings.Join(msg.Args, " ") != "wonderful world" {
t.Fatalf("%#v", msg)
}
}()
_, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}})
if err != nil {
t.Fatal(err)
}
})
}
func TestStackLen(t *testing.T) {
s := NewStackSender()
if s.Len() != 0 {
t.Fatalf("empty StackSender has length %d", s.Len())
}
}
func TestStackAdd(t *testing.T) {
s := NewStackSender()
a := Buffer{}
beforeA := s.Add(&a)
// Add on an empty StackSender should return an empty StackSender
if beforeA.Len() != 0 {
t.Fatalf("%s has %d elements", beforeA, beforeA.Len())
}
if s.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
// Add a 2nd element
b := Buffer{}
beforeB := s.Add(&b)
if beforeB.Len() != 1 {
t.Fatalf("%#v", beforeA)
}
if s.Len() != 2 {
t.Fatalf("%#v", beforeA)
}
s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for b"}})
beforeB.Send(&beam.Message{Verb: beam.Log, Args: []string{"for a"}})
beforeA.Send(&beam.Message{Verb: beam.Log, Args: []string{"for nobody"}})
if len(a) != 1 {
t.Fatalf("%#v", a)
}
if len(b) != 1 {
t.Fatalf("%#v", b)
}
}
// Misbehaving backends must be removed
func TestStackAddBad(t *testing.T) {
s := NewStackSender()
buf := Buffer{}
s.Add(&buf)
r, w := beam.Pipe()
s.Add(w)
if s.Len() != 2 {
t.Fatalf("%#v", s)
}
r.Close()
if _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for the buffer"}}); err != nil {
t.Fatal(err)
}
if s.Len() != 1 {
t.Fatalf("%#v")
}
if len(buf) != 1 {
t.Fatalf("%#v", buf)
}
if buf[0].Args[0] != "for the buffer" {
t.Fatalf("%#v", buf)
}
}

View File

@ -1,5 +1,9 @@
package beam
import (
"fmt"
)
type Verb uint32
const (
@ -18,6 +22,38 @@ const (
Watch
)
func VerbFromString(s string) (Verb, error) {
switch s {
case "Ack":
return Ack, nil
case "Attach":
return Attach, nil
case "Connect":
return Connect, nil
case "Error":
return Error, nil
case "File":
return File, nil
case "Get":
return Get, nil
case "Log":
return Log, nil
case "Ls":
return Ls, nil
case "Set":
return Set, nil
case "Spawn":
return Spawn, nil
case "Start":
return Start, nil
case "Stop":
return Stop, nil
case "Watch":
return Watch, nil
}
return 0, fmt.Errorf("Unrecognised verb: %s", s)
}
func (v Verb) String() string {
switch v {
case Ack:

View File

@ -1,72 +0,0 @@
package ws
import (
"errors"
"github.com/docker/libswarm/beam"
"github.com/docker/libswarm/beam/http2"
"github.com/docker/spdystream/ws"
"github.com/gorilla/websocket"
"net/http"
)
// Connect to a Beam server over a Websocket connection as a client
func NewSender(wsConn *websocket.Conn) (beam.Sender, error) {
session, err := http2.NewStreamSession(ws.NewConnection(wsConn))
if err != nil {
return nil, err
}
return session, nil
}
// Upgrade an HTTP connection to a Beam over HTTP2 over
// Websockets connection.
type Upgrader struct {
Upgrader websocket.Upgrader
}
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*http2.Server, error) {
wsConn, err := u.Upgrader.Upgrade(w, r, responseHeader)
if err != nil {
return nil, err
}
netConn := ws.NewConnection(wsConn)
server, err := http2.NewServer(netConn)
if err != nil {
netConn.Close()
return nil, err
}
return server, nil
}
// Returns true if a handshake error occured in websockets, which means
// a response has already been written to the stream.
func IsHandshakeError(err error) bool {
_, ok := err.(websocket.HandshakeError)
return ok
}
type BeamFunc func(beam.Receiver)
// Handler function for serving Beam over HTTP. Will invoke f and
// then close the server's Beam endpoint after f returns.
func Serve(u *Upgrader, f BeamFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
u.Upgrader.Error(w, r, http.StatusMethodNotAllowed, errors.New("Method not allowed"))
return
}
server, err := u.Upgrade(w, r, nil)
if err != nil {
if !IsHandshakeError(err) {
u.Upgrader.Error(w, r, http.StatusInternalServerError, errors.New("Unable to open an HTTP2 connection over Websockets"))
}
return
}
defer server.Close()
f(server)
}
}

View File

@ -1,73 +0,0 @@
package ws
import (
"github.com/docker/libswarm/beam"
"github.com/gorilla/websocket"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func TestServe(t *testing.T) {
gotAck := make(chan bool)
u := &Upgrader{}
server := httptest.NewServer(Serve(u, func(r beam.Receiver) {
msg, msgErr := r.Receive(beam.Ret)
if msgErr != nil {
t.Fatalf("Error receiving message: %s", msgErr)
}
if msg.Att == nil {
t.Fatalf("Error message missing attachment")
}
if msg.Verb != beam.Attach {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach)
}
receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack})
if sendErr != nil {
t.Fatalf("Error sending return message: %s", sendErr)
}
_, ackErr := receiver.Receive(0)
if ackErr == nil {
t.Fatalf("No error receiving from message with no return pipe")
}
if ackErr != io.EOF {
t.Fatalf("Unexpected error receiving from message: %s", ackErr)
}
<-gotAck
}))
wsConn, _, err := websocket.DefaultDialer.Dial(strings.Replace(server.URL, "http://", "ws://", 1), http.Header{"Origin": {server.URL}})
if err != nil {
t.Fatal(err)
}
sender, senderErr := NewSender(wsConn)
if senderErr != nil {
t.Fatalf("Error creating sender: %s", senderErr)
}
receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe})
if sendErr != nil {
t.Fatalf("Error sending message: %s", sendErr)
}
msg, receiveErr := receiver.Receive(beam.Ret)
if receiveErr != nil {
t.Fatalf("Error receiving message")
}
if msg.Verb != beam.Ack {
t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack)
}
gotAck <- true
shutdownErr := sender.Close()
if shutdownErr != nil && !strings.Contains(shutdownErr.Error(), "broken pipe") {
t.Fatalf("Error closing: %s", shutdownErr)
}
}