diff --git a/beam/AUTHORS b/beam/AUTHORS new file mode 100644 index 0000000000..db33365bcd --- /dev/null +++ b/beam/AUTHORS @@ -0,0 +1 @@ +Solomon Hykes diff --git a/beam/MAINTAINERS b/beam/MAINTAINERS new file mode 100644 index 0000000000..aee10c8421 --- /dev/null +++ b/beam/MAINTAINERS @@ -0,0 +1 @@ +Solomon Hykes (@shykes) diff --git a/beam/README.md b/beam/README.md new file mode 100644 index 0000000000..1ba8e57532 --- /dev/null +++ b/beam/README.md @@ -0,0 +1,79 @@ +# Beam + +## A library to break down an application into loosely coupled micro-services + +Beam is a library to turn your application into a collection of loosely coupled micro-services. +It implements an ultra-lightweight hub for the different components of an application +to discover and consume each other, either in-memory or across the network. + +Beam can be embedded with very little overhead by using Go channels. It +also implements an efficient http2/tls transport which can be used to +securely expose and consume any micro-service across a distributed system. + +Because remote Beam sessions are regular HTTP2 over TLS sessions, they can +be used in combination with any standard proxy or authentication +middleware. This means Beam, when configured propely, can be safely exposed +on the public Internet. It can also be embedded in an existing rest API +using an http1 and websocket fallback. + +## How is it different from RPC or REST? + +Modern micro-services are not a great fit for classical RPC or REST +protocols because they often rely heavily on events, bi-directional +communication, stream multiplexing, and some form of data synchronization. +Sometimes these services have a component which requires raw socket access, +either for performance (file transfer, event firehose, database access) or +simply because they have their own protocol (dns, smtp, sql, ssh, +zeromq, etc). These components typically need a separate set of tools +because they are outside the scope of the REST and RPC tools. If there is +also a websocket or ServerEvents transport, those require yet another layer +of tools. + +Instead of a clunky patchwork of tools, Beam implements in a single +minimalistic library all the primitives needed by modern micro-services: + +* Request/response with arbitrary structured data + +* Asynchronous events flowing in real-time in both directions + +* Requests and responses can flow in any direction, and can be arbitrarily +nested, for example to implement a self-registering worker model + +* Any request or response can include any number of streams, multiplexed in +both directions on the same session. + +* Any message serialization format can be plugged in: json, msgpack, xml, +protobuf. + +* As an optional convenience a minimalist key-value format is implemented. +It is designed to be extremely fast to serialize and parse, dead-simple to +implement, and suitable for both one-time data copy, file storage, and +real-time synchronization. + +* Raw file descriptors can be "attached" to any message, and passed under +the hood using the best method available to each transport. The Go channel +transport just passes os.File pointers around. The unix socket transport +uses fd passing which makes it suitable for high-performance IPC. The +tcp transport uses dedicated http2 streams. And as a bonus extension, a +built-in tcp gateway can be used to proxy raw network sockets without +extra overhead. That means Beam services can be used as smart gateways to a +sql database, ssh or file transfer service, with unified auth, discovery +and tooling and without performance penalty. + +## Design philosophy + +An explicit goal of Beam is simplicity of implementation and clarity of +spec. Porting it to any language should be as effortless as humanly +possible. + +## Creators + +**Solomon Hykes** + +- +- + +## Copyright and license + +Code and documentation copyright 2013-2014 Docker, inc. Code released under the Apache 2.0 license. +Docs released under Creative commons. diff --git a/beam/beam.go b/beam/beam.go new file mode 100644 index 0000000000..7fb13119cb --- /dev/null +++ b/beam/beam.go @@ -0,0 +1,39 @@ +package beam + +import ( + "errors" + "os" +) + +type Sender interface { + Send(msg *Message, mode int) (Receiver, Sender, error) + Close() error +} + +type Receiver interface { + Receive(mode int) (*Message, Receiver, Sender, error) +} + +type Message struct { + Name string + Args []string + Att *os.File +} + +const ( + R = 1 << (32 - 1 - iota) + W +) + +type ReceiverFrom interface { + ReceiveFrom(Receiver) (int, error) +} + +type SenderTo interface { + SendTo(Sender) (int, error) +} + +var ( + ErrIncompatibleSender = errors.New("incompatible sender") + ErrIncompatibleReceiver = errors.New("incompatible receiver") +) diff --git a/beam/beam_test.go b/beam/beam_test.go new file mode 100644 index 0000000000..ee0096b189 --- /dev/null +++ b/beam/beam_test.go @@ -0,0 +1,17 @@ +package beam + +import ( + "testing" +) + +func TestModes(t *testing.T) { + if R == W { + t.Fatalf("0") + } + if R == 0 { + t.Fatalf("0") + } + if W == 0 { + t.Fatalf("0") + } +} diff --git a/beam/data/data.go b/beam/data/data.go new file mode 100644 index 0000000000..5ece063f51 --- /dev/null +++ b/beam/data/data.go @@ -0,0 +1,119 @@ +package data + +import ( + "fmt" + "strconv" + "strings" +) + +func Encode(obj map[string][]string) string { + var msg string + msg += encodeHeader(0) + for k, values := range obj { + msg += encodeNamedList(k, values) + } + return msg +} + +func encodeHeader(msgtype int) string { + return fmt.Sprintf("%03.3d;", msgtype) +} + +func encodeString(s string) string { + return fmt.Sprintf("%d:%s,", len(s), s) +} + +var EncodeString = encodeString +var DecodeString = decodeString + +var EncodeList = encodeList + +func encodeList(l []string) string { + values := make([]string, 0, len(l)) + for _, s := range l { + values = append(values, encodeString(s)) + } + return encodeString(strings.Join(values, "")) +} + +func encodeNamedList(name string, l []string) string { + return encodeString(name) + encodeList(l) +} + +func Decode(msg string) (map[string][]string, error) { + msgtype, skip, err := decodeHeader(msg) + if err != nil { + return nil, err + } + if msgtype != 0 { + // FIXME: use special error type so the caller can easily ignore + return nil, fmt.Errorf("unknown message type: %d", msgtype) + } + msg = msg[skip:] + obj := make(map[string][]string) + for len(msg) > 0 { + k, skip, err := decodeString(msg) + if err != nil { + return nil, err + } + msg = msg[skip:] + values, skip, err := decodeList(msg) + if err != nil { + return nil, err + } + msg = msg[skip:] + obj[k] = values + } + return obj, nil +} + +var DecodeList = decodeList + +func decodeList(msg string) ([]string, int, error) { + blob, skip, err := decodeString(msg) + if err != nil { + return nil, 0, err + } + var l []string + for len(blob) > 0 { + v, skipv, err := decodeString(blob) + if err != nil { + return nil, 0, err + } + l = append(l, v) + blob = blob[skipv:] + } + return l, skip, nil +} + +func decodeString(msg string) (string, int, error) { + parts := strings.SplitN(msg, ":", 2) + if len(parts) != 2 { + return "", 0, fmt.Errorf("invalid format: no column") + } + var length int + if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil { + return "", 0, err + } else { + length = int(l) + } + if len(parts[1]) < length+1 { + return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1) + } + payload := parts[1][:length+1] + if payload[length] != ',' { + return "", 0, fmt.Errorf("message is not comma-terminated") + } + return payload[:length], len(parts[0]) + 1 + length + 1, nil +} + +func decodeHeader(msg string) (int, int, error) { + if len(msg) < 4 { + return 0, 0, fmt.Errorf("message too small") + } + msgtype, err := strconv.ParseInt(msg[:3], 10, 32) + if err != nil { + return 0, 0, err + } + return int(msgtype), 4, nil +} diff --git a/beam/data/data_test.go b/beam/data/data_test.go new file mode 100644 index 0000000000..9059922b3b --- /dev/null +++ b/beam/data/data_test.go @@ -0,0 +1,129 @@ +package data + +import ( + "strings" + "testing" +) + +func TestEncodeHelloWorld(t *testing.T) { + input := "hello world!" + output := encodeString(input) + expectedOutput := "12:hello world!," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyString(t *testing.T) { + input := "" + output := encodeString(input) + expectedOutput := "0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyList(t *testing.T) { + input := []string{} + output := encodeList(input) + expectedOutput := "0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyMap(t *testing.T) { + input := make(map[string][]string) + output := Encode(input) + expectedOutput := "000;" + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncode1Key1Value(t *testing.T) { + input := make(map[string][]string) + input["hello"] = []string{"world"} + output := Encode(input) + expectedOutput := "000;5:hello,8:5:world,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncode1Key2Value(t *testing.T) { + input := make(map[string][]string) + input["hello"] = []string{"beautiful", "world"} + output := Encode(input) + expectedOutput := "000;5:hello,20:9:beautiful,5:world,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyValue(t *testing.T) { + input := make(map[string][]string) + input["foo"] = []string{} + output := Encode(input) + expectedOutput := "000;3:foo,0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeBinaryKey(t *testing.T) { + input := make(map[string][]string) + input["foo\x00bar\x7f"] = []string{} + output := Encode(input) + expectedOutput := "000;8:foo\x00bar\x7f,0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeBinaryValue(t *testing.T) { + input := make(map[string][]string) + input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"} + output := Encode(input) + expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestDecodeString(t *testing.T) { + validEncodedStrings := []struct { + input string + output string + skip int + }{ + {"3:foo,", "foo", 6}, + {"5:hello,", "hello", 8}, + {"5:hello,5:world,", "hello", 8}, + } + for _, sample := range validEncodedStrings { + output, skip, err := decodeString(sample.input) + if err != nil { + t.Fatalf("error decoding '%v': %v", sample.input, err) + } + if skip != sample.skip { + t.Fatalf("invalid skip: %v!=%v", skip, sample.skip) + } + if output != sample.output { + t.Fatalf("invalid output: %v!=%v", output, sample.output) + } + } +} + +func TestDecode1Key1Value(t *testing.T) { + input := "000;3:foo,6:3:bar,," + output, err := Decode(input) + if err != nil { + t.Fatal(err) + } + if v, exists := output["foo"]; !exists { + t.Fatalf("wrong output: %v\n", output) + } else if len(v) != 1 || strings.Join(v, "") != "bar" { + t.Fatalf("wrong output: %v\n", output) + } +} diff --git a/beam/data/message.go b/beam/data/message.go new file mode 100644 index 0000000000..193fb7b241 --- /dev/null +++ b/beam/data/message.go @@ -0,0 +1,93 @@ +package data + +import ( + "fmt" + "strings" +) + +type Message string + +func Empty() Message { + return Message(Encode(nil)) +} + +func Parse(args []string) Message { + data := make(map[string][]string) + for _, word := range args { + if strings.Contains(word, "=") { + kv := strings.SplitN(word, "=", 2) + key := kv[0] + var val string + if len(kv) == 2 { + val = kv[1] + } + data[key] = []string{val} + } + } + return Message(Encode(data)) +} + +func (m Message) Add(k, v string) Message { + data, err := Decode(string(m)) + if err != nil { + return m + } + if values, exists := data[k]; exists { + data[k] = append(values, v) + } else { + data[k] = []string{v} + } + return Message(Encode(data)) +} + +func (m Message) Set(k string, v ...string) Message { + data, err := Decode(string(m)) + if err != nil { + panic(err) + return m + } + data[k] = v + return Message(Encode(data)) +} + +func (m Message) Del(k string) Message { + data, err := Decode(string(m)) + if err != nil { + panic(err) + return m + } + delete(data, k) + return Message(Encode(data)) +} + +func (m Message) Get(k string) []string { + data, err := Decode(string(m)) + if err != nil { + return nil + } + v, exists := data[k] + if !exists { + return nil + } + return v +} + +func (m Message) Pretty() string { + data, err := Decode(string(m)) + if err != nil { + return "" + } + entries := make([]string, 0, len(data)) + for k, values := range data { + entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ","))) + } + return strings.Join(entries, " ") +} + +func (m Message) String() string { + return string(m) +} + +func (m Message) Bytes() []byte { + return []byte(m) +} diff --git a/beam/data/message_test.go b/beam/data/message_test.go new file mode 100644 index 0000000000..7685769069 --- /dev/null +++ b/beam/data/message_test.go @@ -0,0 +1,53 @@ +package data + +import ( + "testing" +) + +func TestEmptyMessage(t *testing.T) { + m := Empty() + if m.String() != Encode(nil) { + t.Fatalf("%v != %v", m.String(), Encode(nil)) + } +} + +func TestSetMessage(t *testing.T) { + m := Empty().Set("foo", "bar") + output := m.String() + expectedOutput := "000;3:foo,6:3:bar,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } + decodedOutput, err := Decode(output) + if err != nil { + t.Fatal(err) + } + if len(decodedOutput) != 1 { + t.Fatalf("wrong output data: %#v\n", decodedOutput) + } +} + +func TestSetMessageTwice(t *testing.T) { + m := Empty().Set("foo", "bar").Set("ga", "bu") + output := m.String() + expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } + decodedOutput, err := Decode(output) + if err != nil { + t.Fatal(err) + } + if len(decodedOutput) != 2 { + t.Fatalf("wrong output data: %#v\n", decodedOutput) + } +} + +func TestSetDelMessage(t *testing.T) { + m := Empty().Set("foo", "bar").Del("foo") + output := m.String() + expectedOutput := Encode(nil) + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} diff --git a/beam/data/netstring.txt b/beam/data/netstring.txt new file mode 100644 index 0000000000..17560929b6 --- /dev/null +++ b/beam/data/netstring.txt @@ -0,0 +1,92 @@ +## +## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt +## + +Netstrings +D. J. Bernstein, djb@pobox.com +19970201 + + +1. Introduction + + A netstring is a self-delimiting encoding of a string. Netstrings are + very easy to generate and to parse. Any string may be encoded as a + netstring; there are no restrictions on length or on allowed bytes. + Another virtue of a netstring is that it declares the string size up + front. Thus an application can check in advance whether it has enough + space to store the entire string. + + Netstrings may be used as a basic building block for reliable network + protocols. Most high-level protocols, in effect, transmit a sequence + of strings; those strings may be encoded as netstrings and then + concatenated into a sequence of characters, which in turn may be + transmitted over a reliable stream protocol such as TCP. + + Note that netstrings can be used recursively. The result of encoding + a sequence of strings is a single string. A series of those encoded + strings may in turn be encoded into a single string. And so on. + + In this document, a string of 8-bit bytes may be written in two + different forms: as a series of hexadecimal numbers between angle + brackets, or as a sequence of ASCII characters between double quotes. + For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of + length 12; it is the same as the string "hello world!". + + Although this document restricts attention to strings of 8-bit bytes, + netstrings could be used with any 6-bit-or-larger character set. + + +2. Definition + + Any string of 8-bit bytes may be encoded as [len]":"[string]",". + Here [string] is the string and [len] is a nonempty sequence of ASCII + digits giving the length of [string] in decimal. The ASCII digits are + <30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros + at the front of [len] are prohibited: [len] begins with <30> exactly + when [string] is empty. + + For example, the string "hello world!" is encoded as <31 32 3a 68 + 65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The + empty string is encoded as "0:,". + + [len]":"[string]"," is called a netstring. [string] is called the + interpretation of the netstring. + + +3. Sample code + + The following C code starts with a buffer buf of length len and + prints it as a netstring. + + if (printf("%lu:",len) < 0) barf(); + if (fwrite(buf,1,len,stdout) < len) barf(); + if (putchar(',') < 0) barf(); + + The following C code reads a netstring and decodes it into a + dynamically allocated buffer buf of length len. + + if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */ + if (getchar() != ':') barf(); + buf = malloc(len + 1); /* malloc(0) is not portable */ + if (!buf) barf(); + if (fread(buf,1,len,stdin) < len) barf(); + if (getchar() != ',') barf(); + + Both of these code fragments assume that the local character set is + ASCII, and that the relevant stdio streams are in binary mode. + + +4. Security considerations + + The famous Finger security hole may be blamed on Finger's use of the + CRLF encoding. In that encoding, each string is simply terminated by + CRLF. This encoding has several problems. Most importantly, it does + not declare the string size in advance. This means that a correct + CRLF parser must be prepared to ask for more and more memory as it is + reading the string. In the case of Finger, a lazy implementor found + this to be too much trouble; instead he simply declared a fixed-size + buffer and used C's gets() function. The rest is history. + + In contrast, as the above sample code shows, it is very easy to + handle netstrings without risking buffer overflow. Thus widespread + use of netstrings may improve network security. diff --git a/beam/examples/beamsh/beamsh b/beam/examples/beamsh/beamsh new file mode 100755 index 0000000000..9bfe78ef4a Binary files /dev/null and b/beam/examples/beamsh/beamsh differ diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go new file mode 100644 index 0000000000..808f038c68 --- /dev/null +++ b/beam/examples/beamsh/beamsh.go @@ -0,0 +1,542 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "github.com/dotcloud/docker/pkg/dockerscript" + "github.com/dotcloud/docker/pkg/term" + "io" + "net" + "net/url" + "os" + "path" + "strings" + "sync" +) + +var rootPlugins = []string{ + "stdio", +} + +var ( + flX bool + flPing bool + introspect beam.ReceiveSender = beam.Devnull() +) + +func main() { + fd3 := os.NewFile(3, "beam-introspect") + if introsp, err := beam.FileConn(fd3); err == nil { + introspect = introsp + Logf("introspection enabled\n") + } else { + Logf("introspection disabled\n") + } + fd3.Close() + flag.BoolVar(&flX, "x", false, "print commands as they are being executed") + flag.Parse() + if flag.NArg() == 0 { + if term.IsTerminal(0) { + // No arguments, stdin is terminal --> interactive mode + input := bufio.NewScanner(os.Stdin) + for { + fmt.Printf("[%d] beamsh> ", os.Getpid()) + if !input.Scan() { + break + } + line := input.Text() + if len(line) != 0 { + cmd, err := dockerscript.Parse(strings.NewReader(line)) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + continue + } + if err := executeRootScript(cmd); err != nil { + Fatal(err) + } + } + if err := input.Err(); err == io.EOF { + break + } else if err != nil { + Fatal(err) + } + } + } else { + // No arguments, stdin not terminal --> batch mode + script, err := dockerscript.Parse(os.Stdin) + if err != nil { + Fatal("parse error: %v\n", err) + } + if err := executeRootScript(script); err != nil { + Fatal(err) + } + } + } else { + // 1+ arguments: parse them as script files + for _, scriptpath := range flag.Args() { + f, err := os.Open(scriptpath) + if err != nil { + Fatal(err) + } + script, err := dockerscript.Parse(f) + if err != nil { + Fatal("parse error: %v\n", err) + } + if err := executeRootScript(script); err != nil { + Fatal(err) + } + } + } +} + +func executeRootScript(script []*dockerscript.Command) error { + if len(rootPlugins) > 0 { + // If there are root plugins, wrap the script inside them + var ( + rootCmd *dockerscript.Command + lastCmd *dockerscript.Command + ) + for _, plugin := range rootPlugins { + pluginCmd := &dockerscript.Command{ + Args: []string{plugin}, + } + if rootCmd == nil { + rootCmd = pluginCmd + } else { + lastCmd.Children = []*dockerscript.Command{pluginCmd} + } + lastCmd = pluginCmd + } + lastCmd.Children = script + script = []*dockerscript.Command{rootCmd} + } + handlers, err := Handlers(introspect) + if err != nil { + return err + } + defer handlers.Close() + var tasks sync.WaitGroup + defer func() { + Debugf("Waiting for introspection...\n") + tasks.Wait() + Debugf("DONE Waiting for introspection\n") + }() + if introspect != nil { + tasks.Add(1) + go func() { + Debugf("starting introspection\n") + defer Debugf("done with introspection\n") + defer tasks.Done() + introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil) + Debugf("XXX starting reading introspection messages\n") + r := beam.NewRouter(handlers) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a)) + return handlers.Send(p, a) + }) + n, err := beam.Copy(r, introspect) + Debugf("XXX done reading %d introspection messages: %v\n", n, err) + }() + } + if err := executeScript(handlers, script); err != nil { + return err + } + return nil +} + +func executeScript(out beam.Sender, script []*dockerscript.Command) error { + Debugf("executeScript(%s)\n", scriptString(script)) + defer Debugf("executeScript(%s) DONE\n", scriptString(script)) + var background sync.WaitGroup + defer background.Wait() + for _, cmd := range script { + if cmd.Background { + background.Add(1) + go func(out beam.Sender, cmd *dockerscript.Command) { + executeCommand(out, cmd) + background.Done() + }(out, cmd) + } else { + if err := executeCommand(out, cmd); err != nil { + return err + } + } + } + return nil +} + +// 1) Find a handler for the command (if no handler, fail) +// 2) Attach new in & out pair to the handler +// 3) [in the background] Copy handler output to our own output +// 4) [in the background] Run the handler +// 5) Recursively executeScript() all children commands and wait for them to complete +// 6) Wait for handler to return and (shortly afterwards) output copy to complete +// 7) Profit +func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { + if flX { + fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1)) + } + Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) + defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) + if len(cmd.Args) == 0 { + return fmt.Errorf("empty command") + } + Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) + job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes()) + if err != nil { + return fmt.Errorf("%v\n", err) + } + var tasks sync.WaitGroup + tasks.Add(1) + Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) + go func() { + if out != nil { + Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) + n, err := beam.Copy(out, job) + if err != nil { + Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err) + } + Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n) + } + tasks.Done() + }() + // depth-first execution of children commands + // executeScript() blocks until all commands are completed + Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + executeScript(job, cmd.Children) + Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + job.CloseWrite() + Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " ")) + tasks.Wait() + Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " ")) + return nil +} + +type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender) + +func Handlers(sink beam.Sender) (*beam.UnixConn, error) { + var tasks sync.WaitGroup + pub, priv, err := beam.USocketPair() + if err != nil { + return nil, err + } + go func() { + defer func() { + Debugf("[handlers] closewrite() on endpoint\n") + // FIXME: this is not yet necessary but will be once + // there is synchronization over standard beam messages + priv.CloseWrite() + Debugf("[handlers] done closewrite() on endpoint\n") + }() + r := beam.NewRouter(sink) + r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error { + conn, err := beam.FileConn(attachment) + if err != nil { + attachment.Close() + return err + } + // attachment.Close() + tasks.Add(1) + go func() { + defer tasks.Done() + defer func() { + Debugf("[handlers] '%s' closewrite\n", payload) + conn.CloseWrite() + Debugf("[handlers] '%s' done closewrite\n", payload) + }() + cmd := data.Message(payload).Get("cmd") + Debugf("[handlers] received %s\n", strings.Join(cmd, " ")) + if len(cmd) == 0 { + return + } + handler := GetHandler(cmd[0]) + if handler == nil { + return + } + stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes()) + if err != nil { + return + } + defer stdout.Close() + stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes()) + if err != nil { + return + } + defer stderr.Close() + Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) + handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn)) + Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) + }() + return nil + }) + beam.Copy(r, priv) + Debugf("[handlers] waiting for all tasks\n") + tasks.Wait() + Debugf("[handlers] all tasks returned\n") + }() + return pub, nil +} + +func GetHandler(name string) Handler { + if name == "logger" { + return CmdLogger + } else if name == "render" { + return CmdRender + } else if name == "devnull" { + return CmdDevnull + } else if name == "prompt" { + return CmdPrompt + } else if name == "stdio" { + return CmdStdio + } else if name == "echo" { + return CmdEcho + } else if name == "pass" { + return CmdPass + } else if name == "in" { + return CmdIn + } else if name == "exec" { + return CmdExec + } else if name == "trace" { + return CmdTrace + } else if name == "emit" { + return CmdEmit + } else if name == "print" { + return CmdPrint + } else if name == "multiprint" { + return CmdMultiprint + } else if name == "listen" { + return CmdListen + } else if name == "beamsend" { + return CmdBeamsend + } else if name == "beamreceive" { + return CmdBeamreceive + } else if name == "connect" { + return CmdConnect + } else if name == "openfile" { + return CmdOpenfile + } else if name == "spawn" { + return CmdSpawn + } else if name == "chdir" { + return CmdChdir + } + return nil +} + +// VARIOUS HELPER FUNCTIONS: + +func connToFile(conn net.Conn) (f *os.File, err error) { + if connWithFile, ok := conn.(interface { + File() (*os.File, error) + }); !ok { + return nil, fmt.Errorf("no file descriptor available") + } else { + f, err = connWithFile.File() + if err != nil { + return nil, err + } + } + return f, err +} + +type Msg struct { + payload []byte + attachment *os.File +} + +func Logf(msg string, args ...interface{}) (int, error) { + if len(msg) == 0 || msg[len(msg)-1] != '\n' { + msg = msg + "\n" + } + msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg) + return fmt.Printf(msg, args...) +} + +func Debugf(msg string, args ...interface{}) { + if os.Getenv("BEAMDEBUG") != "" { + Logf(msg, args...) + } +} + +func Fatalf(msg string, args ...interface{}) { + Logf(msg, args...) + os.Exit(1) +} + +func Fatal(args ...interface{}) { + Fatalf("%v", args[0]) +} + +func scriptString(script []*dockerscript.Command) string { + lines := make([]string, 0, len(script)) + for _, cmd := range script { + line := strings.Join(cmd.Args, " ") + if len(cmd.Children) > 0 { + line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) + } else { + line += " {}" + } + lines = append(lines, line) + } + return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) +} + +func dialer(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + return + } + connections <- conn + } + }() + return connections, nil +} + +func listener(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := l.Accept() + if err != nil { + return + } + Logf("new connection\n") + connections <- conn + } + }() + return connections, nil +} + +func SendToConn(connections chan net.Conn, src beam.Receiver) error { + var tasks sync.WaitGroup + defer tasks.Wait() + for { + payload, attachment, err := src.Receive() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + conn, ok := <-connections + if !ok { + break + } + Logf("Sending %s\n", msgDesc(payload, attachment)) + tasks.Add(1) + go func(payload []byte, attachment *os.File, conn net.Conn) { + defer tasks.Done() + if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { + return + } + if attachment == nil { + conn.Close() + return + } + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying the connection to [%d]\n", attachment.Fd()) + io.Copy(attachment, conn) + attachment.Close() + Debugf("done copying the connection to [%d]\n", attachment.Fd()) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying [%d] to the connection\n", attachment.Fd()) + io.Copy(conn, attachment) + conn.Close() + Debugf("done copying [%d] to the connection\n", attachment.Fd()) + }(attachment, conn) + iotasks.Wait() + }(payload, attachment, conn) + } + return nil +} + +func msgDesc(payload []byte, attachment *os.File) string { + return beam.MsgDesc(payload, attachment) +} + +func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error { + for conn := range connections { + err := func() error { + Logf("parsing message from network...\n") + defer Logf("done parsing message from network\n") + buf := make([]byte, 4098) + n, err := conn.Read(buf) + if n == 0 { + conn.Close() + if err == io.EOF { + return nil + } else { + return err + } + } + Logf("decoding message from '%s'\n", buf[:n]) + header, skip, err := data.DecodeString(string(buf[:n])) + if err != nil { + conn.Close() + return err + } + pub, priv, err := beam.SocketPair() + if err != nil { + return err + } + Logf("decoded message: %s\n", data.Message(header).Pretty()) + go func(skipped []byte, conn net.Conn, f *os.File) { + // this closes both conn and f + if len(skipped) > 0 { + if _, err := f.Write(skipped); err != nil { + Logf("ERROR: %v\n", err) + f.Close() + conn.Close() + return + } + } + bicopy(conn, f) + }(buf[skip:n], conn, pub) + if err := dst.Send([]byte(header), priv); err != nil { + return err + } + return nil + }() + if err != nil { + Logf("Error reading from connection: %v\n", err) + } + } + return nil +} + +func bicopy(a, b io.ReadWriteCloser) { + var iotasks sync.WaitGroup + oneCopy := func(dst io.WriteCloser, src io.Reader) { + defer iotasks.Done() + io.Copy(dst, src) + dst.Close() + } + iotasks.Add(2) + go oneCopy(a, b) + go oneCopy(b, a) + iotasks.Wait() +} diff --git a/beam/examples/beamsh/builtins.go b/beam/examples/beamsh/builtins.go new file mode 100644 index 0000000000..3242237cc1 --- /dev/null +++ b/beam/examples/beamsh/builtins.go @@ -0,0 +1,441 @@ +package main + +import ( + "bufio" + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "github.com/dotcloud/docker/pkg/term" + "github.com/dotcloud/docker/utils" + "io" + "net" + "net/url" + "os" + "os/exec" + "path" + "strings" + "sync" + "text/template" +) + +func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if err := os.MkdirAll("logs", 0700); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + var tasks sync.WaitGroup + defer tasks.Wait() + var n int = 1 + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func(n int) { + defer tasks.Done() + defer attachment.Close() + var streamname string + if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { + streamname = "stdout" + } else { + streamname = cmd[1] + } + if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { + streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) + } + logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + defer logfile.Close() + io.Copy(logfile, attachment) + logfile.Sync() + }(n) + n++ + return nil + }).Tee(out) + if _, err := beam.Copy(r, in); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + txt := args[1] + if !strings.HasSuffix(txt, "\n") { + txt += "\n" + } + t := template.Must(template.New("render").Parse(txt)) + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + msg, err := data.Decode(string(payload)) + if err != nil { + fmt.Fprintf(stderr, "decode error: %v\n") + } + if err := t.Execute(stdout, msg); err != nil { + fmt.Fprintf(stderr, "rendering error: %v\n", err) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + if err := out.Send(payload, attachment); err != nil { + return + } + } +} + +func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + _, attachment, err := in.Receive() + if err != nil { + return + } + if attachment != nil { + attachment.Close() + } + } +} + +func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) + return + } + if !term.IsTerminal(0) { + fmt.Fprintf(stderr, "can't prompt: no tty available...\n") + return + } + fmt.Printf("%s: ", strings.Join(args[1:], " ")) + oldState, _ := term.SaveState(0) + term.DisableEcho(0, oldState) + line, _, err := bufio.NewReader(os.Stdin).ReadLine() + if err != nil { + fmt.Fprintln(stderr, err.Error()) + return + } + val := string(line) + fmt.Printf("\n") + term.RestoreTerminal(0, oldState) + out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) +} + +func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + defer tasks.Wait() + + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer attachment.Close() + io.Copy(os.Stdout, attachment) + attachment.Close() + }() + return nil + }).Tee(out) + + if _, err := beam.Copy(r, in); err != nil { + Fatal(err) + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + fmt.Fprintln(stdout, strings.Join(args[1:], " ")) +} + +func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + if err := out.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return + } + } +} + +func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + c := exec.Command(utils.SelfPath()) + r, w, err := os.Pipe() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + c.Stdin = r + c.Stdout = stdout + c.Stderr = stderr + go func() { + fmt.Fprintf(w, strings.Join(args[1:], " ")) + w.Sync() + w.Close() + }() + if err := c.Run(); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + os.Chdir(args[1]) + GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) +} + +func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + cmd := exec.Command(args[1], args[2:]...) + cmd.Stdout = stdout + cmd.Stderr = stderr + //cmd.Stdin = os.Stdin + local, remote, err := beam.SocketPair() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + child, err := beam.FileConn(local) + if err != nil { + local.Close() + remote.Close() + fmt.Fprintf(stderr, "%v\n", err) + return + } + local.Close() + cmd.ExtraFiles = append(cmd.ExtraFiles, remote) + + var tasks sync.WaitGroup + tasks.Add(1) + go func() { + defer Debugf("done copying to child\n") + defer tasks.Done() + defer child.CloseWrite() + beam.Copy(child, in) + }() + + tasks.Add(1) + go func() { + defer Debugf("done copying from child %d\n") + defer tasks.Done() + r := beam.NewRouter(out) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a) + }) + beam.Copy(r, child) + }() + execErr := cmd.Run() + // We can close both ends of the socket without worrying about data stuck in the buffer, + // because unix socket writes are fully synchronous. + child.Close() + tasks.Wait() + var status string + if execErr != nil { + status = execErr.Error() + } else { + status = "ok" + } + out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) +} + +func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + r := beam.NewRouter(out) + r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error { + var sfd string = "nil" + if attachment != nil { + sfd = fmt.Sprintf("%d", attachment.Fd()) + } + fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd) + out.Send(payload, attachment) + return nil + }) + beam.Copy(r, in) +} + +func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + out.Send(data.Parse(args[1:]).Bytes(), nil) +} + +func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, a, err := in.Receive() + if err != nil { + return + } + // Skip commands + if a != nil && data.Message(payload).Get("cmd") == nil { + dup, err := beam.SendRPipe(out, payload) + if err != nil { + a.Close() + return + } + io.Copy(io.MultiWriter(os.Stdout, dup), a) + dup.Close() + } else { + if err := out.Send(payload, a); err != nil { + return + } + } + } +} + +func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + defer tasks.Wait() + r := beam.NewRouter(out) + multiprint := func(p []byte, a *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer a.Close() + msg := data.Message(string(p)) + input := bufio.NewScanner(a) + for input.Scan() { + fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) + } + }() + return nil + } + r.NewRoute().KeyIncludes("type", "job").Passthrough(out) + r.NewRoute().HasAttachment().Handler(multiprint).Tee(out) + beam.Copy(r, in) +} + +func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + for { + conn, err := l.Accept() + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + f, err := connToFile(conn) + if err != nil { + conn.Close() + continue + } + out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) + } +} + +func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = dialer + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + SendToConn(connections, in) +} + +func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = listener + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + ReceiveFromConn(connections, out) +} + +func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + var tasks sync.WaitGroup + for { + _, attachment, err := in.Receive() + if err != nil { + break + } + if attachment == nil { + continue + } + Logf("connecting to %s/%s\n", u.Scheme, u.Host) + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil) + return + } + out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) + tasks.Add(1) + go func(attachment *os.File, conn net.Conn) { + defer tasks.Done() + // even when successful, conn.File() returns a duplicate, + // so we must close the original + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(attachment, conn) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(conn, attachment) + }(attachment, conn) + iotasks.Wait() + conn.Close() + attachment.Close() + }(attachment, conn) + } + tasks.Wait() +} + +func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for _, name := range args { + f, err := os.Open(name) + if err != nil { + continue + } + if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { + f.Close() + } + } +} + +func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + os.Chdir(args[1]) +} diff --git a/beam/examples/beamsh/scripts/bug0.ds b/beam/examples/beamsh/scripts/bug0.ds new file mode 100755 index 0000000000..89b75230be --- /dev/null +++ b/beam/examples/beamsh/scripts/bug0.ds @@ -0,0 +1,3 @@ +#!/usr/bin/env beamsh + +exec ls -l diff --git a/beam/examples/beamsh/scripts/bug1.ds b/beam/examples/beamsh/scripts/bug1.ds new file mode 100755 index 0000000000..2d8a9e2ed9 --- /dev/null +++ b/beam/examples/beamsh/scripts/bug1.ds @@ -0,0 +1,5 @@ +#!/usr/bin/env beamsh + +trace { + exec ls -l +} diff --git a/beam/examples/beamsh/scripts/bug2.ds b/beam/examples/beamsh/scripts/bug2.ds new file mode 100755 index 0000000000..08f0431f68 --- /dev/null +++ b/beam/examples/beamsh/scripts/bug2.ds @@ -0,0 +1,7 @@ +#!/usr/bin/env beamsh + +trace { + stdio { + exec ls -l + } +} diff --git a/beam/examples/beamsh/scripts/bug3.ds b/beam/examples/beamsh/scripts/bug3.ds new file mode 100755 index 0000000000..7bb8694d49 --- /dev/null +++ b/beam/examples/beamsh/scripts/bug3.ds @@ -0,0 +1,10 @@ +#!/usr/bin/env beamsh -x + +trace outer { + # stdio fails + stdio { + trace inner { + exec ls -l + } + } +} diff --git a/beam/examples/beamsh/scripts/bug4.ds b/beam/examples/beamsh/scripts/bug4.ds new file mode 100755 index 0000000000..b7beedbae2 --- /dev/null +++ b/beam/examples/beamsh/scripts/bug4.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +stdio { + trace { + stdio { + exec ls -l + } + } +} diff --git a/beam/examples/beamsh/scripts/bug5.ds b/beam/examples/beamsh/scripts/bug5.ds new file mode 100755 index 0000000000..9f9a85515d --- /dev/null +++ b/beam/examples/beamsh/scripts/bug5.ds @@ -0,0 +1,6 @@ +#!/usr/bin/env beamsh + +stdio { + # exec fails + exec ls -l +} diff --git a/beam/examples/beamsh/scripts/bug6.ds b/beam/examples/beamsh/scripts/bug6.ds new file mode 100755 index 0000000000..90281401cd --- /dev/null +++ b/beam/examples/beamsh/scripts/bug6.ds @@ -0,0 +1,7 @@ +#!/usr/bin/env beamsh + +stdio { + trace { + echo hello + } +} diff --git a/beam/examples/beamsh/scripts/bug7.ds b/beam/examples/beamsh/scripts/bug7.ds new file mode 100755 index 0000000000..b6e7bd9201 --- /dev/null +++ b/beam/examples/beamsh/scripts/bug7.ds @@ -0,0 +1,6 @@ +#!/usr/bin/env beamsh + +stdio { + # exec fails + echo hello world +} diff --git a/beam/examples/beamsh/scripts/demo1.ds b/beam/examples/beamsh/scripts/demo1.ds new file mode 100755 index 0000000000..20a3359f3a --- /dev/null +++ b/beam/examples/beamsh/scripts/demo1.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +devnull { + multiprint { + exec tail -f /var/log/system.log & + exec ls -l + exec ls ksdhfkjdshf jksdfhkjsdhf + } +} diff --git a/beam/examples/beamsh/scripts/helloworld.ds b/beam/examples/beamsh/scripts/helloworld.ds new file mode 100755 index 0000000000..32e59b062e --- /dev/null +++ b/beam/examples/beamsh/scripts/helloworld.ds @@ -0,0 +1,8 @@ +#!/usr/bin/env beamsh + +print { + trace { + emit msg=hello + emit msg=world + } +} diff --git a/beam/examples/beamsh/scripts/logdemo.ds b/beam/examples/beamsh/scripts/logdemo.ds new file mode 100755 index 0000000000..8b729a966f --- /dev/null +++ b/beam/examples/beamsh/scripts/logdemo.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +trace { + log { + exec ls -l + exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf + echo hello world + } +} diff --git a/beam/examples/beamsh/scripts/miniserver.ds b/beam/examples/beamsh/scripts/miniserver.ds new file mode 100755 index 0000000000..9707477ee0 --- /dev/null +++ b/beam/examples/beamsh/scripts/miniserver.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +multiprint { + trace { + listen tcp://localhost:7676 & + listen tcp://localhost:8787 & + } +} + diff --git a/beam/http2/README.md b/beam/http2/README.md new file mode 100644 index 0000000000..d9af9bc043 --- /dev/null +++ b/beam/http2/README.md @@ -0,0 +1,6 @@ +This package defines a remote transport for Beam services using http2/spdy and tls. + +Pointers: + + * Low-level protocol framer: http://code.google.com/p/go.net/spdy + * (incomplete) high-level server implementation: https://github.com/shykes/spdy-go diff --git a/beam/inmem/inmem.go b/beam/inmem/inmem.go new file mode 100644 index 0000000000..0c7a7e90fe --- /dev/null +++ b/beam/inmem/inmem.go @@ -0,0 +1,264 @@ +package inmem + +import ( + "github.com/docker/beam" + "io" + "sync" +) + +func Pipe() (*PipeReceiver, *PipeSender) { + p := new(pipe) + p.rwait.L = &p.l + p.wwait.L = &p.l + r := &PipeReceiver{p} + w := &PipeSender{p} + return r, w +} + +type pipe struct { + ch chan *pipeMessage + rwait sync.Cond + wwait sync.Cond + l sync.Mutex + rl sync.Mutex + wl sync.Mutex + rerr error // if reader closed, error to give writes + werr error // if writer closed, error to give reads + pmsg *pipeMessage +} + +type pipeMessage struct { + msg *beam.Message + out *PipeSender + in *PipeReceiver +} + +func (p *pipe) psend(pmsg *pipeMessage) error { + var err error + // One writer at a time. + p.wl.Lock() + defer p.wl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + p.pmsg = pmsg + p.rwait.Signal() + for { + if p.pmsg == nil { + break + } + if p.rerr != nil { + err = p.rerr + break + } + if p.werr != nil { + err = io.ErrClosedPipe + } + p.wwait.Wait() + } + p.pmsg = nil // in case of rerr or werr + return err +} + +func (p *pipe) send(msg *beam.Message, mode int) (in *PipeReceiver, out *PipeSender, err error) { + // Prepare the message + pmsg := &pipeMessage{msg: msg} + if mode&beam.R != 0 { + in, pmsg.out = Pipe() + defer func() { + if err != nil { + in.Close() + in = nil + pmsg.out.Close() + } + }() + } + if mode&beam.W != 0 { + pmsg.in, out = Pipe() + defer func() { + if err != nil { + out.Close() + out = nil + pmsg.in.Close() + } + }() + } + err = p.psend(pmsg) + return +} + +func (p *pipe) preceive() (*pipeMessage, error) { + p.rl.Lock() + defer p.rl.Unlock() + + p.l.Lock() + defer p.l.Unlock() + for { + if p.rerr != nil { + return nil, io.ErrClosedPipe + } + if p.pmsg != nil { + break + } + if p.werr != nil { + return nil, p.werr + } + p.rwait.Wait() + } + pmsg := p.pmsg + p.pmsg = nil + p.wwait.Signal() + return pmsg, nil +} + +func (p *pipe) receive(mode int) (*beam.Message, *PipeReceiver, *PipeSender, error) { + pmsg, err := p.preceive() + if err != nil { + return nil, nil, nil, err + } + if pmsg.out != nil && mode&beam.W == 0 { + pmsg.out.Close() + pmsg.out = nil + } + if pmsg.in != nil && mode&beam.R == 0 { + pmsg.in.Close() + pmsg.in = nil + } + return pmsg.msg, pmsg.in, pmsg.out, nil +} + +func (p *pipe) rclose(err error) { + if err == nil { + err = io.ErrClosedPipe + } + p.l.Lock() + defer p.l.Unlock() + p.rerr = err + p.rwait.Signal() + p.wwait.Signal() +} + +func (p *pipe) wclose(err error) { + if err == nil { + err = io.EOF + } + p.l.Lock() + defer p.l.Unlock() + p.werr = err + p.rwait.Signal() + p.wwait.Signal() +} + +// PipeReceiver + +type PipeReceiver struct { + p *pipe +} + +func (r *PipeReceiver) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { + msg, pin, pout, err := r.p.receive(mode) + if err != nil { + return nil, nil, nil, err + } + var ( + // Always return NopReceiver/NopSender instead of nil values, + // because: + // - if they were requested in the mode, they can safely be used + // - if they were not requested, they can safely be ignored (ie no leak if they + // aren't closed) + in beam.Receiver = beam.NopReceiver{} + out beam.Sender = beam.NopSender{} + ) + if pin != nil { + in = pin + } + if pout != nil { + out = pout + } + return msg, in, out, err +} + +func (r *PipeReceiver) SendTo(dst beam.Sender) (int, error) { + var n int + // If the destination is a PipeSender, we can cheat + pdst, ok := dst.(*PipeSender) + if !ok { + return 0, beam.ErrIncompatibleSender + } + for { + pmsg, err := r.p.preceive() + if err == io.EOF { + break + } + if err != nil { + return n, err + } + if err := pdst.p.psend(pmsg); err != nil { + return n, err + } + } + n++ + return n, nil +} + +func (r *PipeReceiver) Close() error { + return r.CloseWithError(nil) +} + +func (r *PipeReceiver) CloseWithError(err error) error { + r.p.rclose(err) + return nil +} + +// PipeSender + +type PipeSender struct { + p *pipe +} + +func (w *PipeSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + pin, pout, err := w.p.send(msg, mode) + var ( + in beam.Receiver + out beam.Sender + ) + if pin != nil { + in = pin + } + if pout != nil { + out = pout + } + return in, out, err +} + +func (w *PipeSender) ReceiveFrom(src beam.Receiver) (int, error) { + var n int + // If the destination is a PipeReceiver, we can cheat + psrc, ok := src.(*PipeReceiver) + if !ok { + return 0, beam.ErrIncompatibleReceiver + } + for { + pmsg, err := psrc.p.preceive() + if err == io.EOF { + break + } + if err != nil { + return n, err + } + if err := w.p.psend(pmsg); err != nil { + return n, err + } + n++ + } + return n, nil +} + +func (w *PipeSender) Close() error { + return w.CloseWithError(nil) +} + +func (w *PipeSender) CloseWithError(err error) error { + w.p.wclose(err) + return nil +} diff --git a/beam/inmem/inmem_test.go b/beam/inmem/inmem_test.go new file mode 100644 index 0000000000..4703ddb053 --- /dev/null +++ b/beam/inmem/inmem_test.go @@ -0,0 +1,191 @@ +package inmem + +import ( + "fmt" + "github.com/docker/beam" + "github.com/dotcloud/docker/pkg/testutils" + "io/ioutil" + "os" + "testing" +) + +func TestReceiveW(t *testing.T) { + r, w := Pipe() + go func() { + w.Send(&beam.Message{Name: "hello"}, 0) + }() + _, _, ww, err := r.Receive(beam.W) + if err != nil { + t.Fatal(err) + } + if _, _, err := ww.Send(&beam.Message{Name: "this better not crash"}, 0); err != nil { + t.Fatal(err) + } +} + +func TestSimpleSend(t *testing.T) { + r, w := Pipe() + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + go func() { + msg, in, out, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "print" { + t.Fatalf("%#v", *msg) + } + if msg.Args[0] != "hello world" { + t.Fatalf("%#v", *msg) + } + assertMode(t, in, out, 0) + }() + in, out, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, 0) + }) +} + +// assertMode verifies that the values of r and w match +// mode. +// If mode has the R bit set, r must be non-nil. +// If mode has the W bit set, w must be non-nil. +// +// If any of these conditions are not met, t.Fatal is called and the active +// test fails. +func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { + // If mode has the R bit set, r must be non-nil + if mode&beam.R != 0 { + if r == nil { + t.Fatalf("should be non-nil: %#v", r) + } + // Otherwise it must be nil. + } + // If mode has the W bit set, w must be non-nil + if mode&beam.W != 0 { + if w == nil { + t.Fatalf("should be non-nil: %#v", w) + } + // Otherwise it must be nil. + } +} + +func TestSendReply(t *testing.T) { + r, w := Pipe() + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=R + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.R) + // Read for a reply + resp, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if resp.Args[0] != "this is the reply" { + t.Fatalf("%#v", resp) + } + }() + // Receive a message with mode=W + msg, in, out, err := r.Receive(beam.W) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.W) + // Send a reply + _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestSendNested(t *testing.T) { + r, w := Pipe() + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=W + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.W) + // Send a nested message + _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0) + if err != nil { + t.Fatal(err) + } + }() + // Receive a message with mode=R + msg, in, out, err := r.Receive(beam.R) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.R) + // Read for a nested message + nested, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if nested.Args[0] != "this is the nested message" { + t.Fatalf("%#v", nested) + } + }) +} + +func TestSendFile(t *testing.T) { + r, w := Pipe() + defer r.Close() + defer w.Close() + tmp, err := ioutil.TempFile("", "beam-test-") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmp.Name()) + fmt.Fprintf(tmp, "hello world\n") + tmp.Sync() + tmp.Seek(0, 0) + testutils.Timeout(t, func() { + go func() { + _, _, err := w.Send(&beam.Message{"file", []string{"path=" + tmp.Name()}, tmp}, 0) + if err != nil { + t.Fatal(err) + } + }() + msg, _, _, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "file" { + t.Fatalf("%#v", msg) + } + if msg.Args[0] != "path="+tmp.Name() { + t.Fatalf("%#v", msg) + } + txt, err := ioutil.ReadAll(msg.Att) + if err != nil { + t.Fatal(err) + } + if string(txt) != "hello world\n" { + t.Fatalf("%s\n", txt) + } + }) +} diff --git a/beam/nop.go b/beam/nop.go new file mode 100644 index 0000000000..061e5f41a5 --- /dev/null +++ b/beam/nop.go @@ -0,0 +1,21 @@ +package beam + +import ( + "io" +) + +type NopSender struct{} + +func (s NopSender) Send(msg *Message, mode int) (Receiver, Sender, error) { + return NopReceiver{}, NopSender{}, nil +} + +func (s NopSender) Close() error { + return nil +} + +type NopReceiver struct{} + +func (r NopReceiver) Receive(mode int) (*Message, Receiver, Sender, error) { + return nil, nil, nil, io.EOF +} diff --git a/beam/unix/beam.go b/beam/unix/beam.go new file mode 100644 index 0000000000..9e6dc90f1b --- /dev/null +++ b/beam/unix/beam.go @@ -0,0 +1,166 @@ +package unix + +import ( + "fmt" + "io" + "os" +) + +type Sender interface { + Send([]byte, *os.File) error +} + +type Receiver interface { + Receive() ([]byte, *os.File, error) +} + +type ReceiveCloser interface { + Receiver + Close() error +} + +type SendCloser interface { + Sender + Close() error +} + +type ReceiveSender interface { + Receiver + Sender +} + +const ( + R int = 1 << (32 - 1 - iota) + W +) + +func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + var ( + remote *os.File + local *os.File + ) + if mode == R { + remote = r + local = w + } else if mode == W { + remote = w + local = r + } + if err := dst.Send(data, remote); err != nil { + local.Close() + remote.Close() + return nil, err + } + return local, nil + +} + +// SendRPipe create a pipe and sends its *read* end attached in a beam message +// to `dst`, with `data` as the message payload. +// It returns the *write* end of the pipe, or an error. +func SendRPipe(dst Sender, data []byte) (*os.File, error) { + return sendPipe(dst, data, R) +} + +// SendWPipe create a pipe and sends its *read* end attached in a beam message +// to `dst`, with `data` as the message payload. +// It returns the *write* end of the pipe, or an error. +func SendWPipe(dst Sender, data []byte) (*os.File, error) { + return sendPipe(dst, data, W) +} + +func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { + local, remote, err := SocketPair() + if err != nil { + return nil, err + } + defer func() { + if err != nil { + local.Close() + remote.Close() + } + }() + conn, err = FileConn(local) + if err != nil { + return nil, err + } + local.Close() + if err := dst.Send(data, remote); err != nil { + return nil, err + } + return conn, nil +} + +func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { + for { + data, f, err := src.Receive() + if err != nil { + return nil, nil, err + } + if f == nil { + // Skip empty attachments + continue + } + conn, err := FileConn(f) + if err != nil { + // Skip beam attachments which are not connections + // (for example might be a regular file, directory etc) + continue + } + return data, conn, nil + } + panic("impossibru!") + return nil, nil, nil +} + +func Copy(dst Sender, src Receiver) (int, error) { + var n int + for { + payload, attachment, err := src.Receive() + if err == io.EOF { + return n, nil + } else if err != nil { + return n, err + } + if err := dst.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return n, err + } + n++ + } + panic("impossibru!") + return n, nil +} + +// MsgDesc returns a human readable description of a beam message, usually +// for debugging purposes. +func MsgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) +} + +type devnull struct{} + +func Devnull() ReceiveSender { + return devnull{} +} + +func (d devnull) Send(p []byte, a *os.File) error { + if a != nil { + a.Close() + } + return nil +} + +func (d devnull) Receive() ([]byte, *os.File, error) { + return nil, nil, io.EOF +} diff --git a/beam/unix/beam_test.go b/beam/unix/beam_test.go new file mode 100644 index 0000000000..83bd91e0d3 --- /dev/null +++ b/beam/unix/beam_test.go @@ -0,0 +1,39 @@ +package unix + +import ( + "github.com/dotcloud/docker/pkg/beam/data" + "testing" +) + +func TestSendConn(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + go func() { + conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) + if err != nil { + t.Fatal(err) + } + if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { + t.Fatal(err) + } + conn.CloseWrite() + }() + payload, conn, err := ReceiveConn(b) + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { + t.Fatalf("%v != %v\n", val, "connection") + } + msg, _, err := conn.Receive() + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { + t.Fatalf("%v != %v\n", val, "bar") + } +} diff --git a/beam/unix/conn.go b/beam/unix/conn.go new file mode 100644 index 0000000000..84b70ea130 --- /dev/null +++ b/beam/unix/conn.go @@ -0,0 +1,124 @@ +package unix + +import ( + "fmt" + "os" + + "github.com/docker/beam" + "github.com/docker/beam/data" +) + +func Pair() (*Conn, *Conn, error) { + c1, c2, err := USocketPair() + if err != nil { + return nil, nil, err + } + return &Conn{c1}, &Conn{c2}, nil +} + +type Conn struct { + *UnixConn +} + +func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) { + // Get 2 *os.File + local, remote, err := SocketPair() + if err != nil { + return nil, nil, err + } + defer func() { + if err != nil { + local.Close() + remote.Close() + } + }() + // Convert 1 to *net.UnixConn + conn, err = FileConn(local) + if err != nil { + return nil, nil, err + } + local.Close() + // Return the "mismatched" pair + return conn, remote, nil +} + +// This implements beam.Sender.Close which *only closes the sender*. +// This is similar to the pattern of only closing go channels from +// the sender's side. +// If you want to close the entire connection, call Conn.UnixConn.Close. +func (c *Conn) Close() error { + return c.UnixConn.CloseWrite() +} + +func (c *Conn) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + if msg.Att != nil { + return nil, nil, fmt.Errorf("file attachment not yet implemented in unix transport") + } + parts := []string{msg.Name} + parts = append(parts, msg.Args...) + b := []byte(data.EncodeList(parts)) + // Setup nested streams + var ( + fd *os.File + r beam.Receiver + w beam.Sender + ) + if mode&(beam.R|beam.W) != 0 { + local, remote, err := sendablePair() + if err != nil { + return nil, nil, err + } + fd = remote + if mode&beam.R != 0 { + r = &Conn{local} + } + if mode&beam.W != 0 { + w = &Conn{local} + } else { + local.CloseWrite() + } + } + c.UnixConn.Send(b, fd) + return r, w, nil +} + +func (c *Conn) Receive(mode int) (*beam.Message, beam.Receiver, beam.Sender, error) { + b, fd, err := c.UnixConn.Receive() + if err != nil { + return nil, nil, nil, err + } + parts, n, err := data.DecodeList(string(b)) + if err != nil { + return nil, nil, nil, err + } + if n != len(b) { + return nil, nil, nil, fmt.Errorf("garbage data %#v", b[:n]) + } + if len(parts) == 0 { + return nil, nil, nil, fmt.Errorf("malformed message") + } + msg := &beam.Message{Name: parts[0], Args: parts[1:]} + + // Setup nested streams + var ( + r beam.Receiver + w beam.Sender + ) + // Apply mode mask + if fd != nil { + subconn, err := FileConn(fd) + if err != nil { + return nil, nil, nil, err + } + fd.Close() + if mode&beam.R != 0 { + r = &Conn{subconn} + } + if mode&beam.W != 0 { + w = &Conn{subconn} + } else { + subconn.CloseWrite() + } + } + return msg, r, w, nil +} diff --git a/beam/unix/conn_test.go b/beam/unix/conn_test.go new file mode 100644 index 0000000000..aa5be778e2 --- /dev/null +++ b/beam/unix/conn_test.go @@ -0,0 +1,154 @@ +package unix + +import ( + "github.com/docker/beam" + "github.com/dotcloud/docker/pkg/testutils" + "testing" +) + +func TestPair(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal("Unexpected error") + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + go func() { + msg, in, out, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "print" { + t.Fatalf("%#v", *msg) + } + if msg.Args[0] != "hello world" { + t.Fatalf("%#v", *msg) + } + if in != nil && out != nil { + t.Fatal("Unexpected return value") + } + }() + _, _, err := w.Send(&beam.Message{Name: "print", Args: []string{"hello world"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestSendReply(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=R + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.R) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.R) + // Read for a reply + resp, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if resp.Args[0] != "this is the reply" { + t.Fatalf("%#v", resp) + } + }() + // Receive a message with mode=W + msg, in, out, err := r.Receive(beam.W) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.W) + // Send a reply + _, _, err = out.Send(&beam.Message{Args: []string{"this is the reply"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestSendNested(t *testing.T) { + r, w, err := Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + testutils.Timeout(t, func() { + // Send + go func() { + // Send a message with mode=W + in, out, err := w.Send(&beam.Message{Args: []string{"this is the request"}}, beam.W) + if err != nil { + t.Fatal(err) + } + assertMode(t, in, out, beam.W) + // Send a nested message + _, _, err = out.Send(&beam.Message{Args: []string{"this is the nested message"}}, 0) + if err != nil { + t.Fatal(err) + } + }() + // Receive a message with mode=R + msg, in, out, err := r.Receive(beam.R) + if err != nil { + t.Fatal(err) + } + if msg.Args[0] != "this is the request" { + t.Fatalf("%#v", msg) + } + assertMode(t, in, out, beam.R) + // Read for a nested message + nested, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if nested.Args[0] != "this is the nested message" { + t.Fatalf("%#v", nested) + } + }) +} + +// FIXME: duplicate from inmem/inmem_test.go +// assertMode verifies that the values of r and w match +// mode. +// If mode has the R bit set, r must be non-nil. Otherwise it must be nil. +// If mode has the W bit set, w must be non-nil. Otherwise it must be nil. +// +// If any of these conditions are not met, t.Fatal is called and the active +// test fails. +func assertMode(t *testing.T, r beam.Receiver, w beam.Sender, mode int) { + // If mode has the R bit set, r must be non-nil + if mode&beam.R != 0 { + if r == nil { + t.Fatalf("should be non-nil: %#v", r) + } + // Otherwise it must be nil. + } else { + if r != nil { + t.Fatalf("should be nil: %#v", r) + } + } + // If mode has the W bit set, w must be non-nil + if mode&beam.W != 0 { + if w == nil { + t.Fatalf("should be non-nil: %#v", w) + } + // Otherwise it must be nil. + } else { + if w != nil { + t.Fatalf("should be nil: %#v", w) + } + } +} diff --git a/beam/unix/unix.go b/beam/unix/unix.go new file mode 100644 index 0000000000..594eb21b10 --- /dev/null +++ b/beam/unix/unix.go @@ -0,0 +1,317 @@ +package unix + +import ( + "bufio" + "fmt" + "net" + "os" + "syscall" +) + +func debugCheckpoint(msg string, args ...interface{}) { + if os.Getenv("DEBUG") == "" { + return + } + os.Stdout.Sync() + tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700) + fmt.Fprintf(tty, msg, args...) + bufio.NewScanner(tty).Scan() + tty.Close() +} + +type UnixConn struct { + *net.UnixConn + fds []*os.File +} + +// Framing: +// In order to handle framing in Send/Recieve, as these give frame +// boundaries we use a very simple 4 bytes header. It is a big endiand +// uint32 where the high bit is set if the message includes a file +// descriptor. The rest of the uint32 is the length of the next frame. +// We need the bit in order to be able to assign recieved fds to +// the right message, as multiple messages may be coalesced into +// a single recieve operation. +func makeHeader(data []byte, fds []int) ([]byte, error) { + header := make([]byte, 4) + + length := uint32(len(data)) + + if length > 0x7fffffff { + return nil, fmt.Errorf("Data to large") + } + + if len(fds) != 0 { + length = length | 0x80000000 + } + header[0] = byte((length >> 24) & 0xff) + header[1] = byte((length >> 16) & 0xff) + header[2] = byte((length >> 8) & 0xff) + header[3] = byte((length >> 0) & 0xff) + + return header, nil +} + +func parseHeader(header []byte) (uint32, bool) { + length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) + hasFd := length&0x80000000 != 0 + length = length & ^uint32(0x80000000) + + return length, hasFd +} + +func FileConn(f *os.File) (*UnixConn, error) { + conn, err := net.FileConn(f) + if err != nil { + return nil, err + } + uconn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("%d: not a unix connection", f.Fd()) + } + return &UnixConn{UnixConn: uconn}, nil + +} + +// Send sends a new message on conn with data and f as payload and +// attachment, respectively. +// On success, f is closed +func (conn *UnixConn) Send(data []byte, f *os.File) error { + { + var fd int = -1 + if f != nil { + fd = int(f.Fd()) + } + debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd) + } + var fds []int + if f != nil { + fds = append(fds, int(f.Fd())) + } + if err := conn.sendUnix(data, fds...); err != nil { + return err + } + + if f != nil { + f.Close() + } + return nil +} + +// Receive waits for a new message on conn, and receives its payload +// and attachment, or an error if any. +// +// If more than 1 file descriptor is sent in the message, they are all +// closed except for the first, which is the attachment. +// It is legal for a message to have no attachment or an empty payload. +func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) { + defer func() { + var fd int = -1 + if rf != nil { + fd = int(rf.Fd()) + } + debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd) + }() + + // Read header + header := make([]byte, 4) + nRead := uint32(0) + + for nRead < 4 { + n, err := conn.receiveUnix(header[nRead:]) + if err != nil { + return nil, nil, err + } + nRead = nRead + uint32(n) + } + + length, hasFd := parseHeader(header) + + if hasFd { + if len(conn.fds) == 0 { + return nil, nil, fmt.Errorf("No expected file descriptor in message") + } + + rf = conn.fds[0] + conn.fds = conn.fds[1:] + } + + rdata = make([]byte, length) + + nRead = 0 + for nRead < length { + n, err := conn.receiveUnix(rdata[nRead:]) + if err != nil { + return nil, nil, err + } + nRead = nRead + uint32(n) + } + + return +} + +func (conn *UnixConn) receiveUnix(buf []byte) (int, error) { + oob := make([]byte, syscall.CmsgSpace(4)) + bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob) + if err != nil { + return 0, err + } + fd := extractFd(oob[:oobn]) + if fd != -1 { + f := os.NewFile(uintptr(fd), "") + conn.fds = append(conn.fds, f) + } + + return bufn, nil +} + +func (conn *UnixConn) sendUnix(data []byte, fds ...int) error { + header, err := makeHeader(data, fds) + if err != nil { + return err + } + + // There is a bug in conn.WriteMsgUnix where it doesn't correctly return + // the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645) + // So, we can't rely on the return value from it. However, we must use it to + // send the fds. In order to handle this we only write one byte using WriteMsgUnix + // (when we have to), as that can only ever block or fully suceed. We then write + // the rest with conn.Write() + // The reader side should not rely on this though, as hopefully this gets fixed + // in go later. + written := 0 + if len(fds) != 0 { + oob := syscall.UnixRights(fds...) + wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil) + if err != nil { + return err + } + written = written + wrote + } + + for written < len(header) { + wrote, err := conn.Write(header[written:]) + if err != nil { + return err + } + written = written + wrote + } + + written = 0 + for written < len(data) { + wrote, err := conn.Write(data[written:]) + if err != nil { + return err + } + written = written + wrote + } + + return nil +} + +func extractFd(oob []byte) int { + // Grab forklock to make sure no forks accidentally inherit the new + // fds before they are made CLOEXEC + // There is a slight race condition between ReadMsgUnix returns and + // when we grap the lock, so this is not perfect. Unfortunately + // There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any + // way to implement non-blocking i/o in go, so this is hard to fix. + syscall.ForkLock.Lock() + defer syscall.ForkLock.Unlock() + scms, err := syscall.ParseSocketControlMessage(oob) + if err != nil { + return -1 + } + + foundFd := -1 + for _, scm := range scms { + fds, err := syscall.ParseUnixRights(&scm) + if err != nil { + continue + } + + for _, fd := range fds { + if foundFd == -1 { + syscall.CloseOnExec(fd) + foundFd = fd + } else { + syscall.Close(fd) + } + } + } + + return foundFd +} + +func socketpair() ([2]int, error) { + return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) +} + +// SocketPair is a convenience wrapper around the socketpair(2) syscall. +// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors +// not bound to the underlying filesystem. +// Messages sent on one end are received on the other, and vice-versa. +// It is the caller's responsibility to close both ends. +func SocketPair() (a *os.File, b *os.File, err error) { + defer func() { + var ( + fdA int = -1 + fdB int = -1 + ) + if a != nil { + fdA = int(a.Fd()) + } + if b != nil { + fdB = int(b.Fd()) + } + debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB) + }() + pair, err := socketpair() + if err != nil { + return nil, nil, err + } + return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil +} + +func USocketPair() (*UnixConn, *UnixConn, error) { + debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ") + defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ") + a, b, err := SocketPair() + if err != nil { + return nil, nil, err + } + defer a.Close() + defer b.Close() + uA, err := FileConn(a) + if err != nil { + return nil, nil, err + } + uB, err := FileConn(b) + if err != nil { + uA.Close() + return nil, nil, err + } + return uA, uB, nil +} + +// FdConn wraps a file descriptor in a standard *net.UnixConn object, or +// returns an error if the file descriptor does not point to a unix socket. +// This creates a duplicate file descriptor. It's the caller's responsibility +// to close both. +func FdConn(fd int) (n *net.UnixConn, err error) { + { + debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd) + } + f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd)) + conn, err := net.FileConn(f) + if err != nil { + return nil, err + } + uconn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("%d: not a unix connection", fd) + } + return uconn, nil +} diff --git a/beam/unix/unix_test.go b/beam/unix/unix_test.go new file mode 100644 index 0000000000..7f947760b3 --- /dev/null +++ b/beam/unix/unix_test.go @@ -0,0 +1,237 @@ +package unix + +import ( + "fmt" + "io/ioutil" + "testing" +) + +func TestSocketPair(t *testing.T) { + a, b, err := SocketPair() + if err != nil { + t.Fatal(err) + } + go func() { + a.Write([]byte("hello world!")) + fmt.Printf("done writing. closing\n") + a.Close() + fmt.Printf("done closing\n") + }() + data, err := ioutil.ReadAll(b) + if err != nil { + t.Fatal(err) + } + fmt.Printf("--> %s\n", data) + fmt.Printf("still open: %v\n", a.Fd()) +} + +func TestUSocketPair(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + + data := "hello world!" + go func() { + a.Write([]byte(data)) + a.Close() + }() + res := make([]byte, 1024) + size, err := b.Read(res) + if err != nil { + t.Fatal(err) + } + if size != len(data) { + t.Fatal("Unexpected size") + } + if string(res[0:size]) != data { + t.Fatal("Unexpected data") + } +} + +func TestSendUnixSocket(t *testing.T) { + a1, a2, err := USocketPair() + if err != nil { + t.Fatal(err) + } + // defer a1.Close() + // defer a2.Close() + b1, b2, err := USocketPair() + if err != nil { + t.Fatal(err) + } + // defer b1.Close() + // defer b2.Close() + glueA, glueB, err := SocketPair() + if err != nil { + t.Fatal(err) + } + // defer glueA.Close() + // defer glueB.Close() + go func() { + err := b2.Send([]byte("a"), glueB) + if err != nil { + t.Fatal(err) + } + }() + go func() { + err := a2.Send([]byte("b"), glueA) + if err != nil { + t.Fatal(err) + } + }() + connAhdr, connA, err := a1.Receive() + if err != nil { + t.Fatal(err) + } + if string(connAhdr) != "b" { + t.Fatalf("unexpected: %s", connAhdr) + } + connBhdr, connB, err := b1.Receive() + if err != nil { + t.Fatal(err) + } + if string(connBhdr) != "a" { + t.Fatalf("unexpected: %s", connBhdr) + } + fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd()) + go func() { + fmt.Printf("sending message on %v\n", connA.Fd()) + connA.Write([]byte("hello world")) + connA.Sync() + fmt.Printf("closing %v\n", connA.Fd()) + connA.Close() + }() + data, err := ioutil.ReadAll(connB) + if err != nil { + t.Fatal(err) + } + fmt.Printf("---> %s\n", data) + +} + +// Ensure we get proper segmenting of messages +func TestSendSegmenting(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + + extrafd1, extrafd2, err := SocketPair() + if err != nil { + t.Fatal(err) + } + extrafd2.Close() + + go func() { + a.Send([]byte("message 1"), nil) + a.Send([]byte("message 2"), extrafd1) + a.Send([]byte("message 3"), nil) + }() + + msg1, file1, err := b.Receive() + if err != nil { + t.Fatal(err) + } + if string(msg1) != "message 1" { + t.Fatal("unexpected msg1:", string(msg1)) + } + if file1 != nil { + t.Fatal("unexpectedly got file1") + } + + msg2, file2, err := b.Receive() + if err != nil { + t.Fatal(err) + } + if string(msg2) != "message 2" { + t.Fatal("unexpected msg2:", string(msg2)) + } + if file2 == nil { + t.Fatal("didn't get file2") + } + file2.Close() + + msg3, file3, err := b.Receive() + if err != nil { + t.Fatal(err) + } + if string(msg3) != "message 3" { + t.Fatal("unexpected msg3:", string(msg3)) + } + if file3 != nil { + t.Fatal("unexpectedly got file3") + } + +} + +// Test sending a zero byte message +func TestSendEmpty(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + go func() { + a.Send([]byte{}, nil) + }() + + msg, file, err := b.Receive() + if err != nil { + t.Fatal(err) + } + if len(msg) != 0 { + t.Fatalf("unexpected non-empty message: %v", msg) + } + if file != nil { + t.Fatal("unexpectedly got file") + } + +} + +func makeLarge(size int) []byte { + res := make([]byte, size) + for i := range res { + res[i] = byte(i % 255) + } + return res +} + +func verifyLarge(data []byte, size int) bool { + if len(data) != size { + return false + } + for i := range data { + if data[i] != byte(i%255) { + return false + } + } + return true +} + +// Test sending a large message +func TestSendLarge(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + go func() { + a.Send(makeLarge(100000), nil) + }() + + msg, file, err := b.Receive() + if err != nil { + t.Fatal(err) + } + if !verifyLarge(msg, 100000) { + t.Fatalf("unexpected message (size %d)", len(msg)) + } + if file != nil { + t.Fatal("unexpectedly got file") + } +} diff --git a/beam/utils/buf.go b/beam/utils/buf.go new file mode 100644 index 0000000000..d4164a0220 --- /dev/null +++ b/beam/utils/buf.go @@ -0,0 +1,17 @@ +package utils + +import ( + "github.com/docker/beam" +) + +type Buffer []*beam.Message + +func (buf *Buffer) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + (*buf) = append(*buf, msg) + return beam.NopReceiver{}, beam.NopSender{}, nil +} + +func (buf *Buffer) Close() error { + (*buf) = nil + return nil +} diff --git a/beam/utils/copy.go b/beam/utils/copy.go new file mode 100644 index 0000000000..ad41758d46 --- /dev/null +++ b/beam/utils/copy.go @@ -0,0 +1,58 @@ +package utils + +import ( + "github.com/docker/beam" + "sync" +) + +func Copy(dst beam.Sender, src beam.Receiver) (int, error) { + var tasks sync.WaitGroup + defer tasks.Wait() + if senderTo, ok := src.(beam.SenderTo); ok { + if n, err := senderTo.SendTo(dst); err != beam.ErrIncompatibleSender { + return n, err + } + } + if receiverFrom, ok := dst.(beam.ReceiverFrom); ok { + if n, err := receiverFrom.ReceiveFrom(src); err != beam.ErrIncompatibleReceiver { + return n, err + } + } + var ( + n int + ) + copyAndClose := func(dst beam.Sender, src beam.Receiver) { + if dst == nil { + return + } + defer dst.Close() + if src == nil { + return + } + Copy(dst, src) + } + for { + msg, rcvR, rcvW, err := src.Receive(beam.R | beam.W) + if err != nil { + return n, err + } + sndR, sndW, err := dst.Send(msg, beam.R|beam.W) + if err != nil { + if rcvW != nil { + rcvW.Close() + } + return n, err + } + tasks.Add(2) + go func() { + copyAndClose(rcvW, sndR) + tasks.Done() + }() + go func() { + copyAndClose(sndW, rcvR) + tasks.Done() + }() + n++ + } + return n, nil +} diff --git a/beam/utils/hub.go b/beam/utils/hub.go new file mode 100644 index 0000000000..b0dba9a5ef --- /dev/null +++ b/beam/utils/hub.go @@ -0,0 +1,128 @@ +package utils + +import ( + "fmt" + "github.com/docker/beam" + "github.com/docker/beam/inmem" + "io" + "sync" +) + +// Hub passes messages to dynamically registered handlers. +type Hub struct { + handlers *StackSender + tasks sync.WaitGroup +} + +func NewHub() *Hub { + return &Hub{ + handlers: NewStackSender(), + } +} + +func (hub *Hub) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + if msg.Name == "register" { + if mode&beam.R == 0 { + return nil, nil, fmt.Errorf("register: no return channel") + } + fmt.Printf("[hub] received %v\n", msg) + hYoutr, hYoutw := inmem.Pipe() + hYinr, hYinw := inmem.Pipe() + // Register the new handler on top of the others, + // and get a reference to the previous stack of handlers. + prevHandlers := hub.handlers.Add(hYinw) + // Pass requests from the new handler to the previous chain of handlers + // hYout -> hXin + hub.tasks.Add(1) + go func() { + defer hub.tasks.Done() + Copy(prevHandlers, hYoutr) + hYoutr.Close() + }() + return hYinr, hYoutw, nil + } + fmt.Printf("sending %#v to %d handlers\n", msg, hub.handlers.Len()) + return hub.handlers.Send(msg, mode) +} + +func (hub *Hub) Register(dst beam.Sender) error { + in, _, err := hub.Send(&beam.Message{Name: "register"}, beam.R) + if err != nil { + return err + } + go Copy(dst, in) + return nil +} + +func (hub *Hub) RegisterTask(h func(beam.Receiver, beam.Sender) error) error { + in, out, err := hub.Send(&beam.Message{Name: "register"}, beam.R|beam.W) + if err != nil { + return err + } + go func() { + h(in, out) + out.Close() + }() + return nil +} + +type Handler func(msg *beam.Message, in beam.Receiver, out beam.Sender, next beam.Sender) (pass bool, err error) + +func (hub *Hub) RegisterName(name string, h Handler) error { + return hub.RegisterTask(func(in beam.Receiver, out beam.Sender) error { + var tasks sync.WaitGroup + copyTask := func(dst beam.Sender, src beam.Receiver) { + tasks.Add(1) + go func() { + defer tasks.Done() + if dst == nil { + return + } + defer dst.Close() + if src == nil { + return + } + Copy(dst, src) + }() + } + for { + msg, msgin, msgout, err := in.Receive(beam.R | beam.W) + if err == io.EOF { + break + } + if err != nil { + return err + } + var pass = true + if msg.Name == name || name == "" { + pass, err = h(msg, msgin, msgout, out) + if err != nil { + if _, _, err := msgout.Send(&beam.Message{Name: "error", Args: []string{err.Error()}}, 0); err != nil { + return err + } + } + } + if pass { + nextin, nextout, err := out.Send(msg, beam.R|beam.W) + if err != nil { + return err + } + copyTask(nextout, msgin) + copyTask(msgout, nextin) + } else { + if msgout != nil { + msgout.Close() + } + } + } + return nil + }) +} + +func (hub *Hub) Wait() { + hub.tasks.Wait() +} + +func (hub *Hub) Close() error { + return hub.handlers.Close() +} diff --git a/beam/utils/hub_test.go b/beam/utils/hub_test.go new file mode 100644 index 0000000000..c9b40a4817 --- /dev/null +++ b/beam/utils/hub_test.go @@ -0,0 +1,55 @@ +package utils + +import ( + "github.com/docker/beam" + "github.com/dotcloud/docker/pkg/testutils" + "testing" +) + +func TestHubSendEmpty(t *testing.T) { + hub := NewHub() + // Send to empty hub should silently drop + r, w, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, beam.R|beam.W) + // Send must not return an error + if err != nil { + t.Fatal(err) + } + // We set beam.R, so a valid receiver must be returned + if r == nil { + t.Fatalf("%#v", r) + } + // We set beam.W, so a valid receiver must be returned + if w == nil { + t.Fatalf("%#v", w) + } +} + +type CountSender int + +func (s *CountSender) Send(msg *beam.Message, mode int) (beam.Receiver, beam.Sender, error) { + (*s)++ + return nil, nil, nil +} + +func TestHubSendOneHandler(t *testing.T) { + hub := NewHub() + defer hub.Close() + testutils.Timeout(t, func() { + in, _, err := hub.Send(&beam.Message{Name: "register", Args: nil}, beam.R) + if err != nil { + t.Fatal(err) + } + go func() { + if _, _, err := hub.Send(&beam.Message{Name: "hello", Args: nil}, 0); err != nil { + t.Fatal(err) + } + }() + msg, _, _, err := in.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "hello" { + t.Fatalf("%#v", msg) + } + }) +} diff --git a/beam/utils/stack.go b/beam/utils/stack.go new file mode 100644 index 0000000000..11fd678288 --- /dev/null +++ b/beam/utils/stack.go @@ -0,0 +1,112 @@ +package utils + +import ( + "container/list" + "fmt" + "github.com/docker/beam" + "strings" + "sync" +) + +// StackSender forwards beam messages to a dynamic list of backend receivers. +// New backends are stacked on top. When a message is sent, each backend is +// tried until one succeeds. Any failing backends encountered along the way +// are removed from the queue. +type StackSender struct { + stack *list.List + l sync.RWMutex +} + +func NewStackSender() *StackSender { + stack := list.New() + return &StackSender{ + stack: stack, + } +} + +func (s *StackSender) Send(msg *beam.Message, mode int) (r beam.Receiver, w beam.Sender, err error) { + completed := s.walk(func(h beam.Sender) (ok bool) { + r, w, err = h.Send(msg, mode) + fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) + if err == nil { + return true + } + return false + }) + // If walk was completed, it means we didn't find a valid handler + if !completed { + return r, w, err + } + // Silently drop messages if no valid backend is available. + return beam.NopSender{}.Send(msg, mode) +} + +func (s *StackSender) Add(dst beam.Sender) *StackSender { + s.l.Lock() + defer s.l.Unlock() + prev := &StackSender{ + stack: list.New(), + } + prev.stack.PushFrontList(s.stack) + fmt.Printf("[ADD] prev %#v\n", prev) + s.stack.PushFront(dst) + return prev +} + +func (s *StackSender) Close() error { + s.walk(func(h beam.Sender) bool { + h.Close() + // remove all handlers + return false + }) + return nil +} + +func (s *StackSender) _walk(f func(*list.Element) bool) bool { + var e *list.Element + s.l.RLock() + e = s.stack.Front() + s.l.RUnlock() + for e != nil { + fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender)) + s.l.RLock() + next := e.Next() + s.l.RUnlock() + cont := f(e) + if !cont { + return false + } + e = next + } + return true +} + +func (s *StackSender) walk(f func(beam.Sender) bool) bool { + return s._walk(func(e *list.Element) bool { + ok := f(e.Value.(beam.Sender)) + if ok { + // Found a valid handler. Stop walking. + return false + } + // Invalid handler: remove. + s.l.Lock() + s.stack.Remove(e) + s.l.Unlock() + return true + }) +} + +func (s *StackSender) Len() int { + s.l.RLock() + defer s.l.RUnlock() + return s.stack.Len() +} + +func (s *StackSender) String() string { + var parts []string + s._walk(func(e *list.Element) bool { + parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender))) + return true + }) + return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->")) +} diff --git a/beam/utils/stack_test.go b/beam/utils/stack_test.go new file mode 100644 index 0000000000..679e8349f0 --- /dev/null +++ b/beam/utils/stack_test.go @@ -0,0 +1,128 @@ +package utils + +import ( + "github.com/docker/beam" + "github.com/docker/beam/inmem" + "github.com/docker/beam/unix" + "github.com/dotcloud/docker/pkg/testutils" + "strings" + "testing" +) + +func TestStackWithPipe(t *testing.T) { + r, w := inmem.Pipe() + defer r.Close() + defer w.Close() + s := NewStackSender() + s.Add(w) + testutils.Timeout(t, func() { + go func() { + msg, _, _, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "hello" { + t.Fatalf("%#v", msg) + } + if strings.Join(msg.Args, " ") != "wonderful world" { + t.Fatalf("%#v", msg) + } + }() + _, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestStackWithPair(t *testing.T) { + r, w, err := unix.Pair() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + s := NewStackSender() + s.Add(w) + testutils.Timeout(t, func() { + go func() { + msg, _, _, err := r.Receive(0) + if err != nil { + t.Fatal(err) + } + if msg.Name != "hello" { + t.Fatalf("%#v", msg) + } + if strings.Join(msg.Args, " ") != "wonderful world" { + t.Fatalf("%#v", msg) + } + }() + _, _, err := s.Send(&beam.Message{Name: "hello", Args: []string{"wonderful", "world"}}, 0) + if err != nil { + t.Fatal(err) + } + }) +} + +func TestStackLen(t *testing.T) { + s := NewStackSender() + if s.Len() != 0 { + t.Fatalf("empty StackSender has length %d", s.Len()) + } +} + +func TestStackAdd(t *testing.T) { + s := NewStackSender() + a := Buffer{} + beforeA := s.Add(&a) + // Add on an empty StackSender should return an empty StackSender + if beforeA.Len() != 0 { + t.Fatalf("%s has %d elements", beforeA, beforeA.Len()) + } + if s.Len() != 1 { + t.Fatalf("%#v", beforeA) + } + // Add a 2nd element + b := Buffer{} + beforeB := s.Add(&b) + if beforeB.Len() != 1 { + t.Fatalf("%#v", beforeA) + } + if s.Len() != 2 { + t.Fatalf("%#v", beforeA) + } + s.Send(&beam.Message{Name: "for b", Args: nil}, 0) + beforeB.Send(&beam.Message{Name: "for a", Args: nil}, 0) + beforeA.Send(&beam.Message{Name: "for nobody", Args: nil}, 0) + if len(a) != 1 { + t.Fatalf("%#v", a) + } + if len(b) != 1 { + t.Fatalf("%#v", b) + } +} + +// Misbehaving backends must be removed +func TestStackAddBad(t *testing.T) { + s := NewStackSender() + buf := Buffer{} + s.Add(&buf) + r, w := inmem.Pipe() + s.Add(w) + if s.Len() != 2 { + t.Fatalf("%#v", s) + } + r.Close() + if _, _, err := s.Send(&beam.Message{Name: "for the buffer", Args: nil}, 0); err != nil { + t.Fatal(err) + } + if s.Len() != 1 { + t.Fatalf("%#v") + } + if len(buf) != 1 { + t.Fatalf("%#v", buf) + } + if buf[0].Name != "for the buffer" { + t.Fatalf("%#v", buf) + } +}