diff --git a/bindings/redis/metadata.yaml b/bindings/redis/metadata.yaml index efc340750..217f70757 100644 --- a/bindings/redis/metadata.yaml +++ b/bindings/redis/metadata.yaml @@ -104,7 +104,7 @@ metadata: required: false description: | The Redis sentinel master name. Required when "failover" is enabled. - example: "127.0.0.1:6379" + example: "mymaster" url: title: "Redis Sentinel documentation" url: "https://redis.io/docs/manual/sentinel/" diff --git a/common/component/kafka/subscriber.go b/common/component/kafka/subscriber.go index 4e1dc7ae8..a2c51cdda 100644 --- a/common/component/kafka/subscriber.go +++ b/common/component/kafka/subscriber.go @@ -17,6 +17,8 @@ import ( "context" "errors" "time" + + "github.com/dapr/components-contrib/pubsub" ) // Subscribe adds a handler and configuration for a topic, and subscribes. @@ -35,9 +37,26 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler k.wg.Add(1) go func() { defer k.wg.Done() + postAction := func() {} + select { case <-ctx.Done(): + err := context.Cause(ctx) + if errors.Is(err, pubsub.ErrGracefulShutdown) { + k.logger.Debugf("Kafka component is closing. Context is done due to shutdown process.") + postAction = func() { + if k.clients != nil && k.clients.consumerGroup != nil { + k.logger.Debugf("Kafka component is closing. Closing consumer group.") + err := k.clients.consumerGroup.Close() + if err != nil { + k.logger.Errorf("failed to close consumer group: %w", err) + } + } + } + } + case <-k.closeCh: + k.logger.Debugf("Kafka component is closing. Channel is closed.") } k.subscribeLock.Lock() @@ -50,6 +69,7 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler } k.reloadConsumerGroup() + postAction() }() } @@ -87,9 +107,11 @@ func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer clients, err := k.latestClients() if err != nil || clients == nil { k.logger.Errorf("failed to get latest Kafka clients: %w", err) + return } if clients.consumerGroup == nil { k.logger.Errorf("component is closed") + return } err = clients.consumerGroup.Consume(ctx, topics, consumer) if errors.Is(err, context.Canceled) { diff --git a/common/component/kafka/subscriber_test.go b/common/component/kafka/subscriber_test.go index 8bdb150e0..135df6f9c 100644 --- a/common/component/kafka/subscriber_test.go +++ b/common/component/kafka/subscriber_test.go @@ -16,6 +16,7 @@ package kafka import ( "context" "errors" + "fmt" "strconv" "sync/atomic" "testing" @@ -26,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/common/component/kafka/mocks" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -234,6 +236,115 @@ func Test_reloadConsumerGroup(t *testing.T) { assert.Equal(t, int64(2), cancelCalled.Load()) assert.Equal(t, int64(2), consumeCalled.Load()) }) + + t.Run("Cancel context whit shutdown error closes consumer group with one subscriber", func(t *testing.T) { + var consumeCalled atomic.Int64 + var cancelCalled atomic.Int64 + var closeCalled atomic.Int64 + waitCh := make(chan struct{}) + cg := mocks.NewConsumerGroup(). + WithConsumeFn(func(ctx context.Context, _ []string, _ sarama.ConsumerGroupHandler) error { + consumeCalled.Add(1) + <-ctx.Done() + cancelCalled.Add(1) + return nil + }).WithCloseFn(func() error { + closeCalled.Add(1) + waitCh <- struct{}{} + return nil + }) + + k := &Kafka{ + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + consumerCancel: nil, + closeCh: make(chan struct{}), + subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, + consumeRetryInterval: time.Millisecond, + } + c, err := k.latestClients() + require.NoError(t, err) + + k.clients = c + ctx, cancel := context.WithCancelCause(t.Context()) + k.Subscribe(ctx, SubscriptionHandlerConfig{}, "foo") + assert.Eventually(t, func() bool { + return consumeCalled.Load() == 1 + }, time.Second, time.Millisecond) + assert.Equal(t, int64(0), cancelCalled.Load()) + cancel(pubsub.ErrGracefulShutdown) + <-waitCh + assert.Equal(t, int64(1), closeCalled.Load()) + }) + + t.Run("Cancel context whit shutdown error closes consumer group with multiple subscriber", func(t *testing.T) { + var closeCalled atomic.Int64 + waitCh := make(chan struct{}) + cg := mocks.NewConsumerGroup().WithCloseFn(func() error { + closeCalled.Add(1) + waitCh <- struct{}{} + return nil + }) + + k := &Kafka{ + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + consumerCancel: nil, + closeCh: make(chan struct{}), + subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, + consumeRetryInterval: time.Millisecond, + } + c, err := k.latestClients() + require.NoError(t, err) + + k.clients = c + + cancelFns := make([]context.CancelCauseFunc, 0, 100) + for i := range 100 { + ctx, cancel := context.WithCancelCause(t.Context()) + cancelFns = append(cancelFns, cancel) + k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i)) + } + cancelFns[0](pubsub.ErrGracefulShutdown) + <-waitCh + assert.Equal(t, int64(1), closeCalled.Load()) + }) + + t.Run("Closing subscriptions with no error or no ErrGracefulShutdown does not close consumer group", func(t *testing.T) { + var closeCalled atomic.Int64 + waitCh := make(chan struct{}) + cg := mocks.NewConsumerGroup().WithCloseFn(func() error { + closeCalled.Add(1) + waitCh <- struct{}{} + return nil + }) + + k := &Kafka{ + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + consumerCancel: nil, + closeCh: make(chan struct{}), + subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, + consumeRetryInterval: time.Millisecond, + } + c, err := k.latestClients() + require.NoError(t, err) + + k.clients = c + cancelFns := make([]context.CancelCauseFunc, 0, 100) + for i := range 100 { + ctx, cancel := context.WithCancelCause(t.Context()) + cancelFns = append(cancelFns, cancel) + k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i)) + } + cancelFns[0](errors.New("some error")) + time.Sleep(1 * time.Second) + assert.Equal(t, int64(0), closeCalled.Load()) + + cancelFns[4](nil) + time.Sleep(1 * time.Second) + assert.Equal(t, int64(0), closeCalled.Load()) + }) } func Test_Subscribe(t *testing.T) { diff --git a/configuration/redis/metadata.yaml b/configuration/redis/metadata.yaml index 6abccb130..00451159c 100644 --- a/configuration/redis/metadata.yaml +++ b/configuration/redis/metadata.yaml @@ -92,7 +92,7 @@ metadata: required: false description: | The Redis sentinel master name. Required when "failover" is enabled. - example: "127.0.0.1:6379" + example: "mymaster" url: title: "Redis Sentinel documentation" url: "https://redis.io/docs/manual/sentinel/" diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index f61261108..47377d65e 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -22,6 +22,8 @@ import ( "github.com/dapr/components-contrib/metadata" ) +var ErrGracefulShutdown = errors.New("pubsub shutdown") + // PubSub is the interface for message buses. type PubSub interface { metadata.ComponentWithMetadata diff --git a/pubsub/redis/metadata.yaml b/pubsub/redis/metadata.yaml index ed852f207..f841e194b 100644 --- a/pubsub/redis/metadata.yaml +++ b/pubsub/redis/metadata.yaml @@ -186,7 +186,7 @@ metadata: - name: sentinelMasterName required: false description: The sentinel master name. See Redis Sentinel Documentation. - example: "127.0.0.1:6379" + example: "mymaster" type: string - name: maxLenApprox required: false diff --git a/state/redis/metadata.yaml b/state/redis/metadata.yaml index 79898a759..5fcd934ea 100644 --- a/state/redis/metadata.yaml +++ b/state/redis/metadata.yaml @@ -72,7 +72,7 @@ metadata: required: false description: | The Redis sentinel master name. Required when "failover" is enabled. - example: "127.0.0.1:6379" + example: "mymaster" type: string url: title: "Redis Sentinel documentation"