engine: Added a concurrent safe refresh delayer.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-10-09 11:32:03 -07:00
parent f1782fed90
commit cb2ceea702
1 changed files with 43 additions and 20 deletions

View File

@ -21,8 +21,8 @@ import (
const ( const (
// Force-refresh the state of the engine this often. // Force-refresh the state of the engine this often.
stateRefreshMinRange = 30 stateRefreshMinRange = 30 * time.Second
stateRefreshMaxRange = 60 stateRefreshMaxRange = 60 * time.Second
stateRefreshRetries = 3 stateRefreshRetries = 3
// Timeout for requests sent out to the engine. // Timeout for requests sent out to the engine.
@ -32,20 +32,29 @@ const (
minSupportedVersion = version.Version("1.6.0") minSupportedVersion = version.Version("1.6.0")
) )
// NewEngine is exported // delayer offers a simple API to random delay within a given time range.
func NewEngine(addr string, overcommitRatio float64) *Engine { type delayer struct {
e := &Engine{ rangeMin time.Duration
Addr: addr, rangeMax time.Duration
client: nopclient.NewNopClient(),
Labels: make(map[string]string), r *rand.Rand
stopCh: make(chan struct{}), l sync.Mutex
r: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), }
containers: make(map[string]*Container),
volumes: make(map[string]*Volume), func newDelayer(rangeMin, rangeMax time.Duration) *delayer {
healthy: true, return &delayer{
overcommitRatio: int64(overcommitRatio * 100), rangeMin: rangeMin,
rangeMax: rangeMax,
r: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
} }
return e }
func (d *delayer) Wait() <-chan time.Time {
d.l.Lock()
defer d.l.Unlock()
waitPeriod := int64(d.rangeMin) + d.r.Int63n(int64(d.rangeMax)-int64(d.rangeMin))
return time.After(time.Duration(waitPeriod))
} }
// Engine represents a docker engine // Engine represents a docker engine
@ -61,7 +70,7 @@ type Engine struct {
Labels map[string]string Labels map[string]string
stopCh chan struct{} stopCh chan struct{}
r *rand.Rand refreshDelayer *delayer
containers map[string]*Container containers map[string]*Container
images []*Image images []*Image
volumes map[string]*Volume volumes map[string]*Volume
@ -71,6 +80,22 @@ type Engine struct {
overcommitRatio int64 overcommitRatio int64
} }
// NewEngine is exported
func NewEngine(addr string, overcommitRatio float64) *Engine {
e := &Engine{
Addr: addr,
client: nopclient.NewNopClient(),
refreshDelayer: newDelayer(stateRefreshMinRange, stateRefreshMaxRange),
Labels: make(map[string]string),
stopCh: make(chan struct{}),
containers: make(map[string]*Container),
volumes: make(map[string]*Volume),
healthy: true,
overcommitRatio: int64(overcommitRatio * 100),
}
return e
}
// Connect will initialize a connection to the Docker daemon running on the // Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes. // host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (e *Engine) Connect(config *tls.Config) error { func (e *Engine) Connect(config *tls.Config) error {
@ -346,11 +371,9 @@ func (e *Engine) refreshLoop() {
for { for {
var err error var err error
refreshPeriod := time.Duration(e.r.Intn(stateRefreshMaxRange-stateRefreshMinRange) + stateRefreshMinRange) // Wait for the delayer or quit if we get stopped.
// Sleep stateRefreshPeriod or quit if we get stopped.
select { select {
case <-time.After(refreshPeriod * time.Second): case <-e.refreshDelayer.Wait():
case <-e.stopCh: case <-e.stopCh:
return return
} }