components-contrib/common/component/azure/eventhubs/entity_management.go

215 lines
8.1 KiB
Go

/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventhubs
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
azauth "github.com/dapr/components-contrib/common/authentication/azure"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
"github.com/dapr/kit/retry"
)
const (
defaultMessageRetentionInDays = 1
defaultPartitionCount = 1
resourceCheckMaxRetry = 5
resourceCheckMaxRetryInterval = 5 * time.Minute
resourceCreationTimeout = 15 * time.Second
resourceGetTimeout = 5 * time.Second
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
maxMessageRetention = int32(90)
maxPartitionCount = int32(1024)
)
// Intializes the entity management capabilities. This method is invoked by Init.
func (aeh *AzureEventHubs) initEntityManagement() error {
// Validate the metadata
err := aeh.validateEnitityManagementMetadata()
if err != nil {
return err
}
// Get Azure Management plane credentials object
settings, err := azauth.NewEnvironmentSettings(aeh.metadata.properties)
if err != nil {
return err
}
creds, err := settings.GetTokenCredential()
if err != nil {
return fmt.Errorf("failed to obtain Azure AD management credentials: %w", err)
}
aeh.managementCreds = creds
return nil
}
func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error {
if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention {
aeh.logger.Warnf("Property messageRetentionInDays for entity management has an empty or invalid value; using the default value %d", defaultMessageRetentionInDays)
aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays
}
if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount {
aeh.logger.Warnf("Property partitionCount for entity management has an empty or invalid value; using the default value %d", defaultPartitionCount)
aeh.metadata.PartitionCount = defaultPartitionCount
}
if aeh.metadata.ResourceGroupName == "" {
return errors.New("property resourceGroupName is required for entity management")
}
if aeh.metadata.SubscriptionID == "" {
return errors.New("property subscriptionID is required for entity management")
}
return nil
}
// Ensures that the Event Hub entity exists.
// This is used during the creation of both producers and consumers.
func (aeh *AzureEventHubs) ensureEventHubEntity(parentCtx context.Context, topic string) error {
client, err := armeventhub.NewEventHubsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
ClientOptions: policy.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
})
if err != nil {
return fmt.Errorf("failed to create Event Hubs ARM client: %w", err)
}
aeh.logger.Debugf("Checking if entity %s exists on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
// Check if the entity exists
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
_, err = client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, topic, nil)
if err == nil {
// If there's no error, the entity already exists, so all good
aeh.logger.Debugf("Entity %s already exists on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
return nil
}
// Check if the error is NotFound or something else
resErr := &azcore.ResponseError{}
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
// We have another error, just return it
return fmt.Errorf("failed to retrieve Event Hub entity %s: %w", topic, err)
}
// Create the entity
aeh.logger.Infof("Will create entity %s on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
params := armeventhub.Eventhub{
Properties: &armeventhub.Properties{
MessageRetentionInDays: ptr.Of(int64(aeh.metadata.MessageRetentionInDays)),
PartitionCount: ptr.Of(int64(aeh.metadata.PartitionCount)),
},
}
ctx, cancel = context.WithTimeout(parentCtx, resourceCreationTimeout)
defer cancel()
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, topic, params, nil)
if err != nil {
return fmt.Errorf("failed to create Event Hub entity %s: %w", topic, err)
}
aeh.logger.Infof("Entity %s created on Event Hub namespace %s", topic, aeh.metadata.namespaceName)
return nil
}
// Ensures that the subscription (consumer group) exists.
// This is used during the creation of consumers only.
func (aeh *AzureEventHubs) ensureSubscription(parentCtx context.Context, hubName string) error {
client, err := armeventhub.NewConsumerGroupsClient(aeh.metadata.SubscriptionID, aeh.managementCreds, &arm.ClientOptions{
ClientOptions: policy.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
})
if err != nil {
return fmt.Errorf("failed to create consumer group ARM client: %w", err)
}
aeh.logger.Debugf("Checking if consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
// Check if the consumer group exists
// We need to use a retry logic here
backOffConfig := retry.DefaultConfig()
backOffConfig.Policy = retry.PolicyExponential
backOffConfig.MaxInterval = resourceCheckMaxRetryInterval
backOffConfig.MaxRetries = resourceCheckMaxRetry
b := backOffConfig.NewBackOffWithContext(parentCtx)
create, err := retry.NotifyRecoverWithData(func() (bool, error) {
c, cErr := aeh.shouldCreateConsumerGroup(parentCtx, client, hubName)
if cErr != nil {
return false, cErr
}
return c, nil
}, b, func(_ error, _ time.Duration) {
aeh.logger.Errorf("Error checking for consumer group for Event Hub: %s. Retrying…", hubName)
}, func() {
aeh.logger.Warnf("Successfully checked for consumer group in Event Hub %s after it previously failed", hubName)
})
if err != nil {
return err
}
if !create {
// Already exists
aeh.logger.Debugf("Consumer group %s already exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
return nil
}
// Need to create the consumer group
aeh.logger.Infof("Will create consumer group %s exists in entity %s", aeh.metadata.ConsumerGroup, hubName)
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
defer cancel()
_, err = client.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, hubName, aeh.metadata.ConsumerGroup, armeventhub.ConsumerGroup{}, nil)
if err != nil {
return fmt.Errorf("failed to create consumer group %s: %w", aeh.metadata.ConsumerGroup, err)
}
aeh.logger.Infof("Consumer group %s created in entity %s", aeh.metadata.ConsumerGroup, hubName)
return nil
}
func (aeh *AzureEventHubs) shouldCreateConsumerGroup(parentCtx context.Context, client *armeventhub.ConsumerGroupsClient, hubName string) (bool, error) {
ctx, cancel := context.WithTimeout(parentCtx, resourceGetTimeout)
defer cancel()
_, err := client.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.namespaceName, hubName, aeh.metadata.ConsumerGroup, nil)
if err == nil {
// If there's no error, the consumer group already exists, so all good
return false, nil
}
// Check if the error is NotFound or something else
resErr := &azcore.ResponseError{}
if !errors.As(err, &resErr) || resErr.StatusCode != http.StatusNotFound {
// We have another error, just return it
return false, err
}
// Consumer group doesn't exist
return true, nil
}