diff --git a/pkg/apply/applier.go b/pkg/apply/applier.go index c28dee8..d34f683 100644 --- a/pkg/apply/applier.go +++ b/pkg/apply/applier.go @@ -292,11 +292,18 @@ func (a *Applier) Run(ctx context.Context, objects []*resource.Info, options Opt return } + mapper, err := a.factory.ToRESTMapper() + if err != nil { + handleError(eventChannel, err) + return + } + // Fetch the queue (channel) of tasks that should be executed. taskQueue := (&solver.TaskQueueSolver{ ApplyOptions: a.ApplyOptions, PruneOptions: a.PruneOptions, InfoHelper: a.infoHelperFactoryFunc(), + Mapper: mapper, }).BuildTaskQueue(resourceObjects, solver.Options{ ReconcileTimeout: options.ReconcileTimeout, Prune: !options.NoPrune, diff --git a/pkg/apply/applier_test.go b/pkg/apply/applier_test.go index 243a28d..75794f1 100644 --- a/pkg/apply/applier_test.go +++ b/pkg/apply/applier_test.go @@ -801,11 +801,3 @@ func (f *fakeInfoHelper) getClient(gv schema.GroupVersion) (resource.RESTClient, } return f.factory.Client, nil } - -func (f *fakeInfoHelper) ResetRESTMapper() error { - return nil -} - -func (f *fakeInfoHelper) ToRESTMapper() (meta.RESTMapper, error) { - return f.factory.ToRESTMapper() -} diff --git a/pkg/apply/info/info_helper.go b/pkg/apply/info/info_helper.go index 7dd02da..048361b 100644 --- a/pkg/apply/info/info_helper.go +++ b/pkg/apply/info/info_helper.go @@ -4,14 +4,10 @@ package info import ( - "fmt" - "reflect" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" "k8s.io/kubectl/pkg/cmd/util" ) @@ -21,13 +17,6 @@ type InfoHelper interface { // objects. This must be called at a time when all needed resource // types are available in the RESTMapper. UpdateInfos(infos []*resource.Info) error - - // ResetRESTMapper resets the state of the RESTMapper so any - // added resource types in the cluster will be picked up. - ResetRESTMapper() error - - // ToRESTMapper returns a RESTMapper - ToRESTMapper() (meta.RESTMapper, error) } func NewInfoHelper(factory util.Factory, namespace string) *infoHelper { @@ -68,20 +57,6 @@ func (ih *infoHelper) ToRESTMapper() (meta.RESTMapper, error) { return ih.factory.ToRESTMapper() } -func (ih *infoHelper) ResetRESTMapper() error { - mapper, err := ih.factory.ToRESTMapper() - if err != nil { - return err - } - fv := reflect.ValueOf(mapper).FieldByName("RESTMapper") - ddRESTMapper, ok := fv.Interface().(*restmapper.DeferredDiscoveryRESTMapper) - if !ok { - return fmt.Errorf("unexpected RESTMapper type") - } - ddRESTMapper.Reset() - return nil -} - func (ih *infoHelper) getClient(gv schema.GroupVersion) (*rest.RESTClient, error) { cfg, err := ih.factory.ToRESTConfig() if err != nil { diff --git a/pkg/apply/solver/solver.go b/pkg/apply/solver/solver.go index bc81f8a..cd74ae5 100644 --- a/pkg/apply/solver/solver.go +++ b/pkg/apply/solver/solver.go @@ -19,6 +19,7 @@ import ( v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" @@ -36,6 +37,7 @@ type TaskQueueSolver struct { ApplyOptions *apply.ApplyOptions PruneOptions *prune.PruneOptions InfoHelper info.InfoHelper + Mapper meta.RESTMapper } type Options struct { @@ -68,13 +70,16 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects, ApplyOptions: t.ApplyOptions, DryRun: o.DryRun, InfoHelper: t.InfoHelper, + Mapper: t.Mapper, }) if !o.DryRun { tasks = append(tasks, taskrunner.NewWaitTask( object.InfosToObjMetas(crdSplitRes.crds), taskrunner.AllCurrent, 1*time.Minute), - ) + &task.ResetRESTMapperTask{ + Mapper: t.Mapper, + }) } remainingInfos = crdSplitRes.after } @@ -86,6 +91,7 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects, ApplyOptions: t.ApplyOptions, DryRun: o.DryRun, InfoHelper: t.InfoHelper, + Mapper: t.Mapper, }, &task.SendEventTask{ Event: event.Event{ diff --git a/pkg/apply/solver/solver_test.go b/pkg/apply/solver/solver_test.go index 4c90fb2..d859daa 100644 --- a/pkg/apply/solver/solver_test.go +++ b/pkg/apply/solver/solver_test.go @@ -16,6 +16,7 @@ import ( "sigs.k8s.io/cli-utils/pkg/apply/task" "sigs.k8s.io/cli-utils/pkg/apply/taskrunner" "sigs.k8s.io/cli-utils/pkg/object" + "sigs.k8s.io/cli-utils/pkg/testutil" ) var ( @@ -149,6 +150,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) { object.InfoToObjMeta(crdInfo), }, taskrunner.AllCurrent, 1*time.Second), + &task.ResetRESTMapperTask{}, &task.ApplyTask{ Objects: []*resource.Info{ depInfo, @@ -194,6 +196,7 @@ func TestTaskQueueSolver_BuildTaskQueue(t *testing.T) { tqs := TaskQueueSolver{ ApplyOptions: applyOptions, PruneOptions: pruneOptions, + Mapper: testutil.NewFakeRESTMapper(), } tq := tqs.BuildTaskQueue(&fakeResourceObjects{ diff --git a/pkg/apply/task/apply_task.go b/pkg/apply/task/apply_task.go index 273d02d..6de4964 100644 --- a/pkg/apply/task/apply_task.go +++ b/pkg/apply/task/apply_task.go @@ -20,6 +20,7 @@ import ( type ApplyTask struct { ApplyOptions applyOptions InfoHelper info.InfoHelper + Mapper meta.RESTMapper Objects []*resource.Info CRDs []*resource.Info DryRun bool @@ -113,11 +114,6 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) { gen := acc.GetGeneration() taskContext.ResourceApplied(id, gen) } - err = a.InfoHelper.ResetRESTMapper() - if err != nil { - a.sendTaskResult(taskContext, err) - return - } a.sendTaskResult(taskContext, nil) }() } @@ -149,18 +145,13 @@ func (a *ApplyTask) filterCRsWithCRDInSet(objects []*resource.Info) ([]*resource var objs []*resource.Info var objsWithCRD []*resource.Info - mapper, err := a.InfoHelper.ToRESTMapper() - if err != nil { - return objs, objsWithCRD, err - } - crdsInfo := buildCRDsInfo(a.CRDs) for _, obj := range objects { gvk := obj.Object.GetObjectKind().GroupVersionKind() // First check if we find the type in the RESTMapper. //TODO: Maybe we do care if there is a new version of the CRD? - _, err := mapper.RESTMapping(gvk.GroupKind()) + _, err := a.Mapper.RESTMapping(gvk.GroupKind()) if err != nil && !meta.IsNoMatchError(err) { return objs, objsWithCRD, err } diff --git a/pkg/apply/task/apply_task_test.go b/pkg/apply/task/apply_task_test.go index 95513da..b81ee2a 100644 --- a/pkg/apply/task/apply_task_test.go +++ b/pkg/apply/task/apply_task_test.go @@ -8,7 +8,6 @@ import ( "testing" "gotest.tools/assert" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/resource" @@ -231,11 +230,10 @@ func TestApplyTask_DryRun(t *testing.T) { applyTask := &ApplyTask{ ApplyOptions: applyOptions, Objects: tc.infos, - InfoHelper: &fakeInfoHelper{ - restMapper: restMapper, - }, - DryRun: true, - CRDs: tc.crds, + InfoHelper: &fakeInfoHelper{}, + Mapper: restMapper, + DryRun: true, + CRDs: tc.crds, } var events []event.Event @@ -307,18 +305,8 @@ func (f *fakeApplyOptions) SetObjects(objects []*resource.Info) { f.objects = objects } -type fakeInfoHelper struct { - restMapper meta.RESTMapper -} +type fakeInfoHelper struct{} func (f *fakeInfoHelper) UpdateInfos([]*resource.Info) error { return nil } - -func (f *fakeInfoHelper) ResetRESTMapper() error { - return nil -} - -func (f *fakeInfoHelper) ToRESTMapper() (meta.RESTMapper, error) { - return f.restMapper, nil -} diff --git a/pkg/apply/task/resetmapper_task.go b/pkg/apply/task/resetmapper_task.go new file mode 100644 index 0000000..877b8f1 --- /dev/null +++ b/pkg/apply/task/resetmapper_task.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package task + +import ( + "fmt" + "reflect" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/restmapper" + "sigs.k8s.io/cli-utils/pkg/apply/taskrunner" +) + +// ResetRESTMapperTask resets the provided RESTMapper. +type ResetRESTMapperTask struct { + Mapper meta.RESTMapper +} + +// Start creates a new goroutine that will unwrap the provided RESTMapper +// to get the underlying DeferredDiscoveryRESTMapper and then reset it. It +// will send a TaskResult on the taskChannel to signal that the task has +// been completed. +func (r *ResetRESTMapperTask) Start(taskContext *taskrunner.TaskContext) { + go func() { + ddRESTMapper, err := extractDeferredDiscoveryRESTMapper(r.Mapper) + if err != nil { + r.sendTaskResult(taskContext, err) + return + } + ddRESTMapper.Reset() + r.sendTaskResult(taskContext, nil) + }() +} + +// extractDeferredDiscoveryRESTMapper unwraps the provided RESTMapper +// interface to get access to the underlying DeferredDiscoveryRESTMapper +// that can be reset. +func extractDeferredDiscoveryRESTMapper(mapper meta.RESTMapper) (*restmapper.DeferredDiscoveryRESTMapper, + error) { + val := reflect.ValueOf(mapper) + if val.Type().Kind() != reflect.Struct { + return nil, fmt.Errorf("unexpected RESTMapper type: %s", val.Type().String()) + } + fv := val.FieldByName("RESTMapper") + ddRESTMapper, ok := fv.Interface().(*restmapper.DeferredDiscoveryRESTMapper) + if !ok { + return nil, fmt.Errorf("unexpected RESTMapper type") + } + return ddRESTMapper, nil +} + +func (r *ResetRESTMapperTask) sendTaskResult(taskContext *taskrunner.TaskContext, err error) { + taskContext.TaskChannel() <- taskrunner.TaskResult{ + Err: err, + } +} + +// ClearTimeout doesn't do anything as ResetRESTMapperTask doesn't support +// timeouts. +func (r *ResetRESTMapperTask) ClearTimeout() {} diff --git a/pkg/apply/task/resetmapper_task_test.go b/pkg/apply/task/resetmapper_task_test.go new file mode 100644 index 0000000..5eb6810 --- /dev/null +++ b/pkg/apply/task/resetmapper_task_test.go @@ -0,0 +1,79 @@ +// Copyright 2020 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package task + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/discovery" + "k8s.io/client-go/restmapper" + "sigs.k8s.io/cli-utils/pkg/apply/event" + "sigs.k8s.io/cli-utils/pkg/apply/taskrunner" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestResetRESTMapperTask(t *testing.T) { + testCases := map[string]struct { + toRESTMapper func() (meta.RESTMapper, *fakeCachedDiscoveryClient) + expectErr bool + expectedErrMessage string + }{ + "correct wrapped RESTMapper": { + toRESTMapper: func() (meta.RESTMapper, *fakeCachedDiscoveryClient) { + discoveryClient := &fakeCachedDiscoveryClient{} + ddRESTMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) + return restmapper.NewShortcutExpander(ddRESTMapper, discoveryClient), discoveryClient + }, + expectErr: false, + }, + "incorrect wrapped RESTMapper": { + toRESTMapper: func() (meta.RESTMapper, *fakeCachedDiscoveryClient) { + return testutil.NewFakeRESTMapper(), nil + }, + expectErr: true, + expectedErrMessage: "unexpected RESTMapper type", + }, + } + + for tn, tc := range testCases { + t.Run(tn, func(t *testing.T) { + eventChannel := make(chan event.Event) + defer close(eventChannel) + taskContext := taskrunner.NewTaskContext(eventChannel) + + mapper, discoveryClient := tc.toRESTMapper() + + resetRESTMapperTask := &ResetRESTMapperTask{ + Mapper: mapper, + } + + resetRESTMapperTask.Start(taskContext) + + result := <-taskContext.TaskChannel() + + if tc.expectErr { + assert.Error(t, result.Err) + assert.Contains(t, result.Err.Error(), tc.expectedErrMessage) + return + } + + assert.True(t, discoveryClient.invalidated) + }) + } +} + +type fakeCachedDiscoveryClient struct { + discovery.DiscoveryInterface + invalidated bool +} + +func (d *fakeCachedDiscoveryClient) Fresh() bool { + return true +} + +func (d *fakeCachedDiscoveryClient) Invalidate() { + d.invalidated = true +}