diff --git a/pkg/endpoints/discovery/aggregated/etag.go b/pkg/endpoints/discovery/aggregated/etag.go index 01cc682ef..0151f6c10 100644 --- a/pkg/endpoints/discovery/aggregated/etag.go +++ b/pkg/endpoints/discovery/aggregated/etag.go @@ -54,7 +54,7 @@ func ServeHTTPWithETag( // Otherwise, we delegate to the handler for actual content // // According to documentation, An Etag within an If-None-Match - // header will be enclosed within doule quotes: + // header will be enclosed within double quotes: // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash { w.WriteHeader(http.StatusNotModified) diff --git a/pkg/endpoints/discovery/aggregated/handler.go b/pkg/endpoints/discovery/aggregated/handler.go index 14497baad..0df875a82 100644 --- a/pkg/endpoints/discovery/aggregated/handler.go +++ b/pkg/endpoints/discovery/aggregated/handler.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/metrics" "sync/atomic" @@ -72,6 +73,8 @@ type resourceDiscoveryManager struct { // cache is an atomic pointer to avoid the use of locks cache atomic.Pointer[cachedGroupList] + serveHTTPFunc http.HandlerFunc + // Writes protected by the lock. // List of all apigroups & resources indexed by the resource manager lock sync.RWMutex @@ -84,13 +87,26 @@ type priorityInfo struct { VersionPriority int } -func NewResourceManager() ResourceManager { +func NewResourceManager(path string) ResourceManager { scheme := runtime.NewScheme() codecs := serializer.NewCodecFactory(scheme) utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) - return &resourceDiscoveryManager{serializer: codecs, versionPriorities: make(map[metav1.GroupVersion]priorityInfo)} + rdm := &resourceDiscoveryManager{ + serializer: codecs, + versionPriorities: make(map[metav1.GroupVersion]priorityInfo), + } + rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET", + /* group = */ "", + /* version = */ "", + /* resource = */ "", + /* subresource = */ path, + /* scope = */ "", + /* component = */ metrics.APIServerComponent, + /* deprecated */ false, + /* removedRelease */ "", + rdm.serveHTTP) + return rdm } - func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { rdm.lock.Lock() defer rdm.lock.Unlock() @@ -246,6 +262,7 @@ func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) { // Prepares the api group list for serving by converting them from map into // list and sorting them according to insertion order func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery { + regenerationCounter.Inc() // Re-order the apiGroups by their priority. groups := []apidiscoveryv2beta1.APIGroupDiscovery{} for _, group := range rdm.apiGroups { @@ -338,6 +355,10 @@ type cachedGroupList struct { } func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + rdm.serveHTTPFunc(resp, req) +} + +func (rdm *resourceDiscoveryManager) serveHTTP(resp http.ResponseWriter, req *http.Request) { cache := rdm.fetchFromCache() response := cache.cachedResponse etag := cache.cachedResponseETag diff --git a/pkg/endpoints/discovery/aggregated/handler_test.go b/pkg/endpoints/discovery/aggregated/handler_test.go index dc62ce6b5..fb5aaf2d2 100644 --- a/pkg/endpoints/discovery/aggregated/handler_test.go +++ b/pkg/endpoints/discovery/aggregated/handler_test.go @@ -120,7 +120,7 @@ func fetchPath(handler http.Handler, acceptPrefix string, path string, etag stri // Add all builtin APIServices to the manager and check the output func TestBasicResponse(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 10) manager.SetGroups(apis.Items) @@ -141,7 +141,7 @@ func TestBasicResponse(t *testing.T) { // Test that protobuf is outputted correctly func TestBasicResponseProtobuf(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 10) manager.SetGroups(apis.Items) @@ -157,8 +157,8 @@ func TestBasicResponseProtobuf(t *testing.T) { // e.g.: Multiple services with the same contents should have the same etag. func TestEtagConsistent(t *testing.T) { // Create 2 managers, add a bunch of services to each - manager1 := discoveryendpoint.NewResourceManager() - manager2 := discoveryendpoint.NewResourceManager() + manager1 := discoveryendpoint.NewResourceManager("apis") + manager2 := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 11) manager1.SetGroups(apis.Items) @@ -231,7 +231,7 @@ func TestEtagConsistent(t *testing.T) { // Test that if a request comes in with an If-None-Match header with an incorrect // E-Tag, that fresh content is returned. func TestEtagNonMatching(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 12) manager.SetGroups(apis.Items) @@ -251,7 +251,7 @@ func TestEtagNonMatching(t *testing.T) { // Test that if a request comes in with an If-None-Match header with a correct // E-Tag, that 304 Not Modified is returned func TestEtagMatching(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 12) manager.SetGroups(apis.Items) @@ -273,7 +273,7 @@ func TestEtagMatching(t *testing.T) { // Test that if a request comes in with an If-None-Match header with an old // E-Tag, that fresh content is returned func TestEtagOutdated(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 15) manager.SetGroups(apis.Items) @@ -301,7 +301,7 @@ func TestEtagOutdated(t *testing.T) { // Test that an api service can be added or removed func TestAddRemove(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 15) for _, group := range apis.Items { for _, version := range group.Versions { @@ -331,7 +331,7 @@ func TestAddRemove(t *testing.T) { // Show that updating an existing service replaces and does not add the entry // and instead replaces it func TestUpdateService(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 15) for _, group := range apis.Items { for _, version := range group.Versions { @@ -368,7 +368,7 @@ func TestUpdateService(t *testing.T) { // Show the discovery manager is capable of serving requests to multiple users // with unchanging data func TestConcurrentRequests(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") apis := fuzzAPIGroups(1, 3, 15) manager.SetGroups(apis.Items) @@ -410,7 +410,7 @@ func TestConcurrentRequests(t *testing.T) { // concurrent writers without tripping up. Good to run with go '-race' detector // since there are not many "correctness" checks func TestAbuse(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") numReaders := 100 numRequestsPerReader := 1000 @@ -505,7 +505,7 @@ func TestAbuse(t *testing.T) { } func TestVersionSortingNoPriority(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{ Version: "v1alpha1", @@ -537,7 +537,7 @@ func TestVersionSortingNoPriority(t *testing.T) { } func TestVersionSortingWithPriority(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{ Version: "v1", @@ -560,7 +560,7 @@ func TestVersionSortingWithPriority(t *testing.T) { // if two apiservices declare conflicting priorities for their group priority, take the higher one. func TestGroupVersionSortingConflictingPriority(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{ Version: "v1", @@ -588,7 +588,7 @@ func TestGroupVersionSortingConflictingPriority(t *testing.T) { // Show that the GroupPriorityMinimum is not sticky if a higher group version is removed // after a lower one is added func TestStatelessGroupPriorityMinimum(t *testing.T) { - manager := discoveryendpoint.NewResourceManager() + manager := discoveryendpoint.NewResourceManager("apis") stableGroup := "stable.example.com" experimentalGroup := "experimental.example.com" diff --git a/pkg/endpoints/discovery/aggregated/metrics.go b/pkg/endpoints/discovery/aggregated/metrics.go new file mode 100644 index 000000000..816cf177f --- /dev/null +++ b/pkg/endpoints/discovery/aggregated/metrics.go @@ -0,0 +1,36 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated + +import ( + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +var ( + regenerationCounter = metrics.NewCounter( + &metrics.CounterOpts{ + Name: "aggregator_discovery_aggregation_count_total", + Help: "Counter of number of times discovery was aggregated", + StabilityLevel: metrics.ALPHA, + }, + ) +) + +func init() { + legacyregistry.MustRegister(regenerationCounter) +} diff --git a/pkg/endpoints/discovery/aggregated/metrics_test.go b/pkg/endpoints/discovery/aggregated/metrics_test.go new file mode 100644 index 000000000..0fb69f535 --- /dev/null +++ b/pkg/endpoints/discovery/aggregated/metrics_test.go @@ -0,0 +1,89 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregated_test + +import ( + "fmt" + "io" + "strings" + "testing" + + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/testutil" + + discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" +) + +func formatExpectedMetrics(aggregationCount int) io.Reader { + expected := `` + if aggregationCount > 0 { + expected = expected + `# HELP aggregator_discovery_aggregation_count_total [ALPHA] Counter of number of times discovery was aggregated +# TYPE aggregator_discovery_aggregation_count_total counter +aggregator_discovery_aggregation_count_total %d +` + } + args := []any{} + if aggregationCount > 0 { + args = append(args, aggregationCount) + } + return strings.NewReader(fmt.Sprintf(expected, args...)) +} + +func TestBasicMetrics(t *testing.T) { + legacyregistry.Reset() + manager := discoveryendpoint.NewResourceManager("apis") + + apis := fuzzAPIGroups(1, 3, 10) + manager.SetGroups(apis.Items) + + interests := []string{"aggregator_discovery_aggregation_count_total"} + + _, _, _ = fetchPath(manager, "application/json", discoveryPath, "") + // A single fetch should aggregate and increment regeneration counter. + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil { + t.Fatal(err) + } + _, _, _ = fetchPath(manager, "application/json", discoveryPath, "") + // Subsequent fetches should not reaggregate discovery. + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil { + t.Fatal(err) + } +} + +func TestMetricsModified(t *testing.T) { + legacyregistry.Reset() + manager := discoveryendpoint.NewResourceManager("apis") + + apis := fuzzAPIGroups(1, 3, 10) + manager.SetGroups(apis.Items) + + interests := []string{"aggregator_discovery_aggregation_count_total"} + + _, _, _ = fetchPath(manager, "application/json", discoveryPath, "") + // A single fetch should aggregate and increment regeneration counter. + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil { + t.Fatal(err) + } + + // Update discovery document. + manager.SetGroups(fuzzAPIGroups(1, 3, 10).Items) + _, _, _ = fetchPath(manager, "application/json", discoveryPath, "") + // If the discovery content has changed, reaggregation should be performed. + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(2), interests...); err != nil { + t.Fatal(err) + } +} diff --git a/pkg/server/config.go b/pkg/server/config.go index cff577096..943b4d2ca 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -738,10 +738,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) { manager := c.AggregatedDiscoveryGroupManager if manager == nil { - manager = discoveryendpoint.NewResourceManager() + manager = discoveryendpoint.NewResourceManager("apis") } s.AggregatedDiscoveryGroupManager = manager - s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager() + s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager("api") } for { if c.JSONPatchMaxCopyBytes <= 0 {