Add context to Count()
Passing the same context will let us associate etcd traces with those from apiserver. Signed-off-by: Aleksander Mistewicz <amistewicz@google.com> Kubernetes-commit: 96b39187c5fac62e462dc348ccc1e3938464d9e1
This commit is contained in:
parent
92c5638afd
commit
7cd49caffc
|
@ -107,8 +107,8 @@ func (s *DryRunnableStorage) GuaranteedUpdate(
|
||||||
return s.Storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
|
return s.Storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunnableStorage) Count(key string) (int64, error) {
|
func (s *DryRunnableStorage) Count(ctx context.Context, key string) (int64, error) {
|
||||||
return s.Storage.Count(key)
|
return s.Storage.Count(ctx, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DryRunnableStorage) copyInto(in, out runtime.Object) error {
|
func (s *DryRunnableStorage) copyInto(in, out runtime.Object) error {
|
||||||
|
|
|
@ -1658,12 +1658,13 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
||||||
|
|
||||||
// startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
|
// startObservingCount starts monitoring given prefix and periodically updating metrics. It returns a function to stop collection.
|
||||||
func (e *Store) startObservingCount(period time.Duration, objectCountTracker flowcontrolrequest.StorageObjectCountTracker) func() {
|
func (e *Store) startObservingCount(period time.Duration, objectCountTracker flowcontrolrequest.StorageObjectCountTracker) func() {
|
||||||
prefix := e.KeyRootFunc(genericapirequest.NewContext())
|
ctx := genericapirequest.NewContext()
|
||||||
|
prefix := e.KeyRootFunc(ctx)
|
||||||
resourceName := e.DefaultQualifiedResource.String()
|
resourceName := e.DefaultQualifiedResource.String()
|
||||||
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
|
klog.V(2).InfoS("Monitoring resource count at path", "resource", resourceName, "path", "<storage-prefix>/"+prefix)
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
go wait.JitterUntil(func() {
|
go wait.JitterUntil(func() {
|
||||||
count, err := e.Storage.Count(prefix)
|
count, err := e.Storage.Count(ctx, prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
|
klog.V(5).InfoS("Failed to update storage count metric", "err", err)
|
||||||
count = -1
|
count = -1
|
||||||
|
|
|
@ -189,7 +189,7 @@ func (d *dummyStorage) GetList(ctx context.Context, resPrefix string, opts stora
|
||||||
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ runtime.Object) error {
|
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ runtime.Object) error {
|
||||||
return fmt.Errorf("unimplemented")
|
return fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
func (d *dummyStorage) Count(_ string) (int64, error) {
|
func (d *dummyStorage) Count(_ context.Context, _ string) (int64, error) {
|
||||||
return 0, fmt.Errorf("unimplemented")
|
return 0, fmt.Errorf("unimplemented")
|
||||||
}
|
}
|
||||||
func (d *dummyStorage) ReadinessCheck() error {
|
func (d *dummyStorage) ReadinessCheck() error {
|
||||||
|
|
|
@ -265,8 +265,8 @@ func (c *CacheDelegator) GuaranteedUpdate(ctx context.Context, key string, desti
|
||||||
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
|
return c.storage.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CacheDelegator) Count(pathPrefix string) (int64, error) {
|
func (c *CacheDelegator) Count(ctx context.Context, pathPrefix string) (int64, error) {
|
||||||
return c.storage.Count(pathPrefix)
|
return c.storage.Count(ctx, pathPrefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CacheDelegator) ReadinessCheck() error {
|
func (c *CacheDelegator) ReadinessCheck() error {
|
||||||
|
|
|
@ -617,7 +617,7 @@ func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Obje
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Count(key string) (int64, error) {
|
func (s *store) Count(ctx context.Context, key string) (int64, error) {
|
||||||
preparedKey, err := s.prepareKey(key)
|
preparedKey, err := s.prepareKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
@ -631,7 +631,7 @@ func (s *store) Count(key string) (int64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
count, err := s.client.Kubernetes.Count(context.Background(), preparedKey, kubernetes.CountOptions{})
|
count, err := s.client.Kubernetes.Count(ctx, preparedKey, kubernetes.CountOptions{})
|
||||||
metrics.RecordEtcdRequest("listWithCount", s.groupResource, err, startTime)
|
metrics.RecordEtcdRequest("listWithCount", s.groupResource, err, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|
|
@ -692,7 +692,7 @@ func TestInvalidKeys(t *testing.T) {
|
||||||
expectInvalidKey("Get", store.Get(ctx, invalidKey, storage.GetOptions{}, nil))
|
expectInvalidKey("Get", store.Get(ctx, invalidKey, storage.GetOptions{}, nil))
|
||||||
expectInvalidKey("GetList", store.GetList(ctx, invalidKey, storage.ListOptions{}, nil))
|
expectInvalidKey("GetList", store.GetList(ctx, invalidKey, storage.ListOptions{}, nil))
|
||||||
expectInvalidKey("GuaranteedUpdate", store.GuaranteedUpdate(ctx, invalidKey, nil, true, nil, nil, nil))
|
expectInvalidKey("GuaranteedUpdate", store.GuaranteedUpdate(ctx, invalidKey, nil, true, nil, nil, nil))
|
||||||
_, countErr := store.Count(invalidKey)
|
_, countErr := store.Count(t.Context(), invalidKey)
|
||||||
expectInvalidKey("Count", countErr)
|
expectInvalidKey("Count", countErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -244,7 +244,7 @@ type Interface interface {
|
||||||
preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error
|
preconditions *Preconditions, tryUpdate UpdateFunc, cachedExistingObject runtime.Object) error
|
||||||
|
|
||||||
// Count returns number of different entries under the key (generally being path prefix).
|
// Count returns number of different entries under the key (generally being path prefix).
|
||||||
Count(key string) (int64, error)
|
Count(ctx context.Context, key string) (int64, error)
|
||||||
|
|
||||||
// ReadinessCheck checks if the storage is ready for accepting requests.
|
// ReadinessCheck checks if the storage is ready for accepting requests.
|
||||||
ReadinessCheck() error
|
ReadinessCheck() error
|
||||||
|
|
|
@ -3130,7 +3130,7 @@ func RunTestCount(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceACountGot, err := store.Count(resourceA)
|
resourceACountGot, err := store.Count(t.Context(), resourceA)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("store.Count failed: %v", err)
|
t.Fatalf("store.Count failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue