Add typed errors; add errors_total metric

To keep reasonable commit size only top-level files use
new errors. Will add them in other files in next commits.
This commit is contained in:
Maciej Pytel 2017-05-16 16:56:54 +02:00
parent b432362a70
commit f716a7e496
8 changed files with 122 additions and 24 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record" kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -37,7 +38,7 @@ type AutoscalerOptions struct {
// The configuration can be injected at the creation of an autoscaler // The configuration can be injected at the creation of an autoscaler
type Autoscaler interface { type Autoscaler interface {
// RunOnce represents an iteration in the control-loop of CA // RunOnce represents an iteration in the control-loop of CA
RunOnce(currentTime time.Time) RunOnce(currentTime time.Time) *errors.AutoscalerError
// CleanUp represents a clean-up required before the first invocation of RunOnce // CleanUp represents a clean-up required before the first invocation of RunOnce
CleanUp() CleanUp()
// ExitCleanUp is a clean-up performed just before process termination. // ExitCleanUp is a clean-up performed just before process termination.

View File

@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record" kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -56,14 +57,14 @@ func (a *DynamicAutoscaler) ExitCleanUp() {
} }
// RunOnce represents a single iteration of a dynamic autoscaler inside the CA's control-loop // RunOnce represents a single iteration of a dynamic autoscaler inside the CA's control-loop
func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) { func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now() reconfigureStart := time.Now()
metrics.UpdateLastTime("reconfigure", reconfigureStart) metrics.UpdateLastTime("reconfigure", reconfigureStart)
if err := a.Reconfigure(); err != nil { if err := a.Reconfigure(); err != nil {
glog.Errorf("Failed to reconfigure : %v", err) glog.Errorf("Failed to reconfigure : %v", err)
} }
metrics.UpdateDuration("reconfigure", reconfigureStart) metrics.UpdateDuration("reconfigure", reconfigureStart)
a.autoscaler.RunOnce(currentTime) return a.autoscaler.RunOnce(currentTime)
} }
// Reconfigure this dynamic autoscaler if the configmap is updated // Reconfigure this dynamic autoscaler if the configmap is updated

View File

@ -19,6 +19,7 @@ package core
import ( import (
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"testing" "testing"
"time" "time"
) )
@ -27,8 +28,9 @@ type AutoscalerMock struct {
mock.Mock mock.Mock
} }
func (m *AutoscalerMock) RunOnce(currentTime time.Time) { func (m *AutoscalerMock) RunOnce(currentTime time.Time) *errors.AutoscalerError {
m.Called(currentTime) m.Called(currentTime)
return nil
} }
func (m *AutoscalerMock) CleanUp() { func (m *AutoscalerMock) CleanUp() {

View File

@ -21,6 +21,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
) )
// PollingAutoscaler is a variant of autoscaler which polls the source-of-truth every time RunOnce is invoked // PollingAutoscaler is a variant of autoscaler which polls the source-of-truth every time RunOnce is invoked
@ -48,14 +49,14 @@ func (a *PollingAutoscaler) ExitCleanUp() {
} }
// RunOnce represents a single iteration of a polling autoscaler inside the CA's control-loop // RunOnce represents a single iteration of a polling autoscaler inside the CA's control-loop
func (a *PollingAutoscaler) RunOnce(currentTime time.Time) { func (a *PollingAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now() reconfigureStart := time.Now()
metrics.UpdateLastTime("poll", reconfigureStart) metrics.UpdateLastTime("poll", reconfigureStart)
if err := a.Poll(); err != nil { if err := a.Poll(); err != nil {
glog.Errorf("Failed to poll : %v", err) glog.Errorf("Failed to poll : %v", err)
} }
metrics.UpdateDuration("poll", reconfigureStart) metrics.UpdateDuration("poll", reconfigureStart)
a.autoscaler.RunOnce(currentTime) return a.autoscaler.RunOnce(currentTime)
} }
// Poll latest data from cloud provider to recreate this autoscaler // Poll latest data from cloud provider to recreate this autoscaler

View File

@ -21,6 +21,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record" kube_record "k8s.io/client-go/tools/record"
@ -72,7 +73,7 @@ func (a *StaticAutoscaler) CleanUp() {
} }
// RunOnce iterates over node groups and scales them up/down if necessary // RunOnce iterates over node groups and scales them up/down if necessary
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { func (a *StaticAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
readyNodeLister := a.ReadyNodeLister() readyNodeLister := a.ReadyNodeLister()
allNodeLister := a.AllNodeLister() allNodeLister := a.AllNodeLister()
unschedulablePodLister := a.UnschedulablePodLister() unschedulablePodLister := a.UnschedulablePodLister()
@ -85,30 +86,30 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
readyNodes, err := readyNodeLister.List() readyNodes, err := readyNodeLister.List()
if err != nil { if err != nil {
glog.Errorf("Failed to list ready nodes: %v", err) glog.Errorf("Failed to list ready nodes: %v", err)
return return errors.ToAutoscalerError(errors.ApiCallError, err)
} }
if len(readyNodes) == 0 { if len(readyNodes) == 0 {
glog.Error("No ready nodes in the cluster") glog.Error("No ready nodes in the cluster")
scaleDown.CleanUpUnneededNodes() scaleDown.CleanUpUnneededNodes()
return return nil
} }
allNodes, err := allNodeLister.List() allNodes, err := allNodeLister.List()
if err != nil { if err != nil {
glog.Errorf("Failed to list all nodes: %v", err) glog.Errorf("Failed to list all nodes: %v", err)
return return errors.ToAutoscalerError(errors.ApiCallError, err)
} }
if len(allNodes) == 0 { if len(allNodes) == 0 {
glog.Error("No nodes in the cluster") glog.Error("No nodes in the cluster")
scaleDown.CleanUpUnneededNodes() scaleDown.CleanUpUnneededNodes()
return return nil
} }
err = a.ClusterStateRegistry.UpdateNodes(allNodes, currentTime) err = a.ClusterStateRegistry.UpdateNodes(allNodes, currentTime)
if err != nil { if err != nil {
glog.Errorf("Failed to update node registry: %v", err) glog.Errorf("Failed to update node registry: %v", err)
scaleDown.CleanUpUnneededNodes() scaleDown.CleanUpUnneededNodes()
return return errors.ToAutoscalerError(errors.CloudProviderError, err)
} }
metrics.UpdateClusterState(a.ClusterStateRegistry) metrics.UpdateClusterState(a.ClusterStateRegistry)
@ -122,7 +123,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
if !a.ClusterStateRegistry.IsClusterHealthy() { if !a.ClusterStateRegistry.IsClusterHealthy() {
glog.Warning("Cluster is not ready for autoscaling") glog.Warning("Cluster is not ready for autoscaling")
scaleDown.CleanUpUnneededNodes() scaleDown.CleanUpUnneededNodes()
return return nil
} }
metrics.UpdateDuration("updateClusterState", runStart) metrics.UpdateDuration("updateClusterState", runStart)
@ -142,12 +143,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
glog.Warningf("Failed to remove unregistered nodes: %v", err) glog.Warningf("Failed to remove unregistered nodes: %v", err)
} }
return return errors.ToAutoscalerError(errors.CloudProviderError, err)
} }
// Some nodes were removed. Let's skip this iteration, the next one should be better. // Some nodes were removed. Let's skip this iteration, the next one should be better.
if removedAny { if removedAny {
glog.V(0).Infof("Some unregistered nodes were removed, skipping iteration") glog.V(0).Infof("Some unregistered nodes were removed, skipping iteration")
return return nil
} }
} }
@ -157,24 +158,24 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
fixedSomething, err := fixNodeGroupSize(autoscalingContext, time.Now()) fixedSomething, err := fixNodeGroupSize(autoscalingContext, time.Now())
if err != nil { if err != nil {
glog.Warningf("Failed to fix node group sizes: %v", err) glog.Warningf("Failed to fix node group sizes: %v", err)
return return errors.ToAutoscalerError(errors.CloudProviderError, err)
} }
if fixedSomething { if fixedSomething {
glog.V(0).Infof("Some node group target size was fixed, skipping the iteration") glog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
return return nil
} }
allUnschedulablePods, err := unschedulablePodLister.List() allUnschedulablePods, err := unschedulablePodLister.List()
if err != nil { if err != nil {
glog.Errorf("Failed to list unscheduled pods: %v", err) glog.Errorf("Failed to list unscheduled pods: %v", err)
return return errors.ToAutoscalerError(errors.ApiCallError, err)
} }
metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods)) metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods))
allScheduled, err := scheduledPodLister.List() allScheduled, err := scheduledPodLister.List()
if err != nil { if err != nil {
glog.Errorf("Failed to list scheduled pods: %v", err) glog.Errorf("Failed to list scheduled pods: %v", err)
return return errors.ToAutoscalerError(errors.ApiCallError, err)
} }
// We need to reset all pods that have been marked as unschedulable not after // We need to reset all pods that have been marked as unschedulable not after
@ -233,11 +234,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
if err != nil { if err != nil {
glog.Errorf("Failed to scale up: %v", err) glog.Errorf("Failed to scale up: %v", err)
return // TODO(maciekpytel): temporary hack, fix this
return nil
} else if scaledUp { } else if scaledUp {
a.lastScaleUpTime = time.Now() a.lastScaleUpTime = time.Now()
// No scale down in this iteration. // No scale down in this iteration.
return return nil
} }
} }
@ -247,7 +249,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
pdbs, err := pdbLister.List() pdbs, err := pdbLister.List()
if err != nil { if err != nil {
glog.Errorf("Failed to list pod disruption budgets: %v", err) glog.Errorf("Failed to list pod disruption budgets: %v", err)
return return errors.ToAutoscalerError(errors.ApiCallError, err)
} }
// In dry run only utilization is updated // In dry run only utilization is updated
@ -265,7 +267,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
err = scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs) err = scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs)
if err != nil { if err != nil {
glog.Warningf("Failed to scale down: %v", err) glog.Warningf("Failed to scale down: %v", err)
return // TODO(maciekpytel): temporary hack, fix this
return nil
} }
metrics.UpdateDuration("findUnneeded", unneededStart) metrics.UpdateDuration("findUnneeded", unneededStart)
@ -294,6 +297,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
} }
} }
} }
return nil
} }
// ExitCleanUp removes status configmap. // ExitCleanUp removes status configmap.

View File

@ -35,6 +35,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_leaderelection "k8s.io/kubernetes/pkg/client/leaderelection" kube_leaderelection "k8s.io/kubernetes/pkg/client/leaderelection"
@ -191,7 +192,10 @@ func run(_ <-chan struct{}) {
loopStart := time.Now() loopStart := time.Now()
metrics.UpdateLastTime("main", loopStart) metrics.UpdateLastTime("main", loopStart)
autoscaler.RunOnce(loopStart) err := autoscaler.RunOnce(loopStart)
if err != nil && err.Type() != errors.TransientError {
metrics.RegisterError(err)
}
metrics.UpdateDuration("main", loopStart) metrics.UpdateDuration("main", loopStart)
} }

View File

@ -21,6 +21,7 @@ import (
"time" "time"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -86,6 +87,14 @@ var (
) )
/**** Metrics related to autoscaler operations ****/ /**** Metrics related to autoscaler operations ****/
errorsCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: caNamespace,
Name: "errors_total",
Help: "The number of CA loops failed due to an error.",
}, []string{"type"},
)
scaleUpCount = prometheus.NewCounter( scaleUpCount = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: caNamespace, Namespace: caNamespace,
@ -125,6 +134,7 @@ func init() {
prometheus.MustRegister(unschedulablePodsCount) prometheus.MustRegister(unschedulablePodsCount)
prometheus.MustRegister(lastActivity) prometheus.MustRegister(lastActivity)
prometheus.MustRegister(functionDuration) prometheus.MustRegister(functionDuration)
prometheus.MustRegister(errorsCount)
prometheus.MustRegister(scaleUpCount) prometheus.MustRegister(scaleUpCount)
prometheus.MustRegister(scaleDownCount) prometheus.MustRegister(scaleDownCount)
prometheus.MustRegister(evictionsCount) prometheus.MustRegister(evictionsCount)
@ -166,6 +176,12 @@ func UpdateUnschedulablePodsCount(podsCount int) {
unschedulablePodsCount.Set(float64(podsCount)) unschedulablePodsCount.Set(float64(podsCount))
} }
// RegisterError records any errors preventing Cluster Autoscaler from working.
// No more than one error should be recorded per loop.
func RegisterError(err *errors.AutoscalerError) {
errorsCount.WithLabelValues(string(err.Type())).Add(1.0)
}
// RegisterScaleUp records number of nodes added by scale up // RegisterScaleUp records number of nodes added by scale up
func RegisterScaleUp(nodesCount int) { func RegisterScaleUp(nodesCount int) {
scaleUpCount.Add(float64(nodesCount)) scaleUpCount.Add(float64(nodesCount))

View File

@ -0,0 +1,69 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import (
"fmt"
)
// AutoscalerErrorType describes a high-level category of a given error
type AutoscalerErrorType string
// AutoscalerError contains information about Autoscaler errors
type AutoscalerError struct {
errorType AutoscalerErrorType
msg string
}
const (
// CloudProviderError is an error related to underlying infrastructure
CloudProviderError AutoscalerErrorType = "cloudProviderError"
// ApiCallError is an error related to communication with k8s API server
ApiCallError AutoscalerErrorType = "apiCallError"
// InternalError is an error inside Cluster Autoscaler
InternalError AutoscalerErrorType = "internalError"
// TransientError is an error that causes us to skip a single loop, but
// does not require any additional action.
TransientError AutoscalerErrorType = "transientError"
)
// NewAutoscalerError returns new autoscaler error with a message constructed from format string
func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) *AutoscalerError {
return &AutoscalerError{
errorType: errorType,
msg: fmt.Sprintf(msg, args...),
}
}
// ToAutoscalerError converts an error to AutoscalerError with given type,
// unless it already is an AutoscalerError (in which case it's not modified).
func ToAutoscalerError(defaultType AutoscalerErrorType, err error) *AutoscalerError {
if e, ok := err.(*AutoscalerError); ok {
return e
}
return NewAutoscalerError(defaultType, err.Error())
}
// Error implements golang error interface
func (e *AutoscalerError) Error() string {
return e.msg
}
// Type returns the typ of AutoscalerError
func (e *AutoscalerError) Type() AutoscalerErrorType {
return e.errorType
}