Merge pull request #612 from KarolKraskiewicz/loading-cluster-state-refactoring
Refactoring: loading ClusterState
This commit is contained in: commit 2722fa6297

@@ -0,0 +1,211 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package input

import (
	"time"

	"github.com/golang/glog"
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
	vpa_api "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/typed/poc.autoscaling.k8s.io/v1alpha1"
	vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/poc.autoscaling.k8s.io/v1alpha1"
	vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
	"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/history"
	"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/metrics"
	"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/spec"
	"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/model"
	kube_client "k8s.io/client-go/kubernetes"
	v1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	resourceclient "k8s.io/metrics/pkg/client/clientset_generated/clientset/typed/metrics/v1beta1"
)

// ClusterStateFeeder can update the state of a ClusterState object.
type ClusterStateFeeder interface {
	// LoadHistory loads historical pod specs and metrics into the clusterState.
	LoadHistory()

	// LoadVPAs updates the clusterState with the current state of VPAs.
	LoadVPAs()

	// LoadPods updates the clusterState with the current specification of Pods and their Containers.
	LoadPods()

	// LoadRealTimeMetrics updates the clusterState with current usage metrics of containers.
	LoadRealTimeMetrics()
}

// NewClusterStateFeeder creates a new ClusterStateFeeder with internal data providers, based on the kube client config and a historyProvider.
func NewClusterStateFeeder(config *rest.Config, historyProvider history.HistoryProvider, clusterState *model.ClusterState) ClusterStateFeeder {
	return &clusterStateFeeder{
		specClient:      newSpecClient(config),
		metricsClient:   newMetricsClient(config),
		vpaClient:       vpa_clientset.NewForConfigOrDie(config).PocV1alpha1(),
		vpaLister:       vpa_api_util.NewAllVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{})),
		historyProvider: historyProvider,
		clusterState:    clusterState,
	}
}

func newSpecClient(config *rest.Config) spec.SpecClient {
	kubeClient := kube_client.NewForConfigOrDie(config)
	podLister := newPodLister(kubeClient)
	return spec.NewSpecClient(podLister)
}

func newMetricsClient(config *rest.Config) metrics.MetricsClient {
	metricsGetter := resourceclient.NewForConfigOrDie(config)
	return metrics.NewMetricsClient(metricsGetter)
}

// Creates a PodLister that lists all pods except those in the Pending phase.
func newPodLister(kubeClient kube_client.Interface) v1lister.PodLister {
	selector := fields.ParseSelectorOrDie("status.phase!=" + string(apiv1.PodPending))
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	podLister := v1lister.NewPodLister(store)
	podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
	stopCh := make(chan struct{})
	go podReflector.Run(stopCh)
	return podLister
}

type clusterStateFeeder struct {
	specClient      spec.SpecClient
	metricsClient   metrics.MetricsClient
	vpaClient       vpa_api.VerticalPodAutoscalersGetter
	vpaLister       vpa_lister.VerticalPodAutoscalerLister
	historyProvider history.HistoryProvider
	clusterState    *model.ClusterState
}

func (feeder *clusterStateFeeder) LoadHistory() {
	clusterHistory, err := feeder.historyProvider.GetClusterHistory()
	if err != nil {
		glog.Errorf("Cannot get cluster history: %v", err)
	}
	for podID, podHistory := range clusterHistory {
		glog.V(4).Infof("Adding pod %v with labels %v", podID, podHistory.LastLabels)
		feeder.clusterState.AddOrUpdatePod(podID, podHistory.LastLabels)
		for containerName, sampleList := range podHistory.Samples {
			containerID := model.ContainerID{
				PodID:         podID,
				ContainerName: containerName}
			glog.V(4).Infof("Adding %d samples for container %v", len(sampleList), containerID)
			for _, sample := range sampleList {
				feeder.clusterState.AddSample(
					&model.ContainerUsageSampleWithKey{
						ContainerUsageSample: sample,
						Container:            containerID})
			}
		}
	}
}

// Fetch VPA objects and load them into the cluster state.
func (feeder *clusterStateFeeder) LoadVPAs() {
	// List VPA API objects.
	vpaCRDs, err := feeder.vpaLister.List(labels.Everything())
	if err != nil {
		glog.Errorf("Cannot list VPAs. Reason: %+v", err)
	} else {
		glog.V(3).Infof("Fetched VPAs.")
	}
	// Add or update existing VPAs in the model.
	vpaKeys := make(map[model.VpaID]bool)
	for n, vpaCRD := range vpaCRDs {
		glog.V(3).Infof("VPA CRD #%v: %+v", n, vpaCRD)
		vpaID := model.VpaID{
			Namespace: vpaCRD.Namespace,
			VpaName:   vpaCRD.Name}
		if feeder.clusterState.AddOrUpdateVpa(vpaCRD) == nil {
			// Successfully added VPA to the model.
			vpaKeys[vpaID] = true
		}

	}
	// Delete non-existent VPAs from the model.
	for vpaID := range feeder.clusterState.Vpas {
		if _, exists := vpaKeys[vpaID]; !exists {
			glog.V(3).Infof("Deleting VPA %v", vpaID)
			feeder.clusterState.DeleteVpa(vpaID)
		}
	}
}

// Load pods into the cluster state.
func (feeder *clusterStateFeeder) LoadPods() {
	podSpecs, err := feeder.specClient.GetPodSpecs()
	if err != nil {
		glog.Errorf("Cannot get SimplePodSpecs. Reason: %+v", err)
	}
	pods := make(map[model.PodID]*spec.BasicPodSpec)
	for n, spec := range podSpecs {
		glog.V(3).Infof("SimplePodSpec #%v: %+v", n, spec)
		pods[spec.ID] = spec
	}
	for key := range feeder.clusterState.Pods {
		if _, exists := pods[key]; !exists {
			glog.V(3).Infof("Deleting Pod %v", key)
			feeder.clusterState.DeletePod(key)
		}
	}
	for _, pod := range pods {
		feeder.clusterState.AddOrUpdatePod(pod.ID, pod.PodLabels)
		for _, container := range pod.Containers {
			feeder.clusterState.AddOrUpdateContainer(container.ID)
		}
	}
}

func (feeder *clusterStateFeeder) LoadRealTimeMetrics() {
	containersMetrics, err := feeder.metricsClient.GetContainersMetrics()
	if err != nil {
		glog.Errorf("Cannot get ContainerMetricsSnapshot from MetricsClient. Reason: %+v", err)
	}

	sampleCount := 0
	for _, containerMetrics := range containersMetrics {
		for _, sample := range newContainerUsageSamplesWithKey(containerMetrics) {
			feeder.clusterState.AddSample(sample)
			sampleCount++
		}
	}
	glog.V(3).Infof("ClusterSpec fed with #%v ContainerUsageSamples for #%v containers", sampleCount, len(containersMetrics))
}

func newContainerUsageSamplesWithKey(metrics *metrics.ContainerMetricsSnapshot) []*model.ContainerUsageSampleWithKey {
	var samples []*model.ContainerUsageSampleWithKey

	for metricName, resourceAmount := range metrics.Usage {
		sample := &model.ContainerUsageSampleWithKey{
			Container: metrics.ID,
			ContainerUsageSample: model.ContainerUsageSample{
				MeasureStart: metrics.SnapshotTime,
				Resource:     metricName,
				Usage:        resourceAmount,
			},
		}
		samples = append(samples, sample)
	}
	return samples
}
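
The new file above concentrates all cluster-state loading behind a single interface. A minimal sketch of how the feeder is meant to be driven, mirroring the Run/runOnce changes further down (the function name feedForever and its parameters are illustrative, not part of the PR):

// Sketch: driving a ClusterStateFeeder the way the refactored
// recommender does. config, provider and fetchInterval are assumed
// to be created elsewhere (e.g. rest.InClusterConfig and a
// history.HistoryProvider implementation).
func feedForever(config *rest.Config, provider history.HistoryProvider, fetchInterval time.Duration) {
	clusterState := model.NewClusterState()
	feeder := input.NewClusterStateFeeder(config, provider, clusterState)

	feeder.LoadHistory() // once, at startup
	for {
		<-time.After(fetchInterval)
		feeder.LoadVPAs()            // sync VPA objects into the model
		feeder.LoadPods()            // sync pod and container specs
		feeder.LoadRealTimeMetrics() // append fresh usage samples
	}
}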

@@ -20,24 +20,15 @@ import (
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
apiv1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
|
||||
vpa_api "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/typed/poc.autoscaling.k8s.io/v1alpha1"
|
||||
vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/poc.autoscaling.k8s.io/v1alpha1"
|
||||
vpa_api_util "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/history"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/metrics"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/input/spec"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/logic"
|
||||
"k8s.io/autoscaler/vertical-pod-autoscaler/recommender/model"
|
||||
kube_client "k8s.io/client-go/kubernetes"
|
||||
v1lister "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
resourceclient "k8s.io/metrics/pkg/client/clientset_generated/clientset/typed/metrics/v1beta1"
|
||||
)
|
||||
|
||||
// Recommender recommend resources for certain containers, based on utilization periodically got from metrics api.

@@ -47,127 +38,12 @@ type Recommender interface {

type recommender struct {
	clusterState            *model.ClusterState
	specClient              spec.SpecClient
	metricsClient           metrics.MetricsClient
	clusterStateFeeder      input.ClusterStateFeeder
	metricsFetchingInterval time.Duration
	historyProvider         history.HistoryProvider
	vpaClient               vpa_api.VerticalPodAutoscalersGetter
	vpaLister               vpa_lister.VerticalPodAutoscalerLister
	podResourceRecommender  logic.PodResourceRecommender
}

func (r *recommender) readHistory() {
	clusterHistory, err := r.historyProvider.GetClusterHistory()
	if err != nil {
		glog.Errorf("Cannot get cluster history: %v", err)
	}
	for podID, podHistory := range clusterHistory {
		glog.V(4).Infof("Adding pod %v with labels %v", podID, podHistory.LastLabels)
		r.clusterState.AddOrUpdatePod(podID, podHistory.LastLabels)
		for containerName, sampleList := range podHistory.Samples {
			containerID := model.ContainerID{
				PodID:         podID,
				ContainerName: containerName}
			glog.V(4).Infof("Adding %d samples for container %v", len(sampleList), containerID)
			for _, sample := range sampleList {
				r.clusterState.AddSample(
					&model.ContainerUsageSampleWithKey{
						ContainerUsageSample: sample,
						Container:            containerID})
			}
		}
	}
}

// Fetch VPA objects and load them into the cluster state.
func (r *recommender) loadVPAs() {
	// List VPA API objects.
	vpaCRDs, err := r.vpaLister.List(labels.Everything())
	if err != nil {
		glog.Errorf("Cannot list VPAs. Reason: %+v", err)
	} else {
		glog.V(3).Infof("Fetched VPAs.")
	}
	// Add or update existing VPAs in the model.
	vpaKeys := make(map[model.VpaID]bool)
	for n, vpaCRD := range vpaCRDs {
		glog.V(3).Infof("VPA CRD #%v: %+v", n, vpaCRD)
		vpaID := model.VpaID{
			Namespace: vpaCRD.Namespace,
			VpaName:   vpaCRD.Name}
		if r.clusterState.AddOrUpdateVpa(vpaCRD) == nil {
			// Successfully added VPA to the model.
			vpaKeys[vpaID] = true
		}

	}
	// Delete non-existent VPAs from the model.
	for vpaID := range r.clusterState.Vpas {
		if _, exists := vpaKeys[vpaID]; !exists {
			glog.V(3).Infof("Deleting VPA %v", vpaID)
			r.clusterState.DeleteVpa(vpaID)
		}
	}
}

// Load pods into the cluster state.
func (r *recommender) loadPods() {
	podSpecs, err := r.specClient.GetPodSpecs()
	if err != nil {
		glog.Errorf("Cannot get SimplePodSpecs. Reason: %+v", err)
	}
	pods := make(map[model.PodID]*spec.BasicPodSpec)
	for n, spec := range podSpecs {
		glog.V(3).Infof("SimplePodSpec #%v: %+v", n, spec)
		pods[spec.ID] = spec
	}
	for key := range r.clusterState.Pods {
		if _, exists := pods[key]; !exists {
			glog.V(3).Infof("Deleting Pod %v", key)
			r.clusterState.DeletePod(key)
		}
	}
	for _, pod := range pods {
		r.clusterState.AddOrUpdatePod(pod.ID, pod.PodLabels)
		for _, container := range pod.Containers {
			r.clusterState.AddOrUpdateContainer(container.ID)
		}
	}
}

func (r *recommender) loadRealTimeMetrics() {
	containersMetrics, err := r.metricsClient.GetContainersMetrics()
	if err != nil {
		glog.Errorf("Cannot get ContainerMetricsSnapshot from MetricsClient. Reason: %+v", err)
	}

	sampleCount := 0
	for _, containerMetrics := range containersMetrics {
		for _, sample := range newContainerUsageSamplesWithKey(containerMetrics) {
			r.clusterState.AddSample(sample)
			sampleCount++
		}
	}
	glog.V(3).Infof("ClusterSpec fed with #%v ContainerUsageSamples for #%v containers", sampleCount, len(containersMetrics))
}

func newContainerUsageSamplesWithKey(metrics *metrics.ContainerMetricsSnapshot) []*model.ContainerUsageSampleWithKey {
	var samples []*model.ContainerUsageSampleWithKey

	for metricName, resourceAmount := range metrics.Usage {
		sample := &model.ContainerUsageSampleWithKey{
			Container: metrics.ID,
			ContainerUsageSample: model.ContainerUsageSample{
				MeasureStart: metrics.SnapshotTime,
				Resource:     metricName,
				Usage:        resourceAmount,
			},
		}
		samples = append(samples, sample)
	}
	return samples
}

// Updates VPA CRD objects' statuses.
func (r *recommender) updateVPAs() {
	for key, vpa := range r.clusterState.Vpas {

@@ -201,15 +77,15 @@ func (r *recommender) updateVPAs() {

// It will soon be replaced by something more useful.
func (r *recommender) runOnce() {
	glog.V(3).Infof("Recommender Run")
	r.loadVPAs()
	r.loadPods()
	r.loadRealTimeMetrics()
	r.clusterStateFeeder.LoadVPAs()
	r.clusterStateFeeder.LoadPods()
	r.clusterStateFeeder.LoadRealTimeMetrics()
	r.updateVPAs()
	glog.V(3).Infof("ClusterState is tracking %v PodStates and %v VPAs", len(r.clusterState.Pods), len(r.clusterState.Vpas))
}

func (r *recommender) Run() {
	r.readHistory()
	r.clusterStateFeeder.LoadHistory()
	for {
		select {
		case <-time.After(r.metricsFetchingInterval):
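
The core of the refactoring shows in the runOnce hunk above: read the paired lines as removal and replacement, with the recommender's direct load calls deleted and delegated one-for-one to the ClusterStateFeeder. In before/after form:

// Before: the recommender loaded cluster state itself.
r.loadVPAs()
r.loadPods()
r.loadRealTimeMetrics()

// After: loading is delegated to the feeder.
r.clusterStateFeeder.LoadVPAs()
r.clusterStateFeeder.LoadPods()
r.clusterStateFeeder.LoadRealTimeMetrics()

The same swap happens in Run, where r.readHistory() becomes r.clusterStateFeeder.LoadHistory().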

@@ -239,40 +115,14 @@ func createPodResourceRecommender() logic.PodResourceRecommender {

// which can be run in order to provide continuous resource recommendations for containers.
// It requires a cluster configuration object and the duration between recommender intervals.
func NewRecommender(config *rest.Config, metricsFetcherInterval time.Duration, historyProvider history.HistoryProvider) Recommender {
	clusterState := model.NewClusterState()
	recommender := &recommender{
		clusterState:            model.NewClusterState(),
		specClient:              newSpecClient(config),
		metricsClient:           newMetricsClient(config),
		clusterState:            clusterState,
		clusterStateFeeder:      input.NewClusterStateFeeder(config, historyProvider, clusterState),
		metricsFetchingInterval: metricsFetcherInterval,
		historyProvider:         historyProvider,
		vpaClient:               vpa_clientset.NewForConfigOrDie(config).PocV1alpha1(),
		vpaLister:               vpa_api_util.NewAllVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{})),
		podResourceRecommender:  createPodResourceRecommender(),
	}
	glog.V(3).Infof("New Recommender created %+v", recommender)

	return recommender
}

func newSpecClient(config *rest.Config) spec.SpecClient {
	kubeClient := kube_client.NewForConfigOrDie(config)
	podLister := newPodLister(kubeClient)
	return spec.NewSpecClient(podLister)
}

func newMetricsClient(config *rest.Config) metrics.MetricsClient {
	metricsGetter := resourceclient.NewForConfigOrDie(config)
	return metrics.NewMetricsClient(metricsGetter)
}

// Creates a PodLister that lists all pods except those in the Pending phase.
func newPodLister(kubeClient kube_client.Interface) v1lister.PodLister {
	selector := fields.ParseSelectorOrDie("status.phase!=" + string(apiv1.PodPending))
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	podLister := v1lister.NewPodLister(store)
	podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
	stopCh := make(chan struct{})
	go podReflector.Run(stopCh)
	return podLister
}
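
For context, a minimal sketch of how a caller might construct and start the refactored recommender. The startRecommender and newHistoryProvider names are hypothetical placeholders, not part of this PR; only NewRecommender and Run come from the diff above.

// Hypothetical wiring for the refactored recommender.
func startRecommender() {
	config, err := rest.InClusterConfig()
	if err != nil {
		glog.Fatalf("Cannot build kube config: %v", err)
	}
	provider := newHistoryProvider() // placeholder for any history.HistoryProvider
	NewRecommender(config, 1*time.Minute, provider).Run()
}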