Implement Receiver resource filtering with CEL
Signed-off-by: Kevin McDermott <bigkevmcd@gmail.com> Co-authored-by: Matheus Pimenta <matheuscscp@gmail.com>
This commit is contained in:
parent
9b83efa21f
commit
28deef923f
|
|
@ -67,6 +67,16 @@ type ReceiverSpec struct {
|
|||
// +required
|
||||
Resources []CrossNamespaceObjectReference `json:"resources"`
|
||||
|
||||
// ResourceFilter is a CEL expression expected to return a boolean that is
|
||||
// evaluated for each resource referenced in the Resources field when a
|
||||
// webhook is received. If the expression returns false then the controller
|
||||
// will not request a reconciliation for the resource.
|
||||
// When the expression is specified the controller will parse it and mark
|
||||
// the object as terminally failed if the expression is invalid or does not
|
||||
// return a boolean.
|
||||
// +optional
|
||||
ResourceFilter string `json:"resourceFilter,omitempty"`
|
||||
|
||||
// SecretRef specifies the Secret containing the token used
|
||||
// to validate the payload authenticity.
|
||||
// +required
|
||||
|
|
|
|||
|
|
@ -62,6 +62,16 @@ spec:
|
|||
Secret references.
|
||||
pattern: ^([0-9]+(\.[0-9]+)?(ms|s|m|h))+$
|
||||
type: string
|
||||
resourceFilter:
|
||||
description: |-
|
||||
ResourceFilter is a CEL expression expected to return a boolean that is
|
||||
evaluated for each resource referenced in the Resources field when a
|
||||
webhook is received. If the expression returns false then the controller
|
||||
will not request a reconciliation for the resource.
|
||||
When the expression is specified the controller will parse it and mark
|
||||
the object as terminally failed if the expression is invalid or does not
|
||||
return a boolean.
|
||||
type: string
|
||||
resources:
|
||||
description: A list of resources to be notified about changes.
|
||||
items:
|
||||
|
|
|
|||
|
|
@ -122,6 +122,24 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.</p>
|
|||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>resourceFilter</code><br>
|
||||
<em>
|
||||
string
|
||||
</em>
|
||||
</td>
|
||||
<td>
|
||||
<em>(Optional)</em>
|
||||
<p>ResourceFilter is a CEL expression expected to return a boolean that is
|
||||
evaluated for each resource referenced in the Resources field when a
|
||||
webhook is received. If the expression returns false then the controller
|
||||
will not request a reconciliation for the resource.
|
||||
When the expression is specified the controller will parse it and mark
|
||||
the object as terminally failed if the expression is invalid or does not
|
||||
return a boolean.</p>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>secretRef</code><br>
|
||||
<em>
|
||||
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
||||
|
|
@ -321,6 +339,24 @@ e.g. ‘push’ for GitHub or ‘Push Hook’ for GitLab.</p>
|
|||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>resourceFilter</code><br>
|
||||
<em>
|
||||
string
|
||||
</em>
|
||||
</td>
|
||||
<td>
|
||||
<em>(Optional)</em>
|
||||
<p>ResourceFilter is a CEL expression expected to return a boolean that is
|
||||
evaluated for each resource referenced in the Resources field when a
|
||||
webhook is received. If the expression returns false then the controller
|
||||
will not request a reconciliation for the resource.
|
||||
When the expression is specified the controller will parse it and mark
|
||||
the object as terminally failed if the expression is invalid or does not
|
||||
return a boolean.</p>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
<code>secretRef</code><br>
|
||||
<em>
|
||||
<a href="https://pkg.go.dev/github.com/fluxcd/pkg/apis/meta#LocalObjectReference">
|
||||
|
|
|
|||
|
|
@ -700,6 +700,75 @@ resources:
|
|||
**Note:** Cross-namespace references [can be disabled for security
|
||||
reasons](#disabling-cross-namespace-selectors).
|
||||
|
||||
#### Filtering reconciled objects with CEL
|
||||
|
||||
To filter the resources that are reconciled you can use [Common Expression Language (CEL)](https://cel.dev/).
|
||||
|
||||
For example, to trigger `ImageRepositories` on notifications from [Google Artifact Registry](https://cloud.google.com/artifact-registry/docs/configure-notifications#examples) you can define the following receiver:
|
||||
|
||||
```yaml
|
||||
apiVersion: notification.toolkit.fluxcd.io/v1
|
||||
kind: Receiver
|
||||
metadata:
|
||||
name: gar-receiver
|
||||
namespace: apps
|
||||
spec:
|
||||
type: gcr
|
||||
secretRef:
|
||||
name: flux-gar-token
|
||||
resources:
|
||||
- apiVersion: image.toolkit.fluxcd.io/v1beta2
|
||||
kind: ImageRepository
|
||||
name: "*"
|
||||
matchLabels:
|
||||
registry: gar
|
||||
```
|
||||
|
||||
This will trigger the reconciliation of all `ImageRepositories` with the label `registry: gar`.
|
||||
|
||||
But if you want to only notify `ImageRepository` resources that are referenced from the incoming hook you can use CEL to filter the resources.
|
||||
|
||||
```yaml
|
||||
apiVersion: notification.toolkit.fluxcd.io/v1
|
||||
kind: Receiver
|
||||
metadata:
|
||||
name: gar-receiver
|
||||
namespace: apps
|
||||
spec:
|
||||
type: gcr
|
||||
secretRef:
|
||||
name: flux-gar-token
|
||||
resources:
|
||||
- apiVersion: image.toolkit.fluxcd.io/v1beta2
|
||||
kind: ImageRepository
|
||||
name: "*"
|
||||
matchLabels:
|
||||
registry: gar
|
||||
resourceFilter: 'req.tag.contains(res.metadata.name)'
|
||||
```
|
||||
|
||||
If the body of the incoming hook looks like this:
|
||||
|
||||
```json
|
||||
{
|
||||
"action":"INSERT",
|
||||
"digest":"us-east1-docker.pkg.dev/my-project/my-repo/hello-world@sha256:6ec128e26cd5...",
|
||||
"tag":"us-east1-docker.pkg.dev/my-project/my-repo/hello-world:1.1"
|
||||
}
|
||||
```
|
||||
|
||||
This simple example would match `ImageRepositories` containing the name `hello-world`.
|
||||
|
||||
If you want to do more complex processing:
|
||||
|
||||
```yaml
|
||||
resourceFilter: has(res.metadata.annotations) && req.tag.split('/').last().value().split(":").first().value() == res.metadata.annotations['update-image']
|
||||
```
|
||||
|
||||
This would look for an annotation "update-image" on the resource, and match it to the `hello-world` part of the tag name.
|
||||
|
||||
**Note:** Currently the `resource` value in the CEL expression only provides the object metadata, this means you can access things like `res.metadata.labels`, `res.metadata.annotations` and `res.metadata.name`.
|
||||
|
||||
### Secret reference
|
||||
|
||||
`.spec.secretRef.name` is a required field to specify a name reference to a
|
||||
|
|
|
|||
6
go.mod
6
go.mod
|
|
@ -25,6 +25,7 @@ require (
|
|||
github.com/fluxcd/pkg/ssa v0.44.0
|
||||
github.com/getsentry/sentry-go v0.31.1
|
||||
github.com/go-logr/logr v1.4.2
|
||||
github.com/google/cel-go v0.23.1
|
||||
github.com/google/go-github/v64 v64.0.0
|
||||
github.com/hashicorp/go-retryablehttp v0.7.7
|
||||
github.com/ktrysmt/go-bitbucket v0.9.81
|
||||
|
|
@ -51,6 +52,7 @@ require (
|
|||
replace gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.1
|
||||
|
||||
require (
|
||||
cel.dev/expr v0.19.1 // indirect
|
||||
cloud.google.com/go v0.116.0 // indirect
|
||||
cloud.google.com/go/auth v0.12.1 // indirect
|
||||
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
|
||||
|
|
@ -75,6 +77,7 @@ require (
|
|||
github.com/DataDog/zstd v1.5.2 // indirect
|
||||
github.com/MakeNowJust/heredoc v1.0.0 // indirect
|
||||
github.com/ProtonMail/go-crypto v1.1.5 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||
github.com/bradleyfalzon/ghinstallation/v2 v2.13.0 // indirect
|
||||
|
|
@ -92,6 +95,7 @@ require (
|
|||
github.com/fatih/color v1.16.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fluxcd/pkg/apis/acl v0.6.0 // indirect
|
||||
github.com/fluxcd/pkg/apis/kustomize v1.9.0 // indirect
|
||||
github.com/fluxcd/pkg/auth v0.3.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||
|
|
@ -160,6 +164,7 @@ require (
|
|||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
|
||||
github.com/spf13/cobra v1.8.1 // indirect
|
||||
github.com/stoewer/go-strcase v1.3.0 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/xlab/treeprint v1.2.0 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
|
|
@ -172,6 +177,7 @@ require (
|
|||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.27.0 // indirect
|
||||
golang.org/x/crypto v0.32.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
|
||||
golang.org/x/mod v0.22.0 // indirect
|
||||
golang.org/x/net v0.34.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
|
|
|
|||
12
go.sum
12
go.sum
|
|
@ -1,3 +1,5 @@
|
|||
cel.dev/expr v0.19.1 h1:NciYrtDRIR0lNCnH1LFJegdjspNx9fI59O7TWcua/W4=
|
||||
cel.dev/expr v0.19.1/go.mod h1:MrpN08Q+lEBs+bGYdLxxHkZoUSsCp0nSKTs0nTymJgw=
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE=
|
||||
cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U=
|
||||
|
|
@ -79,6 +81,8 @@ github.com/PagerDuty/go-pagerduty v1.8.0 h1:MTFqTffIcAervB83U7Bx6HERzLbyaSPL/+ox
|
|||
github.com/PagerDuty/go-pagerduty v1.8.0/go.mod h1:nzIeAqyFSJAFkjWKvMzug0JtwDg+V+UoCWjFrfFH5mI=
|
||||
github.com/ProtonMail/go-crypto v1.1.5 h1:eoAQfK2dwL+tFSFpr7TbOaPNUbPiJj4fLYwwGE1FQO4=
|
||||
github.com/ProtonMail/go-crypto v1.1.5/go.mod h1:rA3QumHc/FZ8pAHreoekgiAbzpNsfQAosU5td4SnOrE=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
|
||||
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
|
||||
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
|
|
@ -146,6 +150,8 @@ github.com/fluxcd/pkg/apis/acl v0.6.0 h1:rllf5uQLzTow81ZCslkQ6LPpDNqVQr6/fWaNksd
|
|||
github.com/fluxcd/pkg/apis/acl v0.6.0/go.mod h1:IVDZx3MAoDWjlLrJHMF9Z27huFuXAEQlnbWw0M6EcTs=
|
||||
github.com/fluxcd/pkg/apis/event v0.16.0 h1:ffKc/3erowPnh72lFszz7sPQhLZ7bhqNrq+pu1Pb+JE=
|
||||
github.com/fluxcd/pkg/apis/event v0.16.0/go.mod h1:D/QQi5lHT9/Ur3OMFLJO71D4KDQHbJ5s8dQV3h1ZAT0=
|
||||
github.com/fluxcd/pkg/apis/kustomize v1.9.0 h1:SJpT1CK58AnTvCpDKeGfMNA0Xud/4VReZNvPe8XkTxo=
|
||||
github.com/fluxcd/pkg/apis/kustomize v1.9.0/go.mod h1:AZl2GU03oPVue6SUivdiIYd/3mvF94j7t1G2JO26d4s=
|
||||
github.com/fluxcd/pkg/apis/meta v1.10.0 h1:rqbAuyl5ug7A5jjRf/rNwBXmNl6tJ9wG2iIsriwnQUk=
|
||||
github.com/fluxcd/pkg/apis/meta v1.10.0/go.mod h1:n7NstXHDaleAUMajcXTVkhz0MYkvEXy1C/eLI/t1xoI=
|
||||
github.com/fluxcd/pkg/auth v0.3.0 h1:I1A3e81O+bpAgEcJ3e+rXqObKPjzBu6FLYXQTSxXLOs=
|
||||
|
|
@ -226,6 +232,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
|
|||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
|
||||
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
|
||||
github.com/google/cel-go v0.23.1 h1:91ThhEZlBcE5rB2adBVXqvDoqdL8BG2oyhd0bK1I/r4=
|
||||
github.com/google/cel-go v0.23.1/go.mod h1:52Pb6QsDbC5kvgxvZhiL9QX1oZEkcUF/ZqaPx1J5Wwo=
|
||||
github.com/google/gnostic-models v0.6.9 h1:MU/8wDLif2qCXZmzncUQ/BOfxWfthHi63KqpoNbWqVw=
|
||||
github.com/google/gnostic-models v0.6.9/go.mod h1:CiWsm0s6BSQd1hRn8/QmxqB6BesYcbSZxsz9b0KuDBw=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
|
|
@ -396,6 +404,8 @@ github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3k
|
|||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
|
||||
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
|
||||
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
|
|
@ -459,6 +469,8 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v
|
|||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk=
|
||||
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
|
|
|
|||
|
|
@ -156,6 +156,21 @@ func (r *ReceiverReconciler) Reconcile(ctx context.Context, req ctrl.Request) (r
|
|||
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
|
||||
// produces an error.
|
||||
func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *apiv1.Receiver) (ctrl.Result, error) {
|
||||
log := ctrl.LoggerFrom(ctx)
|
||||
|
||||
if filter := obj.Spec.ResourceFilter; filter != "" {
|
||||
if err := server.ValidateResourceFilter(filter); err != nil {
|
||||
const msg = "Reconciliation failed terminally due to configuration error"
|
||||
errMsg := fmt.Sprintf("%s: %v", msg, err)
|
||||
conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
|
||||
conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
|
||||
obj.Status.ObservedGeneration = obj.Generation
|
||||
log.Error(err, msg)
|
||||
r.Event(obj, corev1.EventTypeWarning, meta.InvalidCELExpressionReason, errMsg)
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Mark the resource as under reconciliation.
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "Reconciliation in progress")
|
||||
|
||||
|
|
@ -163,7 +178,7 @@ func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *apiv1.Receiver)
|
|||
if err != nil {
|
||||
conditions.MarkFalse(obj, meta.ReadyCondition, apiv1.TokenNotFoundReason, "%s", err)
|
||||
obj.Status.WebhookPath = ""
|
||||
return ctrl.Result{Requeue: true}, err
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
webhookPath := obj.GetWebhookPath(token)
|
||||
|
|
@ -174,7 +189,7 @@ func (r *ReceiverReconciler) reconcile(ctx context.Context, obj *apiv1.Receiver)
|
|||
|
||||
if obj.Status.WebhookPath != webhookPath {
|
||||
obj.Status.WebhookPath = webhookPath
|
||||
ctrl.LoggerFrom(ctx).Info(msg)
|
||||
log.Info(msg)
|
||||
}
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.GetInterval()}, nil
|
||||
|
|
|
|||
|
|
@ -144,6 +144,45 @@ func TestReceiverReconciler_Reconcile(t *testing.T) {
|
|||
g.Expect(resultR.Spec.Interval.Duration).To(BeIdenticalTo(10 * time.Minute))
|
||||
})
|
||||
|
||||
t.Run("fails with invalid CEL resource filter", func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
g.Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(receiver), resultR)).To(Succeed())
|
||||
|
||||
// Incomplete CEL expression
|
||||
patch := []byte(`{"spec":{"resourceFilter":"has(res.metadata.annotations"}}`)
|
||||
g.Expect(k8sClient.Patch(context.Background(), resultR, client.RawPatch(types.MergePatchType, patch))).To(Succeed())
|
||||
|
||||
g.Eventually(func() bool {
|
||||
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(receiver), resultR)
|
||||
return !conditions.IsReady(resultR)
|
||||
}, timeout, time.Second).Should(BeTrue())
|
||||
|
||||
g.Expect(resultR.Status.ObservedGeneration).To(Equal(resultR.Generation))
|
||||
|
||||
g.Expect(conditions.GetReason(resultR, meta.ReadyCondition)).To(BeIdenticalTo(meta.InvalidCELExpressionReason))
|
||||
g.Expect(conditions.GetMessage(resultR, meta.ReadyCondition)).To(ContainSubstring("annotations"))
|
||||
|
||||
g.Expect(conditions.Has(resultR, meta.StalledCondition)).To(BeTrue())
|
||||
g.Expect(conditions.GetReason(resultR, meta.StalledCondition)).To(BeIdenticalTo(meta.InvalidCELExpressionReason))
|
||||
g.Expect(conditions.GetObservedGeneration(resultR, meta.StalledCondition)).To(BeIdenticalTo(resultR.Generation))
|
||||
})
|
||||
|
||||
t.Run("recovers when the CEL expression is valid", func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
// Incomplete CEL expression
|
||||
patch := []byte(`{"spec":{"resourceFilter":"has(res.metadata.annotations)"}}`)
|
||||
g.Expect(k8sClient.Patch(context.Background(), resultR, client.RawPatch(types.MergePatchType, patch))).To(Succeed())
|
||||
|
||||
g.Eventually(func() bool {
|
||||
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(receiver), resultR)
|
||||
return conditions.IsReady(resultR)
|
||||
}, timeout, time.Second).Should(BeTrue())
|
||||
|
||||
g.Expect(conditions.GetObservedGeneration(resultR, meta.ReadyCondition)).To(BeIdenticalTo(resultR.Generation))
|
||||
g.Expect(resultR.Status.ObservedGeneration).To(BeIdenticalTo(resultR.Generation))
|
||||
g.Expect(conditions.Has(resultR, meta.ReconcilingCondition)).To(BeFalse())
|
||||
})
|
||||
|
||||
t.Run("fails with secret not found error", func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
|
|
@ -46,8 +47,11 @@ import (
|
|||
apiv1 "github.com/fluxcd/notification-controller/api/v1"
|
||||
)
|
||||
|
||||
var (
|
||||
WebhookPathIndexKey = ".metadata.webhookPath"
|
||||
const (
|
||||
WebhookPathIndexKey string = ".metadata.webhookPath"
|
||||
|
||||
// maxRequestSizeBytes is the maximum size of a request to the API server
|
||||
maxRequestSizeBytes int64 = 3 * 1024 * 1024
|
||||
)
|
||||
|
||||
// defaultFluxAPIVersions is a map of Flux API kinds to their API versions.
|
||||
|
|
@ -70,65 +74,140 @@ func IndexReceiverWebhookPath(o client.Object) []string {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath))
|
||||
func (s *ReceiverServer) handlePayload(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
digest := url.PathEscape(strings.TrimPrefix(r.RequestURI, apiv1.ReceiverWebhookPath))
|
||||
|
||||
s.logger.Info(fmt.Sprintf("handling request: %s", digest))
|
||||
s.logger.Info(fmt.Sprintf("handling request: %s", digest))
|
||||
|
||||
var allReceivers apiv1.ReceiverList
|
||||
err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
|
||||
WebhookPathIndexKey: r.RequestURI,
|
||||
}, client.Limit(1))
|
||||
if err != nil {
|
||||
s.logger.Error(err, "unable to list receivers")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var allReceivers apiv1.ReceiverList
|
||||
err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
|
||||
WebhookPathIndexKey: r.RequestURI,
|
||||
}, client.Limit(1))
|
||||
if err != nil {
|
||||
s.logger.Error(err, "unable to list receivers")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if len(allReceivers.Items) == 0 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if len(allReceivers.Items) == 0 {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
receiver := allReceivers.Items[0]
|
||||
logger := s.logger.WithValues(
|
||||
"reconciler kind", apiv1.ReceiverKind,
|
||||
"name", receiver.Name,
|
||||
"namespace", receiver.Namespace)
|
||||
receiver := allReceivers.Items[0]
|
||||
logger := s.logger.WithValues(
|
||||
"reconciler kind", apiv1.ReceiverKind,
|
||||
"name", receiver.Name,
|
||||
"namespace", receiver.Namespace)
|
||||
|
||||
if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
|
||||
err := errors.New("unable to process request")
|
||||
if receiver.Spec.Suspend {
|
||||
logger.Error(err, "receiver is suspended")
|
||||
} else {
|
||||
logger.Error(err, "receiver is not ready")
|
||||
}
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.validate(ctx, receiver, r); err != nil {
|
||||
logger.Error(err, "unable to validate payload")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var withErrors bool
|
||||
for _, resource := range receiver.Spec.Resources {
|
||||
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
|
||||
logger.Error(err, "unable to request reconciliation", "resource", resource)
|
||||
withErrors = true
|
||||
}
|
||||
}
|
||||
|
||||
if withErrors {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
|
||||
err := errors.New("unable to process request")
|
||||
if receiver.Spec.Suspend {
|
||||
logger.Error(err, "receiver is suspended")
|
||||
} else {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
logger.Error(err, "receiver is not ready")
|
||||
}
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.validate(ctx, receiver, r); err != nil {
|
||||
logger.Error(err, "unable to validate payload")
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var evaluator func(context.Context, client.Object) (*bool, error)
|
||||
if receiver.Spec.ResourceFilter != "" {
|
||||
evaluator, err = newResourceFilter(receiver.Spec.ResourceFilter, r)
|
||||
if err != nil {
|
||||
logger.Error(err, "unable to create CEL evaluator")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
var withErrors bool
|
||||
for _, resource := range receiver.Spec.Resources {
|
||||
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace, evaluator); err != nil {
|
||||
logger.Error(err, "unable to request reconciliation", "resource", resource)
|
||||
withErrors = true
|
||||
}
|
||||
}
|
||||
|
||||
if withErrors {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) notifySingleResource(ctx context.Context, logger logr.Logger, resource *metav1.PartialObjectMetadata, resourceFilter resourceFilter) error {
|
||||
objectKey := client.ObjectKeyFromObject(resource)
|
||||
if err := s.kubeClient.Get(ctx, objectKey, resource); err != nil {
|
||||
return fmt.Errorf("unable to read %s %q error: %w", resource.Kind, objectKey, err)
|
||||
}
|
||||
|
||||
return s.notifyResource(ctx, logger, resource, resourceFilter)
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) notifyResource(ctx context.Context, logger logr.Logger, resource *metav1.PartialObjectMetadata, resourceFilter resourceFilter) error {
|
||||
if resourceFilter != nil {
|
||||
accept, err := resourceFilter(ctx, resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !*accept {
|
||||
logger.V(1).Info(fmt.Sprintf("resource '%s/%s.%s' NOT annotated because CEL expression returned false", resource.Kind, resource.Name, resource.Namespace))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
err := s.annotate(ctx, resource)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, resource.Namespace, err)
|
||||
} else {
|
||||
logger.Info(fmt.Sprintf("resource '%s/%s.%s' annotated",
|
||||
resource.Kind, resource.Name, resource.Namespace))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) notifyDynamicResources(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, namespace, group, version string, resourceFilter resourceFilter) error {
|
||||
if resource.MatchLabels == nil {
|
||||
return fmt.Errorf("matchLabels field not set when using wildcard '*' as name")
|
||||
}
|
||||
|
||||
logger.V(1).Info(fmt.Sprintf("annotate resources by matchLabel for kind %q in %q",
|
||||
resource.Kind, namespace), "matchLabels", resource.MatchLabels)
|
||||
|
||||
var resources metav1.PartialObjectMetadataList
|
||||
resources.SetGroupVersionKind(schema.GroupVersionKind{
|
||||
Group: group,
|
||||
Kind: resource.Kind,
|
||||
Version: version,
|
||||
})
|
||||
|
||||
if err := s.kubeClient.List(ctx, &resources,
|
||||
client.InNamespace(namespace),
|
||||
client.MatchingLabels(resource.MatchLabels),
|
||||
); err != nil {
|
||||
return fmt.Errorf("failed listing resources in namespace %q by matching labels %q: %w", namespace, resource.MatchLabels, err)
|
||||
}
|
||||
|
||||
if len(resources.Items) == 0 {
|
||||
noObjectsFoundErr := fmt.Errorf("no %q resources found with matching labels %q' in %q namespace", resource.Kind, resource.MatchLabels, namespace)
|
||||
logger.Error(noObjectsFoundErr, "error annotating resources")
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := range resources.Items {
|
||||
if err := s.notifyResource(ctx, logger, &resources.Items[i], resourceFilter); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver, r *http.Request) error {
|
||||
|
|
@ -142,6 +221,7 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
"name", receiver.Name,
|
||||
"namespace", receiver.Namespace)
|
||||
|
||||
r.Body = io.NopCloser(io.LimitReader(r.Body, maxRequestSizeBytes))
|
||||
switch receiver.Spec.Type {
|
||||
case apiv1.GenericReceiver:
|
||||
return nil
|
||||
|
|
@ -155,9 +235,15 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
if err != nil {
|
||||
return fmt.Errorf("unable to validate HMAC signature: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.GitHubReceiver:
|
||||
_, err := github.ValidatePayload(r, []byte(token))
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read request body: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
_, err = github.ValidatePayload(r, []byte(token))
|
||||
if err != nil {
|
||||
return fmt.Errorf("the GitHub signature header is invalid, err: %w", err)
|
||||
}
|
||||
|
|
@ -172,11 +258,12 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
}
|
||||
}
|
||||
if !allowed {
|
||||
return fmt.Errorf("the GitHub event '%s' is not authorised", event)
|
||||
return fmt.Errorf("the GitHub event %q is not authorised", event)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling GitHub event: %s", event))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.GitLabReceiver:
|
||||
if r.Header.Get("X-Gitlab-Token") != token {
|
||||
|
|
@ -193,7 +280,7 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
}
|
||||
}
|
||||
if !allowed {
|
||||
return fmt.Errorf("the GitLab event '%s' is not authorised", event)
|
||||
return fmt.Errorf("the GitLab event %q is not authorised", event)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -225,14 +312,20 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
}
|
||||
}
|
||||
if !allowed {
|
||||
return fmt.Errorf("the CDEvent '%s' is not authorised", event)
|
||||
return fmt.Errorf("the CDEvent %q is not authorised", event)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling CDEvent: %s", event))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.BitbucketReceiver:
|
||||
_, err := github.ValidatePayload(r, []byte(token))
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read request body: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
_, err = github.ValidatePayload(r, []byte(token))
|
||||
if err != nil {
|
||||
return fmt.Errorf("the Bitbucket server signature header is invalid, err: %w", err)
|
||||
}
|
||||
|
|
@ -247,11 +340,12 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
}
|
||||
}
|
||||
if !allowed {
|
||||
return fmt.Errorf("the Bitbucket server event '%s' is not authorised", event)
|
||||
return fmt.Errorf("the Bitbucket server event %q is not authorised", event)
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling Bitbucket server event: %s", event))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.QuayReceiver:
|
||||
type payload struct {
|
||||
|
|
@ -259,12 +353,18 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
UpdatedTags []string `json:"updated_tags"`
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read request body: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
var p payload
|
||||
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
|
||||
return fmt.Errorf("cannot decode Quay webhook payload")
|
||||
return fmt.Errorf("cannot decode Quay webhook payload: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling Quay event from %s", p.DockerUrl))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.HarborReceiver:
|
||||
if r.Header.Get("Authorization") != token {
|
||||
|
|
@ -282,12 +382,18 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
URL string `json:"repo_url"`
|
||||
} `json:"repository"`
|
||||
}
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read request body: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
var p payload
|
||||
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
|
||||
return fmt.Errorf("cannot decode DockerHub webhook payload")
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling DockerHub event from %s for tag %s", p.Repository.URL, p.PushData.Tag))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.GCRReceiver:
|
||||
const tokenIndex = len("Bearer ")
|
||||
|
|
@ -309,23 +415,30 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
|
||||
err := authenticateGCRRequest(&http.Client{}, r.Header.Get("Authorization"), tokenIndex)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot authenticate GCR request: %s", err)
|
||||
return fmt.Errorf("cannot authenticate GCR request: %w", err)
|
||||
}
|
||||
|
||||
var p payload
|
||||
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
|
||||
return fmt.Errorf("cannot decode GCR webhook payload")
|
||||
return fmt.Errorf("cannot decode GCR webhook payload: %w", err)
|
||||
}
|
||||
|
||||
// The GCR payload is a Google PubSub event with the GCR event wrapped
|
||||
// inside (in base64 JSON).
|
||||
raw, _ := base64.StdEncoding.DecodeString(p.Message.Data)
|
||||
|
||||
var d data
|
||||
err = json.Unmarshal(raw, &d)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode GCR webhook body")
|
||||
return fmt.Errorf("cannot decode GCR webhook body: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling GCR event from %s for tag %s", d.Digest, d.Tag))
|
||||
encodedPayload, err := json.Marshal(d)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode GCR webhook body: %w", err)
|
||||
}
|
||||
// This only puts the unwrapped event into the payload.
|
||||
r.Body = io.NopCloser(bytes.NewReader(encodedPayload))
|
||||
return nil
|
||||
case apiv1.NexusReceiver:
|
||||
signature := r.Header.Get("X-Nexus-Webhook-Signature")
|
||||
|
|
@ -335,7 +448,7 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read Nexus payload. error: %s", err)
|
||||
return fmt.Errorf("cannot read Nexus payload. error: %w", err)
|
||||
}
|
||||
|
||||
if !verifyHmacSignature([]byte(token), signature, b) {
|
||||
|
|
@ -347,10 +460,11 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
}
|
||||
var p payload
|
||||
if err := json.Unmarshal(b, &p); err != nil {
|
||||
return fmt.Errorf("cannot decode Nexus webhook payload: %s", err)
|
||||
return fmt.Errorf("cannot decode Nexus webhook payload: %w", err)
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling Nexus event from %s", p.RepositoryName))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
case apiv1.ACRReceiver:
|
||||
type target struct {
|
||||
|
|
@ -363,16 +477,23 @@ func (s *ReceiverServer) validate(ctx context.Context, receiver apiv1.Receiver,
|
|||
Target target `json:"target"`
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read request body: %s", err)
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
|
||||
var p payload
|
||||
if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
|
||||
return fmt.Errorf("cannot decode ACR webhook payload: %s", err)
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("handling ACR event from %s for tag %s", p.Target.Repository, p.Target.Tag))
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("recevier type '%s' not supported", receiver.Spec.Type)
|
||||
return fmt.Errorf("recevier type %q not supported", receiver.Spec.Type)
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (string, error) {
|
||||
|
|
@ -385,20 +506,20 @@ func (s *ReceiverServer) token(ctx context.Context, receiver apiv1.Receiver) (st
|
|||
var secret corev1.Secret
|
||||
err := s.kubeClient.Get(ctx, secretName, &secret)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("unable to read token from secret '%s' error: %w", secretName, err)
|
||||
return "", fmt.Errorf("unable to read token from secret %q error: %w", secretName, err)
|
||||
}
|
||||
|
||||
if val, ok := secret.Data["token"]; ok {
|
||||
token = string(val)
|
||||
} else {
|
||||
return "", fmt.Errorf("invalid '%s' secret data: required field 'token'", secretName)
|
||||
return "", fmt.Errorf("invalid %q secret data: required field 'token'", secretName)
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
// requestReconciliation requests reconciliation of all the resources matching the given CrossNamespaceObjectReference by annotating them accordingly.
|
||||
func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string) error {
|
||||
func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.Logger, resource apiv1.CrossNamespaceObjectReference, defaultNamespace string, resourceFilter resourceFilter) error {
|
||||
namespace := defaultNamespace
|
||||
if resource.Namespace != "" {
|
||||
if s.noCrossNamespaceRefs && resource.Namespace != defaultNamespace {
|
||||
|
|
@ -410,7 +531,7 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
|
|||
apiVersion := resource.APIVersion
|
||||
if apiVersion == "" {
|
||||
if defaultFluxAPIVersions[resource.Kind] == "" {
|
||||
return fmt.Errorf("apiVersion must be specified for kind '%s'", resource.Kind)
|
||||
return fmt.Errorf("apiVersion must be specified for kind %q", resource.Kind)
|
||||
}
|
||||
apiVersion = defaultFluxAPIVersions[resource.Kind]
|
||||
}
|
||||
|
|
@ -418,43 +539,7 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
|
|||
group, version := getGroupVersion(apiVersion)
|
||||
|
||||
if resource.Name == "*" {
|
||||
if resource.MatchLabels == nil {
|
||||
return fmt.Errorf("matchLabels field not set when using wildcard '*' as name")
|
||||
}
|
||||
|
||||
logger.V(1).Info(fmt.Sprintf("annotate resources by matchLabel for kind '%s' in '%s'",
|
||||
resource.Kind, namespace), "matchLabels", resource.MatchLabels)
|
||||
|
||||
var resources metav1.PartialObjectMetadataList
|
||||
resources.SetGroupVersionKind(schema.GroupVersionKind{
|
||||
Group: group,
|
||||
Kind: resource.Kind,
|
||||
Version: version,
|
||||
})
|
||||
|
||||
if err := s.kubeClient.List(ctx, &resources,
|
||||
client.InNamespace(namespace),
|
||||
client.MatchingLabels(resource.MatchLabels),
|
||||
); err != nil {
|
||||
return fmt.Errorf("failed listing resources in namespace %q by matching labels %q: %w", namespace, resource.MatchLabels, err)
|
||||
}
|
||||
|
||||
if len(resources.Items) == 0 {
|
||||
noObjectsFoundErr := fmt.Errorf("no '%s' resources found with matching labels '%s' in '%s' namespace", resource.Kind, resource.MatchLabels, namespace)
|
||||
logger.Error(noObjectsFoundErr, "error annotating resources")
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, resource := range resources.Items {
|
||||
if err := s.annotate(ctx, &resources.Items[i]); err != nil {
|
||||
return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, namespace, err)
|
||||
} else {
|
||||
logger.Info(fmt.Sprintf("resource '%s/%s.%s' annotated",
|
||||
resource.Kind, resource.Name, namespace))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.notifyDynamicResources(ctx, logger, resource, namespace, group, version, resourceFilter)
|
||||
}
|
||||
|
||||
u := &metav1.PartialObjectMetadata{}
|
||||
|
|
@ -463,25 +548,10 @@ func (s *ReceiverServer) requestReconciliation(ctx context.Context, logger logr.
|
|||
Kind: resource.Kind,
|
||||
Version: version,
|
||||
})
|
||||
u.SetNamespace(namespace)
|
||||
u.SetName(resource.Name)
|
||||
|
||||
objectKey := client.ObjectKey{
|
||||
Namespace: namespace,
|
||||
Name: resource.Name,
|
||||
}
|
||||
|
||||
if err := s.kubeClient.Get(ctx, objectKey, u); err != nil {
|
||||
return fmt.Errorf("unable to read %s '%s' error: %w", resource.Kind, objectKey, err)
|
||||
}
|
||||
|
||||
err := s.annotate(ctx, u)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to annotate resource: '%s/%s.%s': %w", resource.Kind, resource.Name, namespace, err)
|
||||
} else {
|
||||
logger.Info(fmt.Sprintf("resource '%s/%s.%s' annotated",
|
||||
resource.Kind, resource.Name, namespace))
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.notifySingleResource(ctx, logger, u, resourceFilter)
|
||||
}
|
||||
|
||||
func (s *ReceiverServer) annotate(ctx context.Context, resource *metav1.PartialObjectMetadata) error {
|
||||
|
|
@ -496,7 +566,7 @@ func (s *ReceiverServer) annotate(ctx context.Context, resource *metav1.PartialO
|
|||
resource.SetAnnotations(sourceAnnotations)
|
||||
|
||||
if err := s.kubeClient.Patch(ctx, resource, patch); err != nil {
|
||||
return fmt.Errorf("unable to annotate %s '%s' error: %w", resource.Kind, client.ObjectKey{
|
||||
return fmt.Errorf("unable to annotate %s %q error: %w", resource.Kind, client.ObjectKey{
|
||||
Namespace: resource.Namespace,
|
||||
Name: resource.Name,
|
||||
}, err)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
Copyright 2025 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"mime"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/fluxcd/pkg/runtime/cel"
|
||||
"github.com/google/cel-go/common/types"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
type resourceFilter func(context.Context, client.Object) (*bool, error)
|
||||
|
||||
// ValidateResourceFilter accepts a CEL expression and will parse and check that
|
||||
// it's valid, if it's not valid an error is returned.
|
||||
func ValidateResourceFilter(s string) error {
|
||||
_, err := newFilterExpression(s)
|
||||
return err
|
||||
}
|
||||
|
||||
func newFilterExpression(s string) (*cel.Expression, error) {
|
||||
return cel.NewExpression(s,
|
||||
cel.WithCompile(),
|
||||
cel.WithOutputType(types.BoolType),
|
||||
cel.WithStructVariables("res", "req"))
|
||||
}
|
||||
|
||||
func newResourceFilter(expr string, r *http.Request) (resourceFilter, error) {
|
||||
celExpr, err := newFilterExpression(expr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only decodes the body for the expression if the body is JSON.
|
||||
// Technically you could generate several resources without any body.
|
||||
var req map[string]any
|
||||
if !isJSONContent(r) {
|
||||
req = map[string]any{}
|
||||
} else if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse request body as JSON: %s", err)
|
||||
}
|
||||
|
||||
return func(ctx context.Context, obj client.Object) (*bool, error) {
|
||||
res, err := clientObjectToMap(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := celExpr.EvaluateBoolean(ctx, map[string]any{
|
||||
"res": res,
|
||||
"req": req,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func isJSONContent(r *http.Request) bool {
|
||||
contentType := r.Header.Get("Content-type")
|
||||
for _, v := range strings.Split(contentType, ",") {
|
||||
t, _, err := mime.ParseMediaType(v)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if t == "application/json" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func clientObjectToMap(v client.Object) (map[string]any, error) {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal PartialObjectMetadata from resource for CEL expression: %w", err)
|
||||
}
|
||||
|
||||
var result map[string]any
|
||||
if err := json.Unmarshal(b, &result); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal PartialObjectMetadata from resource for CEL expression: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
Copyright 2025 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
apiv1 "github.com/fluxcd/notification-controller/api/v1"
|
||||
. "github.com/onsi/gomega"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
func TestValidateCELExpressionValidExpressions(t *testing.T) {
|
||||
validationTests := []string{
|
||||
"true",
|
||||
"false",
|
||||
"req.value == 'test'",
|
||||
}
|
||||
|
||||
for _, tt := range validationTests {
|
||||
t.Run(tt, func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
g.Expect(ValidateResourceFilter(tt)).To(Succeed())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateCELExpressionInvalidExpressions(t *testing.T) {
|
||||
validationTests := []struct {
|
||||
expression string
|
||||
wantError string
|
||||
}{
|
||||
{
|
||||
"'test'",
|
||||
"CEL expression output type mismatch: expected bool, got string",
|
||||
},
|
||||
{
|
||||
"requrest.body.value",
|
||||
"undeclared reference to 'requrest'",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range validationTests {
|
||||
t.Run(tt.expression, func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
g.Expect(ValidateResourceFilter(tt.expression)).To(MatchError(ContainSubstring(tt.wantError)))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCELEvaluation(t *testing.T) {
|
||||
evaluationTests := []struct {
|
||||
expression string
|
||||
request *http.Request
|
||||
resource client.Object
|
||||
wantResult bool
|
||||
}{
|
||||
{
|
||||
expression: `res.metadata.name == 'test-resource' && req.target.repository == 'hello-world'`,
|
||||
request: testNewHTTPRequest(t, http.MethodPost, "/test", map[string]any{
|
||||
"target": map[string]any{
|
||||
"repository": "hello-world",
|
||||
},
|
||||
}),
|
||||
resource: &apiv1.Receiver{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: apiv1.ReceiverKind,
|
||||
APIVersion: apiv1.GroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-resource",
|
||||
},
|
||||
},
|
||||
wantResult: true,
|
||||
},
|
||||
{
|
||||
expression: `req.bool == true`,
|
||||
request: testNewHTTPRequest(t, http.MethodPost, "/test", map[string]any{
|
||||
"bool": true,
|
||||
}),
|
||||
resource: &apiv1.Receiver{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: apiv1.ReceiverKind,
|
||||
APIVersion: apiv1.GroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-resource",
|
||||
},
|
||||
},
|
||||
wantResult: true,
|
||||
},
|
||||
{
|
||||
expression: `res.metadata.name == 'test-resource' && req.image.source.split(':').last().value().startsWith('v')`,
|
||||
request: testNewHTTPRequest(t, http.MethodPost, "/test", map[string]any{
|
||||
"image": map[string]any{
|
||||
"source": "hello-world:v1.0.0",
|
||||
},
|
||||
}),
|
||||
resource: &apiv1.Receiver{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: apiv1.ReceiverKind,
|
||||
APIVersion: apiv1.GroupVersion.String(),
|
||||
},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "test-resource",
|
||||
},
|
||||
},
|
||||
wantResult: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range evaluationTests {
|
||||
t.Run(tt.expression, func(t *testing.T) {
|
||||
g := NewWithT(t)
|
||||
evaluator, err := newResourceFilter(tt.expression, tt.request)
|
||||
g.Expect(err).To(Succeed())
|
||||
|
||||
result, err := evaluator(context.Background(), tt.resource)
|
||||
g.Expect(err).To(Succeed())
|
||||
g.Expect(result).To(Equal(&tt.wantResult))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testNewHTTPRequest(t *testing.T, method, target string, body map[string]any) *http.Request {
|
||||
var httpBody io.Reader
|
||||
g := NewWithT(t)
|
||||
if body != nil {
|
||||
b, err := json.Marshal(body)
|
||||
g.Expect(err).To(Succeed())
|
||||
httpBody = bytes.NewReader(b)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(method, target, httpBody)
|
||||
g.Expect(err).To(Succeed())
|
||||
|
||||
if httpBody != nil {
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
}
|
||||
|
||||
return req
|
||||
|
||||
}
|
||||
|
|
@ -53,7 +53,7 @@ func NewReceiverServer(port string, logger logr.Logger, kubeClient client.Client
|
|||
// ListenAndServe starts the HTTP server on the specified port
|
||||
func (s *ReceiverServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Middleware) {
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload()))
|
||||
mux.Handle(apiv1.ReceiverWebhookPath, http.HandlerFunc(s.handlePayload))
|
||||
handlerID := apiv1.ReceiverWebhookPath
|
||||
if s.exportHTTPPathMetrics {
|
||||
handlerID = ""
|
||||
|
|
|
|||
Loading…
Reference in New Issue