Merge pull request #124163 from flavianmissi/resource-quota-single-flight
resourcequota: use singleflight.Group to reduce apiserver load Kubernetes-commit: 7b33887879b56c040a6af8d445ce578accb989e1
This commit is contained in:
commit
8c72d838c6
4
go.mod
4
go.mod
|
|
@ -43,8 +43,8 @@ require (
|
||||||
google.golang.org/protobuf v1.33.0
|
google.golang.org/protobuf v1.33.0
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||||
gopkg.in/square/go-jose.v2 v2.6.0
|
gopkg.in/square/go-jose.v2 v2.6.0
|
||||||
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16
|
k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4
|
||||||
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7
|
k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286
|
||||||
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9
|
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9
|
||||||
k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6
|
k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6
|
||||||
k8s.io/klog/v2 v2.120.1
|
k8s.io/klog/v2 v2.120.1
|
||||||
|
|
|
||||||
8
go.sum
8
go.sum
|
|
@ -381,10 +381,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16 h1:DXJla1ulezom5N0QIRlZetozcxtRPdS7U+muHDJuiO4=
|
k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4 h1:Aoz6y1eO+GKFC27cTyO+VJkbaEyQsGec2IhAYYaN7IU=
|
||||||
k8s.io/api v0.0.0-20240418013359-a819b1d9bd16/go.mod h1:a1YU16kjsAapUzg1LYaOqTnbMlo87NXy9bSeWjRmfoo=
|
k8s.io/api v0.0.0-20240418093414-76b6c7ce02a4/go.mod h1:sbi+6EMV/95qn9kNfIj4HSNs4jcBqkcBxEAD+AKHUHY=
|
||||||
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7 h1:SydhMcp6AJkjqqVcd0o0uz7ntTcs/QyIgIHAFYfIm7E=
|
k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286 h1:pGpFsAFMSxtFe98HpTcgDgblsARQhckNfASAgCmlXS4=
|
||||||
k8s.io/apimachinery v0.0.0-20240418093209-8c36da9e60f7/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
|
k8s.io/apimachinery v0.0.0-20240418133208-ea31e5150286/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
|
||||||
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9 h1:eC8SD8kFISw8xhx2kTsXpIuB4qOGtCUdnK+ciXaJeEA=
|
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9 h1:eC8SD8kFISw8xhx2kTsXpIuB4qOGtCUdnK+ciXaJeEA=
|
||||||
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9/go.mod h1:qmgPSZQ21ke/aLcgydRX8fK48pjHfF4anbvDcixuBqM=
|
k8s.io/client-go v0.0.0-20240418093651-9990b0b122c9/go.mod h1:qmgPSZQ21ke/aLcgydRX8fK48pjHfF4anbvDcixuBqM=
|
||||||
k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6 h1:ZdeuYrtChorFLu6yEbUE48mY6xXc/gkTqd5BFenIAyk=
|
k8s.io/component-base v0.0.0-20240418094434-57ba0489bfa6 h1:ZdeuYrtChorFLu6yEbUE48mY6xXc/gkTqd5BFenIAyk=
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/singleflight"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
|
@ -51,6 +52,7 @@ type quotaAccessor struct {
|
||||||
// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
|
// This lets us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
|
||||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||||
liveLookupCache *lru.Cache
|
liveLookupCache *lru.Cache
|
||||||
|
group singleflight.Group
|
||||||
liveTTL time.Duration
|
liveTTL time.Duration
|
||||||
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
||||||
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
||||||
|
|
@ -114,11 +116,9 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
|
||||||
if len(items) == 0 {
|
if len(items) == 0 {
|
||||||
lruItemObj, ok := e.liveLookupCache.Get(namespace)
|
lruItemObj, ok := e.liveLookupCache.Get(namespace)
|
||||||
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
||||||
// TODO: If there are multiple operations at the same time and cache has just expired,
|
// use singleflight.Group to avoid flooding the apiserver with repeated
|
||||||
// this may cause multiple List operations being issued at the same time.
|
// requests. See #22422 for details.
|
||||||
// If there is already in-flight List() for a given namespace, we should wait until
|
lruItemObj, err, _ = e.group.Do(namespace, func() (interface{}, error) {
|
||||||
// it is finished and cache is updated instead of doing the same, also to avoid
|
|
||||||
// throttling - see #22422 for details.
|
|
||||||
liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
|
liveList, err := e.client.CoreV1().ResourceQuotas(namespace).List(context.TODO(), metav1.ListOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -128,7 +128,11 @@ func (e *quotaAccessor) GetQuotas(namespace string) ([]corev1.ResourceQuota, err
|
||||||
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
||||||
}
|
}
|
||||||
e.liveLookupCache.Add(namespace, newEntry)
|
e.liveLookupCache.Add(namespace, newEntry)
|
||||||
lruItemObj = newEntry
|
return newEntry, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
lruEntry := lruItemObj.(liveLookupEntry)
|
lruEntry := lruItemObj.(liveLookupEntry)
|
||||||
for i := range lruEntry.items {
|
for i := range lruEntry.items {
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,10 @@ limitations under the License.
|
||||||
package resourcequota
|
package resourcequota
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -27,6 +30,7 @@ import (
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/utils/lru"
|
"k8s.io/utils/lru"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -121,5 +125,122 @@ func TestLRUCacheLookup(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetQuotas ensures we do not have multiple LIST calls to the apiserver
|
||||||
|
// in-flight at any one time. This is to ensure the issue described in #22422 do
|
||||||
|
// not happen again.
|
||||||
|
func TestGetQuotas(t *testing.T) {
|
||||||
|
var (
|
||||||
|
testNamespace1 = "test-a"
|
||||||
|
testNamespace2 = "test-b"
|
||||||
|
listCallCountTestNamespace1 int64
|
||||||
|
listCallCountTestNamespace2 int64
|
||||||
|
)
|
||||||
|
resourceQuota := &corev1.ResourceQuota{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceQuotas := []*corev1.ResourceQuota{resourceQuota}
|
||||||
|
|
||||||
|
kubeClient := &fake.Clientset{}
|
||||||
|
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
|
||||||
|
|
||||||
|
accessor, _ := newQuotaAccessor()
|
||||||
|
accessor.client = kubeClient
|
||||||
|
accessor.lister = informerFactory.Core().V1().ResourceQuotas().Lister()
|
||||||
|
|
||||||
|
kubeClient.AddReactor("list", "resourcequotas", func(action core.Action) (bool, runtime.Object, error) {
|
||||||
|
switch action.GetNamespace() {
|
||||||
|
case testNamespace1:
|
||||||
|
atomic.AddInt64(&listCallCountTestNamespace1, 1)
|
||||||
|
case testNamespace2:
|
||||||
|
atomic.AddInt64(&listCallCountTestNamespace2, 1)
|
||||||
|
default:
|
||||||
|
t.Error("unexpected namespace")
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceQuotaList := &corev1.ResourceQuotaList{
|
||||||
|
ListMeta: metav1.ListMeta{
|
||||||
|
ResourceVersion: fmt.Sprintf("%d", len(resourceQuotas)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for i, quota := range resourceQuotas {
|
||||||
|
quota.ResourceVersion = fmt.Sprintf("%d", i)
|
||||||
|
quota.Namespace = action.GetNamespace()
|
||||||
|
resourceQuotaList.Items = append(resourceQuotaList.Items, *quota)
|
||||||
|
}
|
||||||
|
// make the handler slow so concurrent calls exercise the singleflight
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
return true, resourceQuotaList, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(2)
|
||||||
|
// simulating concurrent calls after a cache failure
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
quotas, err := accessor.GetQuotas(testNamespace1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(quotas) != len(resourceQuotas) {
|
||||||
|
t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
|
||||||
|
}
|
||||||
|
for _, q := range quotas {
|
||||||
|
if q.Namespace != testNamespace1 {
|
||||||
|
t.Errorf("Expected %s namespace, got %s", testNamespace1, q.Namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// simulation of different namespaces is a call for a different group key, but not shared with the first namespace
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
quotas, err := accessor.GetQuotas(testNamespace2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(quotas) != len(resourceQuotas) {
|
||||||
|
t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
|
||||||
|
}
|
||||||
|
for _, q := range quotas {
|
||||||
|
if q.Namespace != testNamespace2 {
|
||||||
|
t.Errorf("Expected %s namespace, got %s", testNamespace2, q.Namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// and here we wait for all the goroutines
|
||||||
|
wg.Wait()
|
||||||
|
// since all the calls with the same namespace will be held, they must
|
||||||
|
// be caught on the singleflight group. there are two different sets of
|
||||||
|
// namespace calls hence only 2.
|
||||||
|
if listCallCountTestNamespace1 != 1 {
|
||||||
|
t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace1)
|
||||||
|
}
|
||||||
|
if listCallCountTestNamespace2 != 1 {
|
||||||
|
t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// invalidate the cache
|
||||||
|
accessor.liveLookupCache.Remove(testNamespace1)
|
||||||
|
quotas, err := accessor.GetQuotas(testNamespace1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(quotas) != len(resourceQuotas) {
|
||||||
|
t.Errorf("Expected %d resource quotas, got %d", len(resourceQuotas), len(quotas))
|
||||||
|
}
|
||||||
|
|
||||||
|
if listCallCountTestNamespace1 != 2 {
|
||||||
|
t.Errorf("Expected 2 resource quota call, got %d", listCallCountTestNamespace1)
|
||||||
|
}
|
||||||
|
if listCallCountTestNamespace2 != 1 {
|
||||||
|
t.Errorf("Expected 1 resource quota call, got %d", listCallCountTestNamespace2)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue