Fixed Kafka PubSub to allow multiple handlers for different topics (#1755)
* Fixed Kafka PubSub to allow multiple handlers for different topics With this, tests from #1743 are passing Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Removed read lock (for now) Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Updated as requested by @skyao Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Use interface type everywhere Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Fixed panic Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
parent
a323748d75
commit
ca9fbf690e
|
@ -51,7 +51,7 @@ type webhookResult struct {
|
|||
}
|
||||
|
||||
type outgoingWebhook struct {
|
||||
handler func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
handler bindings.Handler
|
||||
}
|
||||
|
||||
var webhooks = struct { // nolint: gochecknoglobals
|
||||
|
@ -94,7 +94,7 @@ func (t *DingTalkWebhook) Init(metadata bindings.Metadata) error {
|
|||
}
|
||||
|
||||
// Read triggers the outgoing webhook, not yet production ready.
|
||||
func (t *DingTalkWebhook) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (t *DingTalkWebhook) Read(handler bindings.Handler) error {
|
||||
t.logger.Debugf("dingtalk webhook: start read input binding")
|
||||
|
||||
webhooks.Lock()
|
||||
|
|
|
@ -140,7 +140,7 @@ func (n *Nacos) createConfigClient() error {
|
|||
}
|
||||
|
||||
// Read implements InputBinding's Read method.
|
||||
func (n *Nacos) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (n *Nacos) Read(handler bindings.Handler) error {
|
||||
n.readHandler = handler
|
||||
|
||||
for _, watch := range n.watches {
|
||||
|
|
|
@ -78,7 +78,7 @@ func (a *AliCloudRocketMQ) Init(metadata bindings.Metadata) error {
|
|||
}
|
||||
|
||||
// Read triggers the rocketmq subscription.
|
||||
func (a *AliCloudRocketMQ) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AliCloudRocketMQ) Read(handler bindings.Handler) error {
|
||||
a.logger.Debugf("binding rocketmq: start read input binding")
|
||||
|
||||
var err error
|
||||
|
@ -254,7 +254,7 @@ func (a *AliCloudRocketMQ) send(topic, mqExpr, key string, data []byte) (bool, e
|
|||
|
||||
type mqCallback func(ctx context.Context, msgs ...*primitive.MessageExt) (mqc.ConsumeResult, error)
|
||||
|
||||
func (a *AliCloudRocketMQ) adaptCallback(_, consumerGroup, mqType, mqExpr string, handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) mqCallback {
|
||||
func (a *AliCloudRocketMQ) adaptCallback(_, consumerGroup, mqType, mqExpr string, handler bindings.Handler) mqCallback {
|
||||
return func(ctx context.Context, msgs ...*primitive.MessageExt) (mqc.ConsumeResult, error) {
|
||||
success := true
|
||||
for _, v := range msgs {
|
||||
|
|
|
@ -76,12 +76,12 @@ const (
|
|||
// recordProcessorFactory.
|
||||
type recordProcessorFactory struct {
|
||||
logger logger.Logger
|
||||
handler func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
handler bindings.Handler
|
||||
}
|
||||
|
||||
type recordProcessor struct {
|
||||
logger logger.Logger
|
||||
handler func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
handler bindings.Handler
|
||||
}
|
||||
|
||||
// NewAWSKinesis returns a new AWS Kinesis instance.
|
||||
|
@ -149,7 +149,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func (a *AWSKinesis) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AWSKinesis) Read(handler bindings.Handler) error {
|
||||
if a.metadata.KinesisConsumerMode == SharedThroughput {
|
||||
a.worker = worker.NewWorker(a.recordProcessorFactory(handler), a.workerConfig)
|
||||
err := a.worker.Start()
|
||||
|
@ -179,7 +179,7 @@ func (a *AWSKinesis) Read(handler func(context.Context, *bindings.ReadResponse)
|
|||
}
|
||||
|
||||
// Subscribe to all shards.
|
||||
func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler bindings.Handler) error {
|
||||
consumerARN, err := a.ensureConsumer(streamDesc.StreamARN)
|
||||
if err != nil {
|
||||
a.logger.Error(err)
|
||||
|
@ -329,7 +329,7 @@ func (a *AWSKinesis) parseMetadata(metadata bindings.Metadata) (*kinesisMetadata
|
|||
return &m, nil
|
||||
}
|
||||
|
||||
func (a *AWSKinesis) recordProcessorFactory(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) interfaces.IRecordProcessorFactory {
|
||||
func (a *AWSKinesis) recordProcessorFactory(handler bindings.Handler) interfaces.IRecordProcessorFactory {
|
||||
return &recordProcessorFactory{logger: a.logger, handler: handler}
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ func (a *AWSSQS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bind
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func (a *AWSSQS) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AWSSQS) Read(handler bindings.Handler) error {
|
||||
for {
|
||||
result, err := a.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
|
||||
QueueUrl: a.QueueURL,
|
||||
|
|
|
@ -74,7 +74,7 @@ func (a *AzureEventGrid) Init(metadata bindings.Metadata) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *AzureEventGrid) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureEventGrid) Read(handler bindings.Handler) error {
|
||||
err := a.ensureInputBindingMetadata()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -76,7 +76,7 @@ const (
|
|||
sysPropMessageID = "message-id"
|
||||
)
|
||||
|
||||
func readHandler(e *eventhub.Event, handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func readHandler(e *eventhub.Event, handler bindings.Handler) error {
|
||||
res := bindings.ReadResponse{Data: e.Data, Metadata: map[string]string{}}
|
||||
if e.SystemProperties.SequenceNumber != nil {
|
||||
res.Metadata[sysPropSequenceNumber] = strconv.FormatInt(*e.SystemProperties.SequenceNumber, 10)
|
||||
|
@ -305,7 +305,7 @@ func (a *AzureEventHubs) Invoke(ctx context.Context, req *bindings.InvokeRequest
|
|||
}
|
||||
|
||||
// Read gets messages from eventhubs in a non-blocking fashion.
|
||||
func (a *AzureEventHubs) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureEventHubs) Read(handler bindings.Handler) error {
|
||||
if !a.metadata.partitioned() {
|
||||
if err := a.RegisterEventProcessor(handler); err != nil {
|
||||
return err
|
||||
|
@ -327,7 +327,7 @@ func (a *AzureEventHubs) Read(handler func(context.Context, *bindings.ReadRespon
|
|||
}
|
||||
|
||||
// RegisterPartitionedEventProcessor - receive eventhub messages by partitionID.
|
||||
func (a *AzureEventHubs) RegisterPartitionedEventProcessor(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureEventHubs) RegisterPartitionedEventProcessor(handler bindings.Handler) error {
|
||||
ctx := context.Background()
|
||||
|
||||
runtimeInfo, err := a.hub.GetRuntimeInformation(ctx)
|
||||
|
@ -376,7 +376,7 @@ func contains(arr []string, str string) bool {
|
|||
|
||||
// RegisterEventProcessor - receive eventhub messages by eventprocessor
|
||||
// host by balancing partitions.
|
||||
func (a *AzureEventHubs) RegisterEventProcessor(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureEventHubs) RegisterEventProcessor(handler bindings.Handler) error {
|
||||
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -244,7 +244,7 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
|
|||
return nil, sender.SendMessage(ctx, msg, nil)
|
||||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureServiceBusQueues) Read(handler bindings.Handler) error {
|
||||
for a.ctx.Err() == nil {
|
||||
receiver := a.attemptConnectionForever()
|
||||
if receiver == nil {
|
||||
|
|
|
@ -38,7 +38,7 @@ const (
|
|||
)
|
||||
|
||||
type consumer struct {
|
||||
callback func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
callback bindings.Handler
|
||||
}
|
||||
|
||||
// QueueHelper enables injection for testnig.
|
||||
|
@ -265,7 +265,7 @@ func (a *AzureStorageQueues) Invoke(ctx context.Context, req *bindings.InvokeReq
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (a *AzureStorageQueues) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (a *AzureStorageQueues) Read(handler bindings.Handler) error {
|
||||
c := consumer{
|
||||
callback: handler,
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func (b *Binding) Init(metadata bindings.Metadata) error {
|
|||
}
|
||||
|
||||
// Read triggers the Cron scheduler.
|
||||
func (b *Binding) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (b *Binding) Read(handler bindings.Handler) error {
|
||||
c := cron.New(cron.WithParser(b.parser))
|
||||
id, err := c.AddFunc(b.schedule, func() {
|
||||
b.logger.Debugf("name: %s, schedule fired: %v", b.name, time.Now())
|
||||
|
|
|
@ -89,7 +89,7 @@ func (g *GCPPubSub) parseMetadata(metadata bindings.Metadata) ([]byte, error) {
|
|||
return b, err
|
||||
}
|
||||
|
||||
func (g *GCPPubSub) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (g *GCPPubSub) Read(handler bindings.Handler) error {
|
||||
sub := g.client.Subscription(g.metadata.Subscription)
|
||||
err := sub.Receive(context.Background(), func(ctx context.Context, m *pubsub.Message) {
|
||||
_, err := handler(ctx, &bindings.ReadResponse{
|
||||
|
|
|
@ -20,5 +20,8 @@ type InputBinding interface {
|
|||
// Init passes connection and properties metadata to the binding implementation
|
||||
Init(metadata Metadata) error
|
||||
// Read is a blocking method that triggers the callback function whenever an event arrives
|
||||
Read(handler func(context.Context, *ReadResponse) ([]byte, error)) error
|
||||
Read(handler Handler) error
|
||||
}
|
||||
|
||||
// Handler is the handler used to invoke the app handler.
|
||||
type Handler func(context.Context, *ReadResponse) ([]byte, error)
|
||||
|
|
|
@ -15,7 +15,10 @@ package kafka
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/dapr/kit/logger"
|
||||
|
||||
|
@ -74,30 +77,42 @@ func (b *Binding) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindi
|
|||
return nil, err
|
||||
}
|
||||
|
||||
func (b *Binding) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (b *Binding) Read(handler bindings.Handler) error {
|
||||
if len(b.topics) == 0 {
|
||||
b.logger.Warnf("kafka binding: no topic defined, input bindings will not be started")
|
||||
return nil
|
||||
}
|
||||
|
||||
err := b.kafka.Subscribe(b.topics, map[string]string{}, newBindingAdapter(handler).adapter)
|
||||
return err
|
||||
ah := adaptHandler(handler)
|
||||
for _, t := range b.topics {
|
||||
b.kafka.AddTopicHandler(t, ah)
|
||||
}
|
||||
|
||||
// Subscribe, in a background goroutine
|
||||
err := b.kafka.Subscribe(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait until we exit
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
<-sigCh
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// bindingAdapter is used to adapter bindings handler to kafka.EventHandler with the same content.
|
||||
type bindingAdapter struct {
|
||||
handler func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
}
|
||||
|
||||
func newBindingAdapter(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) *bindingAdapter {
|
||||
return &bindingAdapter{handler: handler}
|
||||
}
|
||||
|
||||
func (a *bindingAdapter) adapter(ctx context.Context, event *kafka.NewEvent) error {
|
||||
_, err := a.handler(ctx, &bindings.ReadResponse{
|
||||
Data: event.Data,
|
||||
Metadata: event.Metadata,
|
||||
ContentType: event.ContentType,
|
||||
})
|
||||
return err
|
||||
func adaptHandler(handler bindings.Handler) kafka.EventHandler {
|
||||
return func(ctx context.Context, event *kafka.NewEvent) error {
|
||||
_, err := handler(ctx, &bindings.ReadResponse{
|
||||
Data: event.Data,
|
||||
Metadata: event.Metadata,
|
||||
ContentType: event.ContentType,
|
||||
})
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
"github.com/dapr/components-contrib/internal/component/kafka"
|
||||
)
|
||||
|
||||
func TestBindingAdapter(t *testing.T) {
|
||||
// bindingAdapter is used to adapter bindings handler to kafka.EventHandler
|
||||
// step1: prepare a new kafka event
|
||||
ct := "text/plain"
|
||||
event := &kafka.NewEvent{
|
||||
Topic: "topic1",
|
||||
Data: []byte("abcdefg"),
|
||||
Metadata: map[string]string{"k1": "v1"},
|
||||
ContentType: &ct,
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
a := testAdapter{ctx: ctx, event: event, t: t}
|
||||
|
||||
// step2: call this adapter method to mock the new kafka event is triggered from kafka topic
|
||||
err := newBindingAdapter(a.testHandler).adapter(ctx, event)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
type testAdapter struct {
|
||||
ctx context.Context
|
||||
event *kafka.NewEvent
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (a *testAdapter) testHandler(ctx context.Context, msg *bindings.ReadResponse) ([]byte, error) {
|
||||
// step3: the binding handler should be called with the adapted binding event with the same content of the kafka event
|
||||
assert.Equal(a.t, ctx, a.ctx)
|
||||
assert.Equal(a.t, msg.Data, a.event.Data)
|
||||
assert.Equal(a.t, msg.Metadata, a.event.Metadata)
|
||||
assert.Equal(a.t, msg.ContentType, a.event.ContentType)
|
||||
|
||||
return nil, nil
|
||||
}
|
|
@ -82,7 +82,7 @@ func (k *kubernetesInput) parseMetadata(metadata bindings.Metadata) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (k *kubernetesInput) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (k *kubernetesInput) Read(handler bindings.Handler) error {
|
||||
watchlist := cache.NewListWatchFromClient(
|
||||
k.kubeClient.CoreV1().RESTClient(),
|
||||
"events",
|
||||
|
|
|
@ -222,7 +222,7 @@ func (m *MQTT) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindin
|
|||
})
|
||||
}
|
||||
|
||||
func (m *MQTT) handleMessage(handler func(context.Context, *bindings.ReadResponse) ([]byte, error), mqttMsg mqtt.Message) error {
|
||||
func (m *MQTT) handleMessage(handler bindings.Handler, mqttMsg mqtt.Message) error {
|
||||
msg := bindings.ReadResponse{
|
||||
Data: mqttMsg.Payload(),
|
||||
Metadata: map[string]string{mqttTopic: mqttMsg.Topic()},
|
||||
|
@ -251,7 +251,7 @@ func (m *MQTT) handleMessage(handler func(context.Context, *bindings.ReadRespons
|
|||
}
|
||||
}
|
||||
|
||||
func (m *MQTT) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (m *MQTT) Read(handler bindings.Handler) error {
|
||||
sigterm := make(chan os.Signal, 1)
|
||||
signal.Notify(sigterm, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
|
|
|
@ -236,7 +236,7 @@ func (r *RabbitMQ) declareQueue() (amqp.Queue, error) {
|
|||
return r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, r.metadata.Exclusive, false, args)
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (r *RabbitMQ) Read(handler bindings.Handler) error {
|
||||
msgs, err := r.channel.Consume(
|
||||
r.queue.Name,
|
||||
"",
|
||||
|
|
|
@ -70,7 +70,7 @@ func (b *Binding) Init(metadata bindings.Metadata) error {
|
|||
}
|
||||
|
||||
// Read triggers the RethinkDB scheduler.
|
||||
func (b *Binding) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (b *Binding) Read(handler bindings.Handler) error {
|
||||
b.logger.Infof("subscribing to state changes in %s.%s...", b.config.Database, b.config.Table)
|
||||
cursor, err := r.DB(b.config.Database).Table(b.config.Table).Changes(r.ChangesOpts{
|
||||
IncludeTypes: true,
|
||||
|
|
|
@ -86,7 +86,7 @@ func (t *Binding) Operations() []bindings.OperationKind {
|
|||
}
|
||||
|
||||
// Read triggers the Twitter search and events on each result tweet.
|
||||
func (t *Binding) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (t *Binding) Read(handler bindings.Handler) error {
|
||||
if t.query == "" {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ type jobWorkerMetadata struct {
|
|||
}
|
||||
|
||||
type jobHandler struct {
|
||||
callback func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
callback bindings.Handler
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ func (z *ZeebeJobWorker) Init(metadata bindings.Metadata) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (z *ZeebeJobWorker) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (z *ZeebeJobWorker) Read(handler bindings.Handler) error {
|
||||
h := jobHandler{
|
||||
callback: handler,
|
||||
logger: z.logger,
|
||||
|
|
|
@ -27,31 +27,27 @@ import (
|
|||
)
|
||||
|
||||
type consumer struct {
|
||||
k *Kafka
|
||||
ready chan bool
|
||||
callback EventHandler
|
||||
once sync.Once
|
||||
k *Kafka
|
||||
ready chan bool
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||||
if consumer.callback == nil {
|
||||
return fmt.Errorf("nil consumer callback")
|
||||
}
|
||||
|
||||
b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
|
||||
for message := range claim.Messages() {
|
||||
if consumer.k.consumeRetryEnabled {
|
||||
if err := retry.NotifyRecover(func() error {
|
||||
return consumer.doCallback(session, message)
|
||||
}, b, func(err error, d time.Duration) {
|
||||
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
|
||||
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
|
||||
}, func() {
|
||||
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
_ = consumer.doCallback(session, message)
|
||||
err := consumer.doCallback(session, message)
|
||||
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -60,11 +56,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
|
|||
|
||||
func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage) error {
|
||||
consumer.k.logger.Debugf("Processing Kafka message: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
|
||||
handler, err := consumer.k.GetTopicHandler(message.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event := NewEvent{
|
||||
Topic: message.Topic,
|
||||
Data: message.Value,
|
||||
}
|
||||
err := consumer.callback(session.Context(), &event)
|
||||
err = handler(session.Context(), &event)
|
||||
if err == nil {
|
||||
session.MarkMessage(message, "")
|
||||
}
|
||||
|
@ -84,9 +84,37 @@ func (consumer *consumer) Setup(sarama.ConsumerGroupSession) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to topic in the Kafka cluster
|
||||
// This call cannot block like its sibling in bindings/kafka because of where this is invoked in runtime.go.
|
||||
func (k *Kafka) Subscribe(topics []string, _ map[string]string, handler EventHandler) error {
|
||||
// AddTopicHandler adds a topic handler
|
||||
func (k *Kafka) AddTopicHandler(topic string, handler EventHandler) {
|
||||
k.subscribeLock.Lock()
|
||||
k.subscribeTopics[topic] = handler
|
||||
k.subscribeLock.Unlock()
|
||||
}
|
||||
|
||||
// RemoveTopicHandler removes a topic handler
|
||||
func (k *Kafka) RemoveTopicHandler(topic string) {
|
||||
k.subscribeLock.Lock()
|
||||
delete(k.subscribeTopics, topic)
|
||||
k.subscribeLock.Unlock()
|
||||
}
|
||||
|
||||
// GetTopicHandler returns the handler for a topic
|
||||
func (k *Kafka) GetTopicHandler(topic string) (EventHandler, error) {
|
||||
k.subscribeLock.RLock()
|
||||
handler, ok := k.subscribeTopics[topic]
|
||||
k.subscribeLock.RUnlock()
|
||||
if !ok || handler == nil {
|
||||
return nil, fmt.Errorf("handler for messages of topic %s not found", topic)
|
||||
}
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// Subscribe to topic in the Kafka cluster, in a background goroutine
|
||||
func (k *Kafka) Subscribe(ctx context.Context) error {
|
||||
k.subscribeLock.Lock()
|
||||
defer k.subscribeLock.Unlock()
|
||||
|
||||
if k.consumerGroup == "" {
|
||||
return errors.New("kafka: consumerGroup must be set to subscribe")
|
||||
}
|
||||
|
@ -101,28 +129,27 @@ func (k *Kafka) Subscribe(topics []string, _ map[string]string, handler EventHan
|
|||
|
||||
k.cg = cg
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
k.cancel = cancel
|
||||
|
||||
ready := make(chan bool)
|
||||
k.consumer = consumer{
|
||||
k: k,
|
||||
ready: ready,
|
||||
callback: handler,
|
||||
k: k,
|
||||
ready: ready,
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
|
||||
err := k.cg.Close()
|
||||
if err != nil {
|
||||
k.logger.Errorf("Error closing consumer group: %v", err)
|
||||
}
|
||||
}()
|
||||
topics := k.subscribeTopics.TopicList()
|
||||
|
||||
go func() {
|
||||
k.logger.Debugf("Subscribed and listening to topics: %s", topics)
|
||||
|
||||
for {
|
||||
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
|
||||
// us out of the consume loop
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
|
||||
k.logger.Debugf("Starting loop to consume.")
|
||||
|
||||
// Consume the requested topics
|
||||
|
@ -137,12 +164,12 @@ func (k *Kafka) Subscribe(topics []string, _ map[string]string, handler EventHan
|
|||
if innerErr != nil && !errors.Is(innerErr, context.Canceled) {
|
||||
k.logger.Errorf("Permanent error consuming %v: %v", topics, innerErr)
|
||||
}
|
||||
}
|
||||
|
||||
// If the context was cancelled, as is the case when handling SIGINT and SIGTERM below, then this pops
|
||||
// us out of the consume loop
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
|
||||
err := k.cg.Close()
|
||||
if err != nil {
|
||||
k.logger.Errorf("Error closing consumer group: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ package kafka
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
|
@ -25,18 +26,20 @@ import (
|
|||
|
||||
// Kafka allows reading/writing to a Kafka consumer group.
|
||||
type Kafka struct {
|
||||
producer sarama.SyncProducer
|
||||
consumerGroup string
|
||||
brokers []string
|
||||
logger logger.Logger
|
||||
authType string
|
||||
saslUsername string
|
||||
saslPassword string
|
||||
initialOffset int64
|
||||
cg sarama.ConsumerGroup
|
||||
cancel context.CancelFunc
|
||||
consumer consumer
|
||||
config *sarama.Config
|
||||
producer sarama.SyncProducer
|
||||
consumerGroup string
|
||||
brokers []string
|
||||
logger logger.Logger
|
||||
authType string
|
||||
saslUsername string
|
||||
saslPassword string
|
||||
initialOffset int64
|
||||
cg sarama.ConsumerGroup
|
||||
cancel context.CancelFunc
|
||||
consumer consumer
|
||||
config *sarama.Config
|
||||
subscribeTopics TopicHandlers
|
||||
subscribeLock sync.RWMutex
|
||||
|
||||
backOffConfig retry.Config
|
||||
|
||||
|
@ -49,7 +52,9 @@ type Kafka struct {
|
|||
|
||||
func NewKafka(logger logger.Logger) *Kafka {
|
||||
return &Kafka{
|
||||
logger: logger,
|
||||
logger: logger,
|
||||
subscribeTopics: make(TopicHandlers),
|
||||
subscribeLock: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,8 @@ func (k *Kafka) Publish(topic string, data []byte, metadata map[string]string) e
|
|||
if k.producer == nil {
|
||||
return errors.New("component is closed")
|
||||
}
|
||||
k.logger.Debugf("Publishing topic %v with data: %v", topic, data)
|
||||
// k.logger.Debugf("Publishing topic %v with data: %v", topic, string(data))
|
||||
k.logger.Debugf("Publishing on topic %v", topic)
|
||||
|
||||
msg := &sarama.ProducerMessage{
|
||||
Topic: topic,
|
||||
|
|
|
@ -51,3 +51,17 @@ func isValidPEM(val string) bool {
|
|||
|
||||
return block != nil
|
||||
}
|
||||
|
||||
// Map of topics and their handlers
|
||||
type TopicHandlers map[string]EventHandler
|
||||
|
||||
// TopicList returns the list of topics
|
||||
func (th TopicHandlers) TopicList() []string {
|
||||
topics := make([]string, len(th))
|
||||
i := 0
|
||||
for topic := range th {
|
||||
topics[i] = topic
|
||||
i++
|
||||
}
|
||||
return topics
|
||||
}
|
||||
|
|
|
@ -23,34 +23,16 @@ import (
|
|||
)
|
||||
|
||||
type PubSub struct {
|
||||
kafka *kafka.Kafka
|
||||
topics map[string]bool
|
||||
kafka *kafka.Kafka
|
||||
}
|
||||
|
||||
func (p *PubSub) Init(metadata pubsub.Metadata) error {
|
||||
p.topics = make(map[string]bool)
|
||||
return p.kafka.Init(metadata.Properties)
|
||||
}
|
||||
|
||||
func (p *PubSub) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
topics := p.addTopic(req.Topic)
|
||||
|
||||
return p.kafka.Subscribe(topics, req.Metadata, newSubscribeAdapter(handler).adapter)
|
||||
}
|
||||
|
||||
func (p *PubSub) addTopic(newTopic string) []string {
|
||||
// Add topic to our map of topics
|
||||
p.topics[newTopic] = true
|
||||
|
||||
topics := make([]string, len(p.topics))
|
||||
|
||||
i := 0
|
||||
for topic := range p.topics {
|
||||
topics[i] = topic
|
||||
i++
|
||||
}
|
||||
|
||||
return topics
|
||||
p.kafka.AddTopicHandler(req.Topic, adaptHandler(handler))
|
||||
return p.kafka.Subscribe(context.Background())
|
||||
}
|
||||
|
||||
// NewKafka returns a new kafka pubsub instance.
|
||||
|
@ -76,20 +58,13 @@ func (p *PubSub) Features() []pubsub.Feature {
|
|||
return nil
|
||||
}
|
||||
|
||||
// subscribeAdapter is used to adapter pubsub.Handler to kafka.EventHandler with the same content.
|
||||
type subscribeAdapter struct {
|
||||
handler pubsub.Handler
|
||||
}
|
||||
|
||||
func newSubscribeAdapter(handler pubsub.Handler) *subscribeAdapter {
|
||||
return &subscribeAdapter{handler: handler}
|
||||
}
|
||||
|
||||
func (a *subscribeAdapter) adapter(ctx context.Context, event *kafka.NewEvent) error {
|
||||
return a.handler(ctx, &pubsub.NewMessage{
|
||||
Topic: event.Topic,
|
||||
Data: event.Data,
|
||||
Metadata: event.Metadata,
|
||||
ContentType: event.ContentType,
|
||||
})
|
||||
func adaptHandler(handler pubsub.Handler) kafka.EventHandler {
|
||||
return func(ctx context.Context, event *kafka.NewEvent) error {
|
||||
return handler(ctx, &pubsub.NewMessage{
|
||||
Topic: event.Topic,
|
||||
Data: event.Data,
|
||||
Metadata: event.Metadata,
|
||||
ContentType: event.ContentType,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dapr/components-contrib/internal/component/kafka"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
func TestSubscribeAdapter(t *testing.T) {
|
||||
// subscribeAdapter is used to adapter pubsub.Handler to kafka.EventHandler
|
||||
// step1: prepare a new kafka event
|
||||
ct := "text/plain"
|
||||
event := &kafka.NewEvent{
|
||||
Topic: "topic1",
|
||||
Data: []byte("abcdefg"),
|
||||
Metadata: map[string]string{"k1": "v1"},
|
||||
ContentType: &ct,
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
a := testAdapter{ctx: ctx, event: event, t: t}
|
||||
|
||||
// step2: call this adapter method to mock the new kafka event is triggered from kafka topic
|
||||
err := newSubscribeAdapter(a.testHandler).adapter(ctx, event)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
type testAdapter struct {
|
||||
ctx context.Context
|
||||
event *kafka.NewEvent
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (a *testAdapter) testHandler(ctx context.Context, msg *pubsub.NewMessage) error {
|
||||
// step3: the pubsub handler should be called with the adapted pubsub event with the same content of the kafka event
|
||||
assert.Equal(a.t, ctx, a.ctx)
|
||||
assert.Equal(a.t, msg.Topic, a.event.Topic)
|
||||
assert.Equal(a.t, msg.Data, a.event.Data)
|
||||
assert.Equal(a.t, msg.Metadata, a.event.Metadata)
|
||||
assert.Equal(a.t, msg.ContentType, a.event.ContentType)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -301,7 +301,7 @@ func CalcWorker(request *bindings.ReadResponse) ([]byte, error) {
|
|||
func InitTestProcess(
|
||||
cmd *command.ZeebeCommand,
|
||||
id string,
|
||||
testWorker func(context.Context, *bindings.ReadResponse) ([]byte, error),
|
||||
testWorker bindings.Handler,
|
||||
additionalMetadata ...MetadataPair,
|
||||
) error {
|
||||
testJobType := id + "-test"
|
||||
|
|
Loading…
Reference in New Issue