refactor cached restmapper based on Kubernetes restmapper
Signed-off-by: RainbowMango <qdurenhongcai@gmail.com>
This commit is contained in:
parent
d27faeac95
commit
dec4e42101
|
@ -8,7 +8,6 @@ import (
|
|||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/restmapper"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
)
|
||||
|
||||
// GetGroupVersionResource is a helper to map GVK(schema.GroupVersionKind) to GVR(schema.GroupVersionResource).
|
||||
|
@ -23,39 +22,42 @@ func GetGroupVersionResource(restMapper meta.RESTMapper, gvk schema.GroupVersion
|
|||
// cachedRESTMapper caches the previous result to accelerate subsequent queries.
|
||||
// Note: now the acceleration applies only to RESTMapping() which is heavily used by Karmada.
|
||||
type cachedRESTMapper struct {
|
||||
restMapper meta.RESTMapper
|
||||
gvkToGVR sync.Map
|
||||
restMapper meta.RESTMapper
|
||||
discoveryClient discovery.DiscoveryInterface
|
||||
gvkToGVR sync.Map
|
||||
// mu is used to provide thread-safe mapper reloading.
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) {
|
||||
return g.restMapper.KindFor(resource)
|
||||
return g.getMapper().KindFor(resource)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
|
||||
return g.restMapper.KindsFor(resource)
|
||||
return g.getMapper().KindsFor(resource)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
|
||||
return g.restMapper.ResourceFor(input)
|
||||
return g.getMapper().ResourceFor(input)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
|
||||
return g.restMapper.ResourcesFor(input)
|
||||
return g.getMapper().ResourcesFor(input)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
|
||||
return g.restMapper.RESTMappings(gk, versions...)
|
||||
return g.getMapper().RESTMappings(gk, versions...)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) ResourceSingularizer(resource string) (singular string, err error) {
|
||||
return g.restMapper.ResourceSingularizer(resource)
|
||||
return g.getMapper().ResourceSingularizer(resource)
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
|
||||
// in case of multi-versions or no versions, cachedRESTMapper don't know which is the preferred version,
|
||||
// so just bypass the cache and consult the underlying mapper.
|
||||
if len(versions) != 1 {
|
||||
return g.restMapper.RESTMapping(gk, versions...)
|
||||
return g.getMapper().RESTMapping(gk, versions...)
|
||||
}
|
||||
|
||||
gvk := gk.WithVersion(versions[0])
|
||||
|
@ -65,16 +67,49 @@ func (g *cachedRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string)
|
|||
}
|
||||
|
||||
// consult underlying mapper and then update cache
|
||||
restMapping, err := g.restMapper.RESTMapping(gk, versions...)
|
||||
restMapping, err := g.getMapper().RESTMapping(gk, versions...)
|
||||
if meta.IsNoMatchError(err) {
|
||||
// hit here means a resource might be missing from the current rest mapper,
|
||||
// probably because a new resource(CRD) has been added, we have to reload
|
||||
// resource and rebuild the rest mapper.
|
||||
|
||||
var groupResources []*restmapper.APIGroupResources
|
||||
groupResources, err = restmapper.GetAPIGroupResources(g.discoveryClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
restMapping, err = newMapper.RESTMapping(gk, versions...)
|
||||
if err == nil {
|
||||
// hit here means after reloading, the new rest mapper can recognize
|
||||
// the resource, we have to replace the mapper and clear cache.
|
||||
g.mu.Lock()
|
||||
g.restMapper = newMapper
|
||||
g.mu.Unlock()
|
||||
g.gvkToGVR.Range(func(key, value any) bool {
|
||||
g.gvkToGVR.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return restMapping, err
|
||||
}
|
||||
g.gvkToGVR.Store(gvk, restMapping)
|
||||
|
||||
return restMapping, nil
|
||||
}
|
||||
|
||||
func (g *cachedRESTMapper) getMapper() meta.RESTMapper {
|
||||
g.mu.RLock()
|
||||
defer g.mu.RUnlock()
|
||||
return g.restMapper
|
||||
}
|
||||
|
||||
// NewCachedRESTMapper builds a cachedRESTMapper with a customized underlyingMapper.
|
||||
// If underlyingMapper is nil, defaults to DynamicRESTMapper.
|
||||
// If underlyingMapper is nil, defaults to DiscoveryRESTMapper.
|
||||
func NewCachedRESTMapper(cfg *rest.Config, underlyingMapper meta.RESTMapper) (meta.RESTMapper, error) {
|
||||
cachedMapper := cachedRESTMapper{}
|
||||
|
||||
|
@ -89,21 +124,14 @@ func NewCachedRESTMapper(cfg *rest.Config, underlyingMapper meta.RESTMapper) (me
|
|||
return nil, err
|
||||
}
|
||||
|
||||
option := apiutil.WithCustomMapper(func() (meta.RESTMapper, error) {
|
||||
groupResources, err := restmapper.GetAPIGroupResources(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// clear the cache map when reloading DiscoveryRESTMapper
|
||||
cachedMapper.gvkToGVR = sync.Map{}
|
||||
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
|
||||
})
|
||||
|
||||
underlyingMapper, err = apiutil.NewDynamicRESTMapper(cfg, option)
|
||||
// loading current resources for building a base rest mapper.
|
||||
groupResources, err := restmapper.GetAPIGroupResources(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cachedMapper.restMapper = underlyingMapper
|
||||
|
||||
cachedMapper.restMapper = restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
cachedMapper.discoveryClient = client
|
||||
|
||||
return &cachedMapper, nil
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package restmapper
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
|
@ -128,26 +127,36 @@ func BenchmarkGetGroupVersionResource(b *testing.B) {
|
|||
}
|
||||
}
|
||||
}
|
||||
func BenchmarkGetGroupVersionResourceWithoutCache(b *testing.B) {
|
||||
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to load resources: %v", err)
|
||||
}
|
||||
|
||||
mapper := restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, tc := range getGVRTestCases {
|
||||
_, err := GetGroupVersionResource(mapper, tc.inputGVK)
|
||||
if (err != nil && !tc.expectErr) || (err == nil && tc.expectErr) {
|
||||
b.Errorf("GetGroupVersionResource For %#v Error: %v, wantErr: %v", tc.inputGVK, err, tc.expectErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGetGroupVersionResourceWithCache(b *testing.B) {
|
||||
cachedMapper := &cachedRESTMapper{}
|
||||
|
||||
var option = apiutil.WithCustomMapper(func() (meta.RESTMapper, error) {
|
||||
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// clear the cache map when reloading DiscoveryRESTMapper
|
||||
cachedMapper.gvkToGVR = sync.Map{}
|
||||
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
|
||||
})
|
||||
|
||||
mapper, err := apiutil.NewDynamicRESTMapper(&rest.Config{}, option)
|
||||
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
|
||||
if err != nil {
|
||||
b.Error(err)
|
||||
b.Fatalf("Failed to load resources: %v", err)
|
||||
}
|
||||
|
||||
cachedMapper.restMapper = mapper
|
||||
newMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
cachedMapper.restMapper = newMapper
|
||||
cachedMapper.discoveryClient = discoveryClient
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
@ -163,22 +172,14 @@ func BenchmarkGetGroupVersionResourceWithCache(b *testing.B) {
|
|||
func TestGetGroupVersionResourceWithCache(t *testing.T) {
|
||||
cachedMapper := &cachedRESTMapper{}
|
||||
|
||||
var option = apiutil.WithCustomMapper(func() (meta.RESTMapper, error) {
|
||||
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// clear the cache map when reloading DiscoveryRESTMapper
|
||||
cachedMapper.gvkToGVR = sync.Map{}
|
||||
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
|
||||
})
|
||||
|
||||
mapper, err := apiutil.NewDynamicRESTMapper(&rest.Config{}, option)
|
||||
groupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatalf("Failed to load resources: %v", err)
|
||||
}
|
||||
|
||||
cachedMapper.restMapper = mapper
|
||||
newMapper := restmapper.NewDiscoveryRESTMapper(groupResources)
|
||||
cachedMapper.restMapper = newMapper
|
||||
cachedMapper.discoveryClient = discoveryClient
|
||||
|
||||
for _, tc := range getGVRTestCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue