update godeps to include libkv

Signed-off-by: Alexandre Beslic <abronan@docker.com>
This commit is contained in:
Alexandre Beslic 2015-06-15 15:28:39 -07:00
parent 7d2e4c5787
commit c70f6f8afe
35 changed files with 2624 additions and 1237 deletions

14
Godeps/Godeps.json generated
View File

@ -20,16 +20,6 @@
"Comment": "1.2.0-62-gbf4a526",
"Rev": "bf4a526f48af7badd25d2cb02d587e1b01be3b50"
},
{
"ImportPath": "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes",
"Comment": "v2.0.0-rc.1-288-g4c33d12",
"Rev": "4c33d12bf84fac9fd1b1a88cb8e95f6d4fab802a"
},
{
"ImportPath": "github.com/coreos/etcd/pkg/types",
"Comment": "v2.0.0-rc.1-288-g4c33d12",
"Rev": "4c33d12bf84fac9fd1b1a88cb8e95f6d4fab802a"
},
{
"ImportPath": "github.com/coreos/go-etcd/etcd",
"Comment": "v2.0.0-11-gcc90c7b",
@ -60,6 +50,10 @@
"Comment": "v1.4.1-3245-g443437f",
"Rev": "443437f5ea04da9d62bf3e05d7951f7d30e77d96"
},
{
"ImportPath": "github.com/docker/libkv",
"Rev": "e201296a473f7e796488b8bf82629f28ca13634b"
},
{
"ImportPath": "github.com/gogo/protobuf/proto",
"Rev": "bc946d07d1016848dfd2507f90f0859c9471681e"

View File

@ -1,19 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package httptypes defines how etcd's HTTP API entities are serialized to and deserialized from JSON.
*/
package httptypes

View File

@ -1,49 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"log"
"net/http"
)
type HTTPError struct {
Message string `json:"message"`
// HTTP return code
Code int `json:"-"`
}
func (e HTTPError) Error() string {
return e.Message
}
// TODO(xiangli): handle http write errors
func (e HTTPError) WriteTo(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(e.Code)
b, err := json.Marshal(e)
if err != nil {
log.Panicf("marshal HTTPError should never fail: %v", err)
}
w.Write(b)
}
func NewHTTPError(code int, m string) *HTTPError {
return &HTTPError{
Message: m,
Code: code,
}
}

View File

@ -1,47 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"net/http"
"net/http/httptest"
"reflect"
"testing"
)
func TestHTTPErrorWriteTo(t *testing.T) {
err := NewHTTPError(http.StatusBadRequest, "what a bad request you made!")
rr := httptest.NewRecorder()
err.WriteTo(rr)
wcode := http.StatusBadRequest
wheader := http.Header(map[string][]string{
"Content-Type": []string{"application/json"},
})
wbody := `{"message":"what a bad request you made!"}`
if wcode != rr.Code {
t.Errorf("HTTP status code %d, want %d", rr.Code, wcode)
}
if !reflect.DeepEqual(wheader, rr.HeaderMap) {
t.Errorf("HTTP headers %v, want %v", rr.HeaderMap, wheader)
}
gbody := rr.Body.String()
if wbody != gbody {
t.Errorf("HTTP body %q, want %q", gbody, wbody)
}
}

View File

@ -1,99 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"github.com/coreos/etcd/pkg/types"
)
type Member struct {
ID string `json:"id"`
Name string `json:"name"`
PeerURLs []string `json:"peerURLs"`
ClientURLs []string `json:"clientURLs"`
}
type MemberCreateRequest struct {
PeerURLs types.URLs
}
type MemberUpdateRequest struct {
MemberCreateRequest
}
func (m *MemberCreateRequest) MarshalJSON() ([]byte, error) {
s := struct {
PeerURLs []string `json:"peerURLs"`
}{
PeerURLs: make([]string, len(m.PeerURLs)),
}
for i, u := range m.PeerURLs {
s.PeerURLs[i] = u.String()
}
return json.Marshal(&s)
}
func (m *MemberCreateRequest) UnmarshalJSON(data []byte) error {
s := struct {
PeerURLs []string `json:"peerURLs"`
}{}
err := json.Unmarshal(data, &s)
if err != nil {
return err
}
urls, err := types.NewURLs(s.PeerURLs)
if err != nil {
return err
}
m.PeerURLs = urls
return nil
}
type MemberCollection []Member
func (c *MemberCollection) MarshalJSON() ([]byte, error) {
d := struct {
Members []Member `json:"members"`
}{
Members: []Member(*c),
}
return json.Marshal(d)
}
func (c *MemberCollection) UnmarshalJSON(data []byte) error {
d := struct {
Members []Member
}{}
if err := json.Unmarshal(data, &d); err != nil {
return err
}
if d.Members == nil {
*c = make([]Member, 0)
return nil
}
*c = d.Members
return nil
}

View File

@ -1,218 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httptypes
import (
"encoding/json"
"net/url"
"reflect"
"testing"
"github.com/coreos/etcd/pkg/types"
)
func TestMemberUnmarshal(t *testing.T) {
tests := []struct {
body []byte
wantMember Member
wantError bool
}{
// no URLs, just check ID & Name
{
body: []byte(`{"id": "c", "name": "dungarees"}`),
wantMember: Member{ID: "c", Name: "dungarees", PeerURLs: nil, ClientURLs: nil},
},
// both client and peer URLs
{
body: []byte(`{"peerURLs": ["http://127.0.0.1:4001"], "clientURLs": ["http://127.0.0.1:4001"]}`),
wantMember: Member{
PeerURLs: []string{
"http://127.0.0.1:4001",
},
ClientURLs: []string{
"http://127.0.0.1:4001",
},
},
},
// multiple peer URLs
{
body: []byte(`{"peerURLs": ["http://127.0.0.1:4001", "https://example.com"]}`),
wantMember: Member{
PeerURLs: []string{
"http://127.0.0.1:4001",
"https://example.com",
},
ClientURLs: nil,
},
},
// multiple client URLs
{
body: []byte(`{"clientURLs": ["http://127.0.0.1:4001", "https://example.com"]}`),
wantMember: Member{
PeerURLs: nil,
ClientURLs: []string{
"http://127.0.0.1:4001",
"https://example.com",
},
},
},
// invalid JSON
{
body: []byte(`{"peerU`),
wantError: true,
},
}
for i, tt := range tests {
got := Member{}
err := json.Unmarshal(tt.body, &got)
if tt.wantError != (err != nil) {
t.Errorf("#%d: want error %t, got %v", i, tt.wantError, err)
continue
}
if !reflect.DeepEqual(tt.wantMember, got) {
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.wantMember, got)
}
}
}
func TestMemberCollectionUnmarshal(t *testing.T) {
tests := []struct {
body []byte
want MemberCollection
}{
{
body: []byte(`{"members":[]}`),
want: MemberCollection([]Member{}),
},
{
body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
want: MemberCollection(
[]Member{
{
ID: "2745e2525fce8fe",
Name: "node3",
PeerURLs: []string{
"http://127.0.0.1:7003",
},
ClientURLs: []string{
"http://127.0.0.1:4003",
},
},
{
ID: "42134f434382925",
Name: "node1",
PeerURLs: []string{
"http://127.0.0.1:2380",
"http://127.0.0.1:7001",
},
ClientURLs: []string{
"http://127.0.0.1:2379",
"http://127.0.0.1:4001",
},
},
{
ID: "94088180e21eb87b",
Name: "node2",
PeerURLs: []string{
"http://127.0.0.1:7002",
},
ClientURLs: []string{
"http://127.0.0.1:4002",
},
},
},
),
},
}
for i, tt := range tests {
var got MemberCollection
err := json.Unmarshal(tt.body, &got)
if err != nil {
t.Errorf("#%d: unexpected error: %v", i, err)
continue
}
if !reflect.DeepEqual(tt.want, got) {
t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got)
}
}
}
func TestMemberCreateRequestUnmarshal(t *testing.T) {
body := []byte(`{"peerURLs": ["http://127.0.0.1:8081", "https://127.0.0.1:8080"]}`)
want := MemberCreateRequest{
PeerURLs: types.URLs([]url.URL{
url.URL{Scheme: "http", Host: "127.0.0.1:8081"},
url.URL{Scheme: "https", Host: "127.0.0.1:8080"},
}),
}
var req MemberCreateRequest
if err := json.Unmarshal(body, &req); err != nil {
t.Fatalf("Unmarshal returned unexpected err=%v", err)
}
if !reflect.DeepEqual(want, req) {
t.Fatalf("Failed to unmarshal MemberCreateRequest: want=%#v, got=%#v", want, req)
}
}
func TestMemberCreateRequestUnmarshalFail(t *testing.T) {
tests := [][]byte{
// invalid JSON
[]byte(``),
[]byte(`{`),
// spot-check validation done in types.NewURLs
[]byte(`{"peerURLs": "foo"}`),
[]byte(`{"peerURLs": ["."]}`),
[]byte(`{"peerURLs": []}`),
[]byte(`{"peerURLs": ["http://127.0.0.1:4001/foo"]}`),
[]byte(`{"peerURLs": ["http://127.0.0.1"]}`),
}
for i, tt := range tests {
var req MemberCreateRequest
if err := json.Unmarshal(tt, &req); err == nil {
t.Errorf("#%d: expected err, got nil", i)
}
}
}
func TestMemberCreateRequestMarshal(t *testing.T) {
req := MemberCreateRequest{
PeerURLs: types.URLs([]url.URL{
url.URL{Scheme: "http", Host: "127.0.0.1:8081"},
url.URL{Scheme: "https", Host: "127.0.0.1:8080"},
}),
}
want := []byte(`{"peerURLs":["http://127.0.0.1:8081","https://127.0.0.1:8080"]}`)
got, err := json.Marshal(&req)
if err != nil {
t.Fatalf("Marshal returned unexpected err=%v", err)
}
if !reflect.DeepEqual(want, got) {
t.Fatalf("Failed to marshal MemberCreateRequest: want=%s, got=%s", want, got)
}
}

View File

@ -1,41 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"strconv"
)
// ID represents a generic identifier which is canonically
// stored as a uint64 but is typically represented as a
// base-16 string for input/output
type ID uint64
func (i ID) String() string {
return strconv.FormatUint(uint64(i), 16)
}
// IDFromString attempts to create an ID from a base-16 string.
func IDFromString(s string) (ID, error) {
i, err := strconv.ParseUint(s, 16, 64)
return ID(i), err
}
// IDSlice implements the sort interface
type IDSlice []ID
func (p IDSlice) Len() int { return len(p) }
func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
func (p IDSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -1,95 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"sort"
"testing"
)
func TestIDString(t *testing.T) {
tests := []struct {
input ID
want string
}{
{
input: 12,
want: "c",
},
{
input: 4918257920282737594,
want: "444129853c343bba",
},
}
for i, tt := range tests {
got := tt.input.String()
if tt.want != got {
t.Errorf("#%d: ID.String failure: want=%v, got=%v", i, tt.want, got)
}
}
}
func TestIDFromString(t *testing.T) {
tests := []struct {
input string
want ID
}{
{
input: "17",
want: 23,
},
{
input: "612840dae127353",
want: 437557308098245459,
},
}
for i, tt := range tests {
got, err := IDFromString(tt.input)
if err != nil {
t.Errorf("#%d: IDFromString failure: err=%v", i, err)
continue
}
if tt.want != got {
t.Errorf("#%d: IDFromString failure: want=%v, got=%v", i, tt.want, got)
}
}
}
func TestIDFromStringFail(t *testing.T) {
tests := []string{
"",
"XXX",
"612840dae127353612840dae127353",
}
for i, tt := range tests {
_, err := IDFromString(tt)
if err == nil {
t.Fatalf("#%d: IDFromString expected error, but err=nil", i)
}
}
}
func TestIDSlice(t *testing.T) {
g := []ID{10, 500, 5, 1, 100, 25}
w := []ID{1, 5, 10, 25, 100, 500}
sort.Sort(IDSlice(g))
if !reflect.DeepEqual(g, w) {
t.Errorf("slice after sort = %#v, want %#v", g, w)
}
}

View File

@ -1,178 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"sort"
"sync"
)
type Set interface {
Add(string)
Remove(string)
Contains(string) bool
Equals(Set) bool
Length() int
Values() []string
Copy() Set
Sub(Set) Set
}
func NewUnsafeSet(values ...string) *unsafeSet {
set := &unsafeSet{make(map[string]struct{})}
for _, v := range values {
set.Add(v)
}
return set
}
func NewThreadsafeSet(values ...string) *tsafeSet {
us := NewUnsafeSet(values...)
return &tsafeSet{us, sync.RWMutex{}}
}
type unsafeSet struct {
d map[string]struct{}
}
// Add adds a new value to the set (no-op if the value is already present)
func (us *unsafeSet) Add(value string) {
us.d[value] = struct{}{}
}
// Remove removes the given value from the set
func (us *unsafeSet) Remove(value string) {
delete(us.d, value)
}
// Contains returns whether the set contains the given value
func (us *unsafeSet) Contains(value string) (exists bool) {
_, exists = us.d[value]
return
}
// ContainsAll returns whether the set contains all given values
func (us *unsafeSet) ContainsAll(values []string) bool {
for _, s := range values {
if !us.Contains(s) {
return false
}
}
return true
}
// Equals returns whether the contents of two sets are identical
func (us *unsafeSet) Equals(other Set) bool {
v1 := sort.StringSlice(us.Values())
v2 := sort.StringSlice(other.Values())
v1.Sort()
v2.Sort()
return reflect.DeepEqual(v1, v2)
}
// Length returns the number of elements in the set
func (us *unsafeSet) Length() int {
return len(us.d)
}
// Values returns the values of the Set in an unspecified order.
func (us *unsafeSet) Values() (values []string) {
values = make([]string, 0)
for val, _ := range us.d {
values = append(values, val)
}
return
}
// Copy creates a new Set containing the values of the first
func (us *unsafeSet) Copy() Set {
cp := NewUnsafeSet()
for val, _ := range us.d {
cp.Add(val)
}
return cp
}
// Sub removes all elements in other from the set
func (us *unsafeSet) Sub(other Set) Set {
oValues := other.Values()
result := us.Copy().(*unsafeSet)
for _, val := range oValues {
if _, ok := result.d[val]; !ok {
continue
}
delete(result.d, val)
}
return result
}
type tsafeSet struct {
us *unsafeSet
m sync.RWMutex
}
func (ts *tsafeSet) Add(value string) {
ts.m.Lock()
defer ts.m.Unlock()
ts.us.Add(value)
}
func (ts *tsafeSet) Remove(value string) {
ts.m.Lock()
defer ts.m.Unlock()
ts.us.Remove(value)
}
func (ts *tsafeSet) Contains(value string) (exists bool) {
ts.m.RLock()
defer ts.m.RUnlock()
return ts.us.Contains(value)
}
func (ts *tsafeSet) Equals(other Set) bool {
ts.m.RLock()
defer ts.m.RUnlock()
return ts.us.Equals(other)
}
func (ts *tsafeSet) Length() int {
ts.m.RLock()
defer ts.m.RUnlock()
return ts.us.Length()
}
func (ts *tsafeSet) Values() (values []string) {
ts.m.RLock()
defer ts.m.RUnlock()
return ts.us.Values()
}
func (ts *tsafeSet) Copy() Set {
ts.m.RLock()
defer ts.m.RUnlock()
usResult := ts.us.Copy().(*unsafeSet)
return &tsafeSet{usResult, sync.RWMutex{}}
}
func (ts *tsafeSet) Sub(other Set) Set {
ts.m.RLock()
defer ts.m.RUnlock()
usResult := ts.us.Sub(other).(*unsafeSet)
return &tsafeSet{usResult, sync.RWMutex{}}
}

View File

@ -1,186 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"sort"
"testing"
)
func TestUnsafeSet(t *testing.T) {
driveSetTests(t, NewUnsafeSet())
}
func TestThreadsafeSet(t *testing.T) {
driveSetTests(t, NewThreadsafeSet())
}
// Check that two slices contents are equal; order is irrelevant
func equal(a, b []string) bool {
as := sort.StringSlice(a)
bs := sort.StringSlice(b)
as.Sort()
bs.Sort()
return reflect.DeepEqual(as, bs)
}
func driveSetTests(t *testing.T, s Set) {
// Verify operations on an empty set
eValues := []string{}
values := s.Values()
if !reflect.DeepEqual(values, eValues) {
t.Fatalf("Expect values=%v got %v", eValues, values)
}
if l := s.Length(); l != 0 {
t.Fatalf("Expected length=0, got %d", l)
}
for _, v := range []string{"foo", "bar", "baz"} {
if s.Contains(v) {
t.Fatalf("Expect s.Contains(%q) to be fale, got true", v)
}
}
// Add three items, ensure they show up
s.Add("foo")
s.Add("bar")
s.Add("baz")
eValues = []string{"foo", "bar", "baz"}
values = s.Values()
if !equal(values, eValues) {
t.Fatalf("Expect values=%v got %v", eValues, values)
}
for _, v := range eValues {
if !s.Contains(v) {
t.Fatalf("Expect s.Contains(%q) to be true, got false", v)
}
}
if l := s.Length(); l != 3 {
t.Fatalf("Expected length=3, got %d", l)
}
// Add the same item a second time, ensuring it is not duplicated
s.Add("foo")
values = s.Values()
if !equal(values, eValues) {
t.Fatalf("Expect values=%v got %v", eValues, values)
}
if l := s.Length(); l != 3 {
t.Fatalf("Expected length=3, got %d", l)
}
// Remove all items, ensure they are gone
s.Remove("foo")
s.Remove("bar")
s.Remove("baz")
eValues = []string{}
values = s.Values()
if !equal(values, eValues) {
t.Fatalf("Expect values=%v got %v", eValues, values)
}
if l := s.Length(); l != 0 {
t.Fatalf("Expected length=0, got %d", l)
}
// Create new copies of the set, and ensure they are unlinked to the
// original Set by making modifications
s.Add("foo")
s.Add("bar")
cp1 := s.Copy()
cp2 := s.Copy()
s.Remove("foo")
cp3 := s.Copy()
cp1.Add("baz")
for i, tt := range []struct {
want []string
got []string
}{
{[]string{"bar"}, s.Values()},
{[]string{"foo", "bar", "baz"}, cp1.Values()},
{[]string{"foo", "bar"}, cp2.Values()},
{[]string{"bar"}, cp3.Values()},
} {
if !equal(tt.want, tt.got) {
t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
}
}
for i, tt := range []struct {
want bool
got bool
}{
{true, s.Equals(cp3)},
{true, cp3.Equals(s)},
{false, s.Equals(cp2)},
{false, s.Equals(cp1)},
{false, cp1.Equals(s)},
{false, cp2.Equals(s)},
{false, cp2.Equals(cp1)},
} {
if tt.got != tt.want {
t.Fatalf("case %d: want %t, got %t", i, tt.want, tt.got)
}
}
// Subtract values from a Set, ensuring a new Set is created and
// the original Sets are unmodified
sub1 := cp1.Sub(s)
sub2 := cp2.Sub(cp1)
for i, tt := range []struct {
want []string
got []string
}{
{[]string{"foo", "bar", "baz"}, cp1.Values()},
{[]string{"foo", "bar"}, cp2.Values()},
{[]string{"bar"}, s.Values()},
{[]string{"foo", "baz"}, sub1.Values()},
{[]string{}, sub2.Values()},
} {
if !equal(tt.want, tt.got) {
t.Fatalf("case %d: expect values=%v got %v", i, tt.want, tt.got)
}
}
}
func TestUnsafeSetContainsAll(t *testing.T) {
vals := []string{"foo", "bar", "baz"}
s := NewUnsafeSet(vals...)
tests := []struct {
strs []string
wcontain bool
}{
{[]string{}, true},
{vals[:1], true},
{vals[:2], true},
{vals, true},
{[]string{"cuz"}, false},
{[]string{vals[0], "cuz"}, false},
}
for i, tt := range tests {
if g := s.ContainsAll(tt.strs); g != tt.wcontain {
t.Errorf("#%d: ok = %v, want %v", i, g, tt.wcontain)
}
}
}

View File

@ -1,22 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
// Uint64Slice implements sort interface
type Uint64Slice []uint64
func (p Uint64Slice) Len() int { return len(p) }
func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p Uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -1,30 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"sort"
"testing"
)
func TestUint64Slice(t *testing.T) {
g := Uint64Slice{10, 500, 5, 1, 100, 25}
w := Uint64Slice{1, 5, 10, 25, 100, 500}
sort.Sort(g)
if !reflect.DeepEqual(g, w) {
t.Errorf("slice after sort = %#v, want %#v", g, w)
}
}

View File

@ -1,74 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"errors"
"fmt"
"net"
"net/url"
"sort"
"strings"
)
type URLs []url.URL
func NewURLs(strs []string) (URLs, error) {
all := make([]url.URL, len(strs))
if len(all) == 0 {
return nil, errors.New("no valid URLs given")
}
for i, in := range strs {
in = strings.TrimSpace(in)
u, err := url.Parse(in)
if err != nil {
return nil, err
}
if u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("URL scheme must be http or https: %s", in)
}
if _, _, err := net.SplitHostPort(u.Host); err != nil {
return nil, fmt.Errorf(`URL address does not have the form "host:port": %s`, in)
}
if u.Path != "" {
return nil, fmt.Errorf("URL must not contain a path: %s", in)
}
all[i] = *u
}
us := URLs(all)
us.Sort()
return us, nil
}
func (us URLs) String() string {
return strings.Join(us.StringSlice(), ",")
}
func (us *URLs) Sort() {
sort.Sort(us)
}
func (us URLs) Len() int { return len(us) }
func (us URLs) Less(i, j int) bool { return us[i].String() < us[j].String() }
func (us URLs) Swap(i, j int) { us[i], us[j] = us[j], us[i] }
func (us URLs) StringSlice() []string {
out := make([]string, len(us))
for i := range us {
out[i] = us[i].String()
}
return out
}

View File

@ -1,169 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
import (
"reflect"
"testing"
"github.com/coreos/etcd/pkg/testutil"
)
func TestNewURLs(t *testing.T) {
tests := []struct {
strs []string
wurls URLs
}{
{
[]string{"http://127.0.0.1:4001"},
testutil.MustNewURLs(t, []string{"http://127.0.0.1:4001"}),
},
// it can trim space
{
[]string{" http://127.0.0.1:4001 "},
testutil.MustNewURLs(t, []string{"http://127.0.0.1:4001"}),
},
// it does sort
{
[]string{
"http://127.0.0.2:4001",
"http://127.0.0.1:4001",
},
testutil.MustNewURLs(t, []string{
"http://127.0.0.1:4001",
"http://127.0.0.2:4001",
}),
},
}
for i, tt := range tests {
urls, _ := NewURLs(tt.strs)
if !reflect.DeepEqual(urls, tt.wurls) {
t.Errorf("#%d: urls = %+v, want %+v", i, urls, tt.wurls)
}
}
}
func TestURLsString(t *testing.T) {
tests := []struct {
us URLs
wstr string
}{
{
URLs{},
"",
},
{
testutil.MustNewURLs(t, []string{"http://127.0.0.1:4001"}),
"http://127.0.0.1:4001",
},
{
testutil.MustNewURLs(t, []string{
"http://127.0.0.1:4001",
"http://127.0.0.2:4001",
}),
"http://127.0.0.1:4001,http://127.0.0.2:4001",
},
{
testutil.MustNewURLs(t, []string{
"http://127.0.0.2:4001",
"http://127.0.0.1:4001",
}),
"http://127.0.0.2:4001,http://127.0.0.1:4001",
},
}
for i, tt := range tests {
g := tt.us.String()
if g != tt.wstr {
t.Errorf("#%d: string = %s, want %s", i, g, tt.wstr)
}
}
}
func TestURLsSort(t *testing.T) {
g := testutil.MustNewURLs(t, []string{
"http://127.0.0.4:4001",
"http://127.0.0.2:4001",
"http://127.0.0.1:4001",
"http://127.0.0.3:4001",
})
w := testutil.MustNewURLs(t, []string{
"http://127.0.0.1:4001",
"http://127.0.0.2:4001",
"http://127.0.0.3:4001",
"http://127.0.0.4:4001",
})
gurls := URLs(g)
gurls.Sort()
if !reflect.DeepEqual(g, w) {
t.Errorf("URLs after sort = %#v, want %#v", g, w)
}
}
func TestURLsStringSlice(t *testing.T) {
tests := []struct {
us URLs
wstr []string
}{
{
URLs{},
[]string{},
},
{
testutil.MustNewURLs(t, []string{"http://127.0.0.1:4001"}),
[]string{"http://127.0.0.1:4001"},
},
{
testutil.MustNewURLs(t, []string{
"http://127.0.0.1:4001",
"http://127.0.0.2:4001",
}),
[]string{"http://127.0.0.1:4001", "http://127.0.0.2:4001"},
},
{
testutil.MustNewURLs(t, []string{
"http://127.0.0.2:4001",
"http://127.0.0.1:4001",
}),
[]string{"http://127.0.0.2:4001", "http://127.0.0.1:4001"},
},
}
for i, tt := range tests {
g := tt.us.StringSlice()
if !reflect.DeepEqual(g, tt.wstr) {
t.Errorf("#%d: string slice = %+v, want %+v", i, g, tt.wstr)
}
}
}
func TestNewURLsFail(t *testing.T) {
tests := [][]string{
// no urls given
{},
// missing protocol scheme
{"://127.0.0.1:4001"},
// unsupported scheme
{"mailto://127.0.0.1:4001"},
// not conform to host:port
{"http://127.0.0.1"},
// contain a path
{"http://127.0.0.1:4001/path"},
}
for i, tt := range tests {
_, err := NewURLs(tt)
if err == nil {
t.Errorf("#%d: err = nil, but error", i)
}
}
}

View File

@ -0,0 +1,34 @@
language: go
go:
- 1.3
# - 1.4
# see https://github.com/moovweb/gvm/pull/116 for why Go 1.4 is currently disabled
# let us have speedy Docker-based Travis workers
sudo: false
before_install:
# Symlink below is needed for Travis CI to work correctly on personal forks of libkv
- ln -s $HOME/gopath/src/github.com/${TRAVIS_REPO_SLUG///libkv/} $HOME/gopath/src/github.com/docker
- go get golang.org/x/tools/cmd/vet
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
- go get github.com/golang/lint/golint
- go get github.com/GeertJohan/fgt
before_script:
- script/travis_consul.sh 0.5.2
- script/travis_etcd.sh 2.0.11
- script/travis_zk.sh 3.4.6
script:
- ./consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul -config-file=./config.json 1>/dev/null &
- ./etcd/etcd --listen-client-urls 'http://0.0.0.0:4001' --advertise-client-urls 'http://127.0.0.1:4001' >/dev/null 2>&1 &
- ./zk/bin/zkServer.sh start ./zk/conf/zoo.cfg 1> /dev/null
- script/validate-gofmt
- go vet ./...
- fgt golint ./...
- go test -v -race ./...
- script/coverage
- goveralls -service=travis-ci -coverprofile=goverage.report

191
Godeps/_workspace/src/github.com/docker/libkv/LICENSE generated vendored Normal file
View File

@ -0,0 +1,191 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2014-2015 Docker, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

108
Godeps/_workspace/src/github.com/docker/libkv/README.md generated vendored Normal file
View File

@ -0,0 +1,108 @@
# libkv
[![GoDoc](https://godoc.org/github.com/docker/libkv?status.png)](https://godoc.org/github.com/docker/libkv)
[![Build Status](https://travis-ci.org/docker/libkv.svg?branch=master)](https://travis-ci.org/docker/libkv)
[![Coverage Status](https://coveralls.io/repos/docker/libkv/badge.svg)](https://coveralls.io/r/docker/libkv)
`libkv` provides a `Go` native library to store metadata.
The goal of `libkv` is to abstract common store operations for multiple Key/Value backends and offer the same experience no matter which one of the backend you want to use.
For example, you can use it to store your metadata or for service discovery to register machines and endpoints inside your cluster.
You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package).
As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`.
## Example of usage
### Create a new store and use Put/Get
```go
package main
import (
"fmt"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
log "github.com/Sirupsen/logrus"
)
func main() {
client := "localhost:8500"
// Initialize a new store with consul
kv, err = libkv.NewStore(
store.CONSUL, // or "consul"
[]string{client},
&store.Config{
ConnectionTimeout: 10*time.Second,
},
)
if err != nil {
log.Fatal("Cannot create store consul")
}
key := "foo"
err = kv.Put(key, []byte("bar"), nil)
if err != nil {
log.Error("Error trying to put value at key `", key, "`")
}
pair, err := kv.Get(key)
if err != nil {
log.Error("Error trying accessing value at key `", key, "`")
}
log.Info("value: ", string(pair.Value))
}
```
You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories.
## Details
You should expect the same experience for basic operations like `Get`/`Put`, etc.
However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly).
## Create a new storage backend
A new **storage backend** should include those calls:
```go
type Store interface {
Put(key string, value []byte, options *WriteOptions) error
Get(key string) (*KVPair, error)
Delete(key string) error
Exists(key string) (bool, error)
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
NewLock(key string, options *LockOptions) (Locker, error)
List(directory string) ([]*KVPair, error)
DeleteTree(directory string) error
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
AtomicDelete(key string, previous *KVPair) (bool, error)
Close()
}
```
You can get inspiration from existing backends to create a new one. This interface could be subject to changes to improve the experience of using the library and contributing to a new backend.
##Roadmap
- Make the API nicer to use (using `options`)
- Provide more options (`consistency` for example)
- Improve performance (remove extras `Get`/`List` operations)
- Add more exhaustive tests
- New backends?
##Contributing
Want to hack on libkv? [Docker's contributions guidelines](https://github.com/docker/docker/blob/master/CONTRIBUTING.md) apply.
##Copyright and license
Code and documentation copyright 2015 Docker, inc. Code released under the Apache 2.0 license. Docs released under Creative commons.

29
Godeps/_workspace/src/github.com/docker/libkv/libkv.go generated vendored Normal file
View File

@ -0,0 +1,29 @@
package libkv
import (
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
)
// Initialize creates a new Store object, initializing the client
type Initialize func(addrs []string, options *store.Config) (store.Store, error)
var (
// Backend initializers
initializers = map[store.Backend]Initialize{
store.CONSUL: consul.New,
store.ETCD: etcd.New,
store.ZK: zookeeper.New,
}
)
// NewStore creates a an instance of store
func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) {
if init, exists := initializers[backend]; exists {
return init(addrs, options)
}
return nil, store.ErrNotSupported
}

View File

@ -0,0 +1,80 @@
package libkv
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/stretchr/testify/assert"
)
func TestNewStoreConsul(t *testing.T) {
client := "localhost:8500"
kv, err := NewStore(
store.CONSUL,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*consul.Consul); !ok {
t.Fatal("Error while initializing store consul")
}
}
func TestNewStoreEtcd(t *testing.T) {
client := "localhost:4001"
kv, err := NewStore(
store.ETCD,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*etcd.Etcd); !ok {
t.Fatal("Error while initializing store etcd")
}
}
func TestNewStoreZookeeper(t *testing.T) {
client := "localhost:2181"
kv, err := NewStore(
store.ZK,
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*zookeeper.Zookeeper); !ok {
t.Fatal("Error while initializing store zookeeper")
}
}
func TestNewStoreUnsupported(t *testing.T) {
client := "localhost:9999"
kv, err := NewStore(
"unsupported",
[]string{client},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
assert.Error(t, err)
assert.Nil(t, kv)
}

View File

@ -0,0 +1,33 @@
#!/bin/bash
if [ -z "$VALIDATE_UPSTREAM" ]; then
# this is kind of an expensive check, so let's not do this twice if we
# are running more than one validate bundlescript
VALIDATE_REPO='https://github.com/docker/libkv.git'
VALIDATE_BRANCH='master'
if [ "$TRAVIS" = 'true' -a "$TRAVIS_PULL_REQUEST" != 'false' ]; then
VALIDATE_REPO="https://github.com/${TRAVIS_REPO_SLUG}.git"
VALIDATE_BRANCH="${TRAVIS_BRANCH}"
fi
VALIDATE_HEAD="$(git rev-parse --verify HEAD)"
git fetch -q "$VALIDATE_REPO" "refs/heads/$VALIDATE_BRANCH"
VALIDATE_UPSTREAM="$(git rev-parse --verify FETCH_HEAD)"
VALIDATE_COMMIT_LOG="$VALIDATE_UPSTREAM..$VALIDATE_HEAD"
VALIDATE_COMMIT_DIFF="$VALIDATE_UPSTREAM...$VALIDATE_HEAD"
validate_diff() {
if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then
git diff "$VALIDATE_COMMIT_DIFF" "$@"
fi
}
validate_log() {
if [ "$VALIDATE_UPSTREAM" != "$VALIDATE_HEAD" ]; then
git log "$VALIDATE_COMMIT_LOG" "$@"
fi
}
fi

View File

@ -0,0 +1,21 @@
#!/bin/bash
MODE="mode: count"
ROOT=${TRAVIS_BUILD_DIR:-.}/../../..
# Grab the list of packages.
# Exclude the API and CLI from coverage as it will be covered by integration tests.
PACKAGES=`go list ./...`
# Create the empty coverage file.
echo $MODE > goverage.report
# Run coverage on every package.
for package in $PACKAGES; do
output="$ROOT/$package/coverage.out"
go test -test.short -covermode=count -coverprofile=$output $package
if [ -f "$output" ] ; then
cat "$output" | grep -v "$MODE" >> goverage.report
fi
done

View File

@ -0,0 +1,18 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
CONSUL_VERSION="$1"
else
CONSUL_VERSION="0.5.2"
fi
# install consul
wget "https://dl.bintray.com/mitchellh/consul/${CONSUL_VERSION}_linux_amd64.zip"
unzip "${CONSUL_VERSION}_linux_amd64.zip"
# make config for minimum ttl
touch config.json
echo "{\"session_ttl_min\": \"1s\"}" >> config.json
# check
./consul --version

View File

@ -0,0 +1,11 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
ETCD_VERSION="$1"
else
ETCD_VERSION="2.0.11"
fi
curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz
tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz
mv etcd-v$ETCD_VERSION-linux-amd64 etcd

View File

@ -0,0 +1,12 @@
#!/bin/bash
if [ $# -gt 0 ] ; then
ZK_VERSION="$1"
else
ZK_VERSION="3.4.6"
fi
wget "http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-${ZK_VERSION}.tar.gz"
tar -xvf "zookeeper-${ZK_VERSION}.tar.gz"
mv zookeeper-$ZK_VERSION zk
mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg

View File

@ -0,0 +1,30 @@
#!/bin/bash
source "$(dirname "$BASH_SOURCE")/.validate"
IFS=$'\n'
files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^Godeps/' || true) )
unset IFS
badFiles=()
for f in "${files[@]}"; do
# we use "git show" here to validate that what's committed is formatted
if [ "$(git show "$VALIDATE_HEAD:$f" | gofmt -s -l)" ]; then
badFiles+=( "$f" )
fi
done
if [ ${#badFiles[@]} -eq 0 ]; then
echo 'Congratulations! All Go source files are properly formatted.'
else
{
echo "These files are not properly gofmt'd:"
for f in "${badFiles[@]}"; do
echo " - $f"
done
echo
echo 'Please reformat the above files using "gofmt -s -w" and commit the result.'
echo
} >&2
false
fi

View File

@ -0,0 +1,417 @@
package consul
import (
"crypto/tls"
"net/http"
"strings"
"sync"
"time"
"github.com/docker/libkv/store"
api "github.com/hashicorp/consul/api"
)
const (
// DefaultWatchWaitTime is how long we block for at a
// time to check if the watched key has changed. This
// affects the minimum time it takes to cancel a watch.
DefaultWatchWaitTime = 15 * time.Second
)
// Consul is the receiver type for the
// Store interface
type Consul struct {
sync.Mutex
config *api.Config
client *api.Client
ephemeralTTL time.Duration
}
type consulLock struct {
lock *api.Lock
}
// New creates a new Consul client given a list
// of endpoints and optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Consul{}
// Create Consul client
config := api.DefaultConfig()
s.config = config
config.HttpClient = http.DefaultClient
config.Address = endpoints[0]
config.Scheme = "http"
// Set options
if options != nil {
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Creates a new client
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
s.client = client
return s, nil
}
// SetTLS sets Consul TLS options
func (s *Consul) setTLS(tls *tls.Config) {
s.config.HttpClient.Transport = &http.Transport{
TLSClientConfig: tls,
}
s.config.Scheme = "https"
}
// SetTimeout sets the timout for connecting to Consul
func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
// SetEphemeralTTL sets the ttl for ephemeral nodes
func (s *Consul) setEphemeralTTL(ttl time.Duration) {
s.ephemeralTTL = ttl
}
// Normalize the key for usage in Consul
func (s *Consul) normalize(key string) string {
key = store.Normalize(key)
return strings.TrimPrefix(key, "/")
}
func (s *Consul) refreshSession(pair *api.KVPair) error {
// Check if there is any previous session with an active TTL
session, err := s.getActiveSession(pair.Key)
if err != nil {
return err
}
if session == "" {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires
TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x
LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
}
// Create the key session
session, _, err = s.client.Session().Create(entry, nil)
if err != nil {
return err
}
lockOpts := &api.LockOptions{
Key: pair.Key,
Session: session,
}
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
}
}
_, _, err = s.client.Session().Renew(session, nil)
if err != nil {
return s.refreshSession(pair)
}
return nil
}
// getActiveSession checks if the key already has
// a session attached
func (s *Consul) getActiveSession(key string) (string, error) {
pair, _, err := s.client.KV().Get(key, nil)
if err != nil {
return "", err
}
if pair != nil && pair.Session != "" {
return pair.Session, nil
}
return "", nil
}
// Get the value at "key", returns the last modified index
// to use in conjunction to CAS calls
func (s *Consul) Get(key string) (*store.KVPair, error) {
options := &api.QueryOptions{
AllowStale: false,
RequireConsistent: true,
}
pair, meta, err := s.client.KV().Get(s.normalize(key), options)
if err != nil {
return nil, err
}
// If pair is nil then the key does not exist
if pair == nil {
return nil, store.ErrKeyNotFound
}
return &store.KVPair{Key: pair.Key, Value: pair.Value, LastIndex: meta.LastIndex}, nil
}
// Put a value at "key"
func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
key = s.normalize(key)
p := &api.KVPair{
Key: key,
Value: value,
}
if opts != nil && opts.Ephemeral {
// Create or refresh the session
err := s.refreshSession(p)
if err != nil {
return err
}
}
_, err := s.client.KV().Put(p, nil)
return err
}
// Delete a value at "key"
func (s *Consul) Delete(key string) error {
_, err := s.client.KV().Delete(s.normalize(key), nil)
return err
}
// Exists checks that the key exists inside the store
func (s *Consul) Exists(key string) (bool, error) {
_, err := s.Get(key)
if err != nil && err == store.ErrKeyNotFound {
return false, err
}
return true, nil
}
// List child nodes of a given directory
func (s *Consul) List(directory string) ([]*store.KVPair, error) {
pairs, _, err := s.client.KV().List(s.normalize(directory), nil)
if err != nil {
return nil, err
}
if len(pairs) == 0 {
return nil, store.ErrKeyNotFound
}
kv := []*store.KVPair{}
for _, pair := range pairs {
if pair.Key == directory {
continue
}
kv = append(kv, &store.KVPair{
Key: pair.Key,
Value: pair.Value,
LastIndex: pair.ModifyIndex,
})
}
return kv, nil
}
// DeleteTree deletes a range of keys under a given directory
func (s *Consul) DeleteTree(directory string) error {
_, err := s.client.KV().DeleteTree(s.normalize(directory), nil)
return err
}
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Consul) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
kv := s.client.KV()
watchCh := make(chan *store.KVPair)
go func() {
defer close(watchCh)
// Use a wait time in order to check if we should quit
// from time to time.
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
// Get the key
pair, meta, err := kv.Get(key, opts)
if err != nil {
return
}
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
if opts.WaitIndex == meta.LastIndex {
continue
}
opts.WaitIndex = meta.LastIndex
// Return the value to the channel
// FIXME: What happens when a key is deleted?
if pair != nil {
watchCh <- &store.KVPair{
Key: pair.Key,
Value: pair.Value,
LastIndex: pair.ModifyIndex,
}
}
}
}()
return watchCh, nil
}
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current childs values
// will be sent to the channel .Providing a non-nil stopCh can
// be used to stop watching.
func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
kv := s.client.KV()
watchCh := make(chan []*store.KVPair)
go func() {
defer close(watchCh)
// Use a wait time in order to check if we should quit
// from time to time.
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
// Check if we should quit
select {
case <-stopCh:
return
default:
}
// Get all the childrens
pairs, meta, err := kv.List(directory, opts)
if err != nil {
return
}
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the child keys didn't change.
if opts.WaitIndex == meta.LastIndex {
continue
}
opts.WaitIndex = meta.LastIndex
// Return children KV pairs to the channel
kvpairs := []*store.KVPair{}
for _, pair := range pairs {
if pair.Key == directory {
continue
}
kvpairs = append(kvpairs, &store.KVPair{
Key: pair.Key,
Value: pair.Value,
LastIndex: pair.ModifyIndex,
})
}
watchCh <- kvpairs
}
}()
return watchCh, nil
}
// NewLock returns a handle to a lock struct which can
// be used to provide mutual exclusion on a key
func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
consulOpts := &api.LockOptions{
Key: s.normalize(key),
}
if options != nil {
consulOpts.Value = options.Value
}
l, err := s.client.LockOpts(consulOpts)
if err != nil {
return nil, err
}
return &consulLock{lock: l}, nil
}
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *consulLock) Lock() (<-chan struct{}, error) {
return l.lock.Lock(nil)
}
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *consulLock) Unlock() error {
return l.lock.Unlock()
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, nil, err
} else if !work {
return false, nil, store.ErrKeyModified
}
pair, err := s.Get(key)
if err != nil {
return false, nil, err
}
return true, pair, nil
}
// AtomicDelete deletes a value at "key" if the key has not
// been modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().DeleteCAS(p, nil); err != nil {
return false, err
} else if !work {
return false, store.ErrKeyModified
}
return true, nil
}
// Close closes the client connection
func (s *Consul) Close() {
return
}

View File

@ -0,0 +1,62 @@
package consul
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
"github.com/stretchr/testify/assert"
)
func makeConsulClient(t *testing.T) store.Store {
client := "localhost:8500"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestConsulStore(t *testing.T) {
kv := makeConsulClient(t)
backup := makeConsulClient(t)
testutils.RunTestStore(t, kv, backup)
}
func TestGetActiveSession(t *testing.T) {
kv := makeConsulClient(t)
consul := kv.(*Consul)
key := "foo"
value := []byte("bar")
// Put the first key with the Ephemeral flag
err := kv.Put(key, value, &store.WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Session should not be empty
session, err := consul.getActiveSession(key)
assert.NoError(t, err)
assert.NotEqual(t, session, "")
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err)
// Check the session again, it should return nothing
session, err = consul.getActiveSession(key)
assert.NoError(t, err)
assert.Equal(t, session, "")
}

View File

@ -0,0 +1,478 @@
package etcd
import (
"crypto/tls"
"net"
"net/http"
"strings"
"time"
etcd "github.com/coreos/go-etcd/etcd"
"github.com/docker/libkv/store"
)
// Etcd is the receiver type for the
// Store interface
type Etcd struct {
client *etcd.Client
ephemeralTTL time.Duration
}
type etcdLock struct {
client *etcd.Client
stopLock chan struct{}
key string
value string
last *etcd.Response
ttl uint64
}
const (
periodicSync = 10 * time.Minute
defaultLockTTL = 20 * time.Second
defaultUpdateTime = 5 * time.Second
)
// New creates a new Etcd client given a list
// of endpoints and an optional tls config
func New(addrs []string, options *store.Config) (store.Store, error) {
s := &Etcd{}
entries := store.CreateEndpoints(addrs, "http")
s.client = etcd.NewClient(entries)
// Set options
if options != nil {
if options.TLS != nil {
s.setTLS(options.TLS)
}
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
if options.EphemeralTTL != 0 {
s.setEphemeralTTL(options.EphemeralTTL)
}
}
// Periodic SyncCluster
go func() {
for {
s.client.SyncCluster()
time.Sleep(periodicSync)
}
}()
return s, nil
}
// SetTLS sets the tls configuration given the path
// of certificate files
func (s *Etcd) setTLS(tls *tls.Config) {
// Change to https scheme
var addrs []string
entries := s.client.GetCluster()
for _, entry := range entries {
addrs = append(addrs, strings.Replace(entry, "http", "https", -1))
}
s.client.SetCluster(addrs)
// Set transport
t := http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: tls,
}
s.client.SetTransport(&t)
}
// setTimeout sets the timeout used for connecting to the store
func (s *Etcd) setTimeout(time time.Duration) {
s.client.SetDialTimeout(time)
}
// setEphemeralHeartbeat sets the heartbeat value to notify
// that a node is alive
func (s *Etcd) setEphemeralTTL(time time.Duration) {
s.ephemeralTTL = time
}
// createDirectory creates the entire path for a directory
// that does not exist
func (s *Etcd) createDirectory(path string) error {
if _, err := s.client.CreateDir(store.Normalize(path), 10); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Skip key already exists
if etcdError.ErrorCode != 105 {
return err
}
} else {
return err
}
}
return nil
}
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Etcd) Get(key string) (pair *store.KVPair, err error) {
result, err := s.client.Get(store.Normalize(key), false, false)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a Directory or Not a file
if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 {
return nil, store.ErrKeyNotFound
}
}
return nil, err
}
pair = &store.KVPair{
Key: key,
Value: []byte(result.Node.Value),
LastIndex: result.Node.ModifiedIndex,
}
return pair, nil
}
// Put a value at "key"
func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error {
// Default TTL = 0 means no expiration
var ttl uint64
if opts != nil && opts.Ephemeral {
ttl = uint64(s.ephemeralTTL.Seconds())
}
if _, err := s.client.Set(key, string(value), ttl); err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Not a directory
if etcdError.ErrorCode == 104 {
// Remove the last element (the actual key)
// and create the full directory path
err = s.createDirectory(store.GetDirectory(key))
if err != nil {
return err
}
// Now that the directory is created, set the key
if _, err := s.client.Set(key, string(value), ttl); err != nil {
return err
}
}
}
return err
}
return nil
}
// Delete a value at "key"
func (s *Etcd) Delete(key string) error {
_, err := s.client.Delete(store.Normalize(key), false)
return err
}
// Exists checks if the key exists inside the store
func (s *Etcd) Exists(key string) (bool, error) {
entry, err := s.Get(key)
if err != nil && entry != nil {
if err == store.ErrKeyNotFound || entry.Value == nil {
return false, nil
}
return false, err
}
return true, nil
}
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
// Get the current value
current, err := s.Get(key)
if err != nil {
return nil, err
}
// Start an etcd watch.
// Note: etcd will send the current value through the channel.
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(store.Normalize(key), 0, false, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert whatever
// format etcd is using into our interface.
watchCh := make(chan *store.KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case result := <-etcdWatchCh:
watchCh <- &store.KVPair{
Key: key,
Value: []byte(result.Node.Value),
LastIndex: result.Node.ModifiedIndex,
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current childs values
// will be sent to the channel .Providing a non-nil stopCh can
// be used to stop watching.
func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
// Get child values
current, err := s.List(directory)
if err != nil {
return nil, err
}
// Start the watch
etcdWatchCh := make(chan *etcd.Response)
etcdStopCh := make(chan bool)
go s.client.Watch(store.Normalize(directory), 0, true, etcdWatchCh, etcdStopCh)
// Adapter goroutine: The goal here is to convert whatever
// format etcd is using into our interface.
watchCh := make(chan []*store.KVPair)
go func() {
defer close(watchCh)
// Push the current value through the channel.
watchCh <- current
for {
select {
case <-etcdWatchCh:
// FIXME: We should probably use the value pushed by the channel.
// However, Node.Nodes seems to be empty.
if list, err := s.List(directory); err == nil {
watchCh <- list
}
case <-stopCh:
etcdStopCh <- true
return
}
}
}()
return watchCh, nil
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Compare Failed
if etcdError.ErrorCode == 101 {
return false, nil, store.ErrKeyModified
}
}
return false, nil, err
}
updated := &store.KVPair{
Key: key,
Value: value,
LastIndex: meta.Node.ModifiedIndex,
}
return true, updated, nil
}
// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
_, err := s.client.CompareAndDelete(store.Normalize(key), "", previous.LastIndex)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Compare failed
if etcdError.ErrorCode == 101 {
return false, store.ErrKeyModified
}
}
return false, err
}
return true, nil
}
// List child nodes of a given directory
func (s *Etcd) List(directory string) ([]*store.KVPair, error) {
resp, err := s.client.Get(store.Normalize(directory), true, true)
if err != nil {
return nil, err
}
kv := []*store.KVPair{}
for _, n := range resp.Node.Nodes {
key := strings.TrimLeft(n.Key, "/")
kv = append(kv, &store.KVPair{
Key: key,
Value: []byte(n.Value),
LastIndex: n.ModifiedIndex,
})
}
return kv, nil
}
// DeleteTree deletes a range of keys under a given directory
func (s *Etcd) DeleteTree(directory string) error {
_, err := s.client.Delete(store.Normalize(directory), true)
return err
}
// NewLock returns a handle to a lock struct which can
// be used to provide mutual exclusion on a key
func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
var value string
ttl := uint64(time.Duration(defaultLockTTL).Seconds())
// Apply options on Lock
if options != nil {
if options.Value != nil {
value = string(options.Value)
}
if options.TTL != 0 {
ttl = uint64(options.TTL.Seconds())
}
}
// Create lock object
lock = &etcdLock{
client: s.client,
key: key,
value: value,
ttl: ttl,
}
return lock, nil
}
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *etcdLock) Lock() (<-chan struct{}, error) {
key := store.Normalize(l.key)
// Lock holder channels
lockHeld := make(chan struct{})
stopLocking := make(chan struct{})
var lastIndex uint64
for {
resp, err := l.client.Create(key, l.value, l.ttl)
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Key already exists
if etcdError.ErrorCode != 105 {
lastIndex = ^uint64(0)
}
}
} else {
lastIndex = resp.Node.ModifiedIndex
}
_, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex)
if err == nil {
// Leader section
l.stopLock = stopLocking
go l.holdLock(key, lockHeld, stopLocking)
break
} else {
// Seeker section
chW := make(chan *etcd.Response)
chWStop := make(chan bool)
l.waitLock(key, chW, chWStop)
// Delete or Expire event occured
// Retry
}
}
return lockHeld, nil
}
// Hold the lock as long as we can
// Updates the key ttl periodically until we receive
// an explicit stop signal from the Unlock method
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) {
defer close(lockHeld)
update := time.NewTicker(defaultUpdateTime)
defer update.Stop()
var err error
for {
select {
case <-update.C:
l.last, err = l.client.Update(key, l.value, l.ttl)
if err != nil {
return
}
case <-stopLocking:
return
}
}
}
// WaitLock simply waits for the key to be available for creation
func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) {
go l.client.Watch(key, 0, false, eventCh, stopWatchCh)
for event := range eventCh {
if event.Action == "delete" || event.Action == "expire" {
return
}
}
}
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *etcdLock) Unlock() error {
if l.stopLock != nil {
l.stopLock <- struct{}{}
}
if l.last != nil {
_, err := l.client.CompareAndDelete(store.Normalize(l.key), l.value, l.last.Node.ModifiedIndex)
if err != nil {
return err
}
}
return nil
}
// Close closes the client connection
func (s *Etcd) Close() {
return
}

View File

@ -0,0 +1,34 @@
package etcd
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
)
func makeEtcdClient(t *testing.T) store.Store {
client := "localhost:4001"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestEtcdStore(t *testing.T) {
kv := makeEtcdClient(t)
backup := makeEtcdClient(t)
testutils.RunTestStore(t, kv, backup)
}

View File

@ -0,0 +1,47 @@
package store
import (
"strings"
)
// CreateEndpoints creates a list of endpoints given the right scheme
func CreateEndpoints(addrs []string, scheme string) (entries []string) {
for _, addr := range addrs {
entries = append(entries, scheme+"://"+addr)
}
return entries
}
// Normalize the key for each store to the form:
//
// /path/to/key
//
func Normalize(key string) string {
return "/" + join(SplitKey(key))
}
// GetDirectory gets the full directory part of
// the key to the form:
//
// /path/to/
//
func GetDirectory(key string) string {
parts := SplitKey(key)
parts = parts[:len(parts)-1]
return "/" + join(parts)
}
// SplitKey splits the key to extract path informations
func SplitKey(key string) (path []string) {
if strings.Contains(key, "/") {
path = strings.Split(key, "/")
} else {
path = []string{key}
}
return path
}
// join the path parts with '/'
func join(parts []string) string {
return strings.Join(parts, "/")
}

View File

@ -0,0 +1,113 @@
package mock
import (
"github.com/docker/libkv/store"
"github.com/stretchr/testify/mock"
)
// Mock store. Mocks all Store functions using testify.Mock
type Mock struct {
mock.Mock
// Endpoints passed to InitializeMock
Endpoints []string
// Options passed to InitializeMock
Options *store.Config
}
// New creates a Mock store
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Mock{}
s.Endpoints = endpoints
s.Options = options
return s, nil
}
// Put mock
func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error {
args := s.Mock.Called(key, value, opts)
return args.Error(0)
}
// Get mock
func (s *Mock) Get(key string) (*store.KVPair, error) {
args := s.Mock.Called(key)
return args.Get(0).(*store.KVPair), args.Error(1)
}
// Delete mock
func (s *Mock) Delete(key string) error {
args := s.Mock.Called(key)
return args.Error(0)
}
// Exists mock
func (s *Mock) Exists(key string) (bool, error) {
args := s.Mock.Called(key)
return args.Bool(0), args.Error(1)
}
// Watch mock
func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
args := s.Mock.Called(key, stopCh)
return args.Get(0).(<-chan *store.KVPair), args.Error(1)
}
// WatchTree mock
func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
args := s.Mock.Called(prefix, stopCh)
return args.Get(0).(chan []*store.KVPair), args.Error(1)
}
// NewLock mock
func (s *Mock) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
args := s.Mock.Called(key, options)
return args.Get(0).(store.Locker), args.Error(1)
}
// List mock
func (s *Mock) List(prefix string) ([]*store.KVPair, error) {
args := s.Mock.Called(prefix)
return args.Get(0).([]*store.KVPair), args.Error(1)
}
// DeleteTree mock
func (s *Mock) DeleteTree(prefix string) error {
args := s.Mock.Called(prefix)
return args.Error(0)
}
// AtomicPut mock
func (s *Mock) AtomicPut(key string, value []byte, previous *store.KVPair, opts *store.WriteOptions) (bool, *store.KVPair, error) {
args := s.Mock.Called(key, value, previous, opts)
return args.Bool(0), args.Get(1).(*store.KVPair), args.Error(2)
}
// AtomicDelete mock
func (s *Mock) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
args := s.Mock.Called(key, previous)
return args.Bool(0), args.Error(1)
}
// Lock mock implementation of Locker
type Lock struct {
mock.Mock
}
// Lock mock
func (l *Lock) Lock() (<-chan struct{}, error) {
args := l.Mock.Called()
return args.Get(0).(<-chan struct{}), args.Error(1)
}
// Unlock mock
func (l *Lock) Unlock() error {
args := l.Mock.Called()
return args.Error(0)
}
// Close mock
func (s *Mock) Close() {
return
}

View File

@ -0,0 +1,118 @@
package store
import (
"crypto/tls"
"errors"
"time"
)
// Backend represents a KV Store Backend
type Backend string
const (
// CONSUL backend
CONSUL Backend = "consul"
// ETCD backend
ETCD Backend = "etcd"
// ZK backend
ZK Backend = "zk"
)
var (
// ErrNotSupported is thrown when the backend k/v store is not supported by libkv
ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one")
// ErrNotImplemented is thrown when a method is not implemented by the current backend
ErrNotImplemented = errors.New("Call not implemented in current backend")
// ErrNotReachable is thrown when the API cannot be reached for issuing common store operations
ErrNotReachable = errors.New("Api not reachable")
// ErrCannotLock is thrown when there is an error acquiring a lock on a key
ErrCannotLock = errors.New("Error acquiring the lock")
// ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
// ErrKeyNotFound is thrown when the key is not found in the store during a Get operation
ErrKeyNotFound = errors.New("Key not found in store")
// ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation
ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
)
// Config contains the options for a storage client
type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
EphemeralTTL time.Duration
}
// Store represents the backend K/V storage
// Each store should support every call listed
// here. Or it couldn't be implemented as a K/V
// backend for libkv
type Store interface {
// Put a value at the specified key
Put(key string, value []byte, options *WriteOptions) error
// Get a value given its key
Get(key string) (*KVPair, error)
// Delete the value at the specified key
Delete(key string) error
// Verify if a Key exists in the store
Exists(key string) (bool, error)
// Watch for changes on a key
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// WatchTree watches for changes on child nodes under
// a given a directory
WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// CreateLock for a given key.
// The returned Locker is not held and must be acquired
// with `.Lock`. The Value is optional.
NewLock(key string, options *LockOptions) (Locker, error)
// List the content of a given prefix
List(directory string) ([]*KVPair, error)
// DeleteTree deletes a range of keys under a given directory
DeleteTree(directory string) error
// Atomic operation on a single value
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
// Atomic delete of a single value
AtomicDelete(key string, previous *KVPair) (bool, error)
// Close the store connection
Close()
}
// KVPair represents {Key, Value, Lastindex} tuple
type KVPair struct {
Key string
Value []byte
LastIndex uint64
}
// WriteOptions contains optional request parameters
type WriteOptions struct {
Heartbeat time.Duration
Ephemeral bool
}
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
}
// WatchCallback is used for watch methods on keys
// and is triggered on key change
type WatchCallback func(entries ...*KVPair)
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock() (<-chan struct{}, error)
Unlock() error
}

View File

@ -0,0 +1,355 @@
package zookeeper
import (
"strings"
"time"
"github.com/docker/libkv/store"
zk "github.com/samuel/go-zookeeper/zk"
)
const defaultTimeout = 10 * time.Second
// Zookeeper is the receiver type for
// the Store interface
type Zookeeper struct {
timeout time.Duration
client *zk.Conn
}
type zookeeperLock struct {
client *zk.Conn
lock *zk.Lock
key string
value []byte
}
// New creates a new Zookeeper client given a
// list of endpoints and an optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Zookeeper{}
s.timeout = defaultTimeout
// Set options
if options != nil {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
}
// Connect to Zookeeper
conn, _, err := zk.Connect(endpoints, s.timeout)
if err != nil {
return nil, err
}
s.client = conn
return s, nil
}
// setTimeout sets the timeout for connecting to Zookeeper
func (s *Zookeeper) setTimeout(time time.Duration) {
s.timeout = time
}
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
resp, meta, err := s.client.Get(store.Normalize(key))
if err != nil {
return nil, err
}
// If resp is nil, the key does not exist
if resp == nil {
return nil, store.ErrKeyNotFound
}
pair = &store.KVPair{
Key: key,
Value: resp,
LastIndex: uint64(meta.Version),
}
return pair, nil
}
// createFullPath creates the entire path for a directory
// that does not exist
func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
return err
}
_, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
// Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
fkey := store.Normalize(key)
exists, err := s.Exists(key)
if err != nil {
return err
}
if !exists {
if opts != nil && opts.Ephemeral {
s.createFullPath(store.SplitKey(key), opts.Ephemeral)
} else {
s.createFullPath(store.SplitKey(key), false)
}
}
_, err = s.client.Set(fkey, value, -1)
return err
}
// Delete a value at "key"
func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(store.Normalize(key), -1)
return err
}
// Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(store.Normalize(key))
if err != nil {
return false, err
}
return exists, nil
}
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
// Get the key first
pair, err := s.Get(key)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *store.KVPair)
go func() {
defer close(watchCh)
// Get returns the current value to the channel prior
// to listening to any event that may occur on that key
watchCh <- pair
for {
_, _, eventCh, err := s.client.GetW(store.Normalize(key))
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current childs values
// will be sent to the channel .Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
// List the childrens first
entries, err := s.List(directory)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*store.KVPair)
go func() {
defer close(watchCh)
// List returns the children values to the channel
// prior to listening to any events that may occur
// on those keys
watchCh <- entries
for {
_, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory))
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(directory); err == nil {
watchCh <- kv
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// List child nodes of a given directory
func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
keys, stat, err := s.client.Children(store.Normalize(directory))
if err != nil {
return nil, err
}
kv := []*store.KVPair{}
// FIXME Costly Get request for each child key..
for _, key := range keys {
pair, err := s.Get(directory + store.Normalize(key))
if err != nil {
return nil, err
}
kv = append(kv, &store.KVPair{
Key: key,
Value: []byte(pair.Value),
LastIndex: uint64(stat.Version),
})
}
return kv, nil
}
// DeleteTree deletes a range of keys under a given directory
func (s *Zookeeper) DeleteTree(directory string) error {
pairs, err := s.List(directory)
if err != nil {
return err
}
var reqs []interface{}
for _, pair := range pairs {
reqs = append(reqs, &zk.DeleteRequest{
Path: store.Normalize(directory + "/" + pair.Key),
Version: -1,
})
}
_, err = s.client.Multi(reqs...)
return err
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
}
return false, nil, err
}
pair := &store.KVPair{
Key: key,
Value: value,
LastIndex: uint64(meta.Version),
}
return true, pair, nil
}
// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex))
if err != nil {
if err == zk.ErrBadVersion {
return false, store.ErrKeyModified
}
return false, err
}
return true, nil
}
// NewLock returns a handle to a lock struct which can
// be used to provide mutual exclusion on a key
func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
value := []byte("")
// Apply options
if options != nil {
if options.Value != nil {
value = options.Value
}
}
lock = &zookeeperLock{
client: s.client,
key: store.Normalize(key),
value: value,
lock: zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)),
}
return lock, err
}
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *zookeeperLock) Lock() (<-chan struct{}, error) {
err := l.lock.Lock()
if err == nil {
// We hold the lock, we can set our value
// FIXME: The value is left behind
// (problematic for leader election)
_, err = l.client.Set(l.key, l.value, -1)
}
return make(chan struct{}), err
}
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *zookeeperLock) Unlock() error {
return l.lock.Unlock()
}
// Close closes the client connection
func (s *Zookeeper) Close() {
s.client.Close()
}

View File

@ -0,0 +1,34 @@
package zookeeper
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
)
func makeZkClient(t *testing.T) store.Store {
client := "localhost:2181"
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
EphemeralTTL: 2 * time.Second,
},
)
if err != nil {
t.Fatalf("cannot create store: %v", err)
}
return kv
}
func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
backup := makeZkClient(t)
testutils.RunTestStore(t, kv, backup)
}

View File

@ -0,0 +1,395 @@
package testutils
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/stretchr/testify/assert"
)
// RunTestStore is an helper testing method that is
// called by each K/V backend sub-package testing
func RunTestStore(t *testing.T, kv store.Store, backup store.Store) {
testPutGetDelete(t, kv)
testWatch(t, kv)
testWatchTree(t, kv)
testAtomicPut(t, kv)
testAtomicDelete(t, kv)
testLockUnlock(t, kv)
testPutEphemeral(t, kv, backup)
testList(t, kv)
testDeleteTree(t, kv)
}
func testPutGetDelete(t *testing.T, kv store.Store) {
key := "foo"
value := []byte("bar")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// Delete the key
err = kv.Delete(key)
assert.NoError(t, err)
// Get should fail
pair, err = kv.Get(key)
assert.Error(t, err)
assert.Nil(t, pair)
}
func testWatch(t *testing.T, kv store.Store) {
key := "hello"
value := []byte("world")
newValue := []byte("world!")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
stopCh := make(<-chan struct{})
events, err := kv.Watch(key, stopCh)
assert.NoError(t, err)
assert.NotNil(t, events)
// Update loop
go func() {
timeout := time.After(1 * time.Second)
tick := time.Tick(250 * time.Millisecond)
for {
select {
case <-timeout:
return
case <-tick:
err := kv.Put(key, newValue, nil)
if assert.NoError(t, err) {
continue
}
return
}
}
}()
// Check for updates
eventCount := 1
for {
select {
case event := <-events:
assert.NotNil(t, event)
if eventCount == 1 {
assert.Equal(t, event.Key, key)
assert.Equal(t, event.Value, value)
} else {
assert.Equal(t, event.Key, key)
assert.Equal(t, event.Value, newValue)
}
eventCount++
// We received all the events we wanted to check
if eventCount >= 4 {
return
}
case <-time.After(4 * time.Second):
t.Fatal("Timeout reached")
return
}
}
}
func testWatchTree(t *testing.T, kv store.Store) {
dir := "tree"
node1 := "tree/node1"
value1 := []byte("node1")
node2 := "tree/node2"
value2 := []byte("node2")
node3 := "tree/node3"
value3 := []byte("node3")
err := kv.Put(node1, value1, nil)
assert.NoError(t, err)
err = kv.Put(node2, value2, nil)
assert.NoError(t, err)
err = kv.Put(node3, value3, nil)
assert.NoError(t, err)
stopCh := make(<-chan struct{})
events, err := kv.WatchTree(dir, stopCh)
assert.NoError(t, err)
assert.NotNil(t, events)
// Update loop
go func() {
timeout := time.After(250 * time.Millisecond)
for {
select {
case <-timeout:
err := kv.Delete(node3)
assert.NoError(t, err)
return
}
}
}()
// Check for updates
for {
select {
case event := <-events:
assert.NotNil(t, event)
// We received the Delete event on a child node
// Exit test successfully
if len(event) == 2 {
return
}
case <-time.After(4 * time.Second):
t.Fatal("Timeout reached")
return
}
}
}
func testAtomicPut(t *testing.T, kv store.Store) {
key := "hello"
value := []byte("world")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// This CAS should fail: no previous
success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil)
assert.Error(t, err)
assert.False(t, success)
// This CAS should succeed
success, _, err = kv.AtomicPut("hello", []byte("WORLD"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// This CAS should fail
pair.LastIndex = 0
success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
assert.Error(t, err)
assert.False(t, success)
}
func testAtomicDelete(t *testing.T, kv store.Store) {
key := "atomic"
value := []byte("world")
// Put the key
err := kv.Put(key, value, nil)
assert.NoError(t, err)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
tempIndex := pair.LastIndex
// AtomicDelete should fail
pair.LastIndex = 0
success, err := kv.AtomicDelete(key, pair)
assert.Error(t, err)
assert.False(t, success)
// AtomicDelete should succeed
pair.LastIndex = tempIndex
success, err = kv.AtomicDelete(key, pair)
assert.NoError(t, err)
assert.True(t, success)
}
func testLockUnlock(t *testing.T, kv store.Store) {
key := "foo"
value := []byte("bar")
// We should be able to create a new lock on key
lock, err := kv.NewLock(key, &store.LockOptions{Value: value})
assert.NoError(t, err)
assert.NotNil(t, lock)
// Lock should successfully succeed or block
lockChan, err := lock.Lock()
assert.NoError(t, err)
assert.NotNil(t, lockChan)
// Get should work
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// Unlock should succeed
err = lock.Unlock()
assert.NoError(t, err)
// Get should work
pair, err = kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
}
func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
firstKey := "first"
firstValue := []byte("foo")
secondKey := "second"
secondValue := []byte("bar")
// Put the first key with the Ephemeral flag
err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Put a second key with the Ephemeral flag
err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Get on firstKey should work
pair, err := kv.Get(firstKey)
assert.NoError(t, err)
assert.NotNil(t, pair)
// Get on secondKey should work
pair, err = kv.Get(secondKey)
assert.NoError(t, err)
assert.NotNil(t, pair)
// Close the connection
otherConn.Close()
// Let the session expire
time.Sleep(3 * time.Second)
// Get on firstKey shouldn't work
pair, err = kv.Get(firstKey)
assert.Error(t, err)
assert.Nil(t, pair)
// Get on secondKey shouldn't work
pair, err = kv.Get(secondKey)
assert.Error(t, err)
assert.Nil(t, pair)
}
func testList(t *testing.T, kv store.Store) {
prefix := "nodes"
firstKey := "nodes/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondValue := []byte("second")
// Put the first key
err := kv.Put(firstKey, firstValue, nil)
assert.NoError(t, err)
// Put the second key
err = kv.Put(secondKey, secondValue, nil)
assert.NoError(t, err)
// List should work and return the two correct values
pairs, err := kv.List(prefix)
assert.NoError(t, err)
if assert.NotNil(t, pairs) {
assert.Equal(t, len(pairs), 2)
}
// Check pairs, those are not necessarily in Put order
for _, pair := range pairs {
if pair.Key == firstKey {
assert.Equal(t, pair.Value, firstValue)
}
if pair.Key == secondKey {
assert.Equal(t, pair.Value, secondValue)
}
}
// List should fail: the key does not exist
pairs, err = kv.List("idontexist")
assert.Error(t, err)
assert.Nil(t, pairs)
}
func testDeleteTree(t *testing.T, kv store.Store) {
prefix := "nodes"
firstKey := "nodes/first"
firstValue := []byte("first")
secondKey := "nodes/second"
secondValue := []byte("second")
// Put the first key
err := kv.Put(firstKey, firstValue, nil)
assert.NoError(t, err)
// Put the second key
err = kv.Put(secondKey, secondValue, nil)
assert.NoError(t, err)
// Get should work on the first Key
pair, err := kv.Get(firstKey)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, firstValue)
assert.NotEqual(t, pair.LastIndex, 0)
// Get should work on the second Key
pair, err = kv.Get(secondKey)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, secondValue)
assert.NotEqual(t, pair.LastIndex, 0)
// Delete Values under directory `nodes`
err = kv.DeleteTree(prefix)
assert.NoError(t, err)
// Get should fail on both keys
pair, err = kv.Get(firstKey)
assert.Error(t, err)
assert.Nil(t, pair)
pair, err = kv.Get(secondKey)
assert.Error(t, err)
assert.Nil(t, pair)
}