diff --git a/cluster/engine.go b/cluster/engine.go
index b62efa822d..0a6af8f47f 100644
--- a/cluster/engine.go
+++ b/cluster/engine.go
@@ -26,6 +26,27 @@ const (
 	minSupportedVersion = version.Version("1.6.0")
 )
 
+type engineState int
+
+const (
+	// statePending means an engine added to the cluster has not been validated
+	statePending engineState = iota
+	// stateUnhealthy means an engine is unreachable
+	stateUnhealthy
+	// stateHealthy means an engine is reachable
+	stateHealthy
+	// stateMaintenance means an engine is under maintenance.
+	// There is no action to migrate a node into maintenance state yet.
+	//stateMaintenance
+)
+
+var stateText = map[engineState]string{
+	statePending:   "Pending",
+	stateUnhealthy: "Unhealthy",
+	stateHealthy:   "Healthy",
+	//stateMaintenance: "Maintenance",
+}
+
 // delayer offers a simple API to random delay within a given time range.
 type delayer struct {
 	rangeMin time.Duration
@@ -82,7 +103,9 @@ type Engine struct {
 	volumes         map[string]*Volume
 	client          dockerclient.Client
 	eventHandler    EventHandler
-	healthy         bool
+	state           engineState
+	lastError       string
+	updatedAt       time.Time
 	failureCount    int
 	overcommitRatio int64
 	opts            *EngineOpts
@@ -99,7 +122,8 @@ func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
 		containers:      make(map[string]*Container),
 		networks:        make(map[string]*Network),
 		volumes:         make(map[string]*Volume),
-		healthy:         true,
+		state:           statePending,
+		updatedAt:       time.Now(),
 		overcommitRatio: int64(overcommitRatio * 100),
 		opts:            opts,
 	}
@@ -153,9 +177,6 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
 	e.RefreshVolumes()
 	e.RefreshNetworks()
 
-	// Start the update loop.
-	go e.refreshLoop()
-
 	e.emitEvent("engine_connect")
 
 	return nil
@@ -182,45 +203,111 @@ func (e *Engine) isConnected() bool {
 
 // IsHealthy returns true if the engine is healthy
 func (e *Engine) IsHealthy() bool {
-	return e.healthy
+	e.RLock()
+	defer e.RUnlock()
+	return e.state == stateHealthy
 }
 
-// setHealthy sets engine healthy state
-func (e *Engine) setHealthy(state bool) {
+// setState sets the engine state
+func (e *Engine) setState(state engineState) {
 	e.Lock()
-	e.healthy = state
+	defer e.Unlock()
+	e.state = state
 	// if engine is healthy, clear failureCount
-	if state {
+	if state == stateHealthy {
 		e.failureCount = 0
 	}
-	e.Unlock()
+}
+
+// TimeToValidate returns true if a pending node is up for validation
+func (e *Engine) TimeToValidate() bool {
+	e.Lock()
+	defer e.Unlock()
+	if e.state != statePending {
+		return false
+	}
+	sinceLastUpdate := time.Since(e.updatedAt)
+	// Increase the check interval for a pending engine according to failureCount, capped at 4 hours
+	if sinceLastUpdate > 4*time.Hour || sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second {
+		return true
+	}
+	return false
+}
+
+// ValidationComplete transitions the engine from statePending to stateHealthy and starts its refresh loop
+func (e *Engine) ValidationComplete() {
+	e.Lock()
+	defer e.Unlock()
+	if e.state != statePending {
+		return
+	}
+	e.state = stateHealthy
+	e.failureCount = 0
+	go e.refreshLoop()
+}
+
+// setErrMsg sets the error message for the engine
+func (e *Engine) setErrMsg(errMsg string) {
+	e.Lock()
+	defer e.Unlock()
+	e.lastError = errMsg
+	e.updatedAt = time.Now()
+}
+
+// ErrMsg returns the error message for the engine
+func (e *Engine) ErrMsg() string {
+	e.RLock()
+	defer e.RUnlock()
+	return e.lastError
+}
+
+// HandleIDConflict handles an ID conflict with an existing engine
+func (e *Engine) HandleIDConflict(otherAddr string) {
+	e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
 }
 
 // Status returns the health status of the Engine: Healthy or Unhealthy
 func (e *Engine) Status() string {
-	if e.healthy {
-		return "Healthy"
-	}
-	return "Unhealthy"
+	e.RLock()
+	defer e.RUnlock()
+	return stateText[e.state]
 }
 
 // incFailureCount increases engine's failure count, and set engine as unhealthy if threshold is crossed
 func (e *Engine) incFailureCount() {
 	e.Lock()
+	defer e.Unlock()
 	e.failureCount++
-	if e.healthy && e.failureCount >= e.opts.FailureRetry {
-		e.healthy = false
+	if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
+		e.state = stateUnhealthy
+		log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as unhealthy. Connect failed %d times", e.failureCount)
+		e.emitEvent("engine_disconnect")
 	}
-	e.Unlock()
 }
 
-// CheckConnectionErr checks error from client response and adjust engine healthy indicators
+// UpdatedAt returns the time of the engine's last update
+func (e *Engine) UpdatedAt() time.Time {
+	e.RLock()
+	defer e.RUnlock()
+	return e.updatedAt
+}
+
+// CheckConnectionErr checks the error from a client response and adjusts the engine's health indicators
 func (e *Engine) CheckConnectionErr(err error) {
 	if err == nil {
-		e.setHealthy(true)
+		e.setErrMsg("")
+		// If the current state is unhealthy, change it to healthy
+		if e.state == stateUnhealthy {
+			log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
+			e.emitEvent("engine_reconnect")
+			e.setState(stateHealthy)
+		}
 		return
 	}
+	// update the engine error message
+	e.setErrMsg(err.Error())
+
 	// dockerclient defines ErrConnectionRefused error. but if http client is from swarm, it's not using
 	// dockerclient. We need string matching for these cases. Remove the first character to deal with
 	// case sensitive issue
@@ -264,7 +351,19 @@ func (e *Engine) updateSpecs() error {
 		return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion)
 	}
 
-	e.ID = info.ID
+	e.Lock()
+	defer e.Unlock()
+	// Swarm/docker identifies an engine by its ID. Updating the ID without updating the
+	// cluster index would leave the cluster in an inconsistent state. If this happens,
+	// the engine is put back into pending state for re-validation.
+	if e.ID == "" {
+		e.ID = info.ID
+	} else if e.ID != info.ID {
+		e.state = statePending
+		message := fmt.Sprintf("Engine (ID: %s, Addr: %s) shows up with another ID: %s. Please remove it from the cluster; it can be added back.", e.ID, e.Addr, info.ID)
+		e.lastError = message
+		return fmt.Errorf(message)
+	}
 	e.Name = info.Name
 	e.Cpus = info.NCPU
 	e.Memory = info.MemTotal
@@ -368,9 +467,7 @@ func (e *Engine) RefreshVolumes() error {
 // FIXME: unexport this method after mesos scheduler stops using it directly
 func (e *Engine) RefreshContainers(full bool) error {
 	containers, err := e.client.ListContainers(true, false, "")
-	// e.CheckConnectionErr(err) is not appropriate here because refresh loop uses
-	// RefreshContainers function to fail/recover an engine. Adding CheckConnectionErr
-	// here would result in double count
+	e.CheckConnectionErr(err)
 	if err != nil {
 		return err
 	}
@@ -389,7 +486,6 @@ func (e *Engine) RefreshContainers(full bool) error {
 	defer e.Unlock()
 	e.containers = merged
 
-	log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state")
 	return nil
 }
 
@@ -469,6 +565,7 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string
 	return containers, nil
 }
 
+// refreshLoop periodically triggers engine refresh.
 func (e *Engine) refreshLoop() {
 	for {
@@ -481,33 +578,24 @@ func (e *Engine) refreshLoop() {
 			return
 		}
 
+		if !e.IsHealthy() {
+			if err = e.updateSpecs(); err != nil {
+				log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
+				continue
+			}
+			e.client.StopAllMonitorEvents()
+			e.client.StartMonitorEvents(e.handler, nil)
+		}
+
 		err = e.RefreshContainers(false)
 		if err == nil {
 			// Do not check error as older daemon don't support this call
 			e.RefreshVolumes()
 			e.RefreshNetworks()
-			err = e.RefreshImages()
-		}
-
-		if err != nil {
-			e.failureCount++
-			if e.failureCount >= e.opts.FailureRetry && e.healthy {
-				e.emitEvent("engine_disconnect")
-				e.setHealthy(false)
-				log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", e.failureCount, err)
-			}
+			e.RefreshImages()
+			log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine update succeeded")
 		} else {
-			if !e.healthy {
-				log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
-				if err := e.updateSpecs(); err != nil {
-					log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
-					continue
-				}
-				e.client.StopAllMonitorEvents()
-				e.client.StartMonitorEvents(e.handler, nil)
-				e.emitEvent("engine_reconnect")
-			}
-			e.setHealthy(true)
+			log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine refresh failed")
 		}
 	}
 }
diff --git a/cluster/engine_test.go b/cluster/engine_test.go
index 07635360be..2757242513 100644
--- a/cluster/engine_test.go
+++ b/cluster/engine_test.go
@@ -38,8 +38,29 @@ var (
 	}
 )
 
+func TestSetEngineState(t *testing.T) {
+	engine := NewEngine("test", 0, engOpts)
+	assert.True(t, engine.state == statePending)
+	engine.setState(stateUnhealthy)
+	assert.True(t, engine.state == stateUnhealthy)
+	engine.incFailureCount()
+	assert.True(t, engine.failureCount == 1)
+	engine.setState(stateHealthy)
+	assert.True(t, engine.state == stateHealthy)
+	assert.True(t, engine.failureCount == 0)
+}
+
+func TestErrMsg(t *testing.T) {
+	engine := NewEngine("test", 0, engOpts)
+	assert.True(t, len(engine.ErrMsg()) == 0)
+	message := "cannot connect"
+	engine.setErrMsg(message)
+	assert.True(t, engine.ErrMsg() == message)
+}
+
 func TestEngineFailureCount(t *testing.T) {
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateHealthy)
 	for i := 0; i < engine.opts.FailureRetry; i++ {
 		assert.True(t, engine.IsHealthy())
 		engine.incFailureCount()
@@ -82,6 +103,7 @@ func TestOutdatedEngine(t *testing.T) {
 
 func TestEngineCpusMemory(t *testing.T) {
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateUnhealthy)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -105,6 +127,7 @@ func TestEngineCpusMemory(t *testing.T) {
 
 func TestEngineSpecs(t *testing.T) {
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateUnhealthy)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -133,6 +156,7 @@ func TestEngineSpecs(t *testing.T) {
 
 func TestEngineState(t *testing.T) {
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateUnhealthy)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -185,6 +209,7 @@ func TestCreateContainer(t *testing.T) {
 		client = mockclient.NewMockClient()
 	)
 
+	engine.setState(stateUnhealthy)
 	client.On("Info").Return(mockInfo, nil)
 	client.On("Version").Return(mockVersion, nil)
 	client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
@@ -242,6 +267,7 @@ func TestCreateContainer(t *testing.T) {
 
 func TestImages(t *testing.T) {
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateHealthy)
 	engine.images = []*Image{
 		{dockerclient.Image{Id: "a"}, engine},
 		{dockerclient.Image{Id: "b"}, engine},
@@ -279,6 +305,7 @@ func TestUsedCpus(t *testing.T) {
 	)
 
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateHealthy)
 	client := mockclient.NewMockClient()
 
 	for _, hn := range hostNcpu {
@@ -312,6 +339,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
 	)
 
 	engine := NewEngine("test", 0, engOpts)
+	engine.setState(stateUnhealthy)
 	assert.False(t, engine.isConnected())
 
 	// A container is removed before it can be inspected.
diff --git a/cluster/mesos/driver.go b/cluster/mesos/driver.go
index 62c3bb37a3..b39ebde5cf 100644
--- a/cluster/mesos/driver.go
+++ b/cluster/mesos/driver.go
@@ -47,6 +47,8 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
 		if err := engine.Connect(c.TLSConfig); err != nil {
 			log.Error(err)
 		} else {
+			// Set the engine state to healthy and start the refresh loop
+			engine.ValidationComplete()
 			s = newAgent(agentID, engine)
 			c.agents[agentID] = s
 			if err := s.engine.RegisterEventHandler(c); err != nil {
diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go
index 461d4a2833..138923deb1 100644
--- a/cluster/swarm/cluster.go
+++ b/cluster/swarm/cluster.go
@@ -9,6 +9,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/stringid"
@@ -51,6 +52,7 @@ type Cluster struct {
 	eventHandler      cluster.EventHandler
 	engines           map[string]*cluster.Engine
+	pendingEngines    map[string]*cluster.Engine
 	scheduler         *scheduler.Scheduler
 	discovery         discovery.Discovery
 	pendingContainers map[string]*pendingContainer
@@ -66,6 +68,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
 
 	cluster := &Cluster{
 		engines:           make(map[string]*cluster.Engine),
+		pendingEngines:    make(map[string]*cluster.Engine),
 		scheduler:         scheduler,
 		TLSConfig:         TLSConfig,
 		discovery:         discovery,
@@ -80,6 +83,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
 
 	discoveryCh, errCh := cluster.discovery.Watch(nil)
 	go cluster.monitorDiscovery(discoveryCh, errCh)
+	go cluster.monitorPendingEngines()
 
 	return cluster, nil
 }
@@ -195,6 +199,9 @@ func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
 	c.RLock()
 	defer c.RUnlock()
 
+	if engine, ok := c.pendingEngines[addr]; ok {
+		return engine
+	}
 	for _, engine := range c.engines {
 		if engine.Addr == addr {
 			return engine
@@ -217,11 +224,26 @@ func (c *Cluster) addEngine(addr string) bool {
 	if err := engine.RegisterEventHandler(c); err != nil {
 		log.Error(err)
 	}
+	// Add it to the pending engine map, indexed by address. This prevents
+	// duplicates from entering.
+	c.Lock()
+	c.pendingEngines[addr] = engine
+	c.Unlock()
+	// validatePendingEngine will start a goroutine to validate the engine.
+	// If the engine is reachable and valid, it'll be monitored and updated in a loop.
+	// If the engine is not reachable, pending engines will be re-examined once in a while.
+	go c.validatePendingEngine(engine)
+
+	return true
+}
+
+// validatePendingEngine connects to the engine and, if validation succeeds, moves it from pendingEngines to engines.
+func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
 	// Attempt a connection to the engine. Since this is slow, don't get a hold
 	// of the lock yet.
 	if err := engine.Connect(c.TLSConfig); err != nil {
-		log.Error(err)
+		log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
 		return false
 	}
 
@@ -233,16 +255,28 @@ func (c *Cluster) addEngine(addr string) bool {
 	if old, exists := c.engines[engine.ID]; exists {
 		if old.Addr != engine.Addr {
 			log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
+			// Keep this engine in the pendingEngines table and show its error.
+			// If the duplicate ID comes from a cloned VM, the user sees this message and can fix it.
+			// If the engine was rebooted and got a new IP from DHCP, the previous address will be
+			// removed from discovery after a while.
+			// In both cases, retrying may fix the problem.
+			engine.HandleIDConflict(old.Addr)
 		} else {
 			log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
+			engine.Disconnect()
+			// Remove it from the pendingEngines table
+			delete(c.pendingEngines, engine.Addr)
 		}
-		engine.Disconnect()
 		return false
 	}
 
-	// Finally register the engine.
+	// Engine validated; move it from the pendingEngines table to the engines table
+	delete(c.pendingEngines, engine.Addr)
+	// Set the engine state to healthy and start the refresh loop
+	engine.ValidationComplete()
 	c.engines[engine.ID] = engine
-	log.Infof("Registered Engine %s at %s", engine.Name, addr)
+
+	log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
 	return true
 }
 
@@ -255,7 +289,12 @@ func (c *Cluster) removeEngine(addr string) bool {
 	defer c.Unlock()
 
 	engine.Disconnect()
-	delete(c.engines, engine.ID)
+	// The engine could be in either pendingEngines or engines
+	if _, ok := c.pendingEngines[addr]; ok {
+		delete(c.pendingEngines, addr)
+	} else {
+		delete(c.engines, engine.ID)
+	}
 	log.Infof("Removed Engine %s", engine.Name)
 	return true
 }
@@ -277,10 +316,8 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
 				c.removeEngine(entry.String())
 			}
 
-			// Since `addEngine` can be very slow (it has to connect to the
-			// engine), we are going to do the adds in parallel.
 			for _, entry := range added {
-				go c.addEngine(entry.String())
+				c.addEngine(entry.String())
 			}
 		case err := <-errCh:
 			log.Errorf("Discovery error: %v", err)
@@ -288,6 +325,26 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
 	}
 }
 
+// monitorPendingEngines periodically checks whether previously unreachable or invalid engines have been fixed
+func (c *Cluster) monitorPendingEngines() {
+	for {
+		// This doesn't need to run frequently
+		time.Sleep(30 * time.Second)
+		// Get the list of pendingEngines
+		c.RLock()
+		pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines))
+		for _, e := range c.pendingEngines {
+			pEngines = append(pEngines, e)
+		}
+		c.RUnlock()
+		for _, e := range pEngines {
+			if e.TimeToValidate() {
+				go c.validatePendingEngine(e)
+			}
+		}
+	}
+}
+
 // Images returns all the images in the cluster.
 func (c *Cluster) Images() cluster.Images {
 	c.RLock()
@@ -672,7 +729,7 @@ func (c *Cluster) Volume(name string) *cluster.Volume {
 	return nil
 }
 
-// listNodes returns all the engines in the cluster.
+// listNodes returns all validated engines in the cluster, excluding pendingEngines.
 func (c *Cluster) listNodes() []*node.Node {
 	c.RLock()
 	defer c.RUnlock()
@@ -692,14 +749,18 @@ func (c *Cluster) listNodes() []*node.Node {
 }
 
 // listEngines returns all the engines in the cluster.
+// This is for reporting, not scheduling, so pendingEngines are included.
 func (c *Cluster) listEngines() []*cluster.Engine {
 	c.RLock()
 	defer c.RUnlock()
 
-	out := make([]*cluster.Engine, 0, len(c.engines))
+	out := make([]*cluster.Engine, 0, len(c.engines)+len(c.pendingEngines))
 	for _, n := range c.engines {
 		out = append(out, n)
 	}
+	for _, n := range c.pendingEngines {
+		out = append(out, n)
+	}
 	return out
 }
 
@@ -744,6 +805,8 @@ func (c *Cluster) Info() [][]string {
 		}
 		sort.Strings(labels)
 		info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))})
+		info = append(info, []string{" └ Error", engine.ErrMsg()})
+		info = append(info, []string{" └ UpdatedAt", engine.UpdatedAt().UTC().Format(time.RFC3339)})
 	}
 
 	return info
diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go
index bb02d2918d..862fc00afd 100644
--- a/cluster/swarm/cluster_test.go
+++ b/cluster/swarm/cluster_test.go
@@ -25,7 +25,7 @@ func (nopCloser) Close() error {
 
 var (
 	mockInfo = &dockerclient.Info{
-		ID:       "id",
+		ID:       "test-engine",
 		Name:     "name",
 		NCPU:     10,
 		MemTotal: 20,
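As context for the revalidation cadence introduced above, here is a minimal standalone sketch of the backoff rule applied by Engine.TimeToValidate: a pending engine becomes due for another validation attempt once failureCount*30s has elapsed since its last update, capped at 4 hours. The pendingEngine type and timeToValidate helper below are simplified stand-ins for illustration only, not code from this patch.

package main

import (
	"fmt"
	"time"
)

// Simplified mirror of the engine states added by the patch.
type engineState int

const (
	statePending engineState = iota
	stateUnhealthy
	stateHealthy
)

// pendingEngine keeps only the fields the backoff check relies on.
type pendingEngine struct {
	state        engineState
	failureCount int
	updatedAt    time.Time
}

// timeToValidate mirrors the rule in Engine.TimeToValidate: revalidate a
// pending engine once failureCount*30s has passed since the last update,
// capped at 4 hours.
func (e *pendingEngine) timeToValidate() bool {
	if e.state != statePending {
		return false
	}
	sinceLastUpdate := time.Since(e.updatedAt)
	return sinceLastUpdate > 4*time.Hour ||
		sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second
}

func main() {
	e := &pendingEngine{
		state:        statePending,
		failureCount: 3,                                // three failed validation attempts so far
		updatedAt:    time.Now().Add(-2 * time.Minute), // last attempt two minutes ago
	}
	// Two minutes exceed the 3*30s backoff, so another attempt is due.
	fmt.Println(e.timeToValidate()) // true
}

Because monitorPendingEngines only scans the pending table every 30 seconds, this per-engine threshold means repeatedly failing nodes are retried less and less often instead of being hammered on every scan.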