Merge remote-tracking branch 'upstream/master' into fix-kafka-session-done-merge-master

This commit is contained in:
Mukundan Sundararajan 2022-09-24 00:11:19 +05:30
commit 8b60079501
10 changed files with 36 additions and 25 deletions

View File

@ -69,26 +69,37 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
}
}
} else {
for message := range claim.Messages() {
if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
for {
select {
case message, ok := <-claim.Messages():
if !ok {
return nil
}
} else {
err := consumer.doCallback(session, message)
if err != nil {
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
if consumer.k.consumeRetryEnabled {
if err := retry.NotifyRecover(func() error {
return consumer.doCallback(session, message)
}, b, func(err error, d time.Duration) {
consumer.k.logger.Warnf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying...", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}, func() {
consumer.k.logger.Infof("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]", message.Topic, message.Partition, message.Offset, asBase64String(message.Key))
}); err != nil {
consumer.k.logger.Errorf("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
} else {
err := consumer.doCallback(session, message)
if err != nil {
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
return nil
}
func (consumer *consumer) flushBulkMessages(claim sarama.ConsumerGroupClaim,

View File

@ -31,7 +31,7 @@ type bearerMiddlewareMetadata struct {
}
// NewBearerMiddleware returns a new oAuth2 middleware.
func NewBearerMiddleware(logger logger.Logger) *Middleware {
func NewBearerMiddleware(logger logger.Logger) middleware.Middleware {
return &Middleware{logger: logger}
}

View File

@ -39,7 +39,7 @@ type oAuth2MiddlewareMetadata struct {
}
// NewOAuth2Middleware returns a new oAuth2 middleware.
func NewOAuth2Middleware() *Middleware {
func NewOAuth2Middleware() middleware.Middleware {
return &Middleware{}
}

View File

@ -50,7 +50,7 @@ type TokenProviderInterface interface {
}
// NewOAuth2ClientCredentialsMiddleware returns a new oAuth2 middleware.
func NewOAuth2ClientCredentialsMiddleware(logger logger.Logger) *Middleware {
func NewOAuth2ClientCredentialsMiddleware(logger logger.Logger) middleware.Middleware {
m := &Middleware{
log: logger,
tokenCache: cache.New(1*time.Hour, 10*time.Minute),

View File

@ -102,7 +102,7 @@ func TestOAuth2ClientCredentialsToken(t *testing.T) {
// Initialize middleware component and inject mocked TokenProvider
log := logger.NewLogger("oauth2clientcredentials.test")
oauth2clientcredentialsMiddleware := NewOAuth2ClientCredentialsMiddleware(log)
oauth2clientcredentialsMiddleware, _ := NewOAuth2ClientCredentialsMiddleware(log).(*Middleware)
oauth2clientcredentialsMiddleware.SetTokenProvider(mockTokenProvider)
handler, err := oauth2clientcredentialsMiddleware.GetHandler(metadata)
require.NoError(t, err)
@ -160,7 +160,7 @@ func TestOAuth2ClientCredentialsCache(t *testing.T) {
// Initialize middleware component and inject mocked TokenProvider
log := logger.NewLogger("oauth2clientcredentials.test")
oauth2clientcredentialsMiddleware := NewOAuth2ClientCredentialsMiddleware(log)
oauth2clientcredentialsMiddleware, _ := NewOAuth2ClientCredentialsMiddleware(log).(*Middleware)
oauth2clientcredentialsMiddleware.SetTokenProvider(mockTokenProvider)
handler, err := oauth2clientcredentialsMiddleware.GetHandler(metadata)
require.NoError(t, err)

View File

@ -39,7 +39,7 @@ type middlewareMetadata struct {
}
// NewMiddleware returns a new Open Policy Agent middleware.
func NewMiddleware(logger logger.Logger) *Middleware {
func NewMiddleware(logger logger.Logger) middleware.Middleware {
return &Middleware{logger: logger}
}

View File

@ -39,7 +39,7 @@ const (
)
// NewRateLimitMiddleware returns a new ratelimit middleware.
func NewRateLimitMiddleware(logger logger.Logger) *Middleware {
func NewRateLimitMiddleware(logger logger.Logger) middleware.Middleware {
return &Middleware{logger: logger}
}

View File

@ -29,7 +29,7 @@ type Metadata struct {
}
// NewRouterCheckerMiddleware returns a new routerchecker middleware.
func NewMiddleware(logger logger.Logger) *Middleware {
func NewMiddleware(logger logger.Logger) middleware.Middleware {
return &Middleware{logger: logger}
}

View File

@ -40,7 +40,7 @@ type middlewareMetadata struct {
}
// NewMiddleware returns a new sentinel middleware.
func NewMiddleware(logger logger.Logger) *Middleware {
func NewMiddleware(logger logger.Logger) middleware.Middleware {
return &Middleware{logger: logger}
}

View File

@ -126,7 +126,7 @@ func TestLoadRules(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
sentinel := NewMiddleware(nil)
sentinel, _ := NewMiddleware(nil).(*Middleware)
err := sentinel.loadSentinelRules(&c.meta)
if c.expectErr {
assert.NotNil(t, err)