add new aggregated resourcemanager to genericapiserver

Co-authored-by: Jeffrey Ying <jeffrey.ying86@live.com>

Kubernetes-commit: 6e83f6750598d394fb257f66c5d0721cf88f45db
This commit is contained in:
Alexander Zielenski 2022-11-08 12:37:50 -08:00 committed by Kubernetes Publisher
parent 0a2a637e85
commit b2bf3ca966
15 changed files with 1745 additions and 16 deletions

View File

@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = grouplessGroupVersion
group.OptionsExternalVersion = &grouplessGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = testGroupVersion
group.OptionsExternalVersion = &testGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = newGroupVersion
group.OptionsExternalVersion = &newGroupVersion
group.Serializer = codecs
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -3311,7 +3311,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container := restful.NewContainer()
if _, err := group.InstallREST(container); err == nil {
if _, _, err := group.InstallREST(container); err == nil {
t.Fatal("expected error")
}
@ -3343,7 +3343,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container = restful.NewContainer()
if _, err := group.InstallREST(container); err != nil {
if _, _, err := group.InstallREST(container); err != nil {
t.Fatal(err)
}
@ -4328,7 +4328,7 @@ func TestXGSubresource(t *testing.T) {
Serializer: codecs,
}
if _, err := (&group).InstallREST(container); err != nil {
if _, _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"crypto/sha512"
"encoding/json"
"fmt"
"net/http"
"strconv"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// This file exposes helper functions used for calculating the E-Tag header
// used in discovery endpoint responses
// Attaches Cache-Busting functionality to an endpoint
// - Sets ETag header to provided hash
// - Replies with 304 Not Modified, if If-None-Match header matches hash
//
// hash should be the value of calculateETag on object. If hash is empty, then
//
// the object is simply serialized without E-Tag functionality
func ServeHTTPWithETag(
object runtime.Object,
hash string,
serializer runtime.NegotiatedSerializer,
w http.ResponseWriter,
req *http.Request,
) {
// ETag must be enclosed in double quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
quotedHash := strconv.Quote(hash)
w.Header().Set("ETag", quotedHash)
w.Header().Set("Vary", "Accept")
w.Header().Set("Cache-Control", "public")
// If Request includes If-None-Match and matches hash, reply with 304
// Otherwise, we delegate to the handler for actual content
//
// According to documentation, An Etag within an If-None-Match
// header will be enclosed within doule quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives
if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash {
w.WriteHeader(http.StatusNotModified)
return
}
responsewriters.WriteObjectNegotiated(
serializer,
DiscoveryEndpointRestrictions,
AggregatedDiscoveryGV,
w,
req,
http.StatusOK,
object,
true,
)
}
func calculateETag(resources interface{}) (string, error) {
serialized, err := json.Marshal(resources)
if err != nil {
return "", err
}
return fmt.Sprintf("%X", sha512.Sum512(serialized)), nil
}

View File

@ -0,0 +1,170 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"context"
"errors"
"net/http"
"reflect"
"sync"
"time"
"github.com/emicklei/go-restful/v3"
"github.com/google/go-cmp/cmp"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
type FakeResourceManager interface {
ResourceManager
Expect() ResourceManager
HasExpectedNumberActions() bool
Validate() error
WaitForActions(ctx context.Context, timeout time.Duration) error
}
func NewFakeResourceManager() FakeResourceManager {
return &fakeResourceManager{}
}
// a resource manager with helper functions for checking the actions
// match expected. For Use in tests
type fakeResourceManager struct {
recorderResourceManager
expect recorderResourceManager
}
// a resource manager which instead of managing a discovery document,
// simply records the calls to its interface functoins for testing
type recorderResourceManager struct {
lock sync.RWMutex
Actions []recorderResourceManagerAction
}
var _ ResourceManager = &fakeResourceManager{}
var _ ResourceManager = &recorderResourceManager{}
// Storage type for a call to the resource manager
type recorderResourceManagerAction struct {
Type string
Group string
Version string
Value interface{}
}
func (f *fakeResourceManager) Expect() ResourceManager {
return &f.expect
}
func (f *fakeResourceManager) HasExpectedNumberActions() bool {
f.lock.RLock()
defer f.lock.RUnlock()
f.expect.lock.RLock()
defer f.expect.lock.RUnlock()
return len(f.Actions) >= len(f.expect.Actions)
}
func (f *fakeResourceManager) Validate() error {
f.lock.RLock()
defer f.lock.RUnlock()
f.expect.lock.RLock()
defer f.expect.lock.RUnlock()
if !reflect.DeepEqual(f.expect.Actions, f.Actions) {
return errors.New(cmp.Diff(f.expect.Actions, f.Actions))
}
return nil
}
func (f *fakeResourceManager) WaitForActions(ctx context.Context, timeout time.Duration) error {
err := wait.PollImmediateWithContext(
ctx,
100*time.Millisecond, // try every 100ms
timeout, // timeout after timeout
func(ctx context.Context) (done bool, err error) {
if f.HasExpectedNumberActions() {
return true, f.Validate()
}
return false, nil
})
return err
}
func (f *recorderResourceManager) SetGroupPriority(groupName string, priority int) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroupPriority",
Group: groupName,
Value: priority,
})
}
func (f *recorderResourceManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "AddGroupVersion",
Group: groupName,
Value: value,
})
}
func (f *recorderResourceManager) RemoveGroup(groupName string) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "RemoveGroup",
Group: groupName,
})
}
func (f *recorderResourceManager) RemoveGroupVersion(gv metav1.GroupVersion) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "RemoveGroupVersion",
Group: gv.Group,
Version: gv.Version,
})
}
func (f *recorderResourceManager) SetGroups(values []apidiscoveryv2beta1.APIGroupDiscovery) {
f.lock.Lock()
defer f.lock.Unlock()
f.Actions = append(f.Actions, recorderResourceManagerAction{
Type: "SetGroups",
Value: values,
})
}
func (f *recorderResourceManager) WebService() *restful.WebService {
panic("unimplemented")
}
func (f *recorderResourceManager) ServeHTTP(http.ResponseWriter, *http.Request) {
panic("unimplemented")
}

View File

@ -0,0 +1,302 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
"reflect"
"sort"
"sync"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"sync/atomic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
// This handler serves the /apis endpoint for an aggregated list of
// api resources indexed by their group version.
type ResourceManager interface {
// Adds knowledge of the given groupversion to the discovery document
// If it was already being tracked, updates the stored APIVersionDiscovery
// Thread-safe
AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery)
// Sets priority for a group for sorting discovery.
// If a priority is set before the group is known, the priority will be ignored
// Once a group is removed, the priority is forgotten.
SetGroupPriority(groupName string, priority int)
// Removes all group versions for a given group
// Thread-safe
RemoveGroup(groupName string)
// Removes a specific groupversion. If all versions of a group have been
// removed, then the entire group is unlisted.
// Thread-safe
RemoveGroupVersion(gv metav1.GroupVersion)
// Resets the manager's known list of group-versions and replaces them
// with the given groups
// Thread-Safe
SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery)
http.Handler
}
type resourceDiscoveryManager struct {
serializer runtime.NegotiatedSerializer
// cache is an atomic pointer to avoid the use of locks
cache atomic.Pointer[cachedGroupList]
// Writes protected by the lock.
// List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex
apiGroups map[string]*apidiscoveryv2beta1.APIGroupDiscovery
apiGroupNames map[string]int
}
func NewResourceManager() ResourceManager {
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, apiGroupNames: make(map[string]int)}
}
func (rdm *resourceDiscoveryManager) SetGroupPriority(group string, priority int) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
if _, exists := rdm.apiGroupNames[group]; exists {
rdm.apiGroupNames[group] = priority
rdm.cache.Store(nil)
} else {
klog.Warningf("DiscoveryManager: Attempted to set priority for group %s but does not exist", group)
}
}
func (rdm *resourceDiscoveryManager) SetGroups(groups []apidiscoveryv2beta1.APIGroupDiscovery) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
rdm.apiGroups = nil
rdm.cache.Store(nil)
for _, group := range groups {
for _, version := range group.Versions {
rdm.addGroupVersionLocked(group.Name, version)
}
}
// Filter unused out apiGroupNames
for name := range rdm.apiGroupNames {
if _, exists := rdm.apiGroups[name]; !exists {
delete(rdm.apiGroupNames, name)
}
}
}
func (rdm *resourceDiscoveryManager) AddGroupVersion(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
rdm.addGroupVersionLocked(groupName, value)
}
func (rdm *resourceDiscoveryManager) addGroupVersionLocked(groupName string, value apidiscoveryv2beta1.APIVersionDiscovery) {
klog.Infof("Adding GroupVersion %s %s to ResourceManager", groupName, value.Version)
if rdm.apiGroups == nil {
rdm.apiGroups = make(map[string]*apidiscoveryv2beta1.APIGroupDiscovery)
}
if existing, groupExists := rdm.apiGroups[groupName]; groupExists {
// If this version already exists, replace it
versionExists := false
// Not very efficient, but in practice there are generally not many versions
for i := range existing.Versions {
if existing.Versions[i].Version == value.Version {
// The new gv is the exact same as what is already in
// the map. This is a noop and cache should not be
// invalidated.
if reflect.DeepEqual(existing.Versions[i], value) {
return
}
existing.Versions[i] = value
versionExists = true
break
}
}
if !versionExists {
existing.Versions = append(existing.Versions, value)
}
} else {
group := &apidiscoveryv2beta1.APIGroupDiscovery{
ObjectMeta: metav1.ObjectMeta{
Name: groupName,
},
Versions: []apidiscoveryv2beta1.APIVersionDiscovery{value},
}
rdm.apiGroups[groupName] = group
rdm.apiGroupNames[groupName] = 0
}
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
func (rdm *resourceDiscoveryManager) RemoveGroupVersion(apiGroup metav1.GroupVersion) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
group, exists := rdm.apiGroups[apiGroup.Group]
if !exists {
return
}
modified := false
for i := range group.Versions {
if group.Versions[i].Version == apiGroup.Version {
group.Versions = append(group.Versions[:i], group.Versions[i+1:]...)
modified = true
break
}
}
// If no modification was done, cache does not need to be cleared
if !modified {
return
}
if len(group.Versions) == 0 {
delete(rdm.apiGroups, group.Name)
delete(rdm.apiGroupNames, group.Name)
}
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
delete(rdm.apiGroups, groupName)
delete(rdm.apiGroupNames, groupName)
// Reset response document so it is recreated lazily
rdm.cache.Store(nil)
}
// Prepares the api group list for serving by converting them from map into
// list and sorting them according to insertion order
func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery {
// Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups {
groups = append(groups, *group.DeepCopy())
}
sort.SliceStable(groups, func(i, j int) bool {
iName := groups[i].Name
jName := groups[j].Name
// Default to 0 priority by default
iPriority := rdm.apiGroupNames[iName]
jPriority := rdm.apiGroupNames[jName]
// Sort discovery based on apiservice priority.
// Duplicated from staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helpers.go
if iPriority == jPriority {
// Equal priority uses name to break ties
return iName < jName
}
// i sorts before j if it has a lower priority
return iPriority > jPriority
})
return groups
}
// Fetches from cache if it exists. If cache is empty, create it.
func (rdm *resourceDiscoveryManager) fetchFromCache() *cachedGroupList {
rdm.lock.RLock()
defer rdm.lock.RUnlock()
cacheLoad := rdm.cache.Load()
if cacheLoad != nil {
return cacheLoad
}
response := apidiscoveryv2beta1.APIGroupDiscoveryList{
Items: rdm.calculateAPIGroupsLocked(),
}
etag, err := calculateETag(response)
if err != nil {
klog.Errorf("failed to calculate etag for discovery document: %s", etag)
etag = ""
}
cached := &cachedGroupList{
cachedResponse: response,
cachedResponseETag: etag,
}
rdm.cache.Store(cached)
return cached
}
type cachedGroupList struct {
cachedResponse apidiscoveryv2beta1.APIGroupDiscoveryList
cachedResponseETag string
}
func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
cache := rdm.fetchFromCache()
response := cache.cachedResponse
etag := cache.cachedResponseETag
if len(etag) > 0 {
// Use proper e-tag headers if one is available
ServeHTTPWithETag(
&response,
etag,
rdm.serializer,
resp,
req,
)
} else {
// Default to normal response in rare case etag is
// not cached with the object for some reason.
responsewriters.WriteObjectNegotiated(
rdm.serializer,
DiscoveryEndpointRestrictions,
AggregatedDiscoveryGV,
resp,
req,
http.StatusOK,
&response,
true,
)
}
}

View File

@ -0,0 +1,501 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated_test
import (
"encoding/json"
"math/rand"
"net/http"
"net/http/httptest"
"sort"
"strconv"
"strings"
"sync"
"testing"
fuzz "github.com/google/gofuzz"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
)
var scheme = runtime.NewScheme()
var codecs = runtimeserializer.NewCodecFactory(scheme)
const discoveryPath = "/apis"
func init() {
// Add all builtin types to scheme
apidiscoveryv2beta1.AddToScheme(scheme)
codecs = runtimeserializer.NewCodecFactory(scheme)
}
func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2beta1.APIGroupDiscoveryList {
fuzzer := fuzz.NewWithSeed(seed)
fuzzer.NumElements(atLeastNumGroups, maxNumGroups)
fuzzer.NilChance(0)
fuzzer.Funcs(func(o *apidiscoveryv2beta1.APIGroupDiscovery, c fuzz.Continue) {
c.FuzzNoCustom(o)
// The ResourceManager will just not serve the group if its versions
// list is empty
atLeastOne := apidiscoveryv2beta1.APIVersionDiscovery{}
c.Fuzz(&atLeastOne)
o.Versions = append(o.Versions, atLeastOne)
o.TypeMeta = metav1.TypeMeta{}
var name string
c.Fuzz(&name)
o.ObjectMeta = metav1.ObjectMeta{
Name: name,
}
})
var apis []apidiscoveryv2beta1.APIGroupDiscovery
fuzzer.Fuzz(&apis)
sort.Slice(apis[:], func(i, j int) bool {
return apis[i].Name < apis[j].Name
})
return apidiscoveryv2beta1.APIGroupDiscoveryList{
TypeMeta: metav1.TypeMeta{
Kind: "APIGroupDiscoveryList",
APIVersion: "apidiscovery.k8s.io/v2beta1",
},
Items: apis,
}
}
func fetchPath(handler http.Handler, acceptPrefix string, path string, etag string) (*http.Response, []byte, *apidiscoveryv2beta1.APIGroupDiscoveryList) {
// Expect json-formatted apis group list
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", discoveryPath, nil)
// Ask for JSON response
req.Header.Set("Accept", acceptPrefix+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
if etag != "" {
// Quote provided etag if unquoted
quoted := etag
if !strings.HasPrefix(etag, "\"") {
quoted = strconv.Quote(etag)
}
req.Header.Set("If-None-Match", quoted)
}
handler.ServeHTTP(w, req)
bytes := w.Body.Bytes()
var decoded *apidiscoveryv2beta1.APIGroupDiscoveryList
if len(bytes) > 0 {
decoded = &apidiscoveryv2beta1.APIGroupDiscoveryList{}
runtime.DecodeInto(codecs.UniversalDecoder(), bytes, decoded)
}
return w.Result(), bytes, decoded
}
// Add all builtin APIServices to the manager and check the output
func TestBasicResponse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
response, body, decoded := fetchPath(manager, "application/json", discoveryPath, "")
jsonFormatted, err := json.Marshal(&apis)
require.NoError(t, err, "json marshal should always succeed")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, "application/json;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported")
assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set")
assert.NoError(t, err, "decode should always succeed")
assert.EqualValues(t, &apis, decoded, "decoded value should equal input")
assert.Equal(t, string(jsonFormatted)+"\n", string(body), "response should be the api group list")
}
// Test that protobuf is outputted correctly
func TestBasicResponseProtobuf(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
response, _, decoded := fetchPath(manager, "application/vnd.kubernetes.protobuf", discoveryPath, "")
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be 200 OK")
assert.Equal(t, "application/vnd.kubernetes.protobuf;g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList", response.Header.Get("Content-Type"), "Content-Type response header should be as requested in Accept header if supported")
assert.NotEmpty(t, response.Header.Get("ETag"), "E-Tag should be set")
assert.EqualValues(t, &apis, decoded, "decoded value should equal input")
}
// Test that an etag associated with the service only depends on the apiresources
// e.g.: Multiple services with the same contents should have the same etag.
func TestEtagConsistent(t *testing.T) {
// Create 2 managers, add a bunch of services to each
manager1 := discoveryendpoint.NewResourceManager()
manager2 := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 11)
manager1.SetGroups(apis.Items)
manager2.SetGroups(apis.Items)
// Make sure etag of each is the same
res1_initial, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_initial, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_initial.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_initial.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_initial.Header.Get("ETag"), res2_initial.Header.Get("ETag"), "etag should be deterministic")
// Then add one service to only one.
// Make sure etag is changed, but other is the same
apis = fuzzAPIGroups(1, 1, 11)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager1.AddGroupVersion(group.Name, version)
}
}
res1_addedToOne, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_addedToOne, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEqual(t, res1_initial.Header.Get("ETag"), res1_addedToOne.Header.Get("ETag"), "ETag should be changed since version was added")
assert.Equal(t, res2_initial.Header.Get("ETag"), res2_addedToOne.Header.Get("ETag"), "ETag should be unchanged since data was unchanged")
// Then add service to other one
// Make sure etag is the same
for _, group := range apis.Items {
for _, version := range group.Versions {
manager2.AddGroupVersion(group.Name, version)
}
}
res1_addedToBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_addedToBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_addedToBoth.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETags should be equal since content is equal")
assert.NotEqual(t, res2_initial.Header.Get("ETag"), res2_addedToBoth.Header.Get("ETag"), "ETag should be changed since data was changed")
// Remove the group version from both. Initial E-Tag should be restored
for _, group := range apis.Items {
for _, version := range group.Versions {
manager1.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
manager2.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
res1_removeFromBoth, _, _ := fetchPath(manager1, "application/json", discoveryPath, "")
res2_removeFromBoth, _, _ := fetchPath(manager2, "application/json", discoveryPath, "")
assert.NotEmpty(t, res1_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.NotEmpty(t, res2_addedToOne.Header.Get("ETag"), "Etag should be populated")
assert.Equal(t, res1_removeFromBoth.Header.Get("ETag"), res2_removeFromBoth.Header.Get("ETag"), "ETags should be equal since content is equal")
assert.Equal(t, res1_initial.Header.Get("ETag"), res1_removeFromBoth.Header.Get("ETag"), "ETag should be equal to initial value since added content was removed")
}
// Test that if a request comes in with an If-None-Match header with an incorrect
// E-Tag, that fresh content is returned.
func TestEtagNonMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
// fetch the document once
initial, _, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
// Send another request with a wrong e-tag. The same response should
// get sent again
second, _, _ := fetchPath(manager, "application/json", discoveryPath, "wrongetag")
assert.Equal(t, http.StatusOK, initial.StatusCode, "response should be 200 OK")
assert.Equal(t, http.StatusOK, second.StatusCode, "response should be 200 OK")
assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal")
}
// Test that if a request comes in with an If-None-Match header with a correct
// E-Tag, that 304 Not Modified is returned
func TestEtagMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
// fetch the document once
initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
assert.NotEmpty(t, initialBody, "body should not be empty")
// Send another request with a wrong e-tag. The same response should
// get sent again
second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag"))
assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK")
assert.Equal(t, http.StatusNotModified, second.StatusCode, "second response should be 304 Not Modified")
assert.Equal(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be equal")
assert.Empty(t, secondBody, "body should be empty when returning 304 Not Modified")
}
// Test that if a request comes in with an If-None-Match header with an old
// E-Tag, that fresh content is returned
func TestEtagOutdated(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
// fetch the document once
initial, initialBody, _ := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotEmpty(t, initial.Header.Get("ETag"), "ETag should be populated")
assert.NotEmpty(t, initialBody, "body should not be empty")
// Then add some services so the etag changes
apis = fuzzAPIGroups(1, 3, 14)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
// Send another request with the old e-tag. Response should not be 304 Not Modified
second, secondBody, _ := fetchPath(manager, "application/json", discoveryPath, initial.Header.Get("ETag"))
assert.Equal(t, http.StatusOK, initial.StatusCode, "initial response should be 200 OK")
assert.Equal(t, http.StatusOK, second.StatusCode, "second response should be 304 Not Modified")
assert.NotEqual(t, initial.Header.Get("ETag"), second.Header.Get("ETag"), "ETag of both requests should be unequal since contents differ")
assert.NotEmpty(t, secondBody, "body should be not empty when returning 304 Not Modified")
}
// Test that an api service can be added or removed
func TestAddRemove(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "")
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.RemoveGroupVersion(metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
_, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "")
require.NotNil(t, initialDocument, "initial document should parse")
require.NotNil(t, secondDocument, "second document should parse")
assert.Len(t, initialDocument.Items, len(apis.Items), "initial document should have set number of groups")
assert.Len(t, secondDocument.Items, 0, "second document should have no groups")
}
// Show that updating an existing service replaces and does not add the entry
// and instead replaces it
func TestUpdateService(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, initialDocument := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, initialDocument, &apis, "should have returned expected document")
b, err := json.Marshal(apis)
if err != nil {
t.Error(err)
}
var newapis apidiscoveryv2beta1.APIGroupDiscoveryList
err = json.Unmarshal(b, &newapis)
if err != nil {
t.Error(err)
}
newapis.Items[0].Versions[0].Resources[0].Resource = "changed a resource name!"
for _, group := range newapis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
}
}
_, _, secondDocument := fetchPath(manager, "application/json", discoveryPath, "")
assert.Equal(t, secondDocument, &newapis, "should have returned expected document")
assert.NotEqual(t, secondDocument, initialDocument, "should have returned expected document")
}
// Show the discovery manager is capable of serving requests to multiple users
// with unchanging data
func TestConcurrentRequests(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
waitGroup := sync.WaitGroup{}
numReaders := 100
numRequestsPerReader := 100
// Spawn a bunch of readers that will keep sending requests to the server
for i := 0; i < numReaders; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
etag := ""
for j := 0; j < numRequestsPerReader; j++ {
usedEtag := etag
if j%2 == 0 {
// Disable use of etag for every second request
usedEtag = ""
}
response, body, document := fetchPath(manager, "application/json", discoveryPath, usedEtag)
if usedEtag != "" {
assert.Equal(t, http.StatusNotModified, response.StatusCode, "response should be Not Modified if etag was used")
assert.Empty(t, body, "body should be empty if etag used")
} else {
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused")
assert.Equal(t, &apis, document, "document should be equal")
}
etag = response.Header.Get("ETag")
}
}()
}
waitGroup.Wait()
}
// Show the handler is capable of serving many concurrent readers and many
// concurrent writers without tripping up. Good to run with go '-race' detector
// since there are not many "correctness" checks
func TestAbuse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
numReaders := 100
numRequestsPerReader := 1000
numWriters := 10
numWritesPerWriter := 1000
waitGroup := sync.WaitGroup{}
// Spawn a bunch of writers that randomly add groups, remove groups, and
// reset the list of groups
for i := 0; i < numWriters; i++ {
source := rand.NewSource(int64(i))
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
// track list of groups we've added so that we can remove them
// randomly
var addedGroups []metav1.GroupVersion
for j := 0; j < numWritesPerWriter; j++ {
switch source.Int63() % 3 {
case 0:
// Add a fuzzed group
apis := fuzzAPIGroups(1, 2, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
manager.AddGroupVersion(group.Name, version)
addedGroups = append(addedGroups, metav1.GroupVersion{
Group: group.Name,
Version: version.Version,
})
}
}
case 1:
// Remove a group that we have added
if len(addedGroups) > 0 {
manager.RemoveGroupVersion(addedGroups[0])
addedGroups = addedGroups[1:]
} else {
// Send a request and try to remove a group someone else
// might have added
_, _, document := fetchPath(manager, "application/json", discoveryPath, "")
assert.NotNil(t, document, "manager should always succeed in returning a document")
if len(document.Items) > 0 {
manager.RemoveGroupVersion(metav1.GroupVersion{
Group: document.Items[0].Name,
Version: document.Items[0].Versions[0].Version,
})
}
}
case 2:
manager.SetGroups(nil)
addedGroups = nil
default:
panic("unreachable")
}
}
}()
}
// Spawn a bunch of readers that will keep sending requests to the server
// and making sure the response makes sense
for i := 0; i < numReaders; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
etag := ""
for j := 0; j < numRequestsPerReader; j++ {
response, body, document := fetchPath(manager, "application/json", discoveryPath, etag)
if response.StatusCode == http.StatusNotModified {
assert.Equal(t, etag, response.Header.Get("ETag"))
assert.Empty(t, body, "body should be empty if etag used")
assert.Nil(t, document)
} else {
assert.Equal(t, http.StatusOK, response.StatusCode, "response should be OK if etag was unused")
assert.NotNil(t, document)
}
etag = response.Header.Get("ETag")
}
}()
}
waitGroup.Wait()
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"k8s.io/apimachinery/pkg/runtime/schema"
)
var AggregatedDiscoveryGV = schema.GroupVersion{Group: "apidiscovery.k8s.io", Version: "v2beta1"}
// Interface is from "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
// DiscoveryEndpointRestrictions allows requests to /apis to provide a Content Negotiation GVK for aggregated discovery.
var DiscoveryEndpointRestrictions = discoveryEndpointRestrictions{}
type discoveryEndpointRestrictions struct{}
func (discoveryEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool {
return IsAggregatedDiscoveryGVK(gvk)
}
func (discoveryEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (discoveryEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// IsAggregatedDiscoveryGVK checks if a provided GVK is the GVK for serving aggregated discovery.
func IsAggregatedDiscoveryGVK(gvk *schema.GroupVersionKind) bool {
if gvk != nil {
return gvk.Group == "apidiscovery.k8s.io" && gvk.Version == "v2beta1" && gvk.Kind == "APIGroupDiscoveryList"
}
return false
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/runtime/serializer"
"github.com/emicklei/go-restful/v3"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
type WrappedHandler struct {
s runtime.NegotiatedSerializer
handler http.Handler
aggHandler http.Handler
}
func (wrapped *WrappedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
mediaType, _ := negotiation.NegotiateMediaTypeOptions(req.Header.Get("Accept"), wrapped.s.SupportedMediaTypes(), DiscoveryEndpointRestrictions)
// mediaType.Convert looks at the request accept headers and is used to control whether the discovery document will be aggregated.
if IsAggregatedDiscoveryGVK(mediaType.Convert) {
wrapped.aggHandler.ServeHTTP(resp, req)
return
}
}
wrapped.handler.ServeHTTP(resp, req)
}
func (wrapped *WrappedHandler) restfulHandle(req *restful.Request, resp *restful.Response) {
wrapped.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (wrapped *WrappedHandler) GenerateWebService(prefix string, returnType interface{}) *restful.WebService {
mediaTypes, _ := negotiation.MediaTypesForSerializer(wrapped.s)
ws := new(restful.WebService)
ws.Path(prefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(wrapped.restfulHandle).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
Consumes(mediaTypes...).
Writes(returnType))
return ws
}
// WrapAggregatedDiscoveryToHandler wraps a handler with an option to
// emit the aggregated discovery by passing in the aggregated
// discovery type in content negotiation headers: eg: (Accept:
// application/json;v=v2beta1;g=apidiscovery.k8s.io;as=APIGroupDiscoveryList)
func WrapAggregatedDiscoveryToHandler(handler http.Handler, aggHandler http.Handler) *WrappedHandler {
scheme := runtime.NewScheme()
apidiscoveryv2beta1.AddToScheme(scheme)
codecs := serializer.NewCodecFactory(scheme)
return &WrappedHandler{codecs, handler, aggHandler}
}

View File

@ -0,0 +1,156 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"net/http"
"net/http/httptest"
"io"
"testing"
"github.com/stretchr/testify/assert"
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
const discoveryPath = "/apis"
const jsonAccept = "application/json"
const protobufAccept = "application/vnd.kubernetes.protobuf"
const aggregatedAcceptSuffix = ";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList"
const aggregatedJSONAccept = jsonAccept + aggregatedAcceptSuffix
const aggregatedProtoAccept = protobufAccept + aggregatedAcceptSuffix
func fetchPath(handler http.Handler, path, accept string) string {
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", discoveryPath, nil)
// Ask for JSON response
req.Header.Set("Accept", accept)
handler.ServeHTTP(w, req)
return string(w.Body.Bytes())
}
type fakeHTTPHandler struct {
data string
}
func (f fakeHTTPHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
io.WriteString(resp, f.data)
}
func TestAggregationEnabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, true)()
unaggregated := fakeHTTPHandler{data: "unaggregated"}
aggregated := fakeHTTPHandler{data: "aggregated"}
wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated)
testCases := []struct {
accept string
expected string
}{
{
// Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error
accept: "application/json;foo=bar",
expected: "unaggregated",
}, {
// Empty accept headers are valid and should be handled by the unaggregated handler
accept: "",
expected: "unaggregated",
}, {
accept: aggregatedJSONAccept,
expected: "aggregated",
}, {
accept: aggregatedProtoAccept,
expected: "aggregated",
}, {
accept: jsonAccept,
expected: "unaggregated",
}, {
accept: protobufAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type
accept: aggregatedJSONAccept + "," + jsonAccept,
expected: "aggregated",
}, {
// Server should return the first accepted type
accept: aggregatedProtoAccept + "," + protobufAccept,
expected: "aggregated",
},
}
for _, tc := range testCases {
body := fetchPath(wrapped, discoveryPath, tc.accept)
assert.Equal(t, tc.expected, body)
}
}
func TestAggregationDisabled(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, genericfeatures.AggregatedDiscoveryEndpoint, false)()
unaggregated := fakeHTTPHandler{data: "unaggregated"}
aggregated := fakeHTTPHandler{data: "aggregated"}
wrapped := WrapAggregatedDiscoveryToHandler(unaggregated, aggregated)
testCases := []struct {
accept string
expected string
}{
{
// Misconstructed/incorrect accept headers should be passed to the unaggregated handler to return an error
accept: "application/json;foo=bar",
expected: "unaggregated",
}, {
// Empty accept headers are valid and should be handled by the unaggregated handler
accept: "",
expected: "unaggregated",
}, {
accept: aggregatedJSONAccept,
expected: "unaggregated",
}, {
accept: aggregatedProtoAccept,
expected: "unaggregated",
}, {
accept: jsonAccept,
expected: "unaggregated",
}, {
accept: protobufAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type.
// If aggregation is disabled, the unaggregated type should be returned.
accept: aggregatedJSONAccept + "," + jsonAccept,
expected: "unaggregated",
}, {
// Server should return the first accepted type.
// If aggregation is disabled, the unaggregated type should be returned.
accept: aggregatedProtoAccept + "," + protobufAccept,
expected: "unaggregated",
},
}
for _, tc := range testCases {
body := fetchPath(wrapped, discoveryPath, tc.accept)
assert.Equal(t, tc.expected, body)
}
}

View File

@ -56,7 +56,7 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService {
ws := new(restful.WebService)
ws.Path(s.apiPrefix)
ws.Doc("get available API versions")
ws.Route(ws.GET("/").To(s.handle).
ws.Route(ws.GET("/").To(s.restfulHandle).
Doc("get available API versions").
Operation("getAPIVersions").
Produces(mediaTypes...).
@ -65,12 +65,16 @@ func (s *legacyRootAPIHandler) WebService() *restful.WebService {
return ws
}
func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Response) {
clientIP := utilnet.GetClientIP(req.Request)
func (s *legacyRootAPIHandler) restfulHandle(req *restful.Request, resp *restful.Response) {
s.ServeHTTP(resp.ResponseWriter, req.Request)
}
func (s *legacyRootAPIHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
clientIP := utilnet.GetClientIP(req)
apiVersions := &metav1.APIVersions{
ServerAddressByClientCIDRs: s.addresses.ServerAddressByClientCIDRs(clientIP),
Versions: []string{"v1"},
}
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions, false)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, apiVersions, false)
}

View File

@ -35,7 +35,7 @@ import (
type GroupManager interface {
AddGroup(apiGroup metav1.APIGroup)
RemoveGroup(groupName string)
ServeHTTP(resp http.ResponseWriter, req *http.Request)
WebService() *restful.WebService
}

View File

@ -22,6 +22,7 @@ import (
restful "github.com/emicklei/go-restful/v3"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -105,7 +106,7 @@ type APIGroupVersion struct {
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
@ -117,7 +118,11 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storagev
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
aggregatedDiscoveryResources, err := ConvertGroupVersionIntoToDiscovery(apiResources)
if err != nil {
registrationErrors = append(registrationErrors, err)
}
return aggregatedDiscoveryResources, removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo {

View File

@ -26,6 +26,7 @@ import (
"unicode"
restful "github.com/emicklei/go-restful/v3"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
@ -68,6 +69,94 @@ type action struct {
AllNamespaces bool // true iff the action is namespaced but works on aggregate result for all namespaces
}
func ConvertGroupVersionIntoToDiscovery(list []metav1.APIResource) ([]apidiscoveryv2beta1.APIResourceDiscovery, error) {
var apiResourceList []apidiscoveryv2beta1.APIResourceDiscovery
parentResources := map[string]*apidiscoveryv2beta1.APIResourceDiscovery{}
// Loop through all top-level resources
for _, r := range list {
if strings.Contains(r.Name, "/") {
// Skip subresources for now so we can get the list of resources
continue
}
var scope apidiscoveryv2beta1.ResourceScope
if r.Namespaced {
scope = apidiscoveryv2beta1.ScopeNamespace
} else {
scope = apidiscoveryv2beta1.ScopeCluster
}
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{
Resource: r.Name,
Scope: scope,
ResponseKind: &metav1.GroupVersionKind{
Group: r.Group,
Version: r.Version,
Kind: r.Kind,
},
Verbs: r.Verbs,
ShortNames: r.ShortNames,
Categories: r.Categories,
SingularResource: r.SingularName,
})
parentResources[r.Name] = &apiResourceList[len(apiResourceList)-1]
}
// Loop through all subresources
for _, r := range list {
// Split resource name and subresource name
split := strings.SplitN(r.Name, "/", 2)
if len(split) != 2 {
// Skip parent resources
continue
}
var scope apidiscoveryv2beta1.ResourceScope
if r.Namespaced {
scope = apidiscoveryv2beta1.ScopeNamespace
} else {
scope = apidiscoveryv2beta1.ScopeCluster
}
var parent *apidiscoveryv2beta1.APIResourceDiscovery
var exists bool
parent, exists = parentResources[split[0]]
if !exists {
// If a subresource exists without a parent, create a parent
apiResourceList = append(apiResourceList, apidiscoveryv2beta1.APIResourceDiscovery{
Resource: split[0],
Scope: scope,
})
parentResources[split[0]] = &apiResourceList[len(apiResourceList)-1]
parent = &apiResourceList[len(apiResourceList)-1]
parentResources[split[0]] = parent
}
if parent.Scope != scope {
return nil, fmt.Errorf("Error: Parent %s (scope: %s) and subresource %s (scope: %s) scope do not match", split[0], parent.Scope, split[1], scope)
//
}
subresource := apidiscoveryv2beta1.APISubresourceDiscovery{
Subresource: split[1],
Verbs: r.Verbs,
}
if r.Kind != "" {
subresource.ResponseKind = &metav1.GroupVersionKind{
Group: r.Group,
Version: r.Version,
Kind: r.Kind,
}
}
parent.Subresources = append(parent.Subresources, subresource)
}
return apiResourceList, nil
}
// An interface to see if one storage supports override its default verb for monitoring
type StorageMetricsOverride interface {
// OverrideMetricsVerb gives a storage object an opportunity to override the verb reported to the metrics endpoint

View File

@ -18,6 +18,10 @@ package endpoints
import (
"testing"
"github.com/stretchr/testify/require"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestIsVowel(t *testing.T) {
@ -97,3 +101,243 @@ func TestGetArticleForNoun(t *testing.T) {
}
}
}
func TestConvertAPIResourceToDiscovery(t *testing.T) {
tests := []struct {
name string
resources []metav1.APIResource
wantAPIResourceDiscovery []apidiscoveryv2beta1.APIResourceDiscovery
wantErr bool
}{
{
name: "Basic Test",
resources: []metav1.APIResource{
{
Name: "pods",
Namespaced: true,
Kind: "Pod",
ShortNames: []string{"po"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "pods",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Pod",
},
ShortNames: []string{"po"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Basic Group Version Test",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Group: "batch",
Version: "v1",
Kind: "CronJob",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Test with subresource",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "cronjobs/status",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
},
},
},
{
name: "Test with subresource with no parent",
resources: []metav1.APIResource{
{
Name: "cronjobs/status",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "cronjobs",
Scope: apidiscoveryv2beta1.ScopeNamespace,
Subresources: []apidiscoveryv2beta1.APISubresourceDiscovery{{
Subresource: "status",
ResponseKind: &metav1.GroupVersionKind{
Group: "batch",
Version: "v1",
Kind: "CronJob",
},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
}},
},
},
},
{
name: "Test with mismatch parent and subresource scope",
resources: []metav1.APIResource{
{
Name: "cronjobs",
Namespaced: true,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
{
Name: "cronjobs/status",
Namespaced: false,
Kind: "CronJob",
Group: "batch",
Version: "v1",
ShortNames: []string{"cj"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{},
wantErr: true,
},
{
name: "Cluster Scope Test",
resources: []metav1.APIResource{
{
Name: "nodes",
Namespaced: false,
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
Scope: apidiscoveryv2beta1.ScopeCluster,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Namespace Scope Test",
resources: []metav1.APIResource{
{
Name: "nodes",
Namespaced: true,
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
Scope: apidiscoveryv2beta1.ScopeNamespace,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
{
name: "Singular Resource Name",
resources: []metav1.APIResource{
{
Name: "nodes",
SingularName: "node",
Kind: "Node",
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
wantAPIResourceDiscovery: []apidiscoveryv2beta1.APIResourceDiscovery{
{
Resource: "nodes",
SingularResource: "node",
Scope: apidiscoveryv2beta1.ScopeCluster,
ResponseKind: &metav1.GroupVersionKind{
Kind: "Node",
},
ShortNames: []string{"no"},
Verbs: []string{"create", "delete", "deletecollection", "get", "list", "patch", "update", "watch"},
},
},
},
}
for _, tt := range tests {
discoveryAPIResources, err := ConvertGroupVersionIntoToDiscovery(tt.resources)
if err != nil {
if tt.wantErr == false {
t.Error(err)
}
} else {
require.Equal(t, tt.wantAPIResourceDiscovery, discoveryAPIResources)
}
}
}

View File

@ -50,6 +50,7 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/filterlatency"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
@ -125,6 +126,7 @@ type Config struct {
EnableIndex bool
EnableProfiling bool
EnableDiscovery bool
// Requires generic profiling enabled
EnableContentionProfiling bool
EnableMetrics bool
@ -262,6 +264,9 @@ type Config struct {
// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
}
type RecommendedConfig struct {
@ -677,6 +682,14 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
manager := c.AggregatedDiscoveryGroupManager
if manager == nil {
manager = discoveryendpoint.NewResourceManager()
}
s.AggregatedDiscoveryGroupManager = manager
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager()
}
for {
if c.JSONPatchMaxCopyBytes <= 0 {
break

View File

@ -26,6 +26,7 @@ import (
systemd "github.com/coreos/go-systemd/v22/daemon"
apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -39,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapi "k8s.io/apiserver/pkg/endpoints"
"k8s.io/apiserver/pkg/endpoints/discovery"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
@ -137,9 +139,15 @@ type GenericAPIServer struct {
// listedPathProvider is a lister which provides the set of paths to show at /
listedPathProvider routes.ListedPathProvider
// DiscoveryGroupManager serves /apis
// DiscoveryGroupManager serves /apis in an unaggregated form.
DiscoveryGroupManager discovery.GroupManager
// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager
// AggregatedLegacyDiscoveryGroupManager serves /api in an aggregated form.
AggregatedLegacyDiscoveryGroupManager discoveryendpoint.ResourceManager
// Enable swagger and/or OpenAPI if these configs are non-nil.
openAPIConfig *openapicommon.Config
@ -672,11 +680,35 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
if err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
resourceInfos = append(resourceInfos, r...)
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
// Aggregated discovery only aggregates resources under /apis
if apiPrefix == APIGroupPrefix {
s.AggregatedDiscoveryGroupManager.AddGroupVersion(
groupVersion.Group,
apidiscoveryv2beta1.APIVersionDiscovery{
Version: groupVersion.Version,
Resources: discoveryAPIResources,
},
)
} else {
// There is only one group version for legacy resources, priority can be defaulted to 0.
s.AggregatedLegacyDiscoveryGroupManager.AddGroupVersion(
groupVersion.Group,
apidiscoveryv2beta1.APIVersionDiscovery{
Version: groupVersion.Version,
Resources: discoveryAPIResources,
},
)
}
}
}
s.RegisterDestroyFunc(apiGroupInfo.destroyStorage)
@ -711,7 +743,13 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
legacyRootAPIHandler := discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix)
if utilfeature.DefaultFeatureGate.Enabled(features.AggregatedDiscoveryEndpoint) {
wrapped := discoveryendpoint.WrapAggregatedDiscoveryToHandler(legacyRootAPIHandler, s.AggregatedLegacyDiscoveryGroupManager)
s.Handler.GoRestfulContainer.Add(wrapped.GenerateWebService("/api", metav1.APIVersions{}))
} else {
s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
}
return nil
}