notification-controller/internal/controller/alert_controller_test.go

549 lines
16 KiB
Go

/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/fluxcd/pkg/ssa"
. "github.com/onsi/gomega"
"github.com/sethvargo/go-limiter/memorystore"
prommetrics "github.com/slok/go-http-metrics/metrics/prometheus"
"github.com/slok/go-http-metrics/middleware"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
logf "sigs.k8s.io/controller-runtime/pkg/log"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
apiv1 "github.com/fluxcd/notification-controller/api/v1"
apiv1beta2 "github.com/fluxcd/notification-controller/api/v1beta2"
"github.com/fluxcd/notification-controller/internal/server"
)
// TestAlertReconciler_deleteBeforeFinalizer creates an Alert with only a test
// finalizer, deletes it so that it carries a deletion timestamp, and then
// calls Reconcile directly, expecting no error to be returned.
func TestAlertReconciler_deleteBeforeFinalizer(t *testing.T) {
	g := NewWithT(t)

	// Each run gets its own namespace, removed again when the test ends.
	nsName := "alert-" + randStringRunes(5)
	ns := &corev1.Namespace{}
	ns.Name = nsName
	g.Expect(k8sClient.Create(ctx, ns)).ToNot(HaveOccurred())
	t.Cleanup(func() {
		g.Expect(k8sClient.Delete(ctx, ns)).NotTo(HaveOccurred())
	})

	alert := &apiv1beta2.Alert{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-alert",
			Namespace: nsName,
			// The test finalizer prevents the object from being removed
			// when it is deleted below.
			Finalizers: []string{"test-finalizer"},
		},
		Spec: apiv1beta2.AlertSpec{
			EventSources: []apiv1.CrossNamespaceObjectReference{
				{Kind: "Bucket", Name: "Foo"},
			},
		},
	}
	g.Expect(k8sClient.Create(ctx, alert)).NotTo(HaveOccurred())

	// Deleting the object is what sets its deletion timestamp.
	g.Expect(k8sClient.Delete(ctx, alert)).NotTo(HaveOccurred())

	reconciler := &AlertReconciler{
		Client:        k8sClient,
		EventRecorder: record.NewFakeRecorder(32),
	}
	request := ctrl.Request{NamespacedName: client.ObjectKeyFromObject(alert)}

	// NOTE: Only a real API server responds with an error in this scenario.
	_, err := reconciler.Reconcile(ctx, request)
	g.Expect(err).NotTo(HaveOccurred())
}
// TestAlertReconciler_Reconcile creates an Alert that references a Provider
// which does not exist yet and then walks it through a sequence of subtests:
// failing while the provider is missing, becoming ready once the provider is
// created, handling a reconcile request annotation, and being finalized while
// suspended.
//
// The subtests share one Alert and one result object and depend on the state
// left behind by the previous subtest, so they must run in order.
// NOTE(review): the status changes polled for here are presumably written by
// a reconciler started by the package's test setup — confirm.
func TestAlertReconciler_Reconcile(t *testing.T) {
	g := NewWithT(t)

	const (
		pollTimeout  = 5 * time.Second
		pollInterval = time.Second
	)

	resultA := &apiv1beta2.Alert{}
	namespaceName := "alert-" + randStringRunes(5)
	providerName := "provider-" + randStringRunes(5)

	g.Expect(createNamespace(namespaceName)).NotTo(HaveOccurred(), "failed to create test namespace")

	// The provider is only defined here; it is created in the second subtest.
	provider := &apiv1beta2.Provider{
		ObjectMeta: metav1.ObjectMeta{Name: providerName, Namespace: namespaceName},
		Spec: apiv1beta2.ProviderSpec{
			Type:    "generic",
			Address: "https://webhook.internal",
		},
	}

	alert := &apiv1beta2.Alert{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("alert-%s", randStringRunes(5)),
			Namespace: namespaceName,
		},
		Spec: apiv1beta2.AlertSpec{
			ProviderRef:   meta.LocalObjectReference{Name: providerName},
			EventSeverity: "info",
			EventSources: []apiv1.CrossNamespaceObjectReference{
				{Kind: "Bucket", Name: "*"},
			},
		},
	}
	g.Expect(k8sClient.Create(context.Background(), alert)).To(Succeed())

	alertKey := client.ObjectKeyFromObject(alert)

	// fetch loads the current state of the Alert into resultA.
	fetch := func() error {
		return k8sClient.Get(context.Background(), alertKey, resultA)
	}

	t.Run("fails with provider not found error", func(t *testing.T) {
		g := NewWithT(t)

		hasReadyCondition := func() bool {
			// A failed read leaves resultA as it was; the poll simply retries.
			_ = fetch()
			return conditions.Has(resultA, meta.ReadyCondition)
		}
		g.Eventually(hasReadyCondition, pollTimeout, pollInterval).Should(BeTrue())

		// Ready must be false and name the missing provider.
		g.Expect(conditions.IsReady(resultA)).To(BeFalse())
		g.Expect(conditions.GetReason(resultA, meta.ReadyCondition)).To(BeIdenticalTo(meta.FailedReason))
		g.Expect(conditions.GetMessage(resultA, meta.ReadyCondition)).To(ContainSubstring(providerName))

		// Reconciling must be present, flagged for retry, and current.
		g.Expect(conditions.Has(resultA, meta.ReconcilingCondition)).To(BeTrue())
		g.Expect(conditions.GetReason(resultA, meta.ReconcilingCondition)).To(BeIdenticalTo(meta.ProgressingWithRetryReason))
		g.Expect(conditions.GetObservedGeneration(resultA, meta.ReconcilingCondition)).To(BeIdenticalTo(resultA.Generation))

		g.Expect(controllerutil.ContainsFinalizer(resultA, apiv1.NotificationFinalizer)).To(BeTrue())
	})

	t.Run("recovers when provider exists", func(t *testing.T) {
		g := NewWithT(t)

		g.Expect(k8sClient.Create(context.Background(), provider)).To(Succeed())

		isReady := func() bool {
			_ = fetch()
			return conditions.IsReady(resultA)
		}
		g.Eventually(isReady, pollTimeout, pollInterval).Should(BeTrue())

		g.Expect(conditions.GetObservedGeneration(resultA, meta.ReadyCondition)).To(BeIdenticalTo(resultA.Generation))
		g.Expect(resultA.Status.ObservedGeneration).To(BeIdenticalTo(resultA.Generation))
		g.Expect(conditions.Has(resultA, meta.ReconcilingCondition)).To(BeFalse())
	})

	t.Run("handles reconcileAt", func(t *testing.T) {
		g := NewWithT(t)

		g.Expect(fetch()).To(Succeed())

		// Request a reconciliation by stamping the annotation with the current time.
		reconcileRequestAt := metav1.Now().String()
		annotations := map[string]string{
			meta.ReconcileRequestAnnotation: reconcileRequestAt,
		}
		resultA.SetAnnotations(annotations)
		g.Expect(k8sClient.Update(context.Background(), resultA)).To(Succeed())

		requestHandled := func() bool {
			_ = fetch()
			return resultA.Status.LastHandledReconcileAt == reconcileRequestAt
		}
		g.Eventually(requestHandled, pollTimeout, pollInterval).Should(BeTrue())
	})

	t.Run("finalizes suspended object", func(t *testing.T) {
		g := NewWithT(t)

		g.Expect(fetch()).To(Succeed())

		resultA.Spec.Suspend = true
		g.Expect(k8sClient.Update(context.Background(), resultA)).To(Succeed())

		isSuspended := func() bool {
			_ = fetch()
			return resultA.Spec.Suspend
		}
		g.Eventually(isSuspended, pollTimeout, pollInterval).Should(BeTrue())

		g.Expect(k8sClient.Delete(context.Background(), resultA)).To(Succeed())

		// The object is gone once reads report NotFound.
		isGone := func() bool {
			return apierrors.IsNotFound(fetch())
		}
		g.Eventually(isGone, pollTimeout, pollInterval).Should(BeTrue())
	})
}
// TestAlertReconciler_EventHandler starts an event server and a fake provider
// endpoint, creates a "generic" Provider pointing at that endpoint plus an
// Alert referencing it, and then posts events to the event server, checking
// for each one whether a request reaches the fake endpoint.
//
// The cases run in two rounds: first with only an exclusion list on the
// Alert, then again after the Alert has been updated with an inclusion list.
func TestAlertReconciler_EventHandler(t *testing.T) {
	g := NewWithT(t)

	// The listener and the client share one address so that both always use
	// the same host and port.
	const eventServerAddr = "127.0.0.1:56789"

	namespace := "events-" + randStringRunes(5)
	g.Expect(createNamespace(namespace)).NotTo(HaveOccurred(), "failed to create test namespace")

	eventMdlw := middleware.New(middleware.Config{
		Recorder: prommetrics.NewRecorder(prommetrics.Config{
			Prefix: "gotk_event",
		}),
	})

	store, err := memorystore.New(&memorystore.Config{
		Interval: 5 * time.Minute,
	})
	g.Expect(err).NotTo(HaveOccurred(), "failed to create memory storage")

	eventServer := server.NewEventServer(eventServerAddr, logf.Log, k8sClient, true)
	stopCh := make(chan struct{})
	go eventServer.ListenAndServe(stopCh, eventMdlw, store)

	// forwarded receives one token for every request that reaches the fake
	// provider endpoint. The handler runs on a goroutine owned by the HTTP
	// server, so the signal is handed to the test goroutine over a channel
	// instead of through a shared variable, which would be a data race.
	forwarded := make(chan struct{}, 16)
	rcvServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case forwarded <- struct{}{}:
		default:
			// The buffer is full, so tokens are already pending; never block
			// the handler on the test goroutine.
		}
		w.WriteHeader(200)
	}))
	defer rcvServer.Close()
	defer close(stopCh)

	// drainForwarded discards any pending tokens so that a case starts from
	// a "nothing forwarded yet" state.
	drainForwarded := func() {
		for {
			select {
			case <-forwarded:
			default:
				return
			}
		}
	}

	providerKey := types.NamespacedName{
		Name:      fmt.Sprintf("provider-%s", randStringRunes(5)),
		Namespace: namespace,
	}
	provider := &apiv1beta2.Provider{
		ObjectMeta: metav1.ObjectMeta{
			Name:      providerKey.Name,
			Namespace: providerKey.Namespace,
		},
		Spec: apiv1beta2.ProviderSpec{
			Type:    "generic",
			Address: rcvServer.URL,
		},
	}
	g.Expect(k8sClient.Create(context.Background(), provider)).To(Succeed())

	// Wait for the provider to be marked as ready.
	g.Eventually(func() bool {
		var obj apiv1beta2.Provider
		if err := k8sClient.Get(context.Background(), client.ObjectKeyFromObject(provider), &obj); err != nil {
			// Treat a failed read as "not ready yet" and keep polling.
			return false
		}
		return conditions.IsReady(&obj)
	}, 30*time.Second, time.Second).Should(BeTrue())

	repo, err := readManifest("./testdata/repo.yaml", namespace)
	g.Expect(err).ToNot(HaveOccurred())

	secondRepo, err := readManifest("./testdata/gitrepo2.yaml", namespace)
	g.Expect(err).ToNot(HaveOccurred())

	_, err = manager.Apply(context.Background(), repo, ssa.ApplyOptions{
		Force: true,
	})
	g.Expect(err).ToNot(HaveOccurred())

	_, err = manager.Apply(context.Background(), secondRepo, ssa.ApplyOptions{
		Force: true,
	})
	g.Expect(err).ToNot(HaveOccurred())

	alertKey := types.NamespacedName{
		Name:      fmt.Sprintf("alert-%s", randStringRunes(5)),
		Namespace: namespace,
	}
	alert := &apiv1beta2.Alert{
		ObjectMeta: metav1.ObjectMeta{
			Name:      alertKey.Name,
			Namespace: alertKey.Namespace,
		},
		Spec: apiv1beta2.AlertSpec{
			ProviderRef: meta.LocalObjectReference{
				Name: providerKey.Name,
			},
			EventSeverity: "info",
			EventSources: []apiv1.CrossNamespaceObjectReference{
				{
					Kind:      "Bucket",
					Name:      "hyacinth",
					Namespace: namespace,
				},
				{
					Kind: "Kustomization",
					Name: "*",
				},
				{
					Kind: "GitRepository",
					Name: "*",
					MatchLabels: map[string]string{
						"app": "podinfo",
					},
				},
				{
					Kind:      "Kustomization",
					Name:      "*",
					Namespace: "test",
				},
			},
			ExclusionList: []string{
				"doesnotoccur", // not intended to match
				"excluded",
			},
		},
	}

	// Prepared up front; applied to the cluster before the second round.
	inclusionAlert := alert.DeepCopy()
	inclusionAlert.Spec.InclusionList = []string{"^included"}

	g.Expect(k8sClient.Create(context.Background(), alert)).To(Succeed())

	// alertIsReady reports whether the Alert stored under key currently has
	// a true Ready condition. A failed read counts as "not ready yet".
	alertIsReady := func(key client.ObjectKey) bool {
		var obj apiv1beta2.Alert
		if err := k8sClient.Get(context.Background(), key, &obj); err != nil {
			return false
		}
		return conditions.IsReady(&obj)
	}

	// Wait for the controller to mark the alert as ready.
	g.Eventually(func() bool {
		return alertIsReady(client.ObjectKeyFromObject(alert))
	}, 30*time.Second, time.Second).Should(BeTrue())

	// testSent posts the event to the event server and expects it to be
	// accepted.
	testSent := func(g *WithT, event eventv1.Event) {
		g.THelper()
		buf := &bytes.Buffer{}
		g.Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed())
		res, err := http.Post("http://"+eventServerAddr+"/", "application/json", buf)
		g.Expect(err).ToNot(HaveOccurred())
		defer res.Body.Close()
		g.Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted
	}

	// testForwarded expects a request to reach the fake provider endpoint
	// within two seconds.
	testForwarded := func(g *WithT) {
		g.THelper()
		g.Eventually(forwarded, "2s", "0.1s").Should(Receive())
	}

	// testFiltered expects no request to reach the fake provider endpoint.
	testFiltered := func(g *WithT) {
		g.THelper()
		// The event_server does forwarding in a goroutine, after
		// responding to the POST of the event. This makes it
		// difficult to know whether the provider has filtered the
		// event, or just not run the goroutine yet. For now, a
		// timeout is used (and Consistently so it can fail early).
		g.Consistently(forwarded, "1s", "0.1s").ShouldNot(Receive())
	}

	// eventCase describes one event to post and whether it is expected to be
	// forwarded to the provider.
	type eventCase struct {
		name            string
		modifyEventFunc func(e eventv1.Event) eventv1.Event
		forwarded       bool
	}

	// runCases runs every case as a subtest. Each case starts from a fresh
	// copy of fixture and from an empty forwarded channel.
	runCases := func(fixture eventv1.Event, cases []eventCase) {
		for _, tt := range cases {
			t.Run(tt.name, func(t *testing.T) {
				g := NewWithT(t)

				drainForwarded()
				event := tt.modifyEventFunc(*fixture.DeepCopy())

				testSent(g, event)
				if tt.forwarded {
					testForwarded(g)
				} else {
					testFiltered(g)
				}
			})
		}
	}

	// An event fixture to set the initial fixed state of an Event which is
	// modified in the test cases.
	eventFixture := eventv1.Event{
		InvolvedObject: corev1.ObjectReference{
			Kind:      "Bucket",
			Name:      "hyacinth",
			Namespace: namespace,
		},
		Severity:            "info",
		Timestamp:           metav1.Now(),
		Message:             "well that happened",
		Reason:              "event-happened",
		ReportingController: "source-controller",
	}

	runCases(eventFixture, []eventCase{
		{
			name:            "forwards when source is a match",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event { return e },
			forwarded:       true,
		},
		{
			name: "drops event when source Kind does not match",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Kind = "GitRepository"
				return e
			},
			forwarded: false,
		},
		{
			name: "drops event when source name does not match",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Name = "slop"
				return e
			},
			forwarded: false,
		},
		{
			name: "drops event when source namespace does not match",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Namespace = "all-buckets"
				return e
			},
			forwarded: false,
		},
		{
			name: "drops event that is matched by exclusion",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.Message = "this is excluded"
				return e
			},
			forwarded: false,
		},
		{
			name: "forwards events when name wildcard is used",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Kind = "Kustomization"
				e.InvolvedObject.Name = "test"
				e.InvolvedObject.Namespace = namespace
				e.Message = "test"
				return e
			},
			forwarded: true,
		},
		{
			name: "forwards events when the label matches",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Kind = "GitRepository"
				e.InvolvedObject.Name = "podinfo"
				e.InvolvedObject.APIVersion = "source.toolkit.fluxcd.io/v1"
				e.InvolvedObject.Namespace = namespace
				e.Message = "test"
				return e
			},
			forwarded: true,
		},
		{
			name: "drops events when the labels don't match",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Kind = "GitRepository"
				e.InvolvedObject.Name = "podinfo-two"
				e.InvolvedObject.APIVersion = "source.toolkit.fluxcd.io/v1"
				e.InvolvedObject.Namespace = namespace
				e.Message = "test"
				return e
			},
			forwarded: false,
		},
		{
			name: "drops events for cross-namespace sources",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.InvolvedObject.Kind = "Kustomization"
				e.InvolvedObject.Name = "test"
				e.InvolvedObject.Namespace = "test"
				e.Message = "test"
				return e
			},
			forwarded: false,
		},
	})

	// Update the alert for testing the inclusion list. The current resource
	// version is copied over so the update is made against the stored object.
	var current apiv1beta2.Alert
	g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(alert), &current)).To(Succeed())
	inclusionAlert.ResourceVersion = current.ResourceVersion
	g.Expect(k8sClient.Update(context.Background(), inclusionAlert)).To(Succeed())

	// Wait for the updated alert to be ready.
	g.Eventually(func() bool {
		return alertIsReady(client.ObjectKeyFromObject(inclusionAlert))
	}, 30*time.Second, time.Second).Should(BeTrue())

	// An event fixture whose message matches the inclusion list; it is
	// modified in the test cases.
	eventFixture2 := eventv1.Event{
		InvolvedObject: corev1.ObjectReference{
			Kind:      "Bucket",
			Name:      "hyacinth",
			Namespace: namespace,
		},
		Severity:            "info",
		Timestamp:           metav1.Now(),
		Message:             "included",
		Reason:              "event-happened",
		ReportingController: "source-controller",
	}

	runCases(eventFixture2, []eventCase{
		{
			name:            "forwards when message matches inclusion list",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event { return e },
			forwarded:       true,
		},
		{
			name: "drops when message does not match inclusion list",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.Message = "not included"
				return e
			},
			forwarded: false,
		},
		{
			name: "drops when message matches inclusion list and exclusion list",
			modifyEventFunc: func(e eventv1.Event) eventv1.Event {
				e.Message = "included excluded"
				return e
			},
			forwarded: false,
		},
	})
}