diff --git a/.travis.yml b/.travis.yml index 3b899d0e70..02bc577675 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,15 @@ install: - go get github.com/golang/lint/golint - go get github.com/GeertJohan/fgt +before_script: + - script/travis_consul.sh 0.5.2 + - script/travis_etcd.sh 2.0.11 + - script/travis_zk.sh 3.4.6 + script: + - ./consul agent -server -bootstrap-expect 1 -data-dir /tmp/consul -config-file=./config.json 1>/dev/null & + - ./etcd/etcd --listen-client-urls 'http://0.0.0.0:4001' --advertise-client-urls 'http://127.0.0.1:4001' >/dev/null 2>&1 & + - ./zk/bin/zkServer.sh start ./zk/conf/zoo.cfg 1> /dev/null - script/validate-gofmt - go vet ./... - fgt golint ./... diff --git a/pkg/store/consul.go b/pkg/store/consul.go index 39a01e11cd..694fcb3dd3 100644 --- a/pkg/store/consul.go +++ b/pkg/store/consul.go @@ -16,10 +16,6 @@ const ( // watched key has changed. This affects the minimum time it takes to // cancel a watch. DefaultWatchWaitTime = 15 * time.Second - - // MinimumTimeToLive is the minimum TTL value allowed by Consul for - // Ephemeral entries - MinimumTimeToLive = 10 * time.Second ) // Consul embeds the client and watches @@ -56,9 +52,7 @@ func InitializeConsul(endpoints []string, options *Config) (Store, error) { s.setTimeout(options.ConnectionTimeout) } if options.EphemeralTTL != 0 { - if err := s.setEphemeralTTL(options.EphemeralTTL); err != nil { - return nil, err - } + s.setEphemeralTTL(options.EphemeralTTL) } } @@ -87,12 +81,8 @@ func (s *Consul) setTimeout(time time.Duration) { } // SetEphemeralTTL sets the ttl for ephemeral nodes -func (s *Consul) setEphemeralTTL(ttl time.Duration) error { - if ttl < MinimumTimeToLive { - return ErrInvalidTTL - } +func (s *Consul) setEphemeralTTL(ttl time.Duration) { s.ephemeralTTL = ttl - return nil } // createEphemeralSession creates the global session diff --git a/pkg/store/consul_test.go b/pkg/store/consul_test.go new file mode 100644 index 0000000000..5f3c58fd2b --- /dev/null +++ b/pkg/store/consul_test.go @@ -0,0 +1,410 @@ +package store + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func makeConsulClient(t *testing.T) Store { + client := "localhost:8500" + + kv, err := NewStore( + CONSUL, + []string{client}, + &Config{ + ConnectionTimeout: 10 * time.Second, + EphemeralTTL: 2 * time.Second, + }, + ) + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestConsulPutGetDelete(t *testing.T) { + kv := makeConsulClient(t) + + key := "foo" + value := []byte("bar") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestConsulWatch(t *testing.T) { + kv := makeConsulClient(t) + + key := "hello" + value := []byte("world") + newValue := []byte("world!") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.Watch(key, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(key, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + if eventCount == 1 { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, value) + } else { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, newValue) + } + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-stopCh: + return + } + } +} + +func TestConsulWatchTree(t *testing.T) { + kv := makeConsulClient(t) + + dir := "tree" + + node1 := "tree/node1" + value1 := []byte("node1") + + node2 := "tree/node2" + value2 := []byte("node2") + + node3 := "tree/node3" + value3 := []byte("node3") + + newValue := []byte("world!") + + err := kv.Put(node1, value1, nil) + assert.NoError(t, err) + err = kv.Put(node2, value2, nil) + assert.NoError(t, err) + err = kv.Put(node3, value3, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.WatchTree(dir, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(node1, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + // Check for updates + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + assert.Equal(t, len(event), 3) + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-stopCh: + return + } + } +} + +func TestConsulAtomicPut(t *testing.T) { + kv := makeConsulClient(t) + + key := "hello" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // This CAS should succeed + success, _, err := kv.AtomicPut("hello", []byte("WORLD"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // This CAS should fail + pair.LastIndex = 0 + success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) + assert.Error(t, err) + assert.False(t, success) +} + +func TestConsulAtomicDelete(t *testing.T) { + kv := makeConsulClient(t) + + key := "atomic" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + tempIndex := pair.LastIndex + + // AtomicDelete should fail + pair.LastIndex = 0 + success, err := kv.AtomicDelete(key, pair) + assert.Error(t, err) + assert.False(t, success) + + // AtomicDelete should succeed + pair.LastIndex = tempIndex + success, err = kv.AtomicDelete(key, pair) + assert.NoError(t, err) + assert.True(t, success) +} + +func TestConsulLockUnlock(t *testing.T) { + t.Parallel() + kv := makeConsulClient(t) + + key := "foo" + value := []byte("bar") + + // We should be able to create a new lock on key + lock, err := kv.NewLock(key, &LockOptions{Value: value}) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed or block + lockChan, err := lock.Lock() + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Unlock should succeed + err = lock.Unlock() + assert.NoError(t, err) + + // Get should work + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) +} + +func TestConsulPutEphemeral(t *testing.T) { + kv := makeConsulClient(t) + + firstKey := "foo" + firstValue := []byte("foo") + + secondKey := "bar" + secondValue := []byte("bar") + + // Put the first key with the Ephemeral flag + err := kv.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Put a second key with the Ephemeral flag + err = kv.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Get on firstKey should work + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Get on secondKey should work + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Let the session expire + time.Sleep(6 * time.Second) + + // Get on firstKey shouldn't work + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + // Get on secondKey shouldn't work + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestConsulList(t *testing.T) { + kv := makeConsulClient(t) + + prefix := "nodes/" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // List should work and return the two correct values + pairs, err := kv.List(prefix) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) + } + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } + } +} + +func TestConsulDeleteTree(t *testing.T) { + kv := makeConsulClient(t) + + prefix := "nodes/" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // Get should work on the first Key + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, firstValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Get should work on the second Key + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, secondValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete Values under directory `nodes` + err = kv.DeleteTree(prefix) + assert.NoError(t, err) + + // Get should fail on both keys + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} diff --git a/pkg/store/etcd_test.go b/pkg/store/etcd_test.go new file mode 100644 index 0000000000..b98da08068 --- /dev/null +++ b/pkg/store/etcd_test.go @@ -0,0 +1,412 @@ +package store + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func makeEtcdClient(t *testing.T) Store { + client := "localhost:4001" + + kv, err := NewStore( + ETCD, + []string{client}, + &Config{ + ConnectionTimeout: 10 * time.Second, + EphemeralTTL: 2 * time.Second, + }, + ) + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestEtcdPutGetDelete(t *testing.T) { + kv := makeEtcdClient(t) + + key := "foo" + value := []byte("bar") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestEtcdWatch(t *testing.T) { + kv := makeEtcdClient(t) + + // FIXME General key formatting issue + key := "/hello" + value := []byte("world") + newValue := []byte("world!") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.Watch(key, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(key, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + if eventCount == 1 { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, value) + } else { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, newValue) + } + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-stopCh: + return + } + } +} + +func TestEtcdWatchTree(t *testing.T) { + kv := makeEtcdClient(t) + + // FIXME General key formatting issue + dir := "/tree" + + node1 := "tree/node1" + value1 := []byte("node1") + + node2 := "tree/node2" + value2 := []byte("node2") + + node3 := "tree/node3" + value3 := []byte("node3") + + newValue := []byte("world!") + + err := kv.Put(node1, value1, nil) + assert.NoError(t, err) + err = kv.Put(node2, value2, nil) + assert.NoError(t, err) + err = kv.Put(node3, value3, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.WatchTree(dir, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(node1, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + // Check for updates + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + assert.Equal(t, len(event), 3) + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-stopCh: + return + } + } +} + +func TestEtcdAtomicPut(t *testing.T) { + kv := makeEtcdClient(t) + + key := "hello" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // This CAS should succeed + success, _, err := kv.AtomicPut("hello", []byte("WORLD"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // This CAS should fail + pair.LastIndex = 0 + success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) + assert.Error(t, err) + assert.False(t, success) +} + +func TestEtcdAtomicDelete(t *testing.T) { + kv := makeEtcdClient(t) + + key := "atomic" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + tempIndex := pair.LastIndex + + // AtomicDelete should fail + pair.LastIndex = 0 + success, err := kv.AtomicDelete(key, pair) + assert.Error(t, err) + assert.False(t, success) + + // AtomicDelete should succeed + pair.LastIndex = tempIndex + success, err = kv.AtomicDelete(key, pair) + assert.NoError(t, err) + assert.True(t, success) +} + +func TestEtcdLockUnlock(t *testing.T) { + t.Parallel() + kv := makeEtcdClient(t) + + key := "foo" + value := []byte("bar") + + // We should be able to create a new lock on key + lock, err := kv.NewLock(key, &LockOptions{Value: value}) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed or block + lockChan, err := lock.Lock() + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Unlock should succeed + err = lock.Unlock() + assert.NoError(t, err) + + // Get should work + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) +} + +func TestEtcdPutEphemeral(t *testing.T) { + kv := makeEtcdClient(t) + + firstKey := "foo" + firstValue := []byte("foo") + + secondKey := "bar" + secondValue := []byte("bar") + + // Put the first key with the Ephemeral flag + err := kv.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Put a second key with the Ephemeral flag + err = kv.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Get on firstKey should work + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Get on secondKey should work + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Let the session expire + time.Sleep(3 * time.Second) + + // Get on firstKey shouldn't work + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + // Get on secondKey shouldn't work + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestEtcdList(t *testing.T) { + kv := makeEtcdClient(t) + + prefix := "nodes/" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // List should work and return the two correct values + pairs, err := kv.List(prefix) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) + } + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } + } +} + +func TestEtcdDeleteTree(t *testing.T) { + kv := makeEtcdClient(t) + + prefix := "nodes/" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // Get should work on the first Key + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, firstValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Get should work on the second Key + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, secondValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete Values under directory `nodes` + err = kv.DeleteTree(prefix) + assert.NoError(t, err) + + // Get should fail on both keys + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} diff --git a/pkg/store/zookeeper_test.go b/pkg/store/zookeeper_test.go new file mode 100644 index 0000000000..cce12c3638 --- /dev/null +++ b/pkg/store/zookeeper_test.go @@ -0,0 +1,421 @@ +package store + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func makeZkClient(t *testing.T) Store { + client := "localhost:2181" + + kv, err := NewStore( + ZK, + []string{client}, + &Config{ + ConnectionTimeout: 10 * time.Second, + EphemeralTTL: 2 * time.Second, + }, + ) + if err != nil { + t.Fatalf("cannot create store: %v", err) + } + + return kv +} + +func TestZkPutGetDelete(t *testing.T) { + kv := makeZkClient(t) + + key := "foo" + value := []byte("bar") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestZkWatch(t *testing.T) { + kv := makeZkClient(t) + + key := "hello" + value := []byte("world") + newValue := []byte("world!") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.Watch(key, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(1 * time.Second) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + err := kv.Put(key, newValue, nil) + if assert.NoError(t, err) { + continue + } + return + } + } + }() + + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + if eventCount == 1 { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, value) + } else { + assert.Equal(t, event.Key, key) + assert.Equal(t, event.Value, newValue) + } + eventCount++ + // We received all the events we wanted to check + if eventCount >= 4 { + return + } + case <-stopCh: + return + } + } +} + +func TestZkWatchTree(t *testing.T) { + kv := makeZkClient(t) + + dir := "tree" + + node1 := "tree/node1" + value1 := []byte("node1") + + node2 := "tree/node2" + value2 := []byte("node2") + + node3 := "tree/node3" + value3 := []byte("node3") + + newValue := []byte("world!") + + err := kv.Put(node1, value1, nil) + assert.NoError(t, err) + err = kv.Put(node2, value2, nil) + assert.NoError(t, err) + err = kv.Put(node3, value3, nil) + assert.NoError(t, err) + + stopCh := make(<-chan struct{}) + events, err := kv.WatchTree(dir, stopCh) + assert.NoError(t, err) + assert.NotNil(t, events) + + // Update loop + go func() { + timeout := time.After(750 * time.Millisecond) + tick := time.Tick(250 * time.Millisecond) + for { + select { + case <-timeout: + return + case <-tick: + // Zookeeper special case, only added and deleted + // nodes triggers the ChildrenW + err := kv.Put(node1, newValue, nil) + if assert.NoError(t, err) { + err = kv.Delete(node1) + assert.NoError(t, err) + continue + } + return + } + } + }() + + // Check for updates + eventCount := 1 + for { + select { + case event := <-events: + assert.NotNil(t, event) + eventCount++ + // We received all the events we wanted to check + if eventCount >= 3 { + return + } + case <-stopCh: + return + } + } +} + +func TestZkAtomicPut(t *testing.T) { + kv := makeZkClient(t) + + key := "hello" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // This CAS should succeed + success, _, err := kv.AtomicPut("hello", []byte("WORLD"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // This CAS should fail + pair.LastIndex = 0 + success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) + assert.Error(t, err) + assert.False(t, success) +} + +func TestZkAtomicDelete(t *testing.T) { + kv := makeZkClient(t) + + key := "atomic" + value := []byte("world") + + // Put the key + err := kv.Put(key, value, nil) + assert.NoError(t, err) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + tempIndex := pair.LastIndex + + // AtomicDelete should fail + pair.LastIndex = 0 + success, err := kv.AtomicDelete(key, pair) + assert.Error(t, err) + assert.False(t, success) + + // AtomicDelete should succeed + pair.LastIndex = tempIndex + success, err = kv.AtomicDelete(key, pair) + assert.NoError(t, err) + assert.True(t, success) +} + +func TestZkLockUnlock(t *testing.T) { + t.Parallel() + kv := makeZkClient(t) + + key := "foo" + value := []byte("bar") + + // We should be able to create a new lock on key + lock, err := kv.NewLock(key, &LockOptions{Value: value}) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed or block + lockChan, err := lock.Lock() + assert.NoError(t, err) + // FIXME zookeeper specific chan is nil + assert.Nil(t, lockChan) + + // Get should work + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + // Unlock should succeed + err = lock.Unlock() + assert.NoError(t, err) + + // Get should work + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) +} + +func TestZkPutEphemeral(t *testing.T) { + kv := makeZkClient(t) + + firstKey := "foo" + firstValue := []byte("foo") + + secondKey := "bar" + secondValue := []byte("bar") + + // Put the first key with the Ephemeral flag + err := kv.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Put a second key with the Ephemeral flag + err = kv.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true}) + assert.NoError(t, err) + + // Get on firstKey should work + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Get on secondKey should work + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + assert.NotNil(t, pair) + + // Close Zookeeper's client connection + zk := kv.(*Zookeeper) + zk.client.Close() + + // Let the session expire + time.Sleep(6 * time.Second) + kv = makeZkClient(t) + + // Get on firstKey shouldn't work + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + // Get on secondKey shouldn't work + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} + +func TestZkList(t *testing.T) { + kv := makeZkClient(t) + + // FIXME key parsing with last `/` + prefix := "nodes" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // List should work and return the two correct values + pairs, err := kv.List(prefix) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) + } + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } + } +} + +func TestZkDeleteTree(t *testing.T) { + kv := makeZkClient(t) + + // FIXME key parsing with last `/` + prefix := "nodes" + + firstKey := "nodes/first" + firstValue := []byte("first") + + secondKey := "nodes/second" + secondValue := []byte("second") + + // Put the first key + err := kv.Put(firstKey, firstValue, nil) + assert.NoError(t, err) + + // Put the second key + err = kv.Put(secondKey, secondValue, nil) + assert.NoError(t, err) + + // Get should work on the first Key + pair, err := kv.Get(firstKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, firstValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Get should work on the second Key + pair, err = kv.Get(secondKey) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, secondValue) + assert.NotEqual(t, pair.LastIndex, 0) + + // Delete Values under directory `nodes` + err = kv.DeleteTree(prefix) + assert.NoError(t, err) + + // Get should fail on both keys + pair, err = kv.Get(firstKey) + assert.Error(t, err) + assert.Nil(t, pair) + + pair, err = kv.Get(secondKey) + assert.Error(t, err) + assert.Nil(t, pair) +} diff --git a/script/travis_consul.sh b/script/travis_consul.sh new file mode 100755 index 0000000000..5268c4326f --- /dev/null +++ b/script/travis_consul.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + CONSUL_VERSION="$1" +else + CONSUL_VERSION="0.5.2" +fi + +# install consul +wget "https://dl.bintray.com/mitchellh/consul/${CONSUL_VERSION}_linux_amd64.zip" +unzip "${CONSUL_VERSION}_linux_amd64.zip" + +# make config for minimum ttl +touch config.json +echo "{\"session_ttl_min\": \"2s\"}" >> config.json + +# check +./consul --version diff --git a/script/travis_etcd.sh b/script/travis_etcd.sh new file mode 100755 index 0000000000..004154969a --- /dev/null +++ b/script/travis_etcd.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + ETCD_VERSION="$1" +else + ETCD_VERSION="2.0.11" +fi + +curl -L https://github.com/coreos/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz -o etcd-v$ETCD_VERSION-linux-amd64.tar.gz +tar xzvf etcd-v$ETCD_VERSION-linux-amd64.tar.gz +mv etcd-v$ETCD_VERSION-linux-amd64 etcd diff --git a/script/travis_zk.sh b/script/travis_zk.sh new file mode 100755 index 0000000000..9ba3b3da07 --- /dev/null +++ b/script/travis_zk.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +if [ $# -gt 0 ] ; then + ZK_VERSION="$1" +else + ZK_VERSION="3.4.6" +fi + +wget "http://mirrors.ukfast.co.uk/sites/ftp.apache.org/zookeeper/stable/zookeeper-${ZK_VERSION}.tar.gz" +tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" +mv zookeeper-$ZK_VERSION zk +mv ./zk/conf/zoo_sample.cfg ./zk/conf/zoo.cfg