Kafka Pubsub fixes: Avro serialization caching and retries (#3610)
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
parent 85cbbf123a
commit 8c02ff33b4
@@ -14,6 +14,7 @@ limitations under the License.
package kafka

import (
	"context"
	"errors"
	"fmt"
	"net/url"
@@ -32,6 +33,29 @@ type consumer struct {
	mutex sync.Mutex
}

func notifyRecover(consumer *consumer, message *sarama.ConsumerMessage, session sarama.ConsumerGroupSession, b backoff.BackOff) error {
	for {
		if err := retry.NotifyRecover(func() error {
			return consumer.doCallback(session, message)
		}, b, func(err error, d time.Duration) {
			consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
		}, func() {
			consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
		}); err != nil {
			// If the retry policy got interrupted, it could mean that either
			// the policy has reached its maximum number of attempts or the context has been canceled.
			// There is an edge case where the returned error is 'context canceled' even though session.Context() is not done.
			// As a workaround, detect that case and reprocess the current message.
			if err == context.Canceled && session.Context().Err() == nil {
				consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
				continue
			}
			return err
		}
		return nil
	}
}

func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	b := consumer.k.backOffConfig.NewBackOffWithContext(session.Context())
	isBulkSubscribe := consumer.k.checkBulkSubscribe(claim.Topic())
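For reference, notifyRecover is a thin loop around retry.NotifyRecover from github.com/dapr/kit/retry. The following is a minimal, self-contained sketch (not part of this commit) of that call pattern using a constant backoff from github.com/cenkalti/backoff/v4; the flaky operation and the retry limit are hypothetical.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/dapr/kit/retry"
)

func main() {
	attempts := 0
	// Hypothetical operation: fails twice, then succeeds.
	operation := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}

	// Constant 100ms delay between attempts, capped at 5 retries.
	b := backoff.WithMaxRetries(backoff.NewConstantBackOff(100*time.Millisecond), 5)

	err := retry.NotifyRecover(operation, b,
		func(err error, d time.Duration) {
			fmt.Printf("attempt failed: %v, retrying in %s\n", err, d)
		},
		func() {
			fmt.Println("recovered after earlier failures")
		})
	fmt.Println("final error:", err)
}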
@@ -83,13 +107,7 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 			}
 
 			if consumer.k.consumeRetryEnabled {
-				if err := retry.NotifyRecover(func() error {
-					return consumer.doCallback(session, message)
-				}, b, func(err error, d time.Duration) {
-					consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
-				}, func() {
-					consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
-				}); err != nil {
+				if err := notifyRecover(consumer, message, session, b); err != nil {
 					consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
 				}
 			} else {
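The comment block inside notifyRecover describes the edge case being worked around: the callback can return a 'context canceled' error even though the consumer session's own context is still alive, in which case the message should be retried rather than dropped. A tiny illustrative sketch of that distinction, using plain contexts in place of the sarama session context:

package main

import (
	"context"
	"fmt"
)

func main() {
	sessionCtx := context.Background() // stands in for session.Context(); never canceled here
	opCtx, cancel := context.WithCancel(sessionCtx)
	cancel() // only the per-operation context is canceled...

	err := opCtx.Err()                   // ...so work bound to it reports context.Canceled
	fmt.Println(err == context.Canceled) // true
	fmt.Println(sessionCtx.Err() == nil) // true: the session is not done, so the
	                                     // message can safely be reprocessed
}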
@@ -208,14 +208,17 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
	k.consumeRetryInterval = meta.ConsumeRetryInterval

	if meta.SchemaRegistryURL != "" {
		k.logger.Infof("Schema registry URL '%s' provided. Configuring the Schema Registry client.", meta.SchemaRegistryURL)
		k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
		// Empty password is a possibility
		if meta.SchemaRegistryAPIKey != "" {
			k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
		}
		k.logger.Infof("Schema caching enabled: %v", meta.SchemaCachingEnabled)
		k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
		if meta.SchemaCachingEnabled {
			k.latestSchemaCache = make(map[string]SchemaCacheEntry)
			k.logger.Debugf("Schema cache TTL: %v", meta.SchemaLatestVersionCacheTTL)
			k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL
		}
	}
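As a rough illustration of the client configured above, this standalone sketch (not part of the commit; the registry URL and subject are placeholders) shows the srclient + goavro flow: create the Schema Registry client, toggle srclient's built-in caching, fetch the latest schema for a subject, and build an Avro codec from it. The TTL-bounded latestSchemaCache introduced in this commit sits on top of this inside getLatestSchema.

package main

import (
	"fmt"

	"github.com/linkedin/goavro/v2"
	"github.com/riferrei/srclient"
)

func main() {
	// Placeholder URL; credentials, if any, would be set via client.SetCredentials(apiKey, apiSecret).
	client := srclient.CreateSchemaRegistryClient("http://localhost:8081")
	client.CachingEnabled(true)

	// Value subjects follow the "<topic>-value" naming convention.
	schema, err := client.GetLatestSchema("my-topic-value")
	if err != nil {
		panic(err)
	}

	codec, err := goavro.NewCodec(schema.Schema())
	if err != nil {
		panic(err)
	}
	fmt.Println("schema ID:", schema.ID())
	fmt.Println("canonical schema:", codec.Schema())
}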
@@ -323,6 +326,7 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
	if ok && cacheEntry.expirationTime.After(time.Now()) {
		return cacheEntry.schema, cacheEntry.codec, nil
	}
	k.logger.Debugf("Cache not found or expired for subject %s. Fetching from registry...", subject)
	schema, errSchema := srClient.GetLatestSchema(subject)
	if errSchema != nil {
		return nil, nil, errSchema
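The hunk above shows only the cache-hit path; the overall shape is a read-through cache keyed by subject with a per-entry expiration. Below is a small self-contained sketch of that pattern. The names echo SchemaCacheEntry and the TTL fields from the diff, but the types, the fetch callback, and the lack of any locking are purely illustrative.

package main

import (
	"fmt"
	"time"
)

type schemaCacheEntry struct {
	value          string // stands in for the schema + codec pair
	expirationTime time.Time
}

type latestSchemaCache struct {
	entries map[string]schemaCacheEntry
	ttl     time.Duration
}

// get returns the cached value if present and unexpired, otherwise fetches and re-caches it.
func (c *latestSchemaCache) get(subject string, fetch func() string) string {
	if e, ok := c.entries[subject]; ok && e.expirationTime.After(time.Now()) {
		return e.value // hit: entry exists and has not expired
	}
	v := fetch() // miss or expired: fetch (simulated registry call) ...
	c.entries[subject] = schemaCacheEntry{value: v, expirationTime: time.Now().Add(c.ttl)}
	return v // ... and store it with a fresh expiration
}

func main() {
	c := &latestSchemaCache{entries: map[string]schemaCacheEntry{}, ttl: 5 * time.Minute}
	fmt.Println(c.get("my-topic-value", func() string { return "fetched-schema" }))
	fmt.Println(c.get("my-topic-value", func() string { return "should-not-refetch" })) // served from cache
}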
@@ -13,6 +13,7 @@ import (
	"github.com/stretchr/testify/require"

	mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks"
	"github.com/dapr/kit/logger"
)

func TestGetValueSchemaType(t *testing.T) {

@@ -62,6 +63,7 @@ func TestDeserializeValue(t *testing.T) {
	k := Kafka{
		srClient:             registry,
		schemaCachingEnabled: true,
		logger:               logger.NewLogger("kafka_test"),
	}

	schemaIDBytes := make([]byte, 4)

@@ -175,6 +177,7 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
	k := Kafka{
		srClient:             registry,
		schemaCachingEnabled: false,
		logger:               logger.NewLogger("kafka_test"),
	}

	t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {

@@ -250,6 +253,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
		schemaCachingEnabled: true,
		latestSchemaCache:    make(map[string]SchemaCacheEntry),
		latestSchemaCacheTTL: time.Minute * 5,
		logger:               logger.NewLogger("kafka_test"),
	}

	t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {

@@ -280,6 +284,7 @@ func TestLatestSchemaCaching(t *testing.T) {
		schemaCachingEnabled: true,
		latestSchemaCache:    make(map[string]SchemaCacheEntry),
		latestSchemaCacheTTL: time.Second * 10,
		logger:               logger.NewLogger("kafka_test"),
	}

	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(1)

@@ -302,6 +307,7 @@ func TestLatestSchemaCaching(t *testing.T) {
		schemaCachingEnabled: true,
		latestSchemaCache:    make(map[string]SchemaCacheEntry),
		latestSchemaCacheTTL: time.Second * 1,
		logger:               logger.NewLogger("kafka_test"),
	}

	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)

@@ -326,6 +332,7 @@ func TestLatestSchemaCaching(t *testing.T) {
		schemaCachingEnabled: false,
		latestSchemaCache:    make(map[string]SchemaCacheEntry),
		latestSchemaCacheTTL: 0,
		logger:               logger.NewLogger("kafka_test"),
	}

	m.EXPECT().GetLatestSchema(gomock.Eq("my-topic-value")).Return(schema, nil).Times(2)
@@ -163,6 +163,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) {
		ClientConnectionKeepAliveInterval: defaultClientConnectionKeepAliveInterval,
		HeartbeatInterval:                 3 * time.Second,
		SessionTimeout:                    10 * time.Second,
		SchemaCachingEnabled:              true,
		SchemaLatestVersionCacheTTL:       5 * time.Minute,
		EscapeHeaders:                     false,
	}