discovery: Push watch errors to a channel

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-16 13:47:15 -07:00
parent 5756e83fcb
commit f33c03af93
9 changed files with 64 additions and 53 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path"
"strconv"
"time"
log "github.com/Sirupsen/logrus"
@ -77,28 +78,36 @@ func Run() {
Name: "list",
ShortName: "l",
Usage: "list nodes in a cluster",
Flags: []cli.Flag{
cli.IntFlag{
Name: "timeout",
Value: 10,
},
},
Action: func(c *cli.Context) {
dflag := getDiscovery(c)
if dflag == "" {
log.Fatalf("discovery required to list a cluster. See '%s list --help'.", c.App.Name)
}
timeout, err := strconv.ParseUint(c.String("timeout"), 0, 32)
if timeout < 1 || err != nil {
log.Fatal("--timeout should be an unsigned integer and greater than 0")
}
// FIXME Add and use separate timeout flag instead of forcing it
d, err := discovery.New(dflag, 10)
d, err := discovery.New(dflag, timeout)
if err != nil {
log.Fatal(err)
}
ch, err := d.Watch(nil)
if err != nil {
log.Fatal(err)
}
ch, errCh := d.Watch(nil)
select {
case entries := <-ch:
for _, entry := range entries {
fmt.Println(entry)
}
case <-time.After(10 * time.Second):
case err := <-errCh:
log.Fatal(err)
case <-time.After(time.Duration(timeout) * time.Second):
log.Fatal("Timed out")
}
},

View File

@ -51,7 +51,7 @@ var (
flHeartBeat = cli.IntFlag{
Name: "heartbeat, hb",
Value: 25,
Usage: "time in second between each heartbeat",
Usage: "time in seconds between each heartbeat",
}
flEnableCors = cli.BoolFlag{
Name: "api-enable-cors, cors",

View File

@ -68,11 +68,8 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *t
if err != nil {
log.Fatal(err)
}
discoveryCh, err := cluster.discovery.Watch(nil)
if err != nil {
log.Fatal(err)
}
go cluster.monitorDiscovery(discoveryCh)
discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)
return cluster, nil
}
@ -211,17 +208,19 @@ func (c *Cluster) addEngine(addr string) bool {
}
// Entries are Docker Engines
func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries) {
// Watch for changes in the discovery channel.
log.Error("starting monitor")
for entries := range ch {
log.Errorf("got %v from monitor", entries)
// Attempt to add every engine. `addEngine` will take care of duplicates.
// Since `addEngine` can be very slow (it has to connect to the
// engine), we are going to launch them in parallel.
for _, entry := range entries {
go c.addEngine(entry.String())
func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error) {
// Watch changes on the discovery channel.
for {
select {
case entries := <-ch:
// Attempt to add every engine. `addEngine` will take care of duplicates.
// Since `addEngine` can be very slow (it has to connect to the
// engine), we are going to launch them in parallel.
for _, entry := range entries {
go c.addEngine(entry.String())
}
case err := <-errCh:
log.Errorf("Discovery error: %v", err)
}
}
}

View File

@ -57,7 +57,7 @@ type Discovery interface {
// Watch the discovery for entry changes.
// Returns a channel that will receive changes or an error.
// Providing a non-nil stopCh can be used to stop watching.
Watch(stopCh <-chan struct{}) (<-chan Entries, error)
Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
// Register to the discovery
Register(string) error

View File

@ -1,11 +1,11 @@
package file
import (
"fmt"
"io/ioutil"
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
)
@ -50,15 +50,15 @@ func parseFileContent(content []byte) []string {
func (s *Discovery) fetch() (discovery.Entries, error) {
fileContent, err := ioutil.ReadFile(s.path)
if err != nil {
log.WithField("discovery", "file").Errorf("Failed to read '%s': %v", s.path, err)
return nil, err
return nil, fmt.Errorf("failed to read '%s': %v", s.path, err)
}
return discovery.CreateEntries(parseFileContent(fileContent))
}
// Watch is exported
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) {
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
errCh := make(chan error)
ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second)
go func() {
@ -74,6 +74,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err
case <-ticker.C:
newEntries, err := s.fetch()
if err != nil {
errCh <- err
continue
}
@ -89,7 +90,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err
}
}()
return ch, nil
return ch, errCh
}
// Register is exported

View File

@ -63,7 +63,7 @@ func (s *Discovery) Initialize(uris string, heartbeat uint64) error {
// Watch the store until either there's a store error or we receive a stop request.
// Returns false if we shouldn't attempt watching the store anymore (stop request received).
func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries) bool {
func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KVPair, discoveryCh chan discovery.Entries, errCh chan error) bool {
for {
select {
case pairs := <-watchCh:
@ -79,7 +79,10 @@ func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KV
addrs = append(addrs, string(pair.Value))
}
if entries, err := discovery.CreateEntries(addrs); err == nil {
entries, err := discovery.CreateEntries(addrs)
if err != nil {
errCh <- err
} else {
discoveryCh <- entries
}
case <-stopCh:
@ -90,8 +93,10 @@ func (s *Discovery) watchOnce(stopCh <-chan struct{}, watchCh <-chan []*store.KV
}
// Watch is exported
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) {
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
errCh := make(chan error)
go func() {
// Forever: Create a store watch, watch until we get an error and then try again.
// Will only stop if we receive a stopCh request.
@ -99,22 +104,20 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err
// Set up a watch.
watchCh, err := s.store.WatchTree(s.prefix, stopCh)
if err != nil {
log.WithField("discovery", s.backend).Errorf("Unable to set up a watch: %v", err)
errCh <- err
} else {
if !s.watchOnce(stopCh, watchCh, ch) {
log.WithField("discovery", s.backend).Infof("Shutting down")
if !s.watchOnce(stopCh, watchCh, ch, errCh) {
return
}
}
// If we get here it means the store watch channel was closed. This
// is unexpected so let's retry later.
log.WithField("discovery", s.backend).Errorf("Unexpected watch error. Retrying in %s", time.Duration(s.heartbeat))
errCh <- fmt.Errorf("Unexpected watch error")
time.Sleep(s.heartbeat)
}
}()
return ch, nil
return ch, errCh
}
// Register is exported

View File

@ -31,7 +31,7 @@ func (s *Discovery) Initialize(uris string, _ uint64) error {
}
// Watch is exported
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) {
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
go func() {
ch <- s.entries

View File

@ -9,7 +9,6 @@ import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/discovery"
)
@ -57,27 +56,27 @@ func (s *Discovery) fetch() (discovery.Entries, error) {
var addrs []string
if resp.StatusCode == http.StatusOK {
if err := json.NewDecoder(resp.Body).Decode(&addrs); err != nil {
log.WithField("discovery", "token").Errorf("Failed to decode response: %v", err)
return nil, err
return nil, fmt.Errorf("Failed to decode response: %v", err)
}
} else {
err := fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode)
log.WithField("discovery", "token").Error(err)
return nil, err
return nil, fmt.Errorf("Failed to fetch entries, Discovery service returned %d HTTP status code", resp.StatusCode)
}
return discovery.CreateEntries(addrs)
}
// Watch is exported
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, error) {
func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
ch := make(chan discovery.Entries)
ticker := time.NewTicker(time.Duration(s.heartbeat) * time.Second)
errCh := make(chan error)
go func() {
// Send the initial entries if available.
currentEntries, err := s.fetch()
if err == nil {
if err != nil {
errCh <- err
} else {
ch <- currentEntries
}
@ -87,6 +86,7 @@ func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, err
case <-ticker.C:
newEntries, err := s.fetch()
if err != nil {
errCh <- err
continue
}

View File

@ -1,7 +1,6 @@
package token
import (
"log"
"testing"
"time"
@ -35,13 +34,13 @@ func TestRegister(t *testing.T) {
assert.NoError(t, d.Register(expected))
// Watch
ch, err := d.Watch(nil)
assert.NoError(t, err)
ch, errCh := d.Watch(nil)
select {
case entries := <-ch:
log.Printf("%v %v", entries, expectedEntries)
assert.True(t, entries.Equals(expectedEntries))
case <-time.After(2 * time.Second):
case err := <-errCh:
t.Fatal(err)
case <-time.After(5 * time.Second):
t.Fatal("Timed out")
}