diff --git a/beam/AUTHORS b/beam/AUTHORS deleted file mode 100644 index db33365bcd..0000000000 --- a/beam/AUTHORS +++ /dev/null @@ -1 +0,0 @@ -Solomon Hykes diff --git a/beam/MAINTAINERS b/beam/MAINTAINERS deleted file mode 100644 index aee10c8421..0000000000 --- a/beam/MAINTAINERS +++ /dev/null @@ -1 +0,0 @@ -Solomon Hykes (@shykes) diff --git a/beam/README.md b/beam/README.md deleted file mode 100644 index 5277c7f361..0000000000 --- a/beam/README.md +++ /dev/null @@ -1,4 +0,0 @@ -# Beam - -**WARNING:** This has been renamed [libchan](https://github.com/docker/libchan). The code in this directory is an old version of libchan which will be removed once libswarm has been ported over to libchan. - diff --git a/beam/beam.go b/beam/beam.go deleted file mode 100644 index b89ba8317e..0000000000 --- a/beam/beam.go +++ /dev/null @@ -1,66 +0,0 @@ -package beam - -import ( - "errors" - "os" -) - -type Sender interface { - Send(msg *Message) (Receiver, error) - Close() error -} - -type Receiver interface { - Receive(mode int) (*Message, error) -} - -type Message struct { - Verb Verb - Args []string - Att *os.File - Ret Sender -} - -const ( - Ret int = 1 << iota - // FIXME: use an `Att` flag to auto-close attachments by default -) - -type ReceiverFrom interface { - ReceiveFrom(Receiver) (int, error) -} - -type SenderTo interface { - SendTo(Sender) (int, error) -} - -var ( - ErrIncompatibleSender = errors.New("incompatible sender") - ErrIncompatibleReceiver = errors.New("incompatible receiver") -) - -// RetPipe is a special value for `Message.Ret`. -// When a Message is sent with `Ret=SendPipe`, the transport must -// substitute it with the writing end of a new pipe, and return the -// other end as a return value. -type retPipe struct { - NopSender -} - -var RetPipe = retPipe{} - -func (r retPipe) Equals(val Sender) bool { - if rval, ok := val.(retPipe); ok { - return rval == r - } - return false -} - -func Repeater(payload *Message) Sender { - return Handler(func(msg *Message) error { - msg.Ret.Send(payload) - return nil - }) -} - -var NotImplemented = Repeater(&Message{Verb: Error, Args: []string{"not implemented"}}) diff --git a/beam/beam_test.go b/beam/beam_test.go deleted file mode 100644 index 6d504f2ce7..0000000000 --- a/beam/beam_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package beam - -import ( - "testing" -) - -func TestModes(t *testing.T) { - if Ret == 0 { - t.Fatalf("0") - } -} - -func TestRetPipe(t *testing.T) { - var ( - shouldBeEqual = RetPipe - ) - if RetPipe != shouldBeEqual { - t.Fatalf("%#v should equal %#v", RetPipe, shouldBeEqual) - } -} diff --git a/beam/copy.go b/beam/copy.go deleted file mode 100644 index d62c03b8b4..0000000000 --- a/beam/copy.go +++ /dev/null @@ -1,38 +0,0 @@ -package beam - -import ( - "io" - "sync" -) - -func Copy(dst Sender, src Receiver) (int, error) { - var tasks sync.WaitGroup - defer tasks.Wait() - if senderTo, ok := src.(SenderTo); ok { - if n, err := senderTo.SendTo(dst); err != ErrIncompatibleSender { - return n, err - } - } - if receiverFrom, ok := dst.(ReceiverFrom); ok { - if n, err := receiverFrom.ReceiveFrom(src); err != ErrIncompatibleReceiver { - return n, err - } - } - var ( - n int - ) - for { - msg, err := src.Receive(Ret) - if err == io.EOF { - return n, nil - } - if err != nil { - return n, err - } - if _, err := dst.Send(msg); err != nil { - return n, err - } - n++ - } - return n, nil -} diff --git a/beam/data/data.go b/beam/data/data.go deleted file mode 100644 index 5ece063f51..0000000000 --- a/beam/data/data.go +++ /dev/null @@ -1,119 +0,0 @@ -package data - -import ( - "fmt" - "strconv" - "strings" -) - -func Encode(obj map[string][]string) string { - var msg string - msg += encodeHeader(0) - for k, values := range obj { - msg += encodeNamedList(k, values) - } - return msg -} - -func encodeHeader(msgtype int) string { - return fmt.Sprintf("%03.3d;", msgtype) -} - -func encodeString(s string) string { - return fmt.Sprintf("%d:%s,", len(s), s) -} - -var EncodeString = encodeString -var DecodeString = decodeString - -var EncodeList = encodeList - -func encodeList(l []string) string { - values := make([]string, 0, len(l)) - for _, s := range l { - values = append(values, encodeString(s)) - } - return encodeString(strings.Join(values, "")) -} - -func encodeNamedList(name string, l []string) string { - return encodeString(name) + encodeList(l) -} - -func Decode(msg string) (map[string][]string, error) { - msgtype, skip, err := decodeHeader(msg) - if err != nil { - return nil, err - } - if msgtype != 0 { - // FIXME: use special error type so the caller can easily ignore - return nil, fmt.Errorf("unknown message type: %d", msgtype) - } - msg = msg[skip:] - obj := make(map[string][]string) - for len(msg) > 0 { - k, skip, err := decodeString(msg) - if err != nil { - return nil, err - } - msg = msg[skip:] - values, skip, err := decodeList(msg) - if err != nil { - return nil, err - } - msg = msg[skip:] - obj[k] = values - } - return obj, nil -} - -var DecodeList = decodeList - -func decodeList(msg string) ([]string, int, error) { - blob, skip, err := decodeString(msg) - if err != nil { - return nil, 0, err - } - var l []string - for len(blob) > 0 { - v, skipv, err := decodeString(blob) - if err != nil { - return nil, 0, err - } - l = append(l, v) - blob = blob[skipv:] - } - return l, skip, nil -} - -func decodeString(msg string) (string, int, error) { - parts := strings.SplitN(msg, ":", 2) - if len(parts) != 2 { - return "", 0, fmt.Errorf("invalid format: no column") - } - var length int - if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil { - return "", 0, err - } else { - length = int(l) - } - if len(parts[1]) < length+1 { - return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1) - } - payload := parts[1][:length+1] - if payload[length] != ',' { - return "", 0, fmt.Errorf("message is not comma-terminated") - } - return payload[:length], len(parts[0]) + 1 + length + 1, nil -} - -func decodeHeader(msg string) (int, int, error) { - if len(msg) < 4 { - return 0, 0, fmt.Errorf("message too small") - } - msgtype, err := strconv.ParseInt(msg[:3], 10, 32) - if err != nil { - return 0, 0, err - } - return int(msgtype), 4, nil -} diff --git a/beam/data/data_test.go b/beam/data/data_test.go deleted file mode 100644 index 9059922b3b..0000000000 --- a/beam/data/data_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package data - -import ( - "strings" - "testing" -) - -func TestEncodeHelloWorld(t *testing.T) { - input := "hello world!" - output := encodeString(input) - expectedOutput := "12:hello world!," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyString(t *testing.T) { - input := "" - output := encodeString(input) - expectedOutput := "0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyList(t *testing.T) { - input := []string{} - output := encodeList(input) - expectedOutput := "0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyMap(t *testing.T) { - input := make(map[string][]string) - output := Encode(input) - expectedOutput := "000;" - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncode1Key1Value(t *testing.T) { - input := make(map[string][]string) - input["hello"] = []string{"world"} - output := Encode(input) - expectedOutput := "000;5:hello,8:5:world,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncode1Key2Value(t *testing.T) { - input := make(map[string][]string) - input["hello"] = []string{"beautiful", "world"} - output := Encode(input) - expectedOutput := "000;5:hello,20:9:beautiful,5:world,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeEmptyValue(t *testing.T) { - input := make(map[string][]string) - input["foo"] = []string{} - output := Encode(input) - expectedOutput := "000;3:foo,0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeBinaryKey(t *testing.T) { - input := make(map[string][]string) - input["foo\x00bar\x7f"] = []string{} - output := Encode(input) - expectedOutput := "000;8:foo\x00bar\x7f,0:," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestEncodeBinaryValue(t *testing.T) { - input := make(map[string][]string) - input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"} - output := Encode(input) - expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} - -func TestDecodeString(t *testing.T) { - validEncodedStrings := []struct { - input string - output string - skip int - }{ - {"3:foo,", "foo", 6}, - {"5:hello,", "hello", 8}, - {"5:hello,5:world,", "hello", 8}, - } - for _, sample := range validEncodedStrings { - output, skip, err := decodeString(sample.input) - if err != nil { - t.Fatalf("error decoding '%v': %v", sample.input, err) - } - if skip != sample.skip { - t.Fatalf("invalid skip: %v!=%v", skip, sample.skip) - } - if output != sample.output { - t.Fatalf("invalid output: %v!=%v", output, sample.output) - } - } -} - -func TestDecode1Key1Value(t *testing.T) { - input := "000;3:foo,6:3:bar,," - output, err := Decode(input) - if err != nil { - t.Fatal(err) - } - if v, exists := output["foo"]; !exists { - t.Fatalf("wrong output: %v\n", output) - } else if len(v) != 1 || strings.Join(v, "") != "bar" { - t.Fatalf("wrong output: %v\n", output) - } -} diff --git a/beam/data/message.go b/beam/data/message.go deleted file mode 100644 index 193fb7b241..0000000000 --- a/beam/data/message.go +++ /dev/null @@ -1,93 +0,0 @@ -package data - -import ( - "fmt" - "strings" -) - -type Message string - -func Empty() Message { - return Message(Encode(nil)) -} - -func Parse(args []string) Message { - data := make(map[string][]string) - for _, word := range args { - if strings.Contains(word, "=") { - kv := strings.SplitN(word, "=", 2) - key := kv[0] - var val string - if len(kv) == 2 { - val = kv[1] - } - data[key] = []string{val} - } - } - return Message(Encode(data)) -} - -func (m Message) Add(k, v string) Message { - data, err := Decode(string(m)) - if err != nil { - return m - } - if values, exists := data[k]; exists { - data[k] = append(values, v) - } else { - data[k] = []string{v} - } - return Message(Encode(data)) -} - -func (m Message) Set(k string, v ...string) Message { - data, err := Decode(string(m)) - if err != nil { - panic(err) - return m - } - data[k] = v - return Message(Encode(data)) -} - -func (m Message) Del(k string) Message { - data, err := Decode(string(m)) - if err != nil { - panic(err) - return m - } - delete(data, k) - return Message(Encode(data)) -} - -func (m Message) Get(k string) []string { - data, err := Decode(string(m)) - if err != nil { - return nil - } - v, exists := data[k] - if !exists { - return nil - } - return v -} - -func (m Message) Pretty() string { - data, err := Decode(string(m)) - if err != nil { - return "" - } - entries := make([]string, 0, len(data)) - for k, values := range data { - entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ","))) - } - return strings.Join(entries, " ") -} - -func (m Message) String() string { - return string(m) -} - -func (m Message) Bytes() []byte { - return []byte(m) -} diff --git a/beam/data/message_test.go b/beam/data/message_test.go deleted file mode 100644 index 7685769069..0000000000 --- a/beam/data/message_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package data - -import ( - "testing" -) - -func TestEmptyMessage(t *testing.T) { - m := Empty() - if m.String() != Encode(nil) { - t.Fatalf("%v != %v", m.String(), Encode(nil)) - } -} - -func TestSetMessage(t *testing.T) { - m := Empty().Set("foo", "bar") - output := m.String() - expectedOutput := "000;3:foo,6:3:bar,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } - decodedOutput, err := Decode(output) - if err != nil { - t.Fatal(err) - } - if len(decodedOutput) != 1 { - t.Fatalf("wrong output data: %#v\n", decodedOutput) - } -} - -func TestSetMessageTwice(t *testing.T) { - m := Empty().Set("foo", "bar").Set("ga", "bu") - output := m.String() - expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,," - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } - decodedOutput, err := Decode(output) - if err != nil { - t.Fatal(err) - } - if len(decodedOutput) != 2 { - t.Fatalf("wrong output data: %#v\n", decodedOutput) - } -} - -func TestSetDelMessage(t *testing.T) { - m := Empty().Set("foo", "bar").Del("foo") - output := m.String() - expectedOutput := Encode(nil) - if output != expectedOutput { - t.Fatalf("'%v' != '%v'", output, expectedOutput) - } -} diff --git a/beam/data/netstring.txt b/beam/data/netstring.txt deleted file mode 100644 index 17560929b6..0000000000 --- a/beam/data/netstring.txt +++ /dev/null @@ -1,92 +0,0 @@ -## -## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt -## - -Netstrings -D. J. Bernstein, djb@pobox.com -19970201 - - -1. Introduction - - A netstring is a self-delimiting encoding of a string. Netstrings are - very easy to generate and to parse. Any string may be encoded as a - netstring; there are no restrictions on length or on allowed bytes. - Another virtue of a netstring is that it declares the string size up - front. Thus an application can check in advance whether it has enough - space to store the entire string. - - Netstrings may be used as a basic building block for reliable network - protocols. Most high-level protocols, in effect, transmit a sequence - of strings; those strings may be encoded as netstrings and then - concatenated into a sequence of characters, which in turn may be - transmitted over a reliable stream protocol such as TCP. - - Note that netstrings can be used recursively. The result of encoding - a sequence of strings is a single string. A series of those encoded - strings may in turn be encoded into a single string. And so on. - - In this document, a string of 8-bit bytes may be written in two - different forms: as a series of hexadecimal numbers between angle - brackets, or as a sequence of ASCII characters between double quotes. - For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of - length 12; it is the same as the string "hello world!". - - Although this document restricts attention to strings of 8-bit bytes, - netstrings could be used with any 6-bit-or-larger character set. - - -2. Definition - - Any string of 8-bit bytes may be encoded as [len]":"[string]",". - Here [string] is the string and [len] is a nonempty sequence of ASCII - digits giving the length of [string] in decimal. The ASCII digits are - <30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros - at the front of [len] are prohibited: [len] begins with <30> exactly - when [string] is empty. - - For example, the string "hello world!" is encoded as <31 32 3a 68 - 65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The - empty string is encoded as "0:,". - - [len]":"[string]"," is called a netstring. [string] is called the - interpretation of the netstring. - - -3. Sample code - - The following C code starts with a buffer buf of length len and - prints it as a netstring. - - if (printf("%lu:",len) < 0) barf(); - if (fwrite(buf,1,len,stdout) < len) barf(); - if (putchar(',') < 0) barf(); - - The following C code reads a netstring and decodes it into a - dynamically allocated buffer buf of length len. - - if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */ - if (getchar() != ':') barf(); - buf = malloc(len + 1); /* malloc(0) is not portable */ - if (!buf) barf(); - if (fread(buf,1,len,stdin) < len) barf(); - if (getchar() != ',') barf(); - - Both of these code fragments assume that the local character set is - ASCII, and that the relevant stdio streams are in binary mode. - - -4. Security considerations - - The famous Finger security hole may be blamed on Finger's use of the - CRLF encoding. In that encoding, each string is simply terminated by - CRLF. This encoding has several problems. Most importantly, it does - not declare the string size in advance. This means that a correct - CRLF parser must be prepared to ask for more and more memory as it is - reading the string. In the case of Finger, a lazy implementor found - this to be too much trouble; instead he simply declared a fixed-size - buffer and used C's gets() function. The rest is history. - - In contrast, as the above sample code shows, it is very easy to - handle netstrings without risking buffer overflow. Thus widespread - use of netstrings may improve network security. diff --git a/beam/examples/beamsh/beamsh b/beam/examples/beamsh/beamsh deleted file mode 100755 index 9bfe78ef4a..0000000000 Binary files a/beam/examples/beamsh/beamsh and /dev/null differ diff --git a/beam/examples/beamsh/beamsh.go b/beam/examples/beamsh/beamsh.go deleted file mode 100644 index 808f038c68..0000000000 --- a/beam/examples/beamsh/beamsh.go +++ /dev/null @@ -1,542 +0,0 @@ -package main - -import ( - "bufio" - "flag" - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/beam/data" - "github.com/dotcloud/docker/pkg/dockerscript" - "github.com/dotcloud/docker/pkg/term" - "io" - "net" - "net/url" - "os" - "path" - "strings" - "sync" -) - -var rootPlugins = []string{ - "stdio", -} - -var ( - flX bool - flPing bool - introspect beam.ReceiveSender = beam.Devnull() -) - -func main() { - fd3 := os.NewFile(3, "beam-introspect") - if introsp, err := beam.FileConn(fd3); err == nil { - introspect = introsp - Logf("introspection enabled\n") - } else { - Logf("introspection disabled\n") - } - fd3.Close() - flag.BoolVar(&flX, "x", false, "print commands as they are being executed") - flag.Parse() - if flag.NArg() == 0 { - if term.IsTerminal(0) { - // No arguments, stdin is terminal --> interactive mode - input := bufio.NewScanner(os.Stdin) - for { - fmt.Printf("[%d] beamsh> ", os.Getpid()) - if !input.Scan() { - break - } - line := input.Text() - if len(line) != 0 { - cmd, err := dockerscript.Parse(strings.NewReader(line)) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %v\n", err) - continue - } - if err := executeRootScript(cmd); err != nil { - Fatal(err) - } - } - if err := input.Err(); err == io.EOF { - break - } else if err != nil { - Fatal(err) - } - } - } else { - // No arguments, stdin not terminal --> batch mode - script, err := dockerscript.Parse(os.Stdin) - if err != nil { - Fatal("parse error: %v\n", err) - } - if err := executeRootScript(script); err != nil { - Fatal(err) - } - } - } else { - // 1+ arguments: parse them as script files - for _, scriptpath := range flag.Args() { - f, err := os.Open(scriptpath) - if err != nil { - Fatal(err) - } - script, err := dockerscript.Parse(f) - if err != nil { - Fatal("parse error: %v\n", err) - } - if err := executeRootScript(script); err != nil { - Fatal(err) - } - } - } -} - -func executeRootScript(script []*dockerscript.Command) error { - if len(rootPlugins) > 0 { - // If there are root plugins, wrap the script inside them - var ( - rootCmd *dockerscript.Command - lastCmd *dockerscript.Command - ) - for _, plugin := range rootPlugins { - pluginCmd := &dockerscript.Command{ - Args: []string{plugin}, - } - if rootCmd == nil { - rootCmd = pluginCmd - } else { - lastCmd.Children = []*dockerscript.Command{pluginCmd} - } - lastCmd = pluginCmd - } - lastCmd.Children = script - script = []*dockerscript.Command{rootCmd} - } - handlers, err := Handlers(introspect) - if err != nil { - return err - } - defer handlers.Close() - var tasks sync.WaitGroup - defer func() { - Debugf("Waiting for introspection...\n") - tasks.Wait() - Debugf("DONE Waiting for introspection\n") - }() - if introspect != nil { - tasks.Add(1) - go func() { - Debugf("starting introspection\n") - defer Debugf("done with introspection\n") - defer tasks.Done() - introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil) - Debugf("XXX starting reading introspection messages\n") - r := beam.NewRouter(handlers) - r.NewRoute().All().Handler(func(p []byte, a *os.File) error { - Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a)) - return handlers.Send(p, a) - }) - n, err := beam.Copy(r, introspect) - Debugf("XXX done reading %d introspection messages: %v\n", n, err) - }() - } - if err := executeScript(handlers, script); err != nil { - return err - } - return nil -} - -func executeScript(out beam.Sender, script []*dockerscript.Command) error { - Debugf("executeScript(%s)\n", scriptString(script)) - defer Debugf("executeScript(%s) DONE\n", scriptString(script)) - var background sync.WaitGroup - defer background.Wait() - for _, cmd := range script { - if cmd.Background { - background.Add(1) - go func(out beam.Sender, cmd *dockerscript.Command) { - executeCommand(out, cmd) - background.Done() - }(out, cmd) - } else { - if err := executeCommand(out, cmd); err != nil { - return err - } - } - } - return nil -} - -// 1) Find a handler for the command (if no handler, fail) -// 2) Attach new in & out pair to the handler -// 3) [in the background] Copy handler output to our own output -// 4) [in the background] Run the handler -// 5) Recursively executeScript() all children commands and wait for them to complete -// 6) Wait for handler to return and (shortly afterwards) output copy to complete -// 7) Profit -func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { - if flX { - fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1)) - } - Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) - defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) - if len(cmd.Args) == 0 { - return fmt.Errorf("empty command") - } - Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) - job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes()) - if err != nil { - return fmt.Errorf("%v\n", err) - } - var tasks sync.WaitGroup - tasks.Add(1) - Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) - go func() { - if out != nil { - Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) - n, err := beam.Copy(out, job) - if err != nil { - Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err) - } - Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n) - } - tasks.Done() - }() - // depth-first execution of children commands - // executeScript() blocks until all commands are completed - Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) - executeScript(job, cmd.Children) - Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) - job.CloseWrite() - Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " ")) - Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " ")) - tasks.Wait() - Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " ")) - return nil -} - -type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender) - -func Handlers(sink beam.Sender) (*beam.UnixConn, error) { - var tasks sync.WaitGroup - pub, priv, err := beam.USocketPair() - if err != nil { - return nil, err - } - go func() { - defer func() { - Debugf("[handlers] closewrite() on endpoint\n") - // FIXME: this is not yet necessary but will be once - // there is synchronization over standard beam messages - priv.CloseWrite() - Debugf("[handlers] done closewrite() on endpoint\n") - }() - r := beam.NewRouter(sink) - r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error { - conn, err := beam.FileConn(attachment) - if err != nil { - attachment.Close() - return err - } - // attachment.Close() - tasks.Add(1) - go func() { - defer tasks.Done() - defer func() { - Debugf("[handlers] '%s' closewrite\n", payload) - conn.CloseWrite() - Debugf("[handlers] '%s' done closewrite\n", payload) - }() - cmd := data.Message(payload).Get("cmd") - Debugf("[handlers] received %s\n", strings.Join(cmd, " ")) - if len(cmd) == 0 { - return - } - handler := GetHandler(cmd[0]) - if handler == nil { - return - } - stdout, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes()) - if err != nil { - return - } - defer stdout.Close() - stderr, err := beam.SendRPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes()) - if err != nil { - return - } - defer stderr.Close() - Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) - handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn)) - Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) - }() - return nil - }) - beam.Copy(r, priv) - Debugf("[handlers] waiting for all tasks\n") - tasks.Wait() - Debugf("[handlers] all tasks returned\n") - }() - return pub, nil -} - -func GetHandler(name string) Handler { - if name == "logger" { - return CmdLogger - } else if name == "render" { - return CmdRender - } else if name == "devnull" { - return CmdDevnull - } else if name == "prompt" { - return CmdPrompt - } else if name == "stdio" { - return CmdStdio - } else if name == "echo" { - return CmdEcho - } else if name == "pass" { - return CmdPass - } else if name == "in" { - return CmdIn - } else if name == "exec" { - return CmdExec - } else if name == "trace" { - return CmdTrace - } else if name == "emit" { - return CmdEmit - } else if name == "print" { - return CmdPrint - } else if name == "multiprint" { - return CmdMultiprint - } else if name == "listen" { - return CmdListen - } else if name == "beamsend" { - return CmdBeamsend - } else if name == "beamreceive" { - return CmdBeamreceive - } else if name == "connect" { - return CmdConnect - } else if name == "openfile" { - return CmdOpenfile - } else if name == "spawn" { - return CmdSpawn - } else if name == "chdir" { - return CmdChdir - } - return nil -} - -// VARIOUS HELPER FUNCTIONS: - -func connToFile(conn net.Conn) (f *os.File, err error) { - if connWithFile, ok := conn.(interface { - File() (*os.File, error) - }); !ok { - return nil, fmt.Errorf("no file descriptor available") - } else { - f, err = connWithFile.File() - if err != nil { - return nil, err - } - } - return f, err -} - -type Msg struct { - payload []byte - attachment *os.File -} - -func Logf(msg string, args ...interface{}) (int, error) { - if len(msg) == 0 || msg[len(msg)-1] != '\n' { - msg = msg + "\n" - } - msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg) - return fmt.Printf(msg, args...) -} - -func Debugf(msg string, args ...interface{}) { - if os.Getenv("BEAMDEBUG") != "" { - Logf(msg, args...) - } -} - -func Fatalf(msg string, args ...interface{}) { - Logf(msg, args...) - os.Exit(1) -} - -func Fatal(args ...interface{}) { - Fatalf("%v", args[0]) -} - -func scriptString(script []*dockerscript.Command) string { - lines := make([]string, 0, len(script)) - for _, cmd := range script { - line := strings.Join(cmd.Args, " ") - if len(cmd.Children) > 0 { - line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) - } else { - line += " {}" - } - lines = append(lines, line) - } - return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) -} - -func dialer(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - return - } - connections <- conn - } - }() - return connections, nil -} - -func listener(addr string) (chan net.Conn, error) { - u, err := url.Parse(addr) - if err != nil { - return nil, err - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - return nil, err - } - connections := make(chan net.Conn) - go func() { - defer close(connections) - for { - conn, err := l.Accept() - if err != nil { - return - } - Logf("new connection\n") - connections <- conn - } - }() - return connections, nil -} - -func SendToConn(connections chan net.Conn, src beam.Receiver) error { - var tasks sync.WaitGroup - defer tasks.Wait() - for { - payload, attachment, err := src.Receive() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - conn, ok := <-connections - if !ok { - break - } - Logf("Sending %s\n", msgDesc(payload, attachment)) - tasks.Add(1) - go func(payload []byte, attachment *os.File, conn net.Conn) { - defer tasks.Done() - if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { - return - } - if attachment == nil { - conn.Close() - return - } - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying the connection to [%d]\n", attachment.Fd()) - io.Copy(attachment, conn) - attachment.Close() - Debugf("done copying the connection to [%d]\n", attachment.Fd()) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - Debugf("copying [%d] to the connection\n", attachment.Fd()) - io.Copy(conn, attachment) - conn.Close() - Debugf("done copying [%d] to the connection\n", attachment.Fd()) - }(attachment, conn) - iotasks.Wait() - }(payload, attachment, conn) - } - return nil -} - -func msgDesc(payload []byte, attachment *os.File) string { - return beam.MsgDesc(payload, attachment) -} - -func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error { - for conn := range connections { - err := func() error { - Logf("parsing message from network...\n") - defer Logf("done parsing message from network\n") - buf := make([]byte, 4098) - n, err := conn.Read(buf) - if n == 0 { - conn.Close() - if err == io.EOF { - return nil - } else { - return err - } - } - Logf("decoding message from '%s'\n", buf[:n]) - header, skip, err := data.DecodeString(string(buf[:n])) - if err != nil { - conn.Close() - return err - } - pub, priv, err := beam.SocketPair() - if err != nil { - return err - } - Logf("decoded message: %s\n", data.Message(header).Pretty()) - go func(skipped []byte, conn net.Conn, f *os.File) { - // this closes both conn and f - if len(skipped) > 0 { - if _, err := f.Write(skipped); err != nil { - Logf("ERROR: %v\n", err) - f.Close() - conn.Close() - return - } - } - bicopy(conn, f) - }(buf[skip:n], conn, pub) - if err := dst.Send([]byte(header), priv); err != nil { - return err - } - return nil - }() - if err != nil { - Logf("Error reading from connection: %v\n", err) - } - } - return nil -} - -func bicopy(a, b io.ReadWriteCloser) { - var iotasks sync.WaitGroup - oneCopy := func(dst io.WriteCloser, src io.Reader) { - defer iotasks.Done() - io.Copy(dst, src) - dst.Close() - } - iotasks.Add(2) - go oneCopy(a, b) - go oneCopy(b, a) - iotasks.Wait() -} diff --git a/beam/examples/beamsh/builtins.go b/beam/examples/beamsh/builtins.go deleted file mode 100644 index 3242237cc1..0000000000 --- a/beam/examples/beamsh/builtins.go +++ /dev/null @@ -1,441 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "github.com/dotcloud/docker/pkg/beam" - "github.com/dotcloud/docker/pkg/beam/data" - "github.com/dotcloud/docker/pkg/term" - "github.com/dotcloud/docker/utils" - "io" - "net" - "net/url" - "os" - "os/exec" - "path" - "strings" - "sync" - "text/template" -) - -func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if err := os.MkdirAll("logs", 0700); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - var tasks sync.WaitGroup - defer tasks.Wait() - var n int = 1 - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { - tasks.Add(1) - go func(n int) { - defer tasks.Done() - defer attachment.Close() - var streamname string - if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { - streamname = "stdout" - } else { - streamname = cmd[1] - } - if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { - streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) - } - logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - defer logfile.Close() - io.Copy(logfile, attachment) - logfile.Sync() - }(n) - n++ - return nil - }).Tee(out) - if _, err := beam.Copy(r, in); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - txt := args[1] - if !strings.HasSuffix(txt, "\n") { - txt += "\n" - } - t := template.Must(template.New("render").Parse(txt)) - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - msg, err := data.Decode(string(payload)) - if err != nil { - fmt.Fprintf(stderr, "decode error: %v\n") - } - if err := t.Execute(stdout, msg); err != nil { - fmt.Fprintf(stderr, "rendering error: %v\n", err) - out.Send(data.Empty().Set("status", "1").Bytes(), nil) - return - } - if err := out.Send(payload, attachment); err != nil { - return - } - } -} - -func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - _, attachment, err := in.Receive() - if err != nil { - return - } - if attachment != nil { - attachment.Close() - } - } -} - -func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) - return - } - if !term.IsTerminal(0) { - fmt.Fprintf(stderr, "can't prompt: no tty available...\n") - return - } - fmt.Printf("%s: ", strings.Join(args[1:], " ")) - oldState, _ := term.SaveState(0) - term.DisableEcho(0, oldState) - line, _, err := bufio.NewReader(os.Stdin).ReadLine() - if err != nil { - fmt.Fprintln(stderr, err.Error()) - return - } - val := string(line) - fmt.Printf("\n") - term.RestoreTerminal(0, oldState) - out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) -} - -func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - defer tasks.Wait() - - r := beam.NewRouter(out) - r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { - tasks.Add(1) - go func() { - defer tasks.Done() - defer attachment.Close() - io.Copy(os.Stdout, attachment) - attachment.Close() - }() - return nil - }).Tee(out) - - if _, err := beam.Copy(r, in); err != nil { - Fatal(err) - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - fmt.Fprintln(stdout, strings.Join(args[1:], " ")) -} - -func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, attachment, err := in.Receive() - if err != nil { - return - } - if err := out.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return - } - } -} - -func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - c := exec.Command(utils.SelfPath()) - r, w, err := os.Pipe() - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - c.Stdin = r - c.Stdout = stdout - c.Stderr = stderr - go func() { - fmt.Fprintf(w, strings.Join(args[1:], " ")) - w.Sync() - w.Close() - }() - if err := c.Run(); err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } -} - -func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - os.Chdir(args[1]) - GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) -} - -func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - cmd := exec.Command(args[1], args[2:]...) - cmd.Stdout = stdout - cmd.Stderr = stderr - //cmd.Stdin = os.Stdin - local, remote, err := beam.SocketPair() - if err != nil { - fmt.Fprintf(stderr, "%v\n", err) - return - } - child, err := beam.FileConn(local) - if err != nil { - local.Close() - remote.Close() - fmt.Fprintf(stderr, "%v\n", err) - return - } - local.Close() - cmd.ExtraFiles = append(cmd.ExtraFiles, remote) - - var tasks sync.WaitGroup - tasks.Add(1) - go func() { - defer Debugf("done copying to child\n") - defer tasks.Done() - defer child.CloseWrite() - beam.Copy(child, in) - }() - - tasks.Add(1) - go func() { - defer Debugf("done copying from child %d\n") - defer tasks.Done() - r := beam.NewRouter(out) - r.NewRoute().All().Handler(func(p []byte, a *os.File) error { - return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a) - }) - beam.Copy(r, child) - }() - execErr := cmd.Run() - // We can close both ends of the socket without worrying about data stuck in the buffer, - // because unix socket writes are fully synchronous. - child.Close() - tasks.Wait() - var status string - if execErr != nil { - status = execErr.Error() - } else { - status = "ok" - } - out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) -} - -func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - r := beam.NewRouter(out) - r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error { - var sfd string = "nil" - if attachment != nil { - sfd = fmt.Sprintf("%d", attachment.Fd()) - } - fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd) - out.Send(payload, attachment) - return nil - }) - beam.Copy(r, in) -} - -func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - out.Send(data.Parse(args[1:]).Bytes(), nil) -} - -func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for { - payload, a, err := in.Receive() - if err != nil { - return - } - // Skip commands - if a != nil && data.Message(payload).Get("cmd") == nil { - dup, err := beam.SendRPipe(out, payload) - if err != nil { - a.Close() - return - } - io.Copy(io.MultiWriter(os.Stdout, dup), a) - dup.Close() - } else { - if err := out.Send(payload, a); err != nil { - return - } - } - } -} - -func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - var tasks sync.WaitGroup - defer tasks.Wait() - r := beam.NewRouter(out) - multiprint := func(p []byte, a *os.File) error { - tasks.Add(1) - go func() { - defer tasks.Done() - defer a.Close() - msg := data.Message(string(p)) - input := bufio.NewScanner(a) - for input.Scan() { - fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) - } - }() - return nil - } - r.NewRoute().KeyIncludes("type", "job").Passthrough(out) - r.NewRoute().HasAttachment().Handler(multiprint).Tee(out) - beam.Copy(r, in) -} - -func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - l, err := net.Listen(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - for { - conn, err := l.Accept() - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - f, err := connToFile(conn) - if err != nil { - conn.Close() - continue - } - out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) - } -} - -func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) < 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = dialer - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - SendToConn(connections, in) -} - -func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { - Fatal(err) - } - return - } - var connector func(string) (chan net.Conn, error) - connector = listener - connections, err := connector(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - // Copy in to conn - ReceiveFromConn(connections, out) -} - -func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - if len(args) != 2 { - out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) - return - } - u, err := url.Parse(args[1]) - if err != nil { - out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) - return - } - var tasks sync.WaitGroup - for { - _, attachment, err := in.Receive() - if err != nil { - break - } - if attachment == nil { - continue - } - Logf("connecting to %s/%s\n", u.Scheme, u.Host) - conn, err := net.Dial(u.Scheme, u.Host) - if err != nil { - out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil) - return - } - out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) - tasks.Add(1) - go func(attachment *os.File, conn net.Conn) { - defer tasks.Done() - // even when successful, conn.File() returns a duplicate, - // so we must close the original - var iotasks sync.WaitGroup - iotasks.Add(2) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(attachment, conn) - }(attachment, conn) - go func(attachment *os.File, conn net.Conn) { - defer iotasks.Done() - io.Copy(conn, attachment) - }(attachment, conn) - iotasks.Wait() - conn.Close() - attachment.Close() - }(attachment, conn) - } - tasks.Wait() -} - -func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - for _, name := range args { - f, err := os.Open(name) - if err != nil { - continue - } - if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { - f.Close() - } - } -} - -func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { - os.Chdir(args[1]) -} diff --git a/beam/examples/beamsh/scripts/bug0.ds b/beam/examples/beamsh/scripts/bug0.ds deleted file mode 100755 index 89b75230be..0000000000 --- a/beam/examples/beamsh/scripts/bug0.ds +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env beamsh - -exec ls -l diff --git a/beam/examples/beamsh/scripts/bug1.ds b/beam/examples/beamsh/scripts/bug1.ds deleted file mode 100755 index 2d8a9e2ed9..0000000000 --- a/beam/examples/beamsh/scripts/bug1.ds +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - exec ls -l -} diff --git a/beam/examples/beamsh/scripts/bug2.ds b/beam/examples/beamsh/scripts/bug2.ds deleted file mode 100755 index 08f0431f68..0000000000 --- a/beam/examples/beamsh/scripts/bug2.ds +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - stdio { - exec ls -l - } -} diff --git a/beam/examples/beamsh/scripts/bug3.ds b/beam/examples/beamsh/scripts/bug3.ds deleted file mode 100755 index 7bb8694d49..0000000000 --- a/beam/examples/beamsh/scripts/bug3.ds +++ /dev/null @@ -1,10 +0,0 @@ -#!/usr/bin/env beamsh -x - -trace outer { - # stdio fails - stdio { - trace inner { - exec ls -l - } - } -} diff --git a/beam/examples/beamsh/scripts/bug4.ds b/beam/examples/beamsh/scripts/bug4.ds deleted file mode 100755 index b7beedbae2..0000000000 --- a/beam/examples/beamsh/scripts/bug4.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - trace { - stdio { - exec ls -l - } - } -} diff --git a/beam/examples/beamsh/scripts/bug5.ds b/beam/examples/beamsh/scripts/bug5.ds deleted file mode 100755 index 9f9a85515d..0000000000 --- a/beam/examples/beamsh/scripts/bug5.ds +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - # exec fails - exec ls -l -} diff --git a/beam/examples/beamsh/scripts/bug6.ds b/beam/examples/beamsh/scripts/bug6.ds deleted file mode 100755 index 90281401cd..0000000000 --- a/beam/examples/beamsh/scripts/bug6.ds +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - trace { - echo hello - } -} diff --git a/beam/examples/beamsh/scripts/bug7.ds b/beam/examples/beamsh/scripts/bug7.ds deleted file mode 100755 index b6e7bd9201..0000000000 --- a/beam/examples/beamsh/scripts/bug7.ds +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env beamsh - -stdio { - # exec fails - echo hello world -} diff --git a/beam/examples/beamsh/scripts/demo1.ds b/beam/examples/beamsh/scripts/demo1.ds deleted file mode 100755 index 20a3359f3a..0000000000 --- a/beam/examples/beamsh/scripts/demo1.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -devnull { - multiprint { - exec tail -f /var/log/system.log & - exec ls -l - exec ls ksdhfkjdshf jksdfhkjsdhf - } -} diff --git a/beam/examples/beamsh/scripts/helloworld.ds b/beam/examples/beamsh/scripts/helloworld.ds deleted file mode 100755 index 32e59b062e..0000000000 --- a/beam/examples/beamsh/scripts/helloworld.ds +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/env beamsh - -print { - trace { - emit msg=hello - emit msg=world - } -} diff --git a/beam/examples/beamsh/scripts/logdemo.ds b/beam/examples/beamsh/scripts/logdemo.ds deleted file mode 100755 index 8b729a966f..0000000000 --- a/beam/examples/beamsh/scripts/logdemo.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -trace { - log { - exec ls -l - exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf - echo hello world - } -} diff --git a/beam/examples/beamsh/scripts/miniserver.ds b/beam/examples/beamsh/scripts/miniserver.ds deleted file mode 100755 index 9707477ee0..0000000000 --- a/beam/examples/beamsh/scripts/miniserver.ds +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env beamsh - -multiprint { - trace { - listen tcp://localhost:7676 & - listen tcp://localhost:8787 & - } -} - diff --git a/beam/handler.go b/beam/handler.go deleted file mode 100644 index 086a391c02..0000000000 --- a/beam/handler.go +++ /dev/null @@ -1,31 +0,0 @@ -package beam - -import ( - "fmt" -) - -type Handler func(msg *Message) error - -func (h Handler) Send(msg *Message) (Receiver, error) { - var ret Receiver - if RetPipe.Equals(msg.Ret) { - ret, msg.Ret = Pipe() - } - go func() { - // Ret must always be a valid Sender, so handlers - // can safely send to it - if msg.Ret == nil { - msg.Ret = NopSender{} - } - err := h(msg) - if err != nil { - Obj(msg.Ret).Error("%v", err) - } - msg.Ret.Close() - }() - return ret, nil -} - -func (h Handler) Close() error { - return fmt.Errorf("can't close") -} diff --git a/beam/http2/README.md b/beam/http2/README.md deleted file mode 100644 index 92ea4c8cdf..0000000000 --- a/beam/http2/README.md +++ /dev/null @@ -1,8 +0,0 @@ -This package defines a remote transport for Beam services using http2/spdy and tls. - -Uses https://github.com/docker/spdystream - -Pointers: - - * Low-level protocol framer: http://code.google.com/p/go.net/spdy - * (incomplete) high-level server implementation: https://github.com/shykes/spdy-go diff --git a/beam/http2/listener.go b/beam/http2/listener.go deleted file mode 100644 index 8b17bf2b08..0000000000 --- a/beam/http2/listener.go +++ /dev/null @@ -1,86 +0,0 @@ -package http2 - -import ( - "github.com/docker/libswarm/beam" - "github.com/docker/spdystream" - "net" - "sync" -) - -type ListenSession struct { - listener net.Listener - streamChan chan *spdystream.Stream - streamLock sync.RWMutex - subStreamChans map[string]chan *spdystream.Stream - auth Authenticator -} - -func NewListenSession(listener net.Listener, auth Authenticator) (*ListenSession, error) { - return &ListenSession{ - listener: listener, - streamChan: make(chan *spdystream.Stream), - subStreamChans: make(map[string]chan *spdystream.Stream), - auth: auth, - }, nil -} - -func (l *ListenSession) streamHandler(stream *spdystream.Stream) { - streamChan := l.getStreamChan(stream.Parent()) - streamChan <- stream -} - -func (l *ListenSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { - l.streamLock.Lock() - l.subStreamChans[stream.String()] = streamChan - l.streamLock.Unlock() -} - -func (l *ListenSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { - if stream == nil { - return l.streamChan - } - l.streamLock.RLock() - defer l.streamLock.RUnlock() - streamChan, ok := l.subStreamChans[stream.String()] - if ok { - return streamChan - } - return l.streamChan -} - -func (l *ListenSession) Serve() { - for { - conn, err := l.listener.Accept() - if err != nil { - // TODO log - break - } - - go func() { - authHandler, authErr := l.auth(conn) - if authErr != nil { - // TODO log - conn.Close() - return - } - - spdyConn, spdyErr := spdystream.NewConnection(conn, true) - if spdyErr != nil { - // TODO log - conn.Close() - return - } - - go spdyConn.Serve(l.streamHandler, authHandler) - }() - } -} - -func (l *ListenSession) Shutdown() error { - return l.listener.Close() -} - -func (l *ListenSession) Receive(mode int) (*beam.Message, error) { - stream := <-l.streamChan - return createStreamMessage(stream, mode, l, nil) -} diff --git a/beam/http2/listener_test.go b/beam/http2/listener_test.go deleted file mode 100644 index 2d4a8a9432..0000000000 --- a/beam/http2/listener_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package http2 - -import ( - "github.com/docker/libswarm/beam" - "io" - "net" - "testing" -) - -func TestListenSession(t *testing.T) { - listen := "localhost:7743" - listener, listenErr := net.Listen("tcp", listen) - if listenErr != nil { - t.Fatalf("Error creating listener: %s", listenErr) - } - - session, sessionErr := NewListenSession(listener, NoAuthenticator) - if sessionErr != nil { - t.Fatalf("Error creating session: %s", sessionErr) - } - - go session.Serve() - - end := make(chan bool) - go exerciseServer(t, listen, end) - - msg, msgErr := session.Receive(beam.Ret) - if msgErr != nil { - t.Fatalf("Error receiving message: %s", msgErr) - } - if msg.Att == nil { - t.Fatalf("Error message missing attachment") - } - if msg.Verb != beam.Attach { - t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach) - } - - receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack}) - if sendErr != nil { - t.Fatalf("Error sending return message: %s", sendErr) - } - _, ackErr := receiver.Receive(0) - if ackErr == nil { - t.Fatalf("No error receiving from message with no return pipe") - } - if ackErr != io.EOF { - t.Fatalf("Unexpected error receiving from message: %s", ackErr) - } - - <-end - shutdownErr := session.Shutdown() - if shutdownErr != nil { - t.Fatalf("Error shutting down: %s", shutdownErr) - } -} - -func exerciseServer(t *testing.T, server string, endChan chan bool) { - defer close(endChan) - - conn, connErr := net.Dial("tcp", server) - if connErr != nil { - t.Fatalf("Error dialing server: %s", connErr) - } - - session, sessionErr := NewStreamSession(conn) - if sessionErr != nil { - t.Fatalf("Error creating session: %s", sessionErr) - } - - receiver, sendErr := session.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) - if sendErr != nil { - t.Fatalf("Error sending message: %s", sendErr) - } - - msg, receiveErr := receiver.Receive(beam.Ret) - if receiveErr != nil { - t.Fatalf("Error receiving message") - } - - if msg.Verb != beam.Ack { - t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack) - } -} diff --git a/beam/http2/server.go b/beam/http2/server.go deleted file mode 100644 index 4ec20d3272..0000000000 --- a/beam/http2/server.go +++ /dev/null @@ -1,66 +0,0 @@ -package http2 - -import ( - "github.com/docker/libswarm/beam" - "github.com/docker/spdystream" - "net" - "sync" -) - -// Serve a Beam endpoint over a single HTTP2 connection -type Server struct { - conn *spdystream.Connection - streamChan chan *spdystream.Stream - streamLock sync.RWMutex - subStreamChans map[string]chan *spdystream.Stream -} - -// Create a Beam receiver from a net.Conn -func NewServer(conn net.Conn) (*Server, error) { - spdyConn, err := spdystream.NewConnection(conn, true) - if err != nil { - return nil, err - } - - s := &Server{ - conn: spdyConn, - streamChan: make(chan *spdystream.Stream), - subStreamChans: make(map[string]chan *spdystream.Stream), - } - go s.conn.Serve(s.streamHandler, spdystream.NoAuthHandler) - - return s, nil -} - -func (s *Server) Close() error { - return s.conn.Close() -} - -func (s *Server) Receive(mode int) (*beam.Message, error) { - stream := <-s.streamChan - return createStreamMessage(stream, mode, s, nil) -} - -func (s *Server) streamHandler(stream *spdystream.Stream) { - streamChan := s.getStreamChan(stream.Parent()) - streamChan <- stream -} - -func (s *Server) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { - s.streamLock.Lock() - s.subStreamChans[stream.String()] = streamChan - s.streamLock.Unlock() -} - -func (s *Server) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { - if stream == nil { - return s.streamChan - } - s.streamLock.RLock() - defer s.streamLock.RUnlock() - streamChan, ok := s.subStreamChans[stream.String()] - if ok { - return streamChan - } - return s.streamChan -} diff --git a/beam/http2/spdy.go b/beam/http2/spdy.go deleted file mode 100644 index 293cc229ab..0000000000 --- a/beam/http2/spdy.go +++ /dev/null @@ -1,109 +0,0 @@ -package http2 - -import ( - "encoding/base64" - "fmt" - "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/data" - "github.com/docker/spdystream" - "io" - "net" - "net/http" - "os" - "syscall" -) - -type Authenticator func(conn net.Conn) (spdystream.AuthHandler, error) - -func NoAuthenticator(conn net.Conn) (spdystream.AuthHandler, error) { - return func(header http.Header, slot uint8, parent uint32) bool { - return true - }, nil -} - -type streamChanProvider interface { - addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) - getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream -} - -func encodeArgs(args []string) string { - encoded := data.Encode(map[string][]string{"args": args}) - return base64.URLEncoding.EncodeToString([]byte(encoded)) -} - -func decodeArgs(argString string) ([]string, error) { - decoded, decodeErr := base64.URLEncoding.DecodeString(argString) - if decodeErr != nil { - return []string{}, decodeErr - } - dataMap, dataErr := data.Decode(string(decoded)) - if dataErr != nil { - return []string{}, dataErr - } - return dataMap["args"], nil -} - -func createStreamMessage(stream *spdystream.Stream, mode int, streamChans streamChanProvider, ret beam.Sender) (*beam.Message, error) { - verbString := stream.Headers()["Verb"] - if len(verbString) != 1 { - if len(verbString) == 0 { - return nil, fmt.Errorf("Stream(%s) is missing verb header", stream) - } else { - return nil, fmt.Errorf("Stream(%s) has multiple verb headers", stream) - } - - } - verb, verbOk := verbs[verbString[0]] - if !verbOk { - return nil, fmt.Errorf("Unknown verb: %s", verbString[0]) - } - - var args []string - argString := stream.Headers()["Args"] - if len(argString) > 1 { - return nil, fmt.Errorf("Stream(%s) has multiple args headers", stream) - } - if len(argString) == 1 { - var err error - args, err = decodeArgs(argString[0]) - if err != nil { - return nil, err - } - } - - var attach *os.File - if !stream.IsFinished() { - socketFds, socketErr := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) - if socketErr != nil { - return nil, socketErr - } - attach = os.NewFile(uintptr(socketFds[0]), "") - conn, connErr := net.FileConn(os.NewFile(uintptr(socketFds[1]), "")) - if connErr != nil { - return nil, connErr - } - - go func() { - io.Copy(conn, stream) - }() - go func() { - io.Copy(stream, conn) - }() - } - - retSender := ret - if retSender == nil || beam.RetPipe.Equals(retSender) { - retSender = &StreamSender{stream: stream, streamChans: streamChans} - } - - if mode&beam.Ret == 0 { - retSender.Close() - } - - return &beam.Message{ - Verb: verb, - Args: args, - Att: attach, - Ret: retSender, - }, nil -} diff --git a/beam/http2/stream.go b/beam/http2/stream.go deleted file mode 100644 index ac5c67364d..0000000000 --- a/beam/http2/stream.go +++ /dev/null @@ -1,166 +0,0 @@ -package http2 - -import ( - "fmt" - "github.com/docker/libswarm/beam" - "github.com/docker/spdystream" - "net" - "net/http" - "sync" -) - -var verbs = map[string]beam.Verb{ - "Ack": beam.Ack, - "Attach": beam.Attach, - "Connect": beam.Connect, - "Error": beam.Error, - "File": beam.File, - "Get": beam.Get, - "Log": beam.Log, - "Ls": beam.Ls, - "Set": beam.Set, - "Spawn": beam.Spawn, - "Start": beam.Start, - "Stop": beam.Stop, - "Watch": beam.Watch, -} - -// Only allows sending, no parent stream -type StreamSession struct { - conn *spdystream.Connection - - streamLock sync.Mutex - streamChan chan *spdystream.Stream - subStreamChans map[string]chan *spdystream.Stream -} - -func (s *StreamSession) addStreamChan(stream *spdystream.Stream, streamChan chan *spdystream.Stream) { - s.subStreamChans[stream.String()] = streamChan -} - -func (s *StreamSession) getStreamChan(stream *spdystream.Stream) chan *spdystream.Stream { - if stream == nil { - return s.streamChan - } - streamChan, ok := s.subStreamChans[stream.String()] - if ok { - return streamChan - } - return s.streamChan -} - -func (s *StreamSession) newStreamHandler(stream *spdystream.Stream) { - streamChan := s.getStreamChan(stream.Parent()) - streamChan <- stream -} - -func NewStreamSession(conn net.Conn) (*StreamSession, error) { - session := &StreamSession{ - streamChan: make(chan *spdystream.Stream), - subStreamChans: make(map[string]chan *spdystream.Stream), - } - - spdyConn, spdyErr := spdystream.NewConnection(conn, false) - if spdyErr != nil { - return nil, spdyErr - } - go spdyConn.Serve(session.newStreamHandler, spdystream.NoAuthHandler) - - session.conn = spdyConn - - return session, nil -} - -func (s *StreamSession) Send(msg *beam.Message) (ret beam.Receiver, err error) { - if msg.Att != nil { - return nil, fmt.Errorf("file attachment not yet implemented for spdy transport") - } - - var fin bool - if beam.RetPipe.Equals(msg.Ret) { - fin = false - } else { - fin = true - } - headers := http.Header{ - "Verb": []string{msg.Verb.String()}, - "Args": []string{encodeArgs(msg.Args)}, - } - stream, streamErr := s.conn.CreateStream(headers, nil, fin) - if streamErr != nil { - return nil, streamErr - } - - streamChan := make(chan *spdystream.Stream) - s.subStreamChans[stream.String()] = streamChan - - if beam.RetPipe.Equals(msg.Ret) { - ret = &StreamReceiver{stream: stream, streamChans: s} - } else { - ret = &beam.NopReceiver{} - } - return -} - -func (s *StreamSession) Close() error { - return s.conn.Close() -} - -type StreamReceiver struct { - stream *spdystream.Stream - streamChans streamChanProvider - ret beam.Sender -} - -func (s *StreamReceiver) Receive(mode int) (*beam.Message, error) { - waitErr := s.stream.Wait() - if waitErr != nil { - return nil, waitErr - } - streamChan := s.streamChans.getStreamChan(s.stream) - stream := <-streamChan - return createStreamMessage(stream, mode, s.streamChans, s.ret) -} - -type StreamSender struct { - stream *spdystream.Stream - streamChans streamChanProvider -} - -func (s *StreamSender) Send(msg *beam.Message) (ret beam.Receiver, err error) { - if msg.Att != nil { - return nil, fmt.Errorf("file attachment not yet implemented for spdy transport") - } - - var fin bool - if beam.RetPipe.Equals(msg.Ret) { - fin = false - } else { - fin = true - } - headers := http.Header{ - "Verb": []string{msg.Verb.String()}, - "Args": []string{encodeArgs(msg.Args)}, - } - - stream, streamErr := s.stream.CreateSubStream(headers, fin) - if streamErr != nil { - return nil, streamErr - } - - streamChan := make(chan *spdystream.Stream) - s.streamChans.addStreamChan(stream, streamChan) - - if beam.RetPipe.Equals(msg.Ret) { - ret = &StreamReceiver{stream: stream, streamChans: s.streamChans} - } else { - ret = beam.NopReceiver{} - } - - return -} - -func (s *StreamSender) Close() error { - // TODO Remove stream from stream chans - return s.stream.Close() -} diff --git a/beam/http2/stream_test.go b/beam/http2/stream_test.go deleted file mode 100644 index 8ae9d27a2b..0000000000 --- a/beam/http2/stream_test.go +++ /dev/null @@ -1,159 +0,0 @@ -package http2 - -import ( - //"bytes" - "github.com/docker/libswarm/beam" - //"github.com/docker/spdystream" - "io" - "net" - "testing" -) - -func TestBeamSession(t *testing.T) { - end := make(chan bool) - listen := "localhost:7543" - server, serverErr := runServer(listen, t, end) - if serverErr != nil { - t.Fatalf("Error initializing server: %s", serverErr) - } - - conn, connErr := net.Dial("tcp", listen) - if connErr != nil { - t.Fatalf("Error dialing server: %s", connErr) - } - - sender, senderErr := NewStreamSession(conn) - if senderErr != nil { - t.Fatalf("Error creating sender: %s", senderErr) - } - - // Ls interaction - receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Ls, Ret: beam.RetPipe}) - if sendErr != nil { - t.Fatalf("Error sending beam message: %s", sendErr) - } - message, receiveErr := receiver.Receive(0) - if receiveErr != nil { - t.Fatalf("Error receiving beam message: %s", receiveErr) - } - if message.Verb != beam.Set { - t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ls.String()) - } - if len(message.Args) != 3 { - t.Fatalf("Unexpected args length\nActual: %d\nExpected: %d", len(message.Args), 3) - } - if message.Args[0] != "file1" { - t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[0], "file1") - } - if message.Args[1] != "file2" { - t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[1], "file2") - } - if message.Args[2] != string([]byte{0x00, 0x00, 0x00}) { - t.Errorf("Unexpected arg[0]\nActual: %s\nExpected: %s", message.Args[2], []byte{0x00, 0x00, 0x00}) - } - - // Attach interactions - receiver, sendErr = sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) - if sendErr != nil { - t.Fatalf("Error sending beam message: %s", sendErr) - } - message, receiveErr = receiver.Receive(beam.Ret) - if receiveErr != nil { - t.Fatalf("Error receiving beam message: %s", receiveErr) - } - if message.Verb != beam.Ack { - t.Errorf("Unexpected message name:\nActual: %s\nExpected: %s", message.Verb, beam.Ack.String()) - } - - // TODO full connect interaction - //if message.Att == nil { - // t.Fatalf("Missing attachment on message") - //} - - //testBytes := []byte("Hello") - //n, writeErr := message.Att.Write(testBytes) - //if writeErr != nil { - // t.Fatalf("Error writing bytes: %s", writeErr) - //} - //if n != 5 { - // t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n) - //} - - //buf := make([]byte, 10) - //n, readErr := message.Att.Read(buf) - //if readErr != nil { - // t.Fatalf("Error writing bytes: %s", readErr) - //} - //if n != 5 { - // t.Fatalf("Unexpected number of bytes read:\nActual: %d\nExpected: 5", n) - //} - //if bytes.Compare(buf[:n], testBytes) != 0 { - // t.Fatalf("Did not receive expected message:\nActual: %s\nExpectd: %s", buf, testBytes) - //} - - closeErr := server.Close() - if closeErr != nil { - t.Fatalf("Error closing server: %s", closeErr) - } - - closeErr = sender.Close() - if closeErr != nil { - t.Fatalf("Error closing sender: %s", closeErr) - } - <-end -} - -func runServer(listen string, t *testing.T, endChan chan bool) (io.Closer, error) { - listener, lErr := net.Listen("tcp", listen) - if lErr != nil { - return nil, lErr - } - - session, sessionErr := NewListenSession(listener, NoAuthenticator) - if sessionErr != nil { - t.Fatalf("Error creating session: %s", sessionErr) - } - - go session.Serve() - - go func() { - defer close(endChan) - // Ls exchange - message, receiveErr := session.Receive(beam.Ret) - if receiveErr != nil { - t.Fatalf("Error receiving on server: %s", receiveErr) - } - if message.Verb != beam.Ls { - t.Fatalf("Unexpected verb: %s", message.Verb) - } - receiver, sendErr := message.Ret.Send(&beam.Message{Verb: beam.Set, Args: []string{"file1", "file2", string([]byte{0x00, 0x00, 0x00})}}) - if sendErr != nil { - t.Fatalf("Error sending set message: %s", sendErr) - } - _, receiveErr = receiver.Receive(0) - if receiveErr == nil { - t.Fatalf("No error received from empty receiver") - } - if receiveErr != io.EOF { - t.Fatalf("Expected error from empty receiver: %s", receiveErr) - } - - // Connect exchange - message, receiveErr = session.Receive(beam.Ret) - if receiveErr != nil { - t.Fatalf("Error receiving on server: %s", receiveErr) - } - if message.Verb != beam.Attach { - t.Fatalf("Unexpected verb: %s", message.Verb) - } - receiver, sendErr = message.Ret.Send(&beam.Message{Verb: beam.Ack}) - if sendErr != nil { - t.Fatalf("Error sending set message: %s", sendErr) - } - - // TODO full connect interaction - - }() - - return listener, nil -} diff --git a/beam/inmem.go b/beam/inmem.go deleted file mode 100644 index 20593fb715..0000000000 --- a/beam/inmem.go +++ /dev/null @@ -1,206 +0,0 @@ -package beam - -import ( - "io" - "sync" -) - -func Pipe() (*PipeReceiver, *PipeSender) { - p := new(pipe) - p.rwait.L = &p.l - p.wwait.L = &p.l - r := &PipeReceiver{p} - w := &PipeSender{p} - return r, w -} - -type pipe struct { - rwait sync.Cond - wwait sync.Cond - l sync.Mutex - rl sync.Mutex - wl sync.Mutex - rerr error // if reader closed, error to give writes - werr error // if writer closed, error to give reads - msg *Message -} - -func (p *pipe) psend(msg *Message) error { - var err error - // One writer at a time. - p.wl.Lock() - defer p.wl.Unlock() - - p.l.Lock() - defer p.l.Unlock() - p.msg = msg - p.rwait.Signal() - for { - if p.msg == nil { - break - } - if p.rerr != nil { - err = p.rerr - break - } - if p.werr != nil { - err = io.ErrClosedPipe - } - p.wwait.Wait() - } - p.msg = nil // in case of rerr or werr - return err -} - -func (p *pipe) send(msg *Message) (ret Receiver, err error) { - // Prepare nested Receiver if requested - if RetPipe.Equals(msg.Ret) { - ret, msg.Ret = Pipe() - } - err = p.psend(msg) - return -} - -func (p *pipe) preceive() (*Message, error) { - p.rl.Lock() - defer p.rl.Unlock() - - p.l.Lock() - defer p.l.Unlock() - for { - if p.rerr != nil { - return nil, io.ErrClosedPipe - } - if p.msg != nil { - break - } - if p.werr != nil { - return nil, p.werr - } - p.rwait.Wait() - } - msg := p.msg - p.msg = nil - p.wwait.Signal() - return msg, nil -} - -func (p *pipe) receive(mode int) (*Message, error) { - msg, err := p.preceive() - if err != nil { - return nil, err - } - if msg.Ret == nil { - msg.Ret = NopSender{} - } - if mode&Ret == 0 { - msg.Ret.Close() - } - return msg, nil -} - -func (p *pipe) rclose(err error) { - if err == nil { - err = io.ErrClosedPipe - } - p.l.Lock() - defer p.l.Unlock() - p.rerr = err - p.rwait.Signal() - p.wwait.Signal() -} - -func (p *pipe) wclose(err error) { - if err == nil { - err = io.EOF - } - p.l.Lock() - defer p.l.Unlock() - p.werr = err - p.rwait.Signal() - p.wwait.Signal() -} - -// PipeReceiver - -type PipeReceiver struct { - p *pipe -} - -func (r *PipeReceiver) Receive(mode int) (*Message, error) { - return r.p.receive(mode) -} - -func (r *PipeReceiver) SendTo(dst Sender) (int, error) { - var n int - // If the destination is a PipeSender, we can cheat - pdst, ok := dst.(*PipeSender) - if !ok { - return 0, ErrIncompatibleSender - } - for { - pmsg, err := r.p.preceive() - if err == io.EOF { - break - } - if err != nil { - return n, err - } - if err := pdst.p.psend(pmsg); err != nil { - return n, err - } - } - n++ - return n, nil -} - -func (r *PipeReceiver) Close() error { - return r.CloseWithError(nil) -} - -func (r *PipeReceiver) CloseWithError(err error) error { - r.p.rclose(err) - return nil -} - -// PipeSender - -type PipeSender struct { - p *pipe -} - -func (w *PipeSender) Send(msg *Message) (Receiver, error) { - return w.p.send(msg) -} - -func (w *PipeSender) ReceiveFrom(src Receiver) (int, error) { - var n int - // If the destination is a PipeReceiver, we can cheat - psrc, ok := src.(*PipeReceiver) - if !ok { - return 0, ErrIncompatibleReceiver - } - for { - pmsg, err := psrc.p.preceive() - if err == io.EOF { - break - } - if err != nil { - return n, err - } - if err := w.p.psend(pmsg); err != nil { - return n, err - } - n++ - } - return n, nil -} - -func (w *PipeSender) Close() error { - return w.CloseWithError(nil) -} - -func (w *PipeSender) CloseWithError(err error) error { - w.p.wclose(err) - return nil -} diff --git a/beam/inmem_test.go b/beam/inmem_test.go deleted file mode 100644 index e3826e9148..0000000000 --- a/beam/inmem_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package beam - -import ( - "fmt" - "github.com/dotcloud/docker/pkg/testutils" - "io/ioutil" - "os" - "testing" -) - -func TestInmemRetPipe(t *testing.T) { - r, w := Pipe() - defer r.Close() - defer w.Close() - wait := make(chan struct{}) - go func() { - defer close(wait) - ret, err := w.Send(&Message{Verb: Log, Args: []string{"hello"}, Ret: RetPipe}) - if err != nil { - t.Fatal(err) - } - msg, err := ret.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != Ack { - t.Fatalf("%#v", msg) - } - if msg.Args[0] != "this better not crash" { - t.Fatalf("%#v", msg) - } - }() - msg, err := r.Receive(Ret) - if err != nil { - t.Fatal(err) - } - if _, err := msg.Ret.Send(&Message{Verb: Ack, Args: []string{"this better not crash"}}); err != nil { - t.Fatal(err) - } - <-wait -} - -func TestSimpleSend(t *testing.T) { - r, w := Pipe() - defer r.Close() - defer w.Close() - testutils.Timeout(t, func() { - go func() { - msg, err := r.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != Log { - t.Fatalf("%#v", *msg) - } - if msg.Args[0] != "hello world" { - t.Fatalf("%#v", *msg) - } - }() - if _, err := w.Send(&Message{Verb: Log, Args: []string{"hello world"}}); err != nil { - t.Fatal(err) - } - }) -} - -func TestSendReply(t *testing.T) { - r, w := Pipe() - defer r.Close() - defer w.Close() - testutils.Timeout(t, func() { - // Send - go func() { - ret, err := w.Send(&Message{Args: []string{"this is the request"}, Ret: RetPipe}) - if err != nil { - t.Fatal(err) - } - if ret == nil { - t.Fatalf("ret = nil\n") - } - // Read for a reply - msg, err := ret.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Args[0] != "this is the reply" { - t.Fatalf("%#v", msg) - } - }() - // Receive a message with mode=Ret - msg, err := r.Receive(Ret) - if err != nil { - t.Fatal(err) - } - if msg.Args[0] != "this is the request" { - t.Fatalf("%#v", msg) - } - if msg.Ret == nil { - t.Fatalf("%#v", msg) - } - // Send a reply - _, err = msg.Ret.Send(&Message{Args: []string{"this is the reply"}}) - if err != nil { - t.Fatal(err) - } - }) -} - -func TestSendFile(t *testing.T) { - r, w := Pipe() - defer r.Close() - defer w.Close() - tmp, err := ioutil.TempFile("", "beam-test-") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmp.Name()) - fmt.Fprintf(tmp, "hello world\n") - tmp.Sync() - tmp.Seek(0, 0) - testutils.Timeout(t, func() { - go func() { - _, err := w.Send(&Message{Verb: File, Args: []string{"path=" + tmp.Name()}, Att: tmp}) - if err != nil { - t.Fatal(err) - } - }() - msg, err := r.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != File { - t.Fatalf("%#v", msg) - } - if msg.Args[0] != "path="+tmp.Name() { - t.Fatalf("%#v", msg) - } - txt, err := ioutil.ReadAll(msg.Att) - if err != nil { - t.Fatal(err) - } - if string(txt) != "hello world\n" { - t.Fatalf("%s\n", txt) - } - }) -} diff --git a/beam/message.go b/beam/message.go new file mode 100644 index 0000000000..eac5b23c76 --- /dev/null +++ b/beam/message.go @@ -0,0 +1,169 @@ +package beam + +import ( + "github.com/docker/libchan" + "github.com/docker/libchan/data" + + "fmt" + "os" +) + +type Message struct { + Verb + Args []string + Ret Sender + Att *os.File +} + +type Sender interface { + Send(msg *Message) (Receiver, error) + Close() error + Unwrap() libchan.Sender +} + +type Receiver interface { + Receive(mode int) (*Message, error) + Unwrap() libchan.Receiver +} + +type senderWrapper struct { + libchan.Sender +} + +func WrapSender(s libchan.Sender) Sender { + return &senderWrapper{s} +} + +func (s *senderWrapper) Send(msg *Message) (Receiver, error) { + recv, err := s.Sender.Send(msg.LibchanMessage()) + if err != nil { + return nil, err + } + return WrapReceiver(recv), err +} + +func (s *senderWrapper) Unwrap() libchan.Sender { + return s.Sender +} + +type receiverWrapper struct { + libchan.Receiver +} + +func WrapReceiver(r libchan.Receiver) Receiver { + return &receiverWrapper{r} +} + +func (r *receiverWrapper) Receive(mode int) (*Message, error) { + lcm, err := r.Receiver.Receive(mode) + if err != nil { + return nil, err + } + return DecodeLibchanMessage(lcm) +} + +func (r *receiverWrapper) Unwrap() libchan.Receiver { + return r.Receiver +} + +type senderUnwrapper struct { + Sender +} + +func (su *senderUnwrapper) Send(lcm *libchan.Message) (libchan.Receiver, error) { + msg, err := DecodeLibchanMessage(lcm) + if err != nil { + return nil, err + } + recv, err := su.Sender.Send(msg) + if err != nil { + return nil, err + } + return &receiverUnwrapper{recv}, nil +} + +type receiverUnwrapper struct { + Receiver +} + +func (ru *receiverUnwrapper) Receive(mode int) (*libchan.Message, error) { + msg, err := ru.Receiver.Receive(mode) + if err != nil { + return nil, err + } + return msg.LibchanMessage(), nil +} + +func Pipe() (Receiver, Sender) { + r, s := libchan.Pipe() + return WrapReceiver(r), WrapSender(s) +} + +func Copy(s Sender, r Receiver) (int, error) { + return libchan.Copy(s.Unwrap(), r.Unwrap()) +} + +func Handler(h func(msg *Message) error) Sender { + lch := libchan.Handler(func(lcm *libchan.Message) { + ret := WrapSender(lcm.Ret) + msg, err := DecodeLibchanMessage(lcm) + if err != nil { + ret.Send(&Message{Verb: Error, Args: []string{err.Error()}}) + } + if err = h(msg); err != nil { + ret.Send(&Message{Verb: Error, Args: []string{err.Error()}}) + } + }) + return WrapSender(lch) +} + +var RetPipe = WrapSender(libchan.RetPipe) +var Ret = libchan.Ret + +var notImplementedMsg = &Message{Verb: Error, Args: []string{"not implemented"}} +var NotImplemented = WrapSender(libchan.Repeater(notImplementedMsg.LibchanMessage())) + +func DecodeLibchanMessage(lcm *libchan.Message) (*Message, error) { + decoded, err := data.Decode(string(lcm.Data)) + if err != nil { + return nil, err + } + verbList, exists := decoded["verb"] + if !exists { + return nil, fmt.Errorf("No 'verb' key found in message data: %s", lcm.Data) + } + if len(verbList) != 1 { + return nil, fmt.Errorf("Expected exactly one verb, got %d: %#v", len(verbList), verbList) + } + verb, err := VerbFromString(verbList[0]) + if err != nil { + return nil, err + } + args, exists := decoded["args"] + if !exists { + return nil, fmt.Errorf("No 'args' key found in message data: %s", lcm.Data) + } + return &Message{ + Verb: verb, + Args: args, + Ret: WrapSender(lcm.Ret), + Att: lcm.Fd, + }, nil +} + +func (m *Message) LibchanMessage() *libchan.Message { + encoded := data.Empty(). + Set("verb", m.Verb.String()). + Set("args", m.Args...) + + var ret libchan.Sender + if m.Ret != nil { + ret = m.Ret.Unwrap() + } + + return &libchan.Message{ + Data: []byte(encoded), + Ret: ret, + Fd: m.Att, + } +} diff --git a/beam/nop.go b/beam/nop.go deleted file mode 100644 index 603d9b2ec5..0000000000 --- a/beam/nop.go +++ /dev/null @@ -1,21 +0,0 @@ -package beam - -import ( - "io" -) - -type NopSender struct{} - -func (s NopSender) Send(msg *Message) (Receiver, error) { - return NopReceiver{}, nil -} - -func (s NopSender) Close() error { - return nil -} - -type NopReceiver struct{} - -func (r NopReceiver) Receive(mode int) (*Message, error) { - return nil, io.EOF -} diff --git a/beam/server.go b/beam/server.go index 362a43adea..5db5f513aa 100644 --- a/beam/server.go +++ b/beam/server.go @@ -1,6 +1,8 @@ package beam import ( + "github.com/docker/libchan" + "fmt" ) @@ -70,3 +72,7 @@ func (s *Server) Send(msg *Message) (Receiver, error) { func (s *Server) Close() error { return fmt.Errorf("can't close") } + +func (s *Server) Unwrap() libchan.Sender { + return &senderUnwrapper{s} +} diff --git a/beam/unix/beam.go b/beam/unix/beam.go deleted file mode 100644 index 9e6dc90f1b..0000000000 --- a/beam/unix/beam.go +++ /dev/null @@ -1,166 +0,0 @@ -package unix - -import ( - "fmt" - "io" - "os" -) - -type Sender interface { - Send([]byte, *os.File) error -} - -type Receiver interface { - Receive() ([]byte, *os.File, error) -} - -type ReceiveCloser interface { - Receiver - Close() error -} - -type SendCloser interface { - Sender - Close() error -} - -type ReceiveSender interface { - Receiver - Sender -} - -const ( - R int = 1 << (32 - 1 - iota) - W -) - -func sendPipe(dst Sender, data []byte, mode int) (*os.File, error) { - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - var ( - remote *os.File - local *os.File - ) - if mode == R { - remote = r - local = w - } else if mode == W { - remote = w - local = r - } - if err := dst.Send(data, remote); err != nil { - local.Close() - remote.Close() - return nil, err - } - return local, nil - -} - -// SendRPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendRPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, R) -} - -// SendWPipe create a pipe and sends its *read* end attached in a beam message -// to `dst`, with `data` as the message payload. -// It returns the *write* end of the pipe, or an error. -func SendWPipe(dst Sender, data []byte) (*os.File, error) { - return sendPipe(dst, data, W) -} - -func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { - local, remote, err := SocketPair() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - local.Close() - remote.Close() - } - }() - conn, err = FileConn(local) - if err != nil { - return nil, err - } - local.Close() - if err := dst.Send(data, remote); err != nil { - return nil, err - } - return conn, nil -} - -func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { - for { - data, f, err := src.Receive() - if err != nil { - return nil, nil, err - } - if f == nil { - // Skip empty attachments - continue - } - conn, err := FileConn(f) - if err != nil { - // Skip beam attachments which are not connections - // (for example might be a regular file, directory etc) - continue - } - return data, conn, nil - } - panic("impossibru!") - return nil, nil, nil -} - -func Copy(dst Sender, src Receiver) (int, error) { - var n int - for { - payload, attachment, err := src.Receive() - if err == io.EOF { - return n, nil - } else if err != nil { - return n, err - } - if err := dst.Send(payload, attachment); err != nil { - if attachment != nil { - attachment.Close() - } - return n, err - } - n++ - } - panic("impossibru!") - return n, nil -} - -// MsgDesc returns a human readable description of a beam message, usually -// for debugging purposes. -func MsgDesc(payload []byte, attachment *os.File) string { - var filedesc string = "" - if attachment != nil { - filedesc = fmt.Sprintf("%d", attachment.Fd()) - } - return fmt.Sprintf("'%s'[%s]", payload, filedesc) -} - -type devnull struct{} - -func Devnull() ReceiveSender { - return devnull{} -} - -func (d devnull) Send(p []byte, a *os.File) error { - if a != nil { - a.Close() - } - return nil -} - -func (d devnull) Receive() ([]byte, *os.File, error) { - return nil, nil, io.EOF -} diff --git a/beam/unix/beam_test.go b/beam/unix/beam_test.go deleted file mode 100644 index 83bd91e0d3..0000000000 --- a/beam/unix/beam_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package unix - -import ( - "github.com/dotcloud/docker/pkg/beam/data" - "testing" -) - -func TestSendConn(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) - if err != nil { - t.Fatal(err) - } - if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { - t.Fatal(err) - } - conn.CloseWrite() - }() - payload, conn, err := ReceiveConn(b) - if err != nil { - t.Fatal(err) - } - if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { - t.Fatalf("%v != %v\n", val, "connection") - } - msg, _, err := conn.Receive() - if err != nil { - t.Fatal(err) - } - if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { - t.Fatalf("%v != %v\n", val, "bar") - } -} diff --git a/beam/unix/conn.go b/beam/unix/conn.go deleted file mode 100644 index 2d0acef440..0000000000 --- a/beam/unix/conn.go +++ /dev/null @@ -1,144 +0,0 @@ -package unix - -import ( - "fmt" - "os" - "strconv" - - "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/data" -) - -func Pair() (*Conn, *Conn, error) { - c1, c2, err := USocketPair() - if err != nil { - return nil, nil, err - } - return &Conn{c1}, &Conn{c2}, nil -} - -type Conn struct { - *UnixConn -} - -func sendablePair() (conn *UnixConn, remoteFd *os.File, err error) { - // Get 2 *os.File - local, remote, err := SocketPair() - if err != nil { - return nil, nil, err - } - defer func() { - if err != nil { - local.Close() - remote.Close() - } - }() - // Convert 1 to *net.UnixConn - conn, err = FileConn(local) - if err != nil { - return nil, nil, err - } - local.Close() - // Return the "mismatched" pair - return conn, remote, nil -} - -// This implements beam.Sender.Close which *only closes the sender*. -// This is similar to the pattern of only closing go channels from -// the sender's side. -// If you want to close the entire connection, call Conn.UnixConn.Close. -func (c *Conn) Close() error { - return c.UnixConn.CloseWrite() -} - -func (c *Conn) Send(msg *beam.Message) (beam.Receiver, error) { - if msg.Att != nil { - return nil, fmt.Errorf("file attachment not yet implemented in unix transport") - } - parts := []string{fmt.Sprintf("%d", msg.Verb)} - parts = append(parts, msg.Args...) - b := []byte(data.EncodeList(parts)) - // Setup nested streams - var ( - fd *os.File - ret beam.Receiver - err error - ) - // Caller requested a return pipe - if beam.RetPipe.Equals(msg.Ret) { - local, remote, err := sendablePair() - if err != nil { - return nil, err - } - fd = remote - ret = &Conn{local} - // Caller specified its own return channel - } else if msg.Ret != nil { - // The specified return channel is a unix conn: engaging cheat mode! - if retConn, ok := msg.Ret.(*Conn); ok { - fd, err = retConn.UnixConn.File() - if err != nil { - return nil, fmt.Errorf("error passing return channel: %v", err) - } - // Close duplicate fd - retConn.UnixConn.Close() - // The specified return channel is an unknown type: proxy messages. - } else { - local, remote, err := sendablePair() - if err != nil { - return nil, fmt.Errorf("error passing return channel: %v", err) - } - fd = remote - // FIXME: do we need a reference no all these background tasks? - go func() { - // Copy messages from the remote return channel to the local return channel. - // When the remote return channel is closed, also close the local return channel. - localConn := &Conn{local} - beam.Copy(msg.Ret, localConn) - msg.Ret.Close() - localConn.Close() - }() - } - } - if err := c.UnixConn.Send(b, fd); err != nil { - return nil, err - } - return ret, nil -} - -func (c *Conn) Receive(mode int) (*beam.Message, error) { - b, fd, err := c.UnixConn.Receive() - if err != nil { - return nil, err - } - parts, n, err := data.DecodeList(string(b)) - if err != nil { - return nil, err - } - if n != len(b) { - return nil, fmt.Errorf("garbage data %#v", b[:n]) - } - if len(parts) == 0 { - return nil, fmt.Errorf("malformed message") - } - v, err := strconv.ParseUint(parts[0], 10, 32) - if err != nil { - return nil, err - } - msg := &beam.Message{Verb: beam.Verb(v), Args: parts[1:]} - - // Apply mode mask - if fd != nil { - subconn, err := FileConn(fd) - if err != nil { - return nil, err - } - fd.Close() - if mode&beam.Ret != 0 { - msg.Ret = &Conn{subconn} - } else { - subconn.CloseWrite() - } - } - return msg, nil -} diff --git a/beam/unix/conn_test.go b/beam/unix/conn_test.go deleted file mode 100644 index 09884cc6e7..0000000000 --- a/beam/unix/conn_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package unix - -import ( - "github.com/docker/libswarm/beam" - "github.com/dotcloud/docker/pkg/testutils" - "testing" -) - -func TestPair(t *testing.T) { - r, w, err := Pair() - if err != nil { - t.Fatal("Unexpected error") - } - defer r.Close() - defer w.Close() - testutils.Timeout(t, func() { - go func() { - msg, err := r.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != beam.Log { - t.Fatalf("%#v", *msg) - } - if msg.Args[0] != "hello world" { - t.Fatalf("%#v", *msg) - } - }() - _, err := w.Send(&beam.Message{Verb: beam.Log, Args: []string{"hello world"}}) - if err != nil { - t.Fatal(err) - } - }) -} - -func TestSendReply(t *testing.T) { - r, w, err := Pair() - if err != nil { - t.Fatal(err) - } - defer r.Close() - defer w.Close() - testutils.Timeout(t, func() { - // Send - go func() { - // Send a message with mode=R - ret, err := w.Send(&beam.Message{Args: []string{"this is the request"}, Ret: beam.RetPipe}) - if err != nil { - t.Fatal(err) - } - // Read for a reply - msg, err := ret.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Args[0] != "this is the reply" { - t.Fatalf("%#v", msg) - } - }() - // Receive a message with mode=W - msg, err := r.Receive(beam.Ret) - if err != nil { - t.Fatal(err) - } - if msg.Args[0] != "this is the request" { - t.Fatalf("%#v", msg) - } - // Send a reply - _, err = msg.Ret.Send(&beam.Message{Args: []string{"this is the reply"}}) - if err != nil { - t.Fatal(err) - } - }) -} diff --git a/beam/unix/unix.go b/beam/unix/unix.go deleted file mode 100644 index 594eb21b10..0000000000 --- a/beam/unix/unix.go +++ /dev/null @@ -1,317 +0,0 @@ -package unix - -import ( - "bufio" - "fmt" - "net" - "os" - "syscall" -) - -func debugCheckpoint(msg string, args ...interface{}) { - if os.Getenv("DEBUG") == "" { - return - } - os.Stdout.Sync() - tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700) - fmt.Fprintf(tty, msg, args...) - bufio.NewScanner(tty).Scan() - tty.Close() -} - -type UnixConn struct { - *net.UnixConn - fds []*os.File -} - -// Framing: -// In order to handle framing in Send/Recieve, as these give frame -// boundaries we use a very simple 4 bytes header. It is a big endiand -// uint32 where the high bit is set if the message includes a file -// descriptor. The rest of the uint32 is the length of the next frame. -// We need the bit in order to be able to assign recieved fds to -// the right message, as multiple messages may be coalesced into -// a single recieve operation. -func makeHeader(data []byte, fds []int) ([]byte, error) { - header := make([]byte, 4) - - length := uint32(len(data)) - - if length > 0x7fffffff { - return nil, fmt.Errorf("Data to large") - } - - if len(fds) != 0 { - length = length | 0x80000000 - } - header[0] = byte((length >> 24) & 0xff) - header[1] = byte((length >> 16) & 0xff) - header[2] = byte((length >> 8) & 0xff) - header[3] = byte((length >> 0) & 0xff) - - return header, nil -} - -func parseHeader(header []byte) (uint32, bool) { - length := uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3]) - hasFd := length&0x80000000 != 0 - length = length & ^uint32(0x80000000) - - return length, hasFd -} - -func FileConn(f *os.File) (*UnixConn, error) { - conn, err := net.FileConn(f) - if err != nil { - return nil, err - } - uconn, ok := conn.(*net.UnixConn) - if !ok { - conn.Close() - return nil, fmt.Errorf("%d: not a unix connection", f.Fd()) - } - return &UnixConn{UnixConn: uconn}, nil - -} - -// Send sends a new message on conn with data and f as payload and -// attachment, respectively. -// On success, f is closed -func (conn *UnixConn) Send(data []byte, f *os.File) error { - { - var fd int = -1 - if f != nil { - fd = int(f.Fd()) - } - debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd) - } - var fds []int - if f != nil { - fds = append(fds, int(f.Fd())) - } - if err := conn.sendUnix(data, fds...); err != nil { - return err - } - - if f != nil { - f.Close() - } - return nil -} - -// Receive waits for a new message on conn, and receives its payload -// and attachment, or an error if any. -// -// If more than 1 file descriptor is sent in the message, they are all -// closed except for the first, which is the attachment. -// It is legal for a message to have no attachment or an empty payload. -func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) { - defer func() { - var fd int = -1 - if rf != nil { - fd = int(rf.Fd()) - } - debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd) - }() - - // Read header - header := make([]byte, 4) - nRead := uint32(0) - - for nRead < 4 { - n, err := conn.receiveUnix(header[nRead:]) - if err != nil { - return nil, nil, err - } - nRead = nRead + uint32(n) - } - - length, hasFd := parseHeader(header) - - if hasFd { - if len(conn.fds) == 0 { - return nil, nil, fmt.Errorf("No expected file descriptor in message") - } - - rf = conn.fds[0] - conn.fds = conn.fds[1:] - } - - rdata = make([]byte, length) - - nRead = 0 - for nRead < length { - n, err := conn.receiveUnix(rdata[nRead:]) - if err != nil { - return nil, nil, err - } - nRead = nRead + uint32(n) - } - - return -} - -func (conn *UnixConn) receiveUnix(buf []byte) (int, error) { - oob := make([]byte, syscall.CmsgSpace(4)) - bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob) - if err != nil { - return 0, err - } - fd := extractFd(oob[:oobn]) - if fd != -1 { - f := os.NewFile(uintptr(fd), "") - conn.fds = append(conn.fds, f) - } - - return bufn, nil -} - -func (conn *UnixConn) sendUnix(data []byte, fds ...int) error { - header, err := makeHeader(data, fds) - if err != nil { - return err - } - - // There is a bug in conn.WriteMsgUnix where it doesn't correctly return - // the number of bytes writte (http://code.google.com/p/go/issues/detail?id=7645) - // So, we can't rely on the return value from it. However, we must use it to - // send the fds. In order to handle this we only write one byte using WriteMsgUnix - // (when we have to), as that can only ever block or fully suceed. We then write - // the rest with conn.Write() - // The reader side should not rely on this though, as hopefully this gets fixed - // in go later. - written := 0 - if len(fds) != 0 { - oob := syscall.UnixRights(fds...) - wrote, _, err := conn.WriteMsgUnix(header[0:1], oob, nil) - if err != nil { - return err - } - written = written + wrote - } - - for written < len(header) { - wrote, err := conn.Write(header[written:]) - if err != nil { - return err - } - written = written + wrote - } - - written = 0 - for written < len(data) { - wrote, err := conn.Write(data[written:]) - if err != nil { - return err - } - written = written + wrote - } - - return nil -} - -func extractFd(oob []byte) int { - // Grab forklock to make sure no forks accidentally inherit the new - // fds before they are made CLOEXEC - // There is a slight race condition between ReadMsgUnix returns and - // when we grap the lock, so this is not perfect. Unfortunately - // There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any - // way to implement non-blocking i/o in go, so this is hard to fix. - syscall.ForkLock.Lock() - defer syscall.ForkLock.Unlock() - scms, err := syscall.ParseSocketControlMessage(oob) - if err != nil { - return -1 - } - - foundFd := -1 - for _, scm := range scms { - fds, err := syscall.ParseUnixRights(&scm) - if err != nil { - continue - } - - for _, fd := range fds { - if foundFd == -1 { - syscall.CloseOnExec(fd) - foundFd = fd - } else { - syscall.Close(fd) - } - } - } - - return foundFd -} - -func socketpair() ([2]int, error) { - return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) -} - -// SocketPair is a convenience wrapper around the socketpair(2) syscall. -// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors -// not bound to the underlying filesystem. -// Messages sent on one end are received on the other, and vice-versa. -// It is the caller's responsibility to close both ends. -func SocketPair() (a *os.File, b *os.File, err error) { - defer func() { - var ( - fdA int = -1 - fdB int = -1 - ) - if a != nil { - fdA = int(a.Fd()) - } - if b != nil { - fdB = int(b.Fd()) - } - debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB) - }() - pair, err := socketpair() - if err != nil { - return nil, nil, err - } - return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil -} - -func USocketPair() (*UnixConn, *UnixConn, error) { - debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ") - defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ") - a, b, err := SocketPair() - if err != nil { - return nil, nil, err - } - defer a.Close() - defer b.Close() - uA, err := FileConn(a) - if err != nil { - return nil, nil, err - } - uB, err := FileConn(b) - if err != nil { - uA.Close() - return nil, nil, err - } - return uA, uB, nil -} - -// FdConn wraps a file descriptor in a standard *net.UnixConn object, or -// returns an error if the file descriptor does not point to a unix socket. -// This creates a duplicate file descriptor. It's the caller's responsibility -// to close both. -func FdConn(fd int) (n *net.UnixConn, err error) { - { - debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd) - } - f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd)) - conn, err := net.FileConn(f) - if err != nil { - return nil, err - } - uconn, ok := conn.(*net.UnixConn) - if !ok { - conn.Close() - return nil, fmt.Errorf("%d: not a unix connection", fd) - } - return uconn, nil -} diff --git a/beam/unix/unix_test.go b/beam/unix/unix_test.go deleted file mode 100644 index 7f947760b3..0000000000 --- a/beam/unix/unix_test.go +++ /dev/null @@ -1,237 +0,0 @@ -package unix - -import ( - "fmt" - "io/ioutil" - "testing" -) - -func TestSocketPair(t *testing.T) { - a, b, err := SocketPair() - if err != nil { - t.Fatal(err) - } - go func() { - a.Write([]byte("hello world!")) - fmt.Printf("done writing. closing\n") - a.Close() - fmt.Printf("done closing\n") - }() - data, err := ioutil.ReadAll(b) - if err != nil { - t.Fatal(err) - } - fmt.Printf("--> %s\n", data) - fmt.Printf("still open: %v\n", a.Fd()) -} - -func TestUSocketPair(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - - data := "hello world!" - go func() { - a.Write([]byte(data)) - a.Close() - }() - res := make([]byte, 1024) - size, err := b.Read(res) - if err != nil { - t.Fatal(err) - } - if size != len(data) { - t.Fatal("Unexpected size") - } - if string(res[0:size]) != data { - t.Fatal("Unexpected data") - } -} - -func TestSendUnixSocket(t *testing.T) { - a1, a2, err := USocketPair() - if err != nil { - t.Fatal(err) - } - // defer a1.Close() - // defer a2.Close() - b1, b2, err := USocketPair() - if err != nil { - t.Fatal(err) - } - // defer b1.Close() - // defer b2.Close() - glueA, glueB, err := SocketPair() - if err != nil { - t.Fatal(err) - } - // defer glueA.Close() - // defer glueB.Close() - go func() { - err := b2.Send([]byte("a"), glueB) - if err != nil { - t.Fatal(err) - } - }() - go func() { - err := a2.Send([]byte("b"), glueA) - if err != nil { - t.Fatal(err) - } - }() - connAhdr, connA, err := a1.Receive() - if err != nil { - t.Fatal(err) - } - if string(connAhdr) != "b" { - t.Fatalf("unexpected: %s", connAhdr) - } - connBhdr, connB, err := b1.Receive() - if err != nil { - t.Fatal(err) - } - if string(connBhdr) != "a" { - t.Fatalf("unexpected: %s", connBhdr) - } - fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd()) - go func() { - fmt.Printf("sending message on %v\n", connA.Fd()) - connA.Write([]byte("hello world")) - connA.Sync() - fmt.Printf("closing %v\n", connA.Fd()) - connA.Close() - }() - data, err := ioutil.ReadAll(connB) - if err != nil { - t.Fatal(err) - } - fmt.Printf("---> %s\n", data) - -} - -// Ensure we get proper segmenting of messages -func TestSendSegmenting(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - - extrafd1, extrafd2, err := SocketPair() - if err != nil { - t.Fatal(err) - } - extrafd2.Close() - - go func() { - a.Send([]byte("message 1"), nil) - a.Send([]byte("message 2"), extrafd1) - a.Send([]byte("message 3"), nil) - }() - - msg1, file1, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg1) != "message 1" { - t.Fatal("unexpected msg1:", string(msg1)) - } - if file1 != nil { - t.Fatal("unexpectedly got file1") - } - - msg2, file2, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg2) != "message 2" { - t.Fatal("unexpected msg2:", string(msg2)) - } - if file2 == nil { - t.Fatal("didn't get file2") - } - file2.Close() - - msg3, file3, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if string(msg3) != "message 3" { - t.Fatal("unexpected msg3:", string(msg3)) - } - if file3 != nil { - t.Fatal("unexpectedly got file3") - } - -} - -// Test sending a zero byte message -func TestSendEmpty(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - a.Send([]byte{}, nil) - }() - - msg, file, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if len(msg) != 0 { - t.Fatalf("unexpected non-empty message: %v", msg) - } - if file != nil { - t.Fatal("unexpectedly got file") - } - -} - -func makeLarge(size int) []byte { - res := make([]byte, size) - for i := range res { - res[i] = byte(i % 255) - } - return res -} - -func verifyLarge(data []byte, size int) bool { - if len(data) != size { - return false - } - for i := range data { - if data[i] != byte(i%255) { - return false - } - } - return true -} - -// Test sending a large message -func TestSendLarge(t *testing.T) { - a, b, err := USocketPair() - if err != nil { - t.Fatal(err) - } - defer a.Close() - defer b.Close() - go func() { - a.Send(makeLarge(100000), nil) - }() - - msg, file, err := b.Receive() - if err != nil { - t.Fatal(err) - } - if !verifyLarge(msg, 100000) { - t.Fatalf("unexpected message (size %d)", len(msg)) - } - if file != nil { - t.Fatal("unexpectedly got file") - } -} diff --git a/beam/utils/buf.go b/beam/utils/buf.go deleted file mode 100644 index 226c1fea38..0000000000 --- a/beam/utils/buf.go +++ /dev/null @@ -1,17 +0,0 @@ -package utils - -import ( - "github.com/docker/libswarm/beam" -) - -type Buffer []*beam.Message - -func (buf *Buffer) Send(msg *beam.Message) (beam.Receiver, error) { - (*buf) = append(*buf, msg) - return beam.NopReceiver{}, nil -} - -func (buf *Buffer) Close() error { - (*buf) = nil - return nil -} diff --git a/beam/utils/queue.go b/beam/utils/queue.go deleted file mode 100644 index 9da2fad953..0000000000 --- a/beam/utils/queue.go +++ /dev/null @@ -1,41 +0,0 @@ -package utils - -import ( - "github.com/docker/libswarm/beam" -) - -type Queue struct { - *beam.PipeSender - dst beam.Sender - ch chan *beam.Message -} - -func NewQueue(dst beam.Sender, size int) *Queue { - r, w := beam.Pipe() - q := &Queue{ - PipeSender: w, - dst: dst, - ch: make(chan *beam.Message, size), - } - go func() { - defer close(q.ch) - for { - msg, err := r.Receive(beam.Ret) - if err != nil { - r.Close() - return - } - q.ch <- msg - } - }() - go func() { - for msg := range q.ch { - _, err := dst.Send(msg) - if err != nil { - r.Close() - return - } - } - }() - return q -} diff --git a/beam/utils/queue_test.go b/beam/utils/queue_test.go deleted file mode 100644 index 3319f4076a..0000000000 --- a/beam/utils/queue_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package utils - -import ( - "github.com/docker/libswarm/beam" - "testing" -) - -func TestSendRet(t *testing.T) { - r, w := beam.Pipe() - defer r.Close() - defer w.Close() - q := NewQueue(w, 1) - defer q.Close() - ret, err := q.Send(&beam.Message{Verb: beam.Log, Args: []string{"ping"}, Ret: beam.RetPipe}) - if err != nil { - t.Fatal(err) - } - go func() { - ping, err := r.Receive(beam.Ret) - if err != nil { - t.Fatal(err) - } - if _, err := ping.Ret.Send(&beam.Message{Verb: beam.Log, Args: []string{"pong"}}); err != nil { - t.Fatal(err) - } - }() - pong, err := ret.Receive(0) - if err != nil { - t.Fatal(err) - } - if pong.Verb != beam.Log { - t.Fatal(err) - } -} - -func TestSendClose(t *testing.T) { - q := NewQueue(beam.NopSender{}, 1) - q.Send(&beam.Message{Verb: beam.Error, Args: []string{"hello"}}) - q.Close() - if _, err := q.Send(&beam.Message{Verb: beam.Ack, Args: []string{"again"}}); err == nil { - t.Fatal("send on closed queue should return an error") - } -} diff --git a/beam/utils/stack.go b/beam/utils/stack.go deleted file mode 100644 index 1d2c0188ce..0000000000 --- a/beam/utils/stack.go +++ /dev/null @@ -1,112 +0,0 @@ -package utils - -import ( - "container/list" - "fmt" - "github.com/docker/libswarm/beam" - "strings" - "sync" -) - -// StackSender forwards beam messages to a dynamic list of backend receivers. -// New backends are stacked on top. When a message is sent, each backend is -// tried until one succeeds. Any failing backends encountered along the way -// are removed from the queue. -type StackSender struct { - stack *list.List - l sync.RWMutex -} - -func NewStackSender() *StackSender { - stack := list.New() - return &StackSender{ - stack: stack, - } -} - -func (s *StackSender) Send(msg *beam.Message) (ret beam.Receiver, err error) { - completed := s.walk(func(h beam.Sender) (ok bool) { - ret, err = h.Send(msg) - fmt.Printf("[stacksender] sending %v to %#v returned %v\n", msg, h, err) - if err == nil { - return true - } - return false - }) - // If walk was completed, it means we didn't find a valid handler - if !completed { - return ret, err - } - // Silently drop messages if no valid backend is available. - return beam.NopSender{}.Send(msg) -} - -func (s *StackSender) Add(dst beam.Sender) *StackSender { - s.l.Lock() - defer s.l.Unlock() - prev := &StackSender{ - stack: list.New(), - } - prev.stack.PushFrontList(s.stack) - fmt.Printf("[ADD] prev %#v\n", prev) - s.stack.PushFront(dst) - return prev -} - -func (s *StackSender) Close() error { - s.walk(func(h beam.Sender) bool { - h.Close() - // remove all handlers - return false - }) - return nil -} - -func (s *StackSender) _walk(f func(*list.Element) bool) bool { - var e *list.Element - s.l.RLock() - e = s.stack.Front() - s.l.RUnlock() - for e != nil { - fmt.Printf("[StackSender.Walk] %v\n", e.Value.(beam.Sender)) - s.l.RLock() - next := e.Next() - s.l.RUnlock() - cont := f(e) - if !cont { - return false - } - e = next - } - return true -} - -func (s *StackSender) walk(f func(beam.Sender) bool) bool { - return s._walk(func(e *list.Element) bool { - ok := f(e.Value.(beam.Sender)) - if ok { - // Found a valid handler. Stop walking. - return false - } - // Invalid handler: remove. - s.l.Lock() - s.stack.Remove(e) - s.l.Unlock() - return true - }) -} - -func (s *StackSender) Len() int { - s.l.RLock() - defer s.l.RUnlock() - return s.stack.Len() -} - -func (s *StackSender) String() string { - var parts []string - s._walk(func(e *list.Element) bool { - parts = append(parts, fmt.Sprintf("%v", e.Value.(beam.Sender))) - return true - }) - return fmt.Sprintf("%d:[%s]", len(parts), strings.Join(parts, "->")) -} diff --git a/beam/utils/stack_test.go b/beam/utils/stack_test.go deleted file mode 100644 index fb18772ea8..0000000000 --- a/beam/utils/stack_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package utils - -import ( - "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/unix" - "github.com/dotcloud/docker/pkg/testutils" - "strings" - "testing" -) - -func TestStackWithPipe(t *testing.T) { - r, w := beam.Pipe() - defer r.Close() - defer w.Close() - s := NewStackSender() - s.Add(w) - testutils.Timeout(t, func() { - go func() { - msg, err := r.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != beam.Log { - t.Fatalf("%#v", msg) - } - if strings.Join(msg.Args, " ") != "wonderful world" { - t.Fatalf("%#v", msg) - } - }() - _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}}) - if err != nil { - t.Fatal(err) - } - }) -} - -func TestStackWithPair(t *testing.T) { - r, w, err := unix.Pair() - if err != nil { - t.Fatal(err) - } - defer r.Close() - defer w.Close() - s := NewStackSender() - s.Add(w) - testutils.Timeout(t, func() { - go func() { - msg, err := r.Receive(0) - if err != nil { - t.Fatal(err) - } - if msg.Verb != beam.Log { - t.Fatalf("%#v", msg) - } - if strings.Join(msg.Args, " ") != "wonderful world" { - t.Fatalf("%#v", msg) - } - }() - _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"wonderful", "world"}}) - if err != nil { - t.Fatal(err) - } - }) -} - -func TestStackLen(t *testing.T) { - s := NewStackSender() - if s.Len() != 0 { - t.Fatalf("empty StackSender has length %d", s.Len()) - } -} - -func TestStackAdd(t *testing.T) { - s := NewStackSender() - a := Buffer{} - beforeA := s.Add(&a) - // Add on an empty StackSender should return an empty StackSender - if beforeA.Len() != 0 { - t.Fatalf("%s has %d elements", beforeA, beforeA.Len()) - } - if s.Len() != 1 { - t.Fatalf("%#v", beforeA) - } - // Add a 2nd element - b := Buffer{} - beforeB := s.Add(&b) - if beforeB.Len() != 1 { - t.Fatalf("%#v", beforeA) - } - if s.Len() != 2 { - t.Fatalf("%#v", beforeA) - } - s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for b"}}) - beforeB.Send(&beam.Message{Verb: beam.Log, Args: []string{"for a"}}) - beforeA.Send(&beam.Message{Verb: beam.Log, Args: []string{"for nobody"}}) - if len(a) != 1 { - t.Fatalf("%#v", a) - } - if len(b) != 1 { - t.Fatalf("%#v", b) - } -} - -// Misbehaving backends must be removed -func TestStackAddBad(t *testing.T) { - s := NewStackSender() - buf := Buffer{} - s.Add(&buf) - r, w := beam.Pipe() - s.Add(w) - if s.Len() != 2 { - t.Fatalf("%#v", s) - } - r.Close() - if _, err := s.Send(&beam.Message{Verb: beam.Log, Args: []string{"for the buffer"}}); err != nil { - t.Fatal(err) - } - if s.Len() != 1 { - t.Fatalf("%#v") - } - if len(buf) != 1 { - t.Fatalf("%#v", buf) - } - if buf[0].Args[0] != "for the buffer" { - t.Fatalf("%#v", buf) - } -} diff --git a/beam/verbs.go b/beam/verbs.go index 1cacb90627..ad10ca4a3b 100644 --- a/beam/verbs.go +++ b/beam/verbs.go @@ -1,5 +1,9 @@ package beam +import ( + "fmt" +) + type Verb uint32 const ( @@ -18,6 +22,38 @@ const ( Watch ) +func VerbFromString(s string) (Verb, error) { + switch s { + case "Ack": + return Ack, nil + case "Attach": + return Attach, nil + case "Connect": + return Connect, nil + case "Error": + return Error, nil + case "File": + return File, nil + case "Get": + return Get, nil + case "Log": + return Log, nil + case "Ls": + return Ls, nil + case "Set": + return Set, nil + case "Spawn": + return Spawn, nil + case "Start": + return Start, nil + case "Stop": + return Stop, nil + case "Watch": + return Watch, nil + } + return 0, fmt.Errorf("Unrecognised verb: %s", s) +} + func (v Verb) String() string { switch v { case Ack: diff --git a/beam/ws/ws.go b/beam/ws/ws.go deleted file mode 100644 index f6c03a9424..0000000000 --- a/beam/ws/ws.go +++ /dev/null @@ -1,72 +0,0 @@ -package ws - -import ( - "errors" - "github.com/docker/libswarm/beam" - "github.com/docker/libswarm/beam/http2" - "github.com/docker/spdystream/ws" - "github.com/gorilla/websocket" - "net/http" -) - -// Connect to a Beam server over a Websocket connection as a client -func NewSender(wsConn *websocket.Conn) (beam.Sender, error) { - session, err := http2.NewStreamSession(ws.NewConnection(wsConn)) - if err != nil { - return nil, err - } - return session, nil -} - -// Upgrade an HTTP connection to a Beam over HTTP2 over -// Websockets connection. -type Upgrader struct { - Upgrader websocket.Upgrader -} - -func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*http2.Server, error) { - wsConn, err := u.Upgrader.Upgrade(w, r, responseHeader) - if err != nil { - return nil, err - } - - netConn := ws.NewConnection(wsConn) - server, err := http2.NewServer(netConn) - if err != nil { - netConn.Close() - return nil, err - } - - return server, nil -} - -// Returns true if a handshake error occured in websockets, which means -// a response has already been written to the stream. -func IsHandshakeError(err error) bool { - _, ok := err.(websocket.HandshakeError) - return ok -} - -type BeamFunc func(beam.Receiver) - -// Handler function for serving Beam over HTTP. Will invoke f and -// then close the server's Beam endpoint after f returns. -func Serve(u *Upgrader, f BeamFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { - u.Upgrader.Error(w, r, http.StatusMethodNotAllowed, errors.New("Method not allowed")) - return - } - - server, err := u.Upgrade(w, r, nil) - if err != nil { - if !IsHandshakeError(err) { - u.Upgrader.Error(w, r, http.StatusInternalServerError, errors.New("Unable to open an HTTP2 connection over Websockets")) - } - return - } - defer server.Close() - - f(server) - } -} diff --git a/beam/ws/ws_test.go b/beam/ws/ws_test.go deleted file mode 100644 index 6dd96a6645..0000000000 --- a/beam/ws/ws_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package ws - -import ( - "github.com/docker/libswarm/beam" - "github.com/gorilla/websocket" - "io" - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func TestServe(t *testing.T) { - gotAck := make(chan bool) - u := &Upgrader{} - server := httptest.NewServer(Serve(u, func(r beam.Receiver) { - msg, msgErr := r.Receive(beam.Ret) - if msgErr != nil { - t.Fatalf("Error receiving message: %s", msgErr) - } - if msg.Att == nil { - t.Fatalf("Error message missing attachment") - } - if msg.Verb != beam.Attach { - t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Attach) - } - - receiver, sendErr := msg.Ret.Send(&beam.Message{Verb: beam.Ack}) - if sendErr != nil { - t.Fatalf("Error sending return message: %s", sendErr) - } - - _, ackErr := receiver.Receive(0) - if ackErr == nil { - t.Fatalf("No error receiving from message with no return pipe") - } - if ackErr != io.EOF { - t.Fatalf("Unexpected error receiving from message: %s", ackErr) - } - - <-gotAck - })) - - wsConn, _, err := websocket.DefaultDialer.Dial(strings.Replace(server.URL, "http://", "ws://", 1), http.Header{"Origin": {server.URL}}) - if err != nil { - t.Fatal(err) - } - sender, senderErr := NewSender(wsConn) - if senderErr != nil { - t.Fatalf("Error creating sender: %s", senderErr) - } - - receiver, sendErr := sender.Send(&beam.Message{Verb: beam.Attach, Ret: beam.RetPipe}) - if sendErr != nil { - t.Fatalf("Error sending message: %s", sendErr) - } - - msg, receiveErr := receiver.Receive(beam.Ret) - if receiveErr != nil { - t.Fatalf("Error receiving message") - } - - if msg.Verb != beam.Ack { - t.Fatalf("Wrong verb\nActual: %s\nExpecting: %s", msg.Verb, beam.Ack) - } - - gotAck <- true - - shutdownErr := sender.Close() - if shutdownErr != nil && !strings.Contains(shutdownErr.Error(), "broken pipe") { - t.Fatalf("Error closing: %s", shutdownErr) - } -}