Merge pull request #3589 from jbartosik/cache-controllers
Cache controllers
This commit is contained in:
commit
857477df43
|
|
@ -49,7 +49,13 @@ import (
|
|||
resourceclient "k8s.io/metrics/pkg/client/clientset/versioned/typed/metrics/v1beta1"
|
||||
)
|
||||
|
||||
const defaultResyncPeriod time.Duration = 10 * time.Minute
|
||||
const (
|
||||
scaleCacheLoopPeriod time.Duration = 7 * time.Second
|
||||
scaleCacheEntryLifetime time.Duration = time.Hour
|
||||
scaleCacheEntryFreshnessTime time.Duration = 10 * time.Minute
|
||||
scaleCacheEntryJitterFactor float64 = 1.
|
||||
defaultResyncPeriod time.Duration = 10 * time.Minute
|
||||
)
|
||||
|
||||
// ClusterStateFeeder can update state of ClusterState object.
|
||||
type ClusterStateFeeder interface {
|
||||
|
|
@ -108,7 +114,8 @@ func NewClusterStateFeeder(config *rest.Config, clusterState *model.ClusterState
|
|||
kubeClient := kube_client.NewForConfigOrDie(config)
|
||||
podLister, oomObserver := NewPodListerAndOOMObserver(kubeClient, namespace)
|
||||
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace))
|
||||
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory)
|
||||
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
|
||||
controllerFetcher.Start(context.TODO(), scaleCacheLoopPeriod)
|
||||
return ClusterStateFeederFactory{
|
||||
PodLister: podLister,
|
||||
OOMObserver: oomObserver,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllerfetcher
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
autoscalingapi "k8s.io/api/autoscaling/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
// Allows tests to inject their time.
|
||||
var now = time.Now
|
||||
|
||||
type scaleCacheKey struct {
|
||||
namespace string
|
||||
groupResource schema.GroupResource
|
||||
name string
|
||||
}
|
||||
|
||||
type scaleCacheEntry struct {
|
||||
refreshAfter time.Time
|
||||
deleteAfter time.Time
|
||||
resource *autoscalingapi.Scale
|
||||
err error
|
||||
}
|
||||
|
||||
// Cache for responses to get queries on controllers. Thread safe.
|
||||
// Usage:
|
||||
// - `Get` cached response. If there is one use it, otherwise make query and
|
||||
// - `Insert` the response you got into the cache.
|
||||
// When you create a `controllerCacheStorage` you should start two go routines:
|
||||
// - One for refreshing cache entries, which calls `GetKeysToRefresh` then for
|
||||
// each key makes query to the API server and calls `Refresh` to update
|
||||
// content of the cache.
|
||||
// - Second for removing stale entries which periodically calls `RemoveExpired`
|
||||
// Each entry is refreshed after duration
|
||||
// `validityTime` * (1 + `jitterFactor`)
|
||||
// passes and is removed if there are no reads for it for more than `lifeTime`.
|
||||
//
|
||||
// Sometimes refreshing might take longer than refreshAfter (for example when
|
||||
// VPA is starting in a big cluster and tries to fetch all controllers). To
|
||||
// handle such situation lifeTime should be longer than refreshAfter so the main
|
||||
// VPA loop can do its work quickly, using the cached information (instead of
|
||||
// getting stuck on refreshing the cache).
|
||||
// TODO(jbartosik): Add a way to detect when we don't refresh cache frequently
|
||||
// enough.
|
||||
// TODO(jbartosik): Add a way to learn how long we keep entries around so we can
|
||||
// decide if / how we want to optimize entry refreshes.
|
||||
type controllerCacheStorage struct {
|
||||
cache map[scaleCacheKey]scaleCacheEntry
|
||||
mux sync.Mutex
|
||||
validityTime time.Duration
|
||||
jitterFactor float64
|
||||
lifeTime time.Duration
|
||||
}
|
||||
|
||||
// Returns bool indicating whether the entry was present in the cache and the cached response.
|
||||
// Updates deleteAfter for the element.
|
||||
func (cc *controllerCacheStorage) Get(namespace string, groupResource schema.GroupResource, name string) (ok bool, controller *autoscalingapi.Scale, err error) {
|
||||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
|
||||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
r, ok := cc.cache[key]
|
||||
if ok {
|
||||
r.deleteAfter = now().Add(cc.lifeTime)
|
||||
cc.cache[key] = r
|
||||
}
|
||||
return ok, r.resource, r.err
|
||||
}
|
||||
|
||||
// If key is in the cache, refresh updates the cached value, error and refresh
|
||||
// time (but not time to remove).
|
||||
// If the key is missing from the cache does nothing (relevant when we're
|
||||
// concurrently updating cache and removing stale entries from it, to avoid
|
||||
// adding back an entry which we just removed).
|
||||
func (cc *controllerCacheStorage) Refresh(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) {
|
||||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
|
||||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
old, ok := cc.cache[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// We refresh entries that are waiting to be removed. So when we refresh an
|
||||
// entry we mustn't change entries deleteAfter time (otherwise we risk never
|
||||
// removing entries that are not being read).
|
||||
cc.cache[key] = scaleCacheEntry{
|
||||
refreshAfter: now().Add(wait.Jitter(cc.validityTime, cc.jitterFactor)),
|
||||
deleteAfter: old.deleteAfter,
|
||||
resource: controller,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
// If the key is missing from the cache, updates the cached value, error and refresh time (but not deleteAfter time).
|
||||
// If key is in the cache, does nothing (to make sure updating element doesn't change its deleteAfter time).
|
||||
func (cc *controllerCacheStorage) Insert(namespace string, groupResource schema.GroupResource, name string, controller *autoscalingapi.Scale, err error) {
|
||||
key := scaleCacheKey{namespace: namespace, groupResource: groupResource, name: name}
|
||||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
if _, ok := cc.cache[key]; ok {
|
||||
return
|
||||
}
|
||||
now := now()
|
||||
cc.cache[key] = scaleCacheEntry{
|
||||
refreshAfter: now.Add(wait.Jitter(cc.validityTime, cc.jitterFactor)),
|
||||
deleteAfter: now.Add(cc.lifeTime),
|
||||
resource: controller,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
// Removes entries which we didn't read in a while from the cache.
|
||||
func (cc *controllerCacheStorage) RemoveExpired() {
|
||||
klog.V(1).Info("Removing entries from controllerCacheStorage")
|
||||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
now := now()
|
||||
removed := 0
|
||||
for k, v := range cc.cache {
|
||||
if now.After(v.deleteAfter) {
|
||||
removed += 1
|
||||
delete(cc.cache, k)
|
||||
}
|
||||
}
|
||||
klog.V(1).Infof("Removed %d entries from controllerCacheStorage", removed)
|
||||
}
|
||||
|
||||
// Returns a list of keys for which values need to be refreshed
|
||||
func (cc *controllerCacheStorage) GetKeysToRefresh() []scaleCacheKey {
|
||||
result := make([]scaleCacheKey, 0)
|
||||
cc.mux.Lock()
|
||||
defer cc.mux.Unlock()
|
||||
now := now()
|
||||
for k, v := range cc.cache {
|
||||
if now.After(v.refreshAfter) {
|
||||
result = append(result, k)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func newControllerCacheStorage(validityTime, lifeTime time.Duration, jitterFactor float64) controllerCacheStorage {
|
||||
return controllerCacheStorage{
|
||||
cache: make(map[scaleCacheKey]scaleCacheEntry),
|
||||
validityTime: validityTime,
|
||||
jitterFactor: jitterFactor,
|
||||
lifeTime: lifeTime,
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllerfetcher
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
autoscalingapi "k8s.io/api/autoscaling/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
func getKey(key string) scaleCacheKey {
|
||||
return scaleCacheKey{
|
||||
namespace: "ns",
|
||||
groupResource: schema.GroupResource{
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
},
|
||||
name: key,
|
||||
}
|
||||
}
|
||||
|
||||
func getScale() *autoscalingapi.Scale {
|
||||
return &autoscalingapi.Scale{}
|
||||
}
|
||||
|
||||
func TestControllerCache_InitiallyNotPresent(t *testing.T) {
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
key := getKey("foo")
|
||||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
}
|
||||
|
||||
func TestControllerCache_Refresh_NotExisting(t *testing.T) {
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
|
||||
// Refreshing key that isn't in the cache doesn't insert it
|
||||
c.Refresh(key.namespace, key.groupResource, key.name, getScale(), nil)
|
||||
present, _, _ = c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
}
|
||||
|
||||
func TestControllerCache_Insert(t *testing.T) {
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
|
||||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil)
|
||||
present, val, err := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.True(t, present)
|
||||
assert.Equal(t, getScale(), val)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestControllerCache_InsertAndRefresh(t *testing.T) {
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
|
||||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil)
|
||||
present, val, err := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.True(t, present)
|
||||
assert.Equal(t, getScale(), val)
|
||||
assert.Nil(t, err)
|
||||
|
||||
c.Refresh(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err"))
|
||||
present, val, err = c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.True(t, present)
|
||||
assert.Nil(t, val)
|
||||
assert.Errorf(t, err, "err")
|
||||
}
|
||||
|
||||
func TestControllerCache_InsertExistingKey(t *testing.T) {
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
present, _, _ := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.False(t, present)
|
||||
|
||||
c.Insert(key.namespace, key.groupResource, key.name, getScale(), nil)
|
||||
present, val, err := c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.True(t, present)
|
||||
assert.Equal(t, getScale(), val)
|
||||
assert.Nil(t, err)
|
||||
|
||||
// We might overwrite old values or keep them, either way should be fine.
|
||||
c.Insert(key.namespace, key.groupResource, key.name, nil, fmt.Errorf("err"))
|
||||
present, _, _ = c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.True(t, present)
|
||||
}
|
||||
|
||||
func TestControllerCache_GetRefreshesDeleteAfter(t *testing.T) {
|
||||
oldNow := now
|
||||
defer func() { now = oldNow }()
|
||||
startTime := oldNow()
|
||||
timeNow := startTime
|
||||
now = func() time.Time {
|
||||
return timeNow
|
||||
}
|
||||
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
c.Insert(key.namespace, key.groupResource, key.name, nil, nil)
|
||||
assert.Equal(t, startTime.Add(10*time.Second), c.cache[key].deleteAfter)
|
||||
|
||||
timeNow = startTime.Add(5 * time.Second)
|
||||
c.Get(key.namespace, key.groupResource, key.name)
|
||||
assert.Equal(t, startTime.Add(15*time.Second), c.cache[key].deleteAfter)
|
||||
}
|
||||
|
||||
func assertTimeBetween(t *testing.T, got, expectAfter, expectBefore time.Time) {
|
||||
assert.True(t, got.After(expectAfter), "expected %v to be after %v", got, expectAfter)
|
||||
assert.False(t, got.After(expectBefore), "expected %v to not be after %v", got, expectBefore)
|
||||
}
|
||||
|
||||
func TestControllerCache_GetChangesLifeTimeNotFreshness(t *testing.T) {
|
||||
oldNow := now
|
||||
defer func() { now = oldNow }()
|
||||
startTime := oldNow()
|
||||
timeNow := startTime
|
||||
now = func() time.Time {
|
||||
return timeNow
|
||||
}
|
||||
|
||||
key := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
c.Insert(key.namespace, key.groupResource, key.name, nil, nil)
|
||||
cacheEntry := c.cache[key]
|
||||
// scheduled to delete 10s after insert
|
||||
assert.Equal(t, startTime.Add(10*time.Second), cacheEntry.deleteAfter)
|
||||
// scheduled to refresh (1-2)s after insert (with jitter)
|
||||
firstRefreshAfter := cacheEntry.refreshAfter
|
||||
assertTimeBetween(t, firstRefreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second))
|
||||
|
||||
timeNow = startTime.Add(5 * time.Second)
|
||||
c.Get(key.namespace, key.groupResource, key.name)
|
||||
cacheEntry = c.cache[key]
|
||||
// scheduled to delete 10s after get (15s after insert)
|
||||
assert.Equal(t, startTime.Add(15*time.Second), cacheEntry.deleteAfter)
|
||||
// refresh the same as before calling Get
|
||||
assert.Equal(t, firstRefreshAfter, cacheEntry.refreshAfter)
|
||||
}
|
||||
|
||||
func TestControllerCache_GetKeysToRefresh(t *testing.T) {
|
||||
oldNow := now
|
||||
defer func() { now = oldNow }()
|
||||
startTime := oldNow()
|
||||
timeNow := startTime
|
||||
now = func() time.Time {
|
||||
return timeNow
|
||||
}
|
||||
|
||||
key1 := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil)
|
||||
cacheEntry := c.cache[key1]
|
||||
// scheduled to refresh (1-2)s after insert (with jitter)
|
||||
refreshAfter := cacheEntry.refreshAfter
|
||||
assertTimeBetween(t, refreshAfter, startTime.Add(time.Second), startTime.Add(2*time.Second))
|
||||
|
||||
timeNow = startTime.Add(5 * time.Second)
|
||||
key2 := getKey("bar")
|
||||
c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil)
|
||||
cacheEntry = c.cache[key2]
|
||||
// scheduled to refresh (1-2)s after insert (with jitter)
|
||||
refreshAfter = cacheEntry.refreshAfter
|
||||
assertTimeBetween(t, refreshAfter, startTime.Add(6*time.Second), startTime.Add(7*time.Second))
|
||||
|
||||
assert.ElementsMatch(t, []scaleCacheKey{key1}, c.GetKeysToRefresh())
|
||||
}
|
||||
|
||||
func TestControllerCache_Clear(t *testing.T) {
|
||||
oldNow := now
|
||||
defer func() { now = oldNow }()
|
||||
startTime := oldNow()
|
||||
timeNow := startTime
|
||||
now = func() time.Time {
|
||||
return timeNow
|
||||
}
|
||||
|
||||
key1 := getKey("foo")
|
||||
c := newControllerCacheStorage(time.Second, 10*time.Second, 1)
|
||||
c.Insert(key1.namespace, key1.groupResource, key1.name, nil, nil)
|
||||
assert.Equal(t, startTime.Add(10*time.Second), c.cache[key1].deleteAfter)
|
||||
|
||||
timeNow = startTime.Add(15 * time.Second)
|
||||
key2 := getKey("bar")
|
||||
c.Insert(key2.namespace, key2.groupResource, key2.name, nil, nil)
|
||||
assert.Equal(t, startTime.Add(25*time.Second), c.cache[key2].deleteAfter)
|
||||
|
||||
c.RemoveExpired()
|
||||
assert.Equal(t, 1, len(c.cache))
|
||||
assert.Contains(t, c.cache, key2)
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
autoscalingapi "k8s.io/api/autoscaling/v1"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
batchv1beta1 "k8s.io/api/batch/v1beta1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
|
@ -79,13 +80,36 @@ type ControllerFetcher interface {
|
|||
}
|
||||
|
||||
type controllerFetcher struct {
|
||||
scaleNamespacer scale.ScalesGetter
|
||||
mapper apimeta.RESTMapper
|
||||
informersMap map[wellKnownController]cache.SharedIndexInformer
|
||||
scaleNamespacer scale.ScalesGetter
|
||||
mapper apimeta.RESTMapper
|
||||
informersMap map[wellKnownController]cache.SharedIndexInformer
|
||||
scaleSubresourceCacheStorage controllerCacheStorage
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) periodicallyRefreshCache(ctx context.Context, period time.Duration) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(period):
|
||||
keysToRefresh := f.scaleSubresourceCacheStorage.GetKeysToRefresh()
|
||||
klog.Info("Starting to refresh entries in controllerFetchers scaleSubresourceCacheStorage")
|
||||
for _, item := range keysToRefresh {
|
||||
scale, err := f.scaleNamespacer.Scales(item.namespace).Get(context.TODO(), item.groupResource, item.name, metav1.GetOptions{})
|
||||
f.scaleSubresourceCacheStorage.Refresh(item.namespace, item.groupResource, item.name, scale, err)
|
||||
}
|
||||
klog.Infof("Finished refreshing %d entries in controllerFetchers scaleSubresourceCacheStorage", len(keysToRefresh))
|
||||
f.scaleSubresourceCacheStorage.RemoveExpired()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) Start(ctx context.Context, loopPeriod time.Duration) {
|
||||
go f.periodicallyRefreshCache(ctx, loopPeriod)
|
||||
}
|
||||
|
||||
// NewControllerFetcher returns a new instance of controllerFetcher
|
||||
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory) ControllerFetcher {
|
||||
func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface, factory informers.SharedInformerFactory, betweenRefreshes, lifeTime time.Duration, jitterFactor float64) *controllerFetcher {
|
||||
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
|
||||
if err != nil {
|
||||
klog.Fatalf("Could not create discoveryClient: %v", err)
|
||||
|
|
@ -121,9 +145,10 @@ func NewControllerFetcher(config *rest.Config, kubeClient kube_client.Interface,
|
|||
|
||||
scaleNamespacer := scale.New(restClient, mapper, dynamic.LegacyAPIPathResolverFunc, resolver)
|
||||
return &controllerFetcher{
|
||||
scaleNamespacer: scaleNamespacer,
|
||||
mapper: mapper,
|
||||
informersMap: informersMap,
|
||||
scaleNamespacer: scaleNamespacer,
|
||||
mapper: mapper,
|
||||
informersMap: informersMap,
|
||||
scaleSubresourceCacheStorage: newControllerCacheStorage(betweenRefreshes, lifeTime, jitterFactor),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -248,6 +273,15 @@ func (f *controllerFetcher) isWellKnown(key *ControllerKeyWithAPIVersion) bool {
|
|||
return exists
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) getScaleForResource(namespace string, groupResource schema.GroupResource, name string) (controller *autoscalingapi.Scale, err error) {
|
||||
if ok, scale, err := f.scaleSubresourceCacheStorage.Get(namespace, groupResource, name); ok {
|
||||
return scale, err
|
||||
}
|
||||
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
|
||||
f.scaleSubresourceCacheStorage.Insert(namespace, groupResource, name, scale, err)
|
||||
return scale, err
|
||||
}
|
||||
|
||||
func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersion) bool {
|
||||
if f.isWellKnown(key) {
|
||||
return true
|
||||
|
|
@ -271,7 +305,7 @@ func (f *controllerFetcher) isWellKnownOrScalable(key *ControllerKeyWithAPIVersi
|
|||
|
||||
for _, mapping := range mappings {
|
||||
groupResource := mapping.Resource.GroupResource()
|
||||
scale, err := f.scaleNamespacer.Scales(key.Namespace).Get(context.TODO(), groupResource, key.Name, metav1.GetOptions{})
|
||||
scale, err := f.getScaleForResource(key.Namespace, groupResource, key.Name)
|
||||
if err == nil && scale != nil {
|
||||
return true
|
||||
}
|
||||
|
|
@ -293,7 +327,7 @@ func (f *controllerFetcher) getOwnerForScaleResource(groupKind schema.GroupKind,
|
|||
var lastError error
|
||||
for _, mapping := range mappings {
|
||||
groupResource := mapping.Resource.GroupResource()
|
||||
scale, err := f.scaleNamespacer.Scales(namespace).Get(context.TODO(), groupResource, name, metav1.GetOptions{})
|
||||
scale, err := f.getScaleForResource(namespace, groupResource, name)
|
||||
if err == nil {
|
||||
return getOwnerController(scale.OwnerReferences, namespace), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ var trueVar = true
|
|||
func simpleControllerFetcher() *controllerFetcher {
|
||||
f := controllerFetcher{}
|
||||
f.informersMap = make(map[wellKnownController]cache.SharedIndexInformer)
|
||||
f.scaleSubresourceCacheStorage = newControllerCacheStorage(time.Second, time.Minute, 0.1)
|
||||
versioned := map[string][]metav1.APIResource{
|
||||
"Foo": {{Kind: "Foo", Name: "bah", Group: "foo"}, {Kind: "Scale", Name: "iCanScale", Group: "foo"}},
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue