mirror of https://github.com/docker/docs.git
Add Ephemeral for Consul, Zookeeper and Etcd (Etcd abd Consul are using TTLs in the background)
Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
parent
d5915b2a09
commit
4408a9cc8c
|
@ -15,8 +15,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// See SetConsistency for how to use these constants.
|
// See SetConsistency for how to use these constants.
|
||||||
|
@ -44,10 +42,17 @@ type Config struct {
|
||||||
Consistency string `json:"consistency"`
|
Consistency string `json:"consistency"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type credentials struct {
|
||||||
|
username string
|
||||||
|
password string
|
||||||
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
config Config `json:"config"`
|
config Config `json:"config"`
|
||||||
cluster *Cluster `json:"cluster"`
|
cluster *Cluster `json:"cluster"`
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
credentials *credentials
|
||||||
|
transport *http.Transport
|
||||||
persistence io.Writer
|
persistence io.Writer
|
||||||
cURLch chan string
|
cURLch chan string
|
||||||
// CheckRetry can be used to control the policy for failed requests
|
// CheckRetry can be used to control the policy for failed requests
|
||||||
|
@ -172,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) {
|
||||||
// Override the Client's HTTP Transport object
|
// Override the Client's HTTP Transport object
|
||||||
func (c *Client) SetTransport(tr *http.Transport) {
|
func (c *Client) SetTransport(tr *http.Transport) {
|
||||||
c.httpClient.Transport = tr
|
c.httpClient.Transport = tr
|
||||||
|
c.transport = tr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) SetCredentials(username, password string) {
|
||||||
|
c.credentials = &credentials{username, password}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Close() {
|
||||||
|
c.transport.DisableKeepAlives = true
|
||||||
|
c.transport.CloseIdleConnections()
|
||||||
}
|
}
|
||||||
|
|
||||||
// initHTTPClient initializes a HTTP client for etcd client
|
// initHTTPClient initializes a HTTP client for etcd client
|
||||||
func (c *Client) initHTTPClient() {
|
func (c *Client) initHTTPClient() {
|
||||||
tr := &http.Transport{
|
c.transport = &http.Transport{
|
||||||
Dial: c.dial,
|
Dial: c.dial,
|
||||||
TLSClientConfig: &tls.Config{
|
TLSClientConfig: &tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
c.httpClient = &http.Client{Transport: tr}
|
c.httpClient = &http.Client{Transport: c.transport}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initHTTPClient initializes a HTTPS client for etcd client
|
// initHTTPClient initializes a HTTPS client for etcd client
|
||||||
|
@ -305,31 +320,49 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := ioutil.ReadAll(resp.Body)
|
if resp.StatusCode != http.StatusOK { // fall-back to old endpoint
|
||||||
resp.Body.Close()
|
httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
||||||
if err != nil {
|
resp, err := c.httpClient.Get(httpPath)
|
||||||
// try another machine in the cluster
|
if err != nil {
|
||||||
continue
|
// try another machine in the cluster
|
||||||
}
|
continue
|
||||||
|
}
|
||||||
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
// try another machine in the cluster
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// update Machines List
|
||||||
|
c.cluster.updateFromStr(string(b))
|
||||||
|
} else {
|
||||||
|
b, err := ioutil.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
// try another machine in the cluster
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
var mCollection httptypes.MemberCollection
|
var mCollection memberCollection
|
||||||
if err := json.Unmarshal(b, &mCollection); err != nil {
|
if err := json.Unmarshal(b, &mCollection); err != nil {
|
||||||
// try another machine
|
// try another machine
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
urls := make([]string, 0)
|
urls := make([]string, 0)
|
||||||
for _, m := range mCollection {
|
for _, m := range mCollection {
|
||||||
urls = append(urls, m.ClientURLs...)
|
urls = append(urls, m.ClientURLs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update Machines List
|
// update Machines List
|
||||||
c.cluster.updateFromStr(strings.Join(urls, ","))
|
c.cluster.updateFromStr(strings.Join(urls, ","))
|
||||||
|
}
|
||||||
|
|
||||||
logger.Debug("sync.machines ", c.cluster.Machines)
|
logger.Debug("sync.machines ", c.cluster.Machines)
|
||||||
c.saveConfig()
|
c.saveConfig()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) {
|
||||||
t.Fatalf("The two configs should be equal!")
|
t.Fatalf("The two configs should be equal!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientRetry(t *testing.T) {
|
||||||
|
c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"})
|
||||||
|
// use first endpoint as the picked url
|
||||||
|
c.cluster.picked = 0
|
||||||
|
if _, err := c.Set("foo", "bar", 5); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := c.Delete("foo", true); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -30,5 +30,8 @@ func (cl *Cluster) pick() string { return cl.Machines[cl.picked] }
|
||||||
|
|
||||||
func (cl *Cluster) updateFromStr(machines string) {
|
func (cl *Cluster) updateFromStr(machines string) {
|
||||||
cl.Machines = strings.Split(machines, ",")
|
cl.Machines = strings.Split(machines, ",")
|
||||||
cl.picked = rand.Intn(len(machines))
|
for i := range cl.Machines {
|
||||||
|
cl.Machines[i] = strings.TrimSpace(cl.Machines[i])
|
||||||
|
}
|
||||||
|
cl.picked = rand.Intn(len(cl.Machines))
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ErrCodeEtcdNotReachable = 501
|
ErrCodeEtcdNotReachable = 501
|
||||||
|
ErrCodeUnhandledHTTPStatus = 502
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
|
type Member struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
PeerURLs []string `json:"peerURLs"`
|
||||||
|
ClientURLs []string `json:"clientURLs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type memberCollection []Member
|
||||||
|
|
||||||
|
func (c *memberCollection) UnmarshalJSON(data []byte) error {
|
||||||
|
d := struct {
|
||||||
|
Members []Member
|
||||||
|
}{}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(data, &d); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.Members == nil {
|
||||||
|
*c = make([]Member, 0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
*c = d.Members
|
||||||
|
return nil
|
||||||
|
}
|
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
71
Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go
generated
vendored
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMemberCollectionUnmarshal(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
body []byte
|
||||||
|
want memberCollection
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
body: []byte(`{"members":[]}`),
|
||||||
|
want: memberCollection([]Member{}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
|
||||||
|
want: memberCollection(
|
||||||
|
[]Member{
|
||||||
|
{
|
||||||
|
ID: "2745e2525fce8fe",
|
||||||
|
Name: "node3",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:7003",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:4003",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "42134f434382925",
|
||||||
|
Name: "node1",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:2380",
|
||||||
|
"http://127.0.0.1:7001",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:2379",
|
||||||
|
"http://127.0.0.1:4001",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "94088180e21eb87b",
|
||||||
|
Name: "node2",
|
||||||
|
PeerURLs: []string{
|
||||||
|
"http://127.0.0.1:7002",
|
||||||
|
},
|
||||||
|
ClientURLs: []string{
|
||||||
|
"http://127.0.0.1:4002",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
var got memberCollection
|
||||||
|
err := json.Unmarshal(tt.body, &got)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("#%d: unexpected error: %v", i, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(tt.want, got) {
|
||||||
|
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -188,7 +188,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
|
|
||||||
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath)
|
||||||
|
|
||||||
httpPath = c.getHttpPath(rr.RelativePath)
|
// get httpPath if not set
|
||||||
|
if httpPath == "" {
|
||||||
|
httpPath = c.getHttpPath(rr.RelativePath)
|
||||||
|
}
|
||||||
|
|
||||||
// Return a cURL command if curlChan is set
|
// Return a cURL command if curlChan is set
|
||||||
if c.cURLch != nil {
|
if c.cURLch != nil {
|
||||||
|
@ -196,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
for key, value := range rr.Values {
|
for key, value := range rr.Values {
|
||||||
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
||||||
}
|
}
|
||||||
|
if c.credentials != nil {
|
||||||
|
command += fmt.Sprintf(" -u %s", c.credentials.username)
|
||||||
|
}
|
||||||
c.sendCURL(command)
|
c.sendCURL(command)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.credentials != nil {
|
||||||
|
req.SetBasicAuth(c.credentials.username, c.credentials.password)
|
||||||
|
}
|
||||||
|
|
||||||
resp, err = c.httpClient.Do(req)
|
resp, err = c.httpClient.Do(req)
|
||||||
|
// clear previous httpPath
|
||||||
|
httpPath = ""
|
||||||
defer func() {
|
defer func() {
|
||||||
if resp != nil {
|
if resp != nil {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
|
@ -280,6 +292,19 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||||
|
u, err := resp.Location()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Warning(err)
|
||||||
|
} else {
|
||||||
|
// set httpPath for following redirection
|
||||||
|
httpPath = u.String()
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if checkErr := checkRetry(c.cluster, numReqs, *resp,
|
if checkErr := checkRetry(c.cluster, numReqs, *resp,
|
||||||
errors.New("Unexpected HTTP status code")); checkErr != nil {
|
errors.New("Unexpected HTTP status code")); checkErr != nil {
|
||||||
return nil, checkErr
|
return nil, checkErr
|
||||||
|
@ -302,21 +327,40 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) {
|
||||||
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response,
|
||||||
err error) error {
|
err error) error {
|
||||||
|
|
||||||
if numReqs >= 2*len(cluster.Machines) {
|
if numReqs > 2*len(cluster.Machines) {
|
||||||
return newError(ErrCodeEtcdNotReachable,
|
errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err)
|
||||||
"Tried to connect to each peer twice and failed", 0)
|
return newError(ErrCodeEtcdNotReachable, errStr, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
code := lastResp.StatusCode
|
if isEmptyResponse(lastResp) {
|
||||||
if code == http.StatusInternalServerError {
|
// always retry if it failed to get response from one machine
|
||||||
time.Sleep(time.Millisecond * 200)
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
if !shouldRetry(lastResp) {
|
||||||
logger.Warning("bad response status code", code)
|
body := []byte("nil")
|
||||||
|
if lastResp.Body != nil {
|
||||||
|
if b, err := ioutil.ReadAll(lastResp.Body); err == nil {
|
||||||
|
body = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body)
|
||||||
|
return newError(ErrCodeUnhandledHTTPStatus, errStr, 0)
|
||||||
|
}
|
||||||
|
// sleep some time and expect leader election finish
|
||||||
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
logger.Warning("bad response status code", lastResp.StatusCode)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 }
|
||||||
|
|
||||||
|
// shouldRetry returns whether the reponse deserves retry.
|
||||||
|
func shouldRetry(r http.Response) bool {
|
||||||
|
// TODO: only retry when the cluster is in leader election
|
||||||
|
// We cannot do it exactly because etcd doesn't support it well.
|
||||||
|
return r.StatusCode == http.StatusInternalServerError
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) getHttpPath(s ...string) string {
|
func (c *Client) getHttpPath(s ...string) string {
|
||||||
fullPath := c.cluster.pick() + "/" + version
|
fullPath := c.cluster.pick() + "/" + version
|
||||||
for _, seg := range s {
|
for _, seg := range s {
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
const version = "v2"
|
const (
|
||||||
|
version = "v2"
|
||||||
|
packageVersion = "v2.0.0+git"
|
||||||
|
)
|
||||||
|
|
|
@ -47,13 +47,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error {
|
||||||
|
|
||||||
// Creates a new store, will ignore options given
|
// Creates a new store, will ignore options given
|
||||||
// if not supported by the chosen store
|
// if not supported by the chosen store
|
||||||
s.store, err = store.CreateStore(
|
s.store, err = store.CreateStore(s.backend, addrs, nil)
|
||||||
s.backend,
|
|
||||||
addrs,
|
|
||||||
&store.Config{
|
|
||||||
Timeout: s.heartbeat,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,5 +115,14 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c
|
||||||
|
|
||||||
// Register is exported
|
// Register is exported
|
||||||
func (s *Discovery) Register(addr string) error {
|
func (s *Discovery) Register(addr string) error {
|
||||||
return s.store.Put(path.Join(s.prefix, addr), []byte(addr))
|
opts := &store.WriteOptions{Ephemeral: true}
|
||||||
|
err := s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertToStringArray(entries []*store.KVPair) (addrs []string) {
|
||||||
|
for _, entry := range entries {
|
||||||
|
addrs = append(addrs, string(entry.Value))
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,8 +19,9 @@ const (
|
||||||
|
|
||||||
// Consul embeds the client and watches
|
// Consul embeds the client and watches
|
||||||
type Consul struct {
|
type Consul struct {
|
||||||
config *api.Config
|
config *api.Config
|
||||||
client *api.Client
|
client *api.Client
|
||||||
|
ephemeralSession string
|
||||||
}
|
}
|
||||||
|
|
||||||
type consulLock struct {
|
type consulLock struct {
|
||||||
|
@ -39,12 +40,14 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) {
|
||||||
config.Address = endpoints[0]
|
config.Address = endpoints[0]
|
||||||
config.Scheme = "http"
|
config.Scheme = "http"
|
||||||
|
|
||||||
if options.TLS != nil {
|
// Set options
|
||||||
s.setTLS(options.TLS)
|
if options != nil {
|
||||||
}
|
if options.TLS != nil {
|
||||||
|
s.setTLS(options.TLS)
|
||||||
if options.Timeout != 0 {
|
}
|
||||||
s.setTimeout(options.Timeout)
|
if options.ConnectionTimeout != 0 {
|
||||||
|
s.setTimeout(options.ConnectionTimeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new client
|
// Creates a new client
|
||||||
|
@ -55,6 +58,17 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) {
|
||||||
}
|
}
|
||||||
s.client = client
|
s.client = client
|
||||||
|
|
||||||
|
// Create global ephemeral keys session
|
||||||
|
entry := &api.SessionEntry{
|
||||||
|
Behavior: api.SessionBehaviorDelete,
|
||||||
|
TTL: time.Duration(EphemeralTTL * time.Second).String(),
|
||||||
|
}
|
||||||
|
session, _, err := s.client.Session().Create(entry, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.ephemeralSession = session
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,11 +105,37 @@ func (s *Consul) Get(key string) (*KVPair, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put a value at "key"
|
// Put a value at "key"
|
||||||
func (s *Consul) Put(key string, value []byte) error {
|
func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error {
|
||||||
p := &api.KVPair{Key: s.normalize(key), Value: value}
|
p := &api.KVPair{
|
||||||
if s.client == nil {
|
Key: s.normalize(key),
|
||||||
log.Error("Error initializing client")
|
Value: value,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts != nil && opts.Ephemeral {
|
||||||
|
p.Session = s.ephemeralSession
|
||||||
|
|
||||||
|
// Create lock option with the
|
||||||
|
// EphemeralSession
|
||||||
|
lockOpts := &api.LockOptions{
|
||||||
|
Key: key,
|
||||||
|
Session: s.ephemeralSession,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock and ignore if lock is held
|
||||||
|
// It's just a placeholder for the
|
||||||
|
// ephemeral behavior
|
||||||
|
lock, _ := s.client.LockOpts(lockOpts)
|
||||||
|
if lock != nil {
|
||||||
|
lock.Lock(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to renew the session
|
||||||
|
_, _, err := s.client.Session().Renew(p.Session, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_, err := s.client.KV().Put(p, nil)
|
_, err := s.client.KV().Put(p, nil)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -258,7 +298,7 @@ func (l *consulLock) Unlock() error {
|
||||||
|
|
||||||
// AtomicPut put a value at "key" if the key has not been
|
// AtomicPut put a value at "key" if the key has not been
|
||||||
// modified in the meantime, throws an error if this is the case
|
// modified in the meantime, throws an error if this is the case
|
||||||
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||||
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
|
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
|
||||||
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
|
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
|
|
@ -23,12 +23,14 @@ func InitializeEtcd(addrs []string, options *Config) (Store, error) {
|
||||||
entries := createEndpoints(addrs, "http")
|
entries := createEndpoints(addrs, "http")
|
||||||
s.client = etcd.NewClient(entries)
|
s.client = etcd.NewClient(entries)
|
||||||
|
|
||||||
if options.TLS != nil {
|
// Set options
|
||||||
s.setTLS(options.TLS)
|
if options != nil {
|
||||||
}
|
if options.TLS != nil {
|
||||||
|
s.setTLS(options.TLS)
|
||||||
if options.Timeout != 0 {
|
}
|
||||||
s.setTimeout(options.Timeout)
|
if options.ConnectionTimeout != 0 {
|
||||||
|
s.setTimeout(options.ConnectionTimeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME sync on each operation?
|
// FIXME sync on each operation?
|
||||||
|
@ -66,7 +68,6 @@ func (s *Etcd) setTimeout(time time.Duration) {
|
||||||
|
|
||||||
// Create the entire path for a directory that does not exist
|
// Create the entire path for a directory that does not exist
|
||||||
func (s *Etcd) createDirectory(path string) error {
|
func (s *Etcd) createDirectory(path string) error {
|
||||||
// TODO Handle TTL at key/dir creation -> use K/V struct for key infos?
|
|
||||||
if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
|
if _, err := s.client.CreateDir(normalize(path), 10); err != nil {
|
||||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||||
if etcdError.ErrorCode != 105 { // Skip key already exists
|
if etcdError.ErrorCode != 105 { // Skip key already exists
|
||||||
|
@ -96,19 +97,27 @@ func (s *Etcd) Get(key string) (*KVPair, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put a value at "key"
|
// Put a value at "key"
|
||||||
func (s *Etcd) Put(key string, value []byte) error {
|
func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error {
|
||||||
if _, err := s.client.Set(key, string(value), 0); err != nil {
|
|
||||||
|
// Default TTL = 0 means no expiration
|
||||||
|
ttl := DefaultTTL
|
||||||
|
if opts != nil && opts.Ephemeral {
|
||||||
|
ttl = EphemeralTTL
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil {
|
||||||
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
if etcdError, ok := err.(*etcd.EtcdError); ok {
|
||||||
if etcdError.ErrorCode == 104 { // Not a directory
|
if etcdError.ErrorCode == 104 { // Not a directory
|
||||||
// Remove the last element (the actual key) and set the prefix as a dir
|
// Remove the last element (the actual key) and set the prefix as a dir
|
||||||
err = s.createDirectory(getDirectory(key))
|
err = s.createDirectory(getDirectory(key))
|
||||||
if _, err := s.client.Set(key, string(value), 0); err != nil {
|
if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,7 +232,7 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai
|
||||||
|
|
||||||
// AtomicPut put a value at "key" if the key has not been
|
// AtomicPut put a value at "key" if the key has not been
|
||||||
// modified in the meantime, throws an error if this is the case
|
// modified in the meantime, throws an error if this is the case
|
||||||
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||||
_, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
|
_, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
|
|
@ -21,8 +21,8 @@ func InitializeMock(endpoints []string, options *Config) (Store, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put mock
|
// Put mock
|
||||||
func (s *Mock) Put(key string, value []byte) error {
|
func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error {
|
||||||
args := s.Mock.Called(key, value)
|
args := s.Mock.Called(key, value, opts)
|
||||||
return args.Error(0)
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,8 +75,8 @@ func (s *Mock) DeleteTree(prefix string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AtomicPut mock
|
// AtomicPut mock
|
||||||
func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, error) {
|
||||||
args := s.Mock.Called(key, value, previous)
|
args := s.Mock.Called(key, value, previous, opts)
|
||||||
return args.Bool(0), args.Error(1)
|
return args.Bool(0), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,8 @@ var (
|
||||||
|
|
||||||
// Config contains the options for a storage client
|
// Config contains the options for a storage client
|
||||||
type Config struct {
|
type Config struct {
|
||||||
TLS *tls.Config
|
TLS *tls.Config
|
||||||
Timeout time.Duration
|
ConnectionTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store represents the backend K/V storage
|
// Store represents the backend K/V storage
|
||||||
|
@ -51,7 +51,7 @@ type Config struct {
|
||||||
// backend for libkv
|
// backend for libkv
|
||||||
type Store interface {
|
type Store interface {
|
||||||
// Put a value at the specified key
|
// Put a value at the specified key
|
||||||
Put(key string, value []byte) error
|
Put(key string, value []byte, options *WriteOptions) error
|
||||||
|
|
||||||
// Get a value given its key
|
// Get a value given its key
|
||||||
Get(key string) (*KVPair, error)
|
Get(key string) (*KVPair, error)
|
||||||
|
@ -86,12 +86,24 @@ type Store interface {
|
||||||
DeleteTree(prefix string) error
|
DeleteTree(prefix string) error
|
||||||
|
|
||||||
// Atomic operation on a single value
|
// Atomic operation on a single value
|
||||||
AtomicPut(key string, value []byte, previous *KVPair) (bool, error)
|
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error)
|
||||||
|
|
||||||
// Atomic delete of a single value
|
// Atomic delete of a single value
|
||||||
AtomicDelete(key string, previous *KVPair) (bool, error)
|
AtomicDelete(key string, previous *KVPair) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultTTL is the default time used for a node
|
||||||
|
// to be removed, it is set to 0 to explain
|
||||||
|
// that there is no expiration
|
||||||
|
DefaultTTL = 0
|
||||||
|
|
||||||
|
// EphemeralTTL is used for the ephemeral node
|
||||||
|
// behavior. If the node session is not renewed
|
||||||
|
// before the ttl expires, the node is removed
|
||||||
|
EphemeralTTL = 60
|
||||||
|
)
|
||||||
|
|
||||||
// KVPair represents {Key, Value, Lastindex} tuple
|
// KVPair represents {Key, Value, Lastindex} tuple
|
||||||
type KVPair struct {
|
type KVPair struct {
|
||||||
Key string
|
Key string
|
||||||
|
@ -99,6 +111,11 @@ type KVPair struct {
|
||||||
LastIndex uint64
|
LastIndex uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteOptions contains optional request parameters
|
||||||
|
type WriteOptions struct {
|
||||||
|
Ephemeral bool
|
||||||
|
}
|
||||||
|
|
||||||
// WatchCallback is used for watch methods on keys
|
// WatchCallback is used for watch methods on keys
|
||||||
// and is triggered on key change
|
// and is triggered on key change
|
||||||
type WatchCallback func(entries ...*KVPair)
|
type WatchCallback func(entries ...*KVPair)
|
||||||
|
|
|
@ -24,8 +24,11 @@ func InitializeZookeeper(endpoints []string, options *Config) (Store, error) {
|
||||||
s := &Zookeeper{}
|
s := &Zookeeper{}
|
||||||
s.timeout = 5 * time.Second // default timeout
|
s.timeout = 5 * time.Second // default timeout
|
||||||
|
|
||||||
if options.Timeout != 0 {
|
// Set options
|
||||||
s.setTimeout(options.Timeout)
|
if options != nil {
|
||||||
|
if options.ConnectionTimeout != 0 {
|
||||||
|
s.setTimeout(options.ConnectionTimeout)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, _, err := zk.Connect(endpoints, s.timeout)
|
conn, _, err := zk.Connect(endpoints, s.timeout)
|
||||||
|
@ -56,9 +59,13 @@ func (s *Zookeeper) Get(key string) (*KVPair, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the entire path for a directory that does not exist
|
// Create the entire path for a directory that does not exist
|
||||||
func (s *Zookeeper) createFullpath(path []string) error {
|
func (s *Zookeeper) createFullpath(path []string, ephemeral bool) error {
|
||||||
for i := 1; i <= len(path); i++ {
|
for i := 1; i <= len(path); i++ {
|
||||||
newpath := "/" + strings.Join(path[:i], "/")
|
newpath := "/" + strings.Join(path[:i], "/")
|
||||||
|
if i == len(path) && ephemeral {
|
||||||
|
_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
|
||||||
|
return err
|
||||||
|
}
|
||||||
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
|
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Skip if node already exists
|
// Skip if node already exists
|
||||||
|
@ -71,14 +78,18 @@ func (s *Zookeeper) createFullpath(path []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put a value at "key"
|
// Put a value at "key"
|
||||||
func (s *Zookeeper) Put(key string, value []byte) error {
|
func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error {
|
||||||
fkey := normalize(key)
|
fkey := normalize(key)
|
||||||
exists, err := s.Exists(key)
|
exists, err := s.Exists(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
s.createFullpath(splitKey(key))
|
if opts != nil && opts.Ephemeral {
|
||||||
|
s.createFullpath(splitKey(key), opts.Ephemeral)
|
||||||
|
} else {
|
||||||
|
s.createFullpath(splitKey(key), false)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_, err = s.client.Set(fkey, value, -1)
|
_, err = s.client.Set(fkey, value, -1)
|
||||||
return err
|
return err
|
||||||
|
@ -198,7 +209,7 @@ func (s *Zookeeper) DeleteTree(prefix string) error {
|
||||||
|
|
||||||
// AtomicPut put a value at "key" if the key has not been
|
// AtomicPut put a value at "key" if the key has not been
|
||||||
// modified in the meantime, throws an error if this is the case
|
// modified in the meantime, throws an error if this is the case
|
||||||
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) {
|
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) {
|
||||||
// Use index of Set method to implement CAS
|
// Use index of Set method to implement CAS
|
||||||
return false, ErrNotImplemented
|
return false, ErrNotImplemented
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue