Merge branch 'master' into certInfrastructure

This commit is contained in:
Yaron Schneider 2022-12-22 13:12:45 -08:00 committed by GitHub
commit 0a55ad311d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 1 deletions

View File

@ -0,0 +1,19 @@
version: '2'
services:
nats:
image: nats:2.9.9
ports:
- "4222:4222"
networks: ["nats"]
command: "--js"
startup:
image: natsio/nats-box
networks: ["nats"]
depends_on:
- nats
entrypoint: sh -c "sleep 5 && nats -s nats:4222 stream add pubsub --subjects testTopic,multiTopic1,multiTopic2,testTopicBulk --storage=file --replicas=1 --retention=limits --discard=old --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=-1 --max-age=-1 --max-msg-size=-1 --dupe-window=2m0s --no-allow-rollup --no-deny-delete --no-deny-purge"
networks:
nats:
name: nats

View File

@ -429,6 +429,11 @@ jobs:
run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d
if: contains(matrix.component, 'kubemq')
- name: Start nats with JetStream
run: |
docker-compose -f ./.github/infrastructure/docker-compose-jetstream.yml up -p jetstream -d
if: contains(matrix.component, 'jetstream')
- name: Setup KinD test data
if: contains(matrix.component, 'kubernetes')
run: |

View File

@ -188,7 +188,13 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
Topic: message.Topic,
Data: message.Value,
}
// This is true only when headers are set (Kafka > 0.11)
if len(message.Headers) > 0 {
event.Metadata = make(map[string]string, len(message.Headers))
for _, header := range message.Headers {
event.Metadata[string(header.Key)] = string(header.Value)
}
}
err = handlerConfig.Handler(session.Context(), &event)
if err == nil {
session.MarkMessage(message, "")

View File

@ -12,3 +12,5 @@ spec:
value: config-test
- name: flowControl
value: true
- name: hearbeat
value: 5s