Merge branch 'main' into 3318-RavenDB-state-store-new
This commit is contained in:
commit
3b0f1340c2
|
|
@ -104,7 +104,7 @@ metadata:
|
|||
required: false
|
||||
description: |
|
||||
The Redis sentinel master name. Required when "failover" is enabled.
|
||||
example: "127.0.0.1:6379"
|
||||
example: "mymaster"
|
||||
url:
|
||||
title: "Redis Sentinel documentation"
|
||||
url: "https://redis.io/docs/manual/sentinel/"
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
// Subscribe adds a handler and configuration for a topic, and subscribes.
|
||||
|
|
@ -35,9 +37,26 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler
|
|||
k.wg.Add(1)
|
||||
go func() {
|
||||
defer k.wg.Done()
|
||||
postAction := func() {}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := context.Cause(ctx)
|
||||
if errors.Is(err, pubsub.ErrGracefulShutdown) {
|
||||
k.logger.Debugf("Kafka component is closing. Context is done due to shutdown process.")
|
||||
postAction = func() {
|
||||
if k.clients != nil && k.clients.consumerGroup != nil {
|
||||
k.logger.Debugf("Kafka component is closing. Closing consumer group.")
|
||||
err := k.clients.consumerGroup.Close()
|
||||
if err != nil {
|
||||
k.logger.Errorf("failed to close consumer group: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case <-k.closeCh:
|
||||
k.logger.Debugf("Kafka component is closing. Channel is closed.")
|
||||
}
|
||||
|
||||
k.subscribeLock.Lock()
|
||||
|
|
@ -50,6 +69,7 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler
|
|||
}
|
||||
|
||||
k.reloadConsumerGroup()
|
||||
postAction()
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
@ -87,9 +107,11 @@ func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer
|
|||
clients, err := k.latestClients()
|
||||
if err != nil || clients == nil {
|
||||
k.logger.Errorf("failed to get latest Kafka clients: %w", err)
|
||||
return
|
||||
}
|
||||
if clients.consumerGroup == nil {
|
||||
k.logger.Errorf("component is closed")
|
||||
return
|
||||
}
|
||||
err = clients.consumerGroup.Consume(ctx, topics, consumer)
|
||||
if errors.Is(err, context.Canceled) {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package kafka
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
|
@ -26,6 +27,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dapr/components-contrib/common/component/kafka/mocks"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
|
|
@ -234,6 +236,115 @@ func Test_reloadConsumerGroup(t *testing.T) {
|
|||
assert.Equal(t, int64(2), cancelCalled.Load())
|
||||
assert.Equal(t, int64(2), consumeCalled.Load())
|
||||
})
|
||||
|
||||
t.Run("Cancel context whit shutdown error closes consumer group with one subscriber", func(t *testing.T) {
|
||||
var consumeCalled atomic.Int64
|
||||
var cancelCalled atomic.Int64
|
||||
var closeCalled atomic.Int64
|
||||
waitCh := make(chan struct{})
|
||||
cg := mocks.NewConsumerGroup().
|
||||
WithConsumeFn(func(ctx context.Context, _ []string, _ sarama.ConsumerGroupHandler) error {
|
||||
consumeCalled.Add(1)
|
||||
<-ctx.Done()
|
||||
cancelCalled.Add(1)
|
||||
return nil
|
||||
}).WithCloseFn(func() error {
|
||||
closeCalled.Add(1)
|
||||
waitCh <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
k := &Kafka{
|
||||
logger: logger.NewLogger("test"),
|
||||
mockConsumerGroup: cg,
|
||||
consumerCancel: nil,
|
||||
closeCh: make(chan struct{}),
|
||||
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
|
||||
consumeRetryInterval: time.Millisecond,
|
||||
}
|
||||
c, err := k.latestClients()
|
||||
require.NoError(t, err)
|
||||
|
||||
k.clients = c
|
||||
ctx, cancel := context.WithCancelCause(t.Context())
|
||||
k.Subscribe(ctx, SubscriptionHandlerConfig{}, "foo")
|
||||
assert.Eventually(t, func() bool {
|
||||
return consumeCalled.Load() == 1
|
||||
}, time.Second, time.Millisecond)
|
||||
assert.Equal(t, int64(0), cancelCalled.Load())
|
||||
cancel(pubsub.ErrGracefulShutdown)
|
||||
<-waitCh
|
||||
assert.Equal(t, int64(1), closeCalled.Load())
|
||||
})
|
||||
|
||||
t.Run("Cancel context whit shutdown error closes consumer group with multiple subscriber", func(t *testing.T) {
|
||||
var closeCalled atomic.Int64
|
||||
waitCh := make(chan struct{})
|
||||
cg := mocks.NewConsumerGroup().WithCloseFn(func() error {
|
||||
closeCalled.Add(1)
|
||||
waitCh <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
k := &Kafka{
|
||||
logger: logger.NewLogger("test"),
|
||||
mockConsumerGroup: cg,
|
||||
consumerCancel: nil,
|
||||
closeCh: make(chan struct{}),
|
||||
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
|
||||
consumeRetryInterval: time.Millisecond,
|
||||
}
|
||||
c, err := k.latestClients()
|
||||
require.NoError(t, err)
|
||||
|
||||
k.clients = c
|
||||
|
||||
cancelFns := make([]context.CancelCauseFunc, 0, 100)
|
||||
for i := range 100 {
|
||||
ctx, cancel := context.WithCancelCause(t.Context())
|
||||
cancelFns = append(cancelFns, cancel)
|
||||
k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i))
|
||||
}
|
||||
cancelFns[0](pubsub.ErrGracefulShutdown)
|
||||
<-waitCh
|
||||
assert.Equal(t, int64(1), closeCalled.Load())
|
||||
})
|
||||
|
||||
t.Run("Closing subscriptions with no error or no ErrGracefulShutdown does not close consumer group", func(t *testing.T) {
|
||||
var closeCalled atomic.Int64
|
||||
waitCh := make(chan struct{})
|
||||
cg := mocks.NewConsumerGroup().WithCloseFn(func() error {
|
||||
closeCalled.Add(1)
|
||||
waitCh <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
k := &Kafka{
|
||||
logger: logger.NewLogger("test"),
|
||||
mockConsumerGroup: cg,
|
||||
consumerCancel: nil,
|
||||
closeCh: make(chan struct{}),
|
||||
subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}},
|
||||
consumeRetryInterval: time.Millisecond,
|
||||
}
|
||||
c, err := k.latestClients()
|
||||
require.NoError(t, err)
|
||||
|
||||
k.clients = c
|
||||
cancelFns := make([]context.CancelCauseFunc, 0, 100)
|
||||
for i := range 100 {
|
||||
ctx, cancel := context.WithCancelCause(t.Context())
|
||||
cancelFns = append(cancelFns, cancel)
|
||||
k.Subscribe(ctx, SubscriptionHandlerConfig{}, fmt.Sprintf("foo%d", i))
|
||||
}
|
||||
cancelFns[0](errors.New("some error"))
|
||||
time.Sleep(1 * time.Second)
|
||||
assert.Equal(t, int64(0), closeCalled.Load())
|
||||
|
||||
cancelFns[4](nil)
|
||||
time.Sleep(1 * time.Second)
|
||||
assert.Equal(t, int64(0), closeCalled.Load())
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Subscribe(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ metadata:
|
|||
required: false
|
||||
description: |
|
||||
The Redis sentinel master name. Required when "failover" is enabled.
|
||||
example: "127.0.0.1:6379"
|
||||
example: "mymaster"
|
||||
url:
|
||||
title: "Redis Sentinel documentation"
|
||||
url: "https://redis.io/docs/manual/sentinel/"
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import (
|
|||
"github.com/dapr/components-contrib/metadata"
|
||||
)
|
||||
|
||||
var ErrGracefulShutdown = errors.New("pubsub shutdown")
|
||||
|
||||
// PubSub is the interface for message buses.
|
||||
type PubSub interface {
|
||||
metadata.ComponentWithMetadata
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ metadata:
|
|||
- name: sentinelMasterName
|
||||
required: false
|
||||
description: The sentinel master name. See Redis Sentinel Documentation.
|
||||
example: "127.0.0.1:6379"
|
||||
example: "mymaster"
|
||||
type: string
|
||||
- name: maxLenApprox
|
||||
required: false
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ metadata:
|
|||
required: false
|
||||
description: |
|
||||
The Redis sentinel master name. Required when "failover" is enabled.
|
||||
example: "127.0.0.1:6379"
|
||||
example: "mymaster"
|
||||
type: string
|
||||
url:
|
||||
title: "Redis Sentinel documentation"
|
||||
|
|
|
|||
Loading…
Reference in New Issue