Use controllerCacheStorage
This commit is contained in:
parent
bb9b23c55c
commit
9d7898a5a5
|
|
@ -49,7 +49,13 @@ import (
|
|||
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
|
||||
)
|
||||
|
||||
const defaultResyncPeriod time.Duration = 10 * time.Minute
|
||||
const (
|
||||
scaleCacheLoopPeriod time.Duration = 7 * time.Second
|
||||
scaleCacheEntryLifetime time.Duration = time.Hour
|
||||
scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute
|
||||
scaleCacheEntryJitterFactor float64 = 1.
|
||||
defaultResyncPeriod time.Duration = 10 * time.Minute
|
||||
)
|
||||
|
||||
// ClusterStateFeeder can update state of ClusterState object.
|
||||
type ClusterStateFeeder interface {
|
||||
|
|
@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState
|
|||
kubeClient := kube_client.NewForConfigOrDie(config)
|
||||
podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace)
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace))
|
||||
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory)
|
||||
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
|
||||
controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod)
|
||||
return ClusterStateFeederFactory{
|
||||
PodLister: podLister,
|
||||
OOMObserver: oomObserver,
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ type scaleCacheKey struct {
|
|||
groupResource schema.GroupResource
|
||||
name string
|
||||
}
|
||||
|
||||
type scaleCacheEntry struct {
|
||||
refreshAfter time.Time
|
||||
deleteAfter time.Time
|
||||
|
|
@ -133,8 +134,10 @@ func (cc *controllerCacheStorage) RemoveExpired() {
|
|||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
now := now()
|
||||
removed := 0
|
||||
for k, v := range cc.cache {
|
||||
if now.After(v.deleteAfter) {
|
||||
removed += 1
|
||||
delete(cc.cache, k)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
autoscalingapi "k8s.io/api/autoscaling/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
|
@ -79,43 +80,36 @@ type ControllerFetcher interface {
|
|||
}
|
||||
|
||||
type controllerFetcher struct {
|
||||
scaleNamespacer scale.ScalesGetter
|
||||
mapper apimeta.RESTMapper
|
||||
informersMap map[wellKnownController]cache.SharedIndexInformer
|
||||
scaleNamespacer scale.ScalesGetter
|
||||
mapper apimeta.RESTMapper
|
||||
informersMap map[wellKnownController]cache.SharedIndexInformer
|
||||
scaleSubresourceCacheStorage controllerCacheStorage
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) periodicallyRemoveExpired(ctx context.Context, period time.Duration) {
|
||||
func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(period):
|
||||
f.controllerCache.RemoveExpired()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) periodicallyRefresh(ctx context.Context, period time.Duration) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(period):
|
||||
for _, item := range f.controllerCache.GetKeysToRefresh() {
|
||||
keysToRefresh := f.scaleSubresourceCacheStorage.GetKeysToRefresh()
|
||||
klog.Info("Starting to refresh entries in controllerFetchers scaleSubresourceCacheStorage")
|
||||
for _, item := range keysToRefresh {
|
||||
scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{})
|
||||
f.controllerCache.Refresh(item.namespace, item.groupResource, item.name, scale, err)
|
||||
f.scaleSubresourceCacheStorage.Refresh(item.namespace, item.groupResource, item.name, scale, err)
|
||||
}
|
||||
klog.Infof("Finished refreshing %d entries in controllerFetchers scaleSubresourceCacheStorage", len(keysToRefresh))
|
||||
f.scaleSubresourceCacheStorage.RemoveExpired()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) Start(ctx context.Context, removePeriod, refreshPeriod time.Duration) {
|
||||
go f.periodicallyRefresh(ctx, refreshPeriod)
|
||||
go f.periodicallyRemoveExpired(ctx, removePeriod)
|
||||
func (f *controllerFetcher) Start(ctx context.Context, loopPeriod time.Duration) {
|
||||
go f.periodicallyRefreshCache(ctx, loopPeriod)
|
||||
}
|
||||
|
||||
// NewControllerFetcher returns a new instance of controllerFetcher
|
||||
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher {
|
||||
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher {
|
||||
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
|
||||
if err != nil {
|
||||
klog.Fatalf("Could not create discoveryClient: %v", err)
|
||||
|
|
@ -151,9 +145,10 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
|
|||
|
||||
scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
|
||||
return &controllerFetcher{
|
||||
scaleNamespacer: scaleNamespacer,
|
||||
mapper: mapper,
|
||||
informersMap: informersMap,
|
||||
scaleNamespacer: scaleNamespacer,
|
||||
mapper: mapper,
|
||||
informersMap: informersMap,
|
||||
scaleSubresourceCacheStorage: newControllerCacheStorage(betweenRefreshes, lifeTime, jitterFactor),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -278,6 +273,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool {
|
|||
return exists
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (controller *autoscalingapi.Scale, err error) {
|
||||
if ok, scale, err := f.scaleSubresourceCacheStorage.Get(namespace, groupResource, name); ok {
|
||||
return scale, err
|
||||
}
|
||||
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
|
||||
f.scaleSubresourceCacheStorage.Insert(namespace, groupResource, name, scale, err)
|
||||
return scale, err
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool {
|
||||
if f.isWellKnown(key) {
|
||||
return true
|
||||
|
|
@ -301,7 +305,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi
|
|||
|
||||
for _, mapping := range mappings {
|
||||
groupResource := mapping.Resource.GroupResource()
|
||||
scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{})
|
||||
scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name)
|
||||
if err == nil && scale != nil {
|
||||
return true
|
||||
}
|
||||
|
|
@ -323,7 +327,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind,
|
|||
var lastError error
|
||||
for _, mapping := range mappings {
|
||||
groupResource := mapping.Resource.GroupResource()
|
||||
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
|
||||
scale, err := f.getScaleForResource(namespace, groupResource, name)
|
||||
if err == nil {
|
||||
return getOwnerController(scale.OwnerReferences, namespace), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ var trueVar = true
|
|||
func simpleControllerFetcher() *controllerFetcher {
|
||||
f := controllerFetcher{}
|
||||
f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer)
|
||||
f.scaleSubresourceCacheStorage = newControllerCacheStorage(time.Second, time.Minute, 0.1)
|
||||
versioned := map[string][]metav1.APIResource{
|
||||
"Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}},
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue