diff --git a/go.mod b/go.mod index 14fb8eb77..3c34694a0 100644 --- a/go.mod +++ b/go.mod @@ -43,8 +43,8 @@ require ( google.golang.org/protobuf v1.33.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/square/go-jose.v2 v2.6.0 - k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 - k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 + k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4 + k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286 k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9 k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6 k8s.io/klog/v2 v2.120.1 diff --git a/go.sum b/go.sum index b00b6f70f..2183a8d14 100644 --- a/go.sum +++ b/go.sum @@ -381,10 +381,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 h1:DXJla1ulezom5N0QIRlZetozcxtRPdS7U+muHDJuiO4= -k8s.io/api v0.0.0-20240418013359-a819b1d9bd16/go.mod h1:a1YU16kjsAapUzg1LYaOqTnbMlo87NXy9bSeWjRmfoo= -k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 h1:SydhMcp6AJkjqqVcd0o0uz7ntTcs/QyIgIHAFYfIm7E= -k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= +k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4 h1:Aoz6y1eO+GKFC27cTyO+VJkbaEyQsGec2IhAYYaN7IU= +k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4/go.mod h1:sbi+6EMV/95qn9kNfIj4HSNs4jcBqkcBxEAD+AKHUHY= +k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286 h1:pGpFsAFMSxtFe98HpTcgDgblsARQhckNfASAgCmlXS4= +k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9 h1:eC8SD8kFISw8xhx2kTsXpIuB4qOGtCUdnK+ciXaJeEA= k8s.io/client-go 
v0.0.0-20240418093651-9990b0b122c9/go.mod h1:qmgPSZQ21ke/aLcgydRX8fK48pjHfF4anbvDcixuBqM= k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6 h1:ZdeuYrtChorFLu6yEbUE48mY6xXc/gkTqd5BFenIAyk= diff --git a/pkg/admission/plugin/resourcequota/resource_access.go b/pkg/admission/plugin/resourcequota/resource_access.go index f09b46268..d189446f0 100644 --- a/pkg/admission/plugin/resourcequota/resource_access.go +++ b/pkg/admission/plugin/resourcequota/resource_access.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "golang.org/x/sync/singleflight" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -51,6 +52,7 @@ type quotaAccessor struct { // This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results. // We track the lookup result here so that for repeated requests, we don't look it up very often. liveLookupCache *lru.Cache + group singleflight.Group liveTTL time.Duration // updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to // back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions @@ -114,21 +116,23 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err if len(items) == 0 { lruItemObj, ok := e.liveLookupCache.Get(namespace) if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) { - // TODO: If there are multiple operations at the same time and cache has just expired, - // this may cause multiple List operations being issued at the same time. - // If there is already in-flight List() for a given namespace, we should wait until - // it is finished and cache is updated instead of doing the same, also to avoid - // throttling - see #22422 for details. 
- liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{}) + // use singleflight.Group to avoid flooding the apiserver with repeated + // requests. See #22422 for details. + lruItemObj, err, _ = e.group.Do(namespace, func() (interface{}, error) { + liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)} + for i := range liveList.Items { + newEntry.items = append(newEntry.items, &liveList.Items[i]) + } + e.liveLookupCache.Add(namespace, newEntry) + return newEntry, nil + }) if err != nil { return nil, err } - newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)} - for i := range liveList.Items { - newEntry.items = append(newEntry.items, &liveList.Items[i]) - } - e.liveLookupCache.Add(namespace, newEntry) - lruItemObj = newEntry } lruEntry := lruItemObj.(liveLookupEntry) for i := range lruEntry.items { diff --git a/pkg/admission/plugin/resourcequota/resource_access_test.go b/pkg/admission/plugin/resourcequota/resource_access_test.go index b784add88..9bd035a80 100644 --- a/pkg/admission/plugin/resourcequota/resource_access_test.go +++ b/pkg/admission/plugin/resourcequota/resource_access_test.go @@ -17,7 +17,10 @@ limitations under the License. package resourcequota import ( + "fmt" "reflect" + "sync" + "sync/atomic" "testing" "time" @@ -27,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" "k8s.io/utils/lru" ) @@ -121,5 +125,122 @@ func TestLRUCacheLookup(t *testing.T) { } }) } - +} + +// TestGetQuotas ensures we do not have multiple LIST calls to the apiserver +// in-flight at any one time. This is to ensure the issue described in #22422 does +// not happen again.
+func TestGetQuotas(t *testing.T) { + var ( + testNamespace1 = "test-a" + testNamespace2 = "test-b" + listCallCountTestNamespace1 int64 + listCallCountTestNamespace2 int64 + ) + resourceQuota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + + resourceQuotas := []*corev1.ResourceQuota{resourceQuota} + + kubeClient := &fake.Clientset{} + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + + accessor, _ := newQuotaAccessor() + accessor.client = kubeClient + accessor.lister = informerFactory.Core().V1().ResourceQuotas().Lister() + + kubeClient.AddReactor("list", "resourcequotas", func(action core.Action) (bool, runtime.Object, error) { + switch action.GetNamespace() { + case testNamespace1: + atomic.AddInt64(&listCallCountTestNamespace1, 1) + case testNamespace2: + atomic.AddInt64(&listCallCountTestNamespace2, 1) + default: + t.Error("unexpected namespace") + } + + resourceQuotaList := &corev1.ResourceQuotaList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: fmt.Sprintf("%d", len(resourceQuotas)), + }, + } + for i, quota := range resourceQuotas { + quota.ResourceVersion = fmt.Sprintf("%d", i) + quota.Namespace = action.GetNamespace() + resourceQuotaList.Items = append(resourceQuotaList.Items, *quota) + } + // make the handler slow so concurrent calls exercise the singleflight + time.Sleep(time.Second) + return true, resourceQuotaList, nil + }) + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(2) + // simulating concurrent calls after a cache failure + go func() { + defer wg.Done() + quotas, err := accessor.GetQuotas(testNamespace1) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(quotas) != len(resourceQuotas) { + t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas)) + } + for _, q := range quotas { + if q.Namespace != testNamespace1 { + t.Errorf("Expected %s namespace, got %s", testNamespace1, q.Namespace) + } + } + }() + + // simulation of different 
namespaces is a call for a different group key, but not shared with the first namespace + go func() { + defer wg.Done() + quotas, err := accessor.GetQuotas(testNamespace2) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(quotas) != len(resourceQuotas) { + t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas)) + } + for _, q := range quotas { + if q.Namespace != testNamespace2 { + t.Errorf("Expected %s namespace, got %s", testNamespace2, q.Namespace) + } + } + }() + } + + // and here we wait for all the goroutines + wg.Wait() + // since all the calls with the same namespace will be held, they must + // be caught on the singleflight group. there are two different sets of + // namespace calls hence only 2. + if listCallCountTestNamespace1 != 1 { + t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace1) + } + if listCallCountTestNamespace2 != 1 { + t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2) + } + + // invalidate the cache + accessor.liveLookupCache.Remove(testNamespace1) + quotas, err := accessor.GetQuotas(testNamespace1) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if len(quotas) != len(resourceQuotas) { + t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas)) + } + + if listCallCountTestNamespace1 != 2 { + t.Errorf("Expected 2 resource quota call, got %d", listCallCountTestNamespace1) + } + if listCallCountTestNamespace2 != 1 { + t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2) + } }