diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index 9cf7e38c3..e1f3cbd2d 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -132,6 +132,7 @@ type AzureEventHubs struct { logger logger.Logger publishCtx context.Context publishCancel context.CancelFunc + backOffConfig retry.Config hubClients map[string]*eventhub.Hub eventProcessors map[string]*eph.EventProcessorHost hubManager *eventhub.HubManager @@ -532,6 +533,14 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error { aeh.publishCtx, aeh.publishCancel = context.WithCancel(context.Background()) + // Default retry configuration is used if no backOff properties are set. + if err := retry.DecodeConfigWithPrefix( + &aeh.backOffConfig, + metadata.Properties, + "backOff"); err != nil { + return err + } + return nil } @@ -583,8 +592,18 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su aeh.logger.Debugf("registering handler for topic %s", req.Topic) _, err = processor.RegisterHandler(subscribeCtx, func(_ context.Context, e *eventhub.Event) error { - aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID) - return subscribeHandler(subscribeCtx, req.Topic, e, handler) + // This component has built-in retries because Event Hubs doesn't support N/ACK for messages + b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx) + + return retry.NotifyRecover(func() error { + aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID) + + return subscribeHandler(subscribeCtx, req.Topic, e, handler) + }, b, func(_ error, _ time.Duration) { + aeh.logger.Errorf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID) + }, func() { + aeh.logger.Errorf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID) + }) }) if err != nil { return err