Use request metadata instead of component metadata (#2119)
Signed-off-by: Shubham Sharma <shubhash@microsoft.com> Signed-off-by: Shubham Sharma <shubhash@microsoft.com>
This commit is contained in:
parent
4f406a597f
commit
cd206609ce
|
@ -45,8 +45,8 @@ const (
|
||||||
// MaxBulkAwaitDurationKey is the key for the max bulk await duration in the metadata.
|
// MaxBulkAwaitDurationKey is the key for the max bulk await duration in the metadata.
|
||||||
MaxBulkAwaitDurationMilliSecondsKey string = "maxBulkAwaitDurationMilliSeconds"
|
MaxBulkAwaitDurationMilliSecondsKey string = "maxBulkAwaitDurationMilliSeconds"
|
||||||
|
|
||||||
// MaxBulkPubBytes defines the maximum bytes to publish in a bulk publish request metadata.
|
// MaxBulkPubBytesKey defines the maximum bytes to publish in a bulk publish request metadata.
|
||||||
MaxBulkPubBytes string = "maxBulkPubBytes"
|
MaxBulkPubBytesKey string = "maxBulkPubBytes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TryGetTTL tries to get the ttl as a time.Duration value for pubsub, binding and any other building block.
|
// TryGetTTL tries to get the ttl as a time.Duration value for pubsub, binding and any other building block.
|
||||||
|
|
|
@ -32,6 +32,8 @@ import (
|
||||||
"github.com/Azure/go-autorest/autorest/azure"
|
"github.com/Azure/go-autorest/autorest/azure"
|
||||||
|
|
||||||
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
|
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
|
||||||
|
"github.com/dapr/components-contrib/internal/utils"
|
||||||
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
"github.com/dapr/kit/retry"
|
"github.com/dapr/kit/retry"
|
||||||
|
@ -57,7 +59,6 @@ const (
|
||||||
missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement"
|
missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement"
|
||||||
entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString"
|
entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString"
|
||||||
differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString"
|
differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString"
|
||||||
maxBulkSizeInBytesTooLargeErrorTmpl = "error: maxBulkSizeInBytes exceeds the maximum allowed value of %d"
|
|
||||||
|
|
||||||
// Event Hubs SystemProperties names for metadata passthrough.
|
// Event Hubs SystemProperties names for metadata passthrough.
|
||||||
sysPropSequenceNumber = "x-opt-sequence-number"
|
sysPropSequenceNumber = "x-opt-sequence-number"
|
||||||
|
@ -157,7 +158,6 @@ type azureEventHubsMetadata struct {
|
||||||
PartitionCount int32 `json:"partitionCount,omitempty,string"`
|
PartitionCount int32 `json:"partitionCount,omitempty,string"`
|
||||||
SubscriptionID string `json:"subscriptionID,omitempty"`
|
SubscriptionID string `json:"subscriptionID,omitempty"`
|
||||||
ResourceGroupName string `json:"resourceGroupName,omitempty"`
|
ResourceGroupName string `json:"resourceGroupName,omitempty"`
|
||||||
MaxBulkSizeInBytes int `json:"maxBulkSizeInBytes,omitempty,string"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAzureEventHubs returns a new Azure Event hubs instance.
|
// NewAzureEventHubs returns a new Azure Event hubs instance.
|
||||||
|
@ -185,10 +185,6 @@ func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, erro
|
||||||
return &m, errors.New(bothConnectionStringNamespaceErrorMsg)
|
return &m, errors.New(bothConnectionStringNamespaceErrorMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.MaxBulkSizeInBytes > int(eventhub.DefaultMaxMessageSizeInBytes) {
|
|
||||||
return &m, fmt.Errorf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &m, nil
|
return &m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,9 +585,9 @@ func (aeh *AzureEventHubs) BulkPublish(req *pubsub.BulkPublishRequest) (pubsub.B
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configure options for sending events.
|
// Configure options for sending events.
|
||||||
opts := []eventhub.BatchOption{}
|
opts := []eventhub.BatchOption{
|
||||||
if aeh.metadata.MaxBulkSizeInBytes > 0 {
|
eventhub.BatchWithMaxSizeInBytes(utils.GetElemOrDefaultFromMap(
|
||||||
opts = append(opts, eventhub.BatchWithMaxSizeInBytes(aeh.metadata.MaxBulkSizeInBytes))
|
req.Metadata, contribMetadata.MaxBulkPubBytesKey, int(eventhub.DefaultMaxMessageSizeInBytes))),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send events.
|
// Send events.
|
||||||
|
|
|
@ -17,7 +17,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
eventhub "github.com/Azure/azure-event-hubs-go/v3"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
@ -68,18 +67,6 @@ func TestParseEventHubsMetadata(t *testing.T) {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error())
|
assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("test maxBulkSizeInBytes limits", func(t *testing.T) {
|
|
||||||
val := fmt.Sprintf("%d", eventhub.DefaultMaxMessageSizeInBytes+1)
|
|
||||||
props := map[string]string{"connectionString": "fake", "maxBulkSizeInBytes": val}
|
|
||||||
|
|
||||||
metadata := pubsub.Metadata{Base: metadata.Base{Properties: props}}
|
|
||||||
_, err := parseEventHubsMetadata(metadata)
|
|
||||||
|
|
||||||
expected := fmt.Sprintf(maxBulkSizeInBytesTooLargeErrorTmpl, eventhub.DefaultMaxMessageSizeInBytes)
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Equal(t, expected, err.Error())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValidateSubscriptionAttributes(t *testing.T) {
|
func TestValidateSubscriptionAttributes(t *testing.T) {
|
||||||
|
|
|
@ -377,7 +377,7 @@ func (a *azureServiceBus) BulkPublish(ctx context.Context, req *pubsub.BulkPubli
|
||||||
|
|
||||||
// Create a new batch of messages with batch options.
|
// Create a new batch of messages with batch options.
|
||||||
batchOpts := &servicebus.MessageBatchOptions{
|
batchOpts := &servicebus.MessageBatchOptions{
|
||||||
MaxBytes: utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkPubBytes, defaultMaxBulkPubBytes),
|
MaxBytes: utils.GetElemOrDefaultFromMap(req.Metadata, contribMetadata.MaxBulkPubBytesKey, defaultMaxBulkPubBytes),
|
||||||
}
|
}
|
||||||
|
|
||||||
batchMsg, err := sender.NewMessageBatch(ctx, batchOpts)
|
batchMsg, err := sender.NewMessageBatch(ctx, batchOpts)
|
||||||
|
|
Loading…
Reference in New Issue