mirror of https://github.com/knative/eventing.git
Compare commits
12 Commits
Author | SHA1 | Date |
---|---|---|
|
eb58703061 | |
|
d01256fa32 | |
|
4e48320ac7 | |
|
34d05f30f2 | |
|
b4c59bb301 | |
|
85ca388bf8 | |
|
5b7b03053a | |
|
a017fc0ed0 | |
|
93cc440c99 | |
|
c138419361 | |
|
4542e6bf08 | |
|
cbdf86e94b |
|
@ -1,16 +1,11 @@
|
|||
version: "2"
|
||||
run:
|
||||
timeout: 10m
|
||||
|
||||
build-tags:
|
||||
- e2e
|
||||
- probe
|
||||
- preupgrade
|
||||
- postupgrade
|
||||
- postdowngrade
|
||||
|
||||
skip-dirs:
|
||||
- pkg/client
|
||||
|
||||
linters:
|
||||
enable:
|
||||
- asciicheck
|
||||
|
@ -20,10 +15,28 @@ linters:
|
|||
- unparam
|
||||
disable:
|
||||
- errcheck
|
||||
|
||||
issues:
|
||||
exclude-rules:
|
||||
- path: test # Excludes /test, *_test.go etc.
|
||||
linters:
|
||||
- gosec
|
||||
- unparam
|
||||
exclusions:
|
||||
generated: lax
|
||||
presets:
|
||||
- comments
|
||||
- common-false-positives
|
||||
- legacy
|
||||
- std-error-handling
|
||||
rules:
|
||||
- linters:
|
||||
- gosec
|
||||
- unparam
|
||||
path: test
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
- pkg/client
|
||||
formatters:
|
||||
exclusions:
|
||||
generated: lax
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
- pkg/client
|
||||
|
|
|
@ -79,6 +79,19 @@ rules:
|
|||
---
|
||||
kind: ClusterRole
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
metadata:
|
||||
name: knative-sinks-namespaced-admin
|
||||
labels:
|
||||
rbac.authorization.k8s.io/aggregate-to-admin: "true"
|
||||
app.kubernetes.io/version: devel
|
||||
app.kubernetes.io/name: knative-eventing
|
||||
rules:
|
||||
- apiGroups: ["sinks.knative.dev"]
|
||||
resources: ["*"]
|
||||
verbs: ["*"]
|
||||
---
|
||||
kind: ClusterRole
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
metadata:
|
||||
name: knative-eventing-namespaced-edit
|
||||
labels:
|
||||
|
@ -86,7 +99,7 @@ metadata:
|
|||
app.kubernetes.io/version: devel
|
||||
app.kubernetes.io/name: knative-eventing
|
||||
rules:
|
||||
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
|
||||
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
|
||||
resources: ["*"]
|
||||
verbs: ["create", "update", "patch", "delete"]
|
||||
---
|
||||
|
@ -99,6 +112,6 @@ metadata:
|
|||
app.kubernetes.io/version: devel
|
||||
app.kubernetes.io/name: knative-eventing
|
||||
rules:
|
||||
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev"]
|
||||
- apiGroups: ["eventing.knative.dev", "messaging.knative.dev", "sources.knative.dev", "flows.knative.dev", "bindings.knative.dev", "sinks.knative.dev"]
|
||||
resources: ["*"]
|
||||
verbs: ["get", "list", "watch"]
|
||||
|
|
2
go.mod
2
go.mod
|
@ -1,5 +1,7 @@
|
|||
module knative.dev/eventing
|
||||
|
||||
// A placeholder comment to rebuild with Go 1.23.5 toolchain to cover CVEs
|
||||
|
||||
go 1.22.7
|
||||
|
||||
require (
|
||||
|
|
|
@ -33,33 +33,33 @@ type AWSCommon struct {
|
|||
|
||||
type AWSS3 struct {
|
||||
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
|
||||
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"` // S3 ARN
|
||||
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
|
||||
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
|
||||
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
|
||||
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
|
||||
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
|
||||
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
|
||||
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
|
||||
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
|
||||
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
|
||||
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
|
||||
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
|
||||
Arn string `json:"arn,omitempty" camel:"BUCKET_NAME_OR_ARN"` // S3 ARN
|
||||
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete objects after reading
|
||||
MoveAfterRead bool `json:"moveAfterRead" default:"false"` // Move objects after reading
|
||||
DestinationBucket string `json:"destinationBucket,omitempty"` // Destination bucket for moved objects
|
||||
DestinationBucketPrefix string `json:"destinationBucketPrefix,omitempty"` // Prefix for moved objects
|
||||
DestinationBucketSuffix string `json:"destinationBucketSuffix,omitempty"` // Suffix for moved objects
|
||||
AutoCreateBucket bool `json:"autoCreateBucket" default:"false"` // Auto-create S3 bucket
|
||||
Prefix string `json:"prefix,omitempty"` // S3 bucket prefix for search
|
||||
IgnoreBody bool `json:"ignoreBody" default:"false"` // Ignore object body
|
||||
ForcePathStyle bool `json:"forcePathStyle" default:"false"` // Force path style for bucket access
|
||||
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
|
||||
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"10"` // Max messages to poll per request
|
||||
}
|
||||
|
||||
type AWSSQS struct {
|
||||
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
|
||||
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN"` // SQS ARN
|
||||
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
|
||||
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
|
||||
Host string `json:"host" camel:"CAMEL_KAMELET_AWS_SQS_SOURCE_AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
|
||||
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
|
||||
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
|
||||
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
|
||||
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
|
||||
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
|
||||
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
|
||||
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
|
||||
Arn string `json:"arn,omitempty" camel:"QUEUE_NAME_OR_ARN"` // SQS ARN
|
||||
DeleteAfterRead bool `json:"deleteAfterRead" default:"true"` // Auto-delete messages after reading
|
||||
AutoCreateQueue bool `json:"autoCreateQueue" default:"false"` // Auto-create SQS queue
|
||||
Host string `json:"host" camel:"AMAZONAWSHOST" default:"amazonaws.com"` // AWS host
|
||||
Protocol string `json:"protocol" default:"https"` // Communication protocol (http/https)
|
||||
QueueURL string `json:"queueURL,omitempty"` // Full SQS queue URL
|
||||
Greedy bool `json:"greedy" default:"false"` // Greedy scheduler
|
||||
Delay int `json:"delay" default:"500"` // Delay between polls in milliseconds
|
||||
MaxMessagesPerPoll int `json:"maxMessagesPerPoll" default:"1"` // Max messages to return (1-10)
|
||||
WaitTimeSeconds int `json:"waitTimeSeconds,omitempty"` // Wait time for messages
|
||||
VisibilityTimeout int `json:"visibilityTimeout,omitempty"` // Visibility timeout in seconds
|
||||
}
|
||||
|
||||
type AWSDDBStreams struct {
|
||||
|
@ -71,6 +71,6 @@ type AWSDDBStreams struct {
|
|||
|
||||
type AWSSNS struct {
|
||||
AWSCommon `json:",inline"` // Embeds AWSCommon to inherit its fields in JSON
|
||||
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_SNS_SINK_TOPICNAMEORARN"` // SNS ARN
|
||||
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
|
||||
Arn string `json:"arn,omitempty" camel:"TOPIC_NAME_OR_ARN"` // SNS ARN
|
||||
AutoCreateTopic bool `json:"autoCreateTopic" default:"false"` // Auto-create SNS topic
|
||||
}
|
||||
|
|
|
@ -425,8 +425,8 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
|
|||
subscriberURI := trigger.Status.SubscriberURI
|
||||
if subscriberURI == nil {
|
||||
// Record the event count.
|
||||
writer.WriteHeader(http.StatusBadRequest)
|
||||
_ = h.reporter.ReportEventCount(reportArgs, http.StatusBadRequest)
|
||||
writer.WriteHeader(http.StatusNotFound)
|
||||
_ = h.reporter.ReportEventCount(reportArgs, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -138,7 +138,7 @@ func TestReceiver(t *testing.T) {
|
|||
triggers: []*eventingv1.Trigger{
|
||||
makeTrigger(withoutSubscriberURI()),
|
||||
},
|
||||
expectedStatus: http.StatusBadRequest,
|
||||
expectedStatus: http.StatusNotFound,
|
||||
expectedEventCount: true,
|
||||
},
|
||||
"Trigger without a Filter": {
|
||||
|
|
|
@ -25,11 +25,11 @@ import (
|
|||
"k8s.io/client-go/tools/cache"
|
||||
"knative.dev/pkg/apis"
|
||||
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
|
||||
endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
|
||||
"knative.dev/pkg/configmap"
|
||||
"knative.dev/pkg/controller"
|
||||
"knative.dev/pkg/injection/clients/dynamicclient"
|
||||
secretinformer "knative.dev/pkg/injection/clients/namespacedkube/informers/core/v1/secret"
|
||||
namespacedinformerfactory "knative.dev/pkg/injection/clients/namespacedkube/informers/factory"
|
||||
"knative.dev/pkg/logging"
|
||||
pkgreconciler "knative.dev/pkg/reconciler"
|
||||
"knative.dev/pkg/resolver"
|
||||
|
@ -69,7 +69,12 @@ func NewController(
|
|||
logger := logging.FromContext(ctx)
|
||||
brokerInformer := brokerinformer.Get(ctx)
|
||||
subscriptionInformer := subscriptioninformer.Get(ctx)
|
||||
endpointsInformer := endpointsinformer.Get(ctx)
|
||||
|
||||
endpointsInformer := namespacedinformerfactory.Get(ctx).Core().V1().Endpoints()
|
||||
if err := controller.StartInformers(ctx.Done(), endpointsInformer.Informer()); err != nil {
|
||||
logger.Fatalw("Failed to start namespaced endpoints informer", zap.Error(err))
|
||||
}
|
||||
|
||||
configmapInformer := configmapinformer.Get(ctx)
|
||||
secretInformer := secretinformer.Get(ctx)
|
||||
eventPolicyInformer := eventpolicyinformer.Get(ctx)
|
||||
|
|
|
@ -109,8 +109,8 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1.Con
|
|||
return nil, fmt.Errorf("getting Deployment: %v", err)
|
||||
} else if !metav1.IsControlledBy(ra, source) {
|
||||
return nil, fmt.Errorf("deployment %q is not owned by ContainerSource %q", ra.Name, source.Name)
|
||||
} else if r.podSpecChanged(&ra.Spec.Template.Spec, &expected.Spec.Template.Spec) {
|
||||
ra.Spec.Template.Spec = expected.Spec.Template.Spec
|
||||
} else if r.podTemplateChanged(&ra.Spec.Template, &expected.Spec.Template) {
|
||||
ra.Spec.Template = expected.Spec.Template
|
||||
ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Update(ctx, ra, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("updating Deployment: %v", err)
|
||||
|
@ -159,6 +159,10 @@ func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec)
|
|||
return !equality.Semantic.DeepDerivative(want, have)
|
||||
}
|
||||
|
||||
func (r *Reconciler) podTemplateChanged(have *corev1.PodTemplateSpec, want *corev1.PodTemplateSpec) bool {
|
||||
return !equality.Semantic.DeepDerivative(want, have)
|
||||
}
|
||||
|
||||
func (r *Reconciler) sinkBindingSpecChanged(have *v1.SinkBindingSpec, want *v1.SinkBindingSpec) bool {
|
||||
return !equality.Semantic.DeepDerivative(want, have)
|
||||
}
|
||||
|
|
|
@ -54,7 +54,9 @@ func GenerateEnvVarsFromStruct(prefix string, s interface{}) []corev1.EnvVar {
|
|||
|
||||
// First, check for the custom 'camel' tag
|
||||
envVarName := fieldType.Tag.Get("camel")
|
||||
if envVarName == "" {
|
||||
if envVarName != "" {
|
||||
envVarName = fmt.Sprintf("%s_%s", prefix, envVarName)
|
||||
} else {
|
||||
// If 'camel' tag is not present, fall back to the 'json' tag or Go field name
|
||||
jsonTag := fieldType.Tag.Get("json")
|
||||
tagName := strings.Split(jsonTag, ",")[0]
|
||||
|
|
|
@ -53,25 +53,77 @@ func TestGenerateEnvVarsFromStruct(t *testing.T) {
|
|||
|
||||
func TestGenerateEnvVarsFromStruct_S3WithCamelTag(t *testing.T) {
|
||||
type AWSS3 struct {
|
||||
Arn string `json:"arn,omitempty" camel:"CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN"`
|
||||
Arn string `json:"arn,omitempty" camel:"BUCKETNAMEORARN"`
|
||||
Region string `json:"region,omitempty"`
|
||||
}
|
||||
|
||||
prefix := "CAMEL_KAMELET_AWS_S3_SOURCE"
|
||||
input := AWSS3{
|
||||
Arn: "arn:aws:s3:::example-bucket",
|
||||
Region: "us-west-2",
|
||||
type AWSSQS struct {
|
||||
Arn string `json:"arn,omitempty" camel:"QUEUENAMEORARN"`
|
||||
Region string `json:"region,omitempty"`
|
||||
}
|
||||
|
||||
// Expected environment variables including SSL settings and camel tag for Arn
|
||||
want := []corev1.EnvVar{
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
|
||||
tests := []struct {
|
||||
name string
|
||||
prefix string
|
||||
input interface{}
|
||||
want []corev1.EnvVar
|
||||
}{
|
||||
{
|
||||
name: "S3 Source with Camel Tag",
|
||||
prefix: "CAMEL_KAMELET_AWS_S3_SOURCE",
|
||||
input: AWSS3{
|
||||
Arn: "arn:aws:s3:::example-bucket",
|
||||
Region: "us-west-2",
|
||||
},
|
||||
want: []corev1.EnvVar{
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_BUCKETNAMEORARN", Value: "arn:aws:s3:::example-bucket"},
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SOURCE_REGION", Value: "us-west-2"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "S3 Sink with Camel Tag",
|
||||
prefix: "CAMEL_KAMELET_AWS_S3_SINK",
|
||||
input: AWSS3{
|
||||
Arn: "arn:aws:s3:::another-bucket",
|
||||
Region: "eu-central-1",
|
||||
},
|
||||
want: []corev1.EnvVar{
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SINK_BUCKETNAMEORARN", Value: "arn:aws:s3:::another-bucket"},
|
||||
{Name: "CAMEL_KAMELET_AWS_S3_SINK_REGION", Value: "eu-central-1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SQS Source with Camel Tag",
|
||||
prefix: "CAMEL_KAMELET_AWS_SQS_SOURCE",
|
||||
input: AWSSQS{
|
||||
Arn: "arn:aws:sqs:::example-queue",
|
||||
Region: "us-east-1",
|
||||
},
|
||||
want: []corev1.EnvVar{
|
||||
{Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_QUEUENAMEORARN", Value: "arn:aws:sqs:::example-queue"},
|
||||
{Name: "CAMEL_KAMELET_AWS_SQS_SOURCE_REGION", Value: "us-east-1"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SQS Sink with Camel Tag",
|
||||
prefix: "CAMEL_KAMELET_AWS_SQS_SINK",
|
||||
input: AWSSQS{
|
||||
Arn: "arn:aws:sqs:::another-queue",
|
||||
Region: "ap-southeast-1",
|
||||
},
|
||||
want: []corev1.EnvVar{
|
||||
{Name: "CAMEL_KAMELET_AWS_SQS_SINK_QUEUENAMEORARN", Value: "arn:aws:sqs:::another-queue"},
|
||||
{Name: "CAMEL_KAMELET_AWS_SQS_SINK_REGION", Value: "ap-southeast-1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
got := GenerateEnvVarsFromStruct(prefix, input)
|
||||
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("generateEnvVarsFromStruct_S3WithCamelTag() mismatch (-want +got):\n%s", diff)
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := GenerateEnvVarsFromStruct(tt.prefix, tt.input)
|
||||
if diff := cmp.Diff(tt.want, got); diff != "" {
|
||||
t.Errorf("GenerateEnvVarsFromStruct() mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,9 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.Integra
|
|||
}
|
||||
|
||||
func (r *Reconciler) reconcileContainerSource(ctx context.Context, source *v1alpha1.IntegrationSource) (*v1.ContainerSource, error) {
|
||||
expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication())
|
||||
// set the OIDC to true only if the feature is enabled and the sink audience is set
|
||||
// to prevent container environment vars to be set, just when the config is on.
|
||||
expected := resources.NewContainerSource(source, feature.FromContext(ctx).IsOIDCAuthentication() && source.Status.SinkAudience != nil)
|
||||
|
||||
cs, err := r.containerSourceLister.ContainerSources(source.Namespace).Get(expected.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
|
|
|
@ -189,7 +189,7 @@ func TestReconcile(t *testing.T) {
|
|||
WithIntegrationSourceSpec(makeIntegrationSourceSpec(sinkDest)),
|
||||
WithInitIntegrationSourceConditions,
|
||||
WithIntegrationSourceStatusObservedGeneration(generation),
|
||||
WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatus(&conditionTrue)),
|
||||
WithIntegrationSourcePropagateContainerSourceStatus(makeContainerSourceStatusOIDC(&conditionTrue)),
|
||||
WithIntegrationSourceOIDCServiceAccountName(getOIDCServiceAccountNameForContainerSource()),
|
||||
),
|
||||
}},
|
||||
|
@ -218,39 +218,6 @@ func TestReconcile(t *testing.T) {
|
|||
|
||||
func makeContainerSourceOIDC(source *sourcesv1alpha1.IntegrationSource, ready *corev1.ConditionStatus) *sourcesv1.ContainerSource {
|
||||
cs := makeContainerSource(source, ready)
|
||||
|
||||
// replace all env_vars for inserting the OIDC ones at the right order/index
|
||||
cs.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
|
||||
{
|
||||
Name: "CAMEL_KNATIVE_CLIENT_SSL_ENABLED",
|
||||
Value: "true",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KNATIVE_CLIENT_SSL_CERT_PATH",
|
||||
Value: "/knative-custom-certs/knative-eventing-bundle.pem",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KNATIVE_CLIENT_OIDC_ENABLED",
|
||||
Value: "true",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KNATIVE_CLIENT_OIDC_TOKEN_PATH",
|
||||
Value: "file:///oidc/token",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KAMELET_TIMER_SOURCE_PERIOD",
|
||||
Value: "1000",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KAMELET_TIMER_SOURCE_MESSAGE",
|
||||
Value: "Hallo",
|
||||
},
|
||||
{
|
||||
Name: "CAMEL_KAMELET_TIMER_SOURCE_REPEATCOUNT",
|
||||
Value: "0",
|
||||
},
|
||||
}
|
||||
|
||||
cs.Status = *makeContainerSourceStatusOIDC(ready)
|
||||
|
||||
return cs
|
||||
|
|
|
@ -66,6 +66,8 @@ func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.
|
|||
// VPod represents virtual replicas placed into real Kubernetes pods
|
||||
// The scheduler is responsible for placing VPods
|
||||
type VPod interface {
|
||||
GetDeletionTimestamp() *metav1.Time
|
||||
|
||||
// GetKey returns the VPod key (namespace/name).
|
||||
GetKey() types.NamespacedName
|
||||
|
||||
|
|
|
@ -278,23 +278,25 @@ func isPodUnschedulable(pod *v1.Pod) bool {
|
|||
func (s *State) MarshalJSON() ([]byte, error) {
|
||||
|
||||
type S struct {
|
||||
FreeCap []int32 `json:"freeCap"`
|
||||
SchedulablePods []int32 `json:"schedulablePods"`
|
||||
Capacity int32 `json:"capacity"`
|
||||
Replicas int32 `json:"replicas"`
|
||||
StatefulSetName string `json:"statefulSetName"`
|
||||
PodSpread map[string]map[string]int32 `json:"podSpread"`
|
||||
Pending map[string]int32 `json:"pending"`
|
||||
FreeCap []int32 `json:"freeCap"`
|
||||
SchedulablePods []int32 `json:"schedulablePods"`
|
||||
Capacity int32 `json:"capacity"`
|
||||
Replicas int32 `json:"replicas"`
|
||||
StatefulSetName string `json:"statefulSetName"`
|
||||
PodSpread map[string]map[string]int32 `json:"podSpread"`
|
||||
Pending map[string]int32 `json:"pending"`
|
||||
ExpectedVReplicaByVPod map[string]int32 `json:"expectedVReplicaByVPod"`
|
||||
}
|
||||
|
||||
sj := S{
|
||||
FreeCap: s.FreeCap,
|
||||
SchedulablePods: s.SchedulablePods,
|
||||
Capacity: s.Capacity,
|
||||
Replicas: s.Replicas,
|
||||
StatefulSetName: s.StatefulSetName,
|
||||
PodSpread: ToJSONable(s.PodSpread),
|
||||
Pending: toJSONablePending(s.Pending),
|
||||
FreeCap: s.FreeCap,
|
||||
SchedulablePods: s.SchedulablePods,
|
||||
Capacity: s.Capacity,
|
||||
Replicas: s.Replicas,
|
||||
StatefulSetName: s.StatefulSetName,
|
||||
PodSpread: ToJSONable(s.PodSpread),
|
||||
Pending: toJSONablePending(s.Pending),
|
||||
ExpectedVReplicaByVPod: toJSONablePending(s.ExpectedVReplicaByVPod),
|
||||
}
|
||||
|
||||
return json.Marshal(sj)
|
||||
|
|
|
@ -205,10 +205,6 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
|
|||
return err
|
||||
}
|
||||
|
||||
logger.Debugw("checking adapter capacity",
|
||||
zap.Int32("replicas", scale.Spec.Replicas),
|
||||
zap.Any("state", state))
|
||||
|
||||
newReplicas := integer.Int32Max(int32(math.Ceil(float64(state.TotalExpectedVReplicas())/float64(state.Capacity))), a.minReplicas)
|
||||
|
||||
// Only scale down if permitted
|
||||
|
@ -216,6 +212,12 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool) err
|
|||
newReplicas = scale.Spec.Replicas
|
||||
}
|
||||
|
||||
logger.Debugw("checking adapter capacity",
|
||||
zap.Bool("attemptScaleDown", attemptScaleDown),
|
||||
zap.Int32("replicas", scale.Spec.Replicas),
|
||||
zap.Int32("newReplicas", newReplicas),
|
||||
zap.Any("state", state))
|
||||
|
||||
if newReplicas != scale.Spec.Replicas {
|
||||
scale.Spec.Replicas = newReplicas
|
||||
logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -114,6 +115,11 @@ type StatefulSetScheduler struct {
|
|||
// replicas is the (cached) number of statefulset replicas.
|
||||
replicas int32
|
||||
|
||||
// isLeader signals whether a given Scheduler instance is leader or not.
|
||||
// The autoscaler is considered the leader when ephemeralLeaderElectionObject is in a
|
||||
// bucket where we've been promoted.
|
||||
isLeader atomic.Bool
|
||||
|
||||
// reserved tracks vreplicas that have been placed (ie. scheduled) but haven't been
|
||||
// committed yet (ie. not appearing in vpodLister)
|
||||
reserved map[types.NamespacedName]map[string]int32
|
||||
|
@ -130,6 +136,9 @@ func (s *StatefulSetScheduler) Promote(b reconciler.Bucket, enq func(reconciler.
|
|||
if !b.Has(ephemeralLeaderElectionObject) {
|
||||
return nil
|
||||
}
|
||||
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
|
||||
// Flip the flag after running initReserved.
|
||||
defer s.isLeader.Store(true)
|
||||
|
||||
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
|
||||
return v.Promote(b, enq)
|
||||
|
@ -151,6 +160,9 @@ func (s *StatefulSetScheduler) initReserved() error {
|
|||
|
||||
s.reserved = make(map[types.NamespacedName]map[string]int32, len(vPods))
|
||||
for _, vPod := range vPods {
|
||||
if !vPod.GetDeletionTimestamp().IsZero() {
|
||||
continue
|
||||
}
|
||||
s.reserved[vPod.GetKey()] = make(map[string]int32, len(vPod.GetPlacements()))
|
||||
for _, placement := range vPod.GetPlacements() {
|
||||
s.reserved[vPod.GetKey()][placement.PodName] += placement.VReplicas
|
||||
|
@ -159,8 +171,41 @@ func (s *StatefulSetScheduler) initReserved() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// resyncReserved removes deleted vPods from reserved to keep the state consistent when leadership
|
||||
// changes (Promote / Demote).
|
||||
// initReserved is not enough since the vPod lister can be stale.
|
||||
func (s *StatefulSetScheduler) resyncReserved() error {
|
||||
if !s.isLeader.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
vPods, err := s.vpodLister()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list vPods during reserved resync: %w", err)
|
||||
}
|
||||
vPodsByK := vPodsByKey(vPods)
|
||||
|
||||
s.reservedMu.Lock()
|
||||
defer s.reservedMu.Unlock()
|
||||
|
||||
for key := range s.reserved {
|
||||
vPod, ok := vPodsByK[key]
|
||||
if !ok || vPod == nil {
|
||||
delete(s.reserved, key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Demote implements reconciler.LeaderAware.
|
||||
func (s *StatefulSetScheduler) Demote(b reconciler.Bucket) {
|
||||
if !b.Has(ephemeralLeaderElectionObject) {
|
||||
return
|
||||
}
|
||||
// The demoted bucket has the ephemeralLeaderElectionObject, so we are not leader anymore.
|
||||
defer s.isLeader.Store(false)
|
||||
|
||||
if v, ok := s.autoscaler.(reconciler.LeaderAware); ok {
|
||||
v.Demote(b)
|
||||
}
|
||||
|
@ -208,6 +253,17 @@ func newStatefulSetScheduler(ctx context.Context,
|
|||
sif.Shutdown()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(cfg.RefreshPeriod * 3):
|
||||
_ = s.resyncReserved()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -561,3 +617,11 @@ func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha
|
|||
}
|
||||
return placements
|
||||
}
|
||||
|
||||
func vPodsByKey(vPods []scheduler.VPod) map[types.NamespacedName]scheduler.VPod {
|
||||
r := make(map[types.NamespacedName]scheduler.VPod, len(vPods))
|
||||
for _, vPod := range vPods {
|
||||
r[vPod.GetKey()] = vPod
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
|
@ -708,6 +708,21 @@ func TestStatefulsetScheduler(t *testing.T) {
|
|||
|
||||
vpodClient := tscheduler.NewVPodClient()
|
||||
vpod := vpodClient.Create(vpodNamespace, vpodName, tc.vreplicas, tc.placements)
|
||||
for vPodKey, res := range tc.initialReserved {
|
||||
if vPodKey.Namespace == vpodNamespace && vPodKey.Name == vpodName {
|
||||
continue
|
||||
}
|
||||
var placements []duckv1alpha1.Placement
|
||||
count := int32(0)
|
||||
for pod, vReplicas := range res {
|
||||
count += vReplicas
|
||||
placements = append(placements, duckv1alpha1.Placement{
|
||||
PodName: pod,
|
||||
VReplicas: vReplicas,
|
||||
})
|
||||
}
|
||||
vpodClient.Create(vPodKey.Namespace, vPodKey.Name, count, placements)
|
||||
}
|
||||
|
||||
for i := int32(0); i < tc.replicas; i++ {
|
||||
nodeName := "node" + fmt.Sprint(i)
|
||||
|
@ -745,7 +760,9 @@ func TestStatefulsetScheduler(t *testing.T) {
|
|||
t.Fatal("unexpected error", err)
|
||||
}
|
||||
if tc.initialReserved != nil {
|
||||
s.reservedMu.Lock()
|
||||
s.reserved = tc.initialReserved
|
||||
s.reservedMu.Unlock()
|
||||
}
|
||||
|
||||
// Give some time for the informer to notify the scheduler and set the number of replicas
|
||||
|
|
|
@ -57,6 +57,10 @@ func NewVPod(ns, name string, vreplicas int32, placements []duckv1alpha1.Placeme
|
|||
}
|
||||
}
|
||||
|
||||
func (d *sampleVPod) GetDeletionTimestamp() *metav1.Time {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *sampleVPod) GetKey() types.NamespacedName {
|
||||
return d.key
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue