From 4408a9cc8ca070a61caa006578bee6e609afe13c Mon Sep 17 00:00:00 2001 From: Alexandre Beslic Date: Fri, 15 May 2015 13:44:18 -0700 Subject: [PATCH] Add Ephemeral for Consul, Zookeeper and Etcd (Etcd abd Consul are using TTLs in the background) Signed-off-by: Alexandre Beslic --- .../github.com/coreos/go-etcd/etcd/client.go | 75 +++++++++++++------ .../coreos/go-etcd/etcd/client_test.go | 12 +++ .../github.com/coreos/go-etcd/etcd/cluster.go | 5 +- .../github.com/coreos/go-etcd/etcd/error.go | 3 +- .../github.com/coreos/go-etcd/etcd/member.go | 30 ++++++++ .../coreos/go-etcd/etcd/member_test.go | 71 ++++++++++++++++++ .../coreos/go-etcd/etcd/requests.go | 64 +++++++++++++--- .../github.com/coreos/go-etcd/etcd/version.go | 5 +- discovery/kv/kv.go | 19 +++-- pkg/store/consul.go | 66 ++++++++++++---- pkg/store/etcd.go | 31 +++++--- pkg/store/mock.go | 8 +- pkg/store/store.go | 25 ++++++- pkg/store/zookeeper.go | 23 ++++-- 14 files changed, 357 insertions(+), 80 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go index 8ecb50ee53..c6cf3341ba 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go @@ -15,8 +15,6 @@ import ( "path" "strings" "time" - - "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes" ) // See SetConsistency for how to use these constants. @@ -44,10 +42,17 @@ type Config struct { Consistency string `json:"consistency"` } +type credentials struct { + username string + password string +} + type Client struct { config Config `json:"config"` cluster *Cluster `json:"cluster"` httpClient *http.Client + credentials *credentials + transport *http.Transport persistence io.Writer cURLch chan string // CheckRetry can be used to control the policy for failed requests @@ -172,17 +177,27 @@ func NewClientFromReader(reader io.Reader) (*Client, error) { // Override the Client's HTTP Transport object func (c *Client) SetTransport(tr *http.Transport) { c.httpClient.Transport = tr + c.transport = tr +} + +func (c *Client) SetCredentials(username, password string) { + c.credentials = &credentials{username, password} +} + +func (c *Client) Close() { + c.transport.DisableKeepAlives = true + c.transport.CloseIdleConnections() } // initHTTPClient initializes a HTTP client for etcd client func (c *Client) initHTTPClient() { - tr := &http.Transport{ + c.transport = &http.Transport{ Dial: c.dial, TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, } - c.httpClient = &http.Client{Transport: tr} + c.httpClient = &http.Client{Transport: c.transport} } // initHTTPClient initializes a HTTPS client for etcd client @@ -305,31 +320,49 @@ func (c *Client) internalSyncCluster(machines []string) bool { continue } - b, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - // try another machine in the cluster - continue - } + if resp.StatusCode != http.StatusOK { // fall-back to old endpoint + httpPath := c.createHttpPath(machine, path.Join(version, "machines")) + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } + // update Machines List + c.cluster.updateFromStr(string(b)) + } else { + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } - var mCollection httptypes.MemberCollection - if err := json.Unmarshal(b, &mCollection); err != nil { - // try another machine - continue - } + var mCollection memberCollection + if err := json.Unmarshal(b, &mCollection); err != nil { + // try another machine + continue + } - urls := make([]string, 0) - for _, m := range mCollection { - urls = append(urls, m.ClientURLs...) - } + urls := make([]string, 0) + for _, m := range mCollection { + urls = append(urls, m.ClientURLs...) + } - // update Machines List - c.cluster.updateFromStr(strings.Join(urls, ",")) + // update Machines List + c.cluster.updateFromStr(strings.Join(urls, ",")) + } logger.Debug("sync.machines ", c.cluster.Machines) c.saveConfig() return true } + return false } diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go index 66d79d7332..4720d8d693 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go @@ -94,3 +94,15 @@ func TestPersistence(t *testing.T) { t.Fatalf("The two configs should be equal!") } } + +func TestClientRetry(t *testing.T) { + c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"}) + // use first endpoint as the picked url + c.cluster.picked = 0 + if _, err := c.Set("foo", "bar", 5); err != nil { + t.Fatal(err) + } + if _, err := c.Delete("foo", true); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go index 70fd6a6ee0..1ad3e155be 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go @@ -30,5 +30,8 @@ func (cl *Cluster) pick() string { return cl.Machines[cl.picked] } func (cl *Cluster) updateFromStr(machines string) { cl.Machines = strings.Split(machines, ",") - cl.picked = rand.Intn(len(machines)) + for i := range cl.Machines { + cl.Machines[i] = strings.TrimSpace(cl.Machines[i]) + } + cl.picked = rand.Intn(len(cl.Machines)) } diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go index 7e69287247..66dca54b5c 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go @@ -6,7 +6,8 @@ import ( ) const ( - ErrCodeEtcdNotReachable = 501 + ErrCodeEtcdNotReachable = 501 + ErrCodeUnhandledHTTPStatus = 502 ) var ( diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go new file mode 100644 index 0000000000..5b13b28e1a --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go @@ -0,0 +1,30 @@ +package etcd + +import "encoding/json" + +type Member struct { + ID string `json:"id"` + Name string `json:"name"` + PeerURLs []string `json:"peerURLs"` + ClientURLs []string `json:"clientURLs"` +} + +type memberCollection []Member + +func (c *memberCollection) UnmarshalJSON(data []byte) error { + d := struct { + Members []Member + }{} + + if err := json.Unmarshal(data, &d); err != nil { + return err + } + + if d.Members == nil { + *c = make([]Member, 0) + return nil + } + + *c = d.Members + return nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go new file mode 100644 index 0000000000..53ebdd4bfd --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go @@ -0,0 +1,71 @@ +package etcd + +import ( + "encoding/json" + "reflect" + "testing" +) + +func TestMemberCollectionUnmarshal(t *testing.T) { + tests := []struct { + body []byte + want memberCollection + }{ + { + body: []byte(`{"members":[]}`), + want: memberCollection([]Member{}), + }, + { + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + want: memberCollection( + []Member{ + { + ID: "2745e2525fce8fe", + Name: "node3", + PeerURLs: []string{ + "http://127.0.0.1:7003", + }, + ClientURLs: []string{ + "http://127.0.0.1:4003", + }, + }, + { + ID: "42134f434382925", + Name: "node1", + PeerURLs: []string{ + "http://127.0.0.1:2380", + "http://127.0.0.1:7001", + }, + ClientURLs: []string{ + "http://127.0.0.1:2379", + "http://127.0.0.1:4001", + }, + }, + { + ID: "94088180e21eb87b", + Name: "node2", + PeerURLs: []string{ + "http://127.0.0.1:7002", + }, + ClientURLs: []string{ + "http://127.0.0.1:4002", + }, + }, + }, + ), + }, + } + + for i, tt := range tests { + var got memberCollection + err := json.Unmarshal(tt.body, &got) + if err != nil { + t.Errorf("#%d: unexpected error: %v", i, err) + continue + } + + if !reflect.DeepEqual(tt.want, got) { + t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got) + } + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go index e469101b5d..3c3f436bea 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go @@ -188,7 +188,10 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath) - httpPath = c.getHttpPath(rr.RelativePath) + // get httpPath if not set + if httpPath == "" { + httpPath = c.getHttpPath(rr.RelativePath) + } // Return a cURL command if curlChan is set if c.cURLch != nil { @@ -196,6 +199,9 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { for key, value := range rr.Values { command += fmt.Sprintf(" -d %s=%s", key, value[0]) } + if c.credentials != nil { + command += fmt.Sprintf(" -u %s", c.credentials.username) + } c.sendCURL(command) } @@ -225,7 +231,13 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { return nil, err } + if c.credentials != nil { + req.SetBasicAuth(c.credentials.username, c.credentials.password) + } + resp, err = c.httpClient.Do(req) + // clear previous httpPath + httpPath = "" defer func() { if resp != nil { resp.Body.Close() @@ -280,6 +292,19 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { } } + if resp.StatusCode == http.StatusTemporaryRedirect { + u, err := resp.Location() + + if err != nil { + logger.Warning(err) + } else { + // set httpPath for following redirection + httpPath = u.String() + } + resp.Body.Close() + continue + } + if checkErr := checkRetry(c.cluster, numReqs, *resp, errors.New("Unexpected HTTP status code")); checkErr != nil { return nil, checkErr @@ -302,21 +327,40 @@ func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response, err error) error { - if numReqs >= 2*len(cluster.Machines) { - return newError(ErrCodeEtcdNotReachable, - "Tried to connect to each peer twice and failed", 0) + if numReqs > 2*len(cluster.Machines) { + errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err) + return newError(ErrCodeEtcdNotReachable, errStr, 0) } - code := lastResp.StatusCode - if code == http.StatusInternalServerError { - time.Sleep(time.Millisecond * 200) - + if isEmptyResponse(lastResp) { + // always retry if it failed to get response from one machine + return nil } - - logger.Warning("bad response status code", code) + if !shouldRetry(lastResp) { + body := []byte("nil") + if lastResp.Body != nil { + if b, err := ioutil.ReadAll(lastResp.Body); err == nil { + body = b + } + } + errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body) + return newError(ErrCodeUnhandledHTTPStatus, errStr, 0) + } + // sleep some time and expect leader election finish + time.Sleep(time.Millisecond * 200) + logger.Warning("bad response status code", lastResp.StatusCode) return nil } +func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 } + +// shouldRetry returns whether the reponse deserves retry. +func shouldRetry(r http.Response) bool { + // TODO: only retry when the cluster is in leader election + // We cannot do it exactly because etcd doesn't support it well. + return r.StatusCode == http.StatusInternalServerError +} + func (c *Client) getHttpPath(s ...string) string { fullPath := c.cluster.pick() + "/" + version for _, seg := range s { diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go index b3d05df70b..b1e9ed2713 100644 --- a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go @@ -1,3 +1,6 @@ package etcd -const version = "v2" +const ( + version = "v2" + packageVersion = "v2.0.0+git" +) diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 3bffb105c9..ca328b95df 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -47,13 +47,7 @@ func (s *Discovery) Initialize(uris string, heartbeat time.Duration) error { // Creates a new store, will ignore options given // if not supported by the chosen store - s.store, err = store.CreateStore( - s.backend, - addrs, - &store.Config{ - Timeout: s.heartbeat, - }, - ) + s.store, err = store.CreateStore(s.backend, addrs, nil) return err } @@ -121,5 +115,14 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // Register is exported func (s *Discovery) Register(addr string) error { - return s.store.Put(path.Join(s.prefix, addr), []byte(addr)) + opts := &store.WriteOptions{Ephemeral: true} + err := s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts) + return err +} + +func convertToStringArray(entries []*store.KVPair) (addrs []string) { + for _, entry := range entries { + addrs = append(addrs, string(entry.Value)) + } + return addrs } diff --git a/pkg/store/consul.go b/pkg/store/consul.go index f2696453b1..b97b04fcd6 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -19,8 +19,9 @@ const ( // Consul embeds the client and watches type Consul struct { - config *api.Config - client *api.Client + config *api.Config + client *api.Client + ephemeralSession string } type consulLock struct { @@ -39,12 +40,14 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) { config.Address = endpoints[0] config.Scheme = "http" - if options.TLS != nil { - s.setTLS(options.TLS) - } - - if options.Timeout != 0 { - s.setTimeout(options.Timeout) + // Set options + if options != nil { + if options.TLS != nil { + s.setTLS(options.TLS) + } + if options.ConnectionTimeout != 0 { + s.setTimeout(options.ConnectionTimeout) + } } // Creates a new client @@ -55,6 +58,17 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) { } s.client = client + // Create global ephemeral keys session + entry := &api.SessionEntry{ + Behavior: api.SessionBehaviorDelete, + TTL: time.Duration(EphemeralTTL * time.Second).String(), + } + session, _, err := s.client.Session().Create(entry, nil) + if err != nil { + return nil, err + } + s.ephemeralSession = session + return s, nil } @@ -91,11 +105,37 @@ func (s *Consul) Get(key string) (*KVPair, error) { } // Put a value at "key" -func (s *Consul) Put(key string, value []byte) error { - p := &api.KVPair{Key: s.normalize(key), Value: value} - if s.client == nil { - log.Error("Error initializing client") +func (s *Consul) Put(key string, value []byte, opts *WriteOptions) error { + p := &api.KVPair{ + Key: s.normalize(key), + Value: value, } + + if opts != nil && opts.Ephemeral { + p.Session = s.ephemeralSession + + // Create lock option with the + // EphemeralSession + lockOpts := &api.LockOptions{ + Key: key, + Session: s.ephemeralSession, + } + + // Lock and ignore if lock is held + // It's just a placeholder for the + // ephemeral behavior + lock, _ := s.client.LockOpts(lockOpts) + if lock != nil { + lock.Lock(nil) + } + + // Try to renew the session + _, _, err := s.client.Session().Renew(p.Session, nil) + if err != nil { + return err + } + } + _, err := s.client.KV().Put(p, nil) return err } @@ -258,7 +298,7 @@ func (l *consulLock) Unlock() error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { +func (s *Consul) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) { p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex} if work, _, err := s.client.KV().CAS(p, nil); err != nil { return false, err diff --git a/pkg/store/etcd.go b/pkg/store/etcd.go index 4e09e69512..50f6522926 100644 --- a/pkg/store/etcd.go +++ b/pkg/store/etcd.go @@ -23,12 +23,14 @@ func InitializeEtcd(addrs []string, options *Config) (Store, error) { entries := createEndpoints(addrs, "http") s.client = etcd.NewClient(entries) - if options.TLS != nil { - s.setTLS(options.TLS) - } - - if options.Timeout != 0 { - s.setTimeout(options.Timeout) + // Set options + if options != nil { + if options.TLS != nil { + s.setTLS(options.TLS) + } + if options.ConnectionTimeout != 0 { + s.setTimeout(options.ConnectionTimeout) + } } // FIXME sync on each operation? @@ -66,7 +68,6 @@ func (s *Etcd) setTimeout(time time.Duration) { // Create the entire path for a directory that does not exist func (s *Etcd) createDirectory(path string) error { - // TODO Handle TTL at key/dir creation -> use K/V struct for key infos? if _, err := s.client.CreateDir(normalize(path), 10); err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError.ErrorCode != 105 { // Skip key already exists @@ -96,19 +97,27 @@ func (s *Etcd) Get(key string) (*KVPair, error) { } // Put a value at "key" -func (s *Etcd) Put(key string, value []byte) error { - if _, err := s.client.Set(key, string(value), 0); err != nil { +func (s *Etcd) Put(key string, value []byte, opts *WriteOptions) error { + + // Default TTL = 0 means no expiration + ttl := DefaultTTL + if opts != nil && opts.Ephemeral { + ttl = EphemeralTTL + } + + if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { if etcdError.ErrorCode == 104 { // Not a directory // Remove the last element (the actual key) and set the prefix as a dir err = s.createDirectory(getDirectory(key)) - if _, err := s.client.Set(key, string(value), 0); err != nil { + if _, err := s.client.Set(key, string(value), uint64(ttl)); err != nil { return err } } } return err } + return nil } @@ -223,7 +232,7 @@ func (s *Etcd) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*KVPai // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { +func (s *Etcd) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) { _, err := s.client.CompareAndSwap(normalize(key), string(value), 0, "", previous.LastIndex) if err != nil { return false, err diff --git a/pkg/store/mock.go b/pkg/store/mock.go index 3b70955628..71a55ba2e7 100644 --- a/pkg/store/mock.go +++ b/pkg/store/mock.go @@ -21,8 +21,8 @@ func InitializeMock(endpoints []string, options *Config) (Store, error) { } // Put mock -func (s *Mock) Put(key string, value []byte) error { - args := s.Mock.Called(key, value) +func (s *Mock) Put(key string, value []byte, opts *WriteOptions) error { + args := s.Mock.Called(key, value, opts) return args.Error(0) } @@ -75,8 +75,8 @@ func (s *Mock) DeleteTree(prefix string) error { } // AtomicPut mock -func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { - args := s.Mock.Called(key, value, previous) +func (s *Mock) AtomicPut(key string, value []byte, previous *KVPair, opts *WriteOptions) (bool, error) { + args := s.Mock.Called(key, value, previous, opts) return args.Bool(0), args.Error(1) } diff --git a/pkg/store/store.go b/pkg/store/store.go index d10ecab503..dfc352d535 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -41,8 +41,8 @@ var ( // Config contains the options for a storage client type Config struct { - TLS *tls.Config - Timeout time.Duration + TLS *tls.Config + ConnectionTimeout time.Duration } // Store represents the backend K/V storage @@ -51,7 +51,7 @@ type Config struct { // backend for libkv type Store interface { // Put a value at the specified key - Put(key string, value []byte) error + Put(key string, value []byte, options *WriteOptions) error // Get a value given its key Get(key string) (*KVPair, error) @@ -86,12 +86,24 @@ type Store interface { DeleteTree(prefix string) error // Atomic operation on a single value - AtomicPut(key string, value []byte, previous *KVPair) (bool, error) + AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) // Atomic delete of a single value AtomicDelete(key string, previous *KVPair) (bool, error) } +const ( + // DefaultTTL is the default time used for a node + // to be removed, it is set to 0 to explain + // that there is no expiration + DefaultTTL = 0 + + // EphemeralTTL is used for the ephemeral node + // behavior. If the node session is not renewed + // before the ttl expires, the node is removed + EphemeralTTL = 60 +) + // KVPair represents {Key, Value, Lastindex} tuple type KVPair struct { Key string @@ -99,6 +111,11 @@ type KVPair struct { LastIndex uint64 } +// WriteOptions contains optional request parameters +type WriteOptions struct { + Ephemeral bool +} + // WatchCallback is used for watch methods on keys // and is triggered on key change type WatchCallback func(entries ...*KVPair) diff --git a/pkg/store/zookeeper.go b/pkg/store/zookeeper.go index b55e7f42f7..7ae92eb449 100644 --- a/pkg/store/zookeeper.go +++ b/pkg/store/zookeeper.go @@ -24,8 +24,11 @@ func InitializeZookeeper(endpoints []string, options *Config) (Store, error) { s := &Zookeeper{} s.timeout = 5 * time.Second // default timeout - if options.Timeout != 0 { - s.setTimeout(options.Timeout) + // Set options + if options != nil { + if options.ConnectionTimeout != 0 { + s.setTimeout(options.ConnectionTimeout) + } } conn, _, err := zk.Connect(endpoints, s.timeout) @@ -56,9 +59,13 @@ func (s *Zookeeper) Get(key string) (*KVPair, error) { } // Create the entire path for a directory that does not exist -func (s *Zookeeper) createFullpath(path []string) error { +func (s *Zookeeper) createFullpath(path []string, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") + if i == len(path) && ephemeral { + _, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + return err + } _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists @@ -71,14 +78,18 @@ func (s *Zookeeper) createFullpath(path []string) error { } // Put a value at "key" -func (s *Zookeeper) Put(key string, value []byte) error { +func (s *Zookeeper) Put(key string, value []byte, opts *WriteOptions) error { fkey := normalize(key) exists, err := s.Exists(key) if err != nil { return err } if !exists { - s.createFullpath(splitKey(key)) + if opts != nil && opts.Ephemeral { + s.createFullpath(splitKey(key), opts.Ephemeral) + } else { + s.createFullpath(splitKey(key), false) + } } _, err = s.client.Set(fkey, value, -1) return err @@ -198,7 +209,7 @@ func (s *Zookeeper) DeleteTree(prefix string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case -func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair) (bool, error) { +func (s *Zookeeper) AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, error) { // Use index of Set method to implement CAS return false, ErrNotImplemented }