From cb2ceea70203dbce53637d3fe82eb06d922be166 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Fri, 9 Oct 2015 11:32:03 -0700 Subject: [PATCH] engine: Added a concurrent safe refresh delayer. Signed-off-by: Andrea Luzzardi --- cluster/engine.go | 63 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/cluster/engine.go b/cluster/engine.go index b50045c3d8..2b43e927c8 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -21,8 +21,8 @@ import ( const ( // Force-refresh the state of the engine this often. - stateRefreshMinRange = 30 - stateRefreshMaxRange = 60 + stateRefreshMinRange = 30 * time.Second + stateRefreshMaxRange = 60 * time.Second stateRefreshRetries = 3 // Timeout for requests sent out to the engine. @@ -32,20 +32,29 @@ const ( minSupportedVersion = version.Version("1.6.0") ) -// NewEngine is exported -func NewEngine(addr string, overcommitRatio float64) *Engine { - e := &Engine{ - Addr: addr, - client: nopclient.NewNopClient(), - Labels: make(map[string]string), - stopCh: make(chan struct{}), - r: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), - containers: make(map[string]*Container), - volumes: make(map[string]*Volume), - healthy: true, - overcommitRatio: int64(overcommitRatio * 100), +// delayer offers a simple API to random delay within a given time range. +type delayer struct { + rangeMin time.Duration + rangeMax time.Duration + + r *rand.Rand + l sync.Mutex +} + +func newDelayer(rangeMin, rangeMax time.Duration) *delayer { + return &delayer{ + rangeMin: rangeMin, + rangeMax: rangeMax, + r: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), } - return e +} + +func (d *delayer) Wait() <-chan time.Time { + d.l.Lock() + defer d.l.Unlock() + + waitPeriod := int64(d.rangeMin) + d.r.Int63n(int64(d.rangeMax)-int64(d.rangeMin)) + return time.After(time.Duration(waitPeriod)) } // Engine represents a docker engine @@ -61,7 +70,7 @@ type Engine struct { Labels map[string]string stopCh chan struct{} - r *rand.Rand + refreshDelayer *delayer containers map[string]*Container images []*Image volumes map[string]*Volume @@ -71,6 +80,22 @@ type Engine struct { overcommitRatio int64 } +// NewEngine is exported +func NewEngine(addr string, overcommitRatio float64) *Engine { + e := &Engine{ + Addr: addr, + client: nopclient.NewNopClient(), + refreshDelayer: newDelayer(stateRefreshMinRange, stateRefreshMaxRange), + Labels: make(map[string]string), + stopCh: make(chan struct{}), + containers: make(map[string]*Container), + volumes: make(map[string]*Volume), + healthy: true, + overcommitRatio: int64(overcommitRatio * 100), + } + return e +} + // Connect will initialize a connection to the Docker daemon running on the // host, gather machine specs (memory, cpu, ...) and monitor state changes. func (e *Engine) Connect(config *tls.Config) error { @@ -346,11 +371,9 @@ func (e *Engine) refreshLoop() { for { var err error - refreshPeriod := time.Duration(e.r.Intn(stateRefreshMaxRange-stateRefreshMinRange) + stateRefreshMinRange) - - // Sleep stateRefreshPeriod or quit if we get stopped. + // Wait for the delayer or quit if we get stopped. select { - case <-time.After(refreshPeriod * time.Second): + case <-e.refreshDelayer.Wait(): case <-e.stopCh: return }