mirror of https://github.com/docker/docs.git
Improve node management.
1. Introduce pending state. Pending nodes need validation before moving to healthy state. This resolves the duplicate ID and dead node drop issues. 2. Expose error and last update time in docker info. 3. Use connect success/failure to drive state transitions between healthy and unhealthy. Signed-off-by: Dong Chen <dongluo.chen@docker.com>
This commit is contained in:
parent
2aff182135
commit
995866d76c
|
|
@ -26,6 +26,27 @@ const (
|
||||||
minSupportedVersion = version.Version("1.6.0")
|
minSupportedVersion = version.Version("1.6.0")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type engineState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// pending means an engine added to cluster has not been validated
|
||||||
|
statePending engineState = iota
|
||||||
|
// unhealthy means an engine is unreachable
|
||||||
|
stateUnhealthy
|
||||||
|
// healthy means an engine is reachable
|
||||||
|
stateHealthy
|
||||||
|
// maintenance means an engine is under maintenance.
|
||||||
|
// There is no action to migrate a node into maintenance state yet.
|
||||||
|
//stateMaintenance
|
||||||
|
)
|
||||||
|
|
||||||
|
var stateText = map[engineState]string{
|
||||||
|
statePending: "Pending",
|
||||||
|
stateUnhealthy: "Unhealthy",
|
||||||
|
stateHealthy: "Healthy",
|
||||||
|
//stateMaintenance: "Maintenance",
|
||||||
|
}
|
||||||
|
|
||||||
// delayer offers a simple API to random delay within a given time range.
|
// delayer offers a simple API to random delay within a given time range.
|
||||||
type delayer struct {
|
type delayer struct {
|
||||||
rangeMin time.Duration
|
rangeMin time.Duration
|
||||||
|
|
@ -82,7 +103,9 @@ type Engine struct {
|
||||||
volumes map[string]*Volume
|
volumes map[string]*Volume
|
||||||
client dockerclient.Client
|
client dockerclient.Client
|
||||||
eventHandler EventHandler
|
eventHandler EventHandler
|
||||||
healthy bool
|
state engineState
|
||||||
|
lastError string
|
||||||
|
updatedAt time.Time
|
||||||
failureCount int
|
failureCount int
|
||||||
overcommitRatio int64
|
overcommitRatio int64
|
||||||
opts *EngineOpts
|
opts *EngineOpts
|
||||||
|
|
@ -99,7 +122,8 @@ func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
|
||||||
containers: make(map[string]*Container),
|
containers: make(map[string]*Container),
|
||||||
networks: make(map[string]*Network),
|
networks: make(map[string]*Network),
|
||||||
volumes: make(map[string]*Volume),
|
volumes: make(map[string]*Volume),
|
||||||
healthy: true,
|
state: statePending,
|
||||||
|
updatedAt: time.Now(),
|
||||||
overcommitRatio: int64(overcommitRatio * 100),
|
overcommitRatio: int64(overcommitRatio * 100),
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}
|
}
|
||||||
|
|
@ -153,9 +177,6 @@ func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
|
||||||
e.RefreshVolumes()
|
e.RefreshVolumes()
|
||||||
e.RefreshNetworks()
|
e.RefreshNetworks()
|
||||||
|
|
||||||
// Start the update loop.
|
|
||||||
go e.refreshLoop()
|
|
||||||
|
|
||||||
e.emitEvent("engine_connect")
|
e.emitEvent("engine_connect")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -182,45 +203,111 @@ func (e *Engine) isConnected() bool {
|
||||||
|
|
||||||
// IsHealthy returns true if the engine is healthy
|
// IsHealthy returns true if the engine is healthy
|
||||||
func (e *Engine) IsHealthy() bool {
|
func (e *Engine) IsHealthy() bool {
|
||||||
return e.healthy
|
e.RLock()
|
||||||
|
e.RUnlock()
|
||||||
|
return e.state == stateHealthy
|
||||||
}
|
}
|
||||||
|
|
||||||
// setHealthy sets engine healthy state
|
// setState sets the engine state
|
||||||
func (e *Engine) setHealthy(state bool) {
|
func (e *Engine) setState(state engineState) {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
e.healthy = state
|
defer e.Unlock()
|
||||||
|
e.state = state
|
||||||
// if engine is healthy, clear failureCount
|
// if engine is healthy, clear failureCount
|
||||||
if state {
|
if state == stateHealthy {
|
||||||
e.failureCount = 0
|
e.failureCount = 0
|
||||||
}
|
}
|
||||||
e.Unlock()
|
}
|
||||||
|
|
||||||
|
// TimeToValidate returns true if a pending node is up for validation
|
||||||
|
func (e *Engine) TimeToValidate() bool {
|
||||||
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
|
if e.state != statePending {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
sinceLastUpdate := time.Since(e.updatedAt)
|
||||||
|
// Increase check interval for a pending engine according to failureCount and cap it at 4 hours
|
||||||
|
if sinceLastUpdate > 4*time.Hour || sinceLastUpdate > time.Duration(e.failureCount)*30*time.Second {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidationComplete transitions engine state from statePending to stateHealthy
|
||||||
|
func (e *Engine) ValidationComplete() {
|
||||||
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
|
if e.state != statePending {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.state = stateHealthy
|
||||||
|
e.failureCount = 0
|
||||||
|
go e.refreshLoop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// setErrMsg sets error message for the engine
|
||||||
|
func (e *Engine) setErrMsg(errMsg string) {
|
||||||
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
|
e.lastError = errMsg
|
||||||
|
e.updatedAt = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrMsg returns error message for the engine
|
||||||
|
func (e *Engine) ErrMsg() string {
|
||||||
|
e.RLock()
|
||||||
|
defer e.RUnlock()
|
||||||
|
return e.lastError
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleIDConflict handles ID duplicate with existing engine
|
||||||
|
func (e *Engine) HandleIDConflict(otherAddr string) {
|
||||||
|
e.setErrMsg(fmt.Sprintf("ID duplicated. %s shared by this node %s and another node %s", e.ID, e.Addr, otherAddr))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status returns the health status of the Engine: Healthy or Unhealthy
|
// Status returns the health status of the Engine: Healthy or Unhealthy
|
||||||
func (e *Engine) Status() string {
|
func (e *Engine) Status() string {
|
||||||
if e.healthy {
|
e.RLock()
|
||||||
return "Healthy"
|
defer e.RUnlock()
|
||||||
}
|
return stateText[e.state]
|
||||||
return "Unhealthy"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// incFailureCount increases engine's failure count, and set engine as unhealthy if threshold is crossed
|
// incFailureCount increases engine's failure count, and set engine as unhealthy if threshold is crossed
|
||||||
func (e *Engine) incFailureCount() {
|
func (e *Engine) incFailureCount() {
|
||||||
e.Lock()
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
e.failureCount++
|
e.failureCount++
|
||||||
if e.healthy && e.failureCount >= e.opts.FailureRetry {
|
if e.state == stateHealthy && e.failureCount >= e.opts.FailureRetry {
|
||||||
e.healthy = false
|
e.state = stateUnhealthy
|
||||||
|
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as unhealthy. Connect failed %d times", e.failureCount)
|
||||||
|
e.emitEvent("engine_disconnect")
|
||||||
}
|
}
|
||||||
e.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckConnectionErr checks error from client response and adjust engine healthy indicators
|
// UpdatedAt returns the time the engine was last updated
|
||||||
|
func (e *Engine) UpdatedAt() time.Time {
|
||||||
|
e.RLock()
|
||||||
|
defer e.RUnlock()
|
||||||
|
return e.updatedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckConnectionErr checks error from client response and adjusts engine healthy indicators
|
||||||
func (e *Engine) CheckConnectionErr(err error) {
|
func (e *Engine) CheckConnectionErr(err error) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
e.setHealthy(true)
|
e.setErrMsg("")
|
||||||
|
// If current state is unhealthy, change it to healthy
|
||||||
|
if e.state == stateUnhealthy {
|
||||||
|
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
|
||||||
|
e.emitEvent("engine_reconnect")
|
||||||
|
e.setState(stateHealthy)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update engine error message
|
||||||
|
e.setErrMsg(err.Error())
|
||||||
|
|
||||||
// dockerclient defines ErrConnectionRefused error. but if http client is from swarm, it's not using
|
// dockerclient defines ErrConnectionRefused error. but if http client is from swarm, it's not using
|
||||||
// dockerclient. We need string matching for these cases. Remove the first character to deal with
|
// dockerclient. We need string matching for these cases. Remove the first character to deal with
|
||||||
// case sensitive issue
|
// case sensitive issue
|
||||||
|
|
@ -264,7 +351,19 @@ func (e *Engine) updateSpecs() error {
|
||||||
return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion)
|
return fmt.Errorf("engine %s is running an unsupported version of Docker Engine. Please upgrade to at least %s", e.Addr, minSupportedVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.ID = info.ID
|
e.Lock()
|
||||||
|
defer e.Unlock()
|
||||||
|
// Swarm/docker identifies engine by ID. Updating ID but not updating cluster
|
||||||
|
// index will put the cluster into inconsistent state. If this happens, the
|
||||||
|
// engine should be put to pending state for re-validation.
|
||||||
|
if e.ID == "" {
|
||||||
|
e.ID = info.ID
|
||||||
|
} else if e.ID != info.ID {
|
||||||
|
e.state = statePending
|
||||||
|
message := fmt.Sprintf("Engine (ID: %s, Addr: %s) shows up with another ID:%s. Please remove it from cluster, it can be added back.", e.ID, e.Addr, info.ID)
|
||||||
|
e.lastError = message
|
||||||
|
return fmt.Errorf(message)
|
||||||
|
}
|
||||||
e.Name = info.Name
|
e.Name = info.Name
|
||||||
e.Cpus = info.NCPU
|
e.Cpus = info.NCPU
|
||||||
e.Memory = info.MemTotal
|
e.Memory = info.MemTotal
|
||||||
|
|
@ -368,9 +467,7 @@ func (e *Engine) RefreshVolumes() error {
|
||||||
// FIXME: unexport this method after mesos scheduler stops using it directly
|
// FIXME: unexport this method after mesos scheduler stops using it directly
|
||||||
func (e *Engine) RefreshContainers(full bool) error {
|
func (e *Engine) RefreshContainers(full bool) error {
|
||||||
containers, err := e.client.ListContainers(true, false, "")
|
containers, err := e.client.ListContainers(true, false, "")
|
||||||
// e.CheckConnectionErr(err) is not appropriate here because refresh loop uses
|
e.CheckConnectionErr(err)
|
||||||
// RefreshContainers function to fail/recover an engine. Adding CheckConnectionErr
|
|
||||||
// here would result in double count
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -389,7 +486,6 @@ func (e *Engine) RefreshContainers(full bool) error {
|
||||||
defer e.Unlock()
|
defer e.Unlock()
|
||||||
e.containers = merged
|
e.containers = merged
|
||||||
|
|
||||||
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Updated engine state")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -469,6 +565,7 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string
|
||||||
return containers, nil
|
return containers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// refreshLoop periodically triggers engine refresh.
|
||||||
func (e *Engine) refreshLoop() {
|
func (e *Engine) refreshLoop() {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
@ -481,33 +578,24 @@ func (e *Engine) refreshLoop() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !e.IsHealthy() {
|
||||||
|
if err = e.updateSpecs(); err != nil {
|
||||||
|
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
e.client.StopAllMonitorEvents()
|
||||||
|
e.client.StartMonitorEvents(e.handler, nil)
|
||||||
|
}
|
||||||
|
|
||||||
err = e.RefreshContainers(false)
|
err = e.RefreshContainers(false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// Do not check error as older daemon don't support this call
|
// Do not check error as older daemon don't support this call
|
||||||
e.RefreshVolumes()
|
e.RefreshVolumes()
|
||||||
e.RefreshNetworks()
|
e.RefreshNetworks()
|
||||||
err = e.RefreshImages()
|
e.RefreshImages()
|
||||||
}
|
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine update succeeded")
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
e.failureCount++
|
|
||||||
if e.failureCount >= e.opts.FailureRetry && e.healthy {
|
|
||||||
e.emitEvent("engine_disconnect")
|
|
||||||
e.setHealthy(false)
|
|
||||||
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", e.failureCount, err)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if !e.healthy {
|
log.WithFields(log.Fields{"id": e.ID, "name": e.Name}).Debugf("Engine refresh failed")
|
||||||
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Infof("Engine came back to life after %d retries. Hooray!", e.failureCount)
|
|
||||||
if err := e.updateSpecs(); err != nil {
|
|
||||||
log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Update engine specs failed: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
e.client.StopAllMonitorEvents()
|
|
||||||
e.client.StartMonitorEvents(e.handler, nil)
|
|
||||||
e.emitEvent("engine_reconnect")
|
|
||||||
}
|
|
||||||
e.setHealthy(true)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,8 +38,29 @@ var (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestSetEngineState(t *testing.T) {
|
||||||
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
assert.True(t, engine.state == statePending)
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
|
assert.True(t, engine.state == stateUnhealthy)
|
||||||
|
engine.incFailureCount()
|
||||||
|
assert.True(t, engine.failureCount == 1)
|
||||||
|
engine.setState(stateHealthy)
|
||||||
|
assert.True(t, engine.state == stateHealthy)
|
||||||
|
assert.True(t, engine.failureCount == 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestErrMsg(t *testing.T) {
|
||||||
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
assert.True(t, len(engine.ErrMsg()) == 0)
|
||||||
|
message := "cannot connect"
|
||||||
|
engine.setErrMsg(message)
|
||||||
|
assert.True(t, engine.ErrMsg() == message)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEngineFailureCount(t *testing.T) {
|
func TestEngineFailureCount(t *testing.T) {
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateHealthy)
|
||||||
for i := 0; i < engine.opts.FailureRetry; i++ {
|
for i := 0; i < engine.opts.FailureRetry; i++ {
|
||||||
assert.True(t, engine.IsHealthy())
|
assert.True(t, engine.IsHealthy())
|
||||||
engine.incFailureCount()
|
engine.incFailureCount()
|
||||||
|
|
@ -82,6 +103,7 @@ func TestOutdatedEngine(t *testing.T) {
|
||||||
|
|
||||||
func TestEngineCpusMemory(t *testing.T) {
|
func TestEngineCpusMemory(t *testing.T) {
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
assert.False(t, engine.isConnected())
|
assert.False(t, engine.isConnected())
|
||||||
|
|
||||||
client := mockclient.NewMockClient()
|
client := mockclient.NewMockClient()
|
||||||
|
|
@ -105,6 +127,7 @@ func TestEngineCpusMemory(t *testing.T) {
|
||||||
|
|
||||||
func TestEngineSpecs(t *testing.T) {
|
func TestEngineSpecs(t *testing.T) {
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
assert.False(t, engine.isConnected())
|
assert.False(t, engine.isConnected())
|
||||||
|
|
||||||
client := mockclient.NewMockClient()
|
client := mockclient.NewMockClient()
|
||||||
|
|
@ -133,6 +156,7 @@ func TestEngineSpecs(t *testing.T) {
|
||||||
|
|
||||||
func TestEngineState(t *testing.T) {
|
func TestEngineState(t *testing.T) {
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
assert.False(t, engine.isConnected())
|
assert.False(t, engine.isConnected())
|
||||||
|
|
||||||
client := mockclient.NewMockClient()
|
client := mockclient.NewMockClient()
|
||||||
|
|
@ -185,6 +209,7 @@ func TestCreateContainer(t *testing.T) {
|
||||||
client = mockclient.NewMockClient()
|
client = mockclient.NewMockClient()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
client.On("Info").Return(mockInfo, nil)
|
client.On("Info").Return(mockInfo, nil)
|
||||||
client.On("Version").Return(mockVersion, nil)
|
client.On("Version").Return(mockVersion, nil)
|
||||||
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
|
client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
|
||||||
|
|
@ -242,6 +267,7 @@ func TestCreateContainer(t *testing.T) {
|
||||||
|
|
||||||
func TestImages(t *testing.T) {
|
func TestImages(t *testing.T) {
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateHealthy)
|
||||||
engine.images = []*Image{
|
engine.images = []*Image{
|
||||||
{dockerclient.Image{Id: "a"}, engine},
|
{dockerclient.Image{Id: "a"}, engine},
|
||||||
{dockerclient.Image{Id: "b"}, engine},
|
{dockerclient.Image{Id: "b"}, engine},
|
||||||
|
|
@ -279,6 +305,7 @@ func TestUsedCpus(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateHealthy)
|
||||||
client := mockclient.NewMockClient()
|
client := mockclient.NewMockClient()
|
||||||
|
|
||||||
for _, hn := range hostNcpu {
|
for _, hn := range hostNcpu {
|
||||||
|
|
@ -312,6 +339,7 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
engine := NewEngine("test", 0, engOpts)
|
engine := NewEngine("test", 0, engOpts)
|
||||||
|
engine.setState(stateUnhealthy)
|
||||||
assert.False(t, engine.isConnected())
|
assert.False(t, engine.isConnected())
|
||||||
|
|
||||||
// A container is removed before it can be inspected.
|
// A container is removed before it can be inspected.
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,8 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
|
||||||
if err := engine.Connect(c.TLSConfig); err != nil {
|
if err := engine.Connect(c.TLSConfig); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
} else {
|
} else {
|
||||||
|
// Set engine state to healthy and start refresh loop
|
||||||
|
engine.ValidationComplete()
|
||||||
s = newAgent(agentID, engine)
|
s = newAgent(agentID, engine)
|
||||||
c.agents[agentID] = s
|
c.agents[agentID] = s
|
||||||
if err := s.engine.RegisterEventHandler(c); err != nil {
|
if err := s.engine.RegisterEventHandler(c); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
|
|
@ -51,6 +52,7 @@ type Cluster struct {
|
||||||
|
|
||||||
eventHandler cluster.EventHandler
|
eventHandler cluster.EventHandler
|
||||||
engines map[string]*cluster.Engine
|
engines map[string]*cluster.Engine
|
||||||
|
pendingEngines map[string]*cluster.Engine
|
||||||
scheduler *scheduler.Scheduler
|
scheduler *scheduler.Scheduler
|
||||||
discovery discovery.Discovery
|
discovery discovery.Discovery
|
||||||
pendingContainers map[string]*pendingContainer
|
pendingContainers map[string]*pendingContainer
|
||||||
|
|
@ -66,6 +68,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
|
||||||
|
|
||||||
cluster := &Cluster{
|
cluster := &Cluster{
|
||||||
engines: make(map[string]*cluster.Engine),
|
engines: make(map[string]*cluster.Engine),
|
||||||
|
pendingEngines: make(map[string]*cluster.Engine),
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
TLSConfig: TLSConfig,
|
TLSConfig: TLSConfig,
|
||||||
discovery: discovery,
|
discovery: discovery,
|
||||||
|
|
@ -80,6 +83,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
|
||||||
|
|
||||||
discoveryCh, errCh := cluster.discovery.Watch(nil)
|
discoveryCh, errCh := cluster.discovery.Watch(nil)
|
||||||
go cluster.monitorDiscovery(discoveryCh, errCh)
|
go cluster.monitorDiscovery(discoveryCh, errCh)
|
||||||
|
go cluster.monitorPendingEngines()
|
||||||
|
|
||||||
return cluster, nil
|
return cluster, nil
|
||||||
}
|
}
|
||||||
|
|
@ -195,6 +199,9 @@ func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
|
||||||
c.RLock()
|
c.RLock()
|
||||||
defer c.RUnlock()
|
defer c.RUnlock()
|
||||||
|
|
||||||
|
if engine, ok := c.pendingEngines[addr]; ok {
|
||||||
|
return engine
|
||||||
|
}
|
||||||
for _, engine := range c.engines {
|
for _, engine := range c.engines {
|
||||||
if engine.Addr == addr {
|
if engine.Addr == addr {
|
||||||
return engine
|
return engine
|
||||||
|
|
@ -217,11 +224,26 @@ func (c *Cluster) addEngine(addr string) bool {
|
||||||
if err := engine.RegisterEventHandler(c); err != nil {
|
if err := engine.RegisterEventHandler(c); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
}
|
}
|
||||||
|
// Add it to pending engine map, indexed by address. This will prevent
|
||||||
|
// duplicates from entering
|
||||||
|
c.Lock()
|
||||||
|
c.pendingEngines[addr] = engine
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
// validatePendingEngine will start a thread to validate the engine.
|
||||||
|
// If the engine is reachable and valid, it'll be monitored and updated in a loop.
|
||||||
|
// If engine is not reachable, pending engines will be examined once in a while
|
||||||
|
go c.validatePendingEngine(engine)
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// validatePendingEngine connects to the engine and, if it is valid, moves it from pendingEngines to engines.
|
||||||
|
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
|
||||||
// Attempt a connection to the engine. Since this is slow, don't get a hold
|
// Attempt a connection to the engine. Since this is slow, don't get a hold
|
||||||
// of the lock yet.
|
// of the lock yet.
|
||||||
if err := engine.Connect(c.TLSConfig); err != nil {
|
if err := engine.Connect(c.TLSConfig); err != nil {
|
||||||
log.Error(err)
|
log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -233,16 +255,28 @@ func (c *Cluster) addEngine(addr string) bool {
|
||||||
if old, exists := c.engines[engine.ID]; exists {
|
if old, exists := c.engines[engine.ID]; exists {
|
||||||
if old.Addr != engine.Addr {
|
if old.Addr != engine.Addr {
|
||||||
log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
|
log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
|
||||||
|
// Keep this engine in pendingEngines table and show its error.
|
||||||
|
// If the ID duplication comes from a VM clone, the user sees this message and can fix it.
|
||||||
|
// If the engine is rebooted and gets a new IP from DHCP, the previous address will be removed
|
||||||
|
// from discovery after a while.
|
||||||
|
// In both cases, retry may fix the problem.
|
||||||
|
engine.HandleIDConflict(old.Addr)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
|
log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
|
||||||
|
engine.Disconnect()
|
||||||
|
// Remove it from pendingEngines table
|
||||||
|
delete(c.pendingEngines, engine.Addr)
|
||||||
}
|
}
|
||||||
engine.Disconnect()
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally register the engine.
|
// Engine validated, move from pendingEngines table to engines table
|
||||||
|
delete(c.pendingEngines, engine.Addr)
|
||||||
|
// set engine state to healthy, and start refresh loop
|
||||||
|
engine.ValidationComplete()
|
||||||
c.engines[engine.ID] = engine
|
c.engines[engine.ID] = engine
|
||||||
log.Infof("Registered Engine %s at %s", engine.Name, addr)
|
|
||||||
|
log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -255,7 +289,12 @@ func (c *Cluster) removeEngine(addr string) bool {
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
|
|
||||||
engine.Disconnect()
|
engine.Disconnect()
|
||||||
delete(c.engines, engine.ID)
|
// it could be in pendingEngines or engines
|
||||||
|
if _, ok := c.pendingEngines[addr]; ok {
|
||||||
|
delete(c.pendingEngines, addr)
|
||||||
|
} else {
|
||||||
|
delete(c.engines, engine.ID)
|
||||||
|
}
|
||||||
log.Infof("Removed Engine %s", engine.Name)
|
log.Infof("Removed Engine %s", engine.Name)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
@ -277,10 +316,8 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
|
||||||
c.removeEngine(entry.String())
|
c.removeEngine(entry.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since `addEngine` can be very slow (it has to connect to the
|
|
||||||
// engine), we are going to do the adds in parallel.
|
|
||||||
for _, entry := range added {
|
for _, entry := range added {
|
||||||
go c.addEngine(entry.String())
|
c.addEngine(entry.String())
|
||||||
}
|
}
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
log.Errorf("Discovery error: %v", err)
|
log.Errorf("Discovery error: %v", err)
|
||||||
|
|
@ -288,6 +325,26 @@ func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// monitorPendingEngines checks if some previously unreachable/invalid engines have been fixed
|
||||||
|
func (c *Cluster) monitorPendingEngines() {
|
||||||
|
for {
|
||||||
|
// Don't need to do it frequently
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
// Get the list of pendingEngines
|
||||||
|
c.RLock()
|
||||||
|
pEngines := make([]*cluster.Engine, 0, len(c.pendingEngines))
|
||||||
|
for _, e := range c.pendingEngines {
|
||||||
|
pEngines = append(pEngines, e)
|
||||||
|
}
|
||||||
|
c.RUnlock()
|
||||||
|
for _, e := range pEngines {
|
||||||
|
if e.TimeToValidate() {
|
||||||
|
go c.validatePendingEngine(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Images returns all the images in the cluster.
|
// Images returns all the images in the cluster.
|
||||||
func (c *Cluster) Images() cluster.Images {
|
func (c *Cluster) Images() cluster.Images {
|
||||||
c.RLock()
|
c.RLock()
|
||||||
|
|
@ -672,7 +729,7 @@ func (c *Cluster) Volume(name string) *cluster.Volume {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// listNodes returns all the engines in the cluster.
|
// listNodes returns all validated engines in the cluster, excluding pendingEngines.
|
||||||
func (c *Cluster) listNodes() []*node.Node {
|
func (c *Cluster) listNodes() []*node.Node {
|
||||||
c.RLock()
|
c.RLock()
|
||||||
defer c.RUnlock()
|
defer c.RUnlock()
|
||||||
|
|
@ -692,14 +749,18 @@ func (c *Cluster) listNodes() []*node.Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
// listEngines returns all the engines in the cluster.
|
// listEngines returns all the engines in the cluster.
|
||||||
|
// This is for reporting, not scheduling, hence pendingEngines are included.
|
||||||
func (c *Cluster) listEngines() []*cluster.Engine {
|
func (c *Cluster) listEngines() []*cluster.Engine {
|
||||||
c.RLock()
|
c.RLock()
|
||||||
defer c.RUnlock()
|
defer c.RUnlock()
|
||||||
|
|
||||||
out := make([]*cluster.Engine, 0, len(c.engines))
|
out := make([]*cluster.Engine, 0, len(c.engines)+len(c.pendingEngines))
|
||||||
for _, n := range c.engines {
|
for _, n := range c.engines {
|
||||||
out = append(out, n)
|
out = append(out, n)
|
||||||
}
|
}
|
||||||
|
for _, n := range c.pendingEngines {
|
||||||
|
out = append(out, n)
|
||||||
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -744,6 +805,8 @@ func (c *Cluster) Info() [][]string {
|
||||||
}
|
}
|
||||||
sort.Strings(labels)
|
sort.Strings(labels)
|
||||||
info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))})
|
info = append(info, []string{" └ Labels", fmt.Sprintf("%s", strings.Join(labels, ", "))})
|
||||||
|
info = append(info, []string{" └ Error", engine.ErrMsg()})
|
||||||
|
info = append(info, []string{" └ UpdatedAt", engine.UpdatedAt().UTC().Format(time.RFC3339)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return info
|
return info
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ func (nopCloser) Close() error {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mockInfo = &dockerclient.Info{
|
mockInfo = &dockerclient.Info{
|
||||||
ID: "id",
|
ID: "test-engine",
|
||||||
Name: "name",
|
Name: "name",
|
||||||
NCPU: 10,
|
NCPU: 10,
|
||||||
MemTotal: 20,
|
MemTotal: 20,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue