Remove deprecated code from EventHubs component (#3202)

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Alessandro (Ale) Segala 2023-11-01 15:46:19 -07:00 committed by GitHub
parent 7114fd0279
commit ef68cb6933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 137 deletions

View File

@ -56,9 +56,6 @@ type AzureEventHubs struct {
checkpointStoreLock *sync.RWMutex
managementCreds azcore.TokenCredential
// TODO(@ItalyPaleAle): Remove in Dapr 1.13
isFailed atomic.Bool
}
// HandlerResponseItem represents a response from the handler for each message.
@ -251,37 +248,6 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
return fmt.Errorf("error trying to establish a connection: %w", err)
}
// Ensure that no subscriber using the old "track 1" SDK is active
// TODO(@ItalyPaleAle): Remove this for Dapr 1.13
{
// If a previous topic already failed, no need to try with other topics, as we're about to panic anyways
if aeh.isFailed.Load() {
return errors.New("subscribing to another topic on this component failed and Dapr is scheduled to crash; will not try subscribing to a new topic")
}
ctx, cancel := context.WithTimeout(subscribeCtx, 2*time.Minute)
err = aeh.ensureNoTrack1Subscribers(ctx, topic)
cancel()
if err != nil {
// If there's a timeout, it means that the other client was still active after the timeout
// In this case, we return an error here so Dapr can continue the initialization and report a "healthy" status (but this subscription won't be active)
// After 2 minutes, then, we panic, which ensures that during a rollout Kubernetes will see that this pod is unhealthy and re-creates that. Hopefully, by then other instances of the app will have been updated and no more locks will be present
if errors.Is(err, context.DeadlineExceeded) {
aeh.isFailed.Store(true)
errMsg := fmt.Sprintf("Another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr, and this is not supported. Please ensure that all applications subscribed to the same topic, with this consumer group, are using Dapr 1.10 or newer.", topic)
aeh.logger.Error(errMsg + " ⚠️⚠️⚠️ Dapr will crash in 2 minutes to force the orchestrator to restart the process after the rollout of other instances is complete.")
go func() {
time.Sleep(2 * time.Minute)
aeh.logger.Fatalf("Another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr, and this is not supported. Please ensure that all applications subscribed to the same topic, with this consumer group, are using Dapr 1.10 or newer.", topic)
}()
return fmt.Errorf("another instance is currently subscribed to the topic %s in this Event Hub using an old version of Dapr", topic)
}
// In case of other errors, just return the error
return fmt.Errorf("failed to check for subscribers using an old version of Dapr: %w", err)
}
}
// This component has built-in retries because Event Hubs doesn't support N/ACK for messages
retryHandler := func(ctx context.Context, events []*azeventhubs.ReceivedEventData) ([]HandlerResponseItem, error) {
b := aeh.backOffConfig.NewBackOffWithContext(ctx)
@ -621,7 +587,7 @@ func (aeh *AzureEventHubs) createCheckpointStore(ctx context.Context) (checkpoin
}
// Get the Azure Blob Storage client and ensure the container exists
client, err := aeh.createStorageClient(ctx, true)
client, err := aeh.createStorageClient(ctx)
if err != nil {
return nil, err
}
@ -641,8 +607,7 @@ func (aeh *AzureEventHubs) createCheckpointStore(ctx context.Context) (checkpoin
}
// Creates a client to access Azure Blob Storage.
// TODO(@ItalyPaleAle): Remove ensureContainer option (and default to true) for Dapr 1.13
func (aeh *AzureEventHubs) createStorageClient(ctx context.Context, ensureContainer bool) (*container.Client, error) {
func (aeh *AzureEventHubs) createStorageClient(ctx context.Context) (*container.Client, error) {
m := blobstorage.ContainerClientOpts{
ConnectionString: aeh.metadata.StorageConnectionString,
ContainerName: aeh.metadata.StorageContainerName,
@ -655,13 +620,11 @@ func (aeh *AzureEventHubs) createStorageClient(ctx context.Context, ensureContai
return nil, err
}
if ensureContainer {
// Ensure the container exists
// We're setting "accessLevel" to nil to make sure it's private
err = m.EnsureContainer(ctx, client, nil)
if err != nil {
return nil, err
}
// Ensure the container exists
// We're setting "accessLevel" to nil to make sure it's private
err = m.EnsureContainer(ctx, client, nil)
if err != nil {
return nil, err
}
return client, nil

View File

@ -1,93 +0,0 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/cenkalti/backoff/v4"
"github.com/dapr/kit/retry"
)
// This method ensures that there are currently no active subscribers to the same Event Hub topic that are using the old ("track 1") SDK of Azure Event Hubs. This is the SDK that was in use until Dapr 1.9.
// Because the new SDK stores checkpoints in a different way, clients using the new ("track 2") and the old SDK cannot coexist.
// To ensure this doesn't happen, when we create a new subscription to the same topic and with the same consumer group, we check if there's a file in Azure Storage with the checkpoint created by the old SDK and with a still-active lease. If that's true, we wait (until the context expires) before we crash Dapr with a log message describing what's happening.
// These conflicts should be transient anyways, as mixed versions of Dapr should only happen during a rollout of a new version of Dapr.
// TODO(@ItalyPaleAle): Remove this (entire file) for Dapr 1.13
func (aeh *AzureEventHubs) ensureNoTrack1Subscribers(parentCtx context.Context, topic string) error {
// Get a client to Azure Blob Storage
// Because we are not using "ensureContainer=true", we can pass a nil context
client, err := aeh.createStorageClient(nil, false) //nolint:staticcheck
if err != nil {
return err
}
// In the old version of the SDK, checkpoints were stored in the root of the storage account and were named like:
// `dapr-(topic)-(consumer-group)-(partition-key)`
// We need to list those up and check if they have an active lease
prefix := fmt.Sprintf("dapr-%s-%s-", topic, aeh.metadata.ConsumerGroup)
// Retry until we find no active leases - or the context expires
backOffConfig := retry.DefaultConfig()
backOffConfig.Policy = retry.PolicyExponential
backOffConfig.MaxInterval = time.Minute
backOffConfig.MaxElapsedTime = 0
backOffConfig.MaxRetries = -1
b := backOffConfig.NewBackOffWithContext(parentCtx)
err = backoff.Retry(func() error {
pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &prefix,
})
for pager.More() {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
resp, innerErr := pager.NextPage(ctx)
cancel()
if innerErr != nil {
// Treat these errors as permanent
resErr := &azcore.ResponseError{}
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
// A "not-found" error means that the storage container doesn't exist, so let's not handle it here
// Just return no error
return nil
}
return backoff.Permanent(fmt.Errorf("failed to list blobs: %w", innerErr))
}
for _, blob := range resp.Segment.BlobItems {
if blob == nil || blob.Name == nil || blob.Properties == nil || blob.Properties.LeaseState == nil {
continue
}
aeh.logger.Debugf("Found checkpoint from an older Dapr version %s", *blob.Name)
// If the blob is locked, it means that there's another Dapr process with an old version of the SDK running, so we need to wait
if *blob.Properties.LeaseStatus == "locked" {
aeh.logger.Warnf("Found active lease on checkpoint %s from an older Dapr version - waiting for lease to expire", *blob.Name)
return fmt.Errorf("found active lease on checkpoint %s from an old Dapr version", *blob.Name)
}
}
}
return nil
}, b)
// If the error is a DeadlineExceeded on the operation and not on parentCtx, handle that separately to avoid crashing Dapr needlessly
if err != nil && errors.Is(err, context.DeadlineExceeded) && parentCtx.Err() != context.DeadlineExceeded {
err = errors.New("failed to list blobs: request timed out")
}
return err
}