feat!: applier configuration options

BREAKING CHANGE: NewApplierBuilder() is the new way to construct an Applier
BREAKING CHANGE: apply.Options renamed into apply.ApplierOptions
This commit is contained in:
Mikhail Mazurskiy 2021-12-26 14:30:03 +11:00
parent 5fb19a18af
commit 2c815d2843
No known key found for this signature in database
GPG Key ID: FA7917C48932DD55
26 changed files with 311 additions and 191 deletions

View File

@ -141,7 +141,10 @@ func (r *ApplyRunner) RunE(cmd *cobra.Command, args []string) error {
// Run the applier. It will return a channel where we can receive updates
// to keep track of progress and any issues.
a, err := apply.NewApplier(r.factory, invClient)
a, err := apply.NewApplierBuilder().
WithFactory(r.factory).
WithInventoryClient(invClient).
Build()
if err != nil {
return err
}
@ -151,7 +154,7 @@ func (r *ApplyRunner) RunE(cmd *cobra.Command, args []string) error {
r.printStatusEvents = true
}
ch := a.Run(ctx, inv, objs, apply.Options{
ch := a.Run(ctx, inv, objs, apply.ApplierOptions{
ServerSideOptions: r.serverSideOptions,
PollInterval: r.period,
ReconcileTimeout: r.reconcileTimeout,

View File

@ -138,14 +138,17 @@ func (r *PreviewRunner) RunE(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
a, err := apply.NewApplier(r.factory, invClient)
a, err := apply.NewApplierBuilder().
WithFactory(r.factory).
WithInventoryClient(invClient).
Build()
if err != nil {
return err
}
// Run the applier. It will return a channel where we can receive updates
// to keep track of progress and any issues.
ch = a.Run(ctx, inv, objs, apply.Options{
ch = a.Run(ctx, inv, objs, apply.ApplierOptions{
EmitStatusEvents: false,
NoPrune: noPrune,
DryRunStrategy: drs,

View File

@ -9,10 +9,12 @@ import (
"sort"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
@ -24,35 +26,12 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/validation"
"sigs.k8s.io/cli-utils/pkg/ordering"
)
// NewApplier returns a new Applier.
func NewApplier(factory cmdutil.Factory, invClient inventory.InventoryClient) (*Applier, error) {
pruner, err := prune.NewPruner(factory, invClient)
if err != nil {
return nil, err
}
statusPoller, err := polling.NewStatusPollerFromFactory(factory, []engine.StatusReader{})
if err != nil {
return nil, err
}
mapper, err := factory.ToRESTMapper()
if err != nil {
return nil, err
}
return &Applier{
pruner: pruner,
StatusPoller: statusPoller,
factory: factory,
invClient: invClient,
infoHelper: info.NewInfoHelper(mapper, factory),
}, nil
}
const defaultPollInterval = 2 * time.Second
// Applier performs the step of applying a set of resources into a cluster,
// conditionally waits for all of them to be fully reconciled and finally
@ -65,17 +44,19 @@ func NewApplier(factory cmdutil.Factory, invClient inventory.InventoryClient) (*
// parameters and/or the set of resources that needs to be applied to the
// cluster, different sets of tasks might be needed.
type Applier struct {
pruner *prune.Pruner
StatusPoller poller.Poller
factory cmdutil.Factory
invClient inventory.InventoryClient
infoHelper info.InfoHelper
pruner *prune.Pruner
statusPoller poller.Poller
invClient inventory.InventoryClient
client dynamic.Interface
openAPIGetter discovery.OpenAPISchemaInterface
mapper meta.RESTMapper
infoHelper info.InfoHelper
}
// prepareObjects returns the set of objects to apply and to prune or
// an error if one occurred.
func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs object.UnstructuredSet,
o Options) (object.UnstructuredSet, object.UnstructuredSet, error) {
o ApplierOptions) (object.UnstructuredSet, object.UnstructuredSet, error) {
if localInv == nil {
return nil, nil, fmt.Errorf("the local inventory can't be nil")
}
@ -117,37 +98,25 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs obj
}
// Run performs the Apply step. This happens asynchronously with updates
// on progress and any errors are reported back on the event channel.
// on progress and any errors reported back on the event channel.
// Cancelling the operation or setting timeout on how long to Wait
// for it complete can be done with the passed in context.
// Note: There isn't currently any way to interrupt the operation
// before all the given resources have been applied to the cluster. Any
// cancellation or timeout will only affect how long we Wait for the
// resources to become current.
func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, objects object.UnstructuredSet, options Options) <-chan event.Event {
func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, objects object.UnstructuredSet, options ApplierOptions) <-chan event.Event {
klog.V(4).Infof("apply run for %d objects", len(objects))
eventChannel := make(chan event.Event)
setDefaults(&options)
go func() {
defer close(eventChannel)
client, err := a.factory.DynamicClient()
if err != nil {
handleError(eventChannel, err)
return
}
mapper, err := a.factory.ToRESTMapper()
if err != nil {
handleError(eventChannel, err)
return
}
// Validate the resources to make sure we catch those problems early
// before anything has been updated in the cluster.
vCollector := &validation.Collector{}
validator := &validation.Validator{
Collector: vCollector,
Mapper: mapper,
Mapper: a.mapper,
}
validator.Validate(objects)
@ -162,13 +131,14 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Fetch the queue (channel) of tasks that should be executed.
klog.V(4).Infoln("applier building task queue...")
taskBuilder := &solver.TaskQueueBuilder{
Pruner: a.pruner,
Factory: a.factory,
InfoHelper: a.infoHelper,
Mapper: mapper,
InvClient: a.invClient,
Destroy: false,
Collector: vCollector,
Pruner: a.pruner,
DynamicClient: a.client,
OpenAPIGetter: a.openAPIGetter,
InfoHelper: a.infoHelper,
Mapper: a.mapper,
InvClient: a.invClient,
Destroy: false,
Collector: vCollector,
}
opts := solver.Options{
ServerSideOptions: options.ServerSideOptions,
@ -180,11 +150,11 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
InventoryPolicy: options.InventoryPolicy,
}
// Build list of apply validation filters.
applyFilters := []filter.ValidationFilter{}
var applyFilters []filter.ValidationFilter
if options.InventoryPolicy != inventory.AdoptAll {
applyFilters = append(applyFilters, filter.InventoryPolicyApplyFilter{
Client: client,
Mapper: mapper,
Client: a.client,
Mapper: a.mapper,
Inv: invInfo,
InvPolicy: options.InventoryPolicy,
})
@ -205,8 +175,8 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
resourceCache := cache.NewResourceCacheMap()
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Mapper: mapper,
Client: a.client,
Mapper: a.mapper,
ResourceCache: resourceCache,
},
}
@ -258,7 +228,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.StatusPoller)
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
PollInterval: options.PollInterval,
@ -273,7 +243,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
return eventChannel
}
type Options struct {
type ApplierOptions struct {
// Encapsulates the fields for server-side apply.
ServerSideOptions common.ServerSideOptions
@ -317,9 +287,9 @@ type Options struct {
// setDefaults set the options to the default values if they
// have not been provided.
func setDefaults(o *Options) {
if o.PollInterval == time.Duration(0) {
o.PollInterval = poller.DefaultPollInterval
func setDefaults(o *ApplierOptions) {
if o.PollInterval == 0 {
o.PollInterval = defaultPollInterval
}
if o.PrunePropagationPolicy == "" {
o.PrunePropagationPolicy = metav1.DeletePropagationBackground

View File

@ -0,0 +1,160 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package apply
import (
"errors"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type ApplierBuilder struct {
// factory is only used to retrieve things that have not been provided explicitly.
factory util.Factory
invClient inventory.InventoryClient
client dynamic.Interface
discoClient discovery.CachedDiscoveryInterface
mapper meta.RESTMapper
restConfig *rest.Config
unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
statusPoller poller.Poller
}
// NewApplierBuilder returns a new ApplierBuilder.
func NewApplierBuilder() *ApplierBuilder {
return &ApplierBuilder{
// Defaults, if any, go here.
}
}
func (b *ApplierBuilder) Build() (*Applier, error) {
bx, err := b.finalize()
if err != nil {
return nil, err
}
return &Applier{
pruner: &prune.Pruner{
InvClient: bx.invClient,
Client: bx.client,
Mapper: bx.mapper,
},
statusPoller: bx.statusPoller,
invClient: bx.invClient,
client: bx.client,
openAPIGetter: bx.discoClient,
mapper: bx.mapper,
infoHelper: info.NewInfoHelper(bx.mapper, bx.unstructuredClientForMapping),
}, nil
}
func (b *ApplierBuilder) finalize() (*ApplierBuilder, error) {
bx := *b // make a copy before mutating any fields. Shallow copy is good enough.
var err error
if bx.invClient == nil {
return nil, errors.New("inventory client must be provided")
}
if bx.client == nil {
if bx.factory == nil {
return nil, fmt.Errorf("a factory must be provided or all other options: %v", err)
}
bx.client, err = bx.factory.DynamicClient()
if err != nil {
return nil, fmt.Errorf("error getting dynamic client: %v", err)
}
}
if bx.discoClient == nil {
if bx.factory == nil {
return nil, fmt.Errorf("a factory must be provided or all other options: %v", err)
}
bx.discoClient, err = bx.factory.ToDiscoveryClient()
if err != nil {
return nil, fmt.Errorf("error getting discovery client: %v", err)
}
}
if bx.mapper == nil {
if bx.factory == nil {
return nil, fmt.Errorf("a factory must be provided or all other options: %v", err)
}
bx.mapper, err = bx.factory.ToRESTMapper()
if err != nil {
return nil, fmt.Errorf("error getting rest mapper: %v", err)
}
}
if bx.restConfig == nil {
if bx.factory == nil {
return nil, fmt.Errorf("a factory must be provided or all other options: %v", err)
}
bx.restConfig, err = bx.factory.ToRESTConfig()
if err != nil {
return nil, fmt.Errorf("error getting rest config: %v", err)
}
}
if bx.unstructuredClientForMapping == nil {
if bx.factory == nil {
return nil, fmt.Errorf("a factory must be provided or all other options: %v", err)
}
bx.unstructuredClientForMapping = bx.factory.UnstructuredClientForMapping
}
if bx.statusPoller == nil {
c, err := client.New(bx.restConfig, client.Options{Scheme: scheme.Scheme, Mapper: bx.mapper})
if err != nil {
return nil, fmt.Errorf("error creating client: %v", err)
}
bx.statusPoller = polling.NewStatusPoller(c, bx.mapper, nil)
}
return &bx, nil
}
func (b *ApplierBuilder) WithFactory(factory util.Factory) *ApplierBuilder {
b.factory = factory
return b
}
func (b *ApplierBuilder) WithInventoryClient(invClient inventory.InventoryClient) *ApplierBuilder {
b.invClient = invClient
return b
}
func (b *ApplierBuilder) WithDynamicClient(client dynamic.Interface) *ApplierBuilder {
b.client = client
return b
}
func (b *ApplierBuilder) WithDiscoveryClient(discoClient discovery.CachedDiscoveryInterface) *ApplierBuilder {
b.discoClient = discoClient
return b
}
func (b *ApplierBuilder) WithRestMapper(mapper meta.RESTMapper) *ApplierBuilder {
b.mapper = mapper
return b
}
func (b *ApplierBuilder) WithRestConfig(restConfig *rest.Config) *ApplierBuilder {
b.restConfig = restConfig
return b
}
func (b *ApplierBuilder) WithUnstructuredClientForMapping(unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)) *ApplierBuilder {
b.unstructuredClientForMapping = unstructuredClientForMapping
return b
}
func (b *ApplierBuilder) WithStatusPoller(statusPoller poller.Poller) *ApplierBuilder {
b.statusPoller = statusPoller
return b
}

View File

@ -97,7 +97,7 @@ func TestApplier(t *testing.T) {
// objects in the cluster
clusterObjs object.UnstructuredSet
// options input to applier.Run
options Options
options ApplierOptions
// fake input events from the status poller
statusEvents []pollevent.Event
// expected output status events (async)
@ -120,7 +120,7 @@ func TestApplier(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
NoPrune: true,
InventoryPolicy: inventory.InventoryPolicyMustMatch,
},
@ -227,7 +227,7 @@ func TestApplier(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
ReconcileTimeout: time.Minute,
InventoryPolicy: inventory.InventoryPolicyMustMatch,
EmitStatusEvents: true,
@ -423,7 +423,7 @@ func TestApplier(t *testing.T) {
clusterObjs: object.UnstructuredSet{
testutil.Unstructured(t, resources["deployment"]),
},
options: Options{
options: ApplierOptions{
ReconcileTimeout: time.Minute,
InventoryPolicy: inventory.AdoptIfNoInventory,
EmitStatusEvents: true,
@ -605,7 +605,7 @@ func TestApplier(t *testing.T) {
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
testutil.Unstructured(t, resources["secret"], testutil.AddOwningInv(t, "test")),
},
options: Options{
options: ApplierOptions{
InventoryPolicy: inventory.InventoryPolicyMustMatch,
EmitStatusEvents: true,
},
@ -804,7 +804,7 @@ func TestApplier(t *testing.T) {
clusterObjs: object.UnstructuredSet{
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "unmatched")),
},
options: Options{
options: ApplierOptions{
ReconcileTimeout: time.Minute,
InventoryPolicy: inventory.InventoryPolicyMustMatch,
EmitStatusEvents: true,
@ -944,7 +944,7 @@ func TestApplier(t *testing.T) {
clusterObjs: object.UnstructuredSet{
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "unmatched")),
},
options: Options{
options: ApplierOptions{
InventoryPolicy: inventory.InventoryPolicyMustMatch,
EmitStatusEvents: true,
},
@ -1051,7 +1051,7 @@ func TestApplier(t *testing.T) {
clusterObjs: object.UnstructuredSet{
testutil.Unstructured(t, resources["deployment"], testutil.AddOwningInv(t, "test")),
},
options: Options{
options: ApplierOptions{
InventoryPolicy: inventory.InventoryPolicyMustMatch,
EmitStatusEvents: true,
},
@ -1199,7 +1199,7 @@ func TestApplier(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
ReconcileTimeout: time.Minute,
InventoryPolicy: inventory.AdoptIfNoInventory,
EmitStatusEvents: true,
@ -1380,7 +1380,7 @@ func TestApplier(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
ReconcileTimeout: time.Minute,
InventoryPolicy: inventory.AdoptIfNoInventory,
EmitStatusEvents: true,
@ -1531,7 +1531,7 @@ func TestApplierCancel(t *testing.T) {
// objects in the cluster
clusterObjs object.UnstructuredSet
// options input to applier.Run
options Options
options ApplierOptions
// timeout for applier.Run
runTimeout time.Duration
// timeout for the test
@ -1558,7 +1558,7 @@ func TestApplierCancel(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
// EmitStatusEvents required to test event output
EmitStatusEvents: true,
NoPrune: true,
@ -1716,7 +1716,7 @@ func TestApplierCancel(t *testing.T) {
id: "test",
},
clusterObjs: object.UnstructuredSet{},
options: Options{
options: ApplierOptions{
// EmitStatusEvents required to test event output
EmitStatusEvents: true,
NoPrune: true,
@ -1947,7 +1947,7 @@ func TestApplierCancel(t *testing.T) {
if tc.expectRunTimeout {
assert.Equal(t, context.DeadlineExceeded, runCtx.Err(), "Applier.Run exited, but not by expected timeout")
} else {
assert.Nil(t, runCtx.Err(), "Applier.Run exited, but not by expected timeout")
assert.NoError(t, runCtx.Err(), "Applier.Run exited, but not by expected timeout")
}
})
}
@ -1955,7 +1955,7 @@ func TestApplierCancel(t *testing.T) {
func TestReadAndPrepareObjectsNilInv(t *testing.T) {
applier := Applier{}
_, _, err := applier.prepareObjects(nil, object.UnstructuredSet{}, Options{})
_, _, err := applier.prepareObjects(nil, object.UnstructuredSet{}, ApplierOptions{})
assert.Error(t, err)
}
@ -2062,7 +2062,7 @@ func TestReadAndPrepareObjects(t *testing.T) {
newFakePoller([]pollevent.Event{}),
)
applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, Options{})
applyObjs, pruneObjs, err := applier.prepareObjects(tc.invInfo.toWrapped(), tc.resources, ApplierOptions{})
if tc.isError {
assert.Error(t, err)
return

View File

@ -85,9 +85,12 @@ func newTestApplier(
invClient := newTestInventory(t, tf)
applier, err := NewApplier(tf, invClient)
applier, err := NewApplierBuilder().
WithFactory(tf).
WithInventoryClient(invClient).
WithStatusPoller(statusPoller).
Build()
require.NoError(t, err)
applier.StatusPoller = statusPoller
// Inject the fakeInfoHelper to allow generating Info
// objects that use the FakeRESTClient as the UnstructuredClient.

View File

@ -14,6 +14,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
@ -89,7 +90,7 @@ type DestroyerOptions struct {
func setDestroyerDefaults(o *DestroyerOptions) {
if o.PollInterval == time.Duration(0) {
o.PollInterval = poller.DefaultPollInterval
o.PollInterval = defaultPollInterval
}
if o.DeletePropagationPolicy == "" {
o.DeletePropagationPolicy = metav1.DeletePropagationBackground
@ -130,13 +131,20 @@ func (d *Destroyer) Run(ctx context.Context, inv inventory.InventoryInfo, option
validator.Validate(deleteObjs)
klog.V(4).Infoln("destroyer building task queue...")
dynamicClient, err := d.factory.DynamicClient()
if err != nil {
handleError(eventChannel, err)
return
}
taskBuilder := &solver.TaskQueueBuilder{
Pruner: d.pruner,
Factory: d.factory,
Mapper: mapper,
InvClient: d.invClient,
Destroy: true,
Collector: vCollector,
Pruner: d.pruner,
DynamicClient: dynamicClient,
OpenAPIGetter: d.factory.OpenAPIGetter(),
InfoHelper: info.NewInfoHelper(mapper, d.factory.UnstructuredClientForMapping),
Mapper: mapper,
InvClient: d.invClient,
Destroy: true,
Collector: vCollector,
}
opts := solver.Options{
Prune: true,

View File

@ -7,7 +7,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/object"
)
@ -21,16 +20,16 @@ type InfoHelper interface {
BuildInfo(obj *unstructured.Unstructured) (*resource.Info, error)
}
func NewInfoHelper(mapper meta.RESTMapper, factory util.Factory) *infoHelper {
func NewInfoHelper(mapper meta.RESTMapper, unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)) *infoHelper {
return &infoHelper{
mapper: mapper,
factory: factory,
mapper: mapper,
unstructuredClientForMapping: unstructuredClientForMapping,
}
}
type infoHelper struct {
mapper meta.RESTMapper
factory util.Factory
mapper meta.RESTMapper
unstructuredClientForMapping func(*meta.RESTMapping) (resource.RESTClient, error)
}
func (ih *infoHelper) UpdateInfo(info *resource.Info) error {
@ -41,7 +40,7 @@ func (ih *infoHelper) UpdateInfo(info *resource.Info) error {
}
info.Mapping = mapping
c, err := ih.factory.UnstructuredClientForMapping(mapping)
c, err := ih.unstructuredClientForMapping(mapping)
if err != nil {
return err
}

View File

@ -5,15 +5,12 @@ package poller
import (
"context"
"time"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
)
const DefaultPollInterval = 2 * time.Second
// Poller defines the interface the applier needs to poll for status of resources.
// The context is the preferred way to shut down the poller.
// The identifiers defines the resources which the poller should poll and

View File

@ -20,8 +20,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
@ -37,11 +38,12 @@ import (
)
type TaskQueueBuilder struct {
Pruner *prune.Pruner
InfoHelper info.InfoHelper
Factory util.Factory
Mapper meta.RESTMapper
InvClient inventory.InventoryClient
Pruner *prune.Pruner
DynamicClient dynamic.Interface
OpenAPIGetter discovery.OpenAPISchemaInterface
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
InvClient inventory.InventoryClient
// Collector is used to collect validation errors and invalid objects.
// Invalid objects will be filtered and not be injected into tasks.
Collector *validation.Collector
@ -158,8 +160,9 @@ func (t *TaskQueueBuilder) AppendApplyTask(applyObjs object.UnstructuredSet,
Mutators: applyMutators,
ServerSideOptions: o.ServerSideOptions,
DryRunStrategy: o.DryRunStrategy,
DynamicClient: t.DynamicClient,
OpenAPIGetter: t.OpenAPIGetter,
InfoHelper: t.InfoHelper,
Factory: t.Factory,
Mapper: t.Mapper,
})
t.applyCounter += 1

View File

@ -14,10 +14,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/cmd/apply"
cmddelete "k8s.io/kubectl/pkg/cmd/delete"
"k8s.io/kubectl/pkg/cmd/util"
applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
@ -46,7 +47,8 @@ type applyOptions interface {
type ApplyTask struct {
TaskName string
Factory util.Factory
DynamicClient dynamic.Interface
OpenAPIGetter discovery.OpenAPISchemaInterface
InfoHelper info.InfoHelper
Mapper meta.RESTMapper
Objects object.UnstructuredSet
@ -87,18 +89,6 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
objects := a.Objects
klog.V(2).Infof("apply task starting (name: %q, objects: %d)",
a.Name(), len(objects))
// Create a new instance of the applyOptions interface and use it
// to apply the objects.
ao, err := applyOptionsFactoryFunc(a.Name(), taskContext.EventChannel(),
a.ServerSideOptions, a.DryRunStrategy, a.Factory)
if err != nil {
if klog.V(4).Enabled() {
klog.Errorf("error creating ApplyOptions (%s)--returning", err)
}
a.sendBatchApplyEvents(taskContext, objects, err)
a.sendTaskResult(taskContext)
return
}
for _, obj := range objects {
// Set the client and mapping fields on the provided
// info so they can be applied to the cluster.
@ -157,7 +147,10 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
continue
}
// Apply the object
// Create a new instance of the applyOptions interface and use it
// to apply the objects.
ao := applyOptionsFactoryFunc(a.Name(), taskContext.EventChannel(),
a.ServerSideOptions, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
ao.SetObjects([]*resource.Info{info})
klog.V(5).Infof("applying %s/%s...", info.Namespace, info.Name)
err = ao.Run()
@ -189,16 +182,9 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
}()
}
func newApplyOptions(taskName string, eventChannel chan event.Event, serverSideOptions common.ServerSideOptions,
strategy common.DryRunStrategy, factory util.Factory) (applyOptions, error) {
discovery, err := factory.ToDiscoveryClient()
if err != nil {
return nil, err
}
dynamic, err := factory.DynamicClient()
if err != nil {
return nil, err
}
func newApplyOptions(taskName string, eventChannel chan<- event.Event, serverSideOptions common.ServerSideOptions,
strategy common.DryRunStrategy, dynamicClient dynamic.Interface,
openAPIGetter discovery.OpenAPISchemaInterface) applyOptions {
emptyString := ""
return &apply.ApplyOptions{
VisitedNamespaces: sets.NewString(),
@ -227,9 +213,9 @@ func newApplyOptions(taskName string, eventChannel chan event.Event, serverSideO
ch: eventChannel,
groupName: taskName,
}).toPrinterFunc(),
DynamicClient: dynamic,
DryRunVerifier: resource.NewDryRunVerifier(dynamic, discovery),
}, nil
DynamicClient: dynamicClient,
DryRunVerifier: resource.NewDryRunVerifier(dynamicClient, openAPIGetter),
}
}
func (a *ApplyTask) sendTaskResult(taskContext *taskrunner.TaskContext) {
@ -283,23 +269,6 @@ func (a *ApplyTask) createApplyFailedEvent(id object.ObjMetadata, err error) eve
}
}
// sendBatchApplyEvents is a helper function to send out multiple apply events for
// a list of resources when failed to initialize the apply process.
func (a *ApplyTask) sendBatchApplyEvents(
taskContext *taskrunner.TaskContext,
objects object.UnstructuredSet,
err error,
) {
for _, obj := range objects {
id := object.UnstructuredToObjMetadata(obj)
taskContext.SendEvent(a.createApplyFailedEvent(
id,
applyerror.NewInitializeApplyOptionError(err),
))
taskContext.AddFailedApply(id)
}
}
func isAPIService(obj *unstructured.Unstructured) bool {
gk := obj.GroupVersionKind().GroupKind()
return gk.Group == "apiregistration.k8s.io" && gk.Kind == "APIService"
@ -311,11 +280,8 @@ func isStreamError(err error) bool {
return strings.Contains(err.Error(), "stream error: stream ID ")
}
func (a *ApplyTask) clientSideApply(info *resource.Info, eventChannel chan event.Event) error {
ao, err := applyOptionsFactoryFunc(a.Name(), eventChannel, common.ServerSideOptions{ServerSideApply: false}, a.DryRunStrategy, a.Factory)
if err != nil {
return err
}
func (a *ApplyTask) clientSideApply(info *resource.Info, eventChannel chan<- event.Event) error {
ao := applyOptionsFactoryFunc(a.Name(), eventChannel, common.ServerSideOptions{ServerSideApply: false}, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
ao.SetObjects([]*resource.Info{info})
return ao.Run()
}

View File

@ -14,7 +14,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
@ -85,8 +86,9 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
objs := toUnstructureds(tc.applied)
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(string, chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, error) {
return &fakeApplyOptions{}, nil
applyOptionsFactoryFunc = func(string, chan<- event.Event, common.ServerSideOptions, common.DryRunStrategy,
dynamic.Interface, discovery.OpenAPISchemaInterface) applyOptions {
return &fakeApplyOptions{}
}
defer func() { applyOptionsFactoryFunc = oldAO }()
@ -174,8 +176,9 @@ func TestApplyTask_FetchGeneration(t *testing.T) {
objs := toUnstructureds(tc.rss)
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(string, chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, error) {
return &fakeApplyOptions{}, nil
applyOptionsFactoryFunc = func(string, chan<- event.Event, common.ServerSideOptions, common.DryRunStrategy,
dynamic.Interface, discovery.OpenAPISchemaInterface) applyOptions {
return &fakeApplyOptions{}
}
defer func() { applyOptionsFactoryFunc = oldAO }()
applyTask := &ApplyTask{
@ -295,8 +298,9 @@ func TestApplyTask_DryRun(t *testing.T) {
ao := &fakeApplyOptions{}
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(string, chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, error) {
return ao, nil
applyOptionsFactoryFunc = func(string, chan<- event.Event, common.ServerSideOptions, common.DryRunStrategy,
dynamic.Interface, discovery.OpenAPISchemaInterface) applyOptions {
return ao
}
defer func() { applyOptionsFactoryFunc = oldAO }()
@ -442,8 +446,9 @@ func TestApplyTaskWithError(t *testing.T) {
ao := &fakeApplyOptions{}
oldAO := applyOptionsFactoryFunc
applyOptionsFactoryFunc = func(string, chan event.Event, common.ServerSideOptions, common.DryRunStrategy, util.Factory) (applyOptions, error) {
return ao, nil
applyOptionsFactoryFunc = func(string, chan<- event.Event, common.ServerSideOptions, common.DryRunStrategy,
dynamic.Interface, discovery.OpenAPISchemaInterface) applyOptions {
return ao
}
defer func() { applyOptionsFactoryFunc = oldAO }()

View File

@ -32,7 +32,7 @@ func applyAndDestroyTest(ctx context.Context, c client.Client, invConfig Invento
deployment1Obj,
}
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
}))

View File

@ -32,7 +32,7 @@ func continueOnErrorTest(ctx context.Context, c client.Client, invConfig Invento
pod1Obj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{}))
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{}))
expEvents := []testutil.ExpEvent{
{

View File

@ -34,7 +34,7 @@ func crdTest(ctx context.Context, _ client.Client, invConfig InventoryConfig, in
crdObj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: false,
}))

View File

@ -30,7 +30,7 @@ func deletionPreventionTest(ctx context.Context, c client.Client, invConfig Inve
withAnnotation(withNamespace(manifestToUnstructured(pod2), namespaceName), common.LifecycleDeleteAnnotation, common.PreventDeletion),
}
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
}))
@ -54,7 +54,7 @@ func deletionPreventionTest(ctx context.Context, c client.Client, invConfig Inve
withNamespace(manifestToUnstructured(deployment1), namespaceName),
}
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
DryRunStrategy: common.DryRunClient,
}))
@ -79,7 +79,7 @@ func deletionPreventionTest(ctx context.Context, c client.Client, invConfig Inve
withNamespace(manifestToUnstructured(deployment1), namespaceName),
}
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
}))

View File

@ -36,7 +36,7 @@ func dependsOnTest(ctx context.Context, c client.Client, invConfig InventoryConf
pod3Obj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
EmitStatusEvents: false,
}))

View File

@ -401,7 +401,10 @@ func newApplierFromInvFactory(invFactory inventory.InventoryClientFactory) *appl
invClient, err := invFactory.NewInventoryClient(f)
Expect(err).NotTo(HaveOccurred())
a, err := apply.NewApplier(f, invClient)
a, err := apply.NewApplierBuilder().
WithFactory(f).
WithInventoryClient(invClient).
Build()
Expect(err).NotTo(HaveOccurred())
return a
}

View File

@ -40,7 +40,7 @@ func exitEarlyTest(ctx context.Context, c client.Client, invConfig InventoryConf
invalidPodObj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
EmitStatusEvents: false,
ValidationPolicy: validation.ExitEarly,
}))

View File

@ -31,7 +31,7 @@ func inventoryPolicyMustMatchTest(ctx context.Context, c client.Client, invConfi
deployment1Obj,
}
runWithNoErr(applier.Run(ctx, firstInv, firstResources, apply.Options{
runWithNoErr(applier.Run(ctx, firstInv, firstResources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
}))
@ -44,7 +44,7 @@ func inventoryPolicyMustMatchTest(ctx context.Context, c client.Client, invConfi
withReplicas(deployment1Obj, 6),
}
applierEvents := runCollect(applier.Run(ctx, secondInv, secondResources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, secondInv, secondResources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
InventoryPolicy: inventory.InventoryPolicyMustMatch,
@ -202,7 +202,7 @@ func inventoryPolicyAdoptIfNoInventoryTest(ctx context.Context, c client.Client,
withReplicas(deployment1Obj, 6),
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
InventoryPolicy: inventory.AdoptIfNoInventory,
@ -371,7 +371,7 @@ func inventoryPolicyAdoptAllTest(ctx context.Context, c client.Client, invConfig
deployment1Obj,
}
runWithNoErr(applier.Run(ctx, firstInv, firstResources, apply.Options{
runWithNoErr(applier.Run(ctx, firstInv, firstResources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
}))
@ -384,7 +384,7 @@ func inventoryPolicyAdoptAllTest(ctx context.Context, c client.Client, invConfig
withReplicas(deployment1Obj, 6),
}
applierEvents := runCollect(applier.Run(ctx, secondInv, secondResources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, secondInv, secondResources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
InventoryPolicy: inventory.AdoptAll,

View File

@ -49,7 +49,7 @@ func mutationTest(ctx context.Context, c client.Client, invConfig InventoryConfi
podBObj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
EmitStatusEvents: false,
}))

View File

@ -26,7 +26,7 @@ func applyWithExistingInvTest(ctx context.Context, c client.Client, invConfig In
withNamespace(manifestToUnstructured(deployment1), namespaceName),
}
runWithNoErr(applier.Run(ctx, orgApplyInv, resources, apply.Options{
runWithNoErr(applier.Run(ctx, orgApplyInv, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
}))
@ -38,7 +38,7 @@ func applyWithExistingInvTest(ctx context.Context, c client.Client, invConfig In
secondInventoryID := fmt.Sprintf("%s-%s-2", inventoryName, namespaceName)
secondApplyInv := invConfig.InvWrapperFunc(invConfig.InventoryFactoryFunc(inventoryName, namespaceName, secondInventoryID))
err := run(applier.Run(ctx, secondApplyInv, resources, apply.Options{
err := run(applier.Run(ctx, secondApplyInv, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
}))

View File

@ -31,7 +31,7 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig Inve
pod1Obj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resource1, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resource1, apply.ApplierOptions{
EmitStatusEvents: false,
}))
@ -165,7 +165,7 @@ func pruneRetrieveErrorTest(ctx context.Context, c client.Client, invConfig Inve
pod2Obj,
}
applierEvents2 := runCollect(applier.Run(ctx, inv, resource2, apply.Options{
applierEvents2 := runCollect(applier.Run(ctx, inv, resource2, apply.ApplierOptions{
EmitStatusEvents: false,
}))

View File

@ -29,7 +29,7 @@ func reconciliationFailed(ctx context.Context, invConfig InventoryConfig, invent
podObj,
}
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: false,
}))
@ -52,7 +52,7 @@ func reconciliationTimeout(ctx context.Context, invConfig InventoryConfig, inven
podObj,
}
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inventoryInfo, resources, apply.ApplierOptions{
ReconcileTimeout: 30 * time.Second,
EmitStatusEvents: false,
}))

View File

@ -27,7 +27,7 @@ func serversideApplyTest(ctx context.Context, c client.Client, invConfig Invento
manifestToUnstructured(apiservice1),
}
runWithNoErr(applier.Run(ctx, inv, firstResources, apply.Options{
runWithNoErr(applier.Run(ctx, inv, firstResources, apply.ApplierOptions{
ReconcileTimeout: 2 * time.Minute,
EmitStatusEvents: true,
ServerSideOptions: common.ServerSideOptions{

View File

@ -55,7 +55,7 @@ func skipInvalidTest(ctx context.Context, c client.Client, invConfig InventoryCo
invalidPodObj,
}
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.Options{
applierEvents := runCollect(applier.Run(ctx, inv, resources, apply.ApplierOptions{
EmitStatusEvents: false,
ValidationPolicy: validation.SkipInvalid,
}))