Merge pull request #811 from aluzzardi/node-removal

Handle Node Removal from Discovery correctly
This commit is contained in:
Alexandre Beslic 2015-05-19 10:11:10 -07:00
commit d5915b2a09
8 changed files with 142 additions and 23 deletions

View File

@ -233,7 +233,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
// `Status()` will generate a new one
tmp.Status = container.Info.State.String()
if !container.Engine.IsHealthy() {
tmp.Status = "Pending"
tmp.Status = "Host Down"
}
// Overwrite labels with the ones we have in the config.

View File

@ -91,7 +91,7 @@ func createRouter(c *context, enableCors bool) *mux.Router {
localRoute := route
localFct := fct
wrap := func(w http.ResponseWriter, r *http.Request) {
log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Info("HTTP request received")
log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Debug("HTTP request received")
if enableCors {
writeCorsHeaders(w, r)
}

View File

@ -27,7 +27,7 @@ func NewEngine(addr string, overcommitRatio float64) *Engine {
e := &Engine{
Addr: addr,
Labels: make(map[string]string),
ch: make(chan bool),
stopCh: make(chan struct{}),
containers: make(map[string]*Container),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
@ -47,7 +47,7 @@ type Engine struct {
Memory int64
Labels map[string]string
ch chan bool
stopCh chan struct{}
containers map[string]*Container
images []*Image
client dockerclient.Client
@ -108,6 +108,15 @@ func (e *Engine) connectClient(client dockerclient.Client) error {
return nil
}
// Disconnect will stop all monitoring of the engine.
// The Engine object cannot be further used without reconnecting it first.
//
// Calling Disconnect on an engine that has no client (never connected, or
// already disconnected) is a no-op.
func (e *Engine) Disconnect() {
	// Without a client there is nothing to tear down. Returning here also
	// keeps a repeated call from closing stopCh a second time, which would
	// panic, and from calling a method on a nil client.
	if e.client == nil {
		return
	}

	// Closing stopCh is what makes refreshLoop return.
	close(e.stopCh)
	e.client.StopAllMonitorEvents()
	e.client = nil
	e.emitEvent("engine_disconnect")
}
// isConnected returns true if the engine is connected to a remote docker API
func (e *Engine) isConnected() bool {
return e.client != nil
@ -265,20 +274,18 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string
return containers, nil
}
// refreshContainersAsync requests a container refresh by sending on e.ch;
// refreshLoop receives from that channel and then calls refreshContainers.
// NOTE(review): e.ch appears to be created unbuffered in NewEngine, so this
// send would block until the loop picks it up — confirm.
func (e *Engine) refreshContainersAsync() {
e.ch <- true
}
func (e *Engine) refreshLoop() {
for {
var err error
// Sleep stateRefreshPeriod or quit if we get stopped.
select {
case <-e.ch:
err = e.refreshContainers(false)
case <-time.After(stateRefreshPeriod):
err = e.refreshContainers(false)
case <-e.stopCh:
return
}
err = e.refreshContainers(false)
if err == nil {
err = e.RefreshImages()
}

View File

@ -163,21 +163,25 @@ func (c *Cluster) RemoveContainer(container *cluster.Container, force bool) erro
return nil
}
func (c *Cluster) hasEngine(addr string) bool {
func (c *Cluster) getEngineByAddr(addr string) *cluster.Engine {
c.RLock()
defer c.RUnlock()
for _, engine := range c.engines {
if engine.Addr == addr {
return true
return engine
}
}
return false
return nil
}
// hasEngineByAddr reports whether an engine with the given address is
// currently registered in the cluster.
func (c *Cluster) hasEngineByAddr(addr string) bool {
	found := c.getEngineByAddr(addr)
	return found != nil
}
func (c *Cluster) addEngine(addr string) bool {
// Check the engine is already registered by address.
if c.hasEngine(addr) {
if c.hasEngineByAddr(addr) {
return false
}
@ -208,19 +212,45 @@ func (c *Cluster) addEngine(addr string) bool {
if err := engine.RegisterEventHandler(c); err != nil {
log.Error(err)
}
log.Infof("Registered Engine %s at %s", engine.Name, addr)
return true
}
// removeEngine disconnects the engine registered at addr and drops it from
// the cluster. It returns false when no engine is registered at that address
// and true once the engine has been removed.
func (c *Cluster) removeEngine(addr string) bool {
	// The lookup acquires and releases the read lock by itself, so it must
	// run before the write lock below is taken.
	// NOTE(review): the engine set could change between the lookup and the
	// write lock — confirm removeEngine is only ever called from one goroutine.
	target := c.getEngineByAddr(addr)
	if target == nil {
		return false
	}

	c.Lock()
	defer c.Unlock()

	target.Disconnect()
	delete(c.engines, target.ID)
	log.Infof("Removed Engine %s", target.Name)
	return true
}
// Entries are Docker Engines
func (c *Cluster) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error) {
// Watch changes on the discovery channel.
currentEntries := discovery.Entries{}
for {
select {
case entries := <-ch:
// Attempt to add every engine. `addEngine` will take care of duplicates.
added, removed := currentEntries.Diff(entries)
currentEntries = entries
// Remove engines first. `addEngine` will refuse to add an engine
// if there's already an engine with the same ID. If an engine
// changes address, we have to first remove it then add it back.
for _, entry := range removed {
c.removeEngine(entry.String())
}
// Since `addEngine` can be very slow (it has to connect to the
// engine), we are going to launch them in parallel.
for _, entry := range entries {
// engine), we are going to do the adds in parallel.
for _, entry := range added {
go c.addEngine(entry.String())
}
case err := <-errCh:

View File

@ -52,6 +52,35 @@ func (e Entries) Equals(cmp Entries) bool {
return true
}
// Contains reports whether any element of e is equal, according to
// Entry.Equals, to the given entry.
func (e Entries) Contains(entry *Entry) bool {
	for i := range e {
		if e[i].Equals(entry) {
			return true
		}
	}
	return false
}
// Diff compares e against cmp. The first result holds the entries present in
// cmp but not in e (added); the second holds the entries present in e but not
// in cmp (removed). Both results are non-nil, and each keeps the order of the
// list it was taken from.
func (e Entries) Diff(cmp Entries) (Entries, Entries) {
	// missingFrom collects the entries of src that ref does not contain.
	missingFrom := func(src, ref Entries) Entries {
		out := Entries{}
		for _, candidate := range src {
			if ref.Contains(candidate) {
				continue
			}
			out = append(out, candidate)
		}
		return out
	}

	added := missingFrom(cmp, e)
	removed := missingFrom(e, cmp)
	return added, removed
}
// The Discovery interface is implemented by Discovery backends which
// manage swarm host entries.
type Discovery interface {

View File

@ -80,3 +80,34 @@ func TestEntriesEquality(t *testing.T) {
&Entry{Host: "127.0.0.42", Port: "2375"},
}))
}
// TestEntriesDiff checks Entries.Diff against an unchanged, a grown, a shrunk
// and a simultaneously grown-and-shrunk list of entries.
func TestEntriesDiff(t *testing.T) {
	var (
		first  = &Entry{Host: "1.1.1.1", Port: "1111"}
		second = &Entry{Host: "2.2.2.2", Port: "2222"}
		third  = &Entry{Host: "3.3.3.3", Port: "3333"}
	)
	base := Entries{first, second}

	// Same entries in a different order: nothing added, nothing removed.
	added, removed := base.Diff(Entries{second, first})
	assert.Empty(t, added)
	assert.Empty(t, removed)

	// One extra entry on the right-hand side shows up as added.
	added, removed = base.Diff(Entries{second, third, first})
	assert.Len(t, added, 1)
	assert.True(t, added.Contains(third))
	assert.Empty(t, removed)

	// An entry missing from the right-hand side shows up as removed.
	added, removed = base.Diff(Entries{second})
	assert.Empty(t, added)
	assert.Len(t, removed, 1)
	assert.True(t, removed.Contains(first))

	// One entry swapped for another: one added and one removed.
	added, removed = base.Diff(Entries{first, third})
	assert.Len(t, added, 1)
	assert.True(t, added.Contains(third))
	assert.Len(t, removed, 1)
	assert.True(t, removed.Contains(second))
}

View File

@ -4,13 +4,17 @@ load ../helpers
# Returns true if all nodes have joined the swarm.
function discovery_check_swarm_info() {
docker_swarm info | grep -q "Nodes: ${#HOSTS[@]}"
local total="$1"
[ -z "$total" ] && total="${#HOSTS[@]}"
docker_swarm info | grep -q "Nodes: $total"
}
# Returns true if all nodes have joined the discovery.
function discovery_check_swarm_list() {
local joined=`swarm list "$1" | wc -l`
local total=${#HOSTS[@]}
local total="$2"
[ -z "$total" ] && total="${#HOSTS[@]}"
echo "${joined} out of ${total} hosts joined discovery"
[ "$joined" -eq "$total" ]

View File

@ -31,7 +31,7 @@ function setup_discovery_file() {
# Start 2 engines and register them in the file.
start_docker 2
setup_discovery_file
retry 5 1 discovery_check_swarm_list "$DISCOVERY"
discovery_check_swarm_list "$DISCOVERY"
# Then, start a manager and ensure it sees all the engines.
swarm_manage "$DISCOVERY"
@ -49,10 +49,28 @@ function setup_discovery_file() {
# Add engines to the cluster and make sure it's picked up by swarm.
start_docker 2
setup_discovery_file
retry 5 1 discovery_check_swarm_list "$DISCOVERY"
discovery_check_swarm_list "$DISCOVERY"
retry 5 1 discovery_check_swarm_info
}
@test "file discovery: node removal" {
	# The goal of this test is to ensure swarm can handle node removal.

	# Start 2 engines and register them in the file.
	start_docker 2
	setup_discovery_file
	discovery_check_swarm_list "$DISCOVERY"

	# Then, start a manager and ensure it sees all the engines.
	swarm_manage "$DISCOVERY"
	retry 5 1 discovery_check_swarm_info

	# Update the file with only one engine and see if swarm picks it up.
	# The expansions are quoted so the host and the file path are each passed
	# as a single word, with no word splitting or globbing.
	echo "${HOSTS[0]}" > "$DISCOVERY_FILE"
	discovery_check_swarm_list "$DISCOVERY" 1
	retry 5 1 discovery_check_swarm_info 1
}
@test "file discovery: failure" {
# The goal of this test is to simulate a failure (file not available) and ensure discovery
# is resilient to it.
@ -70,6 +88,6 @@ function setup_discovery_file() {
setup_discovery_file
# After a while, `join` and `manage` should see the file.
retry 5 1 discovery_check_swarm_list "$DISCOVERY"
discovery_check_swarm_list "$DISCOVERY"
retry 5 1 discovery_check_swarm_info
}