Recover interrupted eventhubs subscriptions (#3344)
Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
ba6609e624
commit
5445cead81
|
|
@ -275,13 +275,16 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
|
|||
Handler: retryHandler,
|
||||
}
|
||||
|
||||
subscriptionLoopFinished := make(chan bool, 1)
|
||||
|
||||
// Process all partition clients as they come in
|
||||
go func() {
|
||||
subscriberLoop := func() {
|
||||
for {
|
||||
// This will block until a new partition client is available
|
||||
// It returns nil if processor.Run terminates or if the context is canceled
|
||||
partitionClient := processor.NextPartitionClient(subscribeCtx)
|
||||
if partitionClient == nil {
|
||||
subscriptionLoopFinished <- true
|
||||
return
|
||||
}
|
||||
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())
|
||||
|
|
@ -295,15 +298,37 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
|
|||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Start the processor
|
||||
go func() {
|
||||
// This is a blocking call that runs until the context is canceled
|
||||
err = processor.Run(subscribeCtx)
|
||||
// Do not log context.Canceled which happens at shutdown
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
aeh.logger.Errorf("Error from event processor: %v", err)
|
||||
for {
|
||||
go subscriberLoop()
|
||||
// This is a blocking call that runs until the context is canceled
|
||||
err = processor.Run(subscribeCtx)
|
||||
// Exit if the context is canceled
|
||||
if err != nil && errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
aeh.logger.Errorf("Error from event processor: %v", err)
|
||||
} else {
|
||||
aeh.logger.Debugf("Event processor terminated without error")
|
||||
}
|
||||
// wait for subscription loop finished signal
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
return
|
||||
case <-subscriptionLoopFinished:
|
||||
// noop
|
||||
}
|
||||
// Waiting here is not strictly necessary, however, we will wait for a short time to increase the likelihood of transient errors having disappeared
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
// noop - continue the for loop
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue