diff --git a/discovery/kv/kv.go b/discovery/kv/kv.go index 6e29d224a5..53c0803de2 100644 --- a/discovery/kv/kv.go +++ b/discovery/kv/kv.go @@ -112,6 +112,17 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-c // Forever: Create a store watch, watch until we get an error and then try again. // Will only stop if we receive a stopCh request. for { + // Create the path to watch if it does not exist yet + exists, err := s.store.Exists(s.path) + if err != nil { + errCh <- err + } + if !exists { + if err := s.store.Put(s.path, []byte(""), &store.WriteOptions{IsDir: true}); err != nil { + errCh <- err + } + } + // Set up a watch. watchCh, err := s.store.WatchTree(s.path, stopCh) if err != nil { diff --git a/discovery/kv/kv_test.go b/discovery/kv/kv_test.go index d1e827db7a..e313047d3e 100644 --- a/discovery/kv/kv_test.go +++ b/discovery/kv/kv_test.go @@ -69,8 +69,11 @@ func TestWatch(t *testing.T) { s := d.store.(*libkvmock.Mock) mockCh := make(chan []*store.KVPair) - // The first watch will fail. + // The first watch will fail on those three calls + s.On("Exists", "path/"+discoveryPath).Return(false, errors.New("test error")) + s.On("Put", "path/"+discoveryPath, mock.Anything, mock.Anything).Return(errors.New("test error")) s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, errors.New("test error")).Once() + // The second one will succeed. s.On("WatchTree", "path/"+discoveryPath, mock.Anything).Return(mockCh, nil).Once() expected := discovery.Entries{ @@ -89,7 +92,7 @@ func TestWatch(t *testing.T) { assert.EqualError(t, <-errCh, "test error") // We have to drain the error channel otherwise Watch will get stuck. go func() { - for _ = range errCh { + for range errCh { } }()