Add FeatureSupportChecker for etcd storage

Kubernetes-commit: 70e65eee46cbf72efabe8440a92117d1fedf6497
This commit is contained in:
ah8ad3 2024-04-23 16:58:01 +03:30 committed by Kubernetes Publisher
parent 38aa2c2e10
commit 2b7a0cfef8
3 changed files with 433 additions and 0 deletions

View File

@ -0,0 +1,155 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package feature
import (
"context"
"fmt"
"sync"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
var (
// Define these static versions to use for checking version of etcd, issue on kubernetes #123192
v3_4_31 = version.MustParseSemantic("3.4.31")
v3_5_0 = version.MustParseSemantic("3.5.0")
v3_5_13 = version.MustParseSemantic("3.5.13")
// DefaultFeatureSupportChecker is a shared global etcd FeatureSupportChecker.
DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
)
// FeatureSupportChecker to define Supports functions.
type FeatureSupportChecker interface {
// Supports check if the feature is supported or not by checking internal cache.
// By default all calls to this function before calling CheckClient returns false.
// Returns true if all endpoints in etcd clients are supporting the feature.
Supports(feature storage.Feature) (bool, error)
// CheckClient works with etcd client to recalcualte feature support and cache it internally.
// All etcd clients should support feature to cause `Supports` return true.
// If client A supports and client B doesn't support the feature, the `Supports` will
// first return true at client A initializtion and then return false on client B
// initialzation, it can flip the support at runtime.
CheckClient(ctx context.Context, c client, feature storage.Feature) error
}
type defaultFeatureSupportChecker struct {
lock sync.Mutex
progressNotifySupported *bool
progresNotifyEndpointCache map[string]bool
}
func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker {
return &defaultFeatureSupportChecker{
progresNotifyEndpointCache: make(map[string]bool),
}
}
// Supports can check the featue from anywhere without storage if it was cached before.
func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) (bool, error) {
switch feature {
case storage.RequestWatchProgress:
f.lock.Lock()
defer f.lock.Unlock()
return ptr.Deref(f.progressNotifySupported, false), nil
default:
return false, fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
}
}
// CheckClient accepts client and calculate the support per endpoint and caches it.
// It will return at any point if error happens or one endpoint is not supported.
func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) error {
switch feature {
case storage.RequestWatchProgress:
return f.clientSupportsRequestWatchProgress(ctx, c)
default:
return fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)
}
}
func (f *defaultFeatureSupportChecker) clientSupportsRequestWatchProgress(ctx context.Context, c client) error {
f.lock.Lock()
defer f.lock.Unlock()
for _, ep := range c.Endpoints() {
supported, err := f.supportsProgressNotifyEndpointLocked(ctx, c, ep)
if err != nil {
return err
}
if !supported {
f.progressNotifySupported = ptr.To(false)
return nil
}
}
if f.progressNotifySupported == nil && len(c.Endpoints()) > 0 {
f.progressNotifySupported = ptr.To(true)
}
return nil
}
func (f *defaultFeatureSupportChecker) supportsProgressNotifyEndpointLocked(ctx context.Context, c client, ep string) (bool, error) {
if supported, ok := f.progresNotifyEndpointCache[ep]; ok {
return supported, nil
}
supported, err := endpointSupportsRequestWatchProgress(ctx, c, ep)
if err != nil {
return false, err
}
f.progresNotifyEndpointCache[ep] = supported
return supported, nil
}
// Sub interface of etcd client.
type client interface {
// Endpoints returns list of endpoints in etcd client.
Endpoints() []string
// Status retrieves the status information from the etcd client connected to the specified endpoint.
// It takes a context.Context parameter for cancellation or timeout control.
// It returns a clientv3.StatusResponse containing the status information or an error if the operation fails.
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
}
// endpointSupportsRequestWatchProgress evaluates whether RequestWatchProgress supported by current version of etcd endpoint.
// Based on this issues:
// - https://github.com/etcd-io/etcd/issues/15220 - Fixed in etcd v3.4.25+ and v3.5.8+
// - https://github.com/etcd-io/etcd/issues/17507 - Fixed in etcd v3.4.31+ and v3.5.13+
func endpointSupportsRequestWatchProgress(ctx context.Context, c client, endpoint string) (bool, error) {
resp, err := c.Status(ctx, endpoint)
if err != nil {
return false, fmt.Errorf("failed checking etcd version, endpoint: %q: %w", endpoint, err)
}
ver, err := version.ParseSemantic(resp.Version)
if err != nil {
// Assume feature is not supported if etcd version cannot be parsed.
klog.ErrorS(err, "Failed to parse etcd version", "version", resp.Version)
return false, nil
}
if ver.LessThan(v3_4_31) || ver.AtLeast(v3_5_0) && ver.LessThan(v3_5_13) {
return false, nil
}
return true, nil
}

View File

@ -0,0 +1,272 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package feature
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apiserver/pkg/storage"
)
type mockEndpointVersion struct {
Endpoint string
Version string
Error error
}
// MockEtcdClient is a mock implementation of the EtcdClientInterface interface.
type MockEtcdClient struct {
EndpointVersion []mockEndpointVersion
}
func (m MockEtcdClient) getEndpoints() []string {
var endpoints []string
for _, ev := range m.EndpointVersion {
endpoints = append(endpoints, ev.Endpoint)
}
return endpoints
}
func (m MockEtcdClient) getVersion(endpoint string) (string, error) {
for _, ev := range m.EndpointVersion {
if ev.Endpoint == endpoint {
return ev.Version, ev.Error
}
}
// Never should happen, unless tests having a problem.
return "", fmt.Errorf("No version found")
}
func (m *MockEtcdClient) Endpoints() []string {
return m.getEndpoints()
}
// Status returns a mock status response.
func (m *MockEtcdClient) Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
version, err := m.getVersion(endpoint)
if err != nil {
return nil, err
}
// Return a mock status response
return &clientv3.StatusResponse{
Version: version,
}, nil
}
func TestSupports(t *testing.T) {
tests := []struct {
testName string
featureName string
expectedResult bool
expectedError error
}{
{
testName: "Error with unknown feature",
featureName: "some unknown feature",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", "some unknown feature"),
},
{
testName: "Error with empty feature",
featureName: "",
expectedError: fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", ""),
},
{
testName: "No error but disabled by default",
featureName: storage.RequestWatchProgress,
expectedResult: false,
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
supported, err := testFeatureSupportChecker.Supports(tt.featureName)
assert.Equal(t, tt.expectedResult, supported)
assert.Equal(t, tt.expectedError, err)
})
}
}
func TestSupportsRequestWatchProgress(t *testing.T) {
type testCase struct {
endpointsVersion []mockEndpointVersion
expectedResult bool
expectedError error
}
tests := []struct {
testName string
rounds []testCase
}{
{
testName: "Disabled - default disabled",
rounds: []testCase{{expectedResult: false}},
},
{
testName: "Enabled - supported versions bound",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.31", Endpoint: "localhost:2390"}},
expectedResult: true,
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.13", Endpoint: "localhost:2391"}},
expectedResult: true,
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2392"}},
expectedResult: true}},
},
{
testName: "Disabled - supported versions bound, 3.4.30",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.30", Endpoint: "localhost:2390"}},
expectedResult: false}},
},
{
testName: "Disabled - supported versions bound, 3.5.0",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.0", Endpoint: "localhost:2390"}},
expectedResult: false}},
},
{
testName: "Disabled - supported versions bound, 3.5.12",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.12", Endpoint: "localhost:2390"}},
expectedResult: false}},
},
{
testName: "Disabled - disables if called with one client doesn't support it",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.13", Endpoint: "localhost:2390"},
{Version: "3.5.10", Endpoint: "localhost:2391"}},
expectedResult: false}},
},
{
testName: "Disabled - disables if called with all client doesn't support it",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.9", Endpoint: "localhost:2390"},
{Version: "3.5.10", Endpoint: "localhost:2391"}},
expectedResult: false}},
},
{
testName: "Enabled - if provided client has at least one endpoint that supports it and no client that doesn't",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.31", Endpoint: "localhost:2390"},
{Version: "3.5.13", Endpoint: "localhost:2391"},
{Version: "3.5.14", Endpoint: "localhost:2392"},
{Version: "3.6.0", Endpoint: "localhost:2393"}},
expectedResult: true}},
},
{
testName: "Disabled - cannot be re-enabled",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.0", Endpoint: "localhost:2390"},
{Version: "3.4.1", Endpoint: "localhost:2391"}},
expectedResult: false},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2392"}},
expectedResult: false}},
},
{
testName: "Enabled - one client supports it and later disabled it with second client",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2390"},
{Version: "3.5.14", Endpoint: "localhost:2391"}},
expectedResult: true},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.0", Endpoint: "localhost:2392"}},
expectedResult: false}},
},
{
testName: "Disabled - malformed version would disable the supported cluster and can not be re-enabled again",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2390"}},
expectedResult: true,
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.4.--aaa", Endpoint: "localhost:2392"}},
expectedResult: false},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.13", Endpoint: "localhost:2393"}},
expectedResult: false}},
},
{
testName: "Enabled - error on first client, enabled success on second client",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2390", Error: fmt.Errorf("some error")}},
expectedResult: false,
expectedError: fmt.Errorf("failed checking etcd version, endpoint: %q: %w", "localhost:2390", fmt.Errorf("some error")),
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.14", Endpoint: "localhost:2391"}},
expectedResult: true}},
},
{
testName: "Disabled - enabled success on first client, error on second client, disabled success on third client",
rounds: []testCase{
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2390"}},
expectedResult: true,
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.6.0", Endpoint: "localhost:2391", Error: fmt.Errorf("some error")}},
expectedResult: true,
expectedError: fmt.Errorf("failed checking etcd version, endpoint: %q: %w", "localhost:2391", fmt.Errorf("some error")),
},
{endpointsVersion: []mockEndpointVersion{
{Version: "3.5.10", Endpoint: "localhost:2392"}},
expectedResult: false}},
},
{
testName: "Disabled - client doesn't have any endpoints",
rounds: []testCase{{endpointsVersion: []mockEndpointVersion{}, expectedResult: false}},
},
}
for _, tt := range tests {
t.Run(tt.testName, func(t *testing.T) {
var testFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker()
for _, round := range tt.rounds {
// Mock Etcd client
mockClient := &MockEtcdClient{EndpointVersion: round.endpointsVersion}
ctx := context.Background()
err := testFeatureSupportChecker.CheckClient(ctx, mockClient, storage.RequestWatchProgress)
assert.Equal(t, err, round.expectedError)
// Error of Supports already tested in TestSupports.
supported, _ := testFeatureSupportChecker.Supports(storage.RequestWatchProgress)
assert.Equal(t, supported, round.expectedResult)
}
})
}
}

View File

@ -29,6 +29,12 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
// Feature is the name of each feature in storage that we check in feature_support_checker.
type Feature = string
// RequestWatchProgress is an etcd feature that may use to check if it supported or not.
var RequestWatchProgress Feature = "RequestWatchProgress"
// Versioner abstracts setting and retrieving metadata fields from database response
// onto the object ot list. It is required to maintain storage invariants - updating an
// object twice with the same data except for the ResourceVersion and SelfLink must be