Shared implementation for Azure Service Bus subscriptions for binding and pubsub (#1791)
* Moved ASB subscription code to a shared package Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Moved ASBQ binding to use the shared ASB implementation Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Updating Azure Service Bus SDK version Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
f25b26fc0c
commit
a08988c25e
|
@ -0,0 +1,124 @@
|
|||
package servicebusqueues
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
contrib_metadata "github.com/dapr/components-contrib/metadata"
|
||||
)
|
||||
|
||||
type serviceBusQueuesMetadata struct {
|
||||
ConnectionString string `json:"connectionString"`
|
||||
NamespaceName string `json:"namespaceName,omitempty"`
|
||||
QueueName string `json:"queueName"`
|
||||
TimeoutInSec int `json:"timeoutInSec"`
|
||||
MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"`
|
||||
MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"`
|
||||
MaxRetriableErrorsPerSec *int `json:"maxRetriableErrorsPerSec"`
|
||||
MaxActiveMessages int `json:"maxActiveMessages"`
|
||||
LockRenewalInSec int `json:"lockRenewalInSec"`
|
||||
MaxConcurrentHandlers int `json:"maxConcurrentHandlers"`
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
// Default time to live for queues, which is 14 days. The same way Azure Portal does.
|
||||
defaultMessageTimeToLive = time.Hour * 24 * 14
|
||||
|
||||
// Default timeout in seconds
|
||||
defaultTimeoutInSec = 60
|
||||
|
||||
// Default minimum and maximum recovery time while trying to reconnect
|
||||
defaultMinConnectionRecoveryInSec = 2
|
||||
defaultMaxConnectionRecoveryInSec = 300
|
||||
|
||||
// Default lock renewal interval, in seconds
|
||||
defaultLockRenewalInSec = 20
|
||||
|
||||
// Default number of max active messages
|
||||
// Max active messages should be >= max concurrent handlers
|
||||
defaultMaxActiveMessages = 1
|
||||
|
||||
// Default number of max concurrent handlers
|
||||
// For backwards-compatibility reasons, this only handles one message at a time
|
||||
defaultMaxConcurrentHandlers = 1
|
||||
|
||||
// Default rate of retriable errors per second
|
||||
defaultMaxRetriableErrorsPerSec = 10
|
||||
)
|
||||
|
||||
func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) {
|
||||
b, err := json.Marshal(metadata.Properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var m serviceBusQueuesMetadata
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if m.ConnectionString != "" && m.NamespaceName != "" {
|
||||
return nil, errors.New("connectionString and namespaceName are mutually exclusive")
|
||||
}
|
||||
|
||||
ttl, ok, err := contrib_metadata.TryGetTTL(metadata.Properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
// set the same default message time to live as suggested in Azure Portal to 14 days (otherwise it will be 10675199 days)
|
||||
ttl = defaultMessageTimeToLive
|
||||
}
|
||||
m.ttl = ttl
|
||||
|
||||
// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
|
||||
m.QueueName = strings.ToLower(m.QueueName)
|
||||
|
||||
if m.TimeoutInSec < 1 {
|
||||
m.TimeoutInSec = defaultTimeoutInSec
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec < 1 {
|
||||
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
|
||||
}
|
||||
|
||||
if m.MaxConnectionRecoveryInSec < 1 {
|
||||
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec {
|
||||
return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec")
|
||||
}
|
||||
|
||||
if m.MaxActiveMessages < 1 {
|
||||
m.MaxActiveMessages = defaultMaxActiveMessages
|
||||
}
|
||||
|
||||
if m.MaxConcurrentHandlers < 1 {
|
||||
m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers
|
||||
}
|
||||
|
||||
if m.MaxConcurrentHandlers > m.MaxActiveMessages {
|
||||
return nil, errors.New("maxConcurrentHandlers cannot be bigger than maxActiveMessages")
|
||||
}
|
||||
|
||||
if m.LockRenewalInSec < 1 {
|
||||
m.LockRenewalInSec = defaultLockRenewalInSec
|
||||
}
|
||||
|
||||
if m.MaxRetriableErrorsPerSec == nil {
|
||||
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
|
||||
}
|
||||
if *m.MaxRetriableErrorsPerSec < 0 {
|
||||
return nil, errors.New("maxRetriableErrorsPerSec must be non-negative")
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
|
@ -39,14 +39,14 @@ func TestParseMetadata(t *testing.T) {
|
|||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1"},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: azureServiceBusDefaultMessageTimeToLive,
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
},
|
||||
{
|
||||
name: "Empty TTL",
|
||||
properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""},
|
||||
expectedConnectionString: "connString",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: azureServiceBusDefaultMessageTimeToLive,
|
||||
expectedTTL: defaultMessageTimeToLive,
|
||||
},
|
||||
{
|
||||
name: "With TTL",
|
|
@ -15,67 +15,40 @@ package servicebusqueues
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
||||
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
|
||||
sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
|
||||
"github.com/Azure/go-amqp"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
"go.uber.org/ratelimit"
|
||||
|
||||
azauth "github.com/dapr/components-contrib/authentication/azure"
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
|
||||
contrib_metadata "github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
correlationID = "correlationID"
|
||||
label = "label"
|
||||
id = "id"
|
||||
|
||||
// azureServiceBusDefaultMessageTimeToLive defines the default time to live for queues, which is 14 days. The same way Azure Portal does.
|
||||
azureServiceBusDefaultMessageTimeToLive = time.Hour * 24 * 14
|
||||
|
||||
// Default timeout in seconds
|
||||
defaultTimeoutInSec = 60
|
||||
|
||||
// Default minimum and maximum recovery time while trying to reconnect
|
||||
defaultMinConnectionRecoveryInSec = 2
|
||||
defaultMaxConnectionRecoveryInSec = 300
|
||||
|
||||
// Default rate of retriable errors per second
|
||||
defaultMaxRetriableErrorsPerSec = 10
|
||||
)
|
||||
|
||||
// AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues.
|
||||
type AzureServiceBusQueues struct {
|
||||
metadata *serviceBusQueuesMetadata
|
||||
client *servicebus.Client
|
||||
adminClient *sbadmin.Client
|
||||
timeout time.Duration
|
||||
sender *servicebus.Sender
|
||||
senderLock sync.RWMutex
|
||||
retriableErrLimit ratelimit.Limiter
|
||||
logger logger.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type serviceBusQueuesMetadata struct {
|
||||
ConnectionString string `json:"connectionString"`
|
||||
NamespaceName string `json:"namespaceName,omitempty"`
|
||||
QueueName string `json:"queueName"`
|
||||
TimeoutInSec int `json:"timeoutInSec"`
|
||||
MaxConnectionRecoveryInSec int `json:"maxConnectionRecoveryInSec"`
|
||||
MinConnectionRecoveryInSec int `json:"minConnectionRecoveryInSec"`
|
||||
MaxRetriableErrorsPerSec *int `json:"maxRetriableErrorsPerSec"`
|
||||
ttl time.Duration
|
||||
metadata *serviceBusQueuesMetadata
|
||||
client *servicebus.Client
|
||||
adminClient *sbadmin.Client
|
||||
timeout time.Duration
|
||||
sender *servicebus.Sender
|
||||
senderLock sync.RWMutex
|
||||
logger logger.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewAzureServiceBusQueues returns a new AzureServiceBusQueues instance.
|
||||
|
@ -93,11 +66,6 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) {
|
|||
return err
|
||||
}
|
||||
a.timeout = time.Duration(a.metadata.TimeoutInSec) * time.Second
|
||||
if *a.metadata.MaxRetriableErrorsPerSec > 0 {
|
||||
a.retriableErrLimit = ratelimit.New(*a.metadata.MaxRetriableErrorsPerSec)
|
||||
} else {
|
||||
a.retriableErrLimit = ratelimit.NewUnlimited()
|
||||
}
|
||||
|
||||
userAgent := "dapr-" + logger.DaprVersion
|
||||
if a.metadata.ConnectionString != "" {
|
||||
|
@ -164,61 +132,6 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) {
|
||||
b, err := json.Marshal(metadata.Properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var m serviceBusQueuesMetadata
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if m.ConnectionString != "" && m.NamespaceName != "" {
|
||||
return nil, errors.New("connectionString and namespaceName are mutually exclusive")
|
||||
}
|
||||
|
||||
ttl, ok, err := contrib_metadata.TryGetTTL(metadata.Properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
// set the same default message time to live as suggested in Azure Portal to 14 days (otherwise it will be 10675199 days)
|
||||
ttl = azureServiceBusDefaultMessageTimeToLive
|
||||
}
|
||||
m.ttl = ttl
|
||||
|
||||
// Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior.
|
||||
m.QueueName = strings.ToLower(m.QueueName)
|
||||
|
||||
if m.TimeoutInSec < 1 {
|
||||
m.TimeoutInSec = defaultTimeoutInSec
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec < 1 {
|
||||
m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec
|
||||
}
|
||||
|
||||
if m.MaxConnectionRecoveryInSec < 1 {
|
||||
m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec
|
||||
}
|
||||
|
||||
if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec {
|
||||
return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec")
|
||||
}
|
||||
|
||||
if m.MaxRetriableErrorsPerSec == nil {
|
||||
m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec)
|
||||
}
|
||||
if *m.MaxRetriableErrorsPerSec < 0 {
|
||||
return nil, errors.New("maxRetriableErrorsPerSec must be non-negative")
|
||||
}
|
||||
|
||||
return &m, nil
|
||||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind {
|
||||
return []bindings.OperationKind{bindings.CreateOperation}
|
||||
}
|
||||
|
@ -264,86 +177,67 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
|
|||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) Read(handler bindings.Handler) error {
|
||||
subscribeCtx := context.Background()
|
||||
|
||||
// Reconnection backoff policy
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
bo.MaxElapsedTime = 0
|
||||
bo.InitialInterval = time.Duration(a.metadata.MinConnectionRecoveryInSec) * time.Second
|
||||
bo.MaxInterval = time.Duration(a.metadata.MaxConnectionRecoveryInSec) * time.Second
|
||||
|
||||
// Reconnect loop.
|
||||
for {
|
||||
receiver, _ := a.attemptConnectionForever(a.ctx)
|
||||
if receiver == nil {
|
||||
a.logger.Errorf("Failed to connect to Azure Service Bus Queue.")
|
||||
continue
|
||||
sub := impl.NewSubscription(
|
||||
subscribeCtx,
|
||||
a.metadata.MaxActiveMessages,
|
||||
a.metadata.TimeoutInSec,
|
||||
*a.metadata.MaxRetriableErrorsPerSec,
|
||||
&a.metadata.MaxConcurrentHandlers,
|
||||
"queue "+a.metadata.QueueName,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
err := sub.Connect(func() (*servicebus.Receiver, error) {
|
||||
return a.client.NewReceiverForQueue(a.metadata.QueueName, nil)
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if err != context.Canceled {
|
||||
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Receive messages loop
|
||||
// This continues until the context is canceled
|
||||
for a.ctx.Err() == nil {
|
||||
// Blocks until the connection is closed or the context is canceled
|
||||
msgs, err := receiver.ReceiveMessages(a.ctx, 1, nil)
|
||||
if err != nil {
|
||||
if err != context.Canceled {
|
||||
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
|
||||
}
|
||||
// Exit from the receive loop to force a reconnection
|
||||
break
|
||||
}
|
||||
|
||||
// If we got a message, reset the reconnection backoff
|
||||
bo.Reset()
|
||||
|
||||
l := len(msgs)
|
||||
if l == 0 {
|
||||
// We got no message, which is unusual too
|
||||
a.logger.Warn("Received 0 messages from Service Bus")
|
||||
continue
|
||||
} else if l > 1 {
|
||||
// We are requesting one message only; this should never happen
|
||||
a.logger.Errorf("Expected one message from Service Bus, but received %d", l)
|
||||
}
|
||||
|
||||
msg := msgs[0]
|
||||
|
||||
metadata := make(map[string]string)
|
||||
metadata[id] = msg.MessageID
|
||||
if msg.CorrelationID != nil {
|
||||
metadata[correlationID] = *msg.CorrelationID
|
||||
}
|
||||
if msg.Subject != nil {
|
||||
metadata[label] = *msg.Subject
|
||||
}
|
||||
|
||||
_, err = handler(a.ctx, &bindings.ReadResponse{
|
||||
Data: msg.Body,
|
||||
Metadata: metadata,
|
||||
})
|
||||
if err != nil {
|
||||
a.abandonMessage(receiver, msg)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use a background context in case a.ctx has been canceled already
|
||||
ctx, cancel := context.WithTimeout(context.Background(), a.timeout)
|
||||
err = receiver.CompleteMessage(ctx, msg, nil)
|
||||
cancel()
|
||||
if err != nil {
|
||||
a.logger.Warnf("Error completing message: %s", err.Error())
|
||||
continue
|
||||
// ReceiveAndBlock will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = sub.ReceiveAndBlock(
|
||||
a.getHandlerFunc(handler),
|
||||
a.metadata.LockRenewalInSec,
|
||||
func() {
|
||||
// Reset the backoff when the subscription is successful and we have received the first message
|
||||
bo.Reset()
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
var detachError *amqp.DetachError
|
||||
var amqpError *amqp.Error
|
||||
if errors.Is(err, detachError) ||
|
||||
(errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) {
|
||||
a.logger.Debug(err)
|
||||
} else {
|
||||
a.logger.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect (gracefully) before attempting to re-connect (unless we're shutting down)
|
||||
// Use a background context here because a.ctx may be canceled already at this stage
|
||||
ctx, cancel := context.WithTimeout(context.Background(), a.timeout)
|
||||
if err := receiver.Close(ctx); err != nil {
|
||||
// Log only
|
||||
a.logger.Warnf("Error closing receiver of Azure Service Bus Queue binding: %s", err.Error())
|
||||
}
|
||||
cancel()
|
||||
// Gracefully close the connection (in case it's not closed already)
|
||||
// Use a background context here (with timeout) because ctx may be closed already
|
||||
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
sub.Close(closeCtx)
|
||||
closeCancel()
|
||||
|
||||
// Reconnect until context is canceled
|
||||
if a.ctx.Err() != nil {
|
||||
// If context was canceled, do not attempt to reconnect
|
||||
if subscribeCtx.Err() != nil {
|
||||
a.logger.Debug("Context canceled; will not reconnect")
|
||||
break
|
||||
}
|
||||
|
@ -352,55 +246,27 @@ func (a *AzureServiceBusQueues) Read(handler bindings.Handler) error {
|
|||
a.logger.Warnf("Subscription to queue %s lost connection, attempting to reconnect in %s...", a.metadata.QueueName, wait)
|
||||
time.Sleep(wait)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) abandonMessage(receiver *servicebus.Receiver, msg *servicebus.ReceivedMessage) {
|
||||
// Use a background context in case a.ctx has been canceled already
|
||||
ctx, cancel := context.WithTimeout(context.Background(), a.timeout)
|
||||
err := receiver.AbandonMessage(ctx, msg, nil)
|
||||
cancel()
|
||||
if err != nil {
|
||||
// Log only
|
||||
a.logger.Warnf("Error abandoning message: %s", err.Error())
|
||||
func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.HandlerFunc {
|
||||
return func(ctx context.Context, msg *servicebus.ReceivedMessage) error {
|
||||
metadata := make(map[string]string)
|
||||
metadata[id] = msg.MessageID
|
||||
if msg.CorrelationID != nil {
|
||||
metadata[correlationID] = *msg.CorrelationID
|
||||
}
|
||||
if msg.Subject != nil {
|
||||
metadata[label] = *msg.Subject
|
||||
}
|
||||
|
||||
_, err := handler(a.ctx, &bindings.ReadResponse{
|
||||
Data: msg.Body,
|
||||
Metadata: metadata,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// If we're here, it means we got a retriable error, so we need to consume a retriable error token before this (synchronous) method returns
|
||||
// If there have been too many retriable errors per second, this method slows the consuming down
|
||||
a.logger.Debugf("Taking a retriable error token")
|
||||
before := time.Now()
|
||||
_ = a.retriableErrLimit.Take()
|
||||
a.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before))
|
||||
}
|
||||
|
||||
// Attempts to connect to a Service Bus queue and blocks until it succeeds; it can retry forever (until the context is canceled)
|
||||
func (a *AzureServiceBusQueues) attemptConnectionForever(ctx context.Context) (receiver *servicebus.Receiver, err error) {
|
||||
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
|
||||
config := retry.DefaultConfig()
|
||||
config.Policy = retry.PolicyExponential
|
||||
config.MaxInterval = 5 * time.Minute
|
||||
config.MaxElapsedTime = 0
|
||||
backoff := config.NewBackOffWithContext(ctx)
|
||||
|
||||
err = retry.NotifyRecover(
|
||||
func() error {
|
||||
clientAttempt, innerErr := a.client.NewReceiverForQueue(a.metadata.QueueName, nil)
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
receiver = clientAttempt
|
||||
return nil
|
||||
},
|
||||
backoff,
|
||||
func(err error, d time.Duration) {
|
||||
a.logger.Debugf("Failed to connect to Azure Service Bus Queue Binding with error: %s", err.Error())
|
||||
},
|
||||
func() {
|
||||
a.logger.Debug("Successfully reconnected to Azure Service Bus.")
|
||||
backoff.Reset()
|
||||
},
|
||||
)
|
||||
return receiver, err
|
||||
}
|
||||
|
||||
func (a *AzureServiceBusQueues) Close() (err error) {
|
||||
|
|
12
go.mod
12
go.mod
|
@ -9,10 +9,12 @@ require (
|
|||
cloud.google.com/go/storage v1.10.0
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.3.18
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
|
||||
github.com/Azure/azure-service-bus-go v0.10.10
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
|
||||
|
@ -157,8 +159,6 @@ require (
|
|||
require (
|
||||
cloud.google.com/go/secretmanager v1.4.0
|
||||
dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
|
||||
github.com/apache/dubbo-go-hessian2 v1.11.0
|
||||
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.21.12+incompatible
|
||||
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.0.87
|
||||
|
@ -177,7 +177,7 @@ require (
|
|||
cloud.google.com/go/kms v1.4.0 // indirect
|
||||
contrib.go.opencensus.io/exporter/prometheus v0.4.1 // indirect
|
||||
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/RoaringBitmap/roaring v1.1.0 // indirect
|
||||
github.com/Workiva/go-datastructures v1.0.52 // indirect
|
||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 // indirect
|
||||
|
|
16
go.sum
16
go.sum
|
@ -92,12 +92,12 @@ github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9mo
|
|||
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v56.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
|
@ -176,8 +176,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
|
|
|
@ -15,25 +15,27 @@ package servicebus
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
|
||||
"go.uber.org/ratelimit"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/retry"
|
||||
)
|
||||
|
||||
type subscription struct {
|
||||
topic string
|
||||
// HandlerFunc is the type for handlers that receive messages
|
||||
type HandlerFunc func(ctx context.Context, msg *azservicebus.ReceivedMessage) error
|
||||
|
||||
// Subscription is an object that manages a subscription to an Azure Service Bus receiver, for a topic or queue.
|
||||
type Subscription struct {
|
||||
entity string
|
||||
mu sync.RWMutex
|
||||
activeMessages map[string]*azservicebus.ReceivedMessage
|
||||
activeMessagesChan chan struct{}
|
||||
receiver *azservicebus.Receiver
|
||||
timeout time.Duration
|
||||
handlerTimeout time.Duration
|
||||
retriableErrLimit ratelimit.Limiter
|
||||
handleChan chan struct{}
|
||||
logger logger.Logger
|
||||
|
@ -41,25 +43,23 @@ type subscription struct {
|
|||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newSubscription(
|
||||
// NewSubscription returns a new Subscription object.
|
||||
// Parameter "entity" is usually in the format "topic <topicname>" or "queue <queuename>" and it's only used for logging.
|
||||
func NewSubscription(
|
||||
parentCtx context.Context,
|
||||
topic string,
|
||||
receiver *azservicebus.Receiver,
|
||||
maxActiveMessages int,
|
||||
timeoutInSec int,
|
||||
handlerTimeoutInSec int,
|
||||
maxRetriableEPS int,
|
||||
maxConcurrentHandlers *int,
|
||||
entity string,
|
||||
logger logger.Logger,
|
||||
) *subscription {
|
||||
) *Subscription {
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
s := &subscription{
|
||||
topic: topic,
|
||||
s := &Subscription{
|
||||
entity: entity,
|
||||
activeMessages: make(map[string]*azservicebus.ReceivedMessage),
|
||||
activeMessagesChan: make(chan struct{}, maxActiveMessages),
|
||||
receiver: receiver,
|
||||
timeout: time.Duration(timeoutInSec) * time.Second,
|
||||
handlerTimeout: time.Duration(handlerTimeoutInSec) * time.Second,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
|
@ -72,15 +72,45 @@ func newSubscription(
|
|||
}
|
||||
|
||||
if maxConcurrentHandlers != nil {
|
||||
s.logger.Debugf("Subscription to topic %s is limited to %d message handler(s)", topic, *maxConcurrentHandlers)
|
||||
s.logger.Debugf("Subscription to %s is limited to %d message handler(s)", entity, *maxConcurrentHandlers)
|
||||
s.handleChan = make(chan struct{}, *maxConcurrentHandlers)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic.
|
||||
func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec int, onFirstSuccess func()) error {
|
||||
// Connect to a Service Bus topic or queue, blocking until it succeeds; it can retry forever (until the context is canceled).
|
||||
func (s *Subscription) Connect(newReceiverFunc func() (*azservicebus.Receiver, error)) (err error) {
|
||||
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
|
||||
config := retry.DefaultConfig()
|
||||
config.Policy = retry.PolicyExponential
|
||||
config.MaxInterval = 5 * time.Minute
|
||||
config.MaxElapsedTime = 0
|
||||
backoff := config.NewBackOffWithContext(s.ctx)
|
||||
|
||||
err = retry.NotifyRecover(
|
||||
func() error {
|
||||
clientAttempt, innerErr := newReceiverFunc()
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
s.receiver = clientAttempt
|
||||
return nil
|
||||
},
|
||||
backoff,
|
||||
func(err error, d time.Duration) {
|
||||
s.logger.Warnf("Failed to connect to Azure Service Bus %s; will retry in %s. Error: %s", s.entity, d, err.Error())
|
||||
},
|
||||
func() {
|
||||
s.logger.Infof("Successfully reconnected to Azure Service Bus %s", s.entity)
|
||||
},
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ReceiveAndBlock is a blocking call to receive messages on an Azure Service Bus subscription from a topic or queue.
|
||||
func (s *Subscription) ReceiveAndBlock(handler HandlerFunc, lockRenewalInSec int, onFirstSuccess func()) error {
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
defer cancel()
|
||||
|
||||
|
@ -88,7 +118,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
go func() {
|
||||
shouldRenewLocks := lockRenewalInSec > 0
|
||||
if !shouldRenewLocks {
|
||||
s.logger.Debugf("Lock renewal for topic %s disabled", s.topic)
|
||||
s.logger.Debugf("Lock renewal for %s disabled", s.entity)
|
||||
return
|
||||
}
|
||||
t := time.NewTicker(time.Second * time.Duration(lockRenewalInSec))
|
||||
|
@ -96,7 +126,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.logger.Debugf("Lock renewal context for topic %s done", s.topic)
|
||||
s.logger.Debugf("Lock renewal context for %s done", s.entity)
|
||||
return
|
||||
case <-t.C:
|
||||
s.tryRenewLocks()
|
||||
|
@ -104,8 +134,6 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
}
|
||||
}()
|
||||
|
||||
handlerFunc := s.getHandlerFunc(handler)
|
||||
|
||||
// Receiver loop
|
||||
for {
|
||||
select {
|
||||
|
@ -113,7 +141,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
case s.activeMessagesChan <- struct{}{}:
|
||||
// Return if context is canceled
|
||||
case <-ctx.Done():
|
||||
s.logger.Debugf("Receive context for topic %s done", s.topic)
|
||||
s.logger.Debugf("Receive context for %s done", s.entity)
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
|
@ -121,7 +149,7 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
msgs, err := s.receiver.ReceiveMessages(s.ctx, 1, nil)
|
||||
if err != nil {
|
||||
if err != context.Canceled {
|
||||
s.logger.Errorf("Error reading from topic %s. %s", s.topic, err.Error())
|
||||
s.logger.Errorf("Error reading from %s. %s", s.entity, err.Error())
|
||||
}
|
||||
// Return the error. This will cause the Service Bus component to try and reconnect.
|
||||
return err
|
||||
|
@ -145,62 +173,27 @@ func (s *subscription) ReceiveAndBlock(handler pubsub.Handler, lockRenewalInSec
|
|||
|
||||
msg := msgs[0]
|
||||
s.logger.Debugf("Received message: %s; current active message usage: %d/%d", msg.MessageID, len(s.activeMessagesChan), cap(s.activeMessagesChan))
|
||||
// body, _ := msg.Body()
|
||||
// s.logger.Debugf("Message body: %s", string(body))
|
||||
// s.logger.Debugf("Message body: %s", string(msg.Body))
|
||||
|
||||
s.addActiveMessage(msg)
|
||||
|
||||
s.logger.Debugf("Processing received message: %s", msg.MessageID)
|
||||
s.handleAsync(s.ctx, msg, handlerFunc)
|
||||
s.handleAsync(s.ctx, msg, handler)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscription) close(closeCtx context.Context) {
|
||||
s.logger.Debugf("Closing subscription to topic %s", s.topic)
|
||||
// Close the receiver and stops watching for new messages.
|
||||
func (s *Subscription) Close(closeCtx context.Context) {
|
||||
s.logger.Debugf("Closing subscription to %s", s.entity)
|
||||
|
||||
// Ensure subscription entity is closed.
|
||||
if err := s.receiver.Close(closeCtx); err != nil {
|
||||
s.logger.Warnf("%s closing subscription entity for topic %s: %+v", errorMessagePrefix, s.topic, err)
|
||||
s.logger.Warnf("Error closing subscription for %s: %+v", s.entity, err)
|
||||
}
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *subscription) getHandlerFunc(handler pubsub.Handler) func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error) {
|
||||
handlerTimeout := s.handlerTimeout
|
||||
timeout := s.timeout
|
||||
return func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error) {
|
||||
pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, s.topic)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err)
|
||||
}
|
||||
|
||||
handleCtx, handleCancel := context.WithTimeout(ctx, handlerTimeout)
|
||||
defer handleCancel()
|
||||
s.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, s.topic)
|
||||
appErr := handler(handleCtx, pubsubMsg)
|
||||
|
||||
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
|
||||
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
|
||||
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer finalizeCancel()
|
||||
|
||||
if appErr != nil {
|
||||
s.logger.Warnf("Error in app's handler: %+v", appErr)
|
||||
if abandonErr := s.abandonMessage(finalizeCtx, asbMsg); abandonErr != nil {
|
||||
return true, fmt.Errorf("failed to abandon: %+v", abandonErr)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
if completeErr := s.completeMessage(finalizeCtx, asbMsg); completeErr != nil {
|
||||
return false, fmt.Errorf("failed to complete: %+v", completeErr)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.ReceivedMessage, handlerFunc func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) (consumeToken bool, err error)) {
|
||||
func (s *Subscription) handleAsync(ctx context.Context, msg *azservicebus.ReceivedMessage, handler HandlerFunc) {
|
||||
go func() {
|
||||
var consumeToken bool
|
||||
var err error
|
||||
|
@ -211,7 +204,7 @@ func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv
|
|||
// Release a handler if needed
|
||||
if limitConcurrentHandlers {
|
||||
<-s.handleChan
|
||||
s.logger.Debugf("Released message handle for %s on topic %s", msg.MessageID, s.topic)
|
||||
s.logger.Debugf("Released message handle for %s on %s", msg.MessageID, s.entity)
|
||||
}
|
||||
|
||||
// If we got a retriable error (app handler returned a retriable error, or a network error while connecting to the app, etc) consume a retriable error token
|
||||
|
@ -231,27 +224,39 @@ func (s *subscription) handleAsync(ctx context.Context, msg *azservicebus.Receiv
|
|||
}()
|
||||
|
||||
if limitConcurrentHandlers {
|
||||
s.logger.Debugf("Taking message handle for %s on topic %s", msg.MessageID, s.topic)
|
||||
s.logger.Debugf("Taking message handle for %s on %s", msg.MessageID, s.entity)
|
||||
select {
|
||||
// Context is done, so we will stop waiting
|
||||
case <-ctx.Done():
|
||||
s.logger.Debugf("Message context done for %s on topic %s", msg.MessageID, s.topic)
|
||||
s.logger.Debugf("Message context done for %s on %s", msg.MessageID, s.entity)
|
||||
return
|
||||
// Blocks until we have a handler available
|
||||
case s.handleChan <- struct{}{}:
|
||||
s.logger.Debugf("Taken message handle for %s on topic %s", msg.MessageID, s.topic)
|
||||
s.logger.Debugf("Taken message handle for %s on %s", msg.MessageID, s.entity)
|
||||
}
|
||||
}
|
||||
|
||||
consumeToken, err = handlerFunc(ctx, msg)
|
||||
// Invoke the handler to process the message
|
||||
err = handler(ctx, msg)
|
||||
|
||||
// This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message.
|
||||
// If we fail to finalize the message, this message will eventually be reprocessed (at-least once delivery).
|
||||
// This uses a background context in case ctx has been canceled already.
|
||||
finalizeCtx, finalizeCancel := context.WithTimeout(context.Background(), s.timeout)
|
||||
defer finalizeCancel()
|
||||
|
||||
if err != nil {
|
||||
// Log the error only, as we're running asynchronously
|
||||
s.logger.Errorf("%s error handling message %s on topic '%s', %s", errorMessagePrefix, msg.MessageID, s.topic, err)
|
||||
s.logger.Errorf("App handler returned an error for message %s on %s: %s", msg.MessageID, s.entity, err)
|
||||
s.AbandonMessage(finalizeCtx, msg)
|
||||
return
|
||||
}
|
||||
|
||||
s.CompleteMessage(finalizeCtx, msg)
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *subscription) tryRenewLocks() {
|
||||
func (s *Subscription) tryRenewLocks() {
|
||||
// Snapshot the messages to try to renew locks for.
|
||||
msgs := make([]*azservicebus.ReceivedMessage, 0)
|
||||
s.mu.RLock()
|
||||
|
@ -260,12 +265,12 @@ func (s *subscription) tryRenewLocks() {
|
|||
}
|
||||
s.mu.RUnlock()
|
||||
if len(msgs) == 0 {
|
||||
s.logger.Debugf("No active messages require lock renewal for topic %s", s.topic)
|
||||
s.logger.Debugf("No active messages require lock renewal for %s", s.entity)
|
||||
return
|
||||
}
|
||||
|
||||
// Lock renewal is best effort and not guaranteed to succeed, warnings are expected.
|
||||
s.logger.Debugf("Trying to renew %d active message lock(s) for topic %s", len(msgs), s.topic)
|
||||
s.logger.Debugf("Trying to renew %d active message lock(s) for %s", len(msgs), s.entity)
|
||||
var err error
|
||||
var ctx context.Context
|
||||
var cancel context.CancelFunc
|
||||
|
@ -273,31 +278,52 @@ func (s *subscription) tryRenewLocks() {
|
|||
ctx, cancel = context.WithTimeout(context.Background(), s.timeout)
|
||||
err = s.receiver.RenewMessageLock(ctx, msg, nil)
|
||||
if err != nil {
|
||||
s.logger.Debugf("Couldn't renew all active message lock(s) for topic %s, ", s.topic, err)
|
||||
s.logger.Debugf("Couldn't renew all active message lock(s) for %s, ", s.entity, err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscription) abandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error {
|
||||
s.logger.Debugf("Abandoning message %s on topic %s", m.MessageID, s.topic)
|
||||
return s.receiver.AbandonMessage(ctx, m, nil)
|
||||
// AbandonMessage marks a messsage as abandoned.
|
||||
func (s *Subscription) AbandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage) {
|
||||
s.logger.Debugf("Abandoning message %s on %s", m.MessageID, s.entity)
|
||||
|
||||
// Use a background context in case a.ctx has been canceled already
|
||||
err := s.receiver.AbandonMessage(ctx, m, nil)
|
||||
if err != nil {
|
||||
// Log only
|
||||
s.logger.Warnf("Error abandoning message %s on %s: %s", m.MessageID, s.entity, err.Error())
|
||||
}
|
||||
|
||||
// If we're here, it means we got a retriable error, so we need to consume a retriable error token before this (synchronous) method returns
|
||||
// If there have been too many retriable errors per second, this method slows the consumer down
|
||||
s.logger.Debugf("Taking a retriable error token")
|
||||
before := time.Now()
|
||||
_ = s.retriableErrLimit.Take()
|
||||
s.logger.Debugf("Resumed after pausing for %v", time.Now().Sub(before))
|
||||
}
|
||||
|
||||
func (s *subscription) completeMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error {
|
||||
s.logger.Debugf("Completing message %s on topic %s", m.MessageID, s.topic)
|
||||
return s.receiver.CompleteMessage(ctx, m, nil)
|
||||
// CompleteMessage marks a message as complete.
|
||||
func (s *Subscription) CompleteMessage(ctx context.Context, m *azservicebus.ReceivedMessage) {
|
||||
s.logger.Debugf("Completing message %s on %s", m.MessageID, s.entity)
|
||||
|
||||
// Use a background context in case a.ctx has been canceled already
|
||||
err := s.receiver.CompleteMessage(ctx, m, nil)
|
||||
if err != nil {
|
||||
// Log only
|
||||
s.logger.Warnf("Error completing message %s on %s: %s", m.MessageID, s.entity, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscription) addActiveMessage(m *azservicebus.ReceivedMessage) {
|
||||
s.logger.Debugf("Adding message %s to active messages on topic %s", m.MessageID, s.topic)
|
||||
func (s *Subscription) addActiveMessage(m *azservicebus.ReceivedMessage) {
|
||||
s.logger.Debugf("Adding message %s to active messages on %s", m.MessageID, s.entity)
|
||||
s.mu.Lock()
|
||||
s.activeMessages[m.MessageID] = m
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *subscription) removeActiveMessage(messageID string) {
|
||||
s.logger.Debugf("Removing message %s from active messages on topic %s", messageID, s.topic)
|
||||
func (s *Subscription) removeActiveMessage(messageID string) {
|
||||
s.logger.Debugf("Removing message %s from active messages on %s", messageID, s.entity)
|
||||
s.mu.Lock()
|
||||
delete(s.activeMessages, messageID)
|
||||
s.mu.Unlock()
|
|
@ -29,6 +29,7 @@ import (
|
|||
sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
|
||||
|
||||
azauth "github.com/dapr/components-contrib/authentication/azure"
|
||||
impl "github.com/dapr/components-contrib/internal/component/azure/servicebus"
|
||||
contrib_metadata "github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/kit/logger"
|
||||
|
@ -368,8 +369,20 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
go func() {
|
||||
// Reconnect loop.
|
||||
for {
|
||||
sub := impl.NewSubscription(
|
||||
subscribeCtx,
|
||||
a.metadata.MaxActiveMessages,
|
||||
a.metadata.TimeoutInSec,
|
||||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
"topic "+req.Topic,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
// Blocks until a successful connection (or until context is canceled)
|
||||
receiver, err := a.attemptConnectionForever(subscribeCtx, req.Topic, subID)
|
||||
err := sub.Connect(func() (*servicebus.Receiver, error) {
|
||||
return a.client.NewReceiverForSubscription(req.Topic, subID, nil)
|
||||
})
|
||||
if err != nil {
|
||||
// Realistically, the only time we should get to this point is if the context was canceled, but let's log any other error we may get.
|
||||
if err != context.Canceled {
|
||||
|
@ -377,47 +390,32 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
}
|
||||
return
|
||||
}
|
||||
sub := newSubscription(
|
||||
subscribeCtx,
|
||||
req.Topic,
|
||||
receiver,
|
||||
a.metadata.MaxActiveMessages,
|
||||
a.metadata.TimeoutInSec,
|
||||
a.metadata.HandlerTimeoutInSec,
|
||||
a.metadata.MaxRetriableErrorsPerSec,
|
||||
a.metadata.MaxConcurrentHandlers,
|
||||
a.logger,
|
||||
)
|
||||
|
||||
// ReceiveAndBlock will only return with an error
|
||||
// that it cannot handle internally. The subscription
|
||||
// connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt
|
||||
// to re-establish the subscription connection until
|
||||
// we exhaust the number of reconnect attempts.
|
||||
innerErr := sub.ReceiveAndBlock(
|
||||
handler,
|
||||
// ReceiveAndBlock will only return with an error that it cannot handle internally. The subscription connection is closed when this method returns.
|
||||
// If that occurs, we will log the error and attempt to re-establish the subscription connection until we exhaust the number of reconnect attempts.
|
||||
err = sub.ReceiveAndBlock(
|
||||
a.getHandlerFunc(req.Topic, handler),
|
||||
a.metadata.LockRenewalInSec,
|
||||
func() {
|
||||
// Reset the backoff when the subscription is successful and we have received the first message
|
||||
bo.Reset()
|
||||
},
|
||||
)
|
||||
if innerErr != nil {
|
||||
if err != nil {
|
||||
var detachError *amqp.DetachError
|
||||
var amqpError *amqp.Error
|
||||
if errors.Is(innerErr, detachError) ||
|
||||
(errors.As(innerErr, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) {
|
||||
a.logger.Debug(innerErr)
|
||||
if errors.Is(err, detachError) ||
|
||||
(errors.As(err, &amqpError) && amqpError.Condition == amqp.ErrorDetachForced) {
|
||||
a.logger.Debug(err)
|
||||
} else {
|
||||
a.logger.Error(innerErr)
|
||||
a.logger.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Gracefully close the connection (in case it's not closed already)
|
||||
// Use a background context here (with timeout) because ctx may be closed already
|
||||
closeCtx, closeCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec))
|
||||
sub.close(closeCtx)
|
||||
sub.Close(closeCtx)
|
||||
closeCancel()
|
||||
|
||||
// If context was canceled, do not attempt to reconnect
|
||||
|
@ -427,7 +425,7 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
}
|
||||
|
||||
wait := bo.NextBackOff()
|
||||
a.logger.Warnf("Subscription to topic %s lost connection, attempting to reconnect in %s...", sub.topic, wait)
|
||||
a.logger.Warnf("Subscription to topic %s lost connection, attempting to reconnect in %s...", req.Topic, wait)
|
||||
time.Sleep(wait)
|
||||
}
|
||||
}()
|
||||
|
@ -435,34 +433,18 @@ func (a *azureServiceBus) Subscribe(subscribeCtx context.Context, req pubsub.Sub
|
|||
return nil
|
||||
}
|
||||
|
||||
// Attempts to connect to a Service Bus topic and blocks until it succeeds; it can retry forever (until the context is canceled)
|
||||
func (a *azureServiceBus) attemptConnectionForever(ctx context.Context, topicName string, subscriptionName string) (receiver *servicebus.Receiver, err error) {
|
||||
// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
|
||||
config := retry.DefaultConfig()
|
||||
config.Policy = retry.PolicyExponential
|
||||
config.MaxInterval = 5 * time.Minute
|
||||
config.MaxElapsedTime = 0
|
||||
backoff := config.NewBackOffWithContext(ctx)
|
||||
func (a *azureServiceBus) getHandlerFunc(topic string, handler pubsub.Handler) impl.HandlerFunc {
|
||||
return func(ctx context.Context, asbMsg *servicebus.ReceivedMessage) error {
|
||||
pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, topic)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err)
|
||||
}
|
||||
|
||||
err = retry.NotifyRecover(
|
||||
func() error {
|
||||
clientAttempt, innerErr := a.client.NewReceiverForSubscription(topicName, subscriptionName, nil)
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
receiver = clientAttempt
|
||||
return nil
|
||||
},
|
||||
backoff,
|
||||
func(err error, d time.Duration) {
|
||||
a.logger.Warnf("Failed to connect to Azure Service Bus topic %s; will retry in %s. Error: %s", topicName, d, err.Error())
|
||||
},
|
||||
func() {
|
||||
a.logger.Infof("Successfully reconnected to Azure Service Bus topic %s", topicName)
|
||||
backoff.Reset()
|
||||
},
|
||||
)
|
||||
return receiver, err
|
||||
handleCtx, handleCancel := context.WithTimeout(ctx, time.Duration(a.metadata.HandlerTimeoutInSec)*time.Second)
|
||||
defer handleCancel()
|
||||
a.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, topic)
|
||||
return handler(handleCtx, pubsubMsg)
|
||||
}
|
||||
}
|
||||
|
||||
// senderForTopic returns the sender for a topic, or creates a new one if it doesn't exist
|
||||
|
|
|
@ -18,8 +18,8 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
|
@ -30,7 +30,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
|
|
|
@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE
|
|||
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.0 h1:0nJeKDmB7a1a8RDMjTltahlPsaNlWjq/LpkZleSwINk=
|
||||
|
@ -93,8 +93,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
|
|
@ -19,8 +19,8 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
|
@ -31,7 +31,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
|
|
|
@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE
|
|||
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
|
||||
|
@ -91,8 +91,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
|
|
@ -20,9 +20,9 @@ require (
|
|||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.3.18 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/go-amqp v0.17.4 // indirect
|
||||
|
@ -36,7 +36,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
|
|
|
@ -53,12 +53,12 @@ github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuI
|
|||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
|
||||
|
@ -110,8 +110,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
|
|
@ -18,11 +18,12 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/go-amqp v0.17.4 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest/autorest v0.11.27 // indirect
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect
|
||||
|
@ -31,7 +32,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
|
||||
|
|
|
@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE
|
|||
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 h1:hOHIC1pSoJsFrXBQlXYt+w0OKAx1MzCr4KiLXjylyac=
|
||||
|
@ -61,6 +61,7 @@ github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0Sz
|
|||
github.com/Azure/azure-storage-blob-go v0.10.0/go.mod h1:ep1edmW+kNQx4UfWM9heESNmQdijykocJ0YOxmMX8SE=
|
||||
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
||||
github.com/Azure/go-amqp v0.17.4 h1:6t9wEiwA4uXMRoUj3Cd3K2gmH8cW8ylizmBnSeF0bzM=
|
||||
github.com/Azure/go-amqp v0.17.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210608223527-2377c96fe795/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
||||
|
@ -96,8 +97,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
@ -230,6 +231,7 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
|
|||
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
|
|
|
@ -20,9 +20,9 @@ require (
|
|||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.3.18 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/go-amqp v0.17.4 // indirect
|
||||
|
@ -36,7 +36,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
|
|
|
@ -53,12 +53,12 @@ github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuI
|
|||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible h1:fle3M5Q7vr8auaiPffKyUQmLbvYeqpw30bKU6PrWJFo=
|
||||
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
|
||||
|
@ -110,8 +110,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
|
|
@ -17,8 +17,8 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.5.0 // indirect
|
||||
|
@ -31,7 +31,7 @@ require (
|
|||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/andybalholm/brotli v1.0.2 // indirect
|
||||
|
|
|
@ -48,10 +48,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE
|
|||
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0 h1:sVPhtT2qjO86rTUaWMr4WoES4TkjGnzcioXcnHV9s5k=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0 h1:Yoicul8bnVdQrhDMTHxdEckRGX01XvwXDHUT9zYZ3k0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 h1:X7FHRMKr0u5YiPnD6L/nqG64XBOcK0IYavhAHBQEmms=
|
||||
|
@ -95,8 +95,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
|
|||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
|
||||
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 h1:WVsrXCnHlDDX8ls+tootqRE87/hL9S/g4ewig9RsD/c=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
|
|
Loading…
Reference in New Issue