Make client transport and scheme configurable

Signed-off-by: Aanand Prasad <aanand.prasad@gmail.com>
This commit is contained in:
Aanand Prasad 2014-06-08 12:55:07 -07:00
parent 20c489e752
commit cd3f2002a5
2 changed files with 44 additions and 48 deletions

View File

@ -114,10 +114,9 @@ func (s *cloud) Install(eng *engine.Engine) error {
}
}
host := fmt.Sprintf("tcp://localhost:%d", localPort)
client, err := newClient(host, apiVersion)
if err != nil {
return job.Errorf("Unexpected error: %#v", err)
}
client := newClient()
client.setURL(host)
client.version = apiVersion
//job.Eng.Register("inspect", func(job *engine.Job) engine.Status {
// resp, err := client.call("GET", "/containers/
job.Eng.Register("create", func(job *engine.Job) engine.Status {
@ -128,17 +127,17 @@ func (s *cloud) Install(eng *engine.Engine) error {
data, err := json.Marshal(container)
resp, err := client.call("POST", "/containers/create", string(data))
if err != nil {
return job.Errorf("%s: post: %v", client.URL.String(), err)
return job.Errorf("post: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return job.Errorf("%s: read body: %#v", client.URL.String(), err)
return job.Errorf("read body: %#v", err)
}
var containerOut Container
err = json.Unmarshal([]byte(body), &containerOut)
_, err = job.Printf("%s\n", containerOut.Id)
if err != nil {
return job.Errorf("%s: write body: %#v", client.URL.String(), err)
return job.Errorf("write body: %#v", err)
}
log.Printf("%s", string(body))
return engine.StatusOK
@ -148,11 +147,11 @@ func (s *cloud) Install(eng *engine.Engine) error {
path := fmt.Sprintf("/containers/%s/start", job.Args[0])
resp, err := client.call("POST", path, "{\"Binds\":[],\"ContainerIDFile\":\"\",\"LxcConf\":[],\"Privileged\":false,\"PortBindings\":{},\"Links\":null,\"PublishAllPorts\":false,\"Dns\":null,\"DnsSearch\":[],\"VolumesFrom\":[]}")
if err != nil {
return job.Errorf("%s: post: %v", client.URL.String(), err)
return job.Errorf("post: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return job.Errorf("%s: read body: %#v", client.URL.String(), err)
return job.Errorf("read body: %#v", err)
}
log.Printf("%s", string(body))
return engine.StatusOK
@ -169,17 +168,17 @@ func (s *cloud) Install(eng *engine.Engine) error {
)
resp, err := client.call("GET", path, "")
if err != nil {
return job.Errorf("%s: get: %v", client.URL.String(), err)
return job.Errorf("get: %v", err)
}
// FIXME: check for response error
c := engine.NewTable("Created", 0)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return job.Errorf("%s: read body: %v", client.URL.String(), err)
return job.Errorf("read body: %v", err)
}
fmt.Printf("---> '%s'\n", body)
if _, err := c.ReadListFrom(body); err != nil {
return job.Errorf("%s: readlist: %v", client.URL.String(), err)
return job.Errorf("readlist: %v", err)
}
c.WriteListTo(job.Stdout)
return engine.StatusOK
@ -191,7 +190,7 @@ func (s *cloud) Install(eng *engine.Engine) error {
resp, err := client.call("DELETE", path, "")
if err != nil {
return job.Errorf("%s: delete: %v", client.URL.String(), err)
return job.Errorf("delete: %v", err)
}
log.Printf("%#v", resp)
return engine.StatusOK
@ -203,7 +202,7 @@ func (s *cloud) Install(eng *engine.Engine) error {
resp, err := client.call("POST", path, "")
if err != nil {
return job.Errorf("%s: delete: %v", client.URL.String(), err)
return job.Errorf("delete: %v", err)
}
log.Printf("%#v", resp)
return engine.StatusOK

View File

@ -17,15 +17,16 @@ import (
)
func Forward() beam.Sender {
return ForwardWithClient(newClient())
}
func ForwardWithClient(client *client) beam.Sender {
backend := beam.NewServer()
backend.OnSpawn(beam.Handler(func(ctx *beam.Message) error {
if len(ctx.Args) != 1 {
return fmt.Errorf("forward: spawn takes exactly 1 argument, got %d", len(ctx.Args))
}
client, err := newClient(ctx.Args[0], "v1.11")
if err != nil {
return fmt.Errorf("%v", err)
}
client.setURL(ctx.Args[0])
f := &forwarder{
client: client,
Server: beam.NewServer(),
@ -34,7 +35,7 @@ func Forward() beam.Sender {
f.Server.OnStart(beam.Handler(f.start))
f.Server.OnLs(beam.Handler(f.ls))
f.Server.OnSpawn(beam.Handler(f.spawn))
_, err = ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server})
_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: f.Server})
return err
}))
return backend
@ -67,23 +68,23 @@ func (f *forwarder) start(ctx *beam.Message) error {
func (f *forwarder) ls(ctx *beam.Message) error {
resp, err := f.client.call("GET", "/containers/json", "")
if err != nil {
return fmt.Errorf("%s: get: %v", f.client.URL.String(), err)
return fmt.Errorf("get: %v", err)
}
// FIXME: check for response error
c := engine.NewTable("Created", 0)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("%s: read body: %v", f.client.URL.String(), err)
return fmt.Errorf("read body: %v", err)
}
if _, err := c.ReadListFrom(body); err != nil {
return fmt.Errorf("%s: readlist: %v", f.client.URL.String(), err)
return fmt.Errorf("readlist: %v", err)
}
names := []string{}
for _, env := range c.Data {
names = append(names, env.GetList("Names")[0][1:])
}
if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: names}); err != nil {
return fmt.Errorf("%s: send response: %v", f.client.URL.String(), err)
return fmt.Errorf("send response: %v", err)
}
return nil
}
@ -218,30 +219,27 @@ func (c *container) get(ctx *beam.Message) error {
}
type client struct {
URL *url.URL
proto string
addr string
version string
transport *http.Transport
urlHost string
scheme string
version string
}
func newClient(peer, version string) (*client, error) {
u, err := url.Parse(peer)
if err != nil {
return nil, err
func newClient() *client {
return &client{
transport: &http.Transport{},
urlHost: "dummy.host",
scheme: "http",
version: "v1.11",
}
protoAddrParts := strings.SplitN(peer, "://", 2)
c := &client{
URL: u,
proto: protoAddrParts[0],
addr: protoAddrParts[1],
version: version,
}
c.URL.Scheme = "http"
return c, nil
}
func (c *client) dial() (net.Conn, error) {
return net.Dial(c.proto, c.addr)
func (c *client) setURL(url string) {
parts := strings.SplitN(url, "://", 2)
proto, host := parts[0], parts[1]
c.transport.Dial = func(_, _ string) (net.Conn, error) {
return net.Dial(proto, host)
}
}
func (c *client) call(method, path, body string) (*http.Response, error) {
@ -250,15 +248,14 @@ func (c *client) call(method, path, body string) (*http.Response, error) {
if err != nil {
return nil, err
}
u.Host = "dummy.host"
u.Scheme = c.URL.Scheme
u.Host = c.urlHost
u.Scheme = c.scheme
req, err := http.NewRequest(method, u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
tr := &http.Transport{Dial: func(_, _ string) (net.Conn, error) { return c.dial() }}
client := &http.Client{Transport: tr}
resp, err := client.Do(req)
httpClient := &http.Client{Transport: c.transport}
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
@ -267,7 +264,7 @@ func (c *client) call(method, path, body string) (*http.Response, error) {
func (c *client) hijack(method, path string, in io.ReadCloser, stdout, stderr io.Writer) error {
path = fmt.Sprintf("/%s%s", c.version, path)
dial, err := c.dial()
dial, err := c.transport.Dial("ignored", "ignored")
if err != nil {
return err
}