diff --git a/cli/commands.go b/cli/commands.go
index 117dddd18c..73790935c9 100644
--- a/cli/commands.go
+++ b/cli/commands.go
@@ -26,6 +26,7 @@ var (
 			flHosts,
 			flLeaderElection, flLeaderTTL, flManageAdvertise,
 			flTLS, flTLSCaCert, flTLSCert, flTLSKey, flTLSVerify,
+			flRefreshIntervalMin, flRefreshIntervalMax, flRefreshRetry,
 			flHeartBeat,
 			flEnableCors,
 			flCluster, flDiscoveryOpt, flClusterOpt},
diff --git a/cli/flags.go b/cli/flags.go
index d974030cbe..49164f7ced 100644
--- a/cli/flags.go
+++ b/cli/flags.go
@@ -61,6 +61,21 @@ var (
 		Value: "10s",
 		Usage: "timeout period",
 	}
+	flRefreshIntervalMin = cli.StringFlag{
+		Name:  "engine-refresh-min-interval",
+		Value: "30s",
+		Usage: "set engine refresh minimum interval",
+	}
+	flRefreshIntervalMax = cli.StringFlag{
+		Name:  "engine-refresh-max-interval",
+		Value: "60s",
+		Usage: "set engine refresh maximum interval",
+	}
+	flRefreshRetry = cli.StringFlag{
+		Name:  "engine-refresh-retry",
+		Value: "3",
+		Usage: "set engine refresh retry count on failure",
+	}
 	flEnableCors = cli.BoolFlag{
 		Name:  "api-enable-cors, cors",
 		Usage: "enable CORS headers in the remote API",
@@ -117,7 +132,6 @@ var (
 		Usage: "discovery options",
 		Value: &cli.StringSlice{},
 	}
-
 	flLeaderElection = cli.BoolFlag{
 		Name:  "replication",
 		Usage: "Enable Swarm manager replication",
diff --git a/cli/manage.go b/cli/manage.go
index 801c801655..a42c32358f 100644
--- a/cli/manage.go
+++ b/cli/manage.go
@@ -232,6 +232,24 @@ func manage(c *cli.Context) {
 		}
 	}
+	refreshMinInterval := c.Duration("engine-refresh-min-interval")
+	refreshMaxInterval := c.Duration("engine-refresh-max-interval")
+	if refreshMinInterval == time.Duration(0) {
+		log.Fatal("minimum refresh interval should be a positive number")
+	}
+	if refreshMaxInterval < refreshMinInterval {
+		log.Fatal("max refresh interval cannot be less than min refresh interval")
+	}
+	refreshRetry := c.Int("engine-refresh-retry")
+	if refreshRetry <= 0 {
+		log.Fatal("invalid refresh retry count")
+	}
+	engineOpts := &cluster.EngineOpts{
+		RefreshMinInterval: refreshMinInterval,
+		RefreshMaxInterval: refreshMaxInterval,
+		RefreshRetry:       refreshRetry,
+	}
+
 	uri := getDiscovery(c)
 	if uri == "" {
 		log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
 	}
@@ -257,9 +275,9 @@
 	switch c.String("cluster-driver") {
 	case "mesos-experimental":
 		log.Warn("WARNING: the mesos driver is currently experimental, use at your own risks")
-		cl, err = mesos.NewCluster(sched, tlsConfig, uri, c.StringSlice("cluster-opt"))
+		cl, err = mesos.NewCluster(sched, tlsConfig, uri, c.StringSlice("cluster-opt"), engineOpts)
 	case "swarm":
-		cl, err = swarm.NewCluster(sched, tlsConfig, discovery, c.StringSlice("cluster-opt"))
+		cl, err = swarm.NewCluster(sched, tlsConfig, discovery, c.StringSlice("cluster-opt"), engineOpts)
 	default:
 		log.Fatalf("unsupported cluster %q", c.String("cluster-driver"))
 	}
diff --git a/cluster/engine.go b/cluster/engine.go
index aba9f061aa..705fb429c5 100644
--- a/cluster/engine.go
+++ b/cluster/engine.go
@@ -19,11 +19,6 @@ import (
 )
 
 const (
-	// Force-refresh the state of the engine this often.
-	stateRefreshMinRange = 30 * time.Second
-	stateRefreshMaxRange = 60 * time.Second
-	stateRefreshRetries  = 3
-
 	// Timeout for requests sent out to the engine.
 	requestTimeout = 10 * time.Second
@@ -52,10 +47,21 @@ func (d *delayer) Wait() <-chan time.Time {
 	d.l.Lock()
 	defer d.l.Unlock()
 
-	waitPeriod := int64(d.rangeMin) + d.r.Int63n(int64(d.rangeMax)-int64(d.rangeMin))
+	waitPeriod := int64(d.rangeMin)
+	if delta := int64(d.rangeMax) - int64(d.rangeMin); delta > 0 {
+		// Int63n panics if the parameter is 0
+		waitPeriod += d.r.Int63n(delta)
+	}
 	return time.After(time.Duration(waitPeriod))
 }
 
+// EngineOpts represents the options for an engine
+type EngineOpts struct {
+	RefreshMinInterval time.Duration
+	RefreshMaxInterval time.Duration
+	RefreshRetry       int
+}
+
 // Engine represents a docker engine
 type Engine struct {
 	sync.RWMutex
@@ -78,14 +84,18 @@ type Engine struct {
 	eventHandler    EventHandler
 	healthy         bool
 	overcommitRatio int64
+	opts            *EngineOpts
 }
 
 // NewEngine is exported
-func NewEngine(addr string, overcommitRatio float64) *Engine {
+func NewEngine(addr string, overcommitRatio float64, opts *EngineOpts) *Engine {
+	if opts == nil {
+		log.Fatal("EngineOpts is nil")
+	}
 	e := &Engine{
 		Addr:            addr,
 		client:          nopclient.NewNopClient(),
-		refreshDelayer:  newDelayer(stateRefreshMinRange, stateRefreshMaxRange),
+		refreshDelayer:  newDelayer(opts.RefreshMinInterval, opts.RefreshMaxInterval),
 		Labels:          make(map[string]string),
 		stopCh:          make(chan struct{}),
 		containers:      make(map[string]*Container),
@@ -93,6 +103,7 @@ func NewEngine(addr string, overcommitRatio float64) *Engine {
 		volumes:         make(map[string]*Volume),
 		healthy:         true,
 		overcommitRatio: int64(overcommitRatio * 100),
+		opts:            opts,
 	}
 	return e
 }
@@ -412,10 +423,8 @@ func (e *Engine) refreshLoop() {
 
 		if err != nil {
 			failedAttempts++
-			if failedAttempts >= stateRefreshRetries {
-				if e.healthy {
-					e.emitEvent("engine_disconnect")
-				}
+			if failedAttempts >= e.opts.RefreshRetry && e.healthy {
+				e.emitEvent("engine_disconnect")
 				e.healthy = false
 				log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Flagging engine as dead. Updated state failed %d times: %v", failedAttempts, err)
 			}
diff --git a/cluster/engine_test.go b/cluster/engine_test.go
index eb2bf99350..6c2dd846b4 100644
--- a/cluster/engine_test.go
+++ b/cluster/engine_test.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math"
 	"testing"
+	"time"
 
 	"github.com/samalba/dockerclient"
 	"github.com/samalba/dockerclient/mockclient"
@@ -32,7 +33,12 @@ var (
 )
 
 func TestEngineConnectionFailure(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	assert.False(t, engine.isConnected())
 
 	// Always fail.
@@ -51,7 +57,12 @@ func TestEngineConnectionFailure(t *testing.T) {
 }
 
 func TestOutdatedEngine(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	client := mockclient.NewMockClient()
 	client.On("Info").Return(&dockerclient.Info{}, nil)
 
@@ -65,7 +76,12 @@
 }
 
 func TestEngineCpusMemory(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -88,7 +104,12 @@
 }
 
 func TestEngineSpecs(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -116,7 +137,12 @@
 }
 
 func TestEngineState(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	assert.False(t, engine.isConnected())
 
 	client := mockclient.NewMockClient()
@@ -158,6 +184,11 @@
 }
 
 func TestCreateContainer(t *testing.T) {
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
 	var (
 		config = &ContainerConfig{dockerclient.ContainerConfig{
 			Image:     "busybox",
@@ -165,7 +196,7 @@
 			Cmd:       []string{"date"},
 			Tty:       false,
 		}}
-		engine = NewEngine("test", 0)
+		engine = NewEngine("test", 0, opts)
 		client = mockclient.NewMockClient()
 	)
 
@@ -224,7 +255,12 @@
 }
 
 func TestImages(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	engine.images = []*Image{
 		{dockerclient.Image{Id: "a"}, engine},
 		{dockerclient.Image{Id: "b"}, engine},
@@ -234,22 +270,33 @@
 	result := engine.Images()
 	assert.Equal(t, len(result), 3)
 }
+
 func TestTotalMemory(t *testing.T) {
-	engine := NewEngine("test", 0.05)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0.05, opts)
 	engine.Memory = 1024
 	assert.Equal(t, engine.TotalMemory(), int64(1024+1024*5/100))
 
-	engine = NewEngine("test", 0)
+	engine = NewEngine("test", 0, opts)
 	engine.Memory = 1024
 	assert.Equal(t, engine.TotalMemory(), int64(1024))
 }
 
 func TestTotalCpus(t *testing.T) {
-	engine := NewEngine("test", 0.05)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0.05, opts)
 	engine.Cpus = 2
 	assert.Equal(t, engine.TotalCpus(), int64(2+2*5/100))
 
-	engine = NewEngine("test", 0)
+	engine = NewEngine("test", 0, opts)
 	engine.Cpus = 2
 	assert.Equal(t, engine.TotalCpus(), int64(2))
 }
@@ -260,7 +307,12 @@ func TestUsedCpus(t *testing.T) {
 		hostNcpu      = []int64{1, 2, 4, 8, 10, 12, 16, 20, 32, 36, 40, 48}
 	)
 
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	client := mockclient.NewMockClient()
 
 	for _, hn := range hostNcpu {
@@ -293,7 +345,12 @@ func TestContainerRemovedDuringRefresh(t *testing.T) {
 		info2      = &dockerclient.ContainerInfo{Id: "c2", Config: &dockerclient.ContainerConfig{}}
 	)
 
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	assert.False(t, engine.isConnected())
 
 	// A container is removed before it can be inspected.
diff --git a/cluster/image_test.go b/cluster/image_test.go
index 816521aee3..1bfb36abf2 100644
--- a/cluster/image_test.go
+++ b/cluster/image_test.go
@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"testing"
+	"time"
 
 	dockerfilters "github.com/docker/docker/pkg/parsers/filters"
 	"github.com/samalba/dockerclient"
@@ -52,7 +53,12 @@ func TestMatchPrivateRepo(t *testing.T) {
 }
 
 func TestImagesFilterWithLabelFilter(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	images := Images{
 		{dockerclient.Image{Id: "a"}, engine},
 		{dockerclient.Image{
@@ -69,7 +75,12 @@
 }
 
 func TestImagesFilterWithNameFilter(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	images := Images{
 		{
 			dockerclient.Image{
@@ -93,7 +104,12 @@
 }
 
 func TestImagesFilterWithNameFilterWithTag(t *testing.T) {
-	engine := NewEngine("test", 0)
+	opts := &EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := NewEngine("test", 0, opts)
 	images := Images{
 		{
 			dockerclient.Image{
diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go
index b540c69fdd..4eca16ed2a 100644
--- a/cluster/mesos/cluster.go
+++ b/cluster/mesos/cluster.go
@@ -37,6 +37,7 @@ type Cluster struct {
 	offerTimeout        time.Duration
 	taskCreationTimeout time.Duration
 	pendingTasks        *queue.Queue
+	engineOpts          *cluster.EngineOpts
 }
 
 const (
@@ -54,7 +55,7 @@ var (
 )
 
 // NewCluster for mesos Cluster creation
-func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master string, options cluster.DriverOpts) (cluster.Cluster, error) {
+func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master string, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) {
 	log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster")
 
 	cluster := &Cluster{
@@ -66,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
 		options:             &options,
 		offerTimeout:        defaultOfferTimeout,
 		taskCreationTimeout: defaultTaskCreationTimeout,
+		engineOpts:          engineOptions,
 	}
 
 	cluster.pendingTasks = queue.NewQueue()
diff --git a/cluster/mesos/cluster_test.go b/cluster/mesos/cluster_test.go
index 6d3c8131f4..6afa70ac0b 100644
--- a/cluster/mesos/cluster_test.go
+++ b/cluster/mesos/cluster_test.go
@@ -2,6 +2,7 @@ package mesos
 
 import (
 	"testing"
+	"time"
"github.com/docker/swarm/cluster" "github.com/samalba/dockerclient" @@ -9,7 +10,12 @@ import ( ) func createSlave(t *testing.T, ID string, containers ...*cluster.Container) *slave { - engine := cluster.NewEngine(ID, 0) + opts := &cluster.EngineOpts{ + RefreshMinInterval: time.Duration(30), + RefreshMaxInterval: time.Duration(60), + RefreshRetry: 3, + } + engine := cluster.NewEngine(ID, 0, opts) engine.Name = ID engine.ID = ID diff --git a/cluster/mesos/driver.go b/cluster/mesos/driver.go index 94d41c0afa..8b4ca1c929 100644 --- a/cluster/mesos/driver.go +++ b/cluster/mesos/driver.go @@ -43,7 +43,7 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes } s, ok := c.slaves[slaveID] if !ok { - engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0) + engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, c.engineOpts) if err := engine.Connect(c.TLSConfig); err != nil { log.Error(err) } else { diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index a821e17934..93322f21a0 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -54,11 +54,12 @@ type Cluster struct { pendingContainers map[string]*pendingContainer overcommitRatio float64 + engineOpts *cluster.EngineOpts TLSConfig *tls.Config } // NewCluster is exported -func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts) (cluster.Cluster, error) { +func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts, engineOptions *cluster.EngineOpts) (cluster.Cluster, error) { log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") cluster := &Cluster{ @@ -68,6 +69,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery discovery: discovery, pendingContainers: make(map[string]*pendingContainer), overcommitRatio: 0.05, + engineOpts: engineOptions, } if val, ok := options.Float("swarm.overcommit", ""); ok { @@ -209,7 +211,7 @@ func (c *Cluster) addEngine(addr string) bool { return false } - engine := cluster.NewEngine(addr, c.overcommitRatio) + engine := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts) if err := engine.RegisterEventHandler(c); err != nil { log.Error(err) } diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index f99e9d4467..953bf61f24 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/docker/swarm/cluster" "github.com/samalba/dockerclient" @@ -41,7 +42,12 @@ var ( ) func createEngine(t *testing.T, ID string, containers ...*cluster.Container) *cluster.Engine { - engine := cluster.NewEngine(ID, 0) + opts := &cluster.EngineOpts{ + RefreshMinInterval: time.Duration(30), + RefreshMaxInterval: time.Duration(60), + RefreshRetry: 3, + } + engine := cluster.NewEngine(ID, 0, opts) engine.Name = ID engine.ID = ID @@ -119,7 +125,12 @@ func TestImportImage(t *testing.T) { // create engione id := "test-engine" - engine := cluster.NewEngine(id, 0) + opts := &cluster.EngineOpts{ + RefreshMinInterval: time.Duration(30), + RefreshMaxInterval: time.Duration(60), + RefreshRetry: 3, + } + engine := cluster.NewEngine(id, 0, opts) engine.Name = id engine.ID = id @@ -169,7 +180,12 @@ func TestLoadImage(t *testing.T) { // create engione id := "test-engine" - engine := cluster.NewEngine(id, 0) + opts := &cluster.EngineOpts{ + RefreshMinInterval: time.Duration(30), + 
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := cluster.NewEngine(id, 0, opts)
 	engine.Name = id
 	engine.ID = id
 
@@ -222,7 +238,12 @@ func TestTagImage(t *testing.T) {
 	// create engine
 	id := "test-engine"
-	engine := cluster.NewEngine(id, 0)
+	opts := &cluster.EngineOpts{
+		RefreshMinInterval: time.Duration(30),
+		RefreshMaxInterval: time.Duration(60),
+		RefreshRetry:       3,
+	}
+	engine := cluster.NewEngine(id, 0, opts)
 	engine.Name = id
 	engine.ID = id
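
As a reading aid, and not part of the patch itself: a minimal sketch of how the new `cluster.EngineOpts` type and the three-argument `NewEngine` signature introduced above fit together. The engine address and the interval values below are illustrative placeholders; in the real code path they come from the new `--engine-refresh-min-interval`, `--engine-refresh-max-interval`, and `--engine-refresh-retry` flags validated in `cli/manage.go`.

```go
package main

import (
	"log"
	"time"

	"github.com/docker/swarm/cluster"
)

func main() {
	// Mirrors what cli/manage.go now builds from the new flags;
	// the concrete values here are illustrative only.
	opts := &cluster.EngineOpts{
		RefreshMinInterval: 30 * time.Second,
		RefreshMaxInterval: 60 * time.Second,
		RefreshRetry:       3,
	}

	// NewEngine now requires a non-nil *EngineOpts: the min/max intervals
	// seed the refresh delayer, and RefreshRetry bounds how many consecutive
	// refresh failures are tolerated before the engine is flagged unhealthy.
	engine := cluster.NewEngine("192.0.2.10:2375", 0, opts) // placeholder address
	if err := engine.Connect(nil); err != nil {
		log.Println(err)
	}
}
```

The `delta > 0` guard added to `delayer.Wait` matters for this flow: `manage.go` only requires the max interval to be no smaller than the min, so the two may be equal, and `rand.Int63n` panics when given a zero argument.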