Use conditions helper in reconciler

Co-authored-by: Piaras Hoban <piaras@weave.works>
Co-authored-by: Hidde Beydals <hiddeco@users.noreply.github.com>
Co-authored-by: souleb <bah.soule@gmail.com>
Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
This commit is contained in:
Somtochi Onyekwere 2021-09-09 13:14:52 +01:00
parent d394e04bbb
commit 19a0daa906
14 changed files with 455 additions and 309 deletions

View File

@ -85,10 +85,21 @@ type Alert struct {
}
// GetStatusConditions returns a pointer to the Status.Conditions slice
// Deprecated: use GetConditions instead.
func (in *Alert) GetStatusConditions() *[]metav1.Condition {
return &in.Status.Conditions
}
// GetConditions returns the status conditions of the object.
func (in *Alert) GetConditions() []metav1.Condition {
return in.Status.Conditions
}
// SetConditions sets the status conditions on the object.
func (in *Alert) SetConditions(conditions []metav1.Condition) {
in.Status.Conditions = conditions
}
// +kubebuilder:object:root=true
// AlertList contains a list of Alert
@ -101,9 +112,3 @@ type AlertList struct {
func init() {
SchemeBuilder.Register(&Alert{}, &AlertList{})
}
func SetAlertReadiness(alert Alert, status metav1.ConditionStatus, reason, message string) Alert {
meta.SetResourceCondition(&alert, meta.ReadyCondition, status, reason, message)
alert.Status.ObservedGeneration = alert.Generation
return alert
}

View File

@ -61,6 +61,11 @@ type ProviderSpec struct {
// a PEM-encoded CA certificate (`caFile`)
// +optional
CertSecretRef *meta.LocalObjectReference `json:"certSecretRef,omitempty"`
// This flag tells the controller to suspend subsequent events handling.
// Defaults to false.
// +optional
Suspend bool `json:"suspend,omitempty"`
}
const (
@ -112,10 +117,21 @@ type Provider struct {
}
// GetStatusConditions returns a pointer to the Status.Conditions slice
// Deprecated: use GetConditions instead.
func (in *Provider) GetStatusConditions() *[]metav1.Condition {
return &in.Status.Conditions
}
// GetConditions returns the status conditions of the object.
func (in *Provider) GetConditions() []metav1.Condition {
return in.Status.Conditions
}
// SetConditions sets the status conditions on the object.
func (in *Provider) SetConditions(conditions []metav1.Condition) {
in.Status.Conditions = conditions
}
// +kubebuilder:object:root=true
// ProviderList contains a list of Provider
@ -128,9 +144,3 @@ type ProviderList struct {
func init() {
SchemeBuilder.Register(&Provider{}, &ProviderList{})
}
func SetProviderReadiness(provider Provider, status metav1.ConditionStatus, reason, message string) Provider {
meta.SetResourceCondition(&provider, meta.ReadyCondition, status, reason, message)
provider.Status.ObservedGeneration = provider.Generation
return provider
}

View File

@ -80,24 +80,22 @@ const (
ACRReceiver string = "acr"
)
func ReceiverReady(receiver Receiver, reason, message, url string) Receiver {
meta.SetResourceCondition(&receiver, meta.ReadyCondition, metav1.ConditionTrue, reason, message)
receiver.Status.ObservedGeneration = receiver.Generation
receiver.Status.URL = url
return receiver
}
func ReceiverNotReady(receiver Receiver, reason, message string) Receiver {
meta.SetResourceCondition(&receiver, meta.ReadyCondition, metav1.ConditionFalse, reason, message)
receiver.Status.ObservedGeneration = receiver.Generation
return receiver
}
// GetStatusConditions returns a pointer to the Status.Conditions slice
// Deprecated: use GetConditions instead.
func (in *Receiver) GetStatusConditions() *[]metav1.Condition {
return &in.Status.Conditions
}
// GetConditions returns the status conditions of the object.
func (in *Receiver) GetConditions() []metav1.Condition {
return in.Status.Conditions
}
// SetConditions sets the status conditions on the object.
func (in *Receiver) SetConditions(conditions []metav1.Condition) {
in.Status.Conditions = conditions
}
// +genclient
// +genclient:Namespaced
// +kubebuilder:object:root=true

View File

@ -71,6 +71,9 @@ spec:
required:
- name
type: object
suspend:
description: This flag tells the controller to suspend subsequent events handling. Defaults to false.
type: boolean
type:
description: Type of provider
enum:

View File

@ -21,145 +21,185 @@ import (
"fmt"
"time"
"github.com/go-logr/logr"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/reference"
kerrors "k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"github.com/fluxcd/notification-controller/api/v1beta1"
)
var (
ProviderIndexKey string = ".metadata.provider"
)
// AlertReconciler reconciles a Alert object
type AlertReconciler struct {
client.Client
Scheme *runtime.Scheme
MetricsRecorder *metrics.Recorder
helper.Metrics
helper.Events
Scheme *runtime.Scheme
}
type AlertReconcilerOptions struct {
MaxConcurrentReconciles int
}
func (r *AlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, AlertReconcilerOptions{})
}
func (r *AlertReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts AlertReconcilerOptions) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1beta1.Alert{}, ProviderIndexKey,
func(o client.Object) []string {
alert := o.(*v1beta1.Alert)
return []string{
fmt.Sprintf("%s/%s", alert.GetNamespace(), alert.Spec.ProviderRef.Name),
}
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Alert{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
Watches(
&source.Kind{Type: &v1beta1.Provider{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForProviderChange),
builder.WithPredicates(predicate.GenerationChangedPredicate{}),
).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}
// +kubebuilder:rbac:groups=notification.toolkit.fluxcd.io,resources=alerts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=notification.toolkit.fluxcd.io,resources=alerts/status,verbs=get;update;patch
func (r *AlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reconcileStart := time.Now()
log := logr.FromContext(ctx)
func (r *AlertReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
var alert v1beta1.Alert
if err := r.Get(ctx, req.NamespacedName, &alert); err != nil {
alert := &v1beta1.Alert{}
if err := r.Get(ctx, req.NamespacedName, alert); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// record suspension metrics
r.recordSuspension(ctx, alert)
r.RecordSuspend(ctx, alert, alert.Spec.Suspend)
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &alert)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, reconcileStart)
if alert.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
patchHelper, err := patch.NewHelper(alert, r.Client)
if err != nil {
return ctrl.Result{}, err
}
defer func() {
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: []string{
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
},
patch.WithStatusObservedGeneration{},
}
if retErr == nil && (result.IsZero() || !result.Requeue) {
conditions.Delete(alert, meta.ReconcilingCondition)
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
readyCondition := conditions.Get(alert, meta.ReadyCondition)
switch readyCondition.Status {
case metav1.ConditionFalse:
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
conditions.MarkStalled(alert, readyCondition.Reason, readyCondition.Message)
case metav1.ConditionTrue:
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
conditions.Delete(alert, meta.StalledCondition)
}
}
if err := patchHelper.Patch(ctx, alert, patchOpts...); err != nil {
retErr = kerrors.NewAggregate([]error{retErr, err})
}
r.Metrics.RecordReadiness(ctx, alert)
r.Metrics.RecordDuration(ctx, alert, start)
}()
return r.reconcile(ctx, alert)
}
func (r *AlertReconciler) reconcile(ctx context.Context, alert *v1beta1.Alert) (ctrl.Result, error) {
// Mark the resource as under reconciliation
conditions.MarkReconciling(alert, meta.ProgressingReason, "")
// validate alert spec and provider
if err := r.validate(ctx, alert); err != nil {
alert = v1beta1.SetAlertReadiness(alert, metav1.ConditionFalse, meta.ReconciliationFailedReason, err.Error())
if err := r.patchStatus(ctx, req, alert.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{Requeue: true}, err
conditions.MarkFalse(alert, meta.ReadyCondition, meta.FailedReason, err.Error())
return ctrl.Result{}, client.IgnoreNotFound(err)
}
if !apimeta.IsStatusConditionTrue(alert.Status.Conditions, meta.ReadyCondition) || alert.Status.ObservedGeneration != alert.Generation {
alert = v1beta1.SetAlertReadiness(alert, metav1.ConditionTrue, v1beta1.InitializedReason, v1beta1.InitializedReason)
if err := r.patchStatus(ctx, req, alert.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
log.Info("Alert initialised")
}
r.recordReadiness(ctx, alert)
conditions.MarkTrue(alert, meta.ReadyCondition, meta.SucceededReason, v1beta1.InitializedReason)
ctrl.LoggerFrom(ctx).Info("Alert initialized")
return ctrl.Result{}, nil
}
func (r *AlertReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Alert{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
Complete(r)
}
func (r *AlertReconciler) validate(ctx context.Context, alert v1beta1.Alert) error {
var provider v1beta1.Provider
func (r *AlertReconciler) validate(ctx context.Context, alert *v1beta1.Alert) error {
provider := &v1beta1.Provider{}
providerName := types.NamespacedName{Namespace: alert.Namespace, Name: alert.Spec.ProviderRef.Name}
if err := r.Get(ctx, providerName, &provider); err != nil {
return fmt.Errorf("failed to get provider %s, error: %w", providerName.String(), err)
if err := r.Get(ctx, providerName, provider); err != nil {
// log not found errors since they get filtered out
ctrl.LoggerFrom(ctx).Error(err, "failed to get provider %s, error: %w", providerName.String())
return fmt.Errorf("failed to get provider '%s', error: %w", providerName.String(), err)
}
if !apimeta.IsStatusConditionTrue(provider.Status.Conditions, meta.ReadyCondition) {
if !conditions.IsReady(provider) {
return fmt.Errorf("provider %s is not ready", providerName.String())
}
return nil
}
func (r *AlertReconciler) recordSuspension(ctx context.Context, alert v1beta1.Alert) {
if r.MetricsRecorder == nil {
return
}
log := logr.FromContext(ctx)
objRef, err := reference.GetReference(r.Scheme, &alert)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
func (r *AlertReconciler) requestsForProviderChange(o client.Object) []reconcile.Request {
provider, ok := o.(*v1beta1.Provider)
if !ok {
panic(fmt.Errorf("expected a provider, got %T", o))
}
if !alert.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, alert.Spec.Suspend)
ctx := context.Background()
var list v1beta1.AlertList
if err := r.List(ctx, &list, client.MatchingFields{
ProviderIndexKey: client.ObjectKeyFromObject(provider).String(),
}); err != nil {
return nil
}
}
func (r *AlertReconciler) recordReadiness(ctx context.Context, alert v1beta1.Alert) {
log := logr.FromContext(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &alert)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !alert.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !alert.DeletionTimestamp.IsZero())
}
}
func (r *AlertReconciler) patchStatus(ctx context.Context, req ctrl.Request, newStatus v1beta1.AlertStatus) error {
var alert v1beta1.Alert
if err := r.Get(ctx, req.NamespacedName, &alert); err != nil {
return err
}
patch := client.MergeFrom(alert.DeepCopy())
alert.Status = newStatus
return r.Status().Patch(ctx, &alert, patch)
var reqs []reconcile.Request
for _, i := range list.Items {
reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&i)})
}
return reqs
}

View File

@ -22,19 +22,20 @@ import (
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/reference"
kerrors "k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/notification-controller/api/v1beta1"
@ -44,61 +45,108 @@ import (
// ProviderReconciler reconciles a Provider object
type ProviderReconciler struct {
client.Client
Scheme *runtime.Scheme
MetricsRecorder *metrics.Recorder
helper.Metrics
Scheme *runtime.Scheme
}
type ProviderReconcilerOptions struct {
MaxConcurrentReconciles int
}
func (r *ProviderReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, ProviderReconcilerOptions{})
}
func (r *ProviderReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ProviderReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Provider{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}
// +kubebuilder:rbac:groups=notification.toolkit.fluxcd.io,resources=providers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=notification.toolkit.fluxcd.io,resources=providers/status,verbs=get;update;patch
func (r *ProviderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reconcileStart := time.Now()
log := logr.FromContext(ctx)
func (r *ProviderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
var provider v1beta1.Provider
if err := r.Get(ctx, req.NamespacedName, &provider); err != nil {
provider := &v1beta1.Provider{}
if err := r.Get(ctx, req.NamespacedName, provider); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &provider)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, reconcileStart)
r.RecordSuspend(ctx, provider, provider.Spec.Suspend)
// return early if the object is suspended
if provider.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
patchHelper, err := patch.NewHelper(provider, r.Client)
if err != nil {
return ctrl.Result{}, err
}
defer func() {
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: []string{
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
},
patch.WithStatusObservedGeneration{},
}
if retErr == nil && (result.IsZero() || !result.Requeue) {
conditions.Delete(provider, meta.ReconcilingCondition)
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
readyCondition := conditions.Get(provider, meta.ReadyCondition)
switch readyCondition.Status {
case metav1.ConditionFalse:
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
conditions.MarkStalled(provider, readyCondition.Reason, readyCondition.Message)
case metav1.ConditionTrue:
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
conditions.Delete(provider, meta.StalledCondition)
}
}
if err := patchHelper.Patch(ctx, provider, patchOpts...); err != nil {
retErr = kerrors.NewAggregate([]error{retErr, err})
}
r.Metrics.RecordReadiness(ctx, provider)
r.Metrics.RecordDuration(ctx, provider, start)
}()
return r.reconcile(ctx, provider)
}
func (r *ProviderReconciler) reconcile(ctx context.Context, obj *v1beta1.Provider) (ctrl.Result, error) {
// Mark the resource as under reconciliation
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
// validate provider spec and credentials
if err := r.validate(ctx, provider); err != nil {
provider = v1beta1.SetProviderReadiness(provider, metav1.ConditionFalse, meta.ReconciliationFailedReason, err.Error())
if err := r.patchStatus(ctx, req, provider.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{Requeue: true}, err
if err := r.validate(ctx, obj); err != nil {
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, err.Error())
return ctrl.Result{}, err
}
if !apimeta.IsStatusConditionTrue(provider.Status.Conditions, meta.ReadyCondition) {
provider = v1beta1.SetProviderReadiness(provider, metav1.ConditionTrue, v1beta1.InitializedReason, v1beta1.InitializedReason)
if err := r.patchStatus(ctx, req, provider.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
log.Info("Provider initialised")
}
r.recordReadiness(ctx, provider)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, v1beta1.InitializedReason)
ctrl.LoggerFrom(ctx).Info("Provider initialized")
return ctrl.Result{}, nil
}
func (r *ProviderReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Provider{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
Complete(r)
}
func (r *ProviderReconciler) validate(ctx context.Context, provider v1beta1.Provider) error {
func (r *ProviderReconciler) validate(ctx context.Context, provider *v1beta1.Provider) error {
address := provider.Spec.Address
token := ""
if provider.Spec.SecretRef != nil {
@ -145,41 +193,8 @@ func (r *ProviderReconciler) validate(ctx context.Context, provider v1beta1.Prov
factory := notifier.NewFactory(address, provider.Spec.Proxy, provider.Spec.Username, provider.Spec.Channel, token, certPool)
if _, err := factory.Notifier(provider.Spec.Type); err != nil {
return fmt.Errorf("failed to initialise provider, error: %w", err)
return fmt.Errorf("failed to initialize provider, error: %w", err)
}
return nil
}
func (r *ProviderReconciler) recordReadiness(ctx context.Context, provider v1beta1.Provider) {
log := logr.FromContext(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &provider)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !provider.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !provider.DeletionTimestamp.IsZero())
}
}
func (r *ProviderReconciler) patchStatus(ctx context.Context, req ctrl.Request, newStatus v1beta1.ProviderStatus) error {
var provider v1beta1.Provider
if err := r.Get(ctx, req.NamespacedName, &provider); err != nil {
return err
}
patch := client.MergeFrom(provider.DeepCopy())
provider.Status = newStatus
return r.Status().Patch(ctx, &provider, patch)
}

View File

@ -20,18 +20,21 @@ import (
"context"
"crypto/sha256"
"fmt"
"k8s.io/client-go/tools/reference"
"time"
"github.com/go-logr/logr"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/notification-controller/api/v1beta1"
)
@ -39,8 +42,12 @@ import (
// ReceiverReconciler reconciles a Receiver object
type ReceiverReconciler struct {
client.Client
Scheme *runtime.Scheme
MetricsRecorder *metrics.Recorder
helper.Metrics
Scheme *runtime.Scheme
}
type ReceiverReconcilerOptions struct {
MaxConcurrentReconciles int
}
// +kubebuilder:rbac:groups=notification.toolkit.fluxcd.io,resources=receivers,verbs=get;list;watch;create;update;patch;delete
@ -55,53 +62,116 @@ type ReceiverReconciler struct {
// +kubebuilder:rbac:groups=image.fluxcd.io,resources=imagerepositories/status,verbs=get
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
func (r *ReceiverReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logr.FromContext(ctx)
func (r *ReceiverReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
var receiver v1beta1.Receiver
if err := r.Get(ctx, req.NamespacedName, &receiver); err != nil {
receiver := &v1beta1.Receiver{}
if err := r.Get(ctx, req.NamespacedName, receiver); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// record suspension metrics
defer r.recordSuspension(ctx, receiver)
token, err := r.token(ctx, receiver)
if err != nil {
receiver = v1beta1.ReceiverNotReady(receiver, v1beta1.TokenNotFoundReason, err.Error())
if err := r.patchStatus(ctx, req, receiver.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{}, err
}
isReady := apimeta.IsStatusConditionTrue(receiver.Status.Conditions, meta.ReadyCondition)
receiverURL := fmt.Sprintf("/hook/%s", sha256sum(token+receiver.Name+receiver.Namespace))
if receiver.Status.URL == receiverURL && isReady && receiver.Status.ObservedGeneration == receiver.Generation {
defer r.RecordSuspend(ctx, receiver, receiver.Spec.Suspend)
// return early if the object is suspended
if receiver.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
receiver = v1beta1.ReceiverReady(receiver,
v1beta1.InitializedReason,
"Receiver initialised with URL: "+receiverURL,
receiverURL)
if err := r.patchStatus(ctx, req, receiver.Status); err != nil {
return ctrl.Result{Requeue: true}, err
// Initialize the patch helper
patchHelper, err := patch.NewHelper(receiver, r.Client)
if err != nil {
return ctrl.Result{}, err
}
defer func() {
// Patch the object, ignoring conflicts on the conditions owned by this controller
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: []string{
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
},
patch.WithStatusObservedGeneration{},
}
// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
if retErr == nil && (result.IsZero() || !result.Requeue) {
// We are no longer reconciling
conditions.Delete(receiver, meta.ReconcilingCondition)
// We have now observed this generation
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
readyCondition := conditions.Get(receiver, meta.ReadyCondition)
switch readyCondition.Status {
case metav1.ConditionFalse:
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
conditions.MarkStalled(receiver, readyCondition.Reason, readyCondition.Message)
case metav1.ConditionTrue:
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
conditions.Delete(receiver, meta.StalledCondition)
}
}
// Finally, patch the resource
if err := patchHelper.Patch(ctx, receiver, patchOpts...); err != nil {
retErr = errors.NewAggregate([]error{retErr, err})
}
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, receiver)
r.Metrics.RecordDuration(ctx, receiver, start)
}()
return r.reconcile(ctx, receiver)
}
func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, ReceiverReconcilerOptions{})
}
func (r *ReceiverReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ReceiverReconcilerOptions) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Receiver{}).
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
Complete(r)
}
/// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
// produces an error.
func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *v1beta1.Receiver) (ctrl.Result, error) {
// Mark the resource as under reconciliation
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
token, err := r.token(ctx, obj)
if err != nil {
conditions.MarkFalse(obj, meta.ReadyCondition, v1beta1.TokenNotFoundReason, err.Error())
return ctrl.Result{}, err
}
log.Info("Receiver initialised")
receiverURL := fmt.Sprintf("/hook/%s", sha256sum(token+obj.Name+obj.Namespace))
// Nothing has changed so return early
if obj.Status.URL == receiverURL && obj.Status.ObservedGeneration == obj.Generation {
return ctrl.Result{}, nil
}
// Mark the resource as ready and set the URL
conditions.MarkTrue(obj, meta.ReadyCondition, v1beta1.InitializedReason, "Receiver initialised with URL: "+receiverURL,
receiverURL)
obj.Status.URL = receiverURL
ctrl.LoggerFrom(ctx).Info("Receiver initialized")
return ctrl.Result{}, nil
}
func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1beta1.Receiver{}).
Complete(r)
}
// token extract the token value from the secret object
func (r *ReceiverReconciler) token(ctx context.Context, receiver v1beta1.Receiver) (string, error) {
func (r *ReceiverReconciler) token(ctx context.Context, receiver *v1beta1.Receiver) (string, error) {
token := ""
secretName := types.NamespacedName{
Namespace: receiver.GetNamespace(),
@ -123,38 +193,7 @@ func (r *ReceiverReconciler) token(ctx context.Context, receiver v1beta1.Receive
return token, nil
}
func (r *ReceiverReconciler) recordSuspension(ctx context.Context, rcvr v1beta1.Receiver) {
if r.MetricsRecorder == nil {
return
}
log := logr.FromContext(ctx)
objRef, err := reference.GetReference(r.Scheme, &rcvr)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}
if !rcvr.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, rcvr.Spec.Suspend)
}
}
func sha256sum(val string) string {
digest := sha256.Sum256([]byte(val))
return fmt.Sprintf("%x", digest)
}
func (r *ReceiverReconciler) patchStatus(ctx context.Context, req ctrl.Request, newStatus v1beta1.ReceiverStatus) error {
var receiver v1beta1.Receiver
if err := r.Get(ctx, req.NamespacedName, &receiver); err != nil {
return err
}
patch := client.MergeFrom(receiver.DeepCopy())
receiver.Status = newStatus
return r.Status().Patch(ctx, &receiver, patch)
}

View File

@ -26,6 +26,7 @@ import (
"testing"
"time"
"github.com/fluxcd/pkg/runtime/conditions"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/memorystore"
@ -170,7 +171,7 @@ var _ = Describe("Event handlers", func() {
Expect(k8sClient.Create(context.Background(), &alert)).To(Succeed())
// the event server won't dispatch to an alert if it has
// not been marked "ready"
meta.SetResourceCondition(&alert, meta.ReadyCondition, metav1.ConditionTrue, meta.ReconciliationSucceededReason, "artificially set to ready")
conditions.MarkTrue(&alert, meta.ReadyCondition, meta.SucceededReason, "artificially set to ready")
Expect(k8sClient.Status().Update(context.Background(), &alert)).To(Succeed())
})

View File

@ -313,6 +313,19 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
a PEM-encoded CA certificate (<code>caFile</code>)</p>
</td>
</tr>
<tr>
<td>
<code>suspend</code><br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>This flag tells the controller to suspend subsequent events handling.
Defaults to false.</p>
</td>
</tr>
</table>
</td>
</tr>
@ -791,6 +804,19 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference
a PEM-encoded CA certificate (<code>caFile</code>)</p>
</td>
</tr>
<tr>
<td>
<code>suspend</code><br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>This flag tells the controller to suspend subsequent events handling.
Defaults to false.</p>
</td>
</tr>
</tbody>
</table>
</div>

6
go.mod
View File

@ -11,8 +11,8 @@ require (
github.com/Azure/go-amqp v0.13.6 // indirect
github.com/containrrr/shoutrrr v0.4.4
github.com/fluxcd/notification-controller/api v0.18.1
github.com/fluxcd/pkg/apis/meta v0.10.0
github.com/fluxcd/pkg/runtime v0.12.0
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1
github.com/fluxcd/pkg/runtime v0.13.0-rc.5
github.com/getsentry/sentry-go v0.11.0
github.com/go-logr/logr v0.4.0
github.com/google/go-github/v39 v39.0.0
@ -32,5 +32,5 @@ require (
k8s.io/api v0.21.3
k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.3
sigs.k8s.io/controller-runtime v0.9.5
sigs.k8s.io/controller-runtime v0.9.6
)

36
go.sum
View File

@ -186,10 +186,12 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fluxcd/pkg/apis/meta v0.10.0 h1:N7wVGHC1cyPdT87hrDC7UwCwRwnZdQM46PBSLjG2rlE=
github.com/fluxcd/pkg/apis/acl v0.0.1/go.mod h1:y3qOXUFObVWk7jzOjubMnr/u18j1kCeSi6olycnxr/E=
github.com/fluxcd/pkg/apis/meta v0.10.0/go.mod h1:CW9X9ijMTpNe7BwnokiUOrLl/h13miwVr/3abEQLbKE=
github.com/fluxcd/pkg/runtime v0.12.0 h1:BPZZ8bBkimpqGAPXqOf3LTaw+tcw6HgbWyCuzbbsJGs=
github.com/fluxcd/pkg/runtime v0.12.0/go.mod h1:EyaTR2TOYcjL5U//C4yH3bt2tvTgIOSXpVRbWxUn/C4=
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1 h1:RHHrztAFv9wmjM+Pk7Svt1UdD+1SdnQSp76MWFiM7Hg=
github.com/fluxcd/pkg/apis/meta v0.11.0-rc.1/go.mod h1:yUblM2vg+X8TE3A2VvJfdhkGmg+uqBlSPkLk7dxi0UM=
github.com/fluxcd/pkg/runtime v0.13.0-rc.5 h1:iawwyo9R7dQC52uh1opNFkGyq2coIYUtl5w4vc0+Pcc=
github.com/fluxcd/pkg/runtime v0.13.0-rc.5/go.mod h1:m4B1MCRh2OPMYC2kRdd+R10NHe/yDZ+a6sc2qPYii1M=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@ -513,7 +515,6 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
@ -521,7 +522,6 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@ -655,6 +655,7 @@ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDf
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.coder.com/go-tools v0.0.0-20190317003359-0c6a35b74a16/go.mod h1:iKV5yK9t+J5nG9O3uF6KYdPEz3dyfMyB15MN1rbQ8Qw=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
@ -675,7 +676,6 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.18.1 h1:CSUJ2mjFszzEWt4CdKISEuChVIXGBn3lAPwkRGyVrc4=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
goji.io v2.0.2+incompatible/go.mod h1:sbqFwrtqZACxLBTQcdgVjFh54yGVCvwq8+w49MVMMIk=
@ -730,6 +730,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -766,8 +767,10 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180227000427-d7d64896b5ff/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -784,6 +787,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180224232135-f6cff0780e54/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -841,8 +845,10 @@ golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
@ -909,8 +915,9 @@ golang.org/x/tools v0.0.0-20200505023115-26f46d2f7ef8/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -1030,23 +1037,18 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.21.1/go.mod h1:FstGROTmsSHBarKc8bylzXih8BLNYTiS3TZcsoEDg2s=
k8s.io/api v0.21.3 h1:cblWILbLO8ar+Fj6xdDGr603HRsf8Wu9E9rngJeprZQ=
k8s.io/api v0.21.3/go.mod h1:hUgeYHUbBp23Ue4qdX9tR8/ANi/g3ehylAqDn9NWVOg=
k8s.io/apiextensions-apiserver v0.21.1/go.mod h1:KESQFCGjqVcVsZ9g0xX5bacMjyX5emuWcS2arzdEouA=
k8s.io/apiextensions-apiserver v0.21.3 h1:+B6biyUWpqt41kz5x6peIsljlsuwvNAp/oFax/j2/aY=
k8s.io/apiextensions-apiserver v0.21.3/go.mod h1:kl6dap3Gd45+21Jnh6utCx8Z2xxLm8LGDkprcd+KbsE=
k8s.io/apimachinery v0.21.1/go.mod h1:jbreFvJo3ov9rj7eWT7+sYiRx+qZuCYXwWT1bcDswPY=
k8s.io/apimachinery v0.21.2/go.mod h1:CdTY8fU/BlvAbJ2z/8kBwimGki5Zp8/fbVuLY8gJumM=
k8s.io/apimachinery v0.21.3 h1:3Ju4nvjCngxxMYby0BimUk+pQHPOQp3eCGChk5kfVII=
k8s.io/apimachinery v0.21.3/go.mod h1:H/IM+5vH9kZRNJ4l3x/fXP/5bOPJaVP/guptnZPeCFI=
k8s.io/apiserver v0.21.1/go.mod h1:nLLYZvMWn35glJ4/FZRhzLG/3MPxAaZTgV4FJZdr+tY=
k8s.io/apiserver v0.21.3/go.mod h1:eDPWlZG6/cCCMj/JBcEpDoK+I+6i3r9GsChYBHSbAzU=
k8s.io/client-go v0.21.1/go.mod h1:/kEw4RgW+3xnBGzvp9IWxKSNA+lXn3A7AuH3gdOAzLs=
k8s.io/client-go v0.21.3 h1:J9nxZTOmvkInRDCzcSNQmPJbDYN/PjlxXT9Mos3HcLg=
k8s.io/client-go v0.21.3/go.mod h1:+VPhCgTsaFmGILxR/7E1N0S+ryO010QBeNCv5JwRGYU=
k8s.io/code-generator v0.21.1/go.mod h1:hUlps5+9QaTrKx+jiM4rmq7YmH8wPOIko64uZCHDh6Q=
k8s.io/code-generator v0.21.3/go.mod h1:K3y0Bv9Cz2cOW2vXUrNZlFbflhuPvuadW6JdnN6gGKo=
k8s.io/component-base v0.21.1/go.mod h1:NgzFZ2qu4m1juby4TnrmpR8adRk6ka62YdH5DkIIyKA=
k8s.io/component-base v0.21.3 h1:4WuuXY3Npa+iFfi2aDRiOz+anhNvRfye0859ZgfC5Og=
k8s.io/component-base v0.21.3/go.mod h1:kkuhtfEHeZM6LkX0saqSK8PbdO7A0HigUngmhhrwfGQ=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
@ -1058,7 +1060,6 @@ k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210527160623-6fdb442a123b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210722164352-7f3ee0f31471 h1:DnzUXII7sVg1FJ/4JX6YDRJfLNAC7idRatPwe07suiI=
k8s.io/utils v0.0.0-20210722164352-7f3ee0f31471/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
mvdan.cc/sh v2.6.4+incompatible/go.mod h1:IeeQbZq+x2SUGBensq/jge5lLQbS3XT2ktyp3wrt4x8=
@ -1068,11 +1069,10 @@ nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.19/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg=
sigs.k8s.io/controller-runtime v0.9.0/go.mod h1:TgkfvrhhEw3PlI0BRL/5xM+89y3/yc0ZDfdbTl84si8=
sigs.k8s.io/controller-runtime v0.9.5 h1:WThcFE6cqctTn2jCZprLICO6BaKZfhsT37uAapTNfxc=
sigs.k8s.io/controller-runtime v0.9.5/go.mod h1:q6PpkM5vqQubEKUKOM6qr06oXGzOBcCby1DA9FbyZeA=
sigs.k8s.io/controller-runtime v0.9.6 h1:EevVMlgUj4fC1NVM4+DB3iPkWkmGRNarA66neqv9Qew=
sigs.k8s.io/controller-runtime v0.9.6/go.mod h1:q6PpkM5vqQubEKUKOM6qr06oXGzOBcCby1DA9FbyZeA=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 h1:Hr/htKFmJEbtMgS/UD0N+gtgctAqz81t3nu+sPzynno=

View File

@ -27,11 +27,10 @@ import (
"regexp"
"time"
"github.com/fluxcd/pkg/runtime/conditions"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/notification-controller/api/v1beta1"
@ -72,7 +71,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
each_alert:
for _, alert := range allAlerts.Items {
// skip suspended and not ready alerts
isReady := apimeta.IsStatusConditionTrue(alert.Status.Conditions, meta.ReadyCondition)
isReady := conditions.IsReady(&alert)
if alert.Spec.Suspend || !isReady {
continue each_alert
}
@ -134,6 +133,10 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
continue
}
if provider.Spec.Suspend {
continue
}
webhook := provider.Spec.Address
token := ""
if provider.Spec.SecretRef != nil {
@ -203,7 +206,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
factory := notifier.NewFactory(webhook, provider.Spec.Proxy, provider.Spec.Username, provider.Spec.Channel, token, certPool)
sender, err := factory.Notifier(provider.Spec.Type)
if err != nil {
s.logger.Error(err, "failed to initialise provider",
s.logger.Error(err, "failed to initialize provider",
"reconciler kind", v1beta1.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)

View File

@ -30,6 +30,7 @@ import (
"strings"
"time"
"github.com/fluxcd/pkg/runtime/conditions"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -37,7 +38,6 @@ import (
"github.com/fluxcd/pkg/apis/meta"
"github.com/google/go-github/v39/github"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -62,7 +62,7 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req
receivers := make([]v1beta1.Receiver, 0)
for _, receiver := range allReceivers.Items {
if !receiver.Spec.Suspend &&
apimeta.IsStatusConditionTrue(receiver.Status.Conditions, meta.ReadyCondition) &&
conditions.IsReady(&receiver) &&
receiver.Status.URL == fmt.Sprintf("/hook/%s", digest) {
receivers = append(receivers, receiver)
}

40
main.go
View File

@ -21,6 +21,7 @@ import (
"os"
"time"
helper "github.com/fluxcd/pkg/runtime/controller"
prommetrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
flag "github.com/spf13/pflag"
@ -33,14 +34,14 @@ import (
"github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/leaderelection"
"github.com/fluxcd/pkg/runtime/logger"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/pprof"
"github.com/fluxcd/pkg/runtime/probes"
"github.com/sethvargo/go-limiter/memorystore"
"github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/notification-controller/controllers"
"github.com/fluxcd/notification-controller/internal/server"
"github.com/sethvargo/go-limiter/memorystore"
// +kubebuilder:scaffold:imports
)
@ -88,9 +89,6 @@ func main() {
log := logger.NewLogger(logOptions)
ctrl.SetLogger(log)
metricsRecorder := metrics.NewRecorder()
crtlmetrics.Registry.MustRegister(metricsRecorder.Collectors()...)
watchNamespace := ""
if !watchAllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
@ -119,27 +117,35 @@ func main() {
probes.SetupChecks(mgr, setupLog)
pprof.SetupHandlers(mgr, setupLog)
metricsH := helper.MustMakeMetrics(mgr)
if err = (&controllers.ProviderReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MetricsRecorder: metricsRecorder,
}).SetupWithManager(mgr); err != nil {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Metrics: metricsH,
}).SetupWithManagerAndOptions(mgr, controllers.ProviderReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Provider")
os.Exit(1)
}
if err = (&controllers.AlertReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MetricsRecorder: metricsRecorder,
}).SetupWithManager(mgr); err != nil {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Metrics: metricsH,
}).SetupWithManagerAndOptions(mgr, controllers.AlertReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Alert")
os.Exit(1)
}
if err = (&controllers.ReceiverReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
MetricsRecorder: metricsRecorder,
}).SetupWithManager(mgr); err != nil {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Metrics: metricsH,
}).SetupWithManagerAndOptions(mgr, controllers.ReceiverReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Receiver")
os.Exit(1)
}