From cd3f2002a513fc8d949b4813c434e443dc89ce9c Mon Sep 17 00:00:00 2001 From: Aanand Prasad Date: Sun, 8 Jun 2014 12:55:07 -0700 Subject: [PATCH] Make client transport and scheme configurable Signed-off-by: Aanand Prasad --- backends/cloud.go | 27 +++++++++---------- backends/forward.go | 65 +++++++++++++++++++++------------------------ 2 files changed, 44 insertions(+), 48 deletions(-) diff --git a/backends/cloud.go b/backends/cloud.go index 4802cfcc30..efd8a20e50 100644 --- a/backends/cloud.go +++ b/backends/cloud.go @@ -114,10 +114,9 @@ func (s *cloud) Install(eng *engine.Engine) error { } } host := fmt.Sprintf("tcp://localhost:%d", localPort) - client, err := newClient(host, apiVersion) - if err != nil { - return job.Errorf("Unexpected error: %#v", err) - } + client := newClient() + client.setURL(host) + client.version = apiVersion //job.Eng.Register("inspect", func(job *engine.Job) engine.Status { // resp, err := client.call("GET", "/containers/ job.Eng.Register("create", func(job *engine.Job) engine.Status { @@ -128,17 +127,17 @@ func (s *cloud) Install(eng *engine.Engine) error { data, err := json.Marshal(container) resp, err := client.call("POST", "/containers/create", string(data)) if err != nil { - return job.Errorf("%s: post: %v", client.URL.String(), err) + return job.Errorf("post: %v", err) } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return job.Errorf("%s: read body: %#v", client.URL.String(), err) + return job.Errorf("read body: %#v", err) } var containerOut Container err = json.Unmarshal([]byte(body), &containerOut) _, err = job.Printf("%s\n", containerOut.Id) if err != nil { - return job.Errorf("%s: write body: %#v", client.URL.String(), err) + return job.Errorf("write body: %#v", err) } log.Printf("%s", string(body)) return engine.StatusOK @@ -148,11 +147,11 @@ func (s *cloud) Install(eng *engine.Engine) error { path := fmt.Sprintf("/containers/%s/start", job.Args[0]) resp, err := client.call("POST", path, "{\"Binds\":[],\"ContainerIDFile\":\"\",\"LxcConf\":[],\"Privileged\":false,\"PortBindings\":{},\"Links\":null,\"PublishAllPorts\":false,\"Dns\":null,\"DnsSearch\":[],\"VolumesFrom\":[]}") if err != nil { - return job.Errorf("%s: post: %v", client.URL.String(), err) + return job.Errorf("post: %v", err) } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return job.Errorf("%s: read body: %#v", client.URL.String(), err) + return job.Errorf("read body: %#v", err) } log.Printf("%s", string(body)) return engine.StatusOK @@ -169,17 +168,17 @@ func (s *cloud) Install(eng *engine.Engine) error { ) resp, err := client.call("GET", path, "") if err != nil { - return job.Errorf("%s: get: %v", client.URL.String(), err) + return job.Errorf("get: %v", err) } // FIXME: check for response error c := engine.NewTable("Created", 0) body, err := ioutil.ReadAll(resp.Body) if err != nil { - return job.Errorf("%s: read body: %v", client.URL.String(), err) + return job.Errorf("read body: %v", err) } fmt.Printf("---> '%s'\n", body) if _, err := c.ReadListFrom(body); err != nil { - return job.Errorf("%s: readlist: %v", client.URL.String(), err) + return job.Errorf("readlist: %v", err) } c.WriteListTo(job.Stdout) return engine.StatusOK @@ -191,7 +190,7 @@ func (s *cloud) Install(eng *engine.Engine) error { resp, err := client.call("DELETE", path, "") if err != nil { - return job.Errorf("%s: delete: %v", client.URL.String(), err) + return job.Errorf("delete: %v", err) } log.Printf("%#v", resp) return engine.StatusOK @@ -203,7 +202,7 @@ func (s *cloud) Install(eng *engine.Engine) error { resp, err := client.call("POST", path, "") if err != nil { - return job.Errorf("%s: delete: %v", client.URL.String(), err) + return job.Errorf("delete: %v", err) } log.Printf("%#v", resp) return engine.StatusOK diff --git a/backends/forward.go b/backends/forward.go index 1c1a8041d2..3b46529a4f 100644 --- a/backends/forward.go +++ b/backends/forward.go @@ -17,15 +17,16 @@ import ( ) func Forward() beam.Sender { + return ForwardWithClient(newClient()) +} + +func ForwardWithClient(client *client) beam.Sender { backend := beam.NewServer() backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error { if len(ctx.Args) != 1 { return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args)) } - client, err := newClient(ctx.Args[0], "v1.11") - if err != nil { - return fmt.Errorf("%v", err) - } + client.setURL(ctx.Args[0]) f := &forwarder{ client: client, Server: beam.NewServer(), @@ -34,7 +35,7 @@ func Forward() beam.Sender { f.Server.OnStart(beam.Handler(f.start)) f.Server.OnLs(beam.Handler(f.ls)) f.Server.OnSpawn(beam.Handler(f.spawn)) - _, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server}) + _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server}) return err })) return backend @@ -67,23 +68,23 @@ func (f *forwarder) start(ctx *beam.Message) error { func (f *forwarder) ls(ctx *beam.Message) error { resp, err := f.client.call("GET", "/containers/json", "") if err != nil { - return fmt.Errorf("%s: get: %v", f.client.URL.String(), err) + return fmt.Errorf("get: %v", err) } // FIXME: check for response error c := engine.NewTable("Created", 0) body, err := ioutil.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("%s: read body: %v", f.client.URL.String(), err) + return fmt.Errorf("read body: %v", err) } if _, err := c.ReadListFrom(body); err != nil { - return fmt.Errorf("%s: readlist: %v", f.client.URL.String(), err) + return fmt.Errorf("readlist: %v", err) } names := []string{} for _, env := range c.Data { names = append(names, env.GetList("Names")[0][1:]) } if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: names}); err != nil { - return fmt.Errorf("%s: send response: %v", f.client.URL.String(), err) + return fmt.Errorf("send response: %v", err) } return nil } @@ -218,30 +219,27 @@ func (c *container) get(ctx *beam.Message) error { } type client struct { - URL *url.URL - proto string - addr string - version string + transport *http.Transport + urlHost string + scheme string + version string } -func newClient(peer, version string) (*client, error) { - u, err := url.Parse(peer) - if err != nil { - return nil, err +func newClient() *client { + return &client{ + transport: &http.Transport{}, + urlHost: "dummy.host", + scheme: "http", + version: "v1.11", } - protoAddrParts := strings.SplitN(peer, "://", 2) - c := &client{ - URL: u, - proto: protoAddrParts[0], - addr: protoAddrParts[1], - version: version, - } - c.URL.Scheme = "http" - return c, nil } -func (c *client) dial() (net.Conn, error) { - return net.Dial(c.proto, c.addr) +func (c *client) setURL(url string) { + parts := strings.SplitN(url, "://", 2) + proto, host := parts[0], parts[1] + c.transport.Dial = func(_, _ string) (net.Conn, error) { + return net.Dial(proto, host) + } } func (c *client) call(method, path, body string) (*http.Response, error) { @@ -250,15 +248,14 @@ func (c *client) call(method, path, body string) (*http.Response, error) { if err != nil { return nil, err } - u.Host = "dummy.host" - u.Scheme = c.URL.Scheme + u.Host = c.urlHost + u.Scheme = c.scheme req, err := http.NewRequest(method, u.String(), strings.NewReader(body)) if err != nil { return nil, err } - tr := &http.Transport{Dial: func(_, _ string) (net.Conn, error) { return c.dial() }} - client := &http.Client{Transport: tr} - resp, err := client.Do(req) + httpClient := &http.Client{Transport: c.transport} + resp, err := httpClient.Do(req) if err != nil { return nil, err } @@ -267,7 +264,7 @@ func (c *client) call(method, path, body string) (*http.Response, error) { func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io.Writer) error { path = fmt.Sprintf("/%s%s", c.version, path) - dial, err := c.dial() + dial, err := c.transport.Dial("ignored", "ignored") if err != nil { return err }