Merge pull request #132355 from serathius/apf_estimate_size

Estimate average size of objects in etcd and plug it into request cost estimator

Kubernetes-commit: 025636181393cc95bf78a48238fff5c8b05d6404
Kubernetes Publisher 2025-06-26 05:54:28 -07:00
commit 895f2b3947
23 changed files with 681 additions and 128 deletions
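For context, the idea behind the change: a LIST request's cost depends not only on how many objects it returns but also on how large they are, so the storage layer now reports an object count together with an estimated average object size, and the API Priority and Fairness (APF) work estimator can consume both. The sketch below is an illustrative simplification of that idea, not the estimator shipped in this PR; the Stats shape mirrors the new storage.Stats type, while the seat formula and constants are invented for the example.

package main

import "fmt"

// Stats mirrors the shape of the storage.Stats type introduced by this PR.
type Stats struct {
	ObjectCount                     int64
	EstimatedAverageObjectSizeBytes int64
}

// estimateSeats is a hypothetical cost function: it scales LIST work by the
// estimated total bytes to be serialized rather than by object count alone.
// The bytesPerSeat constant and the cap are made up for illustration.
func estimateSeats(s Stats, maxSeats int64) int64 {
	const bytesPerSeat = 100000
	totalBytes := s.ObjectCount * s.EstimatedAverageObjectSizeBytes
	seats := totalBytes/bytesPerSeat + 1
	if seats > maxSeats {
		return maxSeats
	}
	return seats
}

func main() {
	// 5000 pods at ~2 KiB each cost more seats than 5000 tiny objects.
	fmt.Println(estimateSeats(Stats{ObjectCount: 5000, EstimatedAverageObjectSizeBytes: 2048}, 10)) // 10 (capped)
	fmt.Println(estimateSeats(Stats{ObjectCount: 5000, EstimatedAverageObjectSizeBytes: 64}, 10))   // 4
}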


@ -176,6 +176,11 @@ const (
// if the generated name conflicts with an existing resource name, up to a maximum number of 7 retries.
RetryGenerateName featuregate.Feature = "RetryGenerateName"
// owner: @serathius
//
// Enables APF to use size of objects for estimating request cost.
SizeBasedListCostEstimate featuregate.Feature = "SizeBasedListCostEstimate"
// owner: @cici37
//
// StrictCostEnforcementForVAP is used to apply strict CEL cost validation for ValidatingAdmissionPolicy.
@ -362,6 +367,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
SizeBasedListCostEstimate: {
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
},
StorageVersionAPI: {
{Version: version.MustParse("1.20"), Default: false, PreRelease: featuregate.Alpha},
},
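The gate is read the same way as other apiserver gates; a minimal sketch of the check (the wrapper function is illustrative, while the gate name, the import paths, and the Enabled call are the ones used elsewhere in this PR):

package example

import (
	"k8s.io/apiserver/pkg/features"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
)

// sizeBasedEstimateEnabled reports whether size-based list cost estimation
// is active; the gate is Beta and on by default in 1.34 per the table above.
func sizeBasedEstimateEnabled() bool {
	return utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate)
}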


@ -107,8 +107,8 @@ func (s *DryRunnableStorage) GuaranteedUpdate(
return s.Storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
}
func (s *DryRunnableStorage) Count(ctx context.Context, key string) (int64, error) {
return s.Storage.Count(ctx, key)
func (s *DryRunnableStorage) Stats(ctx context.Context) (storage.Stats, error) {
return s.Storage.Stats(ctx)
}
func (s *DryRunnableStorage) copyInto(in, out runtime.Object) error {


@ -1667,15 +1667,15 @@ func (e *Store) startObservingCount(period time.Duration, objectCountTracker flo
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
stopCh := make(chan struct{})
go wait.JitterUntil(func() {
count, err := e.Storage.Count(ctx, prefix)
stats, err := e.Storage.Stats(ctx)
if err != nil {
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
count = -1
stats.ObjectCount = -1
}
metrics.UpdateObjectCount(e.DefaultQualifiedResource, count)
metrics.UpdateObjectCount(e.DefaultQualifiedResource, stats.ObjectCount)
if objectCountTracker != nil {
objectCountTracker.Set(resourceName, count)
objectCountTracker.Set(resourceName, stats)
}
}, period, resourceCountPollPeriodJitter, true, stopCh)
return func() { close(stopCh) }


@ -455,7 +455,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
}, time.Second, stopCh,
)
}()
config.Storage.SetKeysFunc(cacher.getKeys)
return cacher, nil
}
@ -1271,6 +1271,14 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
}
}
func (c *Cacher) getKeys(ctx context.Context) ([]string, error) {
rev, err := c.storage.GetCurrentResourceVersion(ctx)
if err != nil {
return nil, err
}
return c.watchCache.WaitUntilFreshAndGetKeys(ctx, rev)
}
func (c *Cacher) Ready() bool {
_, err := c.ready.check()
return err == nil


@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -279,10 +280,15 @@ func TestTransformationFailure(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}
func TestCount(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestCount(ctx, t, cacher)
func TestStats(t *testing.T) {
for _, sizeBasedListCostEstimate := range []bool{true, false} {
t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate)
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestStats(ctx, t, cacher, codecs.LegacyCodec(examplev1.SchemeGroupVersion), identity.NewEncryptCheckTransformer(), sizeBasedListCostEstimate)
})
}
}
func TestWatch(t *testing.T) {
@ -640,3 +646,18 @@ func BenchmarkStoreList(b *testing.B) {
})
}
}
func BenchmarkStoreStats(b *testing.B) {
klog.SetLogger(logr.Discard())
data := storagetesting.PrepareBenchchmarkData(50, 3_000, 5_000)
ctx, cacher, _, terminate := testSetupWithEtcdServer(b)
b.Cleanup(terminate)
var out example.Pod
for _, pod := range data.Pods {
err := cacher.Create(ctx, computePodKey(pod), pod, &out, 0)
if err != nil {
b.Fatal(err)
}
}
storagetesting.RunBenchmarkStoreStats(ctx, b, cacher)
}


@ -71,6 +71,7 @@ func newEtcdTestStorage(t testing.TB, prefix string) (*etcd3testing.EtcdTestServ
etcd3.NewDefaultLeaseManagerConfig(),
etcd3.NewDefaultDecoder(codec, versioner),
versioner)
t.Cleanup(storage.Close)
return server, storage
}


@ -189,8 +189,10 @@ func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts stora
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Count(_ context.Context, _ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
func (d *dummyStorage) Stats(_ context.Context) (storage.Stats, error) {
return storage.Stats{}, fmt.Errorf("unimplemented")
}
func (d *dummyStorage) SetKeysFunc(storage.KeysFunc) {
}
func (d *dummyStorage) ReadinessCheck() error {
return nil


@ -99,6 +99,10 @@ func (c *CacheDelegator) GetCurrentResourceVersion(ctx context.Context) (uint64,
return c.storage.GetCurrentResourceVersion(ctx)
}
func (c *CacheDelegator) SetKeysFunc(keys storage.KeysFunc) {
c.storage.SetKeysFunc(keys)
}
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
@ -257,8 +261,8 @@ func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, desti
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
func (c *CacheDelegator) Count(ctx context.Context, pathPrefix string) (int64, error) {
return c.storage.Count(ctx, pathPrefix)
func (c *CacheDelegator) Stats(ctx context.Context) (storage.Stats, error) {
return c.storage.Stats(ctx)
}
func (c *CacheDelegator) ReadinessCheck() error {


@ -531,6 +531,24 @@ func (c *watchCache) waitUntilFreshAndGetList(ctx context.Context, key string, o
return listResp{ResourceVersion: readResourceVersion}, "", nil
}
// WaitUntilFreshAndGetKeys waits until the cache is at least as fresh as the given
// resourceVersion and returns the keys of all objects currently in the watch cache.
func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersion uint64) (keys []string, err error) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
defer w.RUnlock()
if err != nil {
return nil, err
}
return w.store.ListKeys(), nil
}
// NOTICE: Structure follows the shouldDelegateList function in
// staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go
func (w *watchCache) waitUntilFreshAndList(ctx context.Context, key string, opts storage.ListOptions) (resp listResp, index string, err error) {

pkg/storage/etcd3/stats.go Normal file

@ -0,0 +1,198 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"strings"
"sync"
"sync/atomic"
"time"
"go.etcd.io/etcd/api/v3/mvccpb"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
)
const sizerRefreshInterval = time.Minute
func newStatsCache(prefix string, getKeys storage.KeysFunc) *statsCache {
if prefix[len(prefix)-1] != '/' {
prefix += "/"
}
sc := &statsCache{
prefix: prefix,
getKeys: getKeys,
stop: make(chan struct{}),
keys: make(map[string]sizeRevision),
}
sc.wg.Add(1)
go func() {
defer sc.wg.Done()
sc.run()
}()
return sc
}
// statsCache efficiently estimates the average object size
// based on the last observed state of individual keys.
// By plugging statsCache into GetList and Watch functions,
// a fairly accurate estimate of object sizes can be maintained
// without additional requests to the underlying storage.
// To handle potential out-of-order or incomplete data,
// it uses a per-key revision to identify the newer state.
// This approach may leak keys if delete events are not observed,
// thus we run a background goroutine to periodically clean up keys if needed.
type statsCache struct {
prefix string
getKeys storage.KeysFunc
stop chan struct{}
wg sync.WaitGroup
lastKeyCleanup atomic.Pointer[time.Time]
lock sync.Mutex
keys map[string]sizeRevision
}
type sizeRevision struct {
sizeBytes int64
revision int64
}
func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) {
keys, err := sc.getKeys(ctx)
if err != nil {
return storage.Stats{}, err
}
stats := storage.Stats{
ObjectCount: int64(len(keys)),
}
sc.lock.Lock()
defer sc.lock.Unlock()
sc.cleanKeys(keys)
if len(sc.keys) != 0 {
stats.EstimatedAverageObjectSizeBytes = sc.keySizes() / int64(len(sc.keys))
}
return stats, nil
}
func (sc *statsCache) SetKeysFunc(keys storage.KeysFunc) {
sc.getKeys = keys
}
func (sc *statsCache) Close() {
close(sc.stop)
sc.wg.Wait()
}
func (sc *statsCache) run() {
err := wait.PollUntilContextCancel(wait.ContextForChannel(sc.stop), sizerRefreshInterval, false, func(ctx context.Context) (done bool, err error) {
sc.cleanKeysIfNeeded(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Sizer exiting")
}
}
func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) {
lastKeyCleanup := sc.lastKeyCleanup.Load()
if lastKeyCleanup != nil && time.Since(*lastKeyCleanup) < sizerRefreshInterval {
return
}
// Don't execute getKeys under lock.
keys, err := sc.getKeys(ctx)
if err != nil {
klog.InfoS("Error getting keys", "err", err)
}
sc.lock.Lock()
defer sc.lock.Unlock()
sc.cleanKeys(keys)
}
func (sc *statsCache) cleanKeys(keepKeys []string) {
newKeys := make(map[string]sizeRevision, len(keepKeys))
for _, key := range keepKeys {
// Handle cacher keys not having prefix.
if !strings.HasPrefix(key, sc.prefix) {
startIndex := 0
if key[0] == '/' {
startIndex = 1
}
key = sc.prefix + key[startIndex:]
}
keySizeRevision, ok := sc.keys[key]
if !ok {
continue
}
newKeys[key] = keySizeRevision
}
sc.keys = newKeys
now := time.Now()
sc.lastKeyCleanup.Store(&now)
}
func (sc *statsCache) keySizes() (totalSize int64) {
for _, sizeRevision := range sc.keys {
totalSize += sizeRevision.sizeBytes
}
return totalSize
}
func (sc *statsCache) Update(kvs []*mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
for _, kv := range kvs {
sc.updateKey(kv)
}
}
func (sc *statsCache) UpdateKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
sc.updateKey(kv)
}
func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) {
key := string(kv.Key)
keySizeRevision := sc.keys[key]
if keySizeRevision.revision >= kv.ModRevision {
return
}
sc.keys[key] = sizeRevision{
sizeBytes: int64(len(kv.Value)),
revision: kv.ModRevision,
}
}
func (sc *statsCache) DeleteKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
key := string(kv.Key)
keySizeRevision := sc.keys[key]
if keySizeRevision.revision >= kv.ModRevision {
return
}
delete(sc.keys, key)
}
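A brief, package-local sketch of how this cache is meant to be driven (only the statsCache API shown above is assumed; the keys, sizes, and revisions are illustrative): updates and deletes carry the etcd ModRevision, and stale revisions are ignored so replayed events cannot roll the size estimate backwards.

package etcd3 // sketch alongside stats.go; illustrative only

import (
	"context"

	"go.etcd.io/etcd/api/v3/mvccpb"
)

func sketchStatsCacheUsage(ctx context.Context) {
	// The KeysFunc supplies the authoritative key listing; here it is a stub.
	sc := newStatsCache("/registry/pods", func(ctx context.Context) ([]string, error) {
		return []string{"/registry/pods/ns/a"}, nil
	})
	defer sc.Close()

	// A LIST or watch PUT event records the serialized value size at revision 10.
	sc.UpdateKey(&mvccpb.KeyValue{Key: []byte("/registry/pods/ns/a"), Value: make([]byte, 2048), ModRevision: 10})

	// A replayed older event (revision 9) is ignored because of the revision guard.
	sc.UpdateKey(&mvccpb.KeyValue{Key: []byte("/registry/pods/ns/a"), Value: make([]byte, 1), ModRevision: 9})

	// Stats reconciles tracked keys against the KeysFunc listing and averages the sizes.
	stats, _ := sc.Stats(ctx)
	_ = stats.EstimatedAverageObjectSizeBytes // 2048 in this sketch
}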


@ -0,0 +1,121 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb"
)
func TestStatsCache(t *testing.T) {
ctx := t.Context()
store := newStatsCache("/prefix", func(ctx context.Context) ([]string, error) { return []string{}, nil })
defer store.Close()
stats, err := store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), Value: []byte("0123456789"), ModRevision: 2})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"/prefix/foo1"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo2"), Value: []byte("01234567890123456789"), ModRevision: 3})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"/prefix/foo1", "/prefix/foo2"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(25), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo2"), Value: []byte("0123456789"), ModRevision: 5})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(20), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), ModRevision: 6})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"/prefix/foo2"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
// Snapshot from revision 3
store.Update([]*mvccpb.KeyValue{
{Key: []byte("/prefix/foo1"), Value: []byte("0123456789"), ModRevision: 2},
{Key: []byte("/prefix/foo2"), Value: []byte("01234567890123456789"), ModRevision: 3},
})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
// Replay from revision 2
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), Value: []byte("0123456789"), ModRevision: 2})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo2"), Value: []byte("01234567890123456789"), ModRevision: 3})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo2"), Value: []byte("0123456789"), ModRevision: 5})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), ModRevision: 6})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("/prefix/foo1"), ModRevision: 7})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
// An old snapshot might restore stale revisions if keys were recreated
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1", "foo2"}, nil }
store.Update([]*mvccpb.KeyValue{
{Key: []byte("/prefix/foo1"), Value: []byte("0123456789"), ModRevision: 2},
{Key: []byte("/prefix/foo2"), Value: []byte("01234567890123456789"), ModRevision: 3},
})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes)
// Cleanup if keys were deleted
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
}


@ -90,6 +90,7 @@ type store struct {
resourcePrefix string
newListFunc func() runtime.Object
stats *statsCache
}
var _ storage.Interface = (*store)(nil)
@ -183,6 +184,11 @@ func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func()
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
if utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate) {
stats := newStatsCache(pathPrefix, s.getKeys)
s.stats = stats
w.stats = stats
}
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return s.GetCurrentResourceVersion(ctx)
@ -198,6 +204,12 @@ func (s *store) Versioner() storage.Versioner {
return s.versioner
}
func (s *store) Close() {
if s.stats != nil {
s.stats.Close()
}
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
preparedKey, err := s.prepareKey(key)
@ -608,26 +620,36 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
}
}
func (s *store) Count(ctx context.Context, key string) (int64, error) {
preparedKey, err := s.prepareKey(key)
if err != nil {
return 0, err
func (s *store) Stats(ctx context.Context) (stats storage.Stats, err error) {
if s.stats != nil {
return s.stats.Stats(ctx)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
startTime := time.Now()
count, err := s.client.Kubernetes.Count(ctx, preparedKey, kubernetes.CountOptions{})
count, err := s.client.Kubernetes.Count(ctx, s.pathPrefix, kubernetes.CountOptions{})
metrics.RecordEtcdRequest("listWithCount", s.groupResource, err, startTime)
if err != nil {
return 0, err
return storage.Stats{}, err
}
return count, nil
return storage.Stats{
ObjectCount: count,
}, nil
}
func (s *store) SetKeysFunc(keys storage.KeysFunc) {
if s.stats != nil {
s.stats.SetKeysFunc(keys)
}
}
func (s *store) getKeys(ctx context.Context) ([]string, error) {
startTime := time.Now()
resp, err := s.client.KV.Get(ctx, s.pathPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
metrics.RecordEtcdRequest("listOnlyKeys", s.groupResource, err, startTime)
keys := make([]string, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
}
return keys, nil
}
// ReadinessCheck implements storage.Interface.
@ -766,6 +788,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
} else {
growSlice(v, 2048, len(getResp.Kvs))
}
if s.stats != nil {
s.stats.Update(getResp.Kvs)
}
// take items from the response until the bucket is full, filtering as we go
for i, kv := range getResp.Kvs {


@ -43,10 +43,13 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
)
@ -343,9 +346,15 @@ func TestListResourceVersionMatch(t *testing.T) {
storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store})
}
func TestCount(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestCount(ctx, t, store)
func TestStats(t *testing.T) {
for _, sizeBasedListCostEstimate := range []bool{true, false} {
t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate)
// Match transformer with cacher tests.
ctx, store, _ := testSetup(t)
storagetesting.RunTestStats(ctx, t, store, store.codec, store.transformer, sizeBasedListCostEstimate)
})
}
}
// =======================================================================
@ -591,6 +600,7 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *kub
NewDefaultDecoder(setupOpts.codec, versioner),
versioner,
)
t.Cleanup(store.Close)
ctx := context.Background()
return ctx, store, client
}
@ -666,8 +676,6 @@ func TestInvalidKeys(t *testing.T) {
expectInvalidKey("Get", store.Get(ctx, invalidKey, storage.GetOptions{}, nil))
expectInvalidKey("GetList", store.GetList(ctx, invalidKey, storage.ListOptions{}, nil))
expectInvalidKey("GuaranteedUpdate", store.GuaranteedUpdate(ctx, invalidKey, nil, true, nil, nil, nil))
_, countErr := store.Count(t.Context(), invalidKey)
expectInvalidKey("Count", countErr)
}
func BenchmarkStore_GetList(b *testing.B) {
@ -836,18 +844,21 @@ func BenchmarkStoreList(b *testing.B) {
},
}
for _, dims := range dimensions {
b.Run(fmt.Sprintf("Namespaces=%d/Pods=%d/Nodes=%d", dims.namespaceCount, dims.namespaceCount*dims.podPerNamespaceCount, dims.nodeCount), func(b *testing.B) {
data := storagetesting.PrepareBenchchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount)
ctx, store, _ := testSetup(b)
var out example.Pod
for _, pod := range data.Pods {
err := store.Create(ctx, computePodKey(pod), pod, &out, 0)
if err != nil {
b.Fatal(err)
for _, sizeBasedEnabled := range []bool{true, false} {
featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedEnabled)
b.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v/Namespaces=%d/Pods=%d/Nodes=%d", sizeBasedEnabled, dims.namespaceCount, dims.namespaceCount*dims.podPerNamespaceCount, dims.nodeCount), func(b *testing.B) {
data := storagetesting.PrepareBenchchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount)
ctx, store, _ := testSetup(b)
var out example.Pod
for _, pod := range data.Pods {
err := store.Create(ctx, computePodKey(pod), pod, &out, 0)
if err != nil {
b.Fatal(err)
}
}
}
storagetesting.RunBenchmarkStoreList(ctx, b, store, data, false)
})
storagetesting.RunBenchmarkStoreList(ctx, b, store, data, false)
})
}
}
}
@ -900,3 +911,54 @@ func TestGetCurrentResourceVersion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
}
func BenchmarkStoreStats(b *testing.B) {
klog.SetLogger(logr.Discard())
data := storagetesting.PrepareBenchchmarkData(50, 3_000, 5_000)
ctx, store, _ := testSetup(b)
var out example.Pod
for _, pod := range data.Pods {
err := store.Create(ctx, computePodKey(pod), pod, &out, 0)
if err != nil {
b.Fatal(err)
}
}
storagetesting.RunBenchmarkStoreStats(ctx, b, store)
}
// BenchmarkStatsCacheCleanKeys measures the execution time of cleanKeys, which matters for watch latency because it blocks watch updates.
func BenchmarkStatsCacheCleanKeys(b *testing.B) {
klog.SetLogger(logr.Discard())
namespaceCount := 50
podPerNamespaceCount := 3_000
data := storagetesting.PrepareBenchchmarkData(namespaceCount, podPerNamespaceCount, 5_000)
ctx, store, _ := testSetup(b)
var out example.Pod
for _, pod := range data.Pods {
err := store.Create(ctx, computePodKey(pod), pod, &out, 0)
if err != nil {
b.Fatal(err)
}
}
// List to fetch object sizes for statsCache.
listOut := &example.PodList{}
err := store.GetList(ctx, "/pods/", storage.ListOptions{Recursive: true, Predicate: storage.Everything}, listOut)
if err != nil {
b.Fatal(err)
}
if len(store.stats.keys) < namespaceCount*podPerNamespaceCount {
b.Fatalf("Unexpected number of keys in stats, want: %d, got: %d", namespaceCount*podPerNamespaceCount, len(store.stats.keys))
}
// Fetch keys up front so the loop measures only cleanKeys time
keys, err := store.getKeys(ctx)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
store.stats.cleanKeys(keys)
}
if len(store.stats.keys) < namespaceCount*podPerNamespaceCount {
b.Fatalf("Unexpected number of keys in stats, want: %d, got: %d", namespaceCount*podPerNamespaceCount, len(store.stats.keys))
}
}


@ -77,6 +77,7 @@ type watcher struct {
versioner storage.Versioner
transformer value.Transformer
getCurrentStorageRV func(context.Context) (uint64, error)
stats *statsCache
}
// watchChan implements watch.Interface.
@ -91,6 +92,7 @@ type watchChan struct {
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
stats *statsCache
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -134,6 +136,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
stats: w.stats,
}
if pred.Empty() {
// The filter doesn't filter out any object.
@ -402,6 +405,14 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
}
for _, e := range wres.Events {
if wc.stats != nil {
switch e.Type {
case clientv3.EventTypePut:
wc.stats.UpdateKey(e.Kv)
case clientv3.EventTypeDelete:
wc.stats.DeleteKey(e.Kv)
}
}
metrics.RecordEtcdEvent(wc.watcher.groupResource)
parsedEvent, err := parseEvent(e)
if err != nil {


@ -243,8 +243,8 @@ type Interface interface {
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error
// Count returns number of different entries under the key (generally being path prefix).
Count(ctx context.Context, key string) (int64, error)
// Stats returns storage stats.
Stats(ctx context.Context) (Stats, error)
// ReadinessCheck checks if the storage is ready for accepting requests.
ReadinessCheck() error
@ -267,8 +267,15 @@ type Interface interface {
// GetCurrentResourceVersion gets the current resource version from etcd.
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
GetCurrentResourceVersion(ctx context.Context) (uint64, error)
// SetKeysFunc allows overriding the function used to get keys from storage.
// This allows replacing the default function, which fetches keys from storage, with one backed by a cache.
SetKeysFunc(KeysFunc)
}
// KeysFunc is a function prototype to fetch keys from storage.
type KeysFunc func(context.Context) ([]string, error)
// GetOptions provides the options that may be provided for storage get operations.
type GetOptions struct {
// IgnoreNotFound determines what is returned if the requested object is not found. If
@ -370,3 +377,12 @@ func ValidateListOptions(keyPrefix string, versioner Versioner, opts ListOptions
}
return withRev, "", nil
}
// Stats provides statistics information about storage.
type Stats struct {
// ObjectCount is the number of objects stored in the storage.
ObjectCount int64
// EstimatedAverageObjectSizeBytes is the average size of objects in the storage, based on the size of their serialized values.
// The value is an estimate; it is not guaranteed to be accurate or consistent.
EstimatedAverageObjectSizeBytes int64
}
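To make the new contract concrete, here is a hedged sketch of a storage backend satisfying just the two added methods; the memStats type and its size bookkeeping are invented for illustration, while storage.Stats, storage.KeysFunc, and the method signatures come from the interface above.

package example

import (
	"context"

	"k8s.io/apiserver/pkg/storage"
)

// memStats is a toy backend fragment implementing only the new methods.
type memStats struct {
	getKeys storage.KeysFunc
	sizes   map[string]int64 // last observed serialized size per key
}

// Stats derives ObjectCount from the configured KeysFunc and reports the
// average of the sizes it has observed, matching the estimate semantics
// documented on storage.Stats.
func (m *memStats) Stats(ctx context.Context) (storage.Stats, error) {
	keys, err := m.getKeys(ctx)
	if err != nil {
		return storage.Stats{}, err
	}
	stats := storage.Stats{ObjectCount: int64(len(keys))}
	if len(m.sizes) > 0 {
		var total int64
		for _, size := range m.sizes {
			total += size
		}
		stats.EstimatedAverageObjectSizeBytes = total / int64(len(m.sizes))
	}
	return stats, nil
}

// SetKeysFunc lets a caching layer substitute a cheaper key source, as the
// interface comment above describes.
func (m *memStats) SetKeysFunc(keys storage.KeysFunc) {
	m.getKeys = keys
}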


@ -445,17 +445,6 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
return nil, nil, err
}
var once sync.Once
destroyFunc := func() {
// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
// Hence, we only destroy once.
// TODO: fix duplicated storage destroy calls higher level
once.Do(func() {
stopCompactor()
stopDBSizeMonitor()
client.Close()
})
}
transformer := c.Transformer
if transformer == nil {
transformer = identity.NewEncryptCheckTransformer()
@ -468,12 +457,24 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu
transformer = etcd3.WithCorruptObjErrorHandlingTransformer(transformer)
decoder = etcd3.WithCorruptObjErrorHandlingDecoder(decoder)
}
var store storage.Interface
store = etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) {
store = etcd3.NewStoreWithUnsafeCorruptObjectDeletion(store, c.GroupResource)
store := etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig, decoder, versioner)
var once sync.Once
destroyFunc := func() {
// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
// Hence, we only destroy once.
// TODO: fix duplicated storage destroy calls higher level
once.Do(func() {
stopCompactor()
stopDBSizeMonitor()
store.Close()
_ = client.Close()
})
}
return store, destroyFunc, nil
var storage storage.Interface = store
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AllowUnsafeMalformedObjectDeletion) {
storage = etcd3.NewStoreWithUnsafeCorruptObjectDeletion(storage, c.GroupResource)
}
return storage, destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the


@ -220,3 +220,13 @@ type BenchmarkData struct {
NamespaceNames []string
NodeNames []string
}
func RunBenchmarkStoreStats(ctx context.Context, b *testing.B, store storage.Interface) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := store.Stats(ctx)
if err != nil {
b.Fatal(err)
}
}
}


@ -3127,43 +3127,84 @@ func RunTestTransformationFailure(ctx context.Context, t *testing.T, store Inter
}
}
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
resourceA := "/foo.bar.io/abc"
func RunTestStats(ctx context.Context, t *testing.T, store storage.Interface, codec runtime.Codec, transformer value.Transformer, sizeEnabled bool) {
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0})
// resourceA is intentionally a prefix of resourceB to ensure that the count
// for resourceA does not include any objects from resourceB.
resourceB := fmt.Sprintf("%sdef", resourceA)
resourceACountExpected := 5
for i := 1; i <= resourceACountExpected; i++ {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
key := fmt.Sprintf("%s/%d", resourceA, i)
if err := store.Create(ctx, key, obj, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
fooKey := computePodKey(foo)
if err := store.Create(ctx, fooKey, foo, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
fooSize := objectSize(t, codec, foo, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: fooSize})
resourceBCount := 4
for i := 1; i <= resourceBCount; i++ {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
key := fmt.Sprintf("%s/%d", resourceB, i)
if err := store.Create(ctx, key, obj, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
barKey := computePodKey(bar)
if err := store.Create(ctx, barKey, bar, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
barSize := objectSize(t, codec, bar, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2})
resourceACountGot, err := store.Count(t.Context(), resourceA)
if err := store.GuaranteedUpdate(ctx, barKey, bar, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
pod.Labels = map[string]string{"foo": "bar"}
return pod, nil
}), nil); err != nil {
t.Errorf("Update failed: %v", err)
}
// ResourceVersion is not stored.
bar.ResourceVersion = ""
barSize = objectSize(t, codec, bar, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2})
if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil {
t.Errorf("Delete failed: %v", err)
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize})
if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil {
t.Errorf("Delete expected to fail")
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize})
if err := store.Delete(ctx, barKey, bar, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil {
t.Errorf("Delete failed: %v", err)
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0})
}
func assertStats(t *testing.T, store storage.Interface, sizeEnabled bool, expectStats storage.Stats) {
t.Helper()
// Execute a consistent LIST to refresh the state of the cache.
err := store.GetList(t.Context(), "/pods", storage.ListOptions{Recursive: true, Predicate: storage.Everything}, &example.PodList{})
if err != nil {
t.Fatalf("store.Count failed: %v", err)
t.Fatalf("GetList failed: %v", err)
}
stats, err := store.Stats(t.Context())
if err != nil {
t.Fatalf("store.Stats failed: %v", err)
}
// count for resourceA should not include the objects for resourceB
// even though resourceA is a prefix of resourceB.
if int64(resourceACountExpected) != resourceACountGot {
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
if !sizeEnabled {
expectStats.EstimatedAverageObjectSizeBytes = 0
}
if expectStats != stats {
t.Errorf("store.Stats: expected %+v but got %+v", expectStats, stats)
}
}
func objectSize(t *testing.T, codec runtime.Codec, obj runtime.Object, transformer value.Transformer) int64 {
data, err := runtime.Encode(codec, obj)
if err != nil {
t.Fatalf("Encode failed: %v", err)
}
data, err = transformer.TransformToStorage(t.Context(), data, value.DefaultContext{})
if err != nil {
t.Fatalf("Transform failed: %v", err)
}
return int64(len(data))
}
func RunTestListPaging(ctx context.Context, t *testing.T, store storage.Interface) {


@ -27,10 +27,10 @@ import (
"k8s.io/klog/v2"
)
func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
func newListWorkEstimator(countFn statsGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &listWorkEstimator{
config: config,
countGetterFn: countFn,
statsGetterFn: countFn,
maxSeatsFn: maxSeatsFn,
}
return estimator.estimate
@ -38,7 +38,7 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo
type listWorkEstimator struct {
config *WorkEstimatorConfig
countGetterFn objectCountGetterFunc
statsGetterFn statsGetterFunc
maxSeatsFn maxSeatsFunc
}
@ -90,7 +90,8 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
listFromStorage := result.ShouldDelegate
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
numStored, err := e.countGetterFn(key(requestInfo))
stats, err := e.statsGetterFn(key(requestInfo))
numStored := stats.ObjectCount
switch {
case err == ObjectCountStaleErr:
// object count going stale is indicative of degradation, so we should


@ -22,6 +22,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@ -55,7 +56,7 @@ var (
type StorageObjectCountTracker interface {
// Set is invoked to update the current number of total
// objects for the given resource
Set(string, int64)
Set(string, storage.Stats)
// Get returns the total number of objects for the given resource.
// The following errors are returned:
@ -63,7 +64,7 @@ type StorageObjectCountTracker interface {
// failures ObjectCountStaleErr is returned.
// - if the given resource is not being tracked then
// ObjectCountNotFoundErr is returned.
Get(string) (int64, error)
Get(string) (storage.Stats, error)
// RunUntil starts all the necessary maintenance.
RunUntil(stopCh <-chan struct{})
@ -75,14 +76,14 @@ type StorageObjectCountTracker interface {
func NewStorageObjectCountTracker() StorageObjectCountTracker {
return &objectCountTracker{
clock: &clock.RealClock{},
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
}
// timestampedCount stores the count of a given resource with a last updated
// timestampedStats stores the count of a given resource with a last updated
// timestamp so we can prune it after it goes stale for certain threshold.
type timestampedCount struct {
count int64
type timestampedStats struct {
storage.Stats
lastUpdatedAt time.Time
}
@ -92,11 +93,11 @@ type objectCountTracker struct {
clock clock.PassiveClock
lock sync.RWMutex
counts map[string]*timestampedCount
counts map[string]*timestampedStats
}
func (t *objectCountTracker) Set(groupResource string, count int64) {
if count <= -1 {
func (t *objectCountTracker) Set(groupResource string, stats storage.Stats) {
if stats.ObjectCount <= -1 {
// a value of -1 indicates that the 'Count' call failed to contact
// the storage layer, in most cases this error can be transient.
// we will continue to work with the count that is in the cache
@ -114,18 +115,18 @@ func (t *objectCountTracker) Set(groupResource string, count int64) {
defer t.lock.Unlock()
if item, ok := t.counts[groupResource]; ok {
item.count = count
item.Stats = stats
item.lastUpdatedAt = now
return
}
t.counts[groupResource] = &timestampedCount{
count: count,
t.counts[groupResource] = &timestampedStats{
Stats: stats,
lastUpdatedAt: now,
}
}
func (t *objectCountTracker) Get(groupResource string) (int64, error) {
func (t *objectCountTracker) Get(groupResource string) (storage.Stats, error) {
staleThreshold := t.clock.Now().Add(-staleTolerationThreshold)
t.lock.RLock()
@ -133,11 +134,11 @@ func (t *objectCountTracker) Get(groupResource string) (int64, error) {
if item, ok := t.counts[groupResource]; ok {
if item.lastUpdatedAt.Before(staleThreshold) {
return item.count, ObjectCountStaleErr
return item.Stats, ObjectCountStaleErr
}
return item.count, nil
return item.Stats, nil
}
return 0, ObjectCountNotFoundErr
return storage.Stats{}, ObjectCountNotFoundErr
}
// RunUntil runs all the necessary maintenance.
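A package-local sketch tying the pieces together, assuming it lives in the tracker's package with the k8s.io/apiserver/pkg/storage import in scope: a registry store pushes storage.Stats into the tracker, and the tracker's Get method now has exactly the statsGetterFunc shape consumed by the list work estimator. The resource key and numbers are illustrative.

func sketchTrackerUsage() {
	stopCh := make(chan struct{})
	defer close(stopCh)

	tracker := NewStorageObjectCountTracker()
	go tracker.RunUntil(stopCh)

	// A registry store now reports count and estimated average size together.
	tracker.Set("pods", storage.Stats{ObjectCount: 5000, EstimatedAverageObjectSizeBytes: 2048})

	// tracker.Get satisfies the statsGetterFunc signature used by the work
	// estimator; stale or missing entries surface as ObjectCountStaleErr or
	// ObjectCountNotFoundErr per the interface documentation above.
	if stats, err := tracker.Get("pods"); err == nil {
		_ = stats.EstimatedAverageObjectSizeBytes
	}
}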


@ -22,6 +22,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apiserver/pkg/storage"
testclock "k8s.io/utils/clock/testing"
)
@ -69,21 +70,21 @@ func TestStorageObjectCountTracker(t *testing.T) {
fakeClock := &testclock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
key := "foo.bar.resource"
now := time.Now()
fakeClock.SetTime(now.Add(-test.lastUpdated))
tracker.Set(key, test.count)
tracker.Set(key, storage.Stats{ObjectCount: test.count})
fakeClock.SetTime(now)
countGot, err := tracker.Get(key)
stats, err := tracker.Get(key)
if test.errExpected != err {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, err)
}
if test.countExpected != countGot {
t.Errorf("Expected count: %d, but got: %d", test.countExpected, countGot)
if test.countExpected != stats.ObjectCount {
t.Errorf("Expected count: %d, but got: %d", test.countExpected, stats.ObjectCount)
}
if test.count <= -1 && len(tracker.counts) > 0 {
t.Errorf("Expected the cache to be empty, but got: %d", len(tracker.counts))
@ -96,23 +97,23 @@ func TestStorageObjectCountTrackerWithPrune(t *testing.T) {
fakeClock := &testclock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
now := time.Now()
fakeClock.SetTime(now.Add(-61 * time.Minute))
tracker.Set("k1", 61)
tracker.Set("k1", storage.Stats{ObjectCount: 61})
fakeClock.SetTime(now.Add(-60 * time.Minute))
tracker.Set("k2", 60)
tracker.Set("k2", storage.Stats{ObjectCount: 60})
// we are going to prune keys that are stale for >= 1h
// so the above keys are expected to be pruned and the
// key below should not be pruned.
mostRecent := now.Add(-59 * time.Minute)
fakeClock.SetTime(mostRecent)
tracker.Set("k3", 59)
expected := map[string]*timestampedCount{
tracker.Set("k3", storage.Stats{ObjectCount: 59})
expected := map[string]*timestampedStats{
"k3": {
count: 59,
Stats: storage.Stats{ObjectCount: 59},
lastUpdatedAt: mostRecent,
},
}


@ -23,6 +23,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
@ -56,9 +57,9 @@ func (we *WorkEstimate) MaxSeats() int {
return int(we.FinalSeats)
}
// objectCountGetterFunc represents a function that gets the total
// statsGetterFunc represents a function that gets the total
// number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error)
type statsGetterFunc func(string) (storage.Stats, error)
// watchCountGetterFunc represents a function that gets the total
// number of watchers potentially interested in a given request.
@ -71,7 +72,7 @@ type maxSeatsFunc func(priorityLevelName string) uint64
// NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
func NewWorkEstimator(objectCountFn statsGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &workEstimator{
minimumSeats: config.MinimumSeats,
maximumSeatsLimit: config.MaximumSeatsLimit,


@ -24,6 +24,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
@ -547,8 +548,8 @@ func TestWorkEstimator(t *testing.T) {
if len(counts) == 0 {
counts = map[string]int64{}
}
countsFn := func(key string) (int64, error) {
return counts[key], test.countErr
countsFn := func(key string) (storage.Stats, error) {
return storage.Stats{ObjectCount: counts[key]}, test.countErr
}
watchCountsFn := func(_ *apirequest.RequestInfo) int {
return test.watchCount