Compare commits

...

24 Commits

Author SHA1 Message Date
Kubernetes Publisher 53ed5a1444 Update dependencies to v0.31.5 tag 2025-01-16 01:33:38 +00:00
Kubernetes Publisher b39cc9a1ad Merge pull request #129494 from MadhavJivrajani/131-go124-webhook-regex-ut
[go1.24][1.31] webhook: alter regex to account for x509sha1 GODEBUG removal

Kubernetes-commit: 1043bf333c5f5d4e7e7ae0fda464ff99f2b9b7b6
2025-01-09 08:16:31 -08:00
Madhav Jivrajani 6f8437296e webhook: alter regex to account for x509sha1 GODEBUG removal
go1.24 removes the x509sha1 GODEBUG variable, and with it the
support for SHA-1 signed certs. This commit alters the regex
in unit tests to account for that and prep for go1.24.

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>

Kubernetes-commit: 774adff447f91fa56b57591a5e5584fa44375ed7
2024-12-30 13:39:17 -08:00
Kubernetes Publisher 2a99607e39 Merge pull request #128530 from wojtek-t/automated-cherry-pick-of-#128468-upstream-release-1.31
Automated cherry pick of #128468: Fix TestCacherDontMissEventsOnReinitialization test

Kubernetes-commit: f6f4d800c80835a11b8b65ed0d71b9f55acdec8f
2024-11-06 15:43:29 +00:00
Wojciech Tyczyński 9a71f86e75 Fix TestCacherDontMissEventsOnReinitialization test
Kubernetes-commit: 7682b7427171e16cc8db060ef3171cd191ff7192
2024-10-31 12:33:04 +01:00
Kubernetes Publisher 7ce61404d9 Merge pull request #127928 from p0lyn0mial/automated-cherry-pick-of-#127902-upstream-release-1.31
Automated cherry pick of #127902: server/config: assing system:apiserver user to system:authenticated group

Kubernetes-commit: 6d1bcd738fe49754c9f4dc53eac634eab41e0967
2024-11-01 04:09:16 +00:00
Kubernetes Publisher 62d807d9f4 Merge pull request #127328 from xuzhenglun/automated-cherry-pick-of-#127239-github-release-1.31
Automated cherry pick of #127239: API emulation versioning honors cohabitating resources

Kubernetes-commit: 87bc649cbb3608ed2afbfdfd830326b318564258
2024-10-10 14:23:52 +00:00
Kubernetes Publisher 1d6f7ca30c Merge pull request #126983 from dashpole/fix_mem_leak_31
Cherrypick of #126957 on 1.31: Fix memory leak from global OpenTelemetry MeterProvider

Kubernetes-commit: 36493b18490d448b485ab499c25933ee4a28d62c
2024-10-09 15:12:09 +00:00
Lukasz Szaszkiewicz fec9273f89 server/config: assing system:apiserver user to system:authenticated group
Kubernetes-commit: 32286d571dee764f30863e18de8f65f13dae6891
2024-10-07 17:39:10 +02:00
xuzhenglun 317849749e API emulation versioning honors cohabitating resources
Kubernetes-commit: 880e026a07ce18372a462ba45e386d147cae0120
2024-09-09 17:54:35 +08:00
David Ashpole 01e64e70cf update vendor
Kubernetes-commit: 924414d23200ccfc62137ef994876038cb4ef040
2024-08-29 00:08:09 +00:00
David Ashpole 81a2f90a08 fix memory leak from global MeterProvider
Kubernetes-commit: b42f8d32b52776b1331c4dc8d898aae92ca4dfe3
2024-08-28 14:20:46 +00:00
Kubernetes Publisher 70ed6fdbea Merge pull request #126670 from liggitt/automated-cherry-pick-of-#126665-upstream-release-1.31
Automated cherry pick of #126665: Restore honoring --version build ID overrides

Kubernetes-commit: 4d8e197743a6f764cdbb802e5f6d339d293a2e2e
2024-08-15 08:49:50 -07:00
Jordan Liggitt 31c2b5a4bc Restore honoring --version build ID overrides
Kubernetes-commit: 906d15782c8a65cf01f16e0e1380bfced17a2058
2024-08-13 18:48:56 -04:00
Kubernetes Publisher 78c69b68c5 Merge remote-tracking branch 'origin/master' into release-1.31
Kubernetes-commit: cb08f03faca0f486c6962db305ea2ea720c44475
2024-07-31 22:46:25 +00:00
Marek Siarkowicz 1ebb228a1f Make object transformation concurrent to remove watch cache scalability issue for conversion webhook
Test by enabling consistent list from cache in storage version migrator stress test that uses
conversion webhook that bottlenects events comming to watch cache.

Set concurrency to 10, based on maximum/average transform latency when
running stress test. In my testing max was about 60-100ms, while average
was 6-10ms.

Kubernetes-commit: bb686f203308481bcd7808f767171cdef27e12a0
2024-07-22 11:24:37 +02:00
Marek Siarkowicz 2f0f9ad33e Introduce ConcurrentWatchObjectDecode feature gate disabled by default
Kubernetes-commit: 93a10a75698075e86344ee4fdb56701309468b95
2024-07-30 16:28:48 +02:00
Kubernetes Publisher f36c496935 Merge remote-tracking branch 'origin/master' into release-1.31
Kubernetes-commit: 8855ca830f4cda22fb5f8f3e69a8922e4db260fa
2024-07-31 16:46:34 +00:00
Kubernetes Publisher 078d9cb851 Merge pull request #126469 from serathius/beta2
Move ConsistentListFromCache to Beta default again

Kubernetes-commit: eb729d1db72fc27f495ddf397289678b180926f1
2024-08-01 01:09:20 +00:00
Kubernetes Publisher 6fe68d6713 Merge remote-tracking branch 'origin/master' into release-1.31
Kubernetes-commit: f72233c6f0603d27649f8452d4a0027ddcc0825d
2024-07-31 10:46:31 +00:00
Kubernetes Publisher db03ef627c Merge pull request #126470 from benluddy/apiservingwithroutine-alpha-disabled
Move APIServingWithRoutine to alpha and disabled by default.

Kubernetes-commit: f9d2297298909c9f3a2be2e88f3c84df43f3a376
2024-08-01 01:09:17 +00:00
Marek Siarkowicz 19e03dd98c Move ConsistentListFromCache to Beta default again
This reverts commit aeb51a16e369d5b823a8ae6488d1d5e12c683516.

Kubernetes-commit: 2ca56aab87d0927e568f1d896d49692433d5d93a
2024-07-30 22:49:47 +02:00
Ben Luddy c47cf43461 Move APIServingWithRoutine to alpha and disabled by default.
Kubernetes-commit: c8380040848fcbd0a0cc06600b9d4531b65098d2
2024-07-30 16:33:31 -04:00
Marek Siarkowicz dbb9b30940 Implement fallback for consistent reads from cache
Kubernetes-commit: 35962561e44425fe5e23f19aeccba9269fab3a56
2024-07-30 18:57:22 +02:00
15 changed files with 383 additions and 55 deletions

14
go.mod
View File

@ -30,6 +30,7 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/zap v1.26.0
@ -44,12 +45,12 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/square/go-jose.v2 v2.6.0
k8s.io/api v0.0.0-20240724010313-f04ea0bc861d
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440
k8s.io/api v0.31.5
k8s.io/apimachinery v0.31.5
k8s.io/client-go v0.31.5
k8s.io/component-base v0.31.5
k8s.io/klog/v2 v2.130.1
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33
k8s.io/kms v0.31.5
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3
@ -110,7 +111,6 @@ require (
go.etcd.io/etcd/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/raft/v3 v3.5.13 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
@ -123,5 +123,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace k8s.io/api => k8s.io/api v0.0.0-20240724010313-a789efa287e8

24
go.sum
View File

@ -149,8 +149,8 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -370,18 +370,18 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
k8s.io/api v0.0.0-20240724010313-a789efa287e8 h1:TISAHWnfAdn420WpN+fEHG6snbLbfaCAp3kHDoAkxIc=
k8s.io/api v0.0.0-20240724010313-a789efa287e8/go.mod h1:ytlEzqC2wOTwYET71W7+J+k7O2V7vrDuzmNLBSpgT+k=
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe h1:V9MwpYUwbKlfLKVrhpVuKWiat/LBIhm1pGB9/xdHm5Q=
k8s.io/apimachinery v0.0.0-20240720202316-95b78024e3fe/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b h1:NTLYx38CAu+VstHvPLosqB6uSQUtSM+3Mqz2D/C5JpE=
k8s.io/client-go v0.0.0-20240724010704-ac9204c6195b/go.mod h1:Y6CzOT21oLI4O66cjiV5oSSUgOL7gG/VCG9n8XI8OxU=
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440 h1:14X+5sRQRsul6tLxIKTP0/DotvWlMd9DFCgMqHP1hZY=
k8s.io/component-base v0.0.0-20240722183709-6cc953a9d440/go.mod h1:dj2Pl05aLcVMZi2NXcwv+M/WdUVPEkisFPjDze7rbSk=
k8s.io/api v0.31.5 h1:7jP74egbPUOCLJV5KheUnwo9gz3zzUsMIj2EPkuYK1E=
k8s.io/api v0.31.5/go.mod h1:RMyMdZG1kJjou2ng5buEti0OHlo0uFXgSzTZ/k5LeVk=
k8s.io/apimachinery v0.31.5 h1:NxhAVGcfrSdTMx3M2v1OnvcMS7h1ZnWyt2x2z8CJJBU=
k8s.io/apimachinery v0.31.5/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.5 h1:rmDswcUaIFAJ5vJaB82pjyqc52DgHCPv0G6af3OupO0=
k8s.io/client-go v0.31.5/go.mod h1:js93IlRSzRHql9o9zP54N56rMR249uH4+srnSOcFLsU=
k8s.io/component-base v0.31.5 h1:kpFiy1hI7F4Owp+o59H2CVLzmN94qwcPz+2L6wRhkqM=
k8s.io/component-base v0.31.5/go.mod h1:OiiusrmcLz42i9VvcAd94yQIN7UzQHJxN/hXxwYzj6E=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33 h1:Wd/sRvKMgzuCdkZ/WQg2rg/j6NLW8eyw0RK8AhV9Hak=
k8s.io/kms v0.0.0-20240707024556-6e3528fa4c33/go.mod h1:x2EJv5lkGE18ijjE04e8W0fVyRPfAx5flo9WdY7a1Hw=
k8s.io/kms v0.31.5 h1:KqAHTnJDSoTt8ZSLsalz6gmwlX0lzVWAVSYcqEj6Lqw=
k8s.io/kms v0.31.5/go.mod h1:OZKwl1fan3n3N5FFxnW5C4V3ygrah/3YXeJWS3O6+94=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=

View File

@ -101,6 +101,11 @@ const (
// Allows authorization to use field and label selectors.
AuthorizeWithSelectors featuregate.Feature = "AuthorizeWithSelectors"
// owner: @serathius
// beta: v1.31
// Enables concurrent watch object decoding to avoid starving watch cache when conversion webhook is installed.
ConcurrentWatchObjectDecode featuregate.Feature = "ConcurrentWatchObjectDecode"
// owner: @cici37 @jpbetz
// kep: http://kep.k8s.io/3488
// alpha: v1.26
@ -312,6 +317,7 @@ const (
// owner: @serathius
// kep: http://kep.k8s.io/2340
// alpha: v1.28
// beta: v1.31
//
// Allow the API server to serve consistent lists from cache
ConsistentListFromCache featuregate.Feature = "ConsistentListFromCache"
@ -360,10 +366,12 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
APIServerTracing: {Default: true, PreRelease: featuregate.Beta},
APIServingWithRoutine: {Default: true, PreRelease: featuregate.Beta},
APIServingWithRoutine: {Default: false, PreRelease: featuregate.Alpha},
AuthorizeWithSelectors: {Default: false, PreRelease: featuregate.Alpha},
ConcurrentWatchObjectDecode: {Default: false, PreRelease: featuregate.Beta},
ValidatingAdmissionPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
CoordinatedLeaderElection: {Default: false, PreRelease: featuregate.Alpha},
@ -414,7 +422,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
WatchList: {Default: false, PreRelease: featuregate.Alpha},
ConsistentListFromCache: {Default: false, PreRelease: featuregate.Alpha},
ConsistentListFromCache: {Default: true, PreRelease: featuregate.Beta},
ZeroLimitedNominalConcurrencyShares: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.32
}

View File

@ -1170,7 +1170,7 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
tokens[privilegedLoopbackToken] = &user.DefaultInfo{
Name: user.APIServerUser,
UID: uid,
Groups: []string{user.SystemPrivilegedGroup},
Groups: []string{user.AllAuthenticated, user.SystemPrivilegedGroup},
}
tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens, authn.APIAudiences)

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -83,6 +84,34 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}
}
func TestAuthorizeClientBearerTokenRequiredGroups(t *testing.T) {
fakeAuthenticator := authenticator.RequestFunc(func(req *http.Request) (*authenticator.Response, bool, error) {
return &authenticator.Response{User: &user.DefaultInfo{}}, false, nil
})
fakeAuthorizer := authorizer.AuthorizerFunc(func(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
return authorizer.DecisionAllow, "", nil
})
target := &rest.Config{BearerToken: "secretToken"}
authN := &AuthenticationInfo{Authenticator: fakeAuthenticator}
authC := &AuthorizationInfo{Authorizer: fakeAuthorizer}
AuthorizeClientBearerToken(target, authN, authC)
fakeRequest, err := http.NewRequest("", "", nil)
if err != nil {
t.Fatal(err)
}
fakeRequest.Header.Set("Authorization", "bearer secretToken")
rsp, _, err := authN.Authenticator.AuthenticateRequest(fakeRequest)
if err != nil {
t.Fatal(err)
}
expectedGroups := []string{user.AllAuthenticated, user.SystemPrivilegedGroup}
if !reflect.DeepEqual(expectedGroups, rsp.User.GetGroups()) {
t.Fatalf("unexpected groups = %v returned, expected = %v", rsp.User.GetGroups(), expectedGroups)
}
}
func TestNewWithDelegate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)

View File

@ -23,7 +23,9 @@ import (
"net"
"github.com/spf13/pflag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv/v1.12.0"
"google.golang.org/grpc"
@ -48,6 +50,12 @@ var (
codecs = serializer.NewCodecFactory(cfgScheme)
)
func init() {
// Prevent memory leak from OTel metrics, which we don't use:
// https://github.com/open-telemetry/opentelemetry-go-contrib/issues/5190
otel.SetMeterProvider(noop.NewMeterProvider())
}
func init() {
install.Install(cfgScheme)
}

View File

@ -130,13 +130,24 @@ func emulatedStorageVersion(binaryVersionOfResource schema.GroupVersion, example
gvks, _, err := scheme.ObjectKinds(example)
if err != nil {
return schema.GroupVersion{}, err
} else if len(gvks) == 0 {
// Probably shouldn't happen if err is non-nil
}
var gvk schema.GroupVersionKind
for _, item := range gvks {
if item.Group != binaryVersionOfResource.Group {
continue
}
gvk = item
break
}
if len(gvk.Kind) == 0 {
return schema.GroupVersion{}, fmt.Errorf("object %T has no GVKs registered in scheme", example)
}
// VersionsForGroupKind returns versions in priority order
versions := scheme.VersionsForGroupKind(schema.GroupKind{Group: gvks[0].Group, Kind: gvks[0].Kind})
versions := scheme.VersionsForGroupKind(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind})
compatibilityVersion := effectiveVersion.MinCompatibilityVersion()
@ -148,7 +159,7 @@ func emulatedStorageVersion(binaryVersionOfResource schema.GroupVersion, example
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: gvks[0].Kind,
Kind: gvk.Kind,
}
exampleOfGVK, err := scheme.New(gvk)

View File

@ -245,7 +245,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource, ex
var err error
if backwardCompatibleInterface, ok := s.ResourceEncodingConfig.(CompatibilityResourceEncodingConfig); ok {
codecConfig.StorageVersion, err = backwardCompatibleInterface.BackwardCompatibileStorageEncodingFor(groupResource, example)
codecConfig.StorageVersion, err = backwardCompatibleInterface.BackwardCompatibileStorageEncodingFor(chosenStorageResource, example)
if err != nil {
return nil, err
}

View File

@ -848,7 +848,8 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
preparedKey += "/"
}
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported {
consistentRead := resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
if consistentRead {
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
if err != nil {
return err
@ -887,9 +888,24 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
success := "true"
fallback := "false"
if err != nil {
if consistentRead {
if storage.IsTooLargeResourceVersion(err) {
fallback = "true"
err = c.storage.GetList(ctx, key, opts, listObj)
}
if err != nil {
success = "false"
}
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
return err
}
if consistentRead {
metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1)
}
span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
// store pointer of eligible objects,
// Why not directly put object in the items of listObj?

View File

@ -24,6 +24,7 @@ import (
"reflect"
goruntime "runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -45,10 +46,13 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
etcdfeature "k8s.io/apiserver/pkg/storage/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
k8smetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
@ -288,6 +292,138 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, expectByp
}
}
func TestConsistentReadFallback(t *testing.T) {
tcs := []struct {
name string
consistentReadsEnabled bool
watchCacheRV string
storageRV string
fallbackError bool
expectError bool
expectRV string
expectBlock bool
expectRequestsToStorage int
expectMetric string
}{
{
name: "Success",
consistentReadsEnabled: true,
watchCacheRV: "42",
storageRV: "42",
expectRV: "42",
expectRequestsToStorage: 1,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="false", resource="pods", success="true"} 1
`,
},
{
name: "Fallback",
consistentReadsEnabled: true,
watchCacheRV: "2",
storageRV: "42",
expectRV: "42",
expectBlock: true,
expectRequestsToStorage: 2,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="true"} 1
`,
},
{
name: "Fallback Failure",
consistentReadsEnabled: true,
watchCacheRV: "2",
storageRV: "42",
fallbackError: true,
expectError: true,
expectBlock: true,
expectRequestsToStorage: 2,
expectMetric: `
# HELP apiserver_watch_cache_consistent_read_total [ALPHA] Counter for consistent reads from cache.
# TYPE apiserver_watch_cache_consistent_read_total counter
apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", success="false"} 1
`,
},
{
name: "Disabled",
watchCacheRV: "2",
storageRV: "42",
expectRV: "42",
expectRequestsToStorage: 1,
expectMetric: ``,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.consistentReadsEnabled)
if tc.consistentReadsEnabled {
forceRequestWatchProgressSupport(t)
}
registry := k8smetrics.NewKubeRegistry()
metrics.ConsistentReadTotal.Reset()
if err := registry.Register(metrics.ConsistentReadTotal); err != nil {
t.Errorf("unexpected error: %v", err)
}
backingStorage := &dummyStorage{}
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ResourceVersion = tc.watchCacheRV
return nil
}
// TODO: Use fake clock for this test to reduce execution time.
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
if fmt.Sprintf("%d", cacher.watchCache.resourceVersion) != tc.watchCacheRV {
t.Fatalf("Expected watch cache RV to equal watchCacheRV, got: %d, want: %s", cacher.watchCache.resourceVersion, tc.watchCacheRV)
}
requestToStorageCount := 0
backingStorage.getListFn = func(_ context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
requestToStorageCount += 1
podList := listObj.(*example.PodList)
if key == cacher.resourcePrefix {
podList.ResourceVersion = tc.storageRV
return nil
}
if tc.fallbackError {
return errDummy
}
podList.ResourceVersion = tc.storageRV
return nil
}
result := &example.PodList{}
start := cacher.clock.Now()
err = cacher.GetList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: ""}, result)
duration := cacher.clock.Since(start)
if (err != nil) != tc.expectError {
t.Fatalf("Unexpected error err: %v", err)
}
if result.ResourceVersion != tc.expectRV {
t.Fatalf("Unexpected List response RV, got: %q, want: %q", result.ResourceVersion, tc.expectRV)
}
if requestToStorageCount != tc.expectRequestsToStorage {
t.Fatalf("Unexpected number of requests to storage, got: %d, want: %d", requestToStorageCount, tc.expectRequestsToStorage)
}
blocked := duration >= blockTimeout
if blocked != tc.expectBlock {
t.Fatalf("Unexpected block, got: %v, want: %v", blocked, tc.expectBlock)
}
if err := testutil.GatherAndCompare(registry, strings.NewReader(tc.expectMetric), "apiserver_watch_cache_consistent_read_total"); err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}
func TestGetListNonRecursiveCacheBypass(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
backingStorage := &dummyStorage{}
@ -763,6 +899,7 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
case 1:
podList.ListMeta = metav1.ListMeta{ResourceVersion: "10"}
default:
t.Errorf("unexpected list call: %d", listCalls)
err = fmt.Errorf("unexpected list call")
}
listCalls++
@ -785,8 +922,11 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
for i := 12; i < 18; i++ {
w.Add(makePod(i))
}
w.Stop()
// Keep the watch open to avoid another reinitialization,
// but register it for cleanup.
t.Cleanup(func() { w.Stop() })
default:
t.Errorf("unexpected watch call: %d", watchCalls)
err = fmt.Errorf("unexpected watch call")
}
watchCalls++
@ -808,7 +948,6 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
@ -832,11 +971,11 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
}
rv, err := strconv.Atoi(object.(*example.Pod).ResourceVersion)
if err != nil {
errCh <- fmt.Errorf("incorrect resource version: %v", err)
t.Errorf("incorrect resource version: %v", err)
return
}
if prevRV != -1 && prevRV+1 != rv {
errCh <- fmt.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
t.Errorf("unexpected event received, prevRV=%d, rv=%d", prevRV, rv)
return
}
prevRV = rv
@ -845,11 +984,6 @@ func TestCacherDontMissEventsOnReinitialization(t *testing.T) {
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
t.Error(err)
}
}
func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {

View File

@ -167,6 +167,15 @@ var (
StabilityLevel: compbasemetrics.ALPHA,
Buckets: []float64{0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3},
}, []string{"resource"})
ConsistentReadTotal = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "consistent_read_total",
Help: "Counter for consistent reads from cache.",
StabilityLevel: compbasemetrics.ALPHA,
}, []string{"resource", "success", "fallback"})
)
var registerMetrics sync.Once
@ -188,6 +197,7 @@ func Register() {
legacyregistry.MustRegister(WatchCacheCapacity)
legacyregistry.MustRegister(WatchCacheInitializations)
legacyregistry.MustRegister(WatchCacheReadWait)
legacyregistry.MustRegister(ConsistentReadTotal)
})
}

View File

@ -46,8 +46,9 @@ import (
const (
// We have set a buffer in order to reduce times of context switches.
incomingBufSize = 100
outgoingBufSize = 100
incomingBufSize = 100
outgoingBufSize = 100
processEventConcurrency = 10
)
// defaultWatcherMaxLimit is used to facilitate construction tests
@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
go wc.startWatching(watchClosedCh, initialEventsEndBookmarkRequired, forceInitialEvents)
var resultChanWG sync.WaitGroup
resultChanWG.Add(1)
go wc.processEvent(&resultChanWG)
wc.processEvents(&resultChanWG)
select {
case err := <-wc.errChan:
@ -424,10 +424,17 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
close(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
// processEvents processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvents(wg *sync.WaitGroup) {
if utilfeature.DefaultFeatureGate.Enabled(features.ConcurrentWatchObjectDecode) {
wc.concurrentProcessEvents(wg)
} else {
wg.Add(1)
go wc.serialProcessEvents(wg)
}
}
func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case e := <-wc.incomingEventChan:
@ -435,7 +442,7 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
if res == nil {
continue
}
if len(wc.resultChan) == outgoingBufSize {
if len(wc.resultChan) == cap(wc.resultChan) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType, "groupResource", wc.watcher.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
}
}
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
p := concurrentOrderedEventProcessing{
input: wc.incomingEventChan,
processFunc: wc.transform,
output: wc.resultChan,
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
objectType: wc.watcher.objectType,
groupResource: wc.watcher.groupResource,
}
wg.Add(1)
go func() {
defer wg.Done()
p.scheduleEventProcessing(wc.ctx, wg)
}()
wg.Add(1)
go func() {
defer wg.Done()
p.collectEventProcessing(wc.ctx)
}()
}
type concurrentOrderedEventProcessing struct {
input chan *event
processFunc func(*event) *watch.Event
output chan watch.Event
processingQueue chan chan *watch.Event
// Metadata for logging
objectType string
groupResource schema.GroupResource
}
func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.Context, wg *sync.WaitGroup) {
var e *event
for {
select {
case <-ctx.Done():
return
case e = <-p.input:
}
processingResponse := make(chan *watch.Event, 1)
select {
case <-ctx.Done():
return
case p.processingQueue <- processingResponse:
}
wg.Add(1)
go func(e *event, response chan<- *watch.Event) {
defer wg.Done()
select {
case <-ctx.Done():
case response <- p.processFunc(e):
}
}(e, processingResponse)
}
}
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
var processingResponse chan *watch.Event
var e *watch.Event
for {
select {
case <-ctx.Done():
return
case processingResponse = <-p.processingQueue:
}
select {
case <-ctx.Done():
return
case e = <-processingResponse:
}
if e == nil {
continue
}
if len(p.output) == cap(p.output) {
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
}
// If user couldn't receive results fast enough, we also block incoming events from watcher.
// Because storing events in local will cause more memory usage.
// The worst case would be closing the fast watcher.
select {
case <-ctx.Done():
return
case p.output <- *e:
}
}
}
func (wc *watchChan) filter(obj runtime.Object) bool {
if wc.internalPred.Empty() {
return true

View File

@ -133,6 +133,12 @@ func TestEtcdWatchSemantics(t *testing.T) {
storagetesting.RunWatchSemantics(ctx, t, store)
}
func TestEtcdWatchSemanticsWithConcurrentDecode(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConcurrentWatchObjectDecode, true)
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemantics(ctx, t, store)
}
func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
ctx, store, _ := testSetup(t)
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)

View File

@ -41,6 +41,9 @@ type MutableEffectiveVersion interface {
}
type effectiveVersion struct {
// When true, BinaryVersion() returns the current binary version
useDefaultBuildBinaryVersion atomic.Bool
// Holds the last binary version stored in Set()
binaryVersion atomic.Pointer[version.Version]
// If the emulationVersion is set by the users, it could only contain major and minor versions.
// In tests, emulationVersion could be the same as the binary version, or set directly,
@ -51,6 +54,9 @@ type effectiveVersion struct {
}
func (m *effectiveVersion) BinaryVersion() *version.Version {
if m.useDefaultBuildBinaryVersion.Load() {
return defaultBuildBinaryVersion()
}
return m.binaryVersion.Load()
}
@ -89,6 +95,7 @@ func majorMinor(ver *version.Version) *version.Version {
func (m *effectiveVersion) Set(binaryVersion, emulationVersion, minCompatibilityVersion *version.Version) {
m.binaryVersion.Store(binaryVersion)
m.useDefaultBuildBinaryVersion.Store(false)
m.emulationVersion.Store(majorMinor(emulationVersion))
m.minCompatibilityVersion.Store(majorMinor(minCompatibilityVersion))
}
@ -104,7 +111,7 @@ func (m *effectiveVersion) SetMinCompatibilityVersion(minCompatibilityVersion *v
func (m *effectiveVersion) Validate() []error {
var errs []error
// Validate only checks the major and minor versions.
binaryVersion := m.binaryVersion.Load().WithPatch(0)
binaryVersion := m.BinaryVersion().WithPatch(0)
emulationVersion := m.emulationVersion.Load()
minCompatibilityVersion := m.minCompatibilityVersion.Load()
@ -123,10 +130,11 @@ func (m *effectiveVersion) Validate() []error {
return errs
}
func newEffectiveVersion(binaryVersion *version.Version) MutableEffectiveVersion {
func newEffectiveVersion(binaryVersion *version.Version, useDefaultBuildBinaryVersion bool) MutableEffectiveVersion {
effective := &effectiveVersion{}
compatVersion := binaryVersion.SubtractMinor(1)
effective.Set(binaryVersion, binaryVersion, compatVersion)
effective.useDefaultBuildBinaryVersion.Store(useDefaultBuildBinaryVersion)
return effective
}
@ -135,25 +143,29 @@ func NewEffectiveVersion(binaryVer string) MutableEffectiveVersion {
return &effectiveVersion{}
}
binaryVersion := version.MustParse(binaryVer)
return newEffectiveVersion(binaryVersion)
return newEffectiveVersion(binaryVersion, false)
}
func defaultBuildBinaryVersion() *version.Version {
verInfo := baseversion.Get()
return version.MustParse(verInfo.String()).WithInfo(verInfo)
}
// DefaultBuildEffectiveVersion returns the MutableEffectiveVersion based on the
// current build information.
func DefaultBuildEffectiveVersion() MutableEffectiveVersion {
verInfo := baseversion.Get()
binaryVersion := version.MustParse(verInfo.String()).WithInfo(verInfo)
binaryVersion := defaultBuildBinaryVersion()
if binaryVersion.Major() == 0 && binaryVersion.Minor() == 0 {
return DefaultKubeEffectiveVersion()
}
return newEffectiveVersion(binaryVersion)
return newEffectiveVersion(binaryVersion, true)
}
// DefaultKubeEffectiveVersion returns the MutableEffectiveVersion based on the
// latest K8s release.
func DefaultKubeEffectiveVersion() MutableEffectiveVersion {
binaryVersion := version.MustParse(baseversion.DefaultKubeBinaryVersion).WithInfo(baseversion.Get())
return newEffectiveVersion(binaryVersion)
return newEffectiveVersion(binaryVersion, false)
}
// ValidateKubeEffectiveVersion validates the EmulationVersion is equal to the binary version at 1.31 for kube components.

View File

@ -406,14 +406,14 @@ func TestTLSConfig(t *testing.T) {
test: "server cert with SHA1 signature",
clientCA: caCert,
serverCert: append(append(sha1ServerCertInter, byte('\n')), caCertInter...), serverKey: serverKey,
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA \\(temporarily override with GODEBUG=x509sha1=1\\)",
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA",
increaseSHA1SignatureWarnCounter: true,
},
{
test: "server cert signed by an intermediate CA with SHA1 signature",
clientCA: caCert,
serverCert: append(append(serverCertInterSHA1, byte('\n')), caCertInterSHA1...), serverKey: serverKey,
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA \\(temporarily override with GODEBUG=x509sha1=1\\)",
errRegex: "x509: cannot verify signature: insecure algorithm SHA1-RSA",
increaseSHA1SignatureWarnCounter: true,
},
}