Merge pull request #115630 from Jefftree/agg-discovery-metrics

Add metrics for aggregated discovery

Kubernetes-commit: 2e3c5003b96aef29e87ee24c9086ff7f06cb8886
This commit is contained in:
Kubernetes Publisher 2023-03-10 07:44:41 -08:00
commit 07f0d43ace
6 changed files with 167 additions and 21 deletions

View File

@ -54,7 +54,7 @@ func ServeHTTPWithETag(
// Otherwise, we delegate to the handler for actual content
//
// According to documentation, An Etag within an If-None-Match
// header will be enclosed within doule quotes:
// header will be enclosed within double quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives
if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash {
w.WriteHeader(http.StatusNotModified)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"sync/atomic"
@ -72,6 +73,8 @@ type resourceDiscoveryManager struct {
// cache is an atomic pointer to avoid the use of locks
cache atomic.Pointer[cachedGroupList]
serveHTTPFunc http.HandlerFunc
// Writes protected by the lock.
// List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex
@ -84,13 +87,26 @@ type priorityInfo struct {
VersionPriority int
}
func NewResourceManager() ResourceManager {
func NewResourceManager(path string) ResourceManager {
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, versionPriorities: make(map[metav1.GroupVersion]priorityInfo)}
rdm := &resourceDiscoveryManager{
serializer: codecs,
versionPriorities: make(map[metav1.GroupVersion]priorityInfo),
}
rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET",
/* group = */ "",
/* version = */ "",
/* resource = */ "",
/* subresource = */ path,
/* scope = */ "",
/* component = */ metrics.APIServerComponent,
/* deprecated */ false,
/* removedRelease */ "",
rdm.serveHTTP)
return rdm
}
func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
@ -246,6 +262,7 @@ func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
// Prepares the api group list for serving by converting them from map into
// list and sorting them according to insertion order
func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery {
regenerationCounter.Inc()
// Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups {
@ -338,6 +355,10 @@ type cachedGroupList struct {
}
func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rdm.serveHTTPFunc(resp, req)
}
func (rdm *resourceDiscoveryManager) serveHTTP(resp http.ResponseWriter, req *http.Request) {
cache := rdm.fetchFromCache()
response := cache.cachedResponse
etag := cache.cachedResponseETag

View File

@ -120,7 +120,7 @@ func fetchPath(handler http.Handler, acceptPrefix string, path string, etag stri
// Add all builtin APIServices to the manager and check the output
func TestBasicResponse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
@ -141,7 +141,7 @@ func TestBasicResponse(t *testing.T) {
// Test that protobuf is outputted correctly
func TestBasicResponseProtobuf(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
@ -157,8 +157,8 @@ func TestBasicResponseProtobuf(t *testing.T) {
// e.g.: Multiple services with the same contents should have the same etag.
func TestEtagConsistent(t *testing.T) {
// Create 2 managers, add a bunch of services to each
manager1 := discoveryendpoint.NewResourceManager()
manager2 := discoveryendpoint.NewResourceManager()
manager1 := discoveryendpoint.NewResourceManager("apis")
manager2 := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 11)
manager1.SetGroups(apis.Items)
@ -231,7 +231,7 @@ func TestEtagConsistent(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with an incorrect
// E-Tag, that fresh content is returned.
func TestEtagNonMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
@ -251,7 +251,7 @@ func TestEtagNonMatching(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with a correct
// E-Tag, that 304 Not Modified is returned
func TestEtagMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
@ -273,7 +273,7 @@ func TestEtagMatching(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with an old
// E-Tag, that fresh content is returned
func TestEtagOutdated(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
@ -301,7 +301,7 @@ func TestEtagOutdated(t *testing.T) {
// Test that an api service can be added or removed
func TestAddRemove(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
@ -331,7 +331,7 @@ func TestAddRemove(t *testing.T) {
// Show that updating an existing service replaces and does not add the entry
// and instead replaces it
func TestUpdateService(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
@ -368,7 +368,7 @@ func TestUpdateService(t *testing.T) {
// Show the discovery manager is capable of serving requests to multiple users
// with unchanging data
func TestConcurrentRequests(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
@ -410,7 +410,7 @@ func TestConcurrentRequests(t *testing.T) {
// concurrent writers without tripping up. Good to run with go '-race' detector
// since there are not many "correctness" checks
func TestAbuse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
numReaders := 100
numRequestsPerReader := 1000
@ -505,7 +505,7 @@ func TestAbuse(t *testing.T) {
}
func TestVersionSortingNoPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
@ -537,7 +537,7 @@ func TestVersionSortingNoPriority(t *testing.T) {
}
func TestVersionSortingWithPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
@ -560,7 +560,7 @@ func TestVersionSortingWithPriority(t *testing.T) {
// if two apiservices declare conflicting priorities for their group priority, take the higher one.
func TestGroupVersionSortingConflictingPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
@ -588,7 +588,7 @@ func TestGroupVersionSortingConflictingPriority(t *testing.T) {
// Show that the GroupPriorityMinimum is not sticky if a higher group version is removed
// after a lower one is added
func TestStatelessGroupPriorityMinimum(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
stableGroup := "stable.example.com"
experimentalGroup := "experimental.example.com"

View File

@ -0,0 +1,36 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
var (
regenerationCounter = metrics.NewCounter(
&metrics.CounterOpts{
Name: "aggregator_discovery_aggregation_count_total",
Help: "Counter of number of times discovery was aggregated",
StabilityLevel: metrics.ALPHA,
},
)
)
func init() {
legacyregistry.MustRegister(regenerationCounter)
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated_test
import (
"fmt"
"io"
"strings"
"testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
)
func formatExpectedMetrics(aggregationCount int) io.Reader {
expected := ``
if aggregationCount > 0 {
expected = expected + `# HELP aggregator_discovery_aggregation_count_total [ALPHA] Counter of number of times discovery was aggregated
# TYPE aggregator_discovery_aggregation_count_total counter
aggregator_discovery_aggregation_count_total %d
`
}
args := []any{}
if aggregationCount > 0 {
args = append(args, aggregationCount)
}
return strings.NewReader(fmt.Sprintf(expected, args...))
}
func TestBasicMetrics(t *testing.T) {
legacyregistry.Reset()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
interests := []string{"aggregator_discovery_aggregation_count_total"}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// A single fetch should aggregate and increment regeneration counter.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// Subsequent fetches should not reaggregate discovery.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
}
func TestMetricsModified(t *testing.T) {
legacyregistry.Reset()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
interests := []string{"aggregator_discovery_aggregation_count_total"}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// A single fetch should aggregate and increment regeneration counter.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
// Update discovery document.
manager.SetGroups(fuzzAPIGroups(1, 3, 10).Items)
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// If the discovery content has changed, reaggregation should be performed.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(2), interests...); err != nil {
t.Fatal(err)
}
}

View File

@ -738,10 +738,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
manager := c.AggregatedDiscoveryGroupManager
if manager == nil {
manager = discoveryendpoint.NewResourceManager()
manager = discoveryendpoint.NewResourceManager("apis")
}
s.AggregatedDiscoveryGroupManager = manager
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager()
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager("api")
}
for {
if c.JSONPatchMaxCopyBytes <= 0 {