Polling package should handle missing resource types in RESTMapper

This commit is contained in:
Morten Torkildsen 2020-05-06 19:19:41 -07:00
parent 025c3c6730
commit 5a34f96b10
2 changed files with 82 additions and 49 deletions

View File

@ -54,7 +54,7 @@ func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, ident
for _, id := range identifiers {
// For every identifier, add the GroupVersionKind and namespace combination to the gvkNamespaceSet and
// check the genGroupKinds map for any generated resources that also should be included.
err := buildGvkNamespaceSet(mapper, []schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
err := buildGvkNamespaceSet([]schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
if err != nil {
return nil, err
}
@ -67,19 +67,15 @@ func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, ident
}, nil
}
func buildGvkNamespaceSet(mapper meta.RESTMapper, gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
func buildGvkNamespaceSet(gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
for _, gk := range gks {
mapping, err := mapper.RESTMapping(gk)
if err != nil {
return err
}
gvkNamespaceSet.add(gvkNamespace{
GVK: mapping.GroupVersionKind,
gvkNamespaceSet.add(gkNamespace{
GroupKind: gk,
Namespace: namespace,
})
genGKs, found := genGroupKinds[gk]
if found {
err := buildGvkNamespaceSet(mapper, genGKs, namespace, gvkNamespaceSet)
err := buildGvkNamespaceSet(genGKs, namespace, gvkNamespaceSet)
if err != nil {
return err
}
@ -89,18 +85,18 @@ func buildGvkNamespaceSet(mapper meta.RESTMapper, gks []schema.GroupKind, namesp
}
type gvkNamespaceSet struct {
gvkNamespaces []gvkNamespace
seen map[gvkNamespace]bool
gvkNamespaces []gkNamespace
seen map[gkNamespace]bool
}
func newGnSet() *gvkNamespaceSet {
return &gvkNamespaceSet{
gvkNamespaces: make([]gvkNamespace, 0),
seen: make(map[gvkNamespace]bool),
gvkNamespaces: make([]gkNamespace, 0),
seen: make(map[gkNamespace]bool),
}
}
func (g *gvkNamespaceSet) add(gn gvkNamespace) {
func (g *gvkNamespaceSet) add(gn gkNamespace) {
if _, found := g.seen[gn]; !found {
g.gvkNamespaces = append(g.gvkNamespaces, gn)
g.seen[gn] = true
@ -126,12 +122,12 @@ type CachingClusterReader struct {
// should be included in the cache. This is computed based the resource identifiers
// passed in when the CachingClusterReader is created and augmented with other
// resource types needed to compute status (see genGroupKinds).
gns []gvkNamespace
gns []gkNamespace
// cache contains the resources found in the cluster for the given combination
// of GVK and namespace. Before each polling cycle, the framework will call the
// Sync function, which is responsible for repopulating the cache.
cache map[gvkNamespace]cacheEntry
cache map[gkNamespace]cacheEntry
}
type cacheEntry struct {
@ -139,9 +135,9 @@ type cacheEntry struct {
err error
}
// gvkNamespace contains information about a GroupVersionKind and a namespace.
type gvkNamespace struct {
GVK schema.GroupVersionKind
// gkNamespace contains information about a GroupVersionKind and a namespace.
type gkNamespace struct {
GroupKind schema.GroupKind
Namespace string
}
@ -155,8 +151,8 @@ func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj
if err != nil {
return err
}
gn := gvkNamespace{
GVK: gvk,
gn := gkNamespace{
GroupKind: gvk.GroupKind(),
Namespace: key.Namespace,
}
cacheEntry, found := c.cache[gn]
@ -182,8 +178,8 @@ func (c *CachingClusterReader) ListNamespaceScoped(_ context.Context, list *unst
c.RLock()
defer c.RUnlock()
gvk := list.GroupVersionKind()
gn := gvkNamespace{
GVK: gvk,
gn := gkNamespace{
GroupKind: gvk.GroupKind(),
Namespace: namespace,
}
@ -213,15 +209,26 @@ func (c *CachingClusterReader) ListClusterScoped(ctx context.Context, list *unst
return c.ListNamespaceScoped(ctx, list, "", selector)
}
// Sync loops over the list of gvkNamespace we know of, and uses list calls to fetch the resources.
// Sync loops over the list of gkNamespace we know of, and uses list calls to fetch the resources.
// This information populates the cache.
func (c *CachingClusterReader) Sync(ctx context.Context) error {
c.Lock()
defer c.Unlock()
cache := make(map[gvkNamespace]cacheEntry)
cache := make(map[gkNamespace]cacheEntry)
for _, gn := range c.gns {
mapping, err := c.mapper.RESTMapping(gn.GVK.GroupKind())
mapping, err := c.mapper.RESTMapping(gn.GroupKind)
if err != nil {
if meta.IsNoMatchError(err) {
// If we get a NoMatchError, it means we are checking for
// a type that doesn't exist. Presumably the CRD is being
// applied, so it will be added. Reset the RESTMapper to
// make sure we pick up any new resource types on the
// APIServer.
cache[gn] = cacheEntry{
err: err,
}
continue
}
return err
}
var listOptions []client.ListOption
@ -229,7 +236,7 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error {
listOptions = append(listOptions, client.InNamespace(gn.Namespace))
}
var list unstructured.UnstructuredList
list.SetGroupVersionKind(gn.GVK)
list.SetGroupVersionKind(mapping.GroupVersionKind)
err = c.reader.List(ctx, &list, listOptions...)
if err != nil {
// If we get an IsNotFound error here, it means the type

View File

@ -14,6 +14,7 @@ import (
v1 "k8s.io/api/core/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/testutil"
@ -30,7 +31,7 @@ var (
func TestSync(t *testing.T) {
testCases := map[string]struct {
identifiers []object.ObjMetadata
expectedSynced []gvkNamespace
expectedSynced []gkNamespace
}{
"no identifiers": {
identifiers: []object.ObjMetadata{},
@ -48,29 +49,29 @@ func TestSync(t *testing.T) {
Namespace: "Bar",
},
},
expectedSynced: []gvkNamespace{
expectedSynced: []gkNamespace{
{
GVK: deploymentGVK,
GroupKind: deploymentGVK.GroupKind(),
Namespace: "Foo",
},
{
GVK: rsGVK,
GroupKind: rsGVK.GroupKind(),
Namespace: "Foo",
},
{
GVK: podGVK,
GroupKind: podGVK.GroupKind(),
Namespace: "Foo",
},
{
GVK: deploymentGVK,
GroupKind: deploymentGVK.GroupKind(),
Namespace: "Bar",
},
{
GVK: rsGVK,
GroupKind: rsGVK.GroupKind(),
Namespace: "Bar",
},
{
GVK: podGVK,
GroupKind: podGVK.GroupKind(),
Namespace: "Bar",
},
},
@ -106,19 +107,46 @@ func TestSync(t *testing.T) {
func TestSync_Errors(t *testing.T) {
testCases := map[string]struct {
mapper meta.RESTMapper
readerError error
expectSyncError bool
cacheError bool
cacheErrorText string
}{
"mapping and reader are successful": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: nil,
expectSyncError: false,
cacheError: false,
},
"reader returns NotFound error": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: errors.NewNotFound(schema.GroupResource{
Group: "apiextensions.k8s.io",
Resource: "customresourcedefinitions",
}, "my-crd"),
expectSyncError: false,
cacheError: true,
cacheErrorText: "not found",
},
"reader returns other error": {
mapper: testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
),
readerError: errors.NewInternalError(fmt.Errorf("testing")),
expectSyncError: true,
cacheError: true,
cacheErrorText: "not found",
},
"mapping not found": {
mapper: testutil.NewFakeRESTMapper(),
expectSyncError: false,
cacheError: true,
cacheErrorText: "no matches for kind",
},
}
@ -134,15 +162,11 @@ func TestSync_Errors(t *testing.T) {
},
}
fakeMapper := testutil.NewFakeRESTMapper(
apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
)
fakeReader := &fakeReader{
err: tc.readerError,
}
clusterReader, err := NewCachingClusterReader(fakeReader, fakeMapper, identifiers)
clusterReader, err := NewCachingClusterReader(fakeReader, tc.mapper, identifiers)
assert.NilError(t, err)
err = clusterReader.Sync(context.Background())
@ -153,26 +177,28 @@ func TestSync_Errors(t *testing.T) {
}
assert.NilError(t, err)
cacheEntry, found := clusterReader.cache[gvkNamespace{
GVK: apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition"),
cacheEntry, found := clusterReader.cache[gkNamespace{
GroupKind: apiextv1.SchemeGroupVersion.WithKind("CustomResourceDefinition").GroupKind(),
}]
assert.Check(t, found)
assert.ErrorContains(t, cacheEntry.err, "not found")
if tc.cacheError {
assert.ErrorContains(t, cacheEntry.err, tc.cacheErrorText)
}
})
}
}
func sortGVKNamespaces(gvkNamespaces []gvkNamespace) {
func sortGVKNamespaces(gvkNamespaces []gkNamespace) {
sort.Slice(gvkNamespaces, func(i, j int) bool {
if gvkNamespaces[i].GVK.String() != gvkNamespaces[j].GVK.String() {
return gvkNamespaces[i].GVK.String() < gvkNamespaces[j].GVK.String()
if gvkNamespaces[i].GroupKind.String() != gvkNamespaces[j].GroupKind.String() {
return gvkNamespaces[i].GroupKind.String() < gvkNamespaces[j].GroupKind.String()
}
return gvkNamespaces[i].Namespace < gvkNamespaces[j].Namespace
})
}
type fakeReader struct {
syncedGVKNamespaces []gvkNamespace
syncedGVKNamespaces []gkNamespace
err error
}
@ -191,8 +217,8 @@ func (f *fakeReader) List(_ context.Context, list runtime.Object, opts ...client
}
gvk := list.GetObjectKind().GroupVersionKind()
f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, gvkNamespace{
GVK: gvk,
f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, gkNamespace{
GroupKind: gvk.GroupKind(),
Namespace: namespace,
})