mirror of https://github.com/knative/eventing.git
Compare commits

12 Commits:

eb58703061
d01256fa32
4e48320ac7
34d05f30f2
b4c59bb301
85ca388bf8
5b7b03053a
a017fc0ed0
93cc440c99
c138419361
4542e6bf08
cbdf86e94b
@@ -1,16 +1,11 @@
+version: "2"
 run:
-  timeout: 10m
-
   build-tags:
     - e2e
     - probe
     - preupgrade
     - postupgrade
     - postdowngrade
-
-  skip-dirs:
-    - pkg/client
-
 linters:
   enable:
     - asciicheck

@@ -20,10 +15,28 @@ linters:
     - unparam
   disable:
     - errcheck
-
-issues:
-  exclude-rules:
-    - path: test # Excludes /test, *_test.go etc.
-      linters:
-        - gosec
-        - unparam
+  exclusions:
+    generated: lax
+    presets:
+      - comments
+      - common-false-positives
+      - legacy
+      - std-error-handling
+    rules:
+      - linters:
+          - gosec
+          - unparam
+        path: test
+    paths:
+      - third_party$
+      - builtin$
+      - examples$
+      - pkg/client
+formatters:
+  exclusions:
+    generated: lax
+    paths:
+      - third_party$
+      - builtin$
+      - examples$
+      - pkg/client
@@ -79,6 +79,19 @@ rules:
 ---
 kind: ClusterRole
 apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: knative-sinks-namespaced-admin
+  labels:
+    rbac.authorization.k8s.io/aggregate-to-admin: "true"
+    app.kubernetes.io/version: devel
+    app.kubernetes.io/name: knative-eventing
+rules:
+  - apiGroups: ["sinks.knative.dev"]
+    resources: ["*"]
+    verbs: ["*"]
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
 metadata:
   name: knative-eventing-namespaced-edit
   labels:

@@ -86,7 +99,7 @@ metadata:
     app.kubernetes.io/version: devel
     app.kubernetes.io/name: knative-eventing
 rules:
-  - apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
+  - apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
     resources: ["*"]
     verbs: ["create", "update", "patch", "delete"]
 ---

@@ -99,6 +112,6 @@ metadata:
     app.kubernetes.io/version: devel
     app.kubernetes.io/name: knative-eventing
 rules:
-  - apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
+  - apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
     resources: ["*"]
     verbs: ["get", "list", "watch"]
go.mod

@@ -1,5 +1,7 @@
 module knative.dev/eventing
 
+// A placeholder comment to rebuild with Go 1.23.5 toolchain to cover CVEs
+
 go 1.22.7
 
 require (
@@ -33,7 +33,7 @@ type AWSCommon struct {
 
 type AWSS3 struct {
     AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
-    Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` // S3 ARN
+    Arn string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"` // S3 ARN
     DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
     MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
     DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects

@@ -49,10 +49,10 @@ type AWSS3 struct {
 
 type AWSSQS struct {
     AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
-    Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN"` // SQS ARN
+    Arn string `json:"arn,omitempty" camel:"QUEUE_NAME_OR_ARN"` // SQS ARN
     DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
     AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
-    Host string `json:"host" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
+    Host string `json:"host" camel:"AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
     Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
     QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
     Greedy bool `json:"greedy" default:"false"` // Greedy scheduler

@@ -71,6 +71,6 @@ type AWSDDBStreams struct {
 
 type AWSSNS struct {
     AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
-    Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SNS_SINK_TOPICNAMEORARN"` // SNS ARN
+    Arn string `json:"arn,omitempty" camel:"TOPIC_NAME_OR_ARN"` // SNS ARN
     AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
 }
@@ -425,8 +425,8 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
     subscriberURI := trigger.Status.SubscriberURI
     if subscriberURI == nil {
         // Record the event count.
-        writer.WriteHeader(http.StatusBadRequest)
-        _ = h.reporter.ReportEventCount(reportArgs, http.StatusBadRequest)
+        writer.WriteHeader(http.StatusNotFound)
+        _ = h.reporter.ReportEventCount(reportArgs, http.StatusNotFound)
         return
     }
 
@@ -138,7 +138,7 @@ func TestReceiver(t *testing.T) {
     triggers: []*eventingv1.Trigger{
         makeTrigger(withoutSubscriberURI()),
     },
-    expectedStatus: http.StatusBadRequest,
+    expectedStatus: http.StatusNotFound,
     expectedEventCount: true,
 },
 "Trigger without a Filter": {
@@ -25,11 +25,11 @@ import (
     "k8s.io/client-go/tools/cache"
     "knative.dev/pkg/apis"
     configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
-    endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
     "knative.dev/pkg/configmap"
     "knative.dev/pkg/controller"
     "knative.dev/pkg/injection/clients/dynamicclient"
     secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
+    namespacedinformerfactory "knative.dev/pkg/injection/clients/namespacedkube/informers/factory"
     "knative.dev/pkg/logging"
     pkgreconciler "knative.dev/pkg/reconciler"
     "knative.dev/pkg/resolver"
@@ -69,7 +69,12 @@ func NewController(
     logger := logging.FromContext(ctx)
     brokerInformer := brokerinformer.Get(ctx)
     subscriptionInformer := subscriptioninformer.Get(ctx)
-    endpointsInformer := endpointsinformer.Get(ctx)
+    endpointsInformer := namespacedinformerfactory.Get(ctx).Core().V1().Endpoints()
+    if err := controller.StartInformers(ctx.Done(), endpointsInformer.Informer()); err != nil {
+        logger.Fatalw("Failed to start namespaced endpoints informer", zap.Error(err))
+    }
+
     configmapInformer := configmapinformer.Get(ctx)
     secretInformer := secretinformer.Get(ctx)
     eventPolicyInformer := eventpolicyinformer.Get(ctx)
@@ -109,8 +109,8 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1.Con
         return nil, fmt.Errorf("getting Deployment: %v", err)
     } else if !metav1.IsControlledBy(ra, source) {
         return nil, fmt.Errorf("deployment %q is not owned by ContainerSource %q", ra.Name, source.Name)
-    } else if r.podSpecChanged(&ra.Spec.Template.Spec, &expected.Spec.Template.Spec) {
-        ra.Spec.Template.Spec = expected.Spec.Template.Spec
+    } else if r.podTemplateChanged(&ra.Spec.Template, &expected.Spec.Template) {
+        ra.Spec.Template = expected.Spec.Template
         ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Update(ctx, ra, metav1.UpdateOptions{})
         if err != nil {
             return nil, fmt.Errorf("updating Deployment: %v", err)
@@ -159,6 +159,10 @@ func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec)
     return !equality.Semantic.DeepDerivative(want, have)
 }
 
+func (r *Reconciler) podTemplateChanged(have *corev1.PodTemplateSpec, want *corev1.PodTemplateSpec) bool {
+    return !equality.Semantic.DeepDerivative(want, have)
+}
+
 func (r *Reconciler) sinkBindingSpecChanged(have *v1.SinkBindingSpec, want *v1.SinkBindingSpec) bool {
     return !equality.Semantic.DeepDerivative(want, have)
 }
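Note on the two hunks above: moving from podSpecChanged to podTemplateChanged widens the comparison from the PodSpec alone to the whole PodTemplateSpec, so metadata-only edits on the expected template (labels, annotations) now also trigger a Deployment update. A minimal self-contained sketch of that difference, using the same DeepDerivative helper; the template values below are made up for illustration:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	have := corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "adapter"}},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "source", Image: "adapter:v1"}},
		},
	}
	want := have
	// A metadata-only change: the desired template gains an extra label.
	want.ObjectMeta.Labels = map[string]string{"app": "adapter", "extra": "label"}

	// Old check: only the PodSpec is compared, so the label change is invisible.
	specChanged := !equality.Semantic.DeepDerivative(want.Spec, have.Spec)
	// New check: the whole template is compared, so the label change is detected.
	templateChanged := !equality.Semantic.DeepDerivative(want, have)

	fmt.Println(specChanged, templateChanged) // false true
}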
@@ -54,7 +54,9 @@ func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
 
         // First, check for the custom 'camel' tag
         envVarName := fieldType.Tag.Get("camel")
-        if envVarName == "" {
+        if envVarName != "" {
+            envVarName = fmt.Sprintf("%s_%s", prefix, envVarName)
+        } else {
             // If 'camel' tag is not present, fall back to the 'json' tag or Go field name
             jsonTag := fieldType.Tag.Get("json")
             tagName := strings.Split(jsonTag, ",")[0]
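Together with the shortened camel tags in the AWS struct hunks earlier, this change makes the prefix argument part of every generated name: a non-empty camel tag is now prefixed rather than used verbatim, which is what lets a single struct serve both the _SOURCE and _SINK kamelet prefixes. A condensed sketch of that naming rule; the helper below and its json-tag fallback are illustrative, not the repository code:

package main

import (
	"fmt"
	"reflect"
	"strings"
)

// envVarName sketches the naming rule after this change: a non-empty `camel`
// tag is treated as a suffix and the caller-supplied prefix is prepended,
// instead of the tag carrying the full variable name as it did before.
func envVarName(prefix string, field reflect.StructField) string {
	if tag := field.Tag.Get("camel"); tag != "" {
		return fmt.Sprintf("%s_%s", prefix, tag)
	}
	// Fallback, as before: derive the name from the json tag.
	jsonName := strings.Split(field.Tag.Get("json"), ",")[0]
	return fmt.Sprintf("%s_%s", prefix, strings.ToUpper(jsonName))
}

type AWSS3 struct {
	Arn    string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"`
	Region string `json:"region,omitempty"`
}

func main() {
	t := reflect.TypeOf(AWSS3{})
	arn, _ := t.FieldByName("Arn")
	region, _ := t.FieldByName("Region")

	// The same struct can now feed both source and sink prefixes.
	fmt.Println(envVarName("CAMEL_KAMELET_AWS_S3_SOURCE", arn))  // CAMEL_KAMELET_AWS_S3_SOURCE_BUCKET_NAME_OR_ARN
	fmt.Println(envVarName("CAMEL_KAMELET_AWS_S3_SINK", region)) // CAMEL_KAMELET_AWS_S3_SINK_REGION
}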
@@ -53,25 +53,77 @@ func TestGenerateEnvVarsFromStruct(t *testing.T) {
 
 func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
     type AWSS3 struct {
-        Arn    string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
+        Arn    string `json:"arn,omitempty" camel:"BUCKETNAMEORARN"`
         Region string `json:"region,omitempty"`
     }
 
-    prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
-    input := AWSS3{
-        Arn:    "arn:aws:s3:::example-bucket",
-        Region: "us-west-2",
-    }
-
-    // Expected environment variables including SSL settings and camel tag for Arn
-    want := []corev1.EnvVar{
-        {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
-        {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
-    }
-
-    got := GenerateEnvVarsFromStruct(prefix, input)
-
-    if diff := cmp.Diff(want, got); diff != "" {
-        t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
-    }
-}
+    type AWSSQS struct {
+        Arn    string `json:"arn,omitempty" camel:"QUEUENAMEORARN"`
+        Region string `json:"region,omitempty"`
+    }
+
+    tests := []struct {
+        name   string
+        prefix string
+        input  interface{}
+        want   []corev1.EnvVar
+    }{
+        {
+            name:   "S3 Source with Camel Tag",
+            prefix: "CAMEL_KAMELET_AWS_S3_SOURCE",
+            input: AWSS3{
+                Arn:    "arn:aws:s3:::example-bucket",
+                Region: "us-west-2",
+            },
+            want: []corev1.EnvVar{
+                {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
+                {Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
+            },
+        },
+        {
+            name:   "S3 Sink with Camel Tag",
+            prefix: "CAMEL_KAMELET_AWS_S3_SINK",
+            input: AWSS3{
+                Arn:    "arn:aws:s3:::another-bucket",
+                Region: "eu-central-1",
+            },
+            want: []corev1.EnvVar{
+                {Name: "CAMEL_KAMELET_AWS_S3_SINK_BUCKETNAMEORARN", Value: "arn:aws:s3:::another-bucket"},
+                {Name: "CAMEL_KAMELET_AWS_S3_SINK_REGION", Value: "eu-central-1"},
+            },
+        },
+        {
+            name:   "SQS Source with Camel Tag",
+            prefix: "CAMEL_KAMELET_AWS_SQS_SOURCE",
+            input: AWSSQS{
+                Arn:    "arn:aws:sqs:::example-queue",
+                Region: "us-east-1",
+            },
+            want: []corev1.EnvVar{
+                {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN", Value: "arn:aws:sqs:::example-queue"},
+                {Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_REGION", Value: "us-east-1"},
+            },
+        },
+        {
+            name:   "SQS Sink with Camel Tag",
+            prefix: "CAMEL_KAMELET_AWS_SQS_SINK",
+            input: AWSSQS{
+                Arn:    "arn:aws:sqs:::another-queue",
+                Region: "ap-southeast-1",
+            },
+            want: []corev1.EnvVar{
+                {Name: "CAMEL_KAMELET_AWS_SQS_SINK_QUEUENAMEORARN", Value: "arn:aws:sqs:::another-queue"},
+                {Name: "CAMEL_KAMELET_AWS_SQS_SINK_REGION", Value: "ap-southeast-1"},
+            },
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            got := GenerateEnvVarsFromStruct(tt.prefix, tt.input)
+            if diff := cmp.Diff(tt.want, got); diff != "" {
+                t.Errorf("GenerateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
+            }
+        })
+    }
+}
@@ -78,7 +78,9 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.Integra
 }
 
 func (r *Reconciler) reconcileContainerSource(ctx context.Context, source *v1alpha1.IntegrationSource) (*v1.ContainerSource, error) {
-    expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication())
+    // set the OIDC to true only if the feature is enabled and the sink audience is set
+    // to prevent container environment vars to be set, just when the config is on.
+    expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication() && source.Status.SinkAudience != nil)
 
     cs, err := r.containerSourceLister.ContainerSources(source.Namespace).Get(expected.Name)
     if apierrors.IsNotFound(err) {
@@ -189,7 +189,7 @@ func TestReconcile(t *testing.T) {
     WithIntegrationSourceSpec(makeIntegrationSourceSpec(sinkDest)),
     WithInitIntegrationSourceConditions,
     WithIntegrationSourceStatusObservedGeneration(generation),
-    WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatus(&conditionTrue)),
+    WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatusOIDC(&conditionTrue)),
     WithIntegrationSourceOIDCServiceAccountName(getOIDCServiceAccountNameForContainerSource()),
 ),
 }},
@@ -218,39 +218,6 @@ func TestReconcile(t *testing.T) {
 
 func makeContainerSourceOIDC(source *sourcesv1alpha1.IntegrationSource, ready *corev1.ConditionStatus) *sourcesv1.ContainerSource {
     cs := makeContainerSource(source, ready)
-
-    // replace all env_vars for inserting the OIDC ones at the right order/index
-    cs.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
-        {
-            Name:  "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
-            Value: "true",
-        },
-        {
-            Name:  "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
-            Value: "/knative-custom-certs/knative-eventing-bundle.pem",
-        },
-        {
-            Name:  "CAMEL_KNATIVE_CLIENT_OIDC_ENABLED",
-            Value: "true",
-        },
-        {
-            Name:  "CAMEL_KNATIVE_CLIENT_OIDC_TOKEN_PATH",
-            Value: "file:///oidc/token",
-        },
-        {
-            Name:  "CAMEL_KAMELET_TIMER_SOURCE_PERIOD",
-            Value: "1000",
-        },
-        {
-            Name:  "CAMEL_KAMELET_TIMER_SOURCE_MESSAGE",
-            Value: "Hallo",
-        },
-        {
-            Name:  "CAMEL_KAMELET_TIMER_SOURCE_REPEATCOUNT",
-            Value: "0",
-        },
-    }
-
     cs.Status = *makeContainerSourceStatusOIDC(ready)
 
     return cs
@@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
 // VPod represents virtual replicas placed into real Kubernetes pods
 // The scheduler is responsible for placing VPods
 type VPod interface {
+    GetDeletionTimestamp() *metav1.Time
+
     // GetKey returns the VPod key (namespace/name).
     GetKey() types.NamespacedName
 
@@ -285,6 +285,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
     StatefulSetName string `json:"statefulSetName"`
     PodSpread map[string]map[string]int32 `json:"podSpread"`
     Pending map[string]int32 `json:"pending"`
+    ExpectedVReplicaByVPod map[string]int32 `json:"expectedVReplicaByVPod"`
 }
 
 sj := S{

@@ -295,6 +296,7 @@ func (s *State) MarshalJSON() ([]byte, error) {
     StatefulSetName: s.StatefulSetName,
     PodSpread: ToJSONable(s.PodSpread),
     Pending: toJSONablePending(s.Pending),
+    ExpectedVReplicaByVPod: toJSONablePending(s.ExpectedVReplicaByVPod),
 }
 
 return json.Marshal(sj)
@@ -205,10 +205,6 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
         return err
     }
 
-    logger.Debugw("checking adapter capacity",
-        zap.Int32("replicas", scale.Spec.Replicas),
-        zap.Any("state", state))
-
     newReplicas := integer.Int32Max(int32(math.Ceil(float64(state.TotalExpectedVReplicas())/float64(state.Capacity))), a.minReplicas)
 
     // Only scale down if permitted

@@ -216,6 +212,12 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
         newReplicas = scale.Spec.Replicas
     }
 
+    logger.Debugw("checking adapter capacity",
+        zap.Bool("attemptScaleDown", attemptScaleDown),
+        zap.Int32("replicas", scale.Spec.Replicas),
+        zap.Int32("newReplicas", newReplicas),
+        zap.Any("state", state))
+
     if newReplicas != scale.Spec.Replicas {
         scale.Spec.Replicas = newReplicas
         logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))
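The replica target itself is unchanged by these two hunks; the debug log just moves below the computation so it can also report attemptScaleDown and newReplicas. For reference, a toy rendering of the formula the log describes, max(ceil(totalExpectedVReplicas / capacity), minReplicas), with made-up numbers:

package main

import (
	"fmt"
	"math"
)

// desiredReplicas mirrors the formula used above:
// max(ceil(totalExpected / capacity), minReplicas). Values are illustrative.
func desiredReplicas(totalExpected, capacity, minReplicas int32) int32 {
	n := int32(math.Ceil(float64(totalExpected) / float64(capacity)))
	if n < minReplicas {
		return minReplicas
	}
	return n
}

func main() {
	fmt.Println(desiredReplicas(25, 10, 1)) // 3: 25 vreplicas at 10 per pod need 3 pods
	fmt.Println(desiredReplicas(0, 10, 1))  // 1: never scale below minReplicas
}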
@@ -21,6 +21,7 @@ import (
     "fmt"
     "sort"
     "sync"
+    "sync/atomic"
     "time"
 
     "go.uber.org/zap"
@@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
     // replicas is the (cached) number of statefulset replicas.
     replicas int32
 
+    // isLeader signals whether a given Scheduler instance is leader or not.
+    // The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
+    // bucket where we've been promoted.
+    isLeader atomic.Bool
+
     // reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been
     // committed yet (ie. not appearing in vpodLister)
     reserved map[types.NamespacedName]map[string]int32
@@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
     if !b.Has(ephemeralLeaderElectionObject) {
         return nil
     }
+    // The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
+    // Flip the flag after running initReserved.
+    defer s.isLeader.Store(true)
 
     if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
         return v.Promote(b, enq)
@@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {
 
     s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
     for _, vPod := range vPods {
+        if !vPod.GetDeletionTimestamp().IsZero() {
+            continue
+        }
         s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
         for _, placement := range vPod.GetPlacements() {
             s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
@@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
     return nil
 }
 
+// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
+// changes (Promote / Demote).
+// initReserved is not enough since the vPod lister can be stale.
+func (s *StatefulSetScheduler) resyncReserved() error {
+    if !s.isLeader.Load() {
+        return nil
+    }
+
+    vPods, err := s.vpodLister()
+    if err != nil {
+        return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
+    }
+    vPodsByK := vPodsByKey(vPods)
+
+    s.reservedMu.Lock()
+    defer s.reservedMu.Unlock()
+
+    for key := range s.reserved {
+        vPod, ok := vPodsByK[key]
+        if !ok || vPod == nil {
+            delete(s.reserved, key)
+        }
+    }
+
+    return nil
+}
+
 // Demote implements reconciler.LeaderAware.
 func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
+    if !b.Has(ephemeralLeaderElectionObject) {
+        return
+    }
+    // The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
+    defer s.isLeader.Store(false)
+
     if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
         v.Demote(b)
     }
@@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
         sif.Shutdown()
     }()
 
+    go func() {
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-time.After(cfg.RefreshPeriod * 3):
+                _ = s.resyncReserved()
+            }
+        }
+    }()
+
     return s
 }
 
@@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
     }
     return placements
 }
+
+func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
+    r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
+    for _, vPod := range vPods {
+        r[vPod.GetKey()] = vPod
+    }
+    return r
+}
@@ -708,6 +708,21 @@ func TestStatefulsetScheduler(t *testing.T) {
 
     vpodClient := tscheduler.NewVPodClient()
     vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements)
+    for vPodKey, res := range tc.initialReserved {
+        if vPodKey.Namespace == vpodNamespace && vPodKey.Name == vpodName {
+            continue
+        }
+        var placements []duckv1alpha1.Placement
+        count := int32(0)
+        for pod, vReplicas := range res {
+            count += vReplicas
+            placements = append(placements, duckv1alpha1.Placement{
+                PodName:   pod,
+                VReplicas: vReplicas,
+            })
+        }
+        vpodClient.Create(vPodKey.Namespace, vPodKey.Name, count, placements)
+    }
 
     for i := int32(0); i < tc.replicas; i++ {
         nodeName := "node" + fmt.Sprint(i)
@@ -745,7 +760,9 @@ func TestStatefulsetScheduler(t *testing.T) {
         t.Fatal("unexpected error", err)
     }
     if tc.initialReserved != nil {
+        s.reservedMu.Lock()
         s.reserved = tc.initialReserved
+        s.reservedMu.Unlock()
     }
 
     // Give some time for the informer to notify the scheduler and set the number of replicas
@@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme
     }
 }
 
+func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time {
+    return nil
+}
+
 func (d *sampleVPod) GetKey() types.NamespacedName {
     return d.key
 }