From a08988c25ed32263809d9fdacaac90edb7b005a1 Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 17 Jun 2022 15:12:15 -0700 Subject: [PATCH] Shared implementation for Azure Service Bus subscriptions for binding and pubsub (#1791) * Moved ASB subscription code to a shared package Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Moved ASBQ binding to use the shared ASB implementation Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Updating Azure Service Bus SDK version Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- bindings/azure/servicebusqueues/metadata.go | 124 ++++++++ ...vicebusqueues_test.go => metadata_test.go} | 4 +- .../servicebusqueues/servicebusqueues.go | 286 +++++------------- go.mod | 12 +- go.sum | 16 +- .../azure/servicebus/subscription.go | 196 ++++++------ pubsub/azure/servicebus/servicebus.go | 90 +++--- .../bindings/azure/blobstorage/go.mod | 6 +- .../bindings/azure/blobstorage/go.sum | 12 +- .../bindings/azure/cosmosdb/go.mod | 6 +- .../bindings/azure/cosmosdb/go.sum | 12 +- .../bindings/azure/eventhubs/go.mod | 8 +- .../bindings/azure/eventhubs/go.sum | 16 +- .../bindings/azure/servicebusqueues/go.mod | 7 +- .../bindings/azure/servicebusqueues/go.sum | 14 +- .../pubsub/azure/eventhubs/go.mod | 8 +- .../pubsub/azure/eventhubs/go.sum | 16 +- .../secretstores/azure/keyvault/go.mod | 6 +- .../secretstores/azure/keyvault/go.sum | 12 +- 19 files changed, 426 insertions(+), 425 deletions(-) create mode 100644 bindings/azure/servicebusqueues/metadata.go rename bindings/azure/servicebusqueues/{servicebusqueues_test.go => metadata_test.go} (97%) rename {pubsub => internal/component}/azure/servicebus/subscription.go (54%) diff --git a/bindings/azure/servicebusqueues/metadata.go b/bindings/azure/servicebusqueues/metadata.go new file mode 100644 index 000000000..8fb8db8eb --- /dev/null +++ b/bindings/azure/servicebusqueues/metadata.go @@ -0,0 +1,124 @@ +package servicebusqueues + +import ( + "encoding/json" + "errors" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + + "github.com/dapr/components-contrib/bindings" + contrib_metadata "github.com/dapr/components-contrib/metadata" +) + +type serviceBusQueuesMetadata struct { + ConnectionString string `json:"connectionString"` + NamespaceName string `json:"namespaceName,omitempty"` + QueueName string `json:"queueName"` + TimeoutInSec int `json:"timeoutInSec"` + MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"` + MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"` + MaxRetriableErrorsPerSec *int `json:"maxRetriableErrorsPerSec"` + MaxActiveMessages int `json:"maxActiveMessages"` + LockRenewalInSec int `json:"lockRenewalInSec"` + MaxConcurrentHandlers int `json:"maxConcurrentHandlers"` + ttl time.Duration +} + +const ( + // Default time to live for queues, which is 14 days. The same way Azure Portal does. + defaultMessageTimeToLive = time.Hour * 24 * 14 + + // Default timeout in seconds + defaultTimeoutInSec = 60 + + // Default minimum and maximum recovery time while trying to reconnect + defaultMinConnectionRecoveryInSec = 2 + defaultMaxConnectionRecoveryInSec = 300 + + // Default lock renewal interval, in seconds + defaultLockRenewalInSec = 20 + + // Default number of max active messages + // Max active messages should be >= max concurrent handlers + defaultMaxActiveMessages = 1 + + // Default number of max concurrent handlers + // For backwards-compatibility reasons, this only handles one message at a time + defaultMaxConcurrentHandlers = 1 + + // Default rate of retriable errors per second + defaultMaxRetriableErrorsPerSec = 10 +) + +func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) { + b, err := json.Marshal(metadata.Properties) + if err != nil { + return nil, err + } + + var m serviceBusQueuesMetadata + err = json.Unmarshal(b, &m) + if err != nil { + return nil, err + } + + if m.ConnectionString != "" && m.NamespaceName != "" { + return nil, errors.New("connectionString and namespaceName are mutually exclusive") + } + + ttl, ok, err := contrib_metadata.TryGetTTL(metadata.Properties) + if err != nil { + return nil, err + } + if !ok { + // set the same default message time to live as suggested in Azure Portal to 14 days (otherwise it will be 10675199 days) + ttl = defaultMessageTimeToLive + } + m.ttl = ttl + + // Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior. + m.QueueName = strings.ToLower(m.QueueName) + + if m.TimeoutInSec < 1 { + m.TimeoutInSec = defaultTimeoutInSec + } + + if m.MinConnectionRecoveryInSec < 1 { + m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec + } + + if m.MaxConnectionRecoveryInSec < 1 { + m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec + } + + if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec { + return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec") + } + + if m.MaxActiveMessages < 1 { + m.MaxActiveMessages = defaultMaxActiveMessages + } + + if m.MaxConcurrentHandlers < 1 { + m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers + } + + if m.MaxConcurrentHandlers > m.MaxActiveMessages { + return nil, errors.New("maxConcurrentHandlers cannot be bigger than maxActiveMessages") + } + + if m.LockRenewalInSec < 1 { + m.LockRenewalInSec = defaultLockRenewalInSec + } + + if m.MaxRetriableErrorsPerSec == nil { + m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec) + } + if *m.MaxRetriableErrorsPerSec < 0 { + return nil, errors.New("maxRetriableErrorsPerSec must be non-negative") + } + + return &m, nil +} diff --git a/bindings/azure/servicebusqueues/servicebusqueues_test.go b/bindings/azure/servicebusqueues/metadata_test.go similarity index 97% rename from bindings/azure/servicebusqueues/servicebusqueues_test.go rename to bindings/azure/servicebusqueues/metadata_test.go index 0d9089c43..dcdaea384 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues_test.go +++ b/bindings/azure/servicebusqueues/metadata_test.go @@ -39,14 +39,14 @@ func TestParseMetadata(t *testing.T) { properties: map[string]string{"connectionString": "connString", "queueName": "queue1"}, expectedConnectionString: "connString", expectedQueueName: "queue1", - expectedTTL: azureServiceBusDefaultMessageTimeToLive, + expectedTTL: defaultMessageTimeToLive, }, { name: "Empty TTL", properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""}, expectedConnectionString: "connString", expectedQueueName: "queue1", - expectedTTL: azureServiceBusDefaultMessageTimeToLive, + expectedTTL: defaultMessageTimeToLive, }, { name: "With TTL", diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index f935402ef..bb76fc3fe 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -15,67 +15,40 @@ package servicebusqueues import ( "context" - "encoding/json" "errors" - "strings" "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" + "github.com/Azure/go-amqp" backoff "github.com/cenkalti/backoff/v4" - "go.uber.org/ratelimit" azauth "github.com/dapr/components-contrib/authentication/azure" "github.com/dapr/components-contrib/bindings" + impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" contrib_metadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" - "github.com/dapr/kit/retry" ) const ( correlationID = "correlationID" label = "label" id = "id" - - // azureServiceBusDefaultMessageTimeToLive defines the default time to live for queues, which is 14 days. The same way Azure Portal does. - azureServiceBusDefaultMessageTimeToLive = time.Hour * 24 * 14 - - // Default timeout in seconds - defaultTimeoutInSec = 60 - - // Default minimum and maximum recovery time while trying to reconnect - defaultMinConnectionRecoveryInSec = 2 - defaultMaxConnectionRecoveryInSec = 300 - - // Default rate of retriable errors per second - defaultMaxRetriableErrorsPerSec = 10 ) // AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues. type AzureServiceBusQueues struct { - metadata *serviceBusQueuesMetadata - client *servicebus.Client - adminClient *sbadmin.Client - timeout time.Duration - sender *servicebus.Sender - senderLock sync.RWMutex - retriableErrLimit ratelimit.Limiter - logger logger.Logger - ctx context.Context - cancel context.CancelFunc -} - -type serviceBusQueuesMetadata struct { - ConnectionString string `json:"connectionString"` - NamespaceName string `json:"namespaceName,omitempty"` - QueueName string `json:"queueName"` - TimeoutInSec int `json:"timeoutInSec"` - MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"` - MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"` - MaxRetriableErrorsPerSec *int `json:"maxRetriableErrorsPerSec"` - ttl time.Duration + metadata *serviceBusQueuesMetadata + client *servicebus.Client + adminClient *sbadmin.Client + timeout time.Duration + sender *servicebus.Sender + senderLock sync.RWMutex + logger logger.Logger + ctx context.Context + cancel context.CancelFunc } // NewAzureServiceBusQueues returns a new AzureServiceBusQueues instance. @@ -93,11 +66,6 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) { return err } a.timeout = time.Duration(a.metadata.TimeoutInSec) * time.Second - if *a.metadata.MaxRetriableErrorsPerSec > 0 { - a.retriableErrLimit = ratelimit.New(*a.metadata.MaxRetriableErrorsPerSec) - } else { - a.retriableErrLimit = ratelimit.NewUnlimited() - } userAgent := "dapr-" + logger.DaprVersion if a.metadata.ConnectionString != "" { @@ -164,61 +132,6 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) { return nil } -func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) { - b, err := json.Marshal(metadata.Properties) - if err != nil { - return nil, err - } - - var m serviceBusQueuesMetadata - err = json.Unmarshal(b, &m) - if err != nil { - return nil, err - } - - if m.ConnectionString != "" && m.NamespaceName != "" { - return nil, errors.New("connectionString and namespaceName are mutually exclusive") - } - - ttl, ok, err := contrib_metadata.TryGetTTL(metadata.Properties) - if err != nil { - return nil, err - } - if !ok { - // set the same default message time to live as suggested in Azure Portal to 14 days (otherwise it will be 10675199 days) - ttl = azureServiceBusDefaultMessageTimeToLive - } - m.ttl = ttl - - // Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior. - m.QueueName = strings.ToLower(m.QueueName) - - if m.TimeoutInSec < 1 { - m.TimeoutInSec = defaultTimeoutInSec - } - - if m.MinConnectionRecoveryInSec < 1 { - m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec - } - - if m.MaxConnectionRecoveryInSec < 1 { - m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec - } - - if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec { - return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec") - } - - if m.MaxRetriableErrorsPerSec == nil { - m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec) - } - if *m.MaxRetriableErrorsPerSec < 0 { - return nil, errors.New("maxRetriableErrorsPerSec must be non-negative") - } - - return &m, nil -} - func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind { return []bindings.OperationKind{bindings.CreateOperation} } @@ -264,86 +177,67 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke } func (a *AzureServiceBusQueues) Read(handler bindings.Handler) error { + subscribeCtx := context.Background() + // Reconnection backoff policy bo := backoff.NewExponentialBackOff() bo.MaxElapsedTime = 0 bo.InitialInterval = time.Duration(a.metadata.MinConnectionRecoveryInSec) * time.Second bo.MaxInterval = time.Duration(a.metadata.MaxConnectionRecoveryInSec) * time.Second + // Reconnect loop. for { - receiver, _ := a.attemptConnectionForever(a.ctx) - if receiver == nil { - a.logger.Errorf("Failed to connect to Azure Service Bus Queue.") - continue + sub := impl.NewSubscription( + subscribeCtx, + a.metadata.MaxActiveMessages, + a.metadata.TimeoutInSec, + *a.metadata.MaxRetriableErrorsPerSec, + &a.metadata.MaxConcurrentHandlers, + "queue "+a.metadata.QueueName, + a.logger, + ) + + // Blocks until a successful connection (or until context is canceled) + err := sub.Connect(func() (*servicebus.Receiver, error) { + return a.client.NewReceiverForQueue(a.metadata.QueueName, nil) + }) + if err != nil { + // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. + if err != context.Canceled { + a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error()) + } + break } - // Receive messages loop - // This continues until the context is canceled - for a.ctx.Err() == nil { - // Blocks until the connection is closed or the context is canceled - msgs, err := receiver.ReceiveMessages(a.ctx, 1, nil) - if err != nil { - if err != context.Canceled { - a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error()) - } - // Exit from the receive loop to force a reconnection - break - } - - // If we got a message, reset the reconnection backoff - bo.Reset() - - l := len(msgs) - if l == 0 { - // We got no message, which is unusual too - a.logger.Warn("Received 0 messages from Service Bus") - continue - } else if l > 1 { - // We are requesting one message only; this should never happen - a.logger.Errorf("Expected one message from Service Bus, but received %d", l) - } - - msg := msgs[0] - - metadata := make(map[string]string) - metadata[id] = msg.MessageID - if msg.CorrelationID != nil { - metadata[correlationID] = *msg.CorrelationID - } - if msg.Subject != nil { - metadata[label] = *msg.Subject - } - - _, err = handler(a.ctx, &bindings.ReadResponse{ - Data: msg.Body, - Metadata: metadata, - }) - if err != nil { - a.abandonMessage(receiver, msg) - continue - } - - // Use a background context in case a.ctx has been canceled already - ctx, cancel := context.WithTimeout(context.Background(), a.timeout) - err = receiver.CompleteMessage(ctx, msg, nil) - cancel() - if err != nil { - a.logger.Warnf("Error completing message: %s", err.Error()) - continue + // ReceiveAndBlock will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. + // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. + err = sub.ReceiveAndBlock( + a.getHandlerFunc(handler), + a.metadata.LockRenewalInSec, + func() { + // Reset the backoff when the subscription is successful and we have received the first message + bo.Reset() + }, + ) + if err != nil { + var detachError *amqp.DetachError + var amqpError *amqp.Error + if errors.Is(err, detachError) || + (errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(err) + } else { + a.logger.Error(err) } } - // Disconnect (gracefully) before attempting to re-connect (unless we're shutting down) - // Use a background context here because a.ctx may be canceled already at this stage - ctx, cancel := context.WithTimeout(context.Background(), a.timeout) - if err := receiver.Close(ctx); err != nil { - // Log only - a.logger.Warnf("Error closing receiver of Azure Service Bus Queue binding: %s", err.Error()) - } - cancel() + // Gracefully close the connection (in case it's not closed already) + // Use a background context here (with timeout) because ctx may be closed already + closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) + sub.Close(closeCtx) + closeCancel() - // Reconnect until context is canceled - if a.ctx.Err() != nil { + // If context was canceled, do not attempt to reconnect + if subscribeCtx.Err() != nil { a.logger.Debug("Context canceled; will not reconnect") break } @@ -352,55 +246,27 @@ func (a *AzureServiceBusQueues) Read(handler bindings.Handler) error { a.logger.Warnf("Subscription to queue %s lost connection, attempting to reconnect in %s...", a.metadata.QueueName, wait) time.Sleep(wait) } + return nil } -func (a *AzureServiceBusQueues) abandonMessage(receiver *servicebus.Receiver, msg *servicebus.ReceivedMessage) { - // Use a background context in case a.ctx has been canceled already - ctx, cancel := context.WithTimeout(context.Background(), a.timeout) - err := receiver.AbandonMessage(ctx, msg, nil) - cancel() - if err != nil { - // Log only - a.logger.Warnf("Error abandoning message: %s", err.Error()) +func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc { + return func(ctx context.Context, msg *servicebus.ReceivedMessage) error { + metadata := make(map[string]string) + metadata[id] = msg.MessageID + if msg.CorrelationID != nil { + metadata[correlationID] = *msg.CorrelationID + } + if msg.Subject != nil { + metadata[label] = *msg.Subject + } + + _, err := handler(a.ctx, &bindings.ReadResponse{ + Data: msg.Body, + Metadata: metadata, + }) + return err } - - // If we're here, it means we got a retriable error, so we need to consume a retriable error token before this (synchronous) method returns - // If there have been too many retriable errors per second, this method slows the consuming down - a.logger.Debugf("Taking a retriable error token") - before := time.Now() - _ = a.retriableErrLimit.Take() - a.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before)) -} - -// Attempts to connect to a Service Bus queue and blocks until it succeeds; it can retry forever (until the context is canceled) -func (a *AzureServiceBusQueues) attemptConnectionForever(ctx context.Context) (receiver *servicebus.Receiver, err error) { - // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. - config := retry.DefaultConfig() - config.Policy = retry.PolicyExponential - config.MaxInterval = 5 * time.Minute - config.MaxElapsedTime = 0 - backoff := config.NewBackOffWithContext(ctx) - - err = retry.NotifyRecover( - func() error { - clientAttempt, innerErr := a.client.NewReceiverForQueue(a.metadata.QueueName, nil) - if innerErr != nil { - return innerErr - } - receiver = clientAttempt - return nil - }, - backoff, - func(err error, d time.Duration) { - a.logger.Debugf("Failed to connect to Azure Service Bus Queue Binding with error: %s", err.Error()) - }, - func() { - a.logger.Debug("Successfully reconnected to Azure Service Bus.") - backoff.Reset() - }, - ) - return receiver, err } func (a *AzureServiceBusQueues) Close() (err error) { diff --git a/go.mod b/go.mod index 5e9515d6c..11ad104f4 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,12 @@ require ( cloud.google.com/go/storage v1.10.0 github.com/Azure/azure-amqp-common-go/v3 v3.2.3 github.com/Azure/azure-event-hubs-go/v3 v3.3.18 - github.com/Azure/azure-sdk-for-go v63.4.0+incompatible - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 + github.com/Azure/azure-sdk-for-go v65.0.0+incompatible + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 + github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 github.com/Azure/azure-service-bus-go v0.10.10 github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd @@ -157,8 +159,6 @@ require ( require ( cloud.google.com/go/secretmanager v1.4.0 dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537 - github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 - github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 github.com/apache/dubbo-go-hessian2 v1.11.0 github.com/huaweicloud/huaweicloud-sdk-go-obs v3.21.12+incompatible github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.0.87 @@ -177,7 +177,7 @@ require ( cloud.google.com/go/kms v1.4.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/RoaringBitmap/roaring v1.1.0 // indirect github.com/Workiva/go-datastructures v1.0.52 // indirect github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 // indirect diff --git a/go.sum b/go.sum index bb9f308ce..c02c2af68 100644 --- a/go.sum +++ b/go.sum @@ -92,12 +92,12 @@ github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9mo github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v56.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= @@ -176,8 +176,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/pubsub/azure/servicebus/subscription.go b/internal/component/azure/servicebus/subscription.go similarity index 54% rename from pubsub/azure/servicebus/subscription.go rename to internal/component/azure/servicebus/subscription.go index 588eabbfd..73a2ad182 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/internal/component/azure/servicebus/subscription.go @@ -15,25 +15,27 @@ package servicebus import ( "context" - "fmt" "sync" "time" azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "go.uber.org/ratelimit" - "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + "github.com/dapr/kit/retry" ) -type subscription struct { - topic string +// HandlerFunc is the type for handlers that receive messages +type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) error + +// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue. +type Subscription struct { + entity string mu sync.RWMutex activeMessages map[string]*azservicebus.ReceivedMessage activeMessagesChan chan struct{} receiver *azservicebus.Receiver timeout time.Duration - handlerTimeout time.Duration retriableErrLimit ratelimit.Limiter handleChan chan struct{} logger logger.Logger @@ -41,25 +43,23 @@ type subscription struct { cancel context.CancelFunc } -func newSubscription( +// NewSubscription returns a new Subscription object. +// Parameter "entity" is usually in the format "topic " or "queue " and it's only used for logging. +func NewSubscription( parentCtx context.Context, - topic string, - receiver *azservicebus.Receiver, maxActiveMessages int, timeoutInSec int, - handlerTimeoutInSec int, maxRetriableEPS int, maxConcurrentHandlers *int, + entity string, logger logger.Logger, -) *subscription { +) *Subscription { ctx, cancel := context.WithCancel(parentCtx) - s := &subscription{ - topic: topic, + s := &Subscription{ + entity: entity, activeMessages: make(map[string]*azservicebus.ReceivedMessage), activeMessagesChan: make(chan struct{}, maxActiveMessages), - receiver: receiver, timeout: time.Duration(timeoutInSec) * time.Second, - handlerTimeout: time.Duration(handlerTimeoutInSec) * time.Second, logger: logger, ctx: ctx, cancel: cancel, @@ -72,15 +72,45 @@ func newSubscription( } if maxConcurrentHandlers != nil { - s.logger.Debugf("Subscription to topic %s is limited to %d message handler(s)", topic, *maxConcurrentHandlers) + s.logger.Debugf("Subscription to %s is limited to %d message handler(s)", entity, *maxConcurrentHandlers) s.handleChan = make(chan struct{}, *maxConcurrentHandlers) } return s } -// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic. -func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec int, onFirstSuccess func()) error { +// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled). +func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) (err error) { + // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. + config := retry.DefaultConfig() + config.Policy = retry.PolicyExponential + config.MaxInterval = 5 * time.Minute + config.MaxElapsedTime = 0 + backoff := config.NewBackOffWithContext(s.ctx) + + err = retry.NotifyRecover( + func() error { + clientAttempt, innerErr := newReceiverFunc() + if innerErr != nil { + return innerErr + } + s.receiver = clientAttempt + return nil + }, + backoff, + func(err error, d time.Duration) { + s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error()) + }, + func() { + s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity) + }, + ) + + return err +} + +// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue. +func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, onFirstSuccess func()) error { ctx, cancel := context.WithCancel(s.ctx) defer cancel() @@ -88,7 +118,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec go func() { shouldRenewLocks := lockRenewalInSec > 0 if !shouldRenewLocks { - s.logger.Debugf("Lock renewal for topic %s disabled", s.topic) + s.logger.Debugf("Lock renewal for %s disabled", s.entity) return } t := time.NewTicker(time.Second * time.Duration(lockRenewalInSec)) @@ -96,7 +126,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec for { select { case <-ctx.Done(): - s.logger.Debugf("Lock renewal context for topic %s done", s.topic) + s.logger.Debugf("Lock renewal context for %s done", s.entity) return case <-t.C: s.tryRenewLocks() @@ -104,8 +134,6 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec } }() - handlerFunc := s.getHandlerFunc(handler) - // Receiver loop for { select { @@ -113,7 +141,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec case s.activeMessagesChan <- struct{}{}: // Return if context is canceled case <-ctx.Done(): - s.logger.Debugf("Receive context for topic %s done", s.topic) + s.logger.Debugf("Receive context for %s done", s.entity) return ctx.Err() } @@ -121,7 +149,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec msgs, err := s.receiver.ReceiveMessages(s.ctx, 1, nil) if err != nil { if err != context.Canceled { - s.logger.Errorf("Error reading from topic %s. %s", s.topic, err.Error()) + s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error()) } // Return the error. This will cause the Service Bus component to try and reconnect. return err @@ -145,62 +173,27 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec msg := msgs[0] s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan)) - // body, _ := msg.Body() - // s.logger.Debugf("Message body: %s", string(body)) + // s.logger.Debugf("Message body: %s", string(msg.Body)) s.addActiveMessage(msg) s.logger.Debugf("Processing received message: %s", msg.MessageID) - s.handleAsync(s.ctx, msg, handlerFunc) + s.handleAsync(s.ctx, msg, handler) } } -func (s *subscription) close(closeCtx context.Context) { - s.logger.Debugf("Closing subscription to topic %s", s.topic) +// Close the receiver and stops watching for new messages. +func (s *Subscription) Close(closeCtx context.Context) { + s.logger.Debugf("Closing subscription to %s", s.entity) // Ensure subscription entity is closed. if err := s.receiver.Close(closeCtx); err != nil { - s.logger.Warnf("%s closing subscription entity for topic %s: %+v", errorMessagePrefix, s.topic, err) + s.logger.Warnf("Error closing subscription for %s: %+v", s.entity, err) } s.cancel() } -func (s *subscription) getHandlerFunc(handler pubsub.Handler) func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error) { - handlerTimeout := s.handlerTimeout - timeout := s.timeout - return func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error) { - pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, s.topic) - if err != nil { - return false, fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err) - } - - handleCtx, handleCancel := context.WithTimeout(ctx, handlerTimeout) - defer handleCancel() - s.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, s.topic) - appErr := handler(handleCtx, pubsubMsg) - - // This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message. - // If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery). - finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), timeout) - defer finalizeCancel() - - if appErr != nil { - s.logger.Warnf("Error in app's handler: %+v", appErr) - if abandonErr := s.abandonMessage(finalizeCtx, asbMsg); abandonErr != nil { - return true, fmt.Errorf("failed to abandon: %+v", abandonErr) - } - - return true, nil - } - if completeErr := s.completeMessage(finalizeCtx, asbMsg); completeErr != nil { - return false, fmt.Errorf("failed to complete: %+v", completeErr) - } - - return false, nil - } -} - -func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.ReceivedMessage, handlerFunc func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error)) { +func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.ReceivedMessage, handler HandlerFunc) { go func() { var consumeToken bool var err error @@ -211,7 +204,7 @@ func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv // Release a handler if needed if limitConcurrentHandlers { <-s.handleChan - s.logger.Debugf("Released message handle for %s on topic %s", msg.MessageID, s.topic) + s.logger.Debugf("Released message handle for %s on %s", msg.MessageID, s.entity) } // If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token @@ -231,27 +224,39 @@ func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv }() if limitConcurrentHandlers { - s.logger.Debugf("Taking message handle for %s on topic %s", msg.MessageID, s.topic) + s.logger.Debugf("Taking message handle for %s on %s", msg.MessageID, s.entity) select { // Context is done, so we will stop waiting case <-ctx.Done(): - s.logger.Debugf("Message context done for %s on topic %s", msg.MessageID, s.topic) + s.logger.Debugf("Message context done for %s on %s", msg.MessageID, s.entity) return // Blocks until we have a handler available case s.handleChan <- struct{}{}: - s.logger.Debugf("Taken message handle for %s on topic %s", msg.MessageID, s.topic) + s.logger.Debugf("Taken message handle for %s on %s", msg.MessageID, s.entity) } } - consumeToken, err = handlerFunc(ctx, msg) + // Invoke the handler to process the message + err = handler(ctx, msg) + + // This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message. + // If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery). + // This uses a background context in case ctx has been canceled already. + finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout) + defer finalizeCancel() + if err != nil { // Log the error only, as we're running asynchronously - s.logger.Errorf("%s error handling message %s on topic '%s', %s", errorMessagePrefix, msg.MessageID, s.topic, err) + s.logger.Errorf("App handler returned an error for message %s on %s: %s", msg.MessageID, s.entity, err) + s.AbandonMessage(finalizeCtx, msg) + return } + + s.CompleteMessage(finalizeCtx, msg) }() } -func (s *subscription) tryRenewLocks() { +func (s *Subscription) tryRenewLocks() { // Snapshot the messages to try to renew locks for. msgs := make([]*azservicebus.ReceivedMessage, 0) s.mu.RLock() @@ -260,12 +265,12 @@ func (s *subscription) tryRenewLocks() { } s.mu.RUnlock() if len(msgs) == 0 { - s.logger.Debugf("No active messages require lock renewal for topic %s", s.topic) + s.logger.Debugf("No active messages require lock renewal for %s", s.entity) return } // Lock renewal is best effort and not guaranteed to succeed, warnings are expected. - s.logger.Debugf("Trying to renew %d active message lock(s) for topic %s", len(msgs), s.topic) + s.logger.Debugf("Trying to renew %d active message lock(s) for %s", len(msgs), s.entity) var err error var ctx context.Context var cancel context.CancelFunc @@ -273,31 +278,52 @@ func (s *subscription) tryRenewLocks() { ctx, cancel = context.WithTimeout(context.Background(), s.timeout) err = s.receiver.RenewMessageLock(ctx, msg, nil) if err != nil { - s.logger.Debugf("Couldn't renew all active message lock(s) for topic %s, ", s.topic, err) + s.logger.Debugf("Couldn't renew all active message lock(s) for %s, ", s.entity, err) } cancel() } } -func (s *subscription) abandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error { - s.logger.Debugf("Abandoning message %s on topic %s", m.MessageID, s.topic) - return s.receiver.AbandonMessage(ctx, m, nil) +// AbandonMessage marks a messsage as abandoned. +func (s *Subscription) AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage) { + s.logger.Debugf("Abandoning message %s on %s", m.MessageID, s.entity) + + // Use a background context in case a.ctx has been canceled already + err := s.receiver.AbandonMessage(ctx, m, nil) + if err != nil { + // Log only + s.logger.Warnf("Error abandoning message %s on %s: %s", m.MessageID, s.entity, err.Error()) + } + + // If we're here, it means we got a retriable error, so we need to consume a retriable error token before this (synchronous) method returns + // If there have been too many retriable errors per second, this method slows the consumer down + s.logger.Debugf("Taking a retriable error token") + before := time.Now() + _ = s.retriableErrLimit.Take() + s.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before)) } -func (s *subscription) completeMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error { - s.logger.Debugf("Completing message %s on topic %s", m.MessageID, s.topic) - return s.receiver.CompleteMessage(ctx, m, nil) +// CompleteMessage marks a message as complete. +func (s *Subscription) CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage) { + s.logger.Debugf("Completing message %s on %s", m.MessageID, s.entity) + + // Use a background context in case a.ctx has been canceled already + err := s.receiver.CompleteMessage(ctx, m, nil) + if err != nil { + // Log only + s.logger.Warnf("Error completing message %s on %s: %s", m.MessageID, s.entity, err.Error()) + } } -func (s *subscription) addActiveMessage(m *azservicebus.ReceivedMessage) { - s.logger.Debugf("Adding message %s to active messages on topic %s", m.MessageID, s.topic) +func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) { + s.logger.Debugf("Adding message %s to active messages on %s", m.MessageID, s.entity) s.mu.Lock() s.activeMessages[m.MessageID] = m s.mu.Unlock() } -func (s *subscription) removeActiveMessage(messageID string) { - s.logger.Debugf("Removing message %s from active messages on topic %s", messageID, s.topic) +func (s *Subscription) removeActiveMessage(messageID string) { + s.logger.Debugf("Removing message %s from active messages on %s", messageID, s.entity) s.mu.Lock() delete(s.activeMessages, messageID) s.mu.Unlock() diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index 39dc95f42..4ce5f0c3d 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -29,6 +29,7 @@ import ( sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" azauth "github.com/dapr/components-contrib/authentication/azure" + impl "github.com/dapr/components-contrib/internal/component/azure/servicebus" contrib_metadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -368,8 +369,20 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub go func() { // Reconnect loop. for { + sub := impl.NewSubscription( + subscribeCtx, + a.metadata.MaxActiveMessages, + a.metadata.TimeoutInSec, + a.metadata.MaxRetriableErrorsPerSec, + a.metadata.MaxConcurrentHandlers, + "topic "+req.Topic, + a.logger, + ) + // Blocks until a successful connection (or until context is canceled) - receiver, err := a.attemptConnectionForever(subscribeCtx, req.Topic, subID) + err := sub.Connect(func() (*servicebus.Receiver, error) { + return a.client.NewReceiverForSubscription(req.Topic, subID, nil) + }) if err != nil { // Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get. if err != context.Canceled { @@ -377,47 +390,32 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub } return } - sub := newSubscription( - subscribeCtx, - req.Topic, - receiver, - a.metadata.MaxActiveMessages, - a.metadata.TimeoutInSec, - a.metadata.HandlerTimeoutInSec, - a.metadata.MaxRetriableErrorsPerSec, - a.metadata.MaxConcurrentHandlers, - a.logger, - ) - // ReceiveAndBlock will only return with an error - // that it cannot handle internally. The subscription - // connection is closed when this method returns. - // If that occurs, we will log the error and attempt - // to re-establish the subscription connection until - // we exhaust the number of reconnect attempts. - innerErr := sub.ReceiveAndBlock( - handler, + // ReceiveAndBlock will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns. + // If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts. + err = sub.ReceiveAndBlock( + a.getHandlerFunc(req.Topic, handler), a.metadata.LockRenewalInSec, func() { // Reset the backoff when the subscription is successful and we have received the first message bo.Reset() }, ) - if innerErr != nil { + if err != nil { var detachError *amqp.DetachError var amqpError *amqp.Error - if errors.Is(innerErr, detachError) || - (errors.As(innerErr, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) { - a.logger.Debug(innerErr) + if errors.Is(err, detachError) || + (errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) { + a.logger.Debug(err) } else { - a.logger.Error(innerErr) + a.logger.Error(err) } } // Gracefully close the connection (in case it's not closed already) // Use a background context here (with timeout) because ctx may be closed already closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) - sub.close(closeCtx) + sub.Close(closeCtx) closeCancel() // If context was canceled, do not attempt to reconnect @@ -427,7 +425,7 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub } wait := bo.NextBackOff() - a.logger.Warnf("Subscription to topic %s lost connection, attempting to reconnect in %s...", sub.topic, wait) + a.logger.Warnf("Subscription to topic %s lost connection, attempting to reconnect in %s...", req.Topic, wait) time.Sleep(wait) } }() @@ -435,34 +433,18 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub return nil } -// Attempts to connect to a Service Bus topic and blocks until it succeeds; it can retry forever (until the context is canceled) -func (a *azureServiceBus) attemptConnectionForever(ctx context.Context, topicName string, subscriptionName string) (receiver *servicebus.Receiver, err error) { - // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. - config := retry.DefaultConfig() - config.Policy = retry.PolicyExponential - config.MaxInterval = 5 * time.Minute - config.MaxElapsedTime = 0 - backoff := config.NewBackOffWithContext(ctx) +func (a *azureServiceBus) getHandlerFunc(topic string, handler pubsub.Handler) impl.HandlerFunc { + return func(ctx context.Context, asbMsg *servicebus.ReceivedMessage) error { + pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, topic) + if err != nil { + return fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err) + } - err = retry.NotifyRecover( - func() error { - clientAttempt, innerErr := a.client.NewReceiverForSubscription(topicName, subscriptionName, nil) - if innerErr != nil { - return innerErr - } - receiver = clientAttempt - return nil - }, - backoff, - func(err error, d time.Duration) { - a.logger.Warnf("Failed to connect to Azure Service Bus topic %s; will retry in %s. Error: %s", topicName, d, err.Error()) - }, - func() { - a.logger.Infof("Successfully reconnected to Azure Service Bus topic %s", topicName) - backoff.Reset() - }, - ) - return receiver, err + handleCtx, handleCancel := context.WithTimeout(ctx, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second) + defer handleCancel() + a.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, topic) + return handler(handleCtx, pubsubMsg) + } } // senderForTopic returns the sender for a topic, or creates a new one if it doesn't exist diff --git a/tests/certification/bindings/azure/blobstorage/go.mod b/tests/certification/bindings/azure/blobstorage/go.mod index 17020c7c1..36e44b7b2 100644 --- a/tests/certification/bindings/azure/blobstorage/go.mod +++ b/tests/certification/bindings/azure/blobstorage/go.mod @@ -18,8 +18,8 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect @@ -30,7 +30,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andybalholm/brotli v1.0.2 // indirect diff --git a/tests/certification/bindings/azure/blobstorage/go.sum b/tests/certification/bindings/azure/blobstorage/go.sum index f912f1e87..ae003ad12 100644 --- a/tests/certification/bindings/azure/blobstorage/go.sum +++ b/tests/certification/bindings/azure/blobstorage/go.sum @@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 h1:0nJeKDmB7a1a8RDMjTltahlPsaNlWjq/LpkZleSwINk= @@ -93,8 +93,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= diff --git a/tests/certification/bindings/azure/cosmosdb/go.mod b/tests/certification/bindings/azure/cosmosdb/go.mod index cbb17d979..791fe3b9f 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.mod +++ b/tests/certification/bindings/azure/cosmosdb/go.mod @@ -19,8 +19,8 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect @@ -31,7 +31,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andybalholm/brotli v1.0.2 // indirect diff --git a/tests/certification/bindings/azure/cosmosdb/go.sum b/tests/certification/bindings/azure/cosmosdb/go.sum index 05792d094..85afac3cb 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.sum +++ b/tests/certification/bindings/azure/cosmosdb/go.sum @@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs= @@ -91,8 +91,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= diff --git a/tests/certification/bindings/azure/eventhubs/go.mod b/tests/certification/bindings/azure/eventhubs/go.mod index 0d55f7398..2d592bfbe 100644 --- a/tests/certification/bindings/azure/eventhubs/go.mod +++ b/tests/certification/bindings/azure/eventhubs/go.mod @@ -20,9 +20,9 @@ require ( github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-event-hubs-go/v3 v3.3.18 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/go-amqp v0.17.4 // indirect @@ -36,7 +36,7 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andybalholm/brotli v1.0.2 // indirect diff --git a/tests/certification/bindings/azure/eventhubs/go.sum b/tests/certification/bindings/azure/eventhubs/go.sum index d407fe781..6ec5fdb43 100644 --- a/tests/certification/bindings/azure/eventhubs/go.sum +++ b/tests/certification/bindings/azure/eventhubs/go.sum @@ -53,12 +53,12 @@ github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuI github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= @@ -110,8 +110,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= diff --git a/tests/certification/bindings/azure/servicebusqueues/go.mod b/tests/certification/bindings/azure/servicebusqueues/go.mod index d55683a0a..c295a0a03 100644 --- a/tests/certification/bindings/azure/servicebusqueues/go.mod +++ b/tests/certification/bindings/azure/servicebusqueues/go.mod @@ -18,11 +18,12 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect + github.com/Azure/go-amqp v0.17.4 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.27 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect @@ -31,7 +32,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect diff --git a/tests/certification/bindings/azure/servicebusqueues/go.sum b/tests/certification/bindings/azure/servicebusqueues/go.sum index 8f6d410aa..d1663b7ea 100644 --- a/tests/certification/bindings/azure/servicebusqueues/go.sum +++ b/tests/certification/bindings/azure/servicebusqueues/go.sum @@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 h1:hOHIC1pSoJsFrXBQlXYt+w0OKAx1MzCr4KiLXjylyac= @@ -61,6 +61,7 @@ github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0Sz github.com/Azure/azure-storage-blob-go v0.10.0/go.mod h1:ep1edmW+kNQx4UfWM9heESNmQdijykocJ0YOxmMX8SE= github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-amqp v0.17.4 h1:6t9wEiwA4uXMRoUj3Cd3K2gmH8cW8ylizmBnSeF0bzM= +github.com/Azure/go-amqp v0.17.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-ansiterm v0.0.0-20210608223527-2377c96fe795/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= @@ -96,8 +97,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -230,6 +231,7 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= diff --git a/tests/certification/pubsub/azure/eventhubs/go.mod b/tests/certification/pubsub/azure/eventhubs/go.mod index 349d24d35..3a0ca04fd 100644 --- a/tests/certification/pubsub/azure/eventhubs/go.mod +++ b/tests/certification/pubsub/azure/eventhubs/go.mod @@ -20,9 +20,9 @@ require ( github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-event-hubs-go/v3 v3.3.18 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/go-amqp v0.17.4 // indirect @@ -36,7 +36,7 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andybalholm/brotli v1.0.2 // indirect diff --git a/tests/certification/pubsub/azure/eventhubs/go.sum b/tests/certification/pubsub/azure/eventhubs/go.sum index 87a7a7b3b..dfab37ad2 100644 --- a/tests/certification/pubsub/azure/eventhubs/go.sum +++ b/tests/certification/pubsub/azure/eventhubs/go.sum @@ -53,12 +53,12 @@ github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuI github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo= -github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= @@ -110,8 +110,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= diff --git a/tests/certification/secretstores/azure/keyvault/go.mod b/tests/certification/secretstores/azure/keyvault/go.mod index 225a97fd4..47205310a 100644 --- a/tests/certification/secretstores/azure/keyvault/go.mod +++ b/tests/certification/secretstores/azure/keyvault/go.mod @@ -17,8 +17,8 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.5.0 // indirect @@ -31,7 +31,7 @@ require ( github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/andybalholm/brotli v1.0.2 // indirect diff --git a/tests/certification/secretstores/azure/keyvault/go.sum b/tests/certification/secretstores/azure/keyvault/go.sum index b8c3fa9fd..f81a8c0e6 100644 --- a/tests/certification/secretstores/azure/keyvault/go.sum +++ b/tests/certification/secretstores/azure/keyvault/go.sum @@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 h1:X7FHRMKr0u5YiPnD6L/nqG64XBOcK0IYavhAHBQEmms= @@ -95,8 +95,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c= -github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= +github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=