Merge pull request #1839 from ItalyPaleAle/revert-1798-eventhubs

Reverting #1798 for pubsub.azure.eventhubs
This commit is contained in:
Bernd Verst 2022-07-01 11:48:45 -07:00 committed by GitHub
commit bfd6137632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 2 deletions

View File

@ -132,6 +132,7 @@ type AzureEventHubs struct {
logger logger.Logger
publishCtx context.Context
publishCancel context.CancelFunc
backOffConfig retry.Config
hubClients map[string]*eventhub.Hub
eventProcessors map[string]*eph.EventProcessorHost
hubManager *eventhub.HubManager
@ -532,6 +533,14 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error {
aeh.publishCtx, aeh.publishCancel = context.WithCancel(context.Background())
// Default retry configuration is used if no backOff properties are set.
if err := retry.DecodeConfigWithPrefix(
&aeh.backOffConfig,
metadata.Properties,
"backOff"); err != nil {
return err
}
return nil
}
@ -583,8 +592,18 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su
aeh.logger.Debugf("registering handler for topic %s", req.Topic)
_, err = processor.RegisterHandler(subscribeCtx,
func(_ context.Context, e *eventhub.Event) error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
return subscribeHandler(subscribeCtx, req.Topic, e, handler)
// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
b := aeh.backOffConfig.NewBackOffWithContext(subscribeCtx)
return retry.NotifyRecover(func() error {
aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID)
return subscribeHandler(subscribeCtx, req.Topic, e, handler)
}, b, func(_ error, _ time.Duration) {
aeh.logger.Errorf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID)
}, func() {
aeh.logger.Errorf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID)
})
})
if err != nil {
return err