diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index f2a70e85e..6ae4c4300 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -240,14 +240,21 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, req pubsub.Su // Ensure that no subscriber using the old "track 1" SDK is active // TODO(@ItalyPaleAle): Remove this for Dapr 1.13 { - ctx, cancel := context.WithTimeout(subscribeCtx, 10*time.Minute) + ctx, cancel := context.WithTimeout(subscribeCtx, 2*time.Minute) err = aeh.ensureNoTrack1Subscribers(ctx, req.Topic) cancel() if err != nil { // If there's a timeout, it means that the other client was still active after the timeout - // In this case, we actually panic to make sure the user notices the error and ensures the rollout of the new version of Dapr is complete + // In this case, we return an error here so Dapr can continue the initialization and report a "healthy" status (but this subscription won't be active) + // After 2 minutes, then, we panic, which ensures that during a rollout Kubernetes will see that this pod is unhealthy and re-creates that. Hopefully, by then other instances of the app will have been updated and no more locks will be present if errors.Is(err, context.DeadlineExceeded) { - aeh.logger.Fatalf("Another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr, and this is not supported. Please ensure that all applications subscribed to the same topic, with this consumer group, are using Dapr 1.10 or newer.") + errMsg := fmt.Sprintf("Another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr, and this is not supported. Please ensure that all applications subscribed to the same topic, with this consumer group, are using Dapr 1.10 or newer.", req.Topic) + aeh.logger.Error(errMsg + " ⚠️⚠️⚠️ Dapr will crash in 2 minutes to force the orchestrator to restart the process after the rollout of other instances is complete.") + go func() { + time.Sleep(2 * time.Minute) + aeh.logger.Fatalf("Another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr, and this is not supported. Please ensure that all applications subscribed to the same topic, with this consumer group, are using Dapr 1.10 or newer.", req.Topic) + }() + return fmt.Errorf("another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr", req.Topic) } // In case of other errors, just return the error diff --git a/pubsub/azure/eventhubs/track1_upgrade.go b/pubsub/azure/eventhubs/track1_upgrade.go index ebae8db1e..3cbb1f72c 100644 --- a/pubsub/azure/eventhubs/track1_upgrade.go +++ b/pubsub/azure/eventhubs/track1_upgrade.go @@ -15,20 +15,24 @@ package eventhubs import ( "context" + "errors" "fmt" + "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/cenkalti/backoff/v4" azauth "github.com/dapr/components-contrib/internal/authentication/azure" "github.com/dapr/kit/logger" + "github.com/dapr/kit/retry" ) // This method ensures that there are currently no active subscribers to the same Event Hub topic that are using the old ("track 1") SDK of Azure Event Hubs. This is the SDK that was in use until Dapr 1.9. // Because the new SDK stores checkpoints in a different way, clients using the new ("track 2") and the old SDK cannot coexist. -// To ensure this doesn't happen, when we create a new subscription to the same topic and with the same consumer group, we check if there's a file in Azure Storage with the checkpoint created by the old SDK and with a still-active lease. If that's true, we wait up to 10 minutes before we crash Dapr with a log message describing what's happening. +// To ensure this doesn't happen, when we create a new subscription to the same topic and with the same consumer group, we check if there's a file in Azure Storage with the checkpoint created by the old SDK and with a still-active lease. If that's true, we wait (until the context expires) before we crash Dapr with a log message describing what's happening. // These conflicts should be transient anyways, as mixed versions of Dapr should only happen during a rollout of a new version of Dapr. // TODO(@ItalyPaleAle): Remove this for Dapr 1.13 func (aeh *AzureEventHubs) ensureNoTrack1Subscribers(parentCtx context.Context, topic string) error { @@ -42,22 +46,45 @@ func (aeh *AzureEventHubs) ensureNoTrack1Subscribers(parentCtx context.Context, // `dapr-(topic)-(consumer-group)-(partition-key)` // We need to list those up and check if they have an active lease prefix := fmt.Sprintf("dapr-%s-%s-", topic, aeh.metadata.ConsumerGroup) - pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ - Prefix: &prefix, - }) - for pager.More() { - ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout) - resp, err := pager.NextPage(ctx) - cancel() - if err != nil { - fmt.Errorf("failed to list blobs: %w", err) - } - for _, blob := range resp.Segment.BlobItems { - fmt.Println(*blob.Name) - } - } - return nil + // Retry until we find no active leases - or the context expires + backOffConfig := retry.DefaultConfig() + backOffConfig.Policy = retry.PolicyExponential + backOffConfig.MaxInterval = time.Minute + backOffConfig.MaxElapsedTime = 0 + backOffConfig.MaxRetries = -1 + b := backOffConfig.NewBackOffWithContext(parentCtx) + err = backoff.Retry(func() error { + pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Prefix: &prefix, + }) + for pager.More() { + ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout) + resp, err := pager.NextPage(ctx) + cancel() + if err != nil { + return fmt.Errorf("failed to list blobs: %w", err) + } + for _, blob := range resp.Segment.BlobItems { + if blob == nil || blob.Name == nil || blob.Properties == nil || blob.Properties.LeaseState == nil { + continue + } + aeh.logger.Debugf("Found checkpoint from an older Dapr version %s", *blob.Name) + // If the blob is locked, it means that there's another Dapr process with an old version of the SDK running, so we need to wait + if *blob.Properties.LeaseStatus == "locked" { + aeh.logger.Warnf("Found active lease on checkpoint %s from an older Dapr version - waiting for lease to expire", *blob.Name) + return fmt.Errorf("found active lease on checkpoint %s from an old Dapr version", *blob.Name) + } + } + } + return nil + }, b) + + // If the error is a DeadlineExceeded on the operation and not on parentCtx, handle that separately to avoid crashing Dapr needlessly + if err != nil && errors.Is(err, context.DeadlineExceeded) && parentCtx.Err() != context.DeadlineExceeded { + err = errors.New("failed to list blobs: request timed out") + } + return err } func (aeh *AzureEventHubs) createContainerStorageClient() (*container.Client, error) {