Merge branch 'main' into feat/pulsar-replicate-subscription-state

This commit is contained in:
Dapr Bot 2025-07-25 09:42:10 -07:00 committed by GitHub
commit 8c357c2008
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 139 additions and 4 deletions

View File

@ -104,7 +104,7 @@ metadata:
required: false
description: |
The Redis sentinel master name. Required when "failover" is enabled.
example: "127.0.0.1:6379"
example: "mymaster"
url:
title: "Redis Sentinel documentation"
url: "https://redis.io/docs/manual/sentinel/"

View File

@ -17,6 +17,8 @@ import (
"context"
"errors"
"time"
"github.com/dapr/components-contrib/pubsub"
)
// Subscribe adds a handler and configuration for a topic, and subscribes.
@ -35,9 +37,26 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler
k.wg.Add(1)
go func() {
defer k.wg.Done()
postAction := func() {}
select {
case <-ctx.Done():
err := context.Cause(ctx)
if errors.Is(err, pubsub.ErrGracefulShutdown) {
k.logger.Debugf("Kafka component is closing. Context is done due to shutdown process.")
postAction = func() {
if k.clients != nil && k.clients.consumerGroup != nil {
k.logger.Debugf("Kafka component is closing. Closing consumer group.")
err := k.clients.consumerGroup.Close()
if err != nil {
k.logger.Errorf("failed to close consumer group: %w", err)
}
}
}
}
case <-k.closeCh:
k.logger.Debugf("Kafka component is closing. Channel is closed.")
}
k.subscribeLock.Lock()
@ -50,6 +69,7 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler
}
k.reloadConsumerGroup()
postAction()
}()
}
@ -87,9 +107,11 @@ func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer
clients, err := k.latestClients()
if err != nil || clients == nil {
k.logger.Errorf("failed to get latest Kafka clients: %w", err)
return
}
if clients.consumerGroup == nil {
k.logger.Errorf("component is closed")
return
}
err = clients.consumerGroup.Consume(ctx, topics, consumer)
if errors.Is(err, context.Canceled) {

View File

@ -16,6 +16,7 @@ package kafka
import (
"context"
"errors"
"fmt"
"strconv"
"sync/atomic"
"testing"
@ -26,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/common/component/kafka/mocks"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
@ -234,6 +236,115 @@ func Test_reloadConsumerGroup(t *testing.T) {
assert.Equal(t, int64(2), cancelCalled.Load())
assert.Equal(t, int64(2), consumeCalled.Load())
})
t.Run("Cancel context whit shutdown error closes consumer group with one subscriber", func(t *testing.T) {
var consumeCalled atomic.Int64
var cancelCalled atomic.Int64
var closeCalled atomic.Int64
waitCh := make(chan struct{})
cg := mocks.NewConsumerGroup().
WithConsumeFn(func(ctx context.Context, _ []string, _ sarama.ConsumerGroupHandler) error {
consumeCalled.Add(1)
<-ctx.Done()
cancelCalled.Add(1)
return nil
}).WithCloseFn(func() error {
closeCalled.Add(1)
waitCh <- struct{}{}
return nil
})
k := &Kafka{
logger: logger.NewLogger("test"),
mockConsumerGroup: cg,
consumerCancel: nil,
closeCh: make(chan struct{}),
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
consumeRetryInterval: time.Millisecond,
}
c, err := k.latestClients()
require.NoError(t, err)
k.clients = c
ctx, cancel := context.WithCancelCause(t.Context())
k.Subscribe(ctx, SubscriptionHandlerConfig{}, "foo")
assert.Eventually(t, func() bool {
return consumeCalled.Load() == 1
}, time.Second, time.Millisecond)
assert.Equal(t, int64(0), cancelCalled.Load())
cancel(pubsub.ErrGracefulShutdown)
<-waitCh
assert.Equal(t, int64(1), closeCalled.Load())
})
t.Run("Cancel context whit shutdown error closes consumer group with multiple subscriber", func(t *testing.T) {
var closeCalled atomic.Int64
waitCh := make(chan struct{})
cg := mocks.NewConsumerGroup().WithCloseFn(func() error {
closeCalled.Add(1)
waitCh <- struct{}{}
return nil
})
k := &Kafka{
logger: logger.NewLogger("test"),
mockConsumerGroup: cg,
consumerCancel: nil,
closeCh: make(chan struct{}),
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
consumeRetryInterval: time.Millisecond,
}
c, err := k.latestClients()
require.NoError(t, err)
k.clients = c
cancelFns := make([]context.CancelCauseFunc, 0, 100)
for i := range 100 {
ctx, cancel := context.WithCancelCause(t.Context())
cancelFns = append(cancelFns, cancel)
k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i))
}
cancelFns[0](pubsub.ErrGracefulShutdown)
<-waitCh
assert.Equal(t, int64(1), closeCalled.Load())
})
t.Run("Closing subscriptions with no error or no ErrGracefulShutdown does not close consumer group", func(t *testing.T) {
var closeCalled atomic.Int64
waitCh := make(chan struct{})
cg := mocks.NewConsumerGroup().WithCloseFn(func() error {
closeCalled.Add(1)
waitCh <- struct{}{}
return nil
})
k := &Kafka{
logger: logger.NewLogger("test"),
mockConsumerGroup: cg,
consumerCancel: nil,
closeCh: make(chan struct{}),
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
consumeRetryInterval: time.Millisecond,
}
c, err := k.latestClients()
require.NoError(t, err)
k.clients = c
cancelFns := make([]context.CancelCauseFunc, 0, 100)
for i := range 100 {
ctx, cancel := context.WithCancelCause(t.Context())
cancelFns = append(cancelFns, cancel)
k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i))
}
cancelFns[0](errors.New("some error"))
time.Sleep(1 * time.Second)
assert.Equal(t, int64(0), closeCalled.Load())
cancelFns[4](nil)
time.Sleep(1 * time.Second)
assert.Equal(t, int64(0), closeCalled.Load())
})
}
func Test_Subscribe(t *testing.T) {

View File

@ -92,7 +92,7 @@ metadata:
required: false
description: |
The Redis sentinel master name. Required when "failover" is enabled.
example: "127.0.0.1:6379"
example: "mymaster"
url:
title: "Redis Sentinel documentation"
url: "https://redis.io/docs/manual/sentinel/"

View File

@ -22,6 +22,8 @@ import (
"github.com/dapr/components-contrib/metadata"
)
var ErrGracefulShutdown = errors.New("pubsub shutdown")
// PubSub is the interface for message buses.
type PubSub interface {
metadata.ComponentWithMetadata

View File

@ -186,7 +186,7 @@ metadata:
- name: sentinelMasterName
required: false
description: The sentinel master name. See Redis Sentinel Documentation.
example: "127.0.0.1:6379"
example: "mymaster"
type: string
- name: maxLenApprox
required: false

View File

@ -72,7 +72,7 @@ metadata:
required: false
description: |
The Redis sentinel master name. Required when "failover" is enabled.
example: "127.0.0.1:6379"
example: "mymaster"
type: string
url:
title: "Redis Sentinel documentation"