mirror of https://github.com/docker/docs.git
Merge pull request #865 from abronan/update_etcd_godep
store: Update go-etcd Godep
This commit is contained in:
commit
a43a1a869d
|
@ -32,8 +32,8 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/coreos/go-etcd/etcd",
|
||||
"Comment": "v0.4.6-2-gb7c233e",
|
||||
"Rev": "b7c233e2ef9be1016c63f66145161b317165ad7e"
|
||||
"Comment": "v2.0.0-7-g73a8ef7",
|
||||
"Rev": "73a8ef737e8ea002281a28b4cb92a1de121ad4c6"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/docker/docker/pkg/ioutils",
|
||||
|
|
|
@ -15,8 +15,6 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||
)
|
||||
|
||||
// See SetConsistency for how to use these constants.
|
||||
|
@ -44,10 +42,17 @@ type Config struct {
|
|||
Consistency string `json:"consistency"`
|
||||
}
|
||||
|
||||
type credentials struct {
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
config Config `json:"config"`
|
||||
cluster *Cluster `json:"cluster"`
|
||||
httpClient *http.Client
|
||||
credentials *credentials
|
||||
transport *http.Transport
|
||||
persistence io.Writer
|
||||
cURLch chan string
|
||||
// CheckRetry can be used to control the policy for failed requests
|
||||
|
@ -172,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) {
|
|||
// Override the Client's HTTP Transport object
|
||||
func (c *Client) SetTransport(tr *http.Transport) {
|
||||
c.httpClient.Transport = tr
|
||||
c.transport = tr
|
||||
}
|
||||
|
||||
func (c *Client) SetCredentials(username, password string) {
|
||||
c.credentials = &credentials{username, password}
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
c.transport.DisableKeepAlives = true
|
||||
c.transport.CloseIdleConnections()
|
||||
}
|
||||
|
||||
// initHTTPClient initializes a HTTP client for etcd client
|
||||
func (c *Client) initHTTPClient() {
|
||||
tr := &http.Transport{
|
||||
c.transport = &http.Transport{
|
||||
Dial: c.dial,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
}
|
||||
c.httpClient = &http.Client{Transport: tr}
|
||||
c.httpClient = &http.Client{Transport: c.transport}
|
||||
}
|
||||
|
||||
// initHTTPClient initializes a HTTPS client for etcd client
|
||||
|
@ -305,31 +320,49 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|||
continue
|
||||
}
|
||||
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK { // fall-back to old endpoint
|
||||
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
||||
resp, err := c.httpClient.Get(httpPath)
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(string(b))
|
||||
} else {
|
||||
b, err := ioutil.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
// try another machine in the cluster
|
||||
continue
|
||||
}
|
||||
|
||||
var mCollection httptypes.MemberCollection
|
||||
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||
// try another machine
|
||||
continue
|
||||
}
|
||||
var mCollection memberCollection
|
||||
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||
// try another machine
|
||||
continue
|
||||
}
|
||||
|
||||
urls := make([]string, 0)
|
||||
for _, m := range mCollection {
|
||||
urls = append(urls, m.ClientURLs...)
|
||||
}
|
||||
urls := make([]string, 0)
|
||||
for _, m := range mCollection {
|
||||
urls = append(urls, m.ClientURLs...)
|
||||
}
|
||||
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||
// update Machines List
|
||||
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||
}
|
||||
|
||||
logger.Debug("sync.machines ", c.cluster.Machines)
|
||||
c.saveConfig()
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) {
|
|||
t.Fatalf("The two configs should be equal!")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientRetry(t *testing.T) {
|
||||
c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"})
|
||||
// use first endpoint as the picked url
|
||||
c.cluster.picked = 0
|
||||
if _, err := c.Set("foo", "bar", 5); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := c.Delete("foo", true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,5 +30,8 @@ func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
|
|||
|
||||
func (cl *Cluster) updateFromStr(machines string) {
|
||||
cl.Machines = strings.Split(machines, ",")
|
||||
cl.picked = rand.Intn(len(machines))
|
||||
for i := range cl.Machines {
|
||||
cl.Machines[i] = strings.TrimSpace(cl.Machines[i])
|
||||
}
|
||||
cl.picked = rand.Intn(len(cl.Machines))
|
||||
}
|
||||
|
|
|
@ -6,7 +6,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
ErrCodeEtcdNotReachable = 501
|
||||
ErrCodeEtcdNotReachable = 501
|
||||
ErrCodeUnhandledHTTPStatus = 502
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
package etcd
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
type Member struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
PeerURLs []string `json:"peerURLs"`
|
||||
ClientURLs []string `json:"clientURLs"`
|
||||
}
|
||||
|
||||
type memberCollection []Member
|
||||
|
||||
func (c *memberCollection) UnmarshalJSON(data []byte) error {
|
||||
d := struct {
|
||||
Members []Member
|
||||
}{}
|
||||
|
||||
if err := json.Unmarshal(data, &d); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if d.Members == nil {
|
||||
*c = make([]Member, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
*c = d.Members
|
||||
return nil
|
||||
}
|
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
|
@ -0,0 +1,71 @@
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMemberCollectionUnmarshal(t *testing.T) {
|
||||
tests := []struct {
|
||||
body []byte
|
||||
want memberCollection
|
||||
}{
|
||||
{
|
||||
body: []byte(`{"members":[]}`),
|
||||
want: memberCollection([]Member{}),
|
||||
},
|
||||
{
|
||||
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
|
||||
want: memberCollection(
|
||||
[]Member{
|
||||
{
|
||||
ID: "2745e2525fce8fe",
|
||||
Name: "node3",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:7003",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:4003",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "42134f434382925",
|
||||
Name: "node1",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:2380",
|
||||
"http://127.0.0.1:7001",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:2379",
|
||||
"http://127.0.0.1:4001",
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: "94088180e21eb87b",
|
||||
Name: "node2",
|
||||
PeerURLs: []string{
|
||||
"http://127.0.0.1:7002",
|
||||
},
|
||||
ClientURLs: []string{
|
||||
"http://127.0.0.1:4002",
|
||||
},
|
||||
},
|
||||
},
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
var got memberCollection
|
||||
err := json.Unmarshal(tt.body, &got)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(tt.want, got) {
|
||||
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -188,7 +188,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
|
||||
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
||||
|
||||
httpPath = c.getHttpPath(rr.RelativePath)
|
||||
// get httpPath if not set
|
||||
if httpPath == "" {
|
||||
httpPath = c.getHttpPath(rr.RelativePath)
|
||||
}
|
||||
|
||||
// Return a cURL command if curlChan is set
|
||||
if c.cURLch != nil {
|
||||
|
@ -196,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
for key, value := range rr.Values {
|
||||
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
||||
}
|
||||
if c.credentials != nil {
|
||||
command += fmt.Sprintf(" -u %s", c.credentials.username)
|
||||
}
|
||||
c.sendCURL(command)
|
||||
}
|
||||
|
||||
|
@ -225,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if c.credentials != nil {
|
||||
req.SetBasicAuth(c.credentials.username, c.credentials.password)
|
||||
}
|
||||
|
||||
resp, err = c.httpClient.Do(req)
|
||||
// clear previous httpPath
|
||||
httpPath = ""
|
||||
defer func() {
|
||||
if resp != nil {
|
||||
resp.Body.Close()
|
||||
|
@ -280,6 +292,19 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
}
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
u, err := resp.Location()
|
||||
|
||||
if err != nil {
|
||||
logger.Warning(err)
|
||||
} else {
|
||||
// set httpPath for following redirection
|
||||
httpPath = u.String()
|
||||
}
|
||||
resp.Body.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
if checkErr := checkRetry(c.cluster, numReqs, *resp,
|
||||
errors.New("Unexpected HTTP status code")); checkErr != nil {
|
||||
return nil, checkErr
|
||||
|
@ -302,21 +327,40 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
|||
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
||||
err error) error {
|
||||
|
||||
if numReqs >= 2*len(cluster.Machines) {
|
||||
return newError(ErrCodeEtcdNotReachable,
|
||||
"Tried to connect to each peer twice and failed", 0)
|
||||
if numReqs > 2*len(cluster.Machines) {
|
||||
errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
|
||||
return newError(ErrCodeEtcdNotReachable, errStr, 0)
|
||||
}
|
||||
|
||||
code := lastResp.StatusCode
|
||||
if code == http.StatusInternalServerError {
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
|
||||
if isEmptyResponse(lastResp) {
|
||||
// always retry if it failed to get response from one machine
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Warning("bad response status code", code)
|
||||
if !shouldRetry(lastResp) {
|
||||
body := []byte("nil")
|
||||
if lastResp.Body != nil {
|
||||
if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
|
||||
body = b
|
||||
}
|
||||
}
|
||||
errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
|
||||
return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
|
||||
}
|
||||
// sleep some time and expect leader election finish
|
||||
time.Sleep(time.Millisecond * 200)
|
||||
logger.Warning("bad response status code", lastResp.StatusCode)
|
||||
return nil
|
||||
}
|
||||
|
||||
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
||||
|
||||
// shouldRetry returns whether the reponse deserves retry.
|
||||
func shouldRetry(r http.Response) bool {
|
||||
// TODO: only retry when the cluster is in leader election
|
||||
// We cannot do it exactly because etcd doesn't support it well.
|
||||
return r.StatusCode == http.StatusInternalServerError
|
||||
}
|
||||
|
||||
func (c *Client) getHttpPath(s ...string) string {
|
||||
fullPath := c.cluster.pick() + "/" + version
|
||||
for _, seg := range s {
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
package etcd
|
||||
|
||||
const version = "v2"
|
||||
const (
|
||||
version = "v2"
|
||||
packageVersion = "v2.0.0+git"
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue