diff --git a/.github/infrastructure/docker-compose-jetstream.yml b/.github/infrastructure/docker-compose-jetstream.yml new file mode 100644 index 000000000..c056134b2 --- /dev/null +++ b/.github/infrastructure/docker-compose-jetstream.yml @@ -0,0 +1,19 @@ +version: '2' + +services: + nats: + image: nats:2.9.9 + ports: + - "4222:4222" + networks: ["nats"] + command: "--js" + startup: + image: natsio/nats-box + networks: ["nats"] + depends_on: + - nats + entrypoint: sh -c "sleep 5 && nats -s nats:4222 stream add pubsub --subjects testTopic,multiTopic1,multiTopic2,testTopicBulk --storage=file --replicas=1 --retention=limits --discard=old --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=-1 --max-age=-1 --max-msg-size=-1 --dupe-window=2m0s --no-allow-rollup --no-deny-delete --no-deny-purge" + +networks: + nats: + name: nats diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 741d3b8b6..730a4a34c 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -429,6 +429,11 @@ jobs: run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d if: contains(matrix.component, 'kubemq') + - name: Start nats with JetStream + run: | + docker-compose -f ./.github/infrastructure/docker-compose-jetstream.yml up -p jetstream -d + if: contains(matrix.component, 'jetstream') + - name: Setup KinD test data if: contains(matrix.component, 'kubernetes') run: | diff --git a/internal/component/kafka/consumer.go b/internal/component/kafka/consumer.go index 80e9d16d9..71efa628a 100644 --- a/internal/component/kafka/consumer.go +++ b/internal/component/kafka/consumer.go @@ -188,7 +188,13 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag Topic: message.Topic, Data: message.Value, } - + // This is true only when headers are set (Kafka > 0.11) + if len(message.Headers) > 0 { + event.Metadata = make(map[string]string, len(message.Headers)) + for _, header := range message.Headers { + event.Metadata[string(header.Key)] = string(header.Value) + } + } err = handlerConfig.Handler(session.Context(), &event) if err == nil { session.MarkMessage(message, "") diff --git a/tests/config/pubsub/jetstream/pubsub.yml b/tests/config/pubsub/jetstream/pubsub.yml index 631023f9f..1615f18f4 100644 --- a/tests/config/pubsub/jetstream/pubsub.yml +++ b/tests/config/pubsub/jetstream/pubsub.yml @@ -12,3 +12,5 @@ spec: value: config-test - name: flowControl value: true + - name: hearbeat + value: 5s