diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index aad78f4b15..fc4b4c0f26 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -99,6 +99,7 @@ var (
 		"Type of node group expander to be used in scale up. Available values: ["+strings.Join(expander.AvailableExpanders, ",")+"]")
 	writeStatusConfigMapFlag = flag.Bool("write-status-configmap", true, "Should CA write status information to a configmap")
+	maxInactivityFlag        = flag.Duration("max-inactivity", 10*time.Minute, "Maximum time from last recorded autoscaler activity before automatic restart")
 )
 
 func createAutoscalerOptions() core.AutoscalerOptions {
@@ -166,10 +167,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-// In order to meet interface criteria for LeaderElectionConfig we need to
-// take stop channel as an argument. However, since we are committing a suicide
-// after loosing mastership we can safely ignore it.
-func run(_ <-chan struct{}) {
+func run(healthCheck *metrics.HealthCheck) {
 	kubeClient := createKubeClient()
 	kubeEventRecorder := kube_util.CreateEventRecorder(kubeClient)
 	opts := createAutoscalerOptions()
@@ -184,6 +182,7 @@ func run(_ <-chan struct{}) {
 
 	autoscaler.CleanUp()
 	registerSignalHandlers(autoscaler)
+	healthCheck.StartMonitoring()
 
 	for {
 		select {
@@ -191,6 +190,7 @@ func run(_ <-chan struct{}) {
 			{
 				loopStart := time.Now()
 				metrics.UpdateLastTime("main", loopStart)
+				healthCheck.UpdateLastActivity(loopStart)
 
 				err := autoscaler.RunOnce(loopStart)
 				if err != nil && err.Type() != errors.TransientError {
@@ -212,6 +212,8 @@ func main() {
 		"Can be used multiple times. Format: <min>:<max>:<name>")
 	kube_flag.InitFlags()
 
+	healthCheck := metrics.NewHealthCheck(*maxInactivityFlag)
+
 	glog.Infof("Cluster Autoscaler %s", ClusterAutoscalerVersion)
 
 	correctEstimator := false
@@ -226,12 +228,13 @@
 
 	go func() {
 		http.Handle("/metrics", prometheus.Handler())
+		http.Handle("/health-check", healthCheck)
 		err := http.ListenAndServe(*address, nil)
 		glog.Fatalf("Failed to start metrics: %v", err)
 	}()
 
 	if !leaderElection.LeaderElect {
-		run(nil)
+		run(healthCheck)
 	} else {
 		id, err := os.Hostname()
 		if err != nil {
@@ -262,7 +265,11 @@
 			RenewDeadline: leaderElection.RenewDeadline.Duration,
 			RetryPeriod:   leaderElection.RetryPeriod.Duration,
 			Callbacks: kube_leaderelection.LeaderCallbacks{
-				OnStartedLeading: run,
+				OnStartedLeading: func(_ <-chan struct{}) {
+					// Since the process commits suicide after losing
+					// mastership, we can safely ignore the argument.
+					run(healthCheck)
+				},
 				OnStoppedLeading: func() {
 					glog.Fatalf("lost master")
 				},
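The wiring above follows a simple pattern: main.go registers the checker on the same mux as /metrics, arms it with StartMonitoring() only once initialization is complete (so a slow startup does not trip the probe), and refreshes it at the top of every autoscaling iteration. Below is a minimal self-contained sketch of that pattern; the activityMonitor type, port, and loop interval are illustrative stand-ins, not the patch's actual metrics.HealthCheck.

package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// activityMonitor is a simplified stand-in for metrics.HealthCheck.
type activityMonitor struct {
	mu           sync.Mutex
	lastActivity time.Time
	timeout      time.Duration
	armed        bool
}

func (m *activityMonitor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mu.Lock()
	timedOut := m.armed && time.Since(m.lastActivity) > m.timeout
	m.mu.Unlock()
	if timedOut {
		w.WriteHeader(500)
		fmt.Fprint(w, "Error: inactive for too long")
		return
	}
	fmt.Fprint(w, "OK")
}

func main() {
	mon := &activityMonitor{lastActivity: time.Now(), timeout: 10 * time.Minute}
	http.Handle("/health-check", mon) // same mux as /metrics in the patch
	go http.ListenAndServe(":8085", nil)

	// Corresponds to healthCheck.StartMonitoring(): enforce the timeout
	// only after initialization has finished.
	mon.mu.Lock()
	mon.armed = true
	mon.mu.Unlock()

	for {
		loopStart := time.Now()
		mon.mu.Lock()
		mon.lastActivity = loopStart // healthCheck.UpdateLastActivity(loopStart)
		mon.mu.Unlock()
		// ... the real loop runs autoscaler.RunOnce(loopStart) here ...
		time.Sleep(10 * time.Second)
	}
}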
diff --git a/cluster-autoscaler/metrics/liveness.go b/cluster-autoscaler/metrics/liveness.go
new file mode 100644
index 0000000000..321f10bf7a
--- /dev/null
+++ b/cluster-autoscaler/metrics/liveness.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+)
+
+// HealthCheck contains the time of the last autoscaler activity and the inactivity timeout
+type HealthCheck struct {
+	lastActivity time.Time
+	mutex        *sync.Mutex
+	timeout      time.Duration
+	checkTimeout bool
+}
+
+// NewHealthCheck builds a new HealthCheck object with the given timeout
+func NewHealthCheck(timeout time.Duration) *HealthCheck {
+	return &HealthCheck{
+		lastActivity: time.Now(),
+		mutex:        &sync.Mutex{},
+		timeout:      timeout,
+		checkTimeout: false,
+	}
+}
+
+// StartMonitoring activates checks for autoscaler inactivity
+func (hc *HealthCheck) StartMonitoring() {
+	hc.mutex.Lock()
+	defer hc.mutex.Unlock()
+	hc.checkTimeout = true
+	now := time.Now()
+	if now.After(hc.lastActivity) {
+		hc.lastActivity = now
+	}
+}
+
+// ServeHTTP implements the http.Handler interface to provide a health-check endpoint
+func (hc *HealthCheck) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	hc.mutex.Lock()
+	lastActivity := hc.lastActivity
+	timedOut := hc.checkTimeout && time.Now().After(lastActivity.Add(hc.timeout))
+	hc.mutex.Unlock()
+
+	if timedOut {
+		w.WriteHeader(500)
+		w.Write([]byte(fmt.Sprintf("Error: last activity %v ago", time.Now().Sub(lastActivity))))
+	} else {
+		w.WriteHeader(200)
+		w.Write([]byte("OK"))
+	}
+}
+
+// UpdateLastActivity moves the last recorded activity time forward
+func (hc *HealthCheck) UpdateLastActivity(timestamp time.Time) {
+	hc.mutex.Lock()
+	defer hc.mutex.Unlock()
+	if timestamp.After(hc.lastActivity) {
+		hc.lastActivity = timestamp
+	}
+}
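The handler has three observable states: 200 before StartMonitoring() is called (the timeout is not yet enforced), 500 once monitoring is on and the last activity is older than the timeout, and 200 again after fresh activity is recorded. A rough sketch of exercising those transitions over real HTTP, assuming the package is importable as k8s.io/autoscaler/cluster-autoscaler/metrics and shortening the timeout for demonstration:

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/metrics"
)

func probe(url string) {
	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("probe error:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Printf("%d %s\n", resp.StatusCode, body)
}

func main() {
	// Timeout shortened from the 10-minute default for demonstration.
	hc := metrics.NewHealthCheck(100 * time.Millisecond)
	srv := httptest.NewServer(hc)
	defer srv.Close()

	probe(srv.URL) // 200 OK: timeout not enforced before StartMonitoring

	hc.StartMonitoring()
	time.Sleep(200 * time.Millisecond)
	probe(srv.URL) // 500: no activity recorded within the timeout

	hc.UpdateLastActivity(time.Now())
	probe(srv.URL) // 200 OK again after fresh activity
}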
diff --git a/cluster-autoscaler/metrics/liveness_test.go b/cluster-autoscaler/metrics/liveness_test.go
new file mode 100644
index 0000000000..ed57e3dfc4
--- /dev/null
+++ b/cluster-autoscaler/metrics/liveness_test.go
@@ -0,0 +1,76 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func getTestResponse(start time.Time, timeout time.Duration, checkMonitoring bool) *httptest.ResponseRecorder {
+	req := httptest.NewRequest("GET", "/health-check", nil)
+	w := httptest.NewRecorder()
+	healthCheck := NewHealthCheck(timeout)
+	if checkMonitoring {
+		healthCheck.StartMonitoring()
+	}
+	healthCheck.lastActivity = start
+	healthCheck.ServeHTTP(w, req)
+	return w
+}
+
+func TestOkServeHTTP(t *testing.T) {
+	w := getTestResponse(time.Now(), time.Second, true)
+	assert.Equal(t, 200, w.Code)
+}
+
+func TestFailServeHTTP(t *testing.T) {
+	w := getTestResponse(time.Now().Add(time.Second*-2), time.Second, true)
+	assert.Equal(t, 500, w.Code)
+}
+
+func TestMonitoringOffAfterTimeout(t *testing.T) {
+	w := getTestResponse(time.Now(), time.Second, false)
+	assert.Equal(t, 200, w.Code)
+}
+
+func TestMonitoringOffBeforeTimeout(t *testing.T) {
+	w := getTestResponse(time.Now().Add(time.Second*-2), time.Second, false)
+	assert.Equal(t, 200, w.Code)
+}
+
+func TestUpdateLastActivity(t *testing.T) {
+	timeout := time.Second
+	start := time.Now().Add(timeout * -2)
+
+	req := httptest.NewRequest("GET", "/health-check", nil)
+	w := httptest.NewRecorder()
+	healthCheck := NewHealthCheck(timeout)
+	healthCheck.StartMonitoring()
+	healthCheck.lastActivity = start
+
+	healthCheck.ServeHTTP(w, req)
+	assert.Equal(t, 500, w.Code)
+
+	w = httptest.NewRecorder()
+	healthCheck.UpdateLastActivity(time.Now())
+	healthCheck.ServeHTTP(w, req)
+	assert.Equal(t, 200, w.Code)
+}
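One deployment note: the endpoint itself only reports state; the "automatic restart" mentioned in the --max-inactivity flag description comes from whatever probes it. In a typical Kubernetes setup that would be a kubelet livenessProbe issuing an httpGet against /health-check on the port the metrics server listens on (the --address flag), so a wedged autoscaler gets its pod restarted. The exact probe configuration is deployment-specific and not part of this patch.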