Compare commits

...

12 Commits

Author SHA1 Message Date
Knative Prow Robot eb58703061
[release-1.17] Compare the entire PodTemplateSpec, instead of just its PodSpec (#8560)
Compare the entire PodTemplateSpec, instead of just its PodSpec

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
2025-04-15 08:12:12 +00:00
Christoph Stäbler d01256fa32
[release-1.17] Migrate golangci-lint config to v2 (#8563)
Migrate golangci-lint config to v2
2025-04-15 07:05:12 +00:00
Knative Prow Robot 4e48320ac7
[release-1.17] If no subscriber uri is present we return 404, instead of 400 (#8553)
If no subscriber uri is present we return 404, instead of 400 which means the request from the client itself would have had issues

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
2025-04-10 15:47:13 +00:00
Knative Prow Robot 34d05f30f2
[release-1.17] Use more readable ENV_VAR names for Camel (#8536)
lipstick: Camel provided updates to have more readable ENV_VAR names

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
2025-03-19 17:59:15 +00:00
Knative Prow Robot b4c59bb301
[release-1.17] Fixing the way we render custom camel tags on go structs (#8529)
hammer: Fixing the way we render custom camel tags on go structs. Remove the incorrect value of the tag and update the function that assembles the actual ENV_VARs

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
2025-03-18 10:27:13 +00:00
Knative Prow Robot 85ca388bf8
[release-1.17] Guard reserved access with lock and create vpods in tests (#8504)
* Guard reserved access with lock in tests

* Create vpods in test

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
2025-02-25 10:52:21 +00:00
Knative Prow Robot 5b7b03053a
[release-1.17] Scheduler: log expected vreplicas by vpod (#8462)
Scheduler: log expected vreplicas by vpod

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
2025-02-14 11:44:53 +00:00
Knative Prow Robot a017fc0ed0
[release-1.17] Not just render oidc env-vars when cfg is on; we also need a present audience to ensure the camel containers work correctly (#8459)
Not just render oidc env-vars when cfg is on; we also need a present audience to ensure the camel containers work correctly

Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
Co-authored-by: Matthias Wessendorf <mwessend@redhat.com>
2025-02-13 07:41:51 +00:00
Knative Prow Robot 93cc440c99
[release-1.17] Scheduler: Resync reserved periodically to keep state consistent (#8453)
Scheduler: Resync reserved periodically to keep state consistent

Add resyncReserved, which removes deleted vPods from reserved to keep the
state consistent when leadership changes (Promote / Demote).

`initReserved` is not enough since the vPod lister can be stale.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
2025-02-11 15:26:50 +00:00
Knative Prow Robot c138419361
[release-1.17] Add `sinks.knative.dev` to namespaced ClusterRole (#8433)
Add `sinks.knative.dev` to namespaced ClusterRole

These are roles that users can use to give their developers access
to Knative Eventing resources and we're missing the sinks group.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
2025-02-03 08:57:27 +00:00
Knative Prow Robot 4542e6bf08
[release-1.17] Reduce mt-broker-controller memory usage with namespaced endpoint informer (#8421)
* Reduce mt-broker-controller memory usage with namespaced endpoint informer

Currently, the mt-broker-controller is using a cluster-wide endpoints
informer but it actually only uses endpoints in the "SYSTEM_NAMESPACE".

Using the namespaced informer factory ensures that the watcher
is only watching endpoints in the `knative-eventing` (also known as
`SYSTEM_NAMESPACE`) namespace.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Start informer

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
2025-01-22 20:16:13 +00:00
David Simansky cbdf86e94b
Trigger patch release to cover latest go cve (#8417) 2025-01-22 11:20:11 +00:00
18 changed files with 265 additions and 114 deletions

View File

@ -1,16 +1,11 @@
version: "2"
run:
timeout: 10m
build-tags:
- e2e
- probe
- preupgrade
- postupgrade
- postdowngrade
skip-dirs:
- pkg/client
linters:
enable:
- asciicheck
@ -20,10 +15,28 @@ linters:
- unparam
disable:
- errcheck
issues:
exclude-rules:
- path: test # Excludes /test, *_test.go etc.
linters:
- gosec
- unparam
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- gosec
- unparam
path: test
paths:
- third_party$
- builtin$
- examples$
- pkg/client
formatters:
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
- pkg/client

View File

@ -79,6 +79,19 @@ rules:
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: knative-sinks-namespaced-admin
labels:
rbac.authorization.k8s.io/aggregate-to-admin: "true"
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
rules:
- apiGroups: ["sinks.knative.dev"]
resources: ["*"]
verbs: ["*"]
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: knative-eventing-namespaced-edit
labels:
@ -86,7 +99,7 @@ metadata:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
rules:
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
resources: ["*"]
verbs: ["create", "update", "patch", "delete"]
---
@ -99,6 +112,6 @@ metadata:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
rules:
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
resources: ["*"]
verbs: ["get", "list", "watch"]

2
go.mod
View File

@ -1,5 +1,7 @@
module knative.dev/eventing
// A placeholder comment to rebuild with Go 1.23.5 toolchain to cover CVEs
go 1.22.7
require (

View File

@ -33,33 +33,33 @@ type AWSCommon struct {
type AWSS3 struct {
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` // S3 ARN
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
Arn string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"` // S3 ARN
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
}
type AWSSQS struct {
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN"` // SQS ARN
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
Host string `json:"host" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
Arn string `json:"arn,omitempty" camel:"QUEUE_NAME_OR_ARN"` // SQS ARN
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
Host string `json:"host" camel:"AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
}
type AWSDDBStreams struct {
@ -71,6 +71,6 @@ type AWSDDBStreams struct {
type AWSSNS struct {
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SNS_SINK_TOPICNAMEORARN"` // SNS ARN
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
Arn string `json:"arn,omitempty" camel:"TOPIC_NAME_OR_ARN"` // SNS ARN
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
}

View File

@ -425,8 +425,8 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
subscriberURI := trigger.Status.SubscriberURI
if subscriberURI == nil {
// Record the event count.
writer.WriteHeader(http.StatusBadRequest)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusBadRequest)
writer.WriteHeader(http.StatusNotFound)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusNotFound)
return
}

View File

@ -138,7 +138,7 @@ func TestReceiver(t *testing.T) {
triggers: []*eventingv1.Trigger{
makeTrigger(withoutSubscriberURI()),
},
expectedStatus: http.StatusBadRequest,
expectedStatus: http.StatusNotFound,
expectedEventCount: true,
},
"Trigger without a Filter": {

View File

@ -25,11 +25,11 @@ import (
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/apis"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
namespacedinformerfactory "knative.dev/pkg/injection/clients/namespacedkube/informers/factory"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
@ -69,7 +69,12 @@ func NewController(
logger := logging.FromContext(ctx)
brokerInformer := brokerinformer.Get(ctx)
subscriptionInformer := subscriptioninformer.Get(ctx)
endpointsInformer := endpointsinformer.Get(ctx)
endpointsInformer := namespacedinformerfactory.Get(ctx).Core().V1().Endpoints()
if err := controller.StartInformers(ctx.Done(), endpointsInformer.Informer()); err != nil {
logger.Fatalw("Failed to start namespaced endpoints informer", zap.Error(err))
}
configmapInformer := configmapinformer.Get(ctx)
secretInformer := secretinformer.Get(ctx)
eventPolicyInformer := eventpolicyinformer.Get(ctx)

View File

@ -109,8 +109,8 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1.Con
return nil, fmt.Errorf("getting Deployment: %v", err)
} else if !metav1.IsControlledBy(ra, source) {
return nil, fmt.Errorf("deployment %q is not owned by ContainerSource %q", ra.Name, source.Name)
} else if r.podSpecChanged(&ra.Spec.Template.Spec, &expected.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
} else if r.podTemplateChanged(&ra.Spec.Template, &expected.Spec.Template) {
ra.Spec.Template = expected.Spec.Template
ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Update(ctx, ra, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("updating Deployment: %v", err)
@ -159,6 +159,10 @@ func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec)
return !equality.Semantic.DeepDerivative(want, have)
}
func (r *Reconciler) podTemplateChanged(have *corev1.PodTemplateSpec, want *corev1.PodTemplateSpec) bool {
return !equality.Semantic.DeepDerivative(want, have)
}
func (r *Reconciler) sinkBindingSpecChanged(have *v1.SinkBindingSpec, want *v1.SinkBindingSpec) bool {
return !equality.Semantic.DeepDerivative(want, have)
}

View File

@ -54,7 +54,9 @@ func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
// First, check for the custom 'camel' tag
envVarName := fieldType.Tag.Get("camel")
if envVarName == "" {
if envVarName != "" {
envVarName = fmt.Sprintf("%s_%s", prefix, envVarName)
} else {
// If 'camel' tag is not present, fall back to the 'json' tag or Go field name
jsonTag := fieldType.Tag.Get("json")
tagName := strings.Split(jsonTag, ",")[0]

View File

@ -53,25 +53,77 @@ func TestGenerateEnvVarsFromStruct(t *testing.T) {
func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
type AWSS3 struct {
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
Arn string `json:"arn,omitempty" camel:"BUCKETNAMEORARN"`
Region string `json:"region,omitempty"`
}
prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
input := AWSS3{
Arn: "arn:aws:s3:::example-bucket",
Region: "us-west-2",
type AWSSQS struct {
Arn string `json:"arn,omitempty" camel:"QUEUENAMEORARN"`
Region string `json:"region,omitempty"`
}
// Expected environment variables including SSL settings and camel tag for Arn
want := []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
tests := []struct {
name string
prefix string
input interface{}
want []corev1.EnvVar
}{
{
name: "S3 Source with Camel Tag",
prefix: "CAMEL_KAMELET_AWS_S3_SOURCE",
input: AWSS3{
Arn: "arn:aws:s3:::example-bucket",
Region: "us-west-2",
},
want: []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
},
},
{
name: "S3 Sink with Camel Tag",
prefix: "CAMEL_KAMELET_AWS_S3_SINK",
input: AWSS3{
Arn: "arn:aws:s3:::another-bucket",
Region: "eu-central-1",
},
want: []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_S3_SINK_BUCKETNAMEORARN", Value: "arn:aws:s3:::another-bucket"},
{Name: "CAMEL_KAMELET_AWS_S3_SINK_REGION", Value: "eu-central-1"},
},
},
{
name: "SQS Source with Camel Tag",
prefix: "CAMEL_KAMELET_AWS_SQS_SOURCE",
input: AWSSQS{
Arn: "arn:aws:sqs:::example-queue",
Region: "us-east-1",
},
want: []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN", Value: "arn:aws:sqs:::example-queue"},
{Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_REGION", Value: "us-east-1"},
},
},
{
name: "SQS Sink with Camel Tag",
prefix: "CAMEL_KAMELET_AWS_SQS_SINK",
input: AWSSQS{
Arn: "arn:aws:sqs:::another-queue",
Region: "ap-southeast-1",
},
want: []corev1.EnvVar{
{Name: "CAMEL_KAMELET_AWS_SQS_SINK_QUEUENAMEORARN", Value: "arn:aws:sqs:::another-queue"},
{Name: "CAMEL_KAMELET_AWS_SQS_SINK_REGION", Value: "ap-southeast-1"},
},
},
}
got := GenerateEnvVarsFromStruct(prefix, input)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GenerateEnvVarsFromStruct(tt.prefix, tt.input)
if diff := cmp.Diff(tt.want, got); diff != "" {
t.Errorf("GenerateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
}
})
}
}

View File

@ -78,7 +78,9 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.Integra
}
func (r *Reconciler) reconcileContainerSource(ctx context.Context, source *v1alpha1.IntegrationSource) (*v1.ContainerSource, error) {
expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication())
// set the OIDC to true only if the feature is enabled and the sink audience is set
// to prevent container environment vars to be set, just when the config is on.
expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication() && source.Status.SinkAudience != nil)
cs, err := r.containerSourceLister.ContainerSources(source.Namespace).Get(expected.Name)
if apierrors.IsNotFound(err) {

View File

@ -189,7 +189,7 @@ func TestReconcile(t *testing.T) {
WithIntegrationSourceSpec(makeIntegrationSourceSpec(sinkDest)),
WithInitIntegrationSourceConditions,
WithIntegrationSourceStatusObservedGeneration(generation),
WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatus(&conditionTrue)),
WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatusOIDC(&conditionTrue)),
WithIntegrationSourceOIDCServiceAccountName(getOIDCServiceAccountNameForContainerSource()),
),
}},
@ -218,39 +218,6 @@ func TestReconcile(t *testing.T) {
func makeContainerSourceOIDC(source *sourcesv1alpha1.IntegrationSource, ready *corev1.ConditionStatus) *sourcesv1.ContainerSource {
cs := makeContainerSource(source, ready)
// replace all env_vars for inserting the OIDC ones at the right order/index
cs.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
Value: "true",
},
{
Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
Value: "/knative-custom-certs/knative-eventing-bundle.pem",
},
{
Name: "CAMEL_KNATIVE_CLIENT_OIDC_ENABLED",
Value: "true",
},
{
Name: "CAMEL_KNATIVE_CLIENT_OIDC_TOKEN_PATH",
Value: "file:///oidc/token",
},
{
Name: "CAMEL_KAMELET_TIMER_SOURCE_PERIOD",
Value: "1000",
},
{
Name: "CAMEL_KAMELET_TIMER_SOURCE_MESSAGE",
Value: "Hallo",
},
{
Name: "CAMEL_KAMELET_TIMER_SOURCE_REPEATCOUNT",
Value: "0",
},
}
cs.Status = *makeContainerSourceStatusOIDC(ready)
return cs

View File

@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
// VPod represents virtual replicas placed into real Kubernetes pods
// The scheduler is responsible for placing VPods
type VPod interface {
GetDeletionTimestamp() *metav1.Time
// GetKey returns the VPod key (namespace/name).
GetKey() types.NamespacedName

View File

@ -278,23 +278,25 @@ func isPodUnschedulable(pod *v1.Pod) bool {
func (s *State) MarshalJSON() ([]byte, error) {
type S struct {
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
Pending map[string]int32 `json:"pending"`
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
Pending map[string]int32 `json:"pending"`
ExpectedVReplicaByVPod map[string]int32 `json:"expectedVReplicaByVPod"`
}
sj := S{
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
PodSpread: ToJSONable(s.PodSpread),
Pending: toJSONablePending(s.Pending),
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
PodSpread: ToJSONable(s.PodSpread),
Pending: toJSONablePending(s.Pending),
ExpectedVReplicaByVPod: toJSONablePending(s.ExpectedVReplicaByVPod),
}
return json.Marshal(sj)

View File

@ -205,10 +205,6 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
return err
}
logger.Debugw("checking adapter capacity",
zap.Int32("replicas", scale.Spec.Replicas),
zap.Any("state", state))
newReplicas := integer.Int32Max(int32(math.Ceil(float64(state.TotalExpectedVReplicas())/float64(state.Capacity))), a.minReplicas)
// Only scale down if permitted
@ -216,6 +212,12 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
newReplicas = scale.Spec.Replicas
}
logger.Debugw("checking adapter capacity",
zap.Bool("attemptScaleDown", attemptScaleDown),
zap.Int32("replicas", scale.Spec.Replicas),
zap.Int32("newReplicas", newReplicas),
zap.Any("state", state))
if newReplicas != scale.Spec.Replicas {
scale.Spec.Replicas = newReplicas
logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

View File

@ -21,6 +21,7 @@ import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
// replicas is the (cached) number of statefulset replicas.
replicas int32
// isLeader signals whether a given Scheduler instance is leader or not.
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
// bucket where we've been promoted.
isLeader atomic.Bool
// reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been
// committed yet (ie. not appearing in vpodLister)
reserved map[types.NamespacedName]map[string]int32
@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
if !b.Has(ephemeralLeaderElectionObject) {
return nil
}
// The promoted bucket has the ephemeralLeaderElectionObject, so we are the leader now.
// Flip the flag after running initReserved.
defer s.isLeader.Store(true)
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
return v.Promote(b, enq)
@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {
s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
for _, vPod := range vPods {
if !vPod.GetDeletionTimestamp().IsZero() {
continue
}
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
for _, placement := range vPod.GetPlacements() {
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
return nil
}
// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
// changes (Promote / Demote).
// initReserved is not enough since the vPod lister can be stale.
func (s *StatefulSetScheduler) resyncReserved() error {
if !s.isLeader.Load() {
return nil
}
vPods, err := s.vpodLister()
if err != nil {
return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
}
vPodsByK := vPodsByKey(vPods)
s.reservedMu.Lock()
defer s.reservedMu.Unlock()
for key := range s.reserved {
vPod, ok := vPodsByK[key]
if !ok || vPod == nil {
delete(s.reserved, key)
}
}
return nil
}
// Demote implements reconciler.LeaderAware.
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
if !b.Has(ephemeralLeaderElectionObject) {
return
}
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
defer s.isLeader.Store(false)
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
v.Demote(b)
}
@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
sif.Shutdown()
}()
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(cfg.RefreshPeriod * 3):
_ = s.resyncReserved()
}
}
}()
return s
}
@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
}
return placements
}
func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
for _, vPod := range vPods {
r[vPod.GetKey()] = vPod
}
return r
}

View File

@ -708,6 +708,21 @@ func TestStatefulsetScheduler(t *testing.T) {
vpodClient := tscheduler.NewVPodClient()
vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements)
for vPodKey, res := range tc.initialReserved {
if vPodKey.Namespace == vpodNamespace && vPodKey.Name == vpodName {
continue
}
var placements []duckv1alpha1.Placement
count := int32(0)
for pod, vReplicas := range res {
count += vReplicas
placements = append(placements, duckv1alpha1.Placement{
PodName: pod,
VReplicas: vReplicas,
})
}
vpodClient.Create(vPodKey.Namespace, vPodKey.Name, count, placements)
}
for i := int32(0); i < tc.replicas; i++ {
nodeName := "node" + fmt.Sprint(i)
@ -745,7 +760,9 @@ func TestStatefulsetScheduler(t *testing.T) {
t.Fatal("unexpected error", err)
}
if tc.initialReserved != nil {
s.reservedMu.Lock()
s.reserved = tc.initialReserved
s.reservedMu.Unlock()
}
// Give some time for the informer to notify the scheduler and set the number of replicas

View File

@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme
}
}
func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time {
return nil
}
func (d *sampleVPod) GetKey() types.NamespacedName {
return d.key
}