Compare commits
24 Commits
Author | SHA1 | Date |
---|---|---|
|
53ed5a1444 | |
|
b39cc9a1ad | |
|
6f8437296e | |
|
2a99607e39 | |
|
9a71f86e75 | |
|
7ce61404d9 | |
|
62d807d9f4 | |
|
1d6f7ca30c | |
|
fec9273f89 | |
|
317849749e | |
|
01e64e70cf | |
|
81a2f90a08 | |
|
70ed6fdbea | |
|
31c2b5a4bc | |
|
78c69b68c5 | |
|
1ebb228a1f | |
|
2f0f9ad33e | |
|
f36c496935 | |
|
078d9cb851 | |
|
6fe68d6713 | |
|
db03ef627c | |
|
19e03dd98c | |
|
c47cf43461 | |
|
dbb9b30940 |
14
go.mod
14
go.mod
|
@ -30,6 +30,7 @@ require (
|
|||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0
|
||||
go.opentelemetry.io/otel v1.28.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
|
||||
go.opentelemetry.io/otel/metric v1.28.0
|
||||
go.opentelemetry.io/otel/sdk v1.28.0
|
||||
go.opentelemetry.io/otel/trace v1.28.0
|
||||
go.uber.org/zap v1.26.0
|
||||
|
@ -44,12 +45,12 @@ require (
|
|||
gopkg.in/evanphx/json-patch.v4 v4.12.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
gopkg.in/square/go-jose.v2 v2.6.0
|
||||
k8s.io/api v0.0.0-20240724010313-f04ea0bc861d
|
||||
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe
|
||||
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b
|
||||
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440
|
||||
k8s.io/api v0.31.5
|
||||
k8s.io/apimachinery v0.31.5
|
||||
k8s.io/client-go v0.31.5
|
||||
k8s.io/component-base v0.31.5
|
||||
k8s.io/klog/v2 v2.130.1
|
||||
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33
|
||||
k8s.io/kms v0.31.5
|
||||
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340
|
||||
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
|
||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3
|
||||
|
@ -110,7 +111,6 @@ require (
|
|||
go.etcd.io/etcd/pkg/v3 v3.5.13 // indirect
|
||||
go.etcd.io/etcd/raft/v3 v3.5.13 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.28.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
||||
|
@ -123,5 +123,3 @@ require (
|
|||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
replace k8s.io/api => k8s.io/api v0.0.0-20240724010313-a789efa287e8
|
||||
|
|
24
go.sum
24
go.sum
|
@ -149,8 +149,8 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J
|
|||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
|
||||
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
|
||||
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
|
||||
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
|
||||
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
|
||||
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
|
||||
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
|
@ -370,18 +370,18 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
k8s.io/api v0.0.0-20240724010313-a789efa287e8 h1:TISAHWnfAdn420WpN+fEHG6snbLbfaCAp3kHDoAkxIc=
|
||||
k8s.io/api v0.0.0-20240724010313-a789efa287e8/go.mod h1:ytlEzqC2wOTwYET71W7+J+k7O2V7vrDuzmNLBSpgT+k=
|
||||
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe h1:V9MwpYUwbKlfLKVrhpVuKWiat/LBIhm1pGB9/xdHm5Q=
|
||||
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
|
||||
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b h1:NTLYx38CAu+VstHvPLosqB6uSQUtSM+3Mqz2D/C5JpE=
|
||||
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b/go.mod h1:Y6CzOT21oLI4O66cjiV5oSSUgOL7gG/VCG9n8XI8OxU=
|
||||
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440 h1:14X+5sRQRsul6tLxIKTP0/DotvWlMd9DFCgMqHP1hZY=
|
||||
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440/go.mod h1:dj2Pl05aLcVMZi2NXcwv+M/WdUVPEkisFPjDze7rbSk=
|
||||
k8s.io/api v0.31.5 h1:7jP74egbPUOCLJV5KheUnwo9gz3zzUsMIj2EPkuYK1E=
|
||||
k8s.io/api v0.31.5/go.mod h1:RMyMdZG1kJjou2ng5buEti0OHlo0uFXgSzTZ/k5LeVk=
|
||||
k8s.io/apimachinery v0.31.5 h1:NxhAVGcfrSdTMx3M2v1OnvcMS7h1ZnWyt2x2z8CJJBU=
|
||||
k8s.io/apimachinery v0.31.5/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
|
||||
k8s.io/client-go v0.31.5 h1:rmDswcUaIFAJ5vJaB82pjyqc52DgHCPv0G6af3OupO0=
|
||||
k8s.io/client-go v0.31.5/go.mod h1:js93IlRSzRHql9o9zP54N56rMR249uH4+srnSOcFLsU=
|
||||
k8s.io/component-base v0.31.5 h1:kpFiy1hI7F4Owp+o59H2CVLzmN94qwcPz+2L6wRhkqM=
|
||||
k8s.io/component-base v0.31.5/go.mod h1:OiiusrmcLz42i9VvcAd94yQIN7UzQHJxN/hXxwYzj6E=
|
||||
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
|
||||
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
|
||||
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33 h1:Wd/sRvKMgzuCdkZ/WQg2rg/j6NLW8eyw0RK8AhV9Hak=
|
||||
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33/go.mod h1:x2EJv5lkGE18ijjE04e8W0fVyRPfAx5flo9WdY7a1Hw=
|
||||
k8s.io/kms v0.31.5 h1:KqAHTnJDSoTt8ZSLsalz6gmwlX0lzVWAVSYcqEj6Lqw=
|
||||
k8s.io/kms v0.31.5/go.mod h1:OZKwl1fan3n3N5FFxnW5C4V3ygrah/3YXeJWS3O6+94=
|
||||
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
|
||||
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
|
||||
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
|
||||
|
|
|
@ -101,6 +101,11 @@ const (
|
|||
// Allows authorization to use field and label selectors.
|
||||
AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors"
|
||||
|
||||
// owner: @serathius
|
||||
// beta: v1.31
|
||||
// Enables concurrent watch object decoding to avoid starving watch cache when conversion webhook is installed.
|
||||
ConcurrentWatchObjectDecode featuregate.Feature = "ConcurrentWatchObjectDecode"
|
||||
|
||||
// owner: @cici37 @jpbetz
|
||||
// kep: http://kep.k8s.io/3488
|
||||
// alpha: v1.26
|
||||
|
@ -312,6 +317,7 @@ const (
|
|||
// owner: @serathius
|
||||
// kep: http://kep.k8s.io/2340
|
||||
// alpha: v1.28
|
||||
// beta: v1.31
|
||||
//
|
||||
// Allow the API server to serve consistent lists from cache
|
||||
ConsistentListFromCache featuregate.Feature = "ConsistentListFromCache"
|
||||
|
@ -360,10 +366,12 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||
|
||||
APIServerTracing: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta},
|
||||
APIServingWithRoutine: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
|
||||
|
||||
ValidatingAdmissionPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
|
||||
|
||||
CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
@ -414,7 +422,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
|||
|
||||
WatchList: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha},
|
||||
ConsistentListFromCache: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
|
||||
}
|
||||
|
|
|
@ -1170,7 +1170,7 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
|
|||
tokens[privilegedLoopbackToken] = &user.DefaultInfo{
|
||||
Name: user.APIServerUser,
|
||||
UID: uid,
|
||||
Groups: []string{user.SystemPrivilegedGroup},
|
||||
Groups: []string{user.AllAuthenticated, user.SystemPrivilegedGroup},
|
||||
}
|
||||
|
||||
tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens, authn.APIAudiences)
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/audit/policy"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticator"
|
||||
"k8s.io/apiserver/pkg/authentication/user"
|
||||
"k8s.io/apiserver/pkg/authorization/authorizer"
|
||||
"k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
|
@ -83,6 +84,34 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestAuthorizeClientBearerTokenRequiredGroups(t *testing.T) {
|
||||
fakeAuthenticator := authenticator.RequestFunc(func(req *http.Request) (*authenticator.Response, bool, error) {
|
||||
return &authenticator.Response{User: &user.DefaultInfo{}}, false, nil
|
||||
})
|
||||
fakeAuthorizer := authorizer.AuthorizerFunc(func(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
|
||||
return authorizer.DecisionAllow, "", nil
|
||||
})
|
||||
target := &rest.Config{BearerToken: "secretToken"}
|
||||
authN := &AuthenticationInfo{Authenticator: fakeAuthenticator}
|
||||
authC := &AuthorizationInfo{Authorizer: fakeAuthorizer}
|
||||
|
||||
AuthorizeClientBearerToken(target, authN, authC)
|
||||
|
||||
fakeRequest, err := http.NewRequest("", "", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fakeRequest.Header.Set("Authorization", "bearer secretToken")
|
||||
rsp, _, err := authN.Authenticator.AuthenticateRequest(fakeRequest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expectedGroups := []string{user.AllAuthenticated, user.SystemPrivilegedGroup}
|
||||
if !reflect.DeepEqual(expectedGroups, rsp.User.GetGroups()) {
|
||||
t.Fatalf("unexpected groups = %v returned, expected = %v", rsp.User.GetGroups(), expectedGroups)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewWithDelegate(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
|
|
|
@ -23,7 +23,9 @@ import (
|
|||
"net"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/semconv/v1.12.0"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -48,6 +50,12 @@ var (
|
|||
codecs = serializer.NewCodecFactory(cfgScheme)
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Prevent memory leak from OTel metrics, which we don't use:
|
||||
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/5190
|
||||
otel.SetMeterProvider(noop.NewMeterProvider())
|
||||
}
|
||||
|
||||
func init() {
|
||||
install.Install(cfgScheme)
|
||||
}
|
||||
|
|
|
@ -130,13 +130,24 @@ func emulatedStorageVersion(binaryVersionOfResource schema.GroupVersion, example
|
|||
gvks, _, err := scheme.ObjectKinds(example)
|
||||
if err != nil {
|
||||
return schema.GroupVersion{}, err
|
||||
} else if len(gvks) == 0 {
|
||||
// Probably shouldn't happen if err is non-nil
|
||||
}
|
||||
|
||||
var gvk schema.GroupVersionKind
|
||||
for _, item := range gvks {
|
||||
if item.Group != binaryVersionOfResource.Group {
|
||||
continue
|
||||
}
|
||||
|
||||
gvk = item
|
||||
break
|
||||
}
|
||||
|
||||
if len(gvk.Kind) == 0 {
|
||||
return schema.GroupVersion{}, fmt.Errorf("object %T has no GVKs registered in scheme", example)
|
||||
}
|
||||
|
||||
// VersionsForGroupKind returns versions in priority order
|
||||
versions := scheme.VersionsForGroupKind(schema.GroupKind{Group: gvks[0].Group, Kind: gvks[0].Kind})
|
||||
versions := scheme.VersionsForGroupKind(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind})
|
||||
|
||||
compatibilityVersion := effectiveVersion.MinCompatibilityVersion()
|
||||
|
||||
|
@ -148,7 +159,7 @@ func emulatedStorageVersion(binaryVersionOfResource schema.GroupVersion, example
|
|||
gvk := schema.GroupVersionKind{
|
||||
Group: gv.Group,
|
||||
Version: gv.Version,
|
||||
Kind: gvks[0].Kind,
|
||||
Kind: gvk.Kind,
|
||||
}
|
||||
|
||||
exampleOfGVK, err := scheme.New(gvk)
|
||||
|
|
|
@ -245,7 +245,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource, ex
|
|||
|
||||
var err error
|
||||
if backwardCompatibleInterface, ok := s.ResourceEncodingConfig.(CompatibilityResourceEncodingConfig); ok {
|
||||
codecConfig.StorageVersion, err = backwardCompatibleInterface.BackwardCompatibileStorageEncodingFor(groupResource, example)
|
||||
codecConfig.StorageVersion, err = backwardCompatibleInterface.BackwardCompatibileStorageEncodingFor(chosenStorageResource, example)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||
preparedKey += "/"
|
||||
}
|
||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
|
||||
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
|
||||
if consistentRead {
|
||||
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||
}
|
||||
|
||||
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
|
||||
success := "true"
|
||||
fallback := "false"
|
||||
if err != nil {
|
||||
if consistentRead {
|
||||
if storage.IsTooLargeResourceVersion(err) {
|
||||
fallback = "true"
|
||||
err = c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
if err != nil {
|
||||
success = "false"
|
||||
}
|
||||
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
if consistentRead {
|
||||
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
|
||||
}
|
||||
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
|
||||
// store pointer of eligible objects,
|
||||
// Why not directly put object in the items of listObj?
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"reflect"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -45,10 +46,13 @@ import (
|
|||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/cacher/metrics"
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
k8smetrics "k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/utils/clock"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/pointer"
|
||||
|
@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
|
|||
}
|
||||
}
|
||||
|
||||
func TestConsistentReadFallback(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
consistentReadsEnabled bool
|
||||
watchCacheRV string
|
||||
storageRV string
|
||||
fallbackError bool
|
||||
|
||||
expectError bool
|
||||
expectRV string
|
||||
expectBlock bool
|
||||
expectRequestsToStorage int
|
||||
expectMetric string
|
||||
}{
|
||||
{
|
||||
name: "Success",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "42",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectRequestsToStorage: 1,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Fallback",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectBlock: true,
|
||||
expectRequestsToStorage: 2,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Fallback Failure",
|
||||
consistentReadsEnabled: true,
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
fallbackError: true,
|
||||
expectError: true,
|
||||
expectBlock: true,
|
||||
expectRequestsToStorage: 2,
|
||||
expectMetric: `
|
||||
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
|
||||
# TYPE apiserver_watch_cache_consistent_read_total counter
|
||||
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "Disabled",
|
||||
watchCacheRV: "2",
|
||||
storageRV: "42",
|
||||
expectRV: "42",
|
||||
expectRequestsToStorage: 1,
|
||||
expectMetric: ``,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled)
|
||||
if tc.consistentReadsEnabled {
|
||||
forceRequestWatchProgressSupport(t)
|
||||
}
|
||||
|
||||
registry := k8smetrics.NewKubeRegistry()
|
||||
metrics.ConsistentReadTotal.Reset()
|
||||
if err := registry.Register(metrics.ConsistentReadTotal); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
backingStorage := &dummyStorage{}
|
||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
podList := listObj.(*example.PodList)
|
||||
podList.ResourceVersion = tc.watchCacheRV
|
||||
return nil
|
||||
}
|
||||
// TODO: Use fake clock for this test to reduce execution time.
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV {
|
||||
t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV)
|
||||
}
|
||||
requestToStorageCount := 0
|
||||
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
requestToStorageCount += 1
|
||||
podList := listObj.(*example.PodList)
|
||||
if key == cacher.resourcePrefix {
|
||||
podList.ResourceVersion = tc.storageRV
|
||||
return nil
|
||||
}
|
||||
if tc.fallbackError {
|
||||
return errDummy
|
||||
}
|
||||
podList.ResourceVersion = tc.storageRV
|
||||
return nil
|
||||
}
|
||||
result := &example.PodList{}
|
||||
start := cacher.clock.Now()
|
||||
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
|
||||
duration := cacher.clock.Since(start)
|
||||
if (err != nil) != tc.expectError {
|
||||
t.Fatalf("Unexpected error err: %v", err)
|
||||
}
|
||||
if result.ResourceVersion != tc.expectRV {
|
||||
t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV)
|
||||
}
|
||||
if requestToStorageCount != tc.expectRequestsToStorage {
|
||||
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage)
|
||||
}
|
||||
blocked := duration >= blockTimeout
|
||||
if blocked != tc.expectBlock {
|
||||
t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock)
|
||||
}
|
||||
|
||||
if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
|
||||
backingStorage := &dummyStorage{}
|
||||
|
@ -763,6 +899,7 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
|
|||
case 1:
|
||||
podList.ListMeta = metav1.ListMeta{ResourceVersion: "10"}
|
||||
default:
|
||||
t.Errorf("unexpected list call: %d", listCalls)
|
||||
err = fmt.Errorf("unexpected list call")
|
||||
}
|
||||
listCalls++
|
||||
|
@ -785,8 +922,11 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
|
|||
for i := 12; i < 18; i++ {
|
||||
w.Add(makePod(i))
|
||||
}
|
||||
w.Stop()
|
||||
// Keep the watch open to avoid another reinitialization,
|
||||
// but register it for cleanup.
|
||||
t.Cleanup(func() { w.Stop() })
|
||||
default:
|
||||
t.Errorf("unexpected watch call: %d", watchCalls)
|
||||
err = fmt.Errorf("unexpected watch call")
|
||||
}
|
||||
watchCalls++
|
||||
|
@ -808,7 +948,6 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
errCh := make(chan error, concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
@ -832,11 +971,11 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
|
|||
}
|
||||
rv, err := strconv.Atoi(object.(*example.Pod).ResourceVersion)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("incorrect resource version: %v", err)
|
||||
t.Errorf("incorrect resource version: %v", err)
|
||||
return
|
||||
}
|
||||
if prevRV != -1 && prevRV+1 != rv {
|
||||
errCh <- fmt.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
|
||||
t.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
|
||||
return
|
||||
}
|
||||
prevRV = rv
|
||||
|
@ -845,11 +984,6 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
for err := range errCh {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
|
||||
|
|
|
@ -167,6 +167,15 @@ var (
|
|||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
|
||||
}, []string{"resource"})
|
||||
|
||||
ConsistentReadTotal = compbasemetrics.NewCounterVec(
|
||||
&compbasemetrics.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "consistent_read_total",
|
||||
Help: "Counter for consistent reads from cache.",
|
||||
StabilityLevel: compbasemetrics.ALPHA,
|
||||
}, []string{"resource", "success", "fallback"})
|
||||
)
|
||||
|
||||
var registerMetrics sync.Once
|
||||
|
@ -188,6 +197,7 @@ func Register() {
|
|||
legacyregistry.MustRegister(WatchCacheCapacity)
|
||||
legacyregistry.MustRegister(WatchCacheInitializations)
|
||||
legacyregistry.MustRegister(WatchCacheReadWait)
|
||||
legacyregistry.MustRegister(ConsistentReadTotal)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -46,8 +46,9 @@ import (
|
|||
|
||||
const (
|
||||
// We have set a buffer in order to reduce times of context switches.
|
||||
incomingBufSize = 100
|
||||
outgoingBufSize = 100
|
||||
incomingBufSize = 100
|
||||
outgoingBufSize = 100
|
||||
processEventConcurrency = 10
|
||||
)
|
||||
|
||||
// defaultWatcherMaxLimit is used to facilitate construction tests
|
||||
|
@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
|
|||
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
|
||||
|
||||
var resultChanWG sync.WaitGroup
|
||||
resultChanWG.Add(1)
|
||||
go wc.processEvent(&resultChanWG)
|
||||
wc.processEvents(&resultChanWG)
|
||||
|
||||
select {
|
||||
case err := <-wc.errChan:
|
||||
|
@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
|
|||
close(watchClosedCh)
|
||||
}
|
||||
|
||||
// processEvent processes events from etcd watcher and sends results to resultChan.
|
||||
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
||||
// processEvents processes events from etcd watcher and sends results to resultChan.
|
||||
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
|
||||
wc.concurrentProcessEvents(wg)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go wc.serialProcessEvents(wg)
|
||||
}
|
||||
}
|
||||
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case e := <-wc.incomingEventChan:
|
||||
|
@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||
if res == nil {
|
||||
continue
|
||||
}
|
||||
if len(wc.resultChan) == outgoingBufSize {
|
||||
if len(wc.resultChan) == cap(wc.resultChan) {
|
||||
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
|
||||
}
|
||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||
|
@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
|
|||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
|
||||
p := concurrentOrderedEventProcessing{
|
||||
input: wc.incomingEventChan,
|
||||
processFunc: wc.transform,
|
||||
output: wc.resultChan,
|
||||
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
|
||||
|
||||
objectType: wc.watcher.objectType,
|
||||
groupResource: wc.watcher.groupResource,
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p.scheduleEventProcessing(wc.ctx, wg)
|
||||
}()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
p.collectEventProcessing(wc.ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
type concurrentOrderedEventProcessing struct {
|
||||
input chan *event
|
||||
processFunc func(*event) *watch.Event
|
||||
output chan watch.Event
|
||||
|
||||
processingQueue chan chan *watch.Event
|
||||
// Metadata for logging
|
||||
objectType string
|
||||
groupResource schema.GroupResource
|
||||
}
|
||||
|
||||
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
|
||||
var e *event
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e = <-p.input:
|
||||
}
|
||||
processingResponse := make(chan *watch.Event, 1)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case p.processingQueue <- processingResponse:
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(e *event, response chan<- *watch.Event) {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case response <- p.processFunc(e):
|
||||
}
|
||||
}(e, processingResponse)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
|
||||
var processingResponse chan *watch.Event
|
||||
var e *watch.Event
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case processingResponse = <-p.processingQueue:
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e = <-processingResponse:
|
||||
}
|
||||
if e == nil {
|
||||
continue
|
||||
}
|
||||
if len(p.output) == cap(p.output) {
|
||||
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
|
||||
}
|
||||
// If user couldn't receive results fast enough, we also block incoming events from watcher.
|
||||
// Because storing events in local will cause more memory usage.
|
||||
// The worst case would be closing the fast watcher.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case p.output <- *e:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) filter(obj runtime.Object) bool {
|
||||
if wc.internalPred.Empty() {
|
||||
return true
|
||||
|
|
|
@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) {
|
|||
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemantics(ctx, t, store)
|
||||
}
|
||||
|
||||
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
||||
|
|
|
@ -41,6 +41,9 @@ type MutableEffectiveVersion interface {
|
|||
}
|
||||
|
||||
type effectiveVersion struct {
|
||||
// When true, BinaryVersion() returns the current binary version
|
||||
useDefaultBuildBinaryVersion atomic.Bool
|
||||
// Holds the last binary version stored in Set()
|
||||
binaryVersion atomic.Pointer[version.Version]
|
||||
// If the emulationVersion is set by the users, it could only contain major and minor versions.
|
||||
// In tests, emulationVersion could be the same as the binary version, or set directly,
|
||||
|
@ -51,6 +54,9 @@ type effectiveVersion struct {
|
|||
}
|
||||
|
||||
func (m *effectiveVersion) BinaryVersion() *version.Version {
|
||||
if m.useDefaultBuildBinaryVersion.Load() {
|
||||
return defaultBuildBinaryVersion()
|
||||
}
|
||||
return m.binaryVersion.Load()
|
||||
}
|
||||
|
||||
|
@ -89,6 +95,7 @@ func majorMinor(ver *version.Version) *version.Version {
|
|||
|
||||
func (m *effectiveVersion) Set(binaryVersion, emulationVersion, minCompatibilityVersion *version.Version) {
|
||||
m.binaryVersion.Store(binaryVersion)
|
||||
m.useDefaultBuildBinaryVersion.Store(false)
|
||||
m.emulationVersion.Store(majorMinor(emulationVersion))
|
||||
m.minCompatibilityVersion.Store(majorMinor(minCompatibilityVersion))
|
||||
}
|
||||
|
@ -104,7 +111,7 @@ func (m *effectiveVersion) SetMinCompatibilityVersion(minCompatibilityVersion *v
|
|||
func (m *effectiveVersion) Validate() []error {
|
||||
var errs []error
|
||||
// Validate only checks the major and minor versions.
|
||||
binaryVersion := m.binaryVersion.Load().WithPatch(0)
|
||||
binaryVersion := m.BinaryVersion().WithPatch(0)
|
||||
emulationVersion := m.emulationVersion.Load()
|
||||
minCompatibilityVersion := m.minCompatibilityVersion.Load()
|
||||
|
||||
|
@ -123,10 +130,11 @@ func (m *effectiveVersion) Validate() []error {
|
|||
return errs
|
||||
}
|
||||
|
||||
func newEffectiveVersion(binaryVersion *version.Version) MutableEffectiveVersion {
|
||||
func newEffectiveVersion(binaryVersion *version.Version, useDefaultBuildBinaryVersion bool) MutableEffectiveVersion {
|
||||
effective := &effectiveVersion{}
|
||||
compatVersion := binaryVersion.SubtractMinor(1)
|
||||
effective.Set(binaryVersion, binaryVersion, compatVersion)
|
||||
effective.useDefaultBuildBinaryVersion.Store(useDefaultBuildBinaryVersion)
|
||||
return effective
|
||||
}
|
||||
|
||||
|
@ -135,25 +143,29 @@ func NewEffectiveVersion(binaryVer string) MutableEffectiveVersion {
|
|||
return &effectiveVersion{}
|
||||
}
|
||||
binaryVersion := version.MustParse(binaryVer)
|
||||
return newEffectiveVersion(binaryVersion)
|
||||
return newEffectiveVersion(binaryVersion, false)
|
||||
}
|
||||
|
||||
func defaultBuildBinaryVersion() *version.Version {
|
||||
verInfo := baseversion.Get()
|
||||
return version.MustParse(verInfo.String()).WithInfo(verInfo)
|
||||
}
|
||||
|
||||
// DefaultBuildEffectiveVersion returns the MutableEffectiveVersion based on the
|
||||
// current build information.
|
||||
func DefaultBuildEffectiveVersion() MutableEffectiveVersion {
|
||||
verInfo := baseversion.Get()
|
||||
binaryVersion := version.MustParse(verInfo.String()).WithInfo(verInfo)
|
||||
binaryVersion := defaultBuildBinaryVersion()
|
||||
if binaryVersion.Major() == 0 && binaryVersion.Minor() == 0 {
|
||||
return DefaultKubeEffectiveVersion()
|
||||
}
|
||||
return newEffectiveVersion(binaryVersion)
|
||||
return newEffectiveVersion(binaryVersion, true)
|
||||
}
|
||||
|
||||
// DefaultKubeEffectiveVersion returns the MutableEffectiveVersion based on the
|
||||
// latest K8s release.
|
||||
func DefaultKubeEffectiveVersion() MutableEffectiveVersion {
|
||||
binaryVersion := version.MustParse(baseversion.DefaultKubeBinaryVersion).WithInfo(baseversion.Get())
|
||||
return newEffectiveVersion(binaryVersion)
|
||||
return newEffectiveVersion(binaryVersion, false)
|
||||
}
|
||||
|
||||
// ValidateKubeEffectiveVersion validates the EmulationVersion is equal to the binary version at 1.31 for kube components.
|
||||
|
|
|
@ -406,14 +406,14 @@ func TestTLSConfig(t *testing.T) {
|
|||
test: "server cert with SHA1 signature",
|
||||
clientCA: caCert,
|
||||
serverCert: append(append(sha1ServerCertInter, byte('\n')), caCertInter...), serverKey: serverKey,
|
||||
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA \\(temporarily override with GODEBUG=x509sha1=1\\)",
|
||||
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA",
|
||||
increaseSHA1SignatureWarnCounter: true,
|
||||
},
|
||||
{
|
||||
test: "server cert signed by an intermediate CA with SHA1 signature",
|
||||
clientCA: caCert,
|
||||
serverCert: append(append(serverCertInterSHA1, byte('\n')), caCertInterSHA1...), serverKey: serverKey,
|
||||
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA \\(temporarily override with GODEBUG=x509sha1=1\\)",
|
||||
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA",
|
||||
increaseSHA1SignatureWarnCounter: true,
|
||||
},
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue