mirror of https://github.com/kubernetes/kops.git
Update controller-runtime to v0.12.0
This commit is contained in:
parent
d44030184b
commit
a5e52dc81c
2
go.mod
2
go.mod
|
|
@ -97,7 +97,7 @@ require (
|
||||||
k8s.io/legacy-cloud-providers v0.24.0
|
k8s.io/legacy-cloud-providers v0.24.0
|
||||||
k8s.io/mount-utils v0.24.0
|
k8s.io/mount-utils v0.24.0
|
||||||
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
|
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
|
||||||
sigs.k8s.io/controller-runtime v0.11.2
|
sigs.k8s.io/controller-runtime v0.12.0
|
||||||
sigs.k8s.io/yaml v1.3.0
|
sigs.k8s.io/yaml v1.3.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
6
go.sum
6
go.sum
|
|
@ -1062,7 +1062,7 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
|
||||||
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
|
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
|
||||||
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
|
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
|
||||||
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
|
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
|
||||||
github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
|
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||||
github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
||||||
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
||||||
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
|
||||||
|
|
@ -2169,8 +2169,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||||
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30/go.mod h1:fEO7lRTdivWO2qYVCVG7dEADOMo/MLDCVr8So2g88Uw=
|
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30/go.mod h1:fEO7lRTdivWO2qYVCVG7dEADOMo/MLDCVr8So2g88Uw=
|
||||||
sigs.k8s.io/controller-runtime v0.9.6/go.mod h1:q6PpkM5vqQubEKUKOM6qr06oXGzOBcCby1DA9FbyZeA=
|
sigs.k8s.io/controller-runtime v0.9.6/go.mod h1:q6PpkM5vqQubEKUKOM6qr06oXGzOBcCby1DA9FbyZeA=
|
||||||
sigs.k8s.io/controller-runtime v0.11.2 h1:H5GTxQl0Mc9UjRJhORusqfJCIjBO8UtUxGggCwL1rLA=
|
sigs.k8s.io/controller-runtime v0.12.0 h1:gA4zphrmHFc7ihmY/+GyyE0BxKD+OYdb5+DjD2azFAQ=
|
||||||
sigs.k8s.io/controller-runtime v0.11.2/go.mod h1:P6QCzrEjLaZGqHsfd+os7JQ+WFZhvB8MRFsn4dWF7O4=
|
sigs.k8s.io/controller-runtime v0.12.0/go.mod h1:BKhxlA4l7FPK4AQcsuL4X6vZeWnKDXez/vp1Y8dxTU0=
|
||||||
sigs.k8s.io/controller-tools v0.6.2/go.mod h1:oaeGpjXn6+ZSEIQkUe/+3I40PNiDYp9aeawbt3xTgJ8=
|
sigs.k8s.io/controller-tools v0.6.2/go.mod h1:oaeGpjXn6+ZSEIQkUe/+3I40PNiDYp9aeawbt3xTgJ8=
|
||||||
sigs.k8s.io/gateway-api v0.4.1 h1:Tof9/PNSZXyfDuTTe1XFvaTlvBRE6bKq1kmV6jj6rQE=
|
sigs.k8s.io/gateway-api v0.4.1 h1:Tof9/PNSZXyfDuTTe1XFvaTlvBRE6bKq1kmV6jj6rQE=
|
||||||
sigs.k8s.io/gateway-api v0.4.1/go.mod h1:r3eiNP+0el+NTLwaTfOrCNXy8TukC+dIM3ggc+fbNWk=
|
sigs.k8s.io/gateway-api v0.4.1/go.mod h1:r3eiNP+0el+NTLwaTfOrCNXy8TukC+dIM3ggc+fbNWk=
|
||||||
|
|
|
||||||
|
|
@ -1676,13 +1676,14 @@ oras.land/oras-go/pkg/registry/remote/auth
|
||||||
oras.land/oras-go/pkg/registry/remote/internal/errutil
|
oras.land/oras-go/pkg/registry/remote/internal/errutil
|
||||||
oras.land/oras-go/pkg/registry/remote/internal/syncutil
|
oras.land/oras-go/pkg/registry/remote/internal/syncutil
|
||||||
oras.land/oras-go/pkg/target
|
oras.land/oras-go/pkg/target
|
||||||
# sigs.k8s.io/controller-runtime v0.11.2
|
# sigs.k8s.io/controller-runtime v0.12.0
|
||||||
## explicit; go 1.17
|
## explicit; go 1.17
|
||||||
sigs.k8s.io/controller-runtime
|
sigs.k8s.io/controller-runtime
|
||||||
sigs.k8s.io/controller-runtime/pkg/builder
|
sigs.k8s.io/controller-runtime/pkg/builder
|
||||||
sigs.k8s.io/controller-runtime/pkg/cache
|
sigs.k8s.io/controller-runtime/pkg/cache
|
||||||
sigs.k8s.io/controller-runtime/pkg/cache/internal
|
sigs.k8s.io/controller-runtime/pkg/cache/internal
|
||||||
sigs.k8s.io/controller-runtime/pkg/certwatcher
|
sigs.k8s.io/controller-runtime/pkg/certwatcher
|
||||||
|
sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics
|
||||||
sigs.k8s.io/controller-runtime/pkg/client
|
sigs.k8s.io/controller-runtime/pkg/client
|
||||||
sigs.k8s.io/controller-runtime/pkg/client/apiutil
|
sigs.k8s.io/controller-runtime/pkg/client/apiutil
|
||||||
sigs.k8s.io/controller-runtime/pkg/client/config
|
sigs.k8s.io/controller-runtime/pkg/client/config
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ linters:
|
||||||
- depguard
|
- depguard
|
||||||
- dogsled
|
- dogsled
|
||||||
- errcheck
|
- errcheck
|
||||||
|
- errorlint
|
||||||
- exportloopref
|
- exportloopref
|
||||||
- goconst
|
- goconst
|
||||||
- gocritic
|
- gocritic
|
||||||
|
|
@ -34,6 +35,7 @@ linters:
|
||||||
- typecheck
|
- typecheck
|
||||||
- unconvert
|
- unconvert
|
||||||
- unparam
|
- unparam
|
||||||
|
- unused
|
||||||
- varcheck
|
- varcheck
|
||||||
- whitespace
|
- whitespace
|
||||||
|
|
||||||
|
|
@ -59,9 +61,13 @@ linters-settings:
|
||||||
- pkg: sigs.k8s.io/controller-runtime
|
- pkg: sigs.k8s.io/controller-runtime
|
||||||
alias: ctrl
|
alias: ctrl
|
||||||
staticcheck:
|
staticcheck:
|
||||||
go: "1.17"
|
go: "1.18"
|
||||||
stylecheck:
|
stylecheck:
|
||||||
go: "1.17"
|
go: "1.18"
|
||||||
|
depguard:
|
||||||
|
include-go-root: true
|
||||||
|
packages:
|
||||||
|
- io/ioutil # https://go.dev/doc/go1.16#ioutil
|
||||||
|
|
||||||
issues:
|
issues:
|
||||||
max-same-issues: 0
|
max-same-issues: 0
|
||||||
|
|
@ -121,6 +127,11 @@ issues:
|
||||||
- linters:
|
- linters:
|
||||||
- gocritic
|
- gocritic
|
||||||
text: "singleCaseSwitch: should rewrite switch statement to if statement"
|
text: "singleCaseSwitch: should rewrite switch statement to if statement"
|
||||||
|
# It considers all file access to a filename that comes from a variable problematic,
|
||||||
|
# which is naiv at best.
|
||||||
|
- linters:
|
||||||
|
- gosec
|
||||||
|
text: "G304: Potential file inclusion via variable"
|
||||||
|
|
||||||
run:
|
run:
|
||||||
timeout: 10m
|
timeout: 10m
|
||||||
|
|
|
||||||
|
|
@ -48,9 +48,9 @@ generally cover most circumstances.
|
||||||
### Q: Where's the fake client? How do I use it?
|
### Q: Where's the fake client? How do I use it?
|
||||||
|
|
||||||
**A**: The fake client
|
**A**: The fake client
|
||||||
[exists](https://godoc.org/sigs.k8s.io/controller-runtime/pkg/client/fake),
|
[exists](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client/fake),
|
||||||
but we generally recommend using
|
but we generally recommend using
|
||||||
[envtest.Environment](https://godoc.org/sigs.k8s.io/controller-runtime/pkg/envtest#Environment)
|
[envtest.Environment](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/envtest#Environment)
|
||||||
to test against a real API server. In our experience, tests using fake
|
to test against a real API server. In our experience, tests using fake
|
||||||
clients gradually re-implement poorly-written impressions of a real API
|
clients gradually re-implement poorly-written impressions of a real API
|
||||||
server, which leads to hard-to-maintain, complex test code.
|
server, which leads to hard-to-maintain, complex test code.
|
||||||
|
|
@ -58,7 +58,7 @@ server, which leads to hard-to-maintain, complex test code.
|
||||||
### Q: How should I write tests? Any suggestions for getting started?
|
### Q: How should I write tests? Any suggestions for getting started?
|
||||||
|
|
||||||
- Use the aforementioned
|
- Use the aforementioned
|
||||||
[envtest.Environment](https://godoc.org/sigs.k8s.io/controller-runtime/pkg/envtest#Environment)
|
[envtest.Environment](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/envtest#Environment)
|
||||||
to spin up a real API server instead of trying to mock one out.
|
to spin up a real API server instead of trying to mock one out.
|
||||||
|
|
||||||
- Structure your tests to check that the state of the world is as you
|
- Structure your tests to check that the state of the world is as you
|
||||||
|
|
@ -77,5 +77,5 @@ mapping between Go types and group-version-kinds in Kubernetes. In
|
||||||
general, your application should have its own Scheme containing the types
|
general, your application should have its own Scheme containing the types
|
||||||
from the API groups that it needs (be they Kubernetes types or your own).
|
from the API groups that it needs (be they Kubernetes types or your own).
|
||||||
See the [scheme builder
|
See the [scheme builder
|
||||||
docs](https://godoc.org/sigs.k8s.io/controller-runtime/pkg/scheme) for
|
docs](https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/scheme) for
|
||||||
more information.
|
more information.
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ aliases:
|
||||||
- vincepri
|
- vincepri
|
||||||
- alexeldeib
|
- alexeldeib
|
||||||
- varshaprasad96
|
- varshaprasad96
|
||||||
|
- fillzpp
|
||||||
|
|
||||||
# folks to can approve things in the directly-ported
|
# folks to can approve things in the directly-ported
|
||||||
# testing_frameworks portions of the codebase
|
# testing_frameworks portions of the codebase
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
[](https://goreportcard.com/report/sigs.k8s.io/controller-runtime)
|
[](https://goreportcard.com/report/sigs.k8s.io/controller-runtime)
|
||||||
[](https://godoc.org/sigs.k8s.io/controller-runtime)
|
[](https://pkg.go.dev/sigs.k8s.io/controller-runtime)
|
||||||
|
|
||||||
# Kubernetes controller-runtime Project
|
# Kubernetes controller-runtime Project
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -107,7 +107,7 @@ limitations under the License.
|
||||||
//
|
//
|
||||||
// Logging (pkg/log) in controller-runtime is done via structured logs, using a
|
// Logging (pkg/log) in controller-runtime is done via structured logs, using a
|
||||||
// log set of interfaces called logr
|
// log set of interfaces called logr
|
||||||
// (https://godoc.org/github.com/go-logr/logr). While controller-runtime
|
// (https://pkg.go.dev/github.com/go-logr/logr). While controller-runtime
|
||||||
// provides easy setup for using Zap (https://go.uber.org/zap, pkg/log/zap),
|
// provides easy setup for using Zap (https://go.uber.org/zap, pkg/log/zap),
|
||||||
// you can provide any implementation of logr as the base logger for
|
// you can provide any implementation of logr as the base logger for
|
||||||
// controller-runtime.
|
// controller-runtime.
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||||
|
|
@ -148,9 +149,9 @@ func (blder *Builder) WithOptions(options controller.Options) *Builder {
|
||||||
return blder
|
return blder
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger overrides the controller options's logger used.
|
// WithLogConstructor overrides the controller options's LogConstructor.
|
||||||
func (blder *Builder) WithLogger(log logr.Logger) *Builder {
|
func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
|
||||||
blder.ctrlOptions.Log = log
|
blder.ctrlOptions.LogConstructor = logConstructor
|
||||||
return blder
|
return blder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -304,13 +305,31 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
|
||||||
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
|
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
controllerName := blder.getControllerName(gvk)
|
||||||
|
|
||||||
// Setup the logger.
|
// Setup the logger.
|
||||||
if ctrlOptions.Log.GetSink() == nil {
|
if ctrlOptions.LogConstructor == nil {
|
||||||
ctrlOptions.Log = blder.mgr.GetLogger()
|
log = blder.mgr.GetLogger().WithValues(
|
||||||
|
"controller", controllerName,
|
||||||
|
"controllerGroup", gvk.Group,
|
||||||
|
"controllerKind", gvk.Kind,
|
||||||
|
)
|
||||||
|
|
||||||
|
lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]
|
||||||
|
|
||||||
|
ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
|
||||||
|
log := log
|
||||||
|
if req != nil {
|
||||||
|
log = log.WithValues(
|
||||||
|
lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
|
||||||
|
"namespace", req.Namespace, "name", req.Name,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return log
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)
|
|
||||||
|
|
||||||
// Build the controller and return.
|
// Build the controller and return.
|
||||||
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
|
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,29 @@ var (
|
||||||
// metav1.PartialObjectMetadata to the client when fetching objects in your
|
// metav1.PartialObjectMetadata to the client when fetching objects in your
|
||||||
// reconciler, otherwise you'll end up with a duplicate structured or
|
// reconciler, otherwise you'll end up with a duplicate structured or
|
||||||
// unstructured cache.
|
// unstructured cache.
|
||||||
|
//
|
||||||
|
// When watching a resource with OnlyMetadata, for example the v1.Pod, you
|
||||||
|
// should not Get and List using the v1.Pod type. Instead, you should use
|
||||||
|
// the special metav1.PartialObjectMetadata type.
|
||||||
|
//
|
||||||
|
// ❌ Incorrect:
|
||||||
|
//
|
||||||
|
// pod := &v1.Pod{}
|
||||||
|
// mgr.GetClient().Get(ctx, nsAndName, pod)
|
||||||
|
//
|
||||||
|
// ✅ Correct:
|
||||||
|
//
|
||||||
|
// pod := &metav1.PartialObjectMetadata{}
|
||||||
|
// pod.SetGroupVersionKind(schema.GroupVersionKind{
|
||||||
|
// Group: "",
|
||||||
|
// Version: "v1",
|
||||||
|
// Kind: "Pod",
|
||||||
|
// })
|
||||||
|
// mgr.GetClient().Get(ctx, nsAndName, pod)
|
||||||
|
//
|
||||||
|
// In the first case, controller-runtime will create another cache for the
|
||||||
|
// concrete type on top of the metadata cache; this increases memory
|
||||||
|
// consumption and leads to race conditions as caches are not in sync.
|
||||||
OnlyMetadata = projectAs(projectAsMetadata)
|
OnlyMetadata = projectAs(projectAsMetadata)
|
||||||
|
|
||||||
_ ForOption = OnlyMetadata
|
_ ForOption = OnlyMetadata
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,18 @@ type Options struct {
|
||||||
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
|
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
|
||||||
// otherwise you will mutate the object in the cache.
|
// otherwise you will mutate the object in the cache.
|
||||||
UnsafeDisableDeepCopyByObject DisableDeepCopyByObject
|
UnsafeDisableDeepCopyByObject DisableDeepCopyByObject
|
||||||
|
|
||||||
|
// TransformByObject is a map from GVKs to transformer functions which
|
||||||
|
// get applied when objects of the transformation are about to be committed
|
||||||
|
// to cache.
|
||||||
|
//
|
||||||
|
// This function is called both for new objects to enter the cache,
|
||||||
|
// and for updated objects.
|
||||||
|
TransformByObject TransformByObject
|
||||||
|
|
||||||
|
// DefaultTransform is the transform used for all GVKs which do
|
||||||
|
// not have an explicit transform func set in TransformByObject
|
||||||
|
DefaultTransform toolscache.TransformFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultResyncTime = 10 * time.Hour
|
var defaultResyncTime = 10 * time.Hour
|
||||||
|
|
@ -146,7 +158,12 @@ func New(config *rest.Config, opts Options) (Cache, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
|
transformByGVK, err := convertToTransformByKindAndGVK(opts.TransformByObject, opts.DefaultTransform, opts.Scheme)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, transformByGVK)
|
||||||
return &informerCache{InformersMap: im}, nil
|
return &informerCache{InformersMap: im}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -241,3 +258,18 @@ func convertToDisableDeepCopyByGVK(disableDeepCopyByObject DisableDeepCopyByObje
|
||||||
}
|
}
|
||||||
return disableDeepCopyByGVK, nil
|
return disableDeepCopyByGVK, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TransformByObject associate a client.Object's GVK to a transformer function
|
||||||
|
// to be applied when storing the object into the cache.
|
||||||
|
type TransformByObject map[client.Object]toolscache.TransformFunc
|
||||||
|
|
||||||
|
func convertToTransformByKindAndGVK(t TransformByObject, defaultTransform toolscache.TransformFunc, scheme *runtime.Scheme) (internal.TransformFuncByObject, error) {
|
||||||
|
result := internal.NewTransformFuncByObject()
|
||||||
|
for obj, transformation := range t {
|
||||||
|
if err := result.Set(obj, scheme, transformation); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.SetDefault(defaultTransform)
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -96,11 +96,11 @@ func (ip *informerCache) objectTypeForListObject(list client.ObjectList) (*schem
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !strings.HasSuffix(gvk.Kind, "List") {
|
|
||||||
return nil, nil, fmt.Errorf("non-list type %T (kind %q) passed as output", list, gvk)
|
|
||||||
}
|
|
||||||
// we need the non-list GVK, so chop off the "List" from the end of the kind
|
// we need the non-list GVK, so chop off the "List" from the end of the kind
|
||||||
|
if strings.HasSuffix(gvk.Kind, "List") && apimeta.IsListType(list) {
|
||||||
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
||||||
|
}
|
||||||
|
|
||||||
_, isUnstructured := list.(*unstructured.UnstructuredList)
|
_, isUnstructured := list.(*unstructured.UnstructuredList)
|
||||||
var cacheTypeObj runtime.Object
|
var cacheTypeObj runtime.Object
|
||||||
if isUnstructured {
|
if isUnstructured {
|
||||||
|
|
@ -193,8 +193,8 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)
|
||||||
rawVals := extractor(obj)
|
rawVals := extractor(obj)
|
||||||
var vals []string
|
var vals []string
|
||||||
if ns == "" {
|
if ns == "" {
|
||||||
// if we're not doubling the keys for the namespaced case, just re-use what was returned to us
|
// if we're not doubling the keys for the namespaced case, just create a new slice with same length
|
||||||
vals = rawVals
|
vals = make([]string, len(rawVals))
|
||||||
} else {
|
} else {
|
||||||
// if we need to add non-namespaced versions too, double the length
|
// if we need to add non-namespaced versions too, double the length
|
||||||
vals = make([]string, len(rawVals)*2)
|
vals = make([]string, len(rawVals)*2)
|
||||||
|
|
|
||||||
|
|
@ -52,11 +52,12 @@ func NewInformersMap(config *rest.Config,
|
||||||
namespace string,
|
namespace string,
|
||||||
selectors SelectorsByGVK,
|
selectors SelectorsByGVK,
|
||||||
disableDeepCopy DisableDeepCopyByGVK,
|
disableDeepCopy DisableDeepCopyByGVK,
|
||||||
|
transformers TransformFuncByObject,
|
||||||
) *InformersMap {
|
) *InformersMap {
|
||||||
return &InformersMap{
|
return &InformersMap{
|
||||||
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
|
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
|
||||||
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
|
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
|
||||||
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
|
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
|
||||||
|
|
||||||
Scheme: scheme,
|
Scheme: scheme,
|
||||||
}
|
}
|
||||||
|
|
@ -108,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
|
||||||
|
|
||||||
// newStructuredInformersMap creates a new InformersMap for structured objects.
|
// newStructuredInformersMap creates a new InformersMap for structured objects.
|
||||||
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
||||||
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
|
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
|
||||||
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
|
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
|
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
|
||||||
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
||||||
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
|
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
|
||||||
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch)
|
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
|
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
|
||||||
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
|
||||||
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
|
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
|
||||||
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch)
|
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,9 @@ func newSpecificInformersMap(config *rest.Config,
|
||||||
namespace string,
|
namespace string,
|
||||||
selectors SelectorsByGVK,
|
selectors SelectorsByGVK,
|
||||||
disableDeepCopy DisableDeepCopyByGVK,
|
disableDeepCopy DisableDeepCopyByGVK,
|
||||||
createListWatcher createListWatcherFunc) *specificInformersMap {
|
transformers TransformFuncByObject,
|
||||||
|
createListWatcher createListWatcherFunc,
|
||||||
|
) *specificInformersMap {
|
||||||
ip := &specificInformersMap{
|
ip := &specificInformersMap{
|
||||||
config: config,
|
config: config,
|
||||||
Scheme: scheme,
|
Scheme: scheme,
|
||||||
|
|
@ -68,6 +70,7 @@ func newSpecificInformersMap(config *rest.Config,
|
||||||
namespace: namespace,
|
namespace: namespace,
|
||||||
selectors: selectors.forGVK,
|
selectors: selectors.forGVK,
|
||||||
disableDeepCopy: disableDeepCopy,
|
disableDeepCopy: disableDeepCopy,
|
||||||
|
transformers: transformers,
|
||||||
}
|
}
|
||||||
return ip
|
return ip
|
||||||
}
|
}
|
||||||
|
|
@ -135,6 +138,9 @@ type specificInformersMap struct {
|
||||||
|
|
||||||
// disableDeepCopy indicates not to deep copy objects during get or list objects.
|
// disableDeepCopy indicates not to deep copy objects during get or list objects.
|
||||||
disableDeepCopy DisableDeepCopyByGVK
|
disableDeepCopy DisableDeepCopyByGVK
|
||||||
|
|
||||||
|
// transform funcs are applied to objects before they are committed to the cache
|
||||||
|
transformers TransformFuncByObject
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
|
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
|
||||||
|
|
@ -227,6 +233,12 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
|
||||||
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
|
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
|
||||||
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
|
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Check to see if there is a transformer for this gvk
|
||||||
|
if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
|
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
|
|
|
||||||
50
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/transformers.go
generated
vendored
Normal file
50
vendor/sigs.k8s.io/controller-runtime/pkg/cache/internal/transformers.go
generated
vendored
Normal file
|
|
@ -0,0 +1,50 @@
|
||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransformFuncByObject provides access to the correct transform function for
|
||||||
|
// any given GVK.
|
||||||
|
type TransformFuncByObject interface {
|
||||||
|
Set(runtime.Object, *runtime.Scheme, cache.TransformFunc) error
|
||||||
|
Get(schema.GroupVersionKind) cache.TransformFunc
|
||||||
|
SetDefault(transformer cache.TransformFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
type transformFuncByGVK struct {
|
||||||
|
defaultTransform cache.TransformFunc
|
||||||
|
transformers map[schema.GroupVersionKind]cache.TransformFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTransformFuncByObject creates a new TransformFuncByObject instance.
|
||||||
|
func NewTransformFuncByObject() TransformFuncByObject {
|
||||||
|
return &transformFuncByGVK{
|
||||||
|
transformers: make(map[schema.GroupVersionKind]cache.TransformFunc),
|
||||||
|
defaultTransform: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *transformFuncByGVK) SetDefault(transformer cache.TransformFunc) {
|
||||||
|
t.defaultTransform = transformer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *transformFuncByGVK) Set(obj runtime.Object, scheme *runtime.Scheme, transformer cache.TransformFunc) error {
|
||||||
|
gvk, err := apiutil.GVKForObject(obj, scheme)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.transformers[gvk] = transformer
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t transformFuncByGVK) Get(gvk schema.GroupVersionKind) cache.TransformFunc {
|
||||||
|
if val, ok := t.transformers[gvk]; ok {
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
return t.defaultTransform
|
||||||
|
}
|
||||||
|
|
@ -55,7 +55,7 @@ func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
|
||||||
// create a cache for cluster scoped resources
|
// create a cache for cluster scoped resources
|
||||||
gCache, err := New(config, opts)
|
gCache, err := New(config, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating global cache %v", err)
|
return nil, fmt.Errorf("error creating global cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ns := range namespaces {
|
for _, ns := range namespaces {
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics"
|
||||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -116,8 +117,10 @@ func (cw *CertWatcher) Watch() {
|
||||||
// and updates the current certificate on the watcher. If a callback is set, it
|
// and updates the current certificate on the watcher. If a callback is set, it
|
||||||
// is invoked with the new certificate.
|
// is invoked with the new certificate.
|
||||||
func (cw *CertWatcher) ReadCertificate() error {
|
func (cw *CertWatcher) ReadCertificate() error {
|
||||||
|
metrics.ReadCertificateTotal.Inc()
|
||||||
cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath)
|
cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.ReadCertificateErrors.Inc()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
45
vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics/metrics.go
generated
vendored
Normal file
45
vendor/sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics/metrics.go
generated
vendored
Normal file
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ReadCertificateTotal is a prometheus counter metrics which holds the total
|
||||||
|
// number of certificate reads.
|
||||||
|
ReadCertificateTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "certwatcher_read_certificate_total",
|
||||||
|
Help: "Total number of certificate reads",
|
||||||
|
})
|
||||||
|
|
||||||
|
// ReadCertificateErrors is a prometheus counter metrics which holds the total
|
||||||
|
// number of errors from certificate read.
|
||||||
|
ReadCertificateErrors = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Name: "certwatcher_read_certificate_errors_total",
|
||||||
|
Help: "Total number of certificate read errors",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
metrics.Registry.MustRegister(
|
||||||
|
ReadCertificateTotal,
|
||||||
|
ReadCertificateErrors,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
@ -19,6 +19,7 @@ package apiutil
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
|
@ -38,7 +39,8 @@ type dynamicRESTMapper struct {
|
||||||
|
|
||||||
lazy bool
|
lazy bool
|
||||||
// Used for lazy init.
|
// Used for lazy init.
|
||||||
initOnce sync.Once
|
inited uint32
|
||||||
|
initMtx sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// DynamicRESTMapperOption is a functional option on the dynamicRESTMapper.
|
// DynamicRESTMapperOption is a functional option on the dynamicRESTMapper.
|
||||||
|
|
@ -125,11 +127,18 @@ func (drm *dynamicRESTMapper) setStaticMapper() error {
|
||||||
|
|
||||||
// init initializes drm only once if drm is lazy.
|
// init initializes drm only once if drm is lazy.
|
||||||
func (drm *dynamicRESTMapper) init() (err error) {
|
func (drm *dynamicRESTMapper) init() (err error) {
|
||||||
drm.initOnce.Do(func() {
|
// skip init if drm is not lazy or has initialized
|
||||||
if drm.lazy {
|
if !drm.lazy || atomic.LoadUint32(&drm.inited) != 0 {
|
||||||
err = drm.setStaticMapper()
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
drm.initMtx.Lock()
|
||||||
|
defer drm.initMtx.Unlock()
|
||||||
|
if drm.inited == 0 {
|
||||||
|
if err = drm.setStaticMapper(); err == nil {
|
||||||
|
atomic.StoreUint32(&drm.inited, 1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -123,7 +123,7 @@ func loadConfig(context string) (*rest.Config, error) {
|
||||||
if _, ok := os.LookupEnv("HOME"); !ok {
|
if _, ok := os.LookupEnv("HOME"); !ok {
|
||||||
u, err := user.Current()
|
u, err := user.Current()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get current user: %v", err)
|
return nil, fmt.Errorf("could not get current user: %w", err)
|
||||||
}
|
}
|
||||||
loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))
|
loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -146,9 +146,7 @@ func (mc *metadataClient) List(ctx context.Context, obj ObjectList, opts ...List
|
||||||
}
|
}
|
||||||
|
|
||||||
gvk := metadata.GroupVersionKind()
|
gvk := metadata.GroupVersionKind()
|
||||||
if strings.HasSuffix(gvk.Kind, "List") {
|
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
|
||||||
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
|
||||||
}
|
|
||||||
|
|
||||||
listOpts := ListOptions{}
|
listOpts := ListOptions{}
|
||||||
listOpts.ApplyOptions(opts)
|
listOpts.ApplyOptions(opts)
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ func (n *namespacedClient) RESTMapper() meta.RESTMapper {
|
||||||
func (n *namespacedClient) Create(ctx context.Context, obj Object, opts ...CreateOption) error {
|
func (n *namespacedClient) Create(ctx context.Context, obj Object, opts ...CreateOption) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
@ -74,7 +74,7 @@ func (n *namespacedClient) Create(ctx context.Context, obj Object, opts ...Creat
|
||||||
func (n *namespacedClient) Update(ctx context.Context, obj Object, opts ...UpdateOption) error {
|
func (n *namespacedClient) Update(ctx context.Context, obj Object, opts ...UpdateOption) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
@ -92,7 +92,7 @@ func (n *namespacedClient) Update(ctx context.Context, obj Object, opts ...Updat
|
||||||
func (n *namespacedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error {
|
func (n *namespacedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
@ -110,7 +110,7 @@ func (n *namespacedClient) Delete(ctx context.Context, obj Object, opts ...Delet
|
||||||
func (n *namespacedClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error {
|
func (n *namespacedClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if isNamespaceScoped {
|
if isNamespaceScoped {
|
||||||
|
|
@ -123,7 +123,7 @@ func (n *namespacedClient) DeleteAllOf(ctx context.Context, obj Object, opts ...
|
||||||
func (n *namespacedClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error {
|
func (n *namespacedClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
@ -141,7 +141,7 @@ func (n *namespacedClient) Patch(ctx context.Context, obj Object, patch Patch, o
|
||||||
func (n *namespacedClient) Get(ctx context.Context, key ObjectKey, obj Object) error {
|
func (n *namespacedClient) Get(ctx context.Context, key ObjectKey, obj Object) error {
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, n.Scheme(), n.RESTMapper())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
if isNamespaceScoped {
|
if isNamespaceScoped {
|
||||||
if key.Namespace != "" && key.Namespace != n.namespace {
|
if key.Namespace != "" && key.Namespace != n.namespace {
|
||||||
|
|
@ -179,7 +179,7 @@ func (nsw *namespacedClientStatusWriter) Update(ctx context.Context, obj Object,
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, nsw.namespacedclient.Scheme(), nsw.namespacedclient.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, nsw.namespacedclient.Scheme(), nsw.namespacedclient.RESTMapper())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
@ -198,7 +198,7 @@ func (nsw *namespacedClientStatusWriter) Patch(ctx context.Context, obj Object,
|
||||||
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, nsw.namespacedclient.Scheme(), nsw.namespacedclient.RESTMapper())
|
isNamespaceScoped, err := objectutil.IsAPINamespaced(obj, nsw.namespacedclient.Scheme(), nsw.namespacedclient.RESTMapper())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error finding the scope of the object: %v", err)
|
return fmt.Errorf("error finding the scope of the object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objectNamespace := obj.GetNamespace()
|
objectNamespace := obj.GetNamespace()
|
||||||
|
|
|
||||||
|
|
@ -318,7 +318,7 @@ func (p PropagationPolicy) ApplyToDeleteAllOf(opts *DeleteAllOfOptions) {
|
||||||
// pre-parsed selectors (since generally, selectors will be executed
|
// pre-parsed selectors (since generally, selectors will be executed
|
||||||
// against the cache).
|
// against the cache).
|
||||||
type ListOptions struct {
|
type ListOptions struct {
|
||||||
// LabelSelector filters results by label. Use SetLabelSelector to
|
// LabelSelector filters results by label. Use labels.Parse() to
|
||||||
// set from raw string form.
|
// set from raw string form.
|
||||||
LabelSelector labels.Selector
|
LabelSelector labels.Selector
|
||||||
// FieldSelector filters results by a particular field. In order
|
// FieldSelector filters results by a particular field. In order
|
||||||
|
|
|
||||||
|
|
@ -95,8 +95,7 @@ func (uc *unstructuredClient) Update(ctx context.Context, obj Object, opts ...Up
|
||||||
|
|
||||||
// Delete implements client.Client.
|
// Delete implements client.Client.
|
||||||
func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error {
|
func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error {
|
||||||
_, ok := obj.(*unstructured.Unstructured)
|
if _, ok := obj.(*unstructured.Unstructured); !ok {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -118,8 +117,7 @@ func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...De
|
||||||
|
|
||||||
// DeleteAllOf implements client.Client.
|
// DeleteAllOf implements client.Client.
|
||||||
func (uc *unstructuredClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error {
|
func (uc *unstructuredClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error {
|
||||||
_, ok := obj.(*unstructured.Unstructured)
|
if _, ok := obj.(*unstructured.Unstructured); !ok {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -141,8 +139,7 @@ func (uc *unstructuredClient) DeleteAllOf(ctx context.Context, obj Object, opts
|
||||||
|
|
||||||
// Patch implements client.Client.
|
// Patch implements client.Client.
|
||||||
func (uc *unstructuredClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error {
|
func (uc *unstructuredClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error {
|
||||||
_, ok := obj.(*unstructured.Unstructured)
|
if _, ok := obj.(*unstructured.Unstructured); !ok {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -201,9 +198,7 @@ func (uc *unstructuredClient) List(ctx context.Context, obj ObjectList, opts ...
|
||||||
}
|
}
|
||||||
|
|
||||||
gvk := u.GroupVersionKind()
|
gvk := u.GroupVersionKind()
|
||||||
if strings.HasSuffix(gvk.Kind, "List") {
|
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
|
||||||
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
|
||||||
}
|
|
||||||
|
|
||||||
listOpts := ListOptions{}
|
listOpts := ListOptions{}
|
||||||
listOpts.ApplyOptions(opts)
|
listOpts.ApplyOptions(opts)
|
||||||
|
|
@ -222,8 +217,7 @@ func (uc *unstructuredClient) List(ctx context.Context, obj ObjectList, opts ...
|
||||||
}
|
}
|
||||||
|
|
||||||
func (uc *unstructuredClient) UpdateStatus(ctx context.Context, obj Object, opts ...UpdateOption) error {
|
func (uc *unstructuredClient) UpdateStatus(ctx context.Context, obj Object, opts ...UpdateOption) error {
|
||||||
_, ok := obj.(*unstructured.Unstructured)
|
if _, ok := obj.(*unstructured.Unstructured); !ok {
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
return fmt.Errorf("unstructured client did not understand object: %T", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -69,9 +69,7 @@ func (w *watchingClient) listOpts(opts ...ListOption) ListOptions {
|
||||||
|
|
||||||
func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
|
func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
|
||||||
gvk := obj.GroupVersionKind()
|
gvk := obj.GroupVersionKind()
|
||||||
if strings.HasSuffix(gvk.Kind, "List") {
|
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
|
||||||
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
|
||||||
}
|
|
||||||
|
|
||||||
listOpts := w.listOpts(opts...)
|
listOpts := w.listOpts(opts...)
|
||||||
|
|
||||||
|
|
@ -85,9 +83,7 @@ func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialO
|
||||||
|
|
||||||
func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
|
func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
|
||||||
gvk := obj.GroupVersionKind()
|
gvk := obj.GroupVersionKind()
|
||||||
if strings.HasSuffix(gvk.Kind, "List") {
|
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
|
||||||
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
|
|
||||||
}
|
|
||||||
|
|
||||||
r, err := w.client.unstructuredClient.cache.getResource(obj)
|
r, err := w.client.unstructuredClient.cache.getResource(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
ioutil "io/ioutil"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
@ -96,7 +96,7 @@ func (d *DeferredFileLoader) loadFile() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
content, err := ioutil.ReadFile(d.path)
|
content, err := os.ReadFile(d.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.err = fmt.Errorf("could not read file at %s", d.path)
|
d.err = fmt.Errorf("could not read file at %s", d.path)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@ import (
|
||||||
|
|
||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
|
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||||
|
|
@ -45,9 +47,9 @@ type Options struct {
|
||||||
// The overall is a token bucket and the per-item is exponential.
|
// The overall is a token bucket and the per-item is exponential.
|
||||||
RateLimiter ratelimiter.RateLimiter
|
RateLimiter ratelimiter.RateLimiter
|
||||||
|
|
||||||
// Log is the logger used for this controller and passed to each reconciliation
|
// LogConstructor is used to construct a logger used for this controller and passed
|
||||||
// request via the context field.
|
// to each reconciliation via the context field.
|
||||||
Log logr.Logger
|
LogConstructor func(request *reconcile.Request) logr.Logger
|
||||||
|
|
||||||
// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
|
// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
|
||||||
// Defaults to 2 minutes if not set.
|
// Defaults to 2 minutes if not set.
|
||||||
|
|
@ -104,8 +106,20 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
|
||||||
return nil, fmt.Errorf("must specify Name for Controller")
|
return nil, fmt.Errorf("must specify Name for Controller")
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Log.GetSink() == nil {
|
if options.LogConstructor == nil {
|
||||||
options.Log = mgr.GetLogger()
|
log := mgr.GetLogger().WithValues(
|
||||||
|
"controller", name,
|
||||||
|
)
|
||||||
|
options.LogConstructor = func(req *reconcile.Request) logr.Logger {
|
||||||
|
log := log
|
||||||
|
if req != nil {
|
||||||
|
log = log.WithValues(
|
||||||
|
"object", klog.KRef(req.Namespace, req.Name),
|
||||||
|
"namespace", req.Namespace, "name", req.Name,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return log
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.MaxConcurrentReconciles <= 0 {
|
if options.MaxConcurrentReconciles <= 0 {
|
||||||
|
|
@ -135,7 +149,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
|
||||||
CacheSyncTimeout: options.CacheSyncTimeout,
|
CacheSyncTimeout: options.CacheSyncTimeout,
|
||||||
SetFields: mgr.SetFields,
|
SetFields: mgr.SetFields,
|
||||||
Name: name,
|
Name: name,
|
||||||
Log: options.Log.WithName("controller").WithName(name),
|
LogConstructor: options.LogConstructor,
|
||||||
RecoverPanic: options.RecoverPanic,
|
RecoverPanic: options.RecoverPanic,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
13
vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go
generated
vendored
13
vendor/sigs.k8s.io/controller-runtime/pkg/controller/controllerutil/controllerutil.go
generated
vendored
|
|
@ -345,30 +345,35 @@ func mutate(f MutateFn, key client.ObjectKey, obj client.Object) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MutateFn is a function which mutates the existing object into it's desired state.
|
// MutateFn is a function which mutates the existing object into its desired state.
|
||||||
type MutateFn func() error
|
type MutateFn func() error
|
||||||
|
|
||||||
// AddFinalizer accepts an Object and adds the provided finalizer if not present.
|
// AddFinalizer accepts an Object and adds the provided finalizer if not present.
|
||||||
func AddFinalizer(o client.Object, finalizer string) {
|
// It returns an indication of whether it updated the object's list of finalizers.
|
||||||
|
func AddFinalizer(o client.Object, finalizer string) (finalizersUpdated bool) {
|
||||||
f := o.GetFinalizers()
|
f := o.GetFinalizers()
|
||||||
for _, e := range f {
|
for _, e := range f {
|
||||||
if e == finalizer {
|
if e == finalizer {
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
o.SetFinalizers(append(f, finalizer))
|
o.SetFinalizers(append(f, finalizer))
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoveFinalizer accepts an Object and removes the provided finalizer if present.
|
// RemoveFinalizer accepts an Object and removes the provided finalizer if present.
|
||||||
func RemoveFinalizer(o client.Object, finalizer string) {
|
// It returns an indication of whether it updated the object's list of finalizers.
|
||||||
|
func RemoveFinalizer(o client.Object, finalizer string) (finalizersUpdated bool) {
|
||||||
f := o.GetFinalizers()
|
f := o.GetFinalizers()
|
||||||
for i := 0; i < len(f); i++ {
|
for i := 0; i < len(f); i++ {
|
||||||
if f[i] == finalizer {
|
if f[i] == finalizer {
|
||||||
f = append(f[:i], f[i+1:]...)
|
f = append(f[:i], f[i+1:]...)
|
||||||
i--
|
i--
|
||||||
|
finalizersUpdated = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
o.SetFinalizers(f)
|
o.SetFinalizers(f)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainsFinalizer checks an Object that the provided finalizer is present.
|
// ContainsFinalizer checks an Object that the provided finalizer is present.
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||||
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
|
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
|
||||||
|
|
@ -83,8 +84,11 @@ type Controller struct {
|
||||||
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
|
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
|
||||||
startWatches []watchDescription
|
startWatches []watchDescription
|
||||||
|
|
||||||
// Log is used to log messages to users during reconciliation, or for example when a watch is started.
|
// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
|
||||||
Log logr.Logger
|
// or for example when a watch is started.
|
||||||
|
// Note: LogConstructor has to be able to handle nil requests as we are also using it
|
||||||
|
// outside the context of a reconciliation.
|
||||||
|
LogConstructor func(request *reconcile.Request) logr.Logger
|
||||||
|
|
||||||
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
|
// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
|
||||||
RecoverPanic bool
|
RecoverPanic bool
|
||||||
|
|
@ -99,18 +103,21 @@ type watchDescription struct {
|
||||||
|
|
||||||
// Reconcile implements reconcile.Reconciler.
|
// Reconcile implements reconcile.Reconciler.
|
||||||
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
|
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
|
||||||
if c.RecoverPanic {
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
if c.RecoverPanic {
|
||||||
for _, fn := range utilruntime.PanicHandlers {
|
for _, fn := range utilruntime.PanicHandlers {
|
||||||
fn(r)
|
fn(r)
|
||||||
}
|
}
|
||||||
err = fmt.Errorf("panic: %v [recovered]", r)
|
err = fmt.Errorf("panic: %v [recovered]", r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log := logf.FromContext(ctx)
|
||||||
|
log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
|
||||||
|
panic(r)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
|
||||||
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
|
|
||||||
ctx = logf.IntoContext(ctx, log)
|
|
||||||
return c.Do.Reconcile(ctx, req)
|
return c.Do.Reconcile(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -140,7 +147,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Log.Info("Starting EventSource", "source", src)
|
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
|
||||||
return src.Start(c.ctx, evthdler, c.Queue, prct...)
|
return src.Start(c.ctx, evthdler, c.Queue, prct...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,7 +182,7 @@ func (c *Controller) Start(ctx context.Context) error {
|
||||||
// caches to sync so that they have a chance to register their intendeded
|
// caches to sync so that they have a chance to register their intendeded
|
||||||
// caches.
|
// caches.
|
||||||
for _, watch := range c.startWatches {
|
for _, watch := range c.startWatches {
|
||||||
c.Log.Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
|
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
|
||||||
|
|
||||||
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
|
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -183,7 +190,7 @@ func (c *Controller) Start(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
|
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
|
||||||
c.Log.Info("Starting Controller")
|
c.LogConstructor(nil).Info("Starting Controller")
|
||||||
|
|
||||||
for _, watch := range c.startWatches {
|
for _, watch := range c.startWatches {
|
||||||
syncingSource, ok := watch.src.(source.SyncingSource)
|
syncingSource, ok := watch.src.(source.SyncingSource)
|
||||||
|
|
@ -200,7 +207,7 @@ func (c *Controller) Start(ctx context.Context) error {
|
||||||
// is an error or a timeout
|
// is an error or a timeout
|
||||||
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
|
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
|
||||||
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
|
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
|
||||||
c.Log.Error(err, "Could not wait for Cache to sync")
|
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -217,7 +224,7 @@ func (c *Controller) Start(ctx context.Context) error {
|
||||||
c.startWatches = nil
|
c.startWatches = nil
|
||||||
|
|
||||||
// Launch workers to process resources
|
// Launch workers to process resources
|
||||||
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
|
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
|
||||||
wg.Add(c.MaxConcurrentReconciles)
|
wg.Add(c.MaxConcurrentReconciles)
|
||||||
for i := 0; i < c.MaxConcurrentReconciles; i++ {
|
for i := 0; i < c.MaxConcurrentReconciles; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
|
|
@ -237,9 +244,9 @@ func (c *Controller) Start(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
|
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
c.Log.Info("All workers finished")
|
c.LogConstructor(nil).Info("All workers finished")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -291,19 +298,21 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
|
||||||
c.updateMetrics(time.Since(reconcileStartTS))
|
c.updateMetrics(time.Since(reconcileStartTS))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Make sure that the the object is a valid request.
|
// Make sure that the object is a valid request.
|
||||||
req, ok := obj.(reconcile.Request)
|
req, ok := obj.(reconcile.Request)
|
||||||
if !ok {
|
if !ok {
|
||||||
// As the item in the workqueue is actually invalid, we call
|
// As the item in the workqueue is actually invalid, we call
|
||||||
// Forget here else we'd go into a loop of attempting to
|
// Forget here else we'd go into a loop of attempting to
|
||||||
// process a work item that is invalid.
|
// process a work item that is invalid.
|
||||||
c.Queue.Forget(obj)
|
c.Queue.Forget(obj)
|
||||||
c.Log.Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
|
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
|
||||||
// Return true, don't take a break
|
// Return true, don't take a break
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log := c.Log.WithValues("name", req.Name, "namespace", req.Namespace)
|
log := c.LogConstructor(&req)
|
||||||
|
|
||||||
|
log = log.WithValues("reconcileID", uuid.NewUUID())
|
||||||
ctx = logf.IntoContext(ctx, log)
|
ctx = logf.IntoContext(ctx, log)
|
||||||
|
|
||||||
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
|
// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
|
||||||
|
|
@ -336,7 +345,7 @@ func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
|
||||||
|
|
||||||
// GetLogger returns this controller's logger.
|
// GetLogger returns this controller's logger.
|
||||||
func (c *Controller) GetLogger() logr.Logger {
|
func (c *Controller) GetLogger() logr.Logger {
|
||||||
return c.Log
|
return c.LogConstructor(nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InjectFunc implement SetFields.Injector.
|
// InjectFunc implement SetFields.Injector.
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ package leaderelection
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/uuid"
|
"k8s.io/apimachinery/pkg/util/uuid"
|
||||||
|
|
@ -27,6 +26,7 @@ import (
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||||
|
|
||||||
"sigs.k8s.io/controller-runtime/pkg/recorder"
|
"sigs.k8s.io/controller-runtime/pkg/recorder"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -39,7 +39,7 @@ type Options struct {
|
||||||
LeaderElection bool
|
LeaderElection bool
|
||||||
|
|
||||||
// LeaderElectionResourceLock determines which resource lock to use for leader election,
|
// LeaderElectionResourceLock determines which resource lock to use for leader election,
|
||||||
// defaults to "configmapsleases".
|
// defaults to "leases".
|
||||||
LeaderElectionResourceLock string
|
LeaderElectionResourceLock string
|
||||||
|
|
||||||
// LeaderElectionNamespace determines the namespace in which the leader
|
// LeaderElectionNamespace determines the namespace in which the leader
|
||||||
|
|
@ -57,11 +57,12 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Default resource lock to "configmapsleases". We must keep this default until we are sure all controller-runtime
|
// Default resource lock to "leases". The previous default (from v0.7.0 to v0.11.x) was configmapsleases, which was
|
||||||
// users have upgraded from the original default ConfigMap lock to a controller-runtime version that has this new
|
// used to migrate from configmaps to leases. Since the default was "configmapsleases" for over a year, spanning
|
||||||
// default. Many users of controller-runtime skip versions, so we should be extremely conservative here.
|
// five minor releases, any actively maintained operators are very likely to have a released version that uses
|
||||||
|
// "configmapsleases". Therefore defaulting to "leases" should be safe.
|
||||||
if options.LeaderElectionResourceLock == "" {
|
if options.LeaderElectionResourceLock == "" {
|
||||||
options.LeaderElectionResourceLock = resourcelock.ConfigMapsLeasesResourceLock
|
options.LeaderElectionResourceLock = resourcelock.LeasesResourceLock
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaderElectionID must be provided to prevent clashes
|
// LeaderElectionID must be provided to prevent clashes
|
||||||
|
|
@ -118,7 +119,7 @@ func getInClusterNamespace() (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load the namespace file and return its content
|
// Load the namespace file and return its content
|
||||||
namespace, err := ioutil.ReadFile(inClusterNamespacePath)
|
namespace, err := os.ReadFile(inClusterNamespacePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("error reading namespace file: %w", err)
|
return "", fmt.Errorf("error reading namespace file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -177,7 +177,7 @@ func (l *DelegatingLogSink) Fulfill(actual logr.LogSink) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDelegatingLogSink constructs a new DelegatingLogSink which uses
|
// NewDelegatingLogSink constructs a new DelegatingLogSink which uses
|
||||||
// the given logger before it's promise is fulfilled.
|
// the given logger before its promise is fulfilled.
|
||||||
func NewDelegatingLogSink(initial logr.LogSink) *DelegatingLogSink {
|
func NewDelegatingLogSink(initial logr.LogSink) *DelegatingLogSink {
|
||||||
l := &DelegatingLogSink{
|
l := &DelegatingLogSink{
|
||||||
logger: initial,
|
logger: initial,
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ limitations under the License.
|
||||||
//
|
//
|
||||||
// All logging in controller-runtime is structured, using a set of interfaces
|
// All logging in controller-runtime is structured, using a set of interfaces
|
||||||
// defined by a package called logr
|
// defined by a package called logr
|
||||||
// (https://godoc.org/github.com/go-logr/logr). The sub-package zap provides
|
// (https://pkg.go.dev/github.com/go-logr/logr). The sub-package zap provides
|
||||||
// helpers for setting up logr backed by Zap (go.uber.org/zap).
|
// helpers for setting up logr backed by Zap (go.uber.org/zap).
|
||||||
package log
|
package log
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -47,7 +47,7 @@ type KubeAPIWarningLogger struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleWarningHeader handles logging for responses from API server that are
|
// HandleWarningHeader handles logging for responses from API server that are
|
||||||
// warnings with code being 299 and uses a logr.Logger for it's logging purposes.
|
// warnings with code being 299 and uses a logr.Logger for its logging purposes.
|
||||||
func (l *KubeAPIWarningLogger) HandleWarningHeader(code int, agent string, message string) {
|
func (l *KubeAPIWarningLogger) HandleWarningHeader(code int, agent string, message string) {
|
||||||
if code != 299 || len(message) == 0 {
|
if code != 299 || len(message) == 0 {
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -457,21 +457,21 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
|
||||||
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
|
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
|
||||||
// to never start because no cache can be populated.
|
// to never start because no cache can be populated.
|
||||||
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
|
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
|
||||||
if err != wait.ErrWaitTimeout {
|
if !errors.Is(err, wait.ErrWaitTimeout) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start and wait for caches.
|
// Start and wait for caches.
|
||||||
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
|
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
|
||||||
if err != wait.ErrWaitTimeout {
|
if !errors.Is(err, wait.ErrWaitTimeout) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the non-leaderelection Runnables after the cache has synced.
|
// Start the non-leaderelection Runnables after the cache has synced.
|
||||||
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
|
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
|
||||||
if err != wait.ErrWaitTimeout {
|
if !errors.Is(err, wait.ErrWaitTimeout) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -587,7 +587,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-cm.shutdownCtx.Done()
|
<-cm.shutdownCtx.Done()
|
||||||
if err := cm.shutdownCtx.Err(); err != nil && err != context.Canceled {
|
if err := cm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
|
||||||
if errors.Is(err, context.DeadlineExceeded) {
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
if cm.gracefulShutdownTimeout > 0 {
|
if cm.gracefulShutdownTimeout > 0 {
|
||||||
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
|
return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
|
||||||
|
|
@ -597,6 +597,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
|
||||||
// For any other error, return the error.
|
// For any other error, return the error.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -98,9 +98,9 @@ type Manager interface {
|
||||||
|
|
||||||
// Options are the arguments for creating a new Manager.
|
// Options are the arguments for creating a new Manager.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources
|
// Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources.
|
||||||
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
|
// Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better
|
||||||
// idea to pass your own scheme in. See the documentation in pkg/scheme for more information.
|
// to pass your own scheme in. See the documentation in pkg/scheme for more information.
|
||||||
Scheme *runtime.Scheme
|
Scheme *runtime.Scheme
|
||||||
|
|
||||||
// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
|
// MapperProvider provides the rest mapper used to map go types to Kubernetes APIs
|
||||||
|
|
@ -186,11 +186,11 @@ type Options struct {
|
||||||
// between tries of actions. Default is 2 seconds.
|
// between tries of actions. Default is 2 seconds.
|
||||||
RetryPeriod *time.Duration
|
RetryPeriod *time.Duration
|
||||||
|
|
||||||
// Namespace if specified restricts the manager's cache to watch objects in
|
// Namespace, if specified, restricts the manager's cache to watch objects in
|
||||||
// the desired namespace Defaults to all namespaces
|
// the desired namespace. Defaults to all namespaces.
|
||||||
//
|
//
|
||||||
// Note: If a namespace is specified, controllers can still Watch for a
|
// Note: If a namespace is specified, controllers can still Watch for a
|
||||||
// cluster-scoped resource (e.g Node). For namespaced resources the cache
|
// cluster-scoped resource (e.g Node). For namespaced resources, the cache
|
||||||
// will only hold objects from the desired namespace.
|
// will only hold objects from the desired namespace.
|
||||||
Namespace string
|
Namespace string
|
||||||
|
|
||||||
|
|
@ -228,7 +228,7 @@ type Options struct {
|
||||||
// if this is set, the Manager will use this server instead.
|
// if this is set, the Manager will use this server instead.
|
||||||
WebhookServer *webhook.Server
|
WebhookServer *webhook.Server
|
||||||
|
|
||||||
// Functions to all for a user to customize the values that will be injected.
|
// Functions to allow for a user to customize values that will be injected.
|
||||||
|
|
||||||
// NewCache is the function that will create the cache to be used
|
// NewCache is the function that will create the cache to be used
|
||||||
// by the manager. If not set this will use the default new cache function.
|
// by the manager. If not set this will use the default new cache function.
|
||||||
|
|
@ -239,6 +239,11 @@ type Options struct {
|
||||||
// use the cache for reads and the client for writes.
|
// use the cache for reads and the client for writes.
|
||||||
NewClient cluster.NewClientFunc
|
NewClient cluster.NewClientFunc
|
||||||
|
|
||||||
|
// BaseContext is the function that provides Context values to Runnables
|
||||||
|
// managed by the Manager. If a BaseContext function isn't provided, Runnables
|
||||||
|
// will receive a new Background Context instead.
|
||||||
|
BaseContext BaseContextFunc
|
||||||
|
|
||||||
// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
|
// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
|
||||||
// for the given objects.
|
// for the given objects.
|
||||||
ClientDisableCacheFor []client.Object
|
ClientDisableCacheFor []client.Object
|
||||||
|
|
@ -278,6 +283,10 @@ type Options struct {
|
||||||
newHealthProbeListener func(addr string) (net.Listener, error)
|
newHealthProbeListener func(addr string) (net.Listener, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BaseContextFunc is a function used to provide a base Context to Runnables
|
||||||
|
// managed by a Manager.
|
||||||
|
type BaseContextFunc func() context.Context
|
||||||
|
|
||||||
// Runnable allows a component to be started.
|
// Runnable allows a component to be started.
|
||||||
// It's very important that Start blocks until
|
// It's very important that Start blocks until
|
||||||
// it's done running.
|
// it's done running.
|
||||||
|
|
@ -335,11 +344,21 @@ func New(config *rest.Config, options Options) (Manager, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the resource lock to enable leader election)
|
// Create the resource lock to enable leader election)
|
||||||
leaderConfig := options.LeaderElectionConfig
|
var leaderConfig *rest.Config
|
||||||
if leaderConfig == nil {
|
var leaderRecorderProvider *intrec.Provider
|
||||||
|
|
||||||
|
if options.LeaderElectionConfig == nil {
|
||||||
leaderConfig = rest.CopyConfig(config)
|
leaderConfig = rest.CopyConfig(config)
|
||||||
|
leaderRecorderProvider = recorderProvider
|
||||||
|
} else {
|
||||||
|
leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)
|
||||||
|
leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
|
}
|
||||||
|
|
||||||
|
resourceLock, err := options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{
|
||||||
LeaderElection: options.LeaderElection,
|
LeaderElection: options.LeaderElection,
|
||||||
LeaderElectionResourceLock: options.LeaderElectionResourceLock,
|
LeaderElectionResourceLock: options.LeaderElectionResourceLock,
|
||||||
LeaderElectionID: options.LeaderElectionID,
|
LeaderElectionID: options.LeaderElectionID,
|
||||||
|
|
@ -367,7 +386,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
errChan := make(chan error)
|
errChan := make(chan error)
|
||||||
runnables := newRunnables(errChan)
|
runnables := newRunnables(options.BaseContext, errChan)
|
||||||
|
|
||||||
return &controllerManager{
|
return &controllerManager{
|
||||||
stopProcedureEngaged: pointer.Int64(0),
|
stopProcedureEngaged: pointer.Int64(0),
|
||||||
|
|
@ -475,6 +494,11 @@ func (o Options) AndFromOrDie(loader config.ControllerManagerConfiguration) Opti
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o Options) setLeaderElectionConfig(obj v1alpha1.ControllerManagerConfigurationSpec) Options {
|
func (o Options) setLeaderElectionConfig(obj v1alpha1.ControllerManagerConfigurationSpec) Options {
|
||||||
|
if obj.LeaderElection == nil {
|
||||||
|
// The source does not have any configuration; noop
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
if !o.LeaderElection && obj.LeaderElection.LeaderElect != nil {
|
if !o.LeaderElection && obj.LeaderElection.LeaderElect != nil {
|
||||||
o.LeaderElection = *obj.LeaderElection.LeaderElect
|
o.LeaderElection = *obj.LeaderElection.LeaderElect
|
||||||
}
|
}
|
||||||
|
|
@ -514,11 +538,17 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) {
|
||||||
|
|
||||||
ln, err := net.Listen("tcp", addr)
|
ln, err := net.Listen("tcp", addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error listening on %s: %v", addr, err)
|
return nil, fmt.Errorf("error listening on %s: %w", addr, err)
|
||||||
}
|
}
|
||||||
return ln, nil
|
return ln, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// defaultBaseContext is used as the BaseContext value in Options if one
|
||||||
|
// has not already been set.
|
||||||
|
func defaultBaseContext() context.Context {
|
||||||
|
return context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
// setOptionsDefaults set default values for Options fields.
|
// setOptionsDefaults set default values for Options fields.
|
||||||
func setOptionsDefaults(options Options) Options {
|
func setOptionsDefaults(options Options) Options {
|
||||||
// Allow newResourceLock to be mocked
|
// Allow newResourceLock to be mocked
|
||||||
|
|
@ -582,5 +612,9 @@ func setOptionsDefaults(options Options) Options {
|
||||||
options.Logger = log.Log
|
options.Logger = log.Log
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.BaseContext == nil {
|
||||||
|
options.BaseContext = defaultBaseContext
|
||||||
|
}
|
||||||
|
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,12 +35,12 @@ type runnables struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newRunnables creates a new runnables object.
|
// newRunnables creates a new runnables object.
|
||||||
func newRunnables(errChan chan error) *runnables {
|
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
|
||||||
return &runnables{
|
return &runnables{
|
||||||
Webhooks: newRunnableGroup(errChan),
|
Webhooks: newRunnableGroup(baseContext, errChan),
|
||||||
Caches: newRunnableGroup(errChan),
|
Caches: newRunnableGroup(baseContext, errChan),
|
||||||
LeaderElection: newRunnableGroup(errChan),
|
LeaderElection: newRunnableGroup(baseContext, errChan),
|
||||||
Others: newRunnableGroup(errChan),
|
Others: newRunnableGroup(baseContext, errChan),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,14 +100,15 @@ type runnableGroup struct {
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRunnableGroup(errChan chan error) *runnableGroup {
|
func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
|
||||||
r := &runnableGroup{
|
r := &runnableGroup{
|
||||||
startReadyCh: make(chan *readyRunnable),
|
startReadyCh: make(chan *readyRunnable),
|
||||||
errChan: errChan,
|
errChan: errChan,
|
||||||
ch: make(chan *readyRunnable),
|
ch: make(chan *readyRunnable),
|
||||||
wg: new(sync.WaitGroup),
|
wg: new(sync.WaitGroup),
|
||||||
}
|
}
|
||||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
|
||||||
|
r.ctx, r.cancel = context.WithCancel(baseContext())
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
//go:build !windows
|
||||||
// +build !windows
|
// +build !windows
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||||
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters")
|
var log = logf.RuntimeLog.WithName("predicate").WithName("eventFilters")
|
||||||
|
|
@ -239,6 +240,15 @@ type and struct {
|
||||||
predicates []Predicate
|
predicates []Predicate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a and) InjectFunc(f inject.Func) error {
|
||||||
|
for _, p := range a.predicates {
|
||||||
|
if err := f(p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a and) Create(e event.CreateEvent) bool {
|
func (a and) Create(e event.CreateEvent) bool {
|
||||||
for _, p := range a.predicates {
|
for _, p := range a.predicates {
|
||||||
if !p.Create(e) {
|
if !p.Create(e) {
|
||||||
|
|
@ -284,6 +294,15 @@ type or struct {
|
||||||
predicates []Predicate
|
predicates []Predicate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o or) InjectFunc(f inject.Func) error {
|
||||||
|
for _, p := range o.predicates {
|
||||||
|
if err := f(p); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (o or) Create(e event.CreateEvent) bool {
|
func (o or) Create(e event.CreateEvent) bool {
|
||||||
for _, p := range o.predicates {
|
for _, p := range o.predicates {
|
||||||
if p.Create(e) {
|
if p.Create(e) {
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
|
@ -133,9 +134,14 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
|
||||||
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
|
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
kindMatchErr := &meta.NoKindMatchError{}
|
kindMatchErr := &meta.NoKindMatchError{}
|
||||||
if errors.As(lastErr, &kindMatchErr) {
|
switch {
|
||||||
|
case errors.As(lastErr, &kindMatchErr):
|
||||||
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
|
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
|
||||||
"kind", kindMatchErr.GroupKind)
|
"kind", kindMatchErr.GroupKind)
|
||||||
|
case runtime.IsNotRegisteredError(lastErr):
|
||||||
|
log.Error(lastErr, "kind must be registered to the Scheme")
|
||||||
|
default:
|
||||||
|
log.Error(lastErr, "failed to get informer from cache")
|
||||||
}
|
}
|
||||||
return false, nil // Retry.
|
return false, nil // Retry.
|
||||||
}
|
}
|
||||||
|
|
@ -175,6 +181,9 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
ks.startCancel()
|
ks.startCancel()
|
||||||
|
if errors.Is(ctx.Err(), context.Canceled) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return errors.New("timed out waiting for cache to be synced")
|
return errors.New("timed out waiting for cache to be synced")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
v1 "k8s.io/api/admission/v1"
|
v1 "k8s.io/api/admission/v1"
|
||||||
|
|
@ -60,7 +59,7 @@ func (wh *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
if body, err = ioutil.ReadAll(r.Body); err != nil {
|
if body, err = io.ReadAll(r.Body); err != nil {
|
||||||
wh.log.Error(err, "unable to read the body from the incoming request")
|
wh.log.Error(err, "unable to read the body from the incoming request")
|
||||||
reviewResponse = Errored(http.StatusBadRequest, err)
|
reviewResponse = Errored(http.StatusBadRequest, err)
|
||||||
wh.writeResponse(w, reviewResponse)
|
wh.writeResponse(w, reviewResponse)
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -241,9 +240,9 @@ func (s *Server) Start(ctx context.Context) error {
|
||||||
// load CA to verify client certificate
|
// load CA to verify client certificate
|
||||||
if s.ClientCAName != "" {
|
if s.ClientCAName != "" {
|
||||||
certPool := x509.NewCertPool()
|
certPool := x509.NewCertPool()
|
||||||
clientCABytes, err := ioutil.ReadFile(filepath.Join(s.CertDir, s.ClientCAName))
|
clientCABytes, err := os.ReadFile(filepath.Join(s.CertDir, s.ClientCAName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read client CA cert: %v", err)
|
return fmt.Errorf("failed to read client CA cert: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ok := certPool.AppendCertsFromPEM(clientCABytes)
|
ok := certPool.AppendCertsFromPEM(clientCABytes)
|
||||||
|
|
@ -305,11 +304,11 @@ func (s *Server) StartedChecker() healthz.Checker {
|
||||||
d := &net.Dialer{Timeout: 10 * time.Second}
|
d := &net.Dialer{Timeout: 10 * time.Second}
|
||||||
conn, err := tls.DialWithDialer(d, "tcp", net.JoinHostPort(s.Host, strconv.Itoa(s.Port)), config)
|
conn, err := tls.DialWithDialer(d, "tcp", net.JoinHostPort(s.Host, strconv.Itoa(s.Port)), config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("webhook server is not reachable: %v", err)
|
return fmt.Errorf("webhook server is not reachable: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := conn.Close(); err != nil {
|
if err := conn.Close(); err != nil {
|
||||||
return fmt.Errorf("webhook server is not reachable: closing connection: %v", err)
|
return fmt.Errorf("webhook server is not reachable: closing connection: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue