215 lines
8.1 KiB
Go
215 lines
8.1 KiB
Go
/*
|
|
Copyright 2023 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package eventhubs
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
|
|
|
|
azauth "github.com/dapr/components-contrib/common/authentication/azure"
|
|
"github.com/dapr/kit/logger"
|
|
"github.com/dapr/kit/ptr"
|
|
"github.com/dapr/kit/retry"
|
|
)
|
|
|
|
const (
|
|
defaultMessageRetentionInDays = 1
|
|
defaultPartitionCount = 1
|
|
|
|
resourceCheckMaxRetry = 5
|
|
resourceCheckMaxRetryInterval = 5 * time.Minute
|
|
resourceCreationTimeout = 15 * time.Second
|
|
resourceGetTimeout = 5 * time.Second
|
|
|
|
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
|
|
maxMessageRetention = int32(90)
|
|
maxPartitionCount = int32(1024)
|
|
)
|
|
|
|
// Intializes the entity management capabilities. This method is invoked by Init.
|
|
func (aeh *AzureEventHubs) initEntityManagement() error {
|
|
// Validate the metadata
|
|
err := aeh.validateEnitityManagementMetadata()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Get Azure Management plane credentials object
|
|
settings, err := azauth.NewEnvironmentSettings(aeh.metadata.properties)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
creds, err := settings.GetTokenCredential()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to obtain Azure AD management credentials: %w", err)
|
|
}
|
|
aeh.managementCreds = creds
|
|
return nil
|
|
}
|
|
|
|
func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error {
|
|
if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention {
|
|
aeh.logger.Warnf("Property messageRetentionInDays for entity management has an empty or invalid value; using the default value %d", defaultMessageRetentionInDays)
|
|
aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays
|
|
}
|
|
if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount {
|
|
aeh.logger.Warnf("Property partitionCount for entity management has an empty or invalid value; using the default value %d", defaultPartitionCount)
|
|
aeh.metadata.PartitionCount = defaultPartitionCount
|
|
}
|
|
if aeh.metadata.ResourceGroupName == "" {
|
|
return errors.New("property resourceGroupName is required for entity management")
|
|
}
|
|
if aeh.metadata.SubscriptionID == "" {
|
|
return errors.New("property subscriptionID is required for entity management")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ensures that the Event Hub entity exists.
|
|
// This is used during the creation of both producers and consumers.
|
|
func (aeh *AzureEventHubs) ensureEventHubEntity(parentCtx context.Context, topic string) error {
|
|
client, err := armeventhub.NewEventHubsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
|
|
ClientOptions: policy.ClientOptions{
|
|
Telemetry: policy.TelemetryOptions{
|
|
ApplicationID: "dapr-" + logger.DaprVersion,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Event Hubs ARM client: %w", err)
|
|
}
|
|
|
|
aeh.logger.Debugf("Checking if entity %s exists on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
|
|
|
|
// Check if the entity exists
|
|
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
|
|
defer cancel()
|
|
_, err = client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, topic, nil)
|
|
if err == nil {
|
|
// If there's no error, the entity already exists, so all good
|
|
aeh.logger.Debugf("Entity %s already exists on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
|
|
return nil
|
|
}
|
|
|
|
// Check if the error is NotFound or something else
|
|
resErr := &azcore.ResponseError{}
|
|
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
|
|
// We have another error, just return it
|
|
return fmt.Errorf("failed to retrieve Event Hub entity %s: %w", topic, err)
|
|
}
|
|
|
|
// Create the entity
|
|
aeh.logger.Infof("Will create entity %s on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
|
|
params := armeventhub.Eventhub{
|
|
Properties: &armeventhub.Properties{
|
|
MessageRetentionInDays: ptr.Of(int64(aeh.metadata.MessageRetentionInDays)),
|
|
PartitionCount: ptr.Of(int64(aeh.metadata.PartitionCount)),
|
|
},
|
|
}
|
|
ctx, cancel = context.WithTimeout(parentCtx, resourceCreationTimeout)
|
|
defer cancel()
|
|
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, topic, params, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Event Hub entity %s: %w", topic, err)
|
|
}
|
|
|
|
aeh.logger.Infof("Entity %s created on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
|
|
return nil
|
|
}
|
|
|
|
// Ensures that the subscription (consumer group) exists.
|
|
// This is used during the creation of consumers only.
|
|
func (aeh *AzureEventHubs) ensureSubscription(parentCtx context.Context, hubName string) error {
|
|
client, err := armeventhub.NewConsumerGroupsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
|
|
ClientOptions: policy.ClientOptions{
|
|
Telemetry: policy.TelemetryOptions{
|
|
ApplicationID: "dapr-" + logger.DaprVersion,
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create consumer group ARM client: %w", err)
|
|
}
|
|
|
|
aeh.logger.Debugf("Checking if consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
|
|
|
|
// Check if the consumer group exists
|
|
// We need to use a retry logic here
|
|
backOffConfig := retry.DefaultConfig()
|
|
backOffConfig.Policy = retry.PolicyExponential
|
|
backOffConfig.MaxInterval = resourceCheckMaxRetryInterval
|
|
backOffConfig.MaxRetries = resourceCheckMaxRetry
|
|
b := backOffConfig.NewBackOffWithContext(parentCtx)
|
|
create, err := retry.NotifyRecoverWithData(func() (bool, error) {
|
|
c, cErr := aeh.shouldCreateConsumerGroup(parentCtx, client, hubName)
|
|
if cErr != nil {
|
|
return false, cErr
|
|
}
|
|
return c, nil
|
|
}, b, func(_ error, _ time.Duration) {
|
|
aeh.logger.Errorf("Error checking for consumer group for Event Hub: %s. Retrying…", hubName)
|
|
}, func() {
|
|
aeh.logger.Warnf("Successfully checked for consumer group in Event Hub %s after it previously failed", hubName)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !create {
|
|
// Already exists
|
|
aeh.logger.Debugf("Consumer group %s already exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
|
|
return nil
|
|
}
|
|
|
|
// Need to create the consumer group
|
|
aeh.logger.Infof("Will create consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
|
|
|
|
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
|
|
defer cancel()
|
|
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, hubName, aeh.metadata.ConsumerGroup, armeventhub.ConsumerGroup{}, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create consumer group %s: %w", aeh.metadata.ConsumerGroup, err)
|
|
}
|
|
|
|
aeh.logger.Infof("Consumer group %s created in entity %s", aeh.metadata.ConsumerGroup, hubName)
|
|
return nil
|
|
}
|
|
|
|
func (aeh *AzureEventHubs) shouldCreateConsumerGroup(parentCtx context.Context, client *armeventhub.ConsumerGroupsClient, hubName string) (bool, error) {
|
|
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
|
|
defer cancel()
|
|
_, err := client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, hubName, aeh.metadata.ConsumerGroup, nil)
|
|
if err == nil {
|
|
// If there's no error, the consumer group already exists, so all good
|
|
return false, nil
|
|
}
|
|
|
|
// Check if the error is NotFound or something else
|
|
resErr := &azcore.ResponseError{}
|
|
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
|
|
// We have another error, just return it
|
|
return false, err
|
|
}
|
|
|
|
// Consumer group doesn't exist
|
|
return true, nil
|
|
}
|