Estimate average size of objects in etcd and plug it into request cost estimator

Kubernetes-commit: ec78b8305ad392f6faf4e5247ea33ceabb484c3f
Marek Siarkowicz 2025-06-13 16:34:42 +02:00 committed by Kubernetes Publisher
parent 2ca55b4189
commit cf27dab482
18 changed files with 454 additions and 100 deletions
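At a glance, the change replaces the storage layer's Count call with a richer Stats struct (ObjectCount plus EstimatedAverageObjectSizeBytes) and threads those stats through the object-count tracker into the APF work estimator. The sketch below illustrates the intent only; the byte-to-seat conversion and its constants are assumptions made for illustration, not the formula used by the upstream listWorkEstimator.

package main

import "fmt"

// Stats mirrors the storage.Stats struct introduced in this commit.
type Stats struct {
	ObjectCount                     int64
	EstimatedAverageObjectSizeBytes int64
}

// estimateListSeats is a hypothetical cost function: approximate the bytes a
// full LIST would return as count * average size, then charge one seat per
// bytesPerSeat, capped at maxSeats. When no size estimate is available it
// falls back to a count-only heuristic.
func estimateListSeats(stats Stats, bytesPerSeat, maxSeats int64) int64 {
	var seats int64
	if stats.EstimatedAverageObjectSizeBytes > 0 {
		seats = stats.ObjectCount*stats.EstimatedAverageObjectSizeBytes/bytesPerSeat + 1
	} else {
		seats = stats.ObjectCount/100 + 1 // size unknown: fall back to object count
	}
	if seats > maxSeats {
		seats = maxSeats
	}
	return seats
}

func main() {
	// 5000 objects averaging 2 KiB each -> roughly 10 MiB of response data.
	fmt.Println(estimateListSeats(Stats{ObjectCount: 5000, EstimatedAverageObjectSizeBytes: 2048}, 1<<20, 10))
}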


@ -176,6 +176,11 @@ const (
// if the generated name conflicts with an existing resource name, up to a maximum number of 7 retries.
RetryGenerateName featuregate.Feature = "RetryGenerateName"
// owner: @serathius
//
// Enables APF to use the size of objects when estimating request cost.
SizeBasedListCostEstimate featuregate.Feature = "SizeBasedListCostEstimate"
// owner: @cici37
//
// StrictCostEnforcementForVAP is used to apply strict CEL cost validation for ValidatingAdmissionPolicy.
@ -362,6 +367,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Deprecated},
},
SizeBasedListCostEstimate: {
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
},
StorageVersionAPI: {
{Version: version.MustParse("1.20"), Default: false, PreRelease: featuregate.Alpha},
},
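The gate is introduced directly at Beta and enabled by default in 1.34. Operators who want the previous count-only estimation back would switch it off with the usual kube-apiserver feature-gate flag (standard component flag syntax, shown as an assumption rather than taken from this diff):

kube-apiserver --feature-gates=SizeBasedListCostEstimate=false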


@ -107,8 +107,8 @@ func (s *DryRunnableStorage) GuaranteedUpdate(
return s.Storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
}
func (s *DryRunnableStorage) Count(ctx context.Context, key string) (int64, error) {
return s.Storage.Count(ctx, key)
func (s *DryRunnableStorage) Stats(ctx context.Context) (storage.Stats, error) {
return s.Storage.Stats(ctx)
}
func (s *DryRunnableStorage) copyInto(in, out runtime.Object) error {


@ -1667,15 +1667,15 @@ func (e *Store) startObservingCount(period time.Duration, objectCountTracker flo
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
stopCh := make(chan struct{})
go wait.JitterUntil(func() {
count, err := e.Storage.Count(ctx, prefix)
stats, err := e.Storage.Stats(ctx)
if err != nil {
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
count = -1
stats.ObjectCount = -1
}
metrics.UpdateObjectCount(e.DefaultQualifiedResource, count)
metrics.UpdateObjectCount(e.DefaultQualifiedResource, stats.ObjectCount)
if objectCountTracker != nil {
objectCountTracker.Set(resourceName, count)
objectCountTracker.Set(resourceName, stats)
}
}, period, resourceCountPollPeriodJitter, true, stopCh)
return func() { close(stopCh) }


@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -279,10 +280,15 @@ func TestTransformationFailure(t *testing.T) {
// TODO(#109831): Enable use of this test and run it.
}
func TestCount(t *testing.T) {
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestCount(ctx, t, cacher)
func TestStats(t *testing.T) {
for _, sizeBasedListCostEstimate := range []bool{true, false} {
t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate)
ctx, cacher, terminate := testSetup(t)
t.Cleanup(terminate)
storagetesting.RunTestStats(ctx, t, cacher, codecs.LegacyCodec(examplev1.SchemeGroupVersion), identity.NewEncryptCheckTransformer(), sizeBasedListCostEstimate)
})
}
}
func TestWatch(t *testing.T) {


@ -189,8 +189,8 @@ func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts stora
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Count(_ context.Context, _ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
func (d *dummyStorage) Stats(_ context.Context) (storage.Stats, error) {
return storage.Stats{}, fmt.Errorf("unimplemented")
}
func (d *dummyStorage) ReadinessCheck() error {
return nil


@ -257,8 +257,8 @@ func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, desti
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
}
func (c *CacheDelegator) Count(ctx context.Context, pathPrefix string) (int64, error) {
return c.storage.Count(ctx, pathPrefix)
func (c *CacheDelegator) Stats(ctx context.Context) (storage.Stats, error) {
return c.storage.Stats(ctx)
}
func (c *CacheDelegator) ReadinessCheck() error {

pkg/storage/etcd3/stats.go (new file, 133 lines)

@ -0,0 +1,133 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
"k8s.io/apiserver/pkg/storage"
)
type keysFunc func(context.Context) ([]string, error)
func newStatsCache(getKeys keysFunc) *statsCache {
sc := &statsCache{
getKeys: getKeys,
keys: make(map[string]sizeRevision),
}
return sc
}
// statsCache efficiently estimates the average object size
// based on the last observed state of individual keys.
// By plugging statsCache into the GetList and Watch functions,
// a fairly accurate estimate of object sizes can be maintained
// without issuing additional requests to the underlying storage.
// To handle potentially out-of-order or incomplete data,
// it keeps a per-key revision to decide which observed state is newer.
// This approach may leak keys if delete events are not observed,
// so stale entries are pruned against the authoritative key list
// returned by getKeys on every Stats call.
type statsCache struct {
getKeys keysFunc
lock sync.Mutex
keys map[string]sizeRevision
}
type sizeRevision struct {
sizeBytes int64
revision int64
}
func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) {
keys, err := sc.getKeys(ctx)
if err != nil {
return storage.Stats{}, err
}
stats := storage.Stats{
ObjectCount: int64(len(keys)),
}
sc.lock.Lock()
defer sc.lock.Unlock()
sc.cleanKeys(keys)
if len(sc.keys) != 0 {
stats.EstimatedAverageObjectSizeBytes = sc.keySizes() / int64(len(sc.keys))
}
return stats, nil
}
func (sc *statsCache) cleanKeys(keepKeys []string) {
newKeys := make(map[string]sizeRevision, len(keepKeys))
for _, key := range keepKeys {
keySizeRevision, ok := sc.keys[key]
if !ok {
continue
}
newKeys[key] = keySizeRevision
}
sc.keys = newKeys
}
func (sc *statsCache) keySizes() (totalSize int64) {
for _, sizeRevision := range sc.keys {
totalSize += sizeRevision.sizeBytes
}
return totalSize
}
func (sc *statsCache) Update(kvs []*mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
for _, kv := range kvs {
sc.updateKey(kv)
}
}
func (sc *statsCache) UpdateKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
sc.updateKey(kv)
}
func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) {
key := string(kv.Key)
keySizeRevision := sc.keys[key]
if keySizeRevision.revision >= kv.ModRevision {
return
}
sc.keys[key] = sizeRevision{
sizeBytes: int64(len(kv.Value)),
revision: kv.ModRevision,
}
}
func (sc *statsCache) DeleteKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
key := string(kv.Key)
keySizeRevision := sc.keys[key]
if keySizeRevision.revision >= kv.ModRevision {
return
}
delete(sc.keys, key)
}
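A minimal sketch of how the cache behaves in practice, using made-up keys and sizes; this is a hypothetical test that would live in package etcd3 alongside the cache and assumes the same imports as stats.go plus testing. Updates carry the etcd ModRevision, a replayed event at an older revision is ignored, and Stats averages only the keys confirmed by getKeys.

func TestStatsCacheSketch(t *testing.T) {
	cache := newStatsCache(func(ctx context.Context) ([]string, error) {
		return []string{"/pods/a", "/pods/b"}, nil // authoritative key list
	})
	cache.UpdateKey(&mvccpb.KeyValue{Key: []byte("/pods/a"), Value: make([]byte, 100), ModRevision: 5})
	cache.UpdateKey(&mvccpb.KeyValue{Key: []byte("/pods/b"), Value: make([]byte, 300), ModRevision: 6})
	// A stale replay at revision 3 must not overwrite the size recorded at revision 5.
	cache.UpdateKey(&mvccpb.KeyValue{Key: []byte("/pods/a"), Value: make([]byte, 1), ModRevision: 3})
	stats, err := cache.Stats(t.Context())
	if err != nil {
		t.Fatal(err)
	}
	if stats.ObjectCount != 2 || stats.EstimatedAverageObjectSizeBytes != 200 {
		t.Fatalf("unexpected stats: %+v", stats)
	}
}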


@ -0,0 +1,120 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/mvccpb"
)
func TestStatsCache(t *testing.T) {
ctx := t.Context()
store := newStatsCache(func(ctx context.Context) ([]string, error) { return []string{}, nil })
stats, err := store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1", "foo2"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(25), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("0123456789"), ModRevision: 5})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(20), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 6})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo2"}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
// Snapshot taken at revision 3
store.Update([]*mvccpb.KeyValue{
{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2},
{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3},
})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
// Replay from revision 2
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("012345678901234567890123456789"), ModRevision: 4})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.UpdateKey(&mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("0123456789"), ModRevision: 5})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 6})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(10), stats.EstimatedAverageObjectSizeBytes)
store.DeleteKey(&mvccpb.KeyValue{Key: []byte("foo1"), ModRevision: 7})
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
// An old snapshot may restore entries at old revisions if keys were recreated
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{"foo1", "foo2"}, nil }
store.Update([]*mvccpb.KeyValue{
{Key: []byte("foo1"), Value: []byte("0123456789"), ModRevision: 2},
{Key: []byte("foo2"), Value: []byte("01234567890123456789"), ModRevision: 3},
})
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(15), stats.EstimatedAverageObjectSizeBytes)
// Cleanup if keys were deleted
store.getKeys = func(ctx context.Context) ([]string, error) { return []string{}, nil }
stats, err = store.Stats(ctx)
require.NoError(t, err)
assert.Equal(t, int64(0), stats.EstimatedAverageObjectSizeBytes)
}


@ -90,6 +90,7 @@ type store struct {
resourcePrefix string
newListFunc func() runtime.Object
stats *statsCache
}
var _ storage.Interface = (*store)(nil)
@ -183,6 +184,11 @@ func New(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc func()
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
if utilfeature.DefaultFeatureGate.Enabled(features.SizeBasedListCostEstimate) {
stats := newStatsCache(s.getKeys)
s.stats = stats
w.stats = stats
}
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
return s.GetCurrentResourceVersion(ctx)
@ -608,26 +614,30 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
}
}
func (s *store) Count(ctx context.Context, key string) (int64, error) {
preparedKey, err := s.prepareKey(key)
if err != nil {
return 0, err
func (s *store) Stats(ctx context.Context) (stats storage.Stats, err error) {
if s.stats != nil {
return s.stats.Stats(ctx)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(preparedKey, "/") {
preparedKey += "/"
}
startTime := time.Now()
count, err := s.client.Kubernetes.Count(ctx, preparedKey, kubernetes.CountOptions{})
count, err := s.client.Kubernetes.Count(ctx, s.pathPrefix, kubernetes.CountOptions{})
metrics.RecordEtcdRequest("listWithCount", s.groupResource, err, startTime)
if err != nil {
return 0, err
return storage.Stats{}, err
}
return count, nil
return storage.Stats{
ObjectCount: count,
}, nil
}
func (s *store) getKeys(ctx context.Context) ([]string, error) {
startTime := time.Now()
resp, err := s.client.KV.Get(ctx, s.pathPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
metrics.RecordEtcdRequest("listOnlyKeys", s.groupResource, err, startTime)
keys := make([]string, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
}
return keys, nil
}
// ReadinessCheck implements storage.Interface.
@ -766,6 +776,9 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
} else {
growSlice(v, 2048, len(getResp.Kvs))
}
if s.stats != nil {
s.stats.Update(getResp.Kvs)
}
// take items from the response until the bucket is full, filtering as we go
for i, kv := range getResp.Kvs {


@ -43,10 +43,13 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
)
@ -343,9 +346,15 @@ func TestListResourceVersionMatch(t *testing.T) {
storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store})
}
func TestCount(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunTestCount(ctx, t, store)
func TestStats(t *testing.T) {
for _, sizeBasedListCostEstimate := range []bool{true, false} {
t.Run(fmt.Sprintf("SizeBasedListCostEstimate=%v", sizeBasedListCostEstimate), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SizeBasedListCostEstimate, sizeBasedListCostEstimate)
// Match transformer with cacher tests.
ctx, store, _ := testSetup(t)
storagetesting.RunTestStats(ctx, t, store, store.codec, store.transformer, sizeBasedListCostEstimate)
})
}
}
// =======================================================================
@ -666,8 +675,6 @@ func TestInvalidKeys(t *testing.T) {
expectInvalidKey("Get", store.Get(ctx, invalidKey, storage.GetOptions{}, nil))
expectInvalidKey("GetList", store.GetList(ctx, invalidKey, storage.ListOptions{}, nil))
expectInvalidKey("GuaranteedUpdate", store.GuaranteedUpdate(ctx, invalidKey, nil, true, nil, nil, nil))
_, countErr := store.Count(t.Context(), invalidKey)
expectInvalidKey("Count", countErr)
}
func BenchmarkStore_GetList(b *testing.B) {


@ -77,6 +77,7 @@ type watcher struct {
versioner storage.Versioner
transformer value.Transformer
getCurrentStorageRV func(context.Context) (uint64, error)
stats *statsCache
}
// watchChan implements watch.Interface.
@ -91,6 +92,7 @@ type watchChan struct {
cancel context.CancelFunc
incomingEventChan chan *event
resultChan chan watch.Event
stats *statsCache
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -134,6 +136,7 @@ func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, re
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
stats: w.stats,
}
if pred.Empty() {
// The filter doesn't filter out any object.
@ -402,6 +405,14 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
}
for _, e := range wres.Events {
if wc.stats != nil {
switch e.Type {
case clientv3.EventTypePut:
wc.stats.UpdateKey(e.Kv)
case clientv3.EventTypeDelete:
wc.stats.DeleteKey(e.Kv)
}
}
metrics.RecordEtcdEvent(wc.watcher.groupResource)
parsedEvent, err := parseEvent(e)
if err != nil {


@ -243,8 +243,8 @@ type Interface interface {
ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error
// Count returns number of different entries under the key (generally being path prefix).
Count(ctx context.Context, key string) (int64, error)
// Stats returns storage stats.
Stats(ctx context.Context) (Stats, error)
// ReadinessCheck checks if the storage is ready for accepting requests.
ReadinessCheck() error
@ -370,3 +370,12 @@ func ValidateListOptions(keyPrefix string, versioner Versioner, opts ListOptions
}
return withRev, "", nil
}
// Stats provides statistics about the storage.
type Stats struct {
// ObjectCount is the number of objects stored in the storage.
ObjectCount int64
// EstimatedAverageObjectSizeBytes is the average size of objects stored in the storage,
// based on the size of their serialized values.
// The value is an estimate and is not guaranteed to be accurate or consistent.
EstimatedAverageObjectSizeBytes int64
}
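Callers are expected to treat a zero EstimatedAverageObjectSizeBytes as "no size data available" (the gate is disabled or no sizes have been observed yet), which is also what the RunTestStats helper later in this diff asserts. A hedged caller-side sketch; approxListBytes is a hypothetical helper, not part of this commit:

// approxListBytes multiplies the two Stats fields to approximate the
// serialized payload of a full LIST, reporting false when size data is
// unavailable.
func approxListBytes(stats Stats) (bytes int64, ok bool) {
	if stats.EstimatedAverageObjectSizeBytes == 0 {
		return 0, false
	}
	return stats.ObjectCount * stats.EstimatedAverageObjectSizeBytes, true
}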


@ -3127,43 +3127,84 @@ func RunTestTransformationFailure(ctx context.Context, t *testing.T, store Inter
}
}
func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
resourceA := "/foo.bar.io/abc"
func RunTestStats(ctx context.Context, t *testing.T, store storage.Interface, codec runtime.Codec, transformer value.Transformer, sizeEnabled bool) {
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0})
// resourceA is intentionally a prefix of resourceB to ensure that the count
// for resourceA does not include any objects from resourceB.
resourceB := fmt.Sprintf("%sdef", resourceA)
resourceACountExpected := 5
for i := 1; i <= resourceACountExpected; i++ {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
key := fmt.Sprintf("%s/%d", resourceA, i)
if err := store.Create(ctx, key, obj, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
fooKey := computePodKey(foo)
if err := store.Create(ctx, fooKey, foo, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
fooSize := objectSize(t, codec, foo, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: fooSize})
resourceBCount := 4
for i := 1; i <= resourceBCount; i++ {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i)}}
key := fmt.Sprintf("%s/%d", resourceB, i)
if err := store.Create(ctx, key, obj, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
barKey := computePodKey(bar)
if err := store.Create(ctx, barKey, bar, nil, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
barSize := objectSize(t, codec, bar, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2})
resourceACountGot, err := store.Count(t.Context(), resourceA)
if err := store.GuaranteedUpdate(ctx, barKey, bar, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
pod.Labels = map[string]string{"foo": "bar"}
return pod, nil
}), nil); err != nil {
t.Errorf("Update failed: %v", err)
}
// ResourceVersion is not stored.
bar.ResourceVersion = ""
barSize = objectSize(t, codec, bar, transformer)
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 2, EstimatedAverageObjectSizeBytes: (fooSize + barSize) / 2})
if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil {
t.Errorf("Delete failed: %v", err)
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize})
if err := store.Delete(ctx, fooKey, foo, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err == nil {
t.Errorf("Delete expected to fail")
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 1, EstimatedAverageObjectSizeBytes: barSize})
if err := store.Delete(ctx, barKey, bar, nil, storage.ValidateAllObjectFunc, nil, storage.DeleteOptions{}); err != nil {
t.Errorf("Delete failed: %v", err)
}
assertStats(t, store, sizeEnabled, storage.Stats{ObjectCount: 0, EstimatedAverageObjectSizeBytes: 0})
}
func assertStats(t *testing.T, store storage.Interface, sizeEnabled bool, expectStats storage.Stats) {
t.Helper()
// Execute a consistent LIST to refresh the cache state.
err := store.GetList(t.Context(), "/pods", storage.ListOptions{Recursive: true, Predicate: storage.Everything}, &example.PodList{})
if err != nil {
t.Fatalf("store.Count failed: %v", err)
t.Fatalf("GetList failed: %v", err)
}
stats, err := store.Stats(t.Context())
if err != nil {
t.Fatalf("store.Stats failed: %v", err)
}
// count for resourceA should not include the objects for resourceB
// even though resourceA is a prefix of resourceB.
if int64(resourceACountExpected) != resourceACountGot {
t.Fatalf("store.Count for resource %s: expected %d but got %d", resourceA, resourceACountExpected, resourceACountGot)
if !sizeEnabled {
expectStats.EstimatedAverageObjectSizeBytes = 0
}
if expectStats != stats {
t.Errorf("store.Stats: expected %+v but got %+v", expectStats, stats)
}
}
func objectSize(t *testing.T, codec runtime.Codec, obj runtime.Object, transformer value.Transformer) int64 {
data, err := runtime.Encode(codec, obj)
if err != nil {
t.Fatalf("Encode failed: %v", err)
}
data, err = transformer.TransformToStorage(t.Context(), data, value.DefaultContext{})
if err != nil {
t.Fatalf("Transform failed: %v", err)
}
return int64(len(data))
}
func RunTestListPaging(ctx context.Context, t *testing.T, store storage.Interface) {


@ -27,10 +27,10 @@ import (
"k8s.io/klog/v2"
)
func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
func newListWorkEstimator(countFn statsGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &listWorkEstimator{
config: config,
countGetterFn: countFn,
statsGetterFn: countFn,
maxSeatsFn: maxSeatsFn,
}
return estimator.estimate
@ -38,7 +38,7 @@ func newListWorkEstimator(countFn objectCountGetterFunc, config *WorkEstimatorCo
type listWorkEstimator struct {
config *WorkEstimatorConfig
countGetterFn objectCountGetterFunc
statsGetterFn statsGetterFunc
maxSeatsFn maxSeatsFunc
}
@ -90,7 +90,8 @@ func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLe
listFromStorage := result.ShouldDelegate
isListFromCache := requestInfo.Verb == "watch" || !listFromStorage
numStored, err := e.countGetterFn(key(requestInfo))
stats, err := e.statsGetterFn(key(requestInfo))
numStored := stats.ObjectCount
switch {
case err == ObjectCountStaleErr:
// object count going stale is indicative of degradation, so we should


@ -22,6 +22,7 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
@ -55,7 +56,7 @@ var (
type StorageObjectCountTracker interface {
// Set is invoked to update the current number of total
// objects for the given resource
Set(string, int64)
Set(string, storage.Stats)
// Get returns the total number of objects for the given resource.
// The following errors are returned:
@ -63,7 +64,7 @@ type StorageObjectCountTracker interface {
// failures ObjectCountStaleErr is returned.
// - if the given resource is not being tracked then
// ObjectCountNotFoundErr is returned.
Get(string) (int64, error)
Get(string) (storage.Stats, error)
// RunUntil starts all the necessary maintenance.
RunUntil(stopCh <-chan struct{})
@ -75,14 +76,14 @@ type StorageObjectCountTracker interface {
func NewStorageObjectCountTracker() StorageObjectCountTracker {
return &objectCountTracker{
clock: &clock.RealClock{},
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
}
// timestampedCount stores the count of a given resource with a last updated
// timestampedStats stores the count of a given resource with a last updated
// timestamp so we can prune it after it goes stale for certain threshold.
type timestampedCount struct {
count int64
type timestampedStats struct {
storage.Stats
lastUpdatedAt time.Time
}
@ -92,11 +93,11 @@ type objectCountTracker struct {
clock clock.PassiveClock
lock sync.RWMutex
counts map[string]*timestampedCount
counts map[string]*timestampedStats
}
func (t *objectCountTracker) Set(groupResource string, count int64) {
if count <= -1 {
func (t *objectCountTracker) Set(groupResource string, stats storage.Stats) {
if stats.ObjectCount <= -1 {
// a value of -1 indicates that the 'Count' call failed to contact
// the storage layer, in most cases this error can be transient.
// we will continue to work with the count that is in the cache
@ -114,18 +115,18 @@ func (t *objectCountTracker) Set(groupResource string, count int64) {
defer t.lock.Unlock()
if item, ok := t.counts[groupResource]; ok {
item.count = count
item.Stats = stats
item.lastUpdatedAt = now
return
}
t.counts[groupResource] = &timestampedCount{
count: count,
t.counts[groupResource] = &timestampedStats{
Stats: stats,
lastUpdatedAt: now,
}
}
func (t *objectCountTracker) Get(groupResource string) (int64, error) {
func (t *objectCountTracker) Get(groupResource string) (storage.Stats, error) {
staleThreshold := t.clock.Now().Add(-staleTolerationThreshold)
t.lock.RLock()
@ -133,11 +134,11 @@ func (t *objectCountTracker) Get(groupResource string) (int64, error) {
if item, ok := t.counts[groupResource]; ok {
if item.lastUpdatedAt.Before(staleThreshold) {
return item.count, ObjectCountStaleErr
return item.Stats, ObjectCountStaleErr
}
return item.count, nil
return item.Stats, nil
}
return 0, ObjectCountNotFoundErr
return storage.Stats{}, ObjectCountNotFoundErr
}
// RunUntil runs all the necessary maintenance.


@ -22,6 +22,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/apiserver/pkg/storage"
testclock "k8s.io/utils/clock/testing"
)
@ -69,21 +70,21 @@ func TestStorageObjectCountTracker(t *testing.T) {
fakeClock := &testclock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
key := "foo.bar.resource"
now := time.Now()
fakeClock.SetTime(now.Add(-test.lastUpdated))
tracker.Set(key, test.count)
tracker.Set(key, storage.Stats{ObjectCount: test.count})
fakeClock.SetTime(now)
countGot, err := tracker.Get(key)
stats, err := tracker.Get(key)
if test.errExpected != err {
t.Errorf("Expected error: %v, but got: %v", test.errExpected, err)
}
if test.countExpected != countGot {
t.Errorf("Expected count: %d, but got: %d", test.countExpected, countGot)
if test.countExpected != stats.ObjectCount {
t.Errorf("Expected count: %d, but got: %d", test.countExpected, stats.ObjectCount)
}
if test.count <= -1 && len(tracker.counts) > 0 {
t.Errorf("Expected the cache to be empty, but got: %d", len(tracker.counts))
@ -96,23 +97,23 @@ func TestStorageObjectCountTrackerWithPrune(t *testing.T) {
fakeClock := &testclock.FakePassiveClock{}
tracker := &objectCountTracker{
clock: fakeClock,
counts: map[string]*timestampedCount{},
counts: map[string]*timestampedStats{},
}
now := time.Now()
fakeClock.SetTime(now.Add(-61 * time.Minute))
tracker.Set("k1", 61)
tracker.Set("k1", storage.Stats{ObjectCount: 61})
fakeClock.SetTime(now.Add(-60 * time.Minute))
tracker.Set("k2", 60)
tracker.Set("k2", storage.Stats{ObjectCount: 60})
// we are going to prune keys that are stale for >= 1h
// so the above keys are expected to be pruned and the
// key below should not be pruned.
mostRecent := now.Add(-59 * time.Minute)
fakeClock.SetTime(mostRecent)
tracker.Set("k3", 59)
expected := map[string]*timestampedCount{
tracker.Set("k3", storage.Stats{ObjectCount: 59})
expected := map[string]*timestampedStats{
"k3": {
count: 59,
Stats: storage.Stats{ObjectCount: 59},
lastUpdatedAt: mostRecent,
},
}


@ -23,6 +23,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
@ -56,9 +57,9 @@ func (we *WorkEstimate) MaxSeats() int {
return int(we.FinalSeats)
}
// objectCountGetterFunc represents a function that gets the total
// statsGetterFunc represents a function that gets the total
// number of objects for a given resource.
type objectCountGetterFunc func(string) (int64, error)
type statsGetterFunc func(string) (storage.Stats, error)
// watchCountGetterFunc represents a function that gets the total
// number of watchers potentially interested in a given request.
@ -71,7 +72,7 @@ type maxSeatsFunc func(priorityLevelName string) uint64
// NewWorkEstimator estimates the work that will be done by a given request,
// if no WorkEstimatorFunc matches the given request then the default
// work estimate of 1 seat is allocated to the request.
func NewWorkEstimator(objectCountFn objectCountGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
func NewWorkEstimator(objectCountFn statsGetterFunc, watchCountFn watchCountGetterFunc, config *WorkEstimatorConfig, maxSeatsFn maxSeatsFunc) WorkEstimatorFunc {
estimator := &workEstimator{
minimumSeats: config.MinimumSeats,
maximumSeatsLimit: config.MaximumSeatsLimit,


@ -24,6 +24,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
@ -547,8 +548,8 @@ func TestWorkEstimator(t *testing.T) {
if len(counts) == 0 {
counts = map[string]int64{}
}
countsFn := func(key string) (int64, error) {
return counts[key], test.countErr
countsFn := func(key string) (storage.Stats, error) {
return storage.Stats{ObjectCount: counts[key]}, test.countErr
}
watchCountsFn := func(_ *apirequest.RequestInfo) int {
return test.watchCount