From 63812e2f9bab807dbfac0ca1494fb6d04e43ed80 Mon Sep 17 00:00:00 2001 From: Alexander Zielenski Date: Mon, 20 Mar 2023 10:05:15 -0700 Subject: [PATCH] allow multiple sources to add/remove from discovery without clobbering each other Kubernetes-commit: 0740b11073d08262d4d6687ceedd8f0c71819ebd --- pkg/endpoints/discovery/aggregated/fake.go | 4 + pkg/endpoints/discovery/aggregated/handler.go | 208 +++++++++++++++--- .../discovery/aggregated/handler_test.go | 86 ++++++++ pkg/server/genericapiserver.go | 2 + 4 files changed, 267 insertions(+), 33 deletions(-) diff --git a/pkg/endpoints/discovery/aggregated/fake.go b/pkg/endpoints/discovery/aggregated/fake.go index ea5039c7c..a819fe28f 100644 --- a/pkg/endpoints/discovery/aggregated/fake.go +++ b/pkg/endpoints/discovery/aggregated/fake.go @@ -169,3 +169,7 @@ func (f *recorderResourceManager) WebService() *restful.WebService { func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) { panic("unimplemented") } + +func (f *recorderResourceManager) WithSource(source Source) ResourceManager { + panic("unimplemented") +} diff --git a/pkg/endpoints/discovery/aggregated/handler.go b/pkg/endpoints/discovery/aggregated/handler.go index 0df875a82..61a7fd70d 100644 --- a/pkg/endpoints/discovery/aggregated/handler.go +++ b/pkg/endpoints/discovery/aggregated/handler.go @@ -36,6 +36,15 @@ import ( "k8s.io/klog/v2" ) +type Source uint + +// The GroupVersion from the lowest Source takes precedence +const ( + AggregatorSource Source = 0 + BuiltinSource Source = 100 + CRDSource Source = 200 +) + // This handler serves the /apis endpoint for an aggregated list of // api resources indexed by their group version. type ResourceManager interface { @@ -65,9 +74,55 @@ type ResourceManager interface { // Thread-Safe SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery) + // Returns the same resource manager using a different source + // The source is used to decide how to de-duplicate groups. + // The group from the least-numbered source is used + WithSource(source Source) ResourceManager + http.Handler } +type resourceManager struct { + source Source + *resourceDiscoveryManager +} + +func (rm resourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { + rm.resourceDiscoveryManager.AddGroupVersion(rm.source, groupName, value) +} +func (rm resourceManager) SetGroupVersionPriority(gv metav1.GroupVersion, grouppriority, versionpriority int) { + rm.resourceDiscoveryManager.SetGroupVersionPriority(rm.source, gv, grouppriority, versionpriority) +} +func (rm resourceManager) RemoveGroup(groupName string) { + rm.resourceDiscoveryManager.RemoveGroup(rm.source, groupName) +} +func (rm resourceManager) RemoveGroupVersion(gv metav1.GroupVersion) { + rm.resourceDiscoveryManager.RemoveGroupVersion(rm.source, gv) +} +func (rm resourceManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { + rm.resourceDiscoveryManager.SetGroups(rm.source, groups) +} + +func (rm resourceManager) WithSource(source Source) ResourceManager { + return resourceManager{ + source: source, + resourceDiscoveryManager: rm.resourceDiscoveryManager, + } +} + +type groupKey struct { + name string + + // Source identifies where this group came from and dictates which group + // among duplicates is chosen to be used for discovery. + source Source +} + +type groupVersionKey struct { + metav1.GroupVersion + source Source +} + type resourceDiscoveryManager struct { serializer runtime.NegotiatedSerializer // cache is an atomic pointer to avoid the use of locks @@ -78,8 +133,8 @@ type resourceDiscoveryManager struct { // Writes protected by the lock. // List of all apigroups & resources indexed by the resource manager lock sync.RWMutex - apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery - versionPriorities map[metav1.GroupVersion]priorityInfo + apiGroups map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery + versionPriorities map[groupVersionKey]priorityInfo } type priorityInfo struct { @@ -93,7 +148,7 @@ func NewResourceManager(path string) ResourceManager { utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme)) rdm := &resourceDiscoveryManager{ serializer: codecs, - versionPriorities: make(map[metav1.GroupVersion]priorityInfo), + versionPriorities: make(map[groupVersionKey]priorityInfo), } rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET", /* group = */ "", @@ -105,20 +160,28 @@ func NewResourceManager(path string) ResourceManager { /* deprecated */ false, /* removedRelease */ "", rdm.serveHTTP) - return rdm + return resourceManager{ + source: BuiltinSource, + resourceDiscoveryManager: rdm, + } } -func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { + +func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(source Source, gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) { rdm.lock.Lock() defer rdm.lock.Unlock() - rdm.versionPriorities[gv] = priorityInfo{ + key := groupVersionKey{ + GroupVersion: gv, + source: source, + } + rdm.versionPriorities[key] = priorityInfo{ GroupPriorityMinimum: groupPriorityMinimum, VersionPriority: versionPriority, } rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) { +func (rdm *resourceDiscoveryManager) SetGroups(source Source, groups []apidiscoveryv2beta1.APIGroupDiscovery) { rdm.lock.Lock() defer rdm.lock.Unlock() @@ -127,13 +190,17 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG for _, group := range groups { for _, version := range group.Versions { - rdm.addGroupVersionLocked(group.Name, version) + rdm.addGroupVersionLocked(source, group.Name, version) } } // Filter unused out priority entries for gv := range rdm.versionPriorities { - entry, exists := rdm.apiGroups[gv.Group] + key := groupKey{ + source: source, + name: gv.Group, + } + entry, exists := rdm.apiGroups[key] if !exists { delete(rdm.versionPriorities, gv) continue @@ -154,21 +221,26 @@ func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIG } } -func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { +func (rdm *resourceDiscoveryManager) AddGroupVersion(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { rdm.lock.Lock() defer rdm.lock.Unlock() - rdm.addGroupVersionLocked(groupName, value) + rdm.addGroupVersionLocked(source, groupName, value) } -func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { +func (rdm *resourceDiscoveryManager) addGroupVersionLocked(source Source, groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) { klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version) if rdm.apiGroups == nil { - rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery) + rdm.apiGroups = make(map[groupKey]*apidiscoveryv2beta1.APIGroupDiscovery) } - if existing, groupExists := rdm.apiGroups[groupName]; groupExists { + key := groupKey{ + source: source, + name: groupName, + } + + if existing, groupExists := rdm.apiGroups[key]; groupExists { // If this version already exists, replace it versionExists := false @@ -181,6 +253,7 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val if reflect.DeepEqual(existing.Versions[i], value) { return } + existing.Versions[i] = value versionExists = true break @@ -198,12 +271,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val }, Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value}, } - rdm.apiGroups[groupName] = group + rdm.apiGroups[key] = group } gv := metav1.GroupVersion{Group: groupName, Version: value.Version} - if _, ok := rdm.versionPriorities[gv]; !ok { - rdm.versionPriorities[gv] = priorityInfo{ + gvKey := groupVersionKey{ + GroupVersion: gv, + source: source, + } + if _, ok := rdm.versionPriorities[gvKey]; !ok { + rdm.versionPriorities[gvKey] = priorityInfo{ GroupPriorityMinimum: 1000, VersionPriority: 15, } @@ -213,10 +290,16 @@ func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, val rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) { +func (rdm *resourceDiscoveryManager) RemoveGroupVersion(source Source, apiGroup metav1.GroupVersion) { rdm.lock.Lock() defer rdm.lock.Unlock() - group, exists := rdm.apiGroups[apiGroup.Group] + + key := groupKey{ + source: source, + name: apiGroup.Group, + } + + group, exists := rdm.apiGroups[key] if !exists { return } @@ -234,23 +317,33 @@ func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVer return } - delete(rdm.versionPriorities, apiGroup) + gvKey := groupVersionKey{ + GroupVersion: apiGroup, + source: source, + } + + delete(rdm.versionPriorities, gvKey) if len(group.Versions) == 0 { - delete(rdm.apiGroups, group.Name) + delete(rdm.apiGroups, key) } // Reset response document so it is recreated lazily rdm.cache.Store(nil) } -func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) { +func (rdm *resourceDiscoveryManager) RemoveGroup(source Source, groupName string) { rdm.lock.Lock() defer rdm.lock.Unlock() - delete(rdm.apiGroups, groupName) + key := groupKey{ + source: source, + name: groupName, + } + + delete(rdm.apiGroups, key) for k := range rdm.versionPriorities { - if k.Group == groupName { + if k.Group == groupName && k.source == source { delete(rdm.versionPriorities, k) } } @@ -265,17 +358,63 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 regenerationCounter.Inc() // Re-order the apiGroups by their priority. groups := []apidiscoveryv2beta1.APIGroupDiscovery{} - for _, group := range rdm.apiGroups { - copied := *group.DeepCopy() + + groupsToUse := map[string]apidiscoveryv2beta1.APIGroupDiscovery{} + sourcesUsed := map[metav1.GroupVersion]Source{} + + for key, group := range rdm.apiGroups { + if existing, ok := groupsToUse[key.name]; ok { + for _, v := range group.Versions { + gv := metav1.GroupVersion{Group: key.name, Version: v.Version} + + // Skip groupversions we've already seen before. Only DefaultSource + // takes precedence + if usedSource, seen := sourcesUsed[gv]; seen && key.source >= usedSource { + continue + } else if seen { + // Find the index of the duplicate version and replace + for i := 0; i < len(existing.Versions); i++ { + if existing.Versions[i].Version == v.Version { + existing.Versions[i] = v + break + } + } + + } else { + // New group-version, just append + existing.Versions = append(existing.Versions, v) + } + + sourcesUsed[gv] = key.source + groupsToUse[key.name] = existing + } + // Check to see if we have overlapping versions. If we do, take the one + // with highest source precedence + } else { + groupsToUse[key.name] = *group.DeepCopy() + for _, v := range group.Versions { + gv := metav1.GroupVersion{Group: key.name, Version: v.Version} + sourcesUsed[gv] = key.source + } + } + } + + for _, group := range groupsToUse { // Re-order versions based on their priority. Use kube-aware string // comparison as a tie breaker - sort.SliceStable(copied.Versions, func(i, j int) bool { - iVersion := copied.Versions[i].Version - jVersion := copied.Versions[j].Version + sort.SliceStable(group.Versions, func(i, j int) bool { + iVersion := group.Versions[i].Version + jVersion := group.Versions[j].Version - iPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: iVersion}].VersionPriority - jPriority := rdm.versionPriorities[metav1.GroupVersion{Group: group.Name, Version: jVersion}].VersionPriority + iGV := metav1.GroupVersion{Group: group.Name, Version: iVersion} + jGV := metav1.GroupVersion{Group: group.Name, Version: jVersion} + + iSource := sourcesUsed[iGV] + jSource := sourcesUsed[jGV] + + iPriority := rdm.versionPriorities[groupVersionKey{iGV, iSource}].VersionPriority + jPriority := rdm.versionPriorities[groupVersionKey{jGV, jSource}].VersionPriority // Sort by version string comparator if priority is equal if iPriority == jPriority { @@ -286,13 +425,16 @@ func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2 return iPriority > jPriority }) - groups = append(groups, *copied.DeepCopy()) - + groups = append(groups, group) } // For each group, determine the highest minimum group priority and use that priorities := map[string]int{} for gv, info := range rdm.versionPriorities { + if source := sourcesUsed[gv.GroupVersion]; source != gv.source { + continue + } + if existing, exists := priorities[gv.Group]; exists { if existing < info.GroupPriorityMinimum { priorities[gv.Group] = info.GroupPriorityMinimum diff --git a/pkg/endpoints/discovery/aggregated/handler_test.go b/pkg/endpoints/discovery/aggregated/handler_test.go index fb5aaf2d2..7fd3cd465 100644 --- a/pkg/endpoints/discovery/aggregated/handler_test.go +++ b/pkg/endpoints/discovery/aggregated/handler_test.go @@ -18,6 +18,7 @@ package aggregated_test import ( "encoding/json" + "fmt" "math/rand" "net/http" "net/http/httptest" @@ -365,6 +366,91 @@ func TestUpdateService(t *testing.T) { assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document") } +func TestMultipleSources(t *testing.T) { + type pair struct { + manager discoveryendpoint.ResourceManager + apis apidiscoveryv2beta1.APIGroupDiscoveryList + } + + pairs := []pair{} + + defaultManager := discoveryendpoint.NewResourceManager("apis") + for i := 0; i < 10; i++ { + name := discoveryendpoint.Source(100 * i) + manager := defaultManager.WithSource(name) + apis := fuzzAPIGroups(1, 3, int64(15+i)) + + // Give the groups deterministic names + for i := range apis.Items { + apis.Items[i].Name = fmt.Sprintf("%v.%v.com", i, name) + } + + pairs = append(pairs, pair{manager, apis}) + } + + expectedResult := []apidiscoveryv2beta1.APIGroupDiscovery{} + + groupCounter := 0 + for _, p := range pairs { + for gi, g := range p.apis.Items { + for vi, v := range g.Versions { + p.manager.AddGroupVersion(g.Name, v) + + // Use index for priority so we dont have to do any sorting + // Use negative index since it is sorted descending + p.manager.SetGroupVersionPriority(metav1.GroupVersion{Group: g.Name, Version: v.Version}, -gi-groupCounter, -vi) + } + + expectedResult = append(expectedResult, g) + } + + groupCounter += len(p.apis.Items) + } + + // Show discovery document is what we expect + _, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + + require.Len(t, initialDocument.Items, len(expectedResult)) + require.Equal(t, initialDocument.Items, expectedResult) +} + +// Shows that if you have multiple sources including Default source using +// with the same group name the groups added by the "Default" source are used +func TestSourcePrecedence(t *testing.T) { + defaultManager := discoveryendpoint.NewResourceManager("apis") + otherManager := defaultManager.WithSource(500) + apis := fuzzAPIGroups(1, 3, int64(15)) + for _, g := range apis.Items { + for i, v := range g.Versions { + v.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessCurrent + g.Versions[i] = v + otherManager.AddGroupVersion(g.Name, v) + } + } + + _, _, initialDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + require.Equal(t, apis.Items, initialDocument.Items) + + // Add the first groupversion under default. + // No versions should appear in discovery document except this one + overrideVersion := initialDocument.Items[0].Versions[0] + overrideVersion.Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + defaultManager.AddGroupVersion(initialDocument.Items[0].Name, overrideVersion) + + _, _, maskedDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + masked := initialDocument.DeepCopy() + masked.Items[0].Versions[0].Freshness = apidiscoveryv2beta1.DiscoveryFreshnessStale + + require.Equal(t, masked.Items, maskedDocument.Items) + + // Wipe out default group. The other versions from the other group should now + // appear since the group is not being overridden by defaults ource + defaultManager.RemoveGroup(apis.Items[0].Name) + + _, _, resetDocument := fetchPath(defaultManager, "application/json", discoveryPath, "") + require.Equal(t, resetDocument.Items, initialDocument.Items) +} + // Show the discovery manager is capable of serving requests to multiple users // with unchanging data func TestConcurrentRequests(t *testing.T) { diff --git a/pkg/server/genericapiserver.go b/pkg/server/genericapiserver.go index 4a2756c18..52c865f8a 100644 --- a/pkg/server/genericapiserver.go +++ b/pkg/server/genericapiserver.go @@ -776,6 +776,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A s.AggregatedDiscoveryGroupManager.AddGroupVersion( groupVersion.Group, apidiscoveryv2beta1.APIVersionDiscovery{ + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Version: groupVersion.Version, Resources: discoveryAPIResources, }, @@ -785,6 +786,7 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion( groupVersion.Group, apidiscoveryv2beta1.APIVersionDiscovery{ + Freshness: apidiscoveryv2beta1.DiscoveryFreshnessCurrent, Version: groupVersion.Version, Resources: discoveryAPIResources, },