Reduce logging repeated code in event server
Signed-off-by: Matheus Pimenta <matheuscscp@gmail.com>
This commit is contained in:
parent
189755192c
commit
ae6cc4b4f6
|
|
@ -30,6 +30,8 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
"sigs.k8s.io/yaml"
|
||||
|
||||
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
|
||||
|
|
@ -43,6 +45,7 @@ import (
|
|||
func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
|
||||
eventLogger := log.FromContext(r.Context())
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
|
@ -50,7 +53,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
var allAlerts apiv1beta2.AlertList
|
||||
err := s.kubeClient.List(ctx, &allAlerts)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "listing alerts failed")
|
||||
eventLogger.Error(err, "listing alerts failed")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
|
@ -59,6 +62,9 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
alerts := make([]apiv1beta2.Alert, 0)
|
||||
each_alert:
|
||||
for _, alert := range allAlerts.Items {
|
||||
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
|
||||
ctx := log.IntoContext(ctx, alertLogger)
|
||||
|
||||
// skip suspended and not ready alerts
|
||||
isReady := conditions.IsReady(&alert)
|
||||
if alert.Spec.Suspend || !isReady {
|
||||
|
|
@ -75,7 +81,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
break
|
||||
}
|
||||
} else {
|
||||
s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
|
||||
alertLogger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
|
||||
}
|
||||
}
|
||||
if !include {
|
||||
|
|
@ -91,7 +97,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
continue each_alert
|
||||
}
|
||||
} else {
|
||||
s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
|
||||
alertLogger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -109,27 +115,24 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
if len(alerts) == 0 {
|
||||
s.logger.Info("Discarding event, no alerts found for the involved object",
|
||||
"reconciler kind", event.InvolvedObject.Kind,
|
||||
"name", event.InvolvedObject.Name,
|
||||
"namespace", event.InvolvedObject.Namespace)
|
||||
eventLogger.Info("Discarding event, no alerts found for the involved object")
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Info(fmt.Sprintf("Dispatching event: %s", event.Message),
|
||||
"reconciler kind", event.InvolvedObject.Kind,
|
||||
"name", event.InvolvedObject.Name,
|
||||
"namespace", event.InvolvedObject.Namespace)
|
||||
eventLogger.Info(fmt.Sprintf("Dispatching event: %s", event.Message))
|
||||
|
||||
// dispatch notifications
|
||||
for _, alert := range alerts {
|
||||
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
|
||||
ctx := log.IntoContext(ctx, alertLogger)
|
||||
|
||||
// verify if event comes from a different namespace
|
||||
if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace {
|
||||
accessDenied := fmt.Errorf(
|
||||
"alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked",
|
||||
alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
|
||||
s.logger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
|
||||
alertLogger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -138,10 +141,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
|
||||
err = s.kubeClient.Get(ctx, providerName, &provider)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "failed to read provider",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to read provider")
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -161,10 +161,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
|
||||
err = s.kubeClient.Get(ctx, secretName, &secret)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "failed to read secret",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to read secret")
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -191,10 +188,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
if h, ok := secret.Data["headers"]; ok {
|
||||
err := yaml.Unmarshal(h, &headers)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "failed to read headers from secret",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to read headers from secret")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
@ -207,57 +201,43 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
|
||||
err = s.kubeClient.Get(ctx, secretName, &secret)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "failed to read secret",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to read cert secret")
|
||||
continue
|
||||
}
|
||||
|
||||
caFile, ok := secret.Data["caFile"]
|
||||
if !ok {
|
||||
s.logger.Error(err, "failed to read secret key caFile",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to read secret key caFile")
|
||||
continue
|
||||
}
|
||||
|
||||
certPool = x509.NewCertPool()
|
||||
ok = certPool.AppendCertsFromPEM(caFile)
|
||||
if !ok {
|
||||
s.logger.Error(err, "could not append to cert pool",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "could not append to cert pool")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if webhook == "" {
|
||||
s.logger.Error(nil, "provider has no address",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(nil, "provider has no address")
|
||||
continue
|
||||
}
|
||||
|
||||
factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID))
|
||||
sender, err := factory.Notifier(provider.Spec.Type)
|
||||
if err != nil {
|
||||
s.logger.Error(err, "failed to initialize provider",
|
||||
"reconciler kind", apiv1beta2.ProviderKind,
|
||||
"name", providerName.Name,
|
||||
"namespace", providerName.Namespace)
|
||||
alertLogger.Error(err, "failed to initialize provider")
|
||||
continue
|
||||
}
|
||||
|
||||
notification := *event.DeepCopy()
|
||||
s.enhanceEventWithAlertMetadata(¬ification, alert)
|
||||
s.enhanceEventWithAlertMetadata(ctx, ¬ification, alert)
|
||||
|
||||
go func(n notifier.Interface, e eventv1.Event) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), provider.GetTimeout())
|
||||
defer cancel()
|
||||
ctx = log.IntoContext(ctx, alertLogger)
|
||||
if err := n.Post(ctx, e); err != nil {
|
||||
maskedErrStr, maskErr := masktoken.MaskTokenFromString(err.Error(), token)
|
||||
if maskErr != nil {
|
||||
|
|
@ -265,10 +245,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
} else {
|
||||
err = errors.New(maskedErrStr)
|
||||
}
|
||||
s.logger.Error(err, "failed to send notification",
|
||||
"reconciler kind", event.InvolvedObject.Kind,
|
||||
"name", event.InvolvedObject.Name,
|
||||
"namespace", event.InvolvedObject.Namespace)
|
||||
alertLogger.Error(err, "failed to send notification")
|
||||
}
|
||||
}(sender, notification)
|
||||
}
|
||||
|
|
@ -278,6 +255,8 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Event, source apiv1.CrossNamespaceObjectReference, severity string) bool {
|
||||
alertLogger := log.FromContext(ctx)
|
||||
|
||||
if event.InvolvedObject.Namespace == source.Namespace && event.InvolvedObject.Kind == source.Kind {
|
||||
if event.Severity == severity || severity == eventv1.EventSeverityInfo {
|
||||
labelMatch := true
|
||||
|
|
@ -291,15 +270,14 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
|
|||
Namespace: event.InvolvedObject.Namespace,
|
||||
Name: event.InvolvedObject.Name,
|
||||
}, &obj); err != nil {
|
||||
s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind,
|
||||
"name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion)
|
||||
alertLogger.Error(err, "error getting the involved object")
|
||||
}
|
||||
|
||||
sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
|
||||
MatchLabels: source.MatchLabels,
|
||||
})
|
||||
if err != nil {
|
||||
s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
|
||||
alertLogger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
|
||||
}
|
||||
|
||||
labelMatch = sel.Matches(labels.Set(obj.GetLabels()))
|
||||
|
|
@ -314,7 +292,7 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
|
|||
return false
|
||||
}
|
||||
|
||||
func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta2.Alert) {
|
||||
func (s *EventServer) enhanceEventWithAlertMetadata(ctx context.Context, event *eventv1.Event, alert apiv1beta2.Alert) {
|
||||
meta := event.Metadata
|
||||
if meta == nil {
|
||||
meta = make(map[string]string)
|
||||
|
|
@ -324,11 +302,8 @@ func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert
|
|||
if _, alreadyPresent := meta[key]; !alreadyPresent {
|
||||
meta[key] = value
|
||||
} else {
|
||||
s.logger.Info("metadata key found in the existing set of metadata",
|
||||
"reconciler kind", apiv1beta2.AlertKind,
|
||||
"name", alert.Name,
|
||||
"namespace", alert.Namespace,
|
||||
"key", key)
|
||||
log.FromContext(ctx).
|
||||
Info("metadata key found in the existing set of metadata", "key", key)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
|
|
@ -99,7 +100,7 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) {
|
|||
t.Run(name, func(t *testing.T) {
|
||||
g := NewGomegaWithT(t)
|
||||
|
||||
s.enhanceEventWithAlertMetadata(&tt.event, tt.alert)
|
||||
s.enhanceEventWithAlertMetadata(context.Background(), &tt.event, tt.alert)
|
||||
g.Expect(tt.event.Metadata).To(BeEquivalentTo(tt.expectedMetadata))
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import (
|
|||
"github.com/slok/go-http-metrics/middleware"
|
||||
"github.com/slok/go-http-metrics/middleware/std"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
||||
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
|
||||
)
|
||||
|
|
@ -68,8 +69,8 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
|
|||
var handler http.Handler = http.HandlerFunc(s.handleEvent())
|
||||
for _, middleware := range []func(http.Handler) http.Handler{
|
||||
limitMiddleware.Handle,
|
||||
s.logRateLimitMiddleware,
|
||||
s.cleanupMetadataMiddleware,
|
||||
logRateLimitMiddleware,
|
||||
s.eventMiddleware,
|
||||
} {
|
||||
handler = middleware(handler)
|
||||
}
|
||||
|
|
@ -100,10 +101,12 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
|
|||
}
|
||||
}
|
||||
|
||||
// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
|
||||
// eventMiddleware cleans up the event metadata using cleanupMetadata() and
|
||||
// adds the cleaned event in the request context which can then be queried and
|
||||
// used directly by the other http handlers.
|
||||
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
|
||||
// used directly by the other http handlers. This middleware also adds a
|
||||
// logger with the event's involved object's reference information to the
|
||||
// request context.
|
||||
func (s *EventServer) eventMiddleware(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
|
|
@ -124,10 +127,13 @@ func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
|
|||
|
||||
cleanupMetadata(event)
|
||||
|
||||
ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
|
||||
reqWithEvent := r.WithContext(ctxWithEvent)
|
||||
eventLogger := s.logger.WithValues("eventInvolvedObject", event.InvolvedObject)
|
||||
|
||||
h.ServeHTTP(w, reqWithEvent)
|
||||
enhancedCtx := context.WithValue(r.Context(), eventContextKey{}, event)
|
||||
enhancedCtx = log.IntoContext(enhancedCtx, eventLogger)
|
||||
enhancedReq := r.WithContext(enhancedCtx)
|
||||
|
||||
h.ServeHTTP(w, enhancedReq)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -172,7 +178,7 @@ func (r *statusRecorder) WriteHeader(status int) {
|
|||
r.ResponseWriter.WriteHeader(status)
|
||||
}
|
||||
|
||||
func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
|
||||
func logRateLimitMiddleware(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
recorder := &statusRecorder{
|
||||
ResponseWriter: w,
|
||||
|
|
@ -181,11 +187,8 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
|
|||
h.ServeHTTP(recorder, r)
|
||||
|
||||
if recorder.Status == http.StatusTooManyRequests {
|
||||
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
|
||||
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
|
||||
"reconciler kind", event.InvolvedObject.Kind,
|
||||
"name", event.InvolvedObject.Name,
|
||||
"namespace", event.InvolvedObject.Namespace)
|
||||
log.FromContext(r.Context()).V(1).
|
||||
Info("Discarding event, rate limiting duplicate events")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue