Updated Azure Event Hubs and Azure Blob Storage SDKs

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2023-02-16 00:21:56 +00:00
parent 643a2abff2
commit 6f30dbf6c6
13 changed files with 226 additions and 223 deletions

View File

@ -94,7 +94,7 @@ func NewAzureBlobStorage(logger logger.Logger) bindings.OutputBinding {
// Init performs metadata parsing.
func (a *AzureBlobStorage) Init(metadata bindings.Metadata) error {
var err error
a.containerClient, a.metadata, err = storageinternal.CreateContainerStorageClient(a.logger, metadata.Properties)
a.containerClient, a.metadata, err = storageinternal.CreateContainerStorageClient(context.TODO(), a.logger, metadata.Properties)
if err != nil {
return err
}

14
go.mod
View File

@ -9,17 +9,17 @@ require (
cloud.google.com/go/storage v1.29.0
dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20230118042253-4f159a2b38f3
github.com/Azure/azure-amqp-common-go/v4 v4.0.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.3
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.11.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.5.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.4
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2 v2.0.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.18.1
github.com/Azure/go-autorest/autorest v0.11.28
@ -113,7 +113,7 @@ require (
golang.org/x/crypto v0.5.0
golang.org/x/exp v0.0.0-20230125214544-b3c2aaf6208d
golang.org/x/mod v0.7.0
golang.org/x/net v0.5.0
golang.org/x/net v0.7.0
golang.org/x/oauth2 v0.4.0
google.golang.org/api v0.108.0
google.golang.org/grpc v1.52.3
@ -356,9 +356,9 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect

14
go.sum
View File

@ -415,6 +415,8 @@ github.com/Azure/azure-sdk-for-go v68.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9mo
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0 h1:VuHAcMq8pU1IWNT/m5yRaGqbK0BiQKHT8X4DTp9CHdI=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0/go.mod h1:tZoQYdDZNOiIjdSn0dVWVfl0NEPGOJqVLzSrcFk4Is0=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1 h1:gVXuXcWd1i4C2Ruxe321aU+IKGaStvGB/S90PUPB/W8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1/go.mod h1:DffdKW9RFqa5VgmsjUOsS7UE7eiA5iAvYUs63bhKQ0M=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 h1:t/W5MYAuQy81cvM8VUNfRLzhtKpXhVUAN7Cd7KVbTyc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0/go.mod h1:NBanQUfSWiWn3QEpWDTCU0IjBECKOYvl2R8xdRtMtiM=
@ -433,6 +435,8 @@ github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0 h1:Lg6BW0VPmCwcMl
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0/go.mod h1:9V2j0jn9jDEkCkv8w/bKTNppX/d0FVA1ud77xCIP4KA=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0 h1:X/ePaAG8guM7j5WORT5eEIw7cGUxe9Ah1jEQJKLmmSo=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.4.0/go.mod h1:5dog28UP3dd1BnCPFYvyHfsmA+Phmoezt+KWT5cZnyc=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.5.0 h1:v4v4ccQInOrQ1dT6Z1jhmSfv/Vo+Gj6TiH4agar4+9c=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v0.5.0/go.mod h1:N1KaFfTg2o6ltJ2djIz5oOFE9tgHOHqQR+dIOiAdyUc=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.4 h1:kaZamwZwmUqnECvnPkf1LBRBIFYYCy3E0gKHn/UFSD0=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.1.4/go.mod h1:uDLwkzCJMvTrHsvtiVFeAp85hi3W77zvs61wrpc+6ho=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2 v2.0.0 h1:PcP4TC+0dC85A3i1p7CbD0FyrjnTvzQ3ipgSkJTIb7Y=
@ -441,6 +445,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1 h1:YvQv9Mz6T8oR5ypQOL6erY0Z5t71ak1uHV4QFokCOZk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.6.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
github.com/Azure/go-amqp v0.18.1 h1:D5Ca+uijuTcj5g76sF+zT4OQZcFFY397+IGf/5Ip5Sc=
@ -2059,6 +2065,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -2227,12 +2235,16 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e h1:Lw2b7QX5zDuEsD5ZkJNRUGEGkLuho3UAKsO25Ucv140=
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -2248,6 +2260,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

View File

@ -34,6 +34,7 @@ import (
)
// NewEnvironmentSettings returns a new EnvironmentSettings configured for a given Azure resource.
// TODO: Remove resourceName when "track1" SDK support is dropped.
func NewEnvironmentSettings(resourceName string, values map[string]string) (EnvironmentSettings, error) {
es := EnvironmentSettings{
Values: values,

View File

@ -15,6 +15,7 @@ package blobstorage
import (
"context"
"errors"
"fmt"
"net/url"
"time"
@ -35,77 +36,134 @@ const (
defaultBlobRetryCount = 3
)
func CreateContainerStorageClient(log logger.Logger, meta map[string]string) (*container.Client, *BlobStorageMetadata, error) {
// CreateContainerStorageClient returns a container.Client and the parsed metadata from the metadata dictionary.
func CreateContainerStorageClient(parentCtx context.Context, log logger.Logger, meta map[string]string) (*container.Client, *BlobStorageMetadata, error) {
// Parse the metadata and set the properties in the object
m, err := parseMetadata(meta)
if err != nil {
return nil, nil, err
}
userAgent := "dapr-" + logger.DaprVersion
options := container.ClientOptions{
azEnvSettings, err := azauth.NewEnvironmentSettings("storage", meta)
if err != nil {
return nil, nil, err
}
if val, _ := mdutils.GetMetadataProperty(meta, azauth.StorageEndpointKeys...); val != "" {
m.customEndpoint = val
}
// Get the container client
client, err := m.InitContainerClient(azEnvSettings)
if err != nil {
return nil, nil, err
}
// Create the container if it doesn't already exist
var accessLevel *azblob.PublicAccessType
if m.PublicAccessLevel != "" && m.PublicAccessLevel != "none" {
accessLevel = &m.PublicAccessLevel
}
ctx, cancel := context.WithTimeout(parentCtx, 30*time.Second)
defer cancel()
err = m.EnsureContainer(ctx, client, accessLevel)
if err != nil {
return nil, nil, fmt.Errorf("failed to create Azure Storage container %s: %w", m.ContainerName, err)
}
return client, m, nil
}
// GetContainerURL returns the URL of the container, needed by some auth methods.
func (opts ContainerClientOpts) GetContainerURL(azEnvSettings azauth.EnvironmentSettings) (u *url.URL, err error) {
if opts.customEndpoint != "" {
u, err = url.Parse(fmt.Sprintf("%s/%s/%s", opts.customEndpoint, opts.AccountName, opts.ContainerName))
if err != nil {
return nil, fmt.Errorf("failed to get container's URL with custom endpoint")
}
} else {
u, _ = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", opts.AccountName, azEnvSettings.AzureEnvironment.StorageEndpointSuffix, opts.ContainerName))
}
return u, nil
}
// InitContainerClient returns a new container.Client object from the given options.
func (opts ContainerClientOpts) InitContainerClient(azEnvSettings azauth.EnvironmentSettings) (client *container.Client, err error) {
clientOpts := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
MaxRetries: m.RetryCount,
MaxRetries: opts.RetryCount,
},
Telemetry: policy.TelemetryOptions{
ApplicationID: userAgent,
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
}
settings, err := azauth.NewEnvironmentSettings("storage", meta)
if err != nil {
return nil, nil, err
}
var customEndpoint string
if val, ok := mdutils.GetMetadataProperty(meta, azauth.StorageEndpointKeys...); ok && val != "" {
customEndpoint = val
}
var URL *url.URL
if customEndpoint != "" {
var parseErr error
URL, parseErr = url.Parse(fmt.Sprintf("%s/%s/%s", customEndpoint, m.AccountName, m.ContainerName))
if parseErr != nil {
return nil, nil, parseErr
switch {
// Use a connection string
case opts.ConnectionString != "":
client, err = container.NewClientFromConnectionString(opts.ConnectionString, opts.ContainerName, clientOpts)
if err != nil {
return nil, fmt.Errorf("cannot init blob storage container client with connection string: %w", err)
}
} else {
env := settings.AzureEnvironment
URL, _ = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", m.AccountName, env.StorageEndpointSuffix, m.ContainerName))
}
var clientErr error
var client *container.Client
// Try using shared key credentials first
if m.AccountKey != "" {
credential, newSharedKeyErr := azblob.NewSharedKeyCredential(m.AccountName, m.AccountKey)
if newSharedKeyErr != nil {
return nil, nil, fmt.Errorf("invalid shared key credentials with error: %w", newSharedKeyErr)
// Use a shared account key
case opts.AccountKey != "" && opts.AccountName != "":
var (
credential *azblob.SharedKeyCredential
u *url.URL
)
credential, err = azblob.NewSharedKeyCredential(opts.AccountName, opts.AccountKey)
if err != nil {
return nil, fmt.Errorf("invalid shared key credentials with error: %w", err)
}
client, clientErr = container.NewClientWithSharedKeyCredential(URL.String(), credential, &options)
if clientErr != nil {
return nil, nil, fmt.Errorf("cannot init Blobstorage container client: %w", clientErr)
u, err = opts.GetContainerURL(azEnvSettings)
if err != nil {
return nil, err
}
} else {
// fallback to AAD
credential, tokenErr := settings.GetTokenCredential()
client, err = container.NewClientWithSharedKeyCredential(u.String(), credential, clientOpts)
if err != nil {
return nil, fmt.Errorf("cannot init blob storage container client with shared key: %w", err)
}
// Use Azure AD as fallback
default:
credential, tokenErr := azEnvSettings.GetTokenCredential()
if tokenErr != nil {
return nil, nil, fmt.Errorf("invalid token credentials with error: %w", tokenErr)
return nil, fmt.Errorf("invalid token credentials with error: %w", tokenErr)
}
var u *url.URL
u, err = opts.GetContainerURL(azEnvSettings)
if err != nil {
return nil, err
}
client, err = container.NewClient(u.String(), credential, clientOpts)
if err != nil {
return nil, fmt.Errorf("cannot init blob storage container client with Azure AD token: %w", err)
}
client, clientErr = container.NewClient(URL.String(), credential, &options)
}
if clientErr != nil {
return nil, nil, fmt.Errorf("cannot init Blobstorage client: %w", clientErr)
}
createContainerOptions := container.CreateOptions{
Access: &m.PublicAccessLevel,
Metadata: map[string]string{},
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
_, err = client.Create(timeoutCtx, &createContainerOptions)
cancel()
// Don't return error, container might already exist
log.Debugf("error creating container: %v", err)
return client, m, nil
return client, nil
}
// EnsureContainer creates the container if it doesn't already exist.
// Property "accessLevel" indicates the public access level; nil-value means the container is private
func (opts ContainerClientOpts) EnsureContainer(ctx context.Context, client *container.Client, accessLevel *azblob.PublicAccessType) error {
// Create the container
// This will return an error if it already exists
_, err := client.Create(ctx, &container.CreateOptions{
Access: accessLevel,
})
if err != nil {
// Check if it's an Azure Storage error
resErr := &azcore.ResponseError{}
// If the container already exists, return no error
if errors.As(err, &resErr) && (resErr.ErrorCode == "ContainerAlreadyExists" || resErr.ErrorCode == "ResourceAlreadyExists") {
return nil
}
return err
}
return nil
}

View File

@ -14,6 +14,7 @@ limitations under the License.
package blobstorage
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
@ -43,7 +44,7 @@ func TestClientInitFailures(t *testing.T) {
for name, s := range scenarios {
t.Run(name, func(t *testing.T) {
_, _, err := CreateContainerStorageClient(log, s.metadata)
_, _, err := CreateContainerStorageClient(context.Background(), log, s.metadata)
assert.Contains(t, err.Error(), s.expectedFailureSubString)
})
}

View File

@ -15,6 +15,7 @@ package blobstorage
import (
"fmt"
"net/url"
"strconv"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@ -24,18 +25,31 @@ import (
)
type BlobStorageMetadata struct {
AccountName string
AccountKey string
ContainerName string
RetryCount int32 `json:"retryCount,string"`
DecodeBase64 bool `json:"decodeBase64,string"`
PublicAccessLevel azblob.PublicAccessType
ContainerClientOpts `json:",inline" mapstructure:",squash"`
DecodeBase64 bool `json:"decodeBase64,string"`
PublicAccessLevel azblob.PublicAccessType
}
type ContainerClientOpts struct {
// Use a connection string
ConnectionString string
ContainerName string
// Use a shared account key
AccountName string
AccountKey string
// Misc
RetryCount int32 `json:"retryCount,string"`
// Private properties
containerURL *url.URL `json:"-" mapstructure:"-"`
customEndpoint string `json:"-" mapstructure:"-"`
}
func parseMetadata(meta map[string]string) (*BlobStorageMetadata, error) {
m := BlobStorageMetadata{
RetryCount: defaultBlobRetryCount,
}
m := BlobStorageMetadata{}
m.RetryCount = defaultBlobRetryCount
mdutils.DecodeMetadata(meta, &m)
if val, ok := mdutils.GetMetadataProperty(meta, azauth.StorageAccountNameKeys...); ok && val != "" {

View File

@ -21,6 +21,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
const (
@ -79,7 +80,10 @@ func CreateBlobHTTPHeadersFromRequest(meta map[string]string, contentType *strin
return blobHTTPHeaders, nil
}
func SanitizeMetadata(log logger.Logger, metadata map[string]string) map[string]string {
// SanitizeMetadata is used by Azure Blob Storage components to sanitize the metadata.
// Keys can only contain [A-Za-z0-9], and values are only allowed characters in the ASCII table.
func SanitizeMetadata(log logger.Logger, metadata map[string]string) map[string]*string {
res := make(map[string]*string, len(metadata))
for key, val := range metadata {
// Keep only letters and digits
n := 0
@ -96,8 +100,6 @@ func SanitizeMetadata(log logger.Logger, metadata map[string]string) map[string]
if n != len(key) {
nks := string(newKey[:n])
log.Warnf("metadata key %s contains disallowed characters, sanitized to %s", key, nks)
delete(metadata, key)
metadata[nks] = val
key = nks
}
@ -105,14 +107,14 @@ func SanitizeMetadata(log logger.Logger, metadata map[string]string) map[string]
n = 0
newVal := make([]byte, len(val))
for i := 0; i < len(val); i++ {
if val[i] > 127 {
if val[i] > 127 || val[i] == 0 {
continue
}
newVal[n] = val[i]
n++
}
metadata[key] = string(newVal[:n])
res[key] = ptr.Of(string(newVal[:n]))
}
return metadata
return res
}

View File

@ -61,8 +61,11 @@ func TestSanitizeRequestMetadata(t *testing.T) {
"not-allowed:": "not-allowed",
}
meta := SanitizeMetadata(log, m)
assert.Equal(t, meta["somecustomfield"], "some-custom-value")
assert.Equal(t, meta["specialfield"], "special:value")
assert.Equal(t, meta["notallowed"], "not-allowed")
_ = assert.NotNil(t, meta["somecustomfield"]) &&
assert.Equal(t, *meta["somecustomfield"], "some-custom-value")
_ = assert.NotNil(t, meta["specialfield"]) &&
assert.Equal(t, *meta["specialfield"], "special:value")
_ = assert.NotNil(t, meta["notallowed"]) &&
assert.Equal(t, *meta["notallowed"], "not-allowed")
})
}

View File

@ -26,11 +26,11 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"golang.org/x/exp/maps"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/internal/component/azure/blobstorage"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
@ -51,7 +51,7 @@ type AzureEventHubs struct {
managementCreds azcore.TokenCredential
// TODO(@ItalyPaleAle): Remove in Dapr 1.13
isFailed *atomic.Bool
isFailed atomic.Bool
}
// NewAzureEventHubs returns a new Azure Event hubs instance.
@ -62,7 +62,6 @@ func NewAzureEventHubs(logger logger.Logger, isBinding bool) *AzureEventHubs {
producersLock: &sync.RWMutex{},
producers: make(map[string]*azeventhubs.ProducerClient, 1),
checkpointStoreLock: &sync.RWMutex{},
isFailed: &atomic.Bool{},
}
}
@ -76,15 +75,10 @@ func (aeh *AzureEventHubs) Init(metadata map[string]string) error {
if aeh.metadata.ConnectionString == "" {
// If connecting via Azure AD, we need to do some more initialization
var env azauth.EnvironmentSettings
env, err = azauth.NewEnvironmentSettings("eventhubs", metadata)
aeh.metadata.azEnvSettings, err = azauth.NewEnvironmentSettings("eventhubs", metadata)
if err != nil {
return fmt.Errorf("failed to initialize Azure AD credentials: %w", err)
}
aeh.metadata.aadTokenProvider, err = env.GetTokenCredential()
if err != nil {
return fmt.Errorf("failed to get Azure AD token credentials provider: %w", err)
}
aeh.logger.Info("connecting to Azure Event Hub using Azure AD; the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored")
@ -390,7 +384,11 @@ func (aeh *AzureEventHubs) getProducerClientForTopic(ctx context.Context, topic
}
} else {
// Use Azure AD
client, err = azeventhubs.NewProducerClient(aeh.metadata.EventHubNamespace, topic, aeh.metadata.aadTokenProvider, clientOpts)
cred, tokenErr := aeh.metadata.azEnvSettings.GetTokenCredential()
if tokenErr != nil {
return nil, fmt.Errorf("failed to get credentials from Azure AD: %w", tokenErr)
}
client, err = azeventhubs.NewProducerClient(aeh.metadata.EventHubNamespace, topic, cred, clientOpts)
if err != nil {
return nil, fmt.Errorf("unable to connect to Azure Event Hub using Azure AD: %w", err)
}
@ -448,7 +446,11 @@ func (aeh *AzureEventHubs) getProcessorForTopic(ctx context.Context, topic strin
}
} else {
// Use Azure AD
consumerClient, err = azeventhubs.NewConsumerClient(aeh.metadata.EventHubNamespace, topic, aeh.metadata.ConsumerGroup, aeh.metadata.aadTokenProvider, clientOpts)
cred, tokenErr := aeh.metadata.azEnvSettings.GetTokenCredential()
if tokenErr != nil {
return nil, fmt.Errorf("failed to get credentials from Azure AD: %w", tokenErr)
}
consumerClient, err = azeventhubs.NewConsumerClient(aeh.metadata.EventHubNamespace, topic, aeh.metadata.ConsumerGroup, cred, clientOpts)
if err != nil {
return nil, fmt.Errorf("unable to connect to Azure Event Hub using Azure AD: %w", err)
}
@ -500,132 +502,47 @@ func (aeh *AzureEventHubs) createCheckpointStore(ctx context.Context) (checkpoin
return nil, errors.New("property storageContainerName is required to subscribe to an Event Hub topic")
}
// Ensure the container exists
err = aeh.ensureStorageContainer(ctx)
// Get the Azure Blob Storage client and ensure the container exists
client, err := aeh.createStorageClient(ctx, true)
if err != nil {
return nil, err
}
// Create the checkpoint store
checkpointStoreOpts := &checkpoints.BlobStoreOptions{
checkpointStore, err = checkpoints.NewBlobStore(client, &checkpoints.BlobStoreOptions{
ClientOptions: policy.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
})
if err != nil {
return nil, fmt.Errorf("error creating checkpointer: %w", err)
}
if aeh.metadata.StorageConnectionString != "" {
// Authenticate with a connection string
checkpointStore, err = checkpoints.NewBlobStoreFromConnectionString(aeh.metadata.StorageConnectionString, aeh.metadata.StorageContainerName, checkpointStoreOpts)
if err != nil {
return nil, fmt.Errorf("error creating checkpointer from connection string: %w", err)
}
} else if aeh.metadata.StorageAccountKey != "" {
// Authenticate with a shared key
// TODO: This is a workaround in which we assemble a connection string until https://github.com/Azure/azure-sdk-for-go/issues/19842 is fixed
connString := fmt.Sprintf("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.windows.net", aeh.metadata.StorageAccountName, aeh.metadata.StorageAccountKey)
checkpointStore, err = checkpoints.NewBlobStoreFromConnectionString(connString, aeh.metadata.StorageContainerName, checkpointStoreOpts)
if err != nil {
return nil, fmt.Errorf("error creating checkpointer from storage account credentials: %w", err)
}
} else {
// Use Azure AD
// If Event Hub is authenticated using a connection string, we can't use Azure AD here
if aeh.metadata.ConnectionString != "" {
return nil, errors.New("either one of storageConnectionString or storageAccountKey is required when subscribing to an Event Hub topic without using Azure AD")
}
// Use the global URL for Azure Storage
containerURL := fmt.Sprintf("https://%s.blob.%s/%s", aeh.metadata.StorageAccountName, "core.windows.net", aeh.metadata.StorageContainerName)
checkpointStore, err = checkpoints.NewBlobStore(containerURL, aeh.metadata.aadTokenProvider, checkpointStoreOpts)
if err != nil {
return nil, fmt.Errorf("error creating checkpointer from Azure AD credentials: %w", err)
}
}
return checkpointStore, nil
}
// Ensures that the container exists in the Azure Storage Account.
// This is done to preserve backwards-compatibility with Dapr 1.9, as the old checkpoint SDK created them automatically.
func (aeh *AzureEventHubs) ensureStorageContainer(parentCtx context.Context) error {
// Get a client to Azure Blob Storage
client, err := aeh.createStorageClient()
// Creates a client to access Azure Blob Storage.
// TODO(@ItalyPaleAle): Remove ensureContainer option (and default to true) for Dapr 1.13
func (aeh *AzureEventHubs) createStorageClient(ctx context.Context, ensureContainer bool) (*container.Client, error) {
m := blobstorage.ContainerClientOpts{
ConnectionString: aeh.metadata.StorageConnectionString,
ContainerName: aeh.metadata.StorageContainerName,
AccountName: aeh.metadata.StorageAccountName,
AccountKey: aeh.metadata.StorageAccountKey,
RetryCount: 3,
}
client, err := m.InitContainerClient(aeh.metadata.azEnvSettings)
if err != nil {
return err
return nil, err
}
// Create the container
// This will return an error if it already exists
ctx, cancel := context.WithTimeout(parentCtx, resourceCreationTimeout)
defer cancel()
_, err = client.CreateContainer(ctx, aeh.metadata.StorageContainerName, &container.CreateOptions{
// Default is private
Access: nil,
})
if err != nil {
// Check if it's an Azure Storage error
resErr := &azcore.ResponseError{}
// If the container already exists, return no error
if errors.As(err, &resErr) && (resErr.ErrorCode == "ContainerAlreadyExists" || resErr.ErrorCode == "ResourceAlreadyExists") {
return nil
}
return fmt.Errorf("failed to create Azure Storage container %s: %w", aeh.metadata.StorageContainerName, err)
}
return nil
}
// Creates a client to access Azure Blob Storage
func (aeh *AzureEventHubs) createStorageClient() (*azblob.Client, error) {
options := azblob.ClientOptions{
ClientOptions: azcore.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: "dapr-" + logger.DaprVersion,
},
},
}
var (
err error
client *azblob.Client
)
// Use the global URL for Azure Storage
accountURL := fmt.Sprintf("https://%s.blob.%s", aeh.metadata.StorageAccountName, "core.windows.net")
if aeh.metadata.StorageConnectionString != "" {
// Authenticate with a connection string
client, err = azblob.NewClientFromConnectionString(aeh.metadata.StorageConnectionString, &options)
if ensureContainer {
// Ensure the container exists
// We're setting "accessLevel" to nil to make sure it's private
err = m.EnsureContainer(ctx, client, nil)
if err != nil {
return nil, fmt.Errorf("error creating Azure Storage client from connection string: %w", err)
}
} else if aeh.metadata.StorageAccountKey != "" {
// Authenticate with a shared key
credential, newSharedKeyErr := azblob.NewSharedKeyCredential(aeh.metadata.StorageAccountName, aeh.metadata.StorageAccountKey)
if newSharedKeyErr != nil {
return nil, fmt.Errorf("invalid Azure Storage shared key credentials with error: %w", newSharedKeyErr)
}
client, err = azblob.NewClientWithSharedKeyCredential(accountURL, credential, &options)
if err != nil {
return nil, fmt.Errorf("error creating Azure Storage client from shared key credentials: %w", err)
}
} else {
// Use Azure AD
var (
settings azauth.EnvironmentSettings
credential azcore.TokenCredential
)
settings, err = azauth.NewEnvironmentSettings("storage", aeh.metadata.properties)
if err != nil {
return nil, fmt.Errorf("error getting Azure environment settings: %w", err)
}
credential, err = settings.GetTokenCredential()
if err != nil {
return nil, fmt.Errorf("invalid Azure Storage token credentials with error: %w", err)
}
client, err = azblob.NewClient(accountURL, credential, &options)
if err != nil {
return nil, fmt.Errorf("error creating Azure Storage client from token credentials: %w", err)
return nil, err
}
}

View File

@ -16,11 +16,11 @@ package eventhubs
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
@ -45,10 +45,10 @@ type azureEventHubsMetadata struct {
PartitionID string `json:"partitionID" mapstructure:"partitionID"` // Deprecated
// Internal properties
namespaceName string
hubName string
aadTokenProvider azcore.TokenCredential
properties map[string]string
namespaceName string
hubName string
azEnvSettings azauth.EnvironmentSettings
properties map[string]string
}
func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.Logger) (*azureEventHubsMetadata, error) {
@ -108,7 +108,7 @@ func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.L
if hubName != "" {
log.Infof(`The provided connection string is specific to the Event Hub ("entity path") '%s'; publishing or subscribing to a topic that does not match this Event Hub will fail when attempted`, hubName)
} else {
log.Infof(`The provided connection string does not contain an Event Hub name ("entity path"); the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored`)
log.Info(`The provided connection string does not contain an Event Hub name ("entity path"); the connection will be established on first publish/subscribe and req.Topic field in incoming requests will be honored`)
}
m.hubName = hubName
@ -141,14 +141,11 @@ func parseEventHubsMetadata(meta map[string]string, isBinding bool, log logger.L
return &m, nil
}
var hubNameMatch = regexp.MustCompile(`(?i)(^|;)EntityPath=([^;]+)(;|$)`)
// Returns the hub name (topic) from the connection string.
// TODO: Temporary until https://github.com/Azure/azure-sdk-for-go/issues/19840 is fixed - then use `conn.ParsedConnectionFromStr(aeh.metadata.ConnectionString)` and look at the `HubName` property.
func hubNameFromConnString(connString string) string {
match := hubNameMatch.FindStringSubmatch(connString)
if len(match) < 3 {
props, err := azeventhubs.ParseConnectionString(connString)
if err != nil || props.EntityPath == nil {
return ""
}
return match[2]
return *props.EntityPath
}

View File

@ -31,10 +31,11 @@ import (
// Because the new SDK stores checkpoints in a different way, clients using the new ("track 2") and the old SDK cannot coexist.
// To ensure this doesn't happen, when we create a new subscription to the same topic and with the same consumer group, we check if there's a file in Azure Storage with the checkpoint created by the old SDK and with a still-active lease. If that's true, we wait (until the context expires) before we crash Dapr with a log message describing what's happening.
// These conflicts should be transient anyways, as mixed versions of Dapr should only happen during a rollout of a new version of Dapr.
// TODO(@ItalyPaleAle): Remove this for Dapr 1.13
// TODO(@ItalyPaleAle): Remove this (entire file) for Dapr 1.13
func (aeh *AzureEventHubs) ensureNoTrack1Subscribers(parentCtx context.Context, topic string) error {
// Get a client to Azure Blob Storage
client, err := aeh.createStorageClient()
// Because we are not using "ensureContainer=true", we can pass a nil context
client, err := aeh.createStorageClient(nil, false)
if err != nil {
return err
}
@ -52,7 +53,7 @@ func (aeh *AzureEventHubs) ensureNoTrack1Subscribers(parentCtx context.Context,
backOffConfig.MaxRetries = -1
b := backOffConfig.NewBackOffWithContext(parentCtx)
err = backoff.Retry(func() error {
pager := client.NewListBlobsFlatPager(aeh.metadata.StorageContainerName, &container.ListBlobsFlatOptions{
pager := client.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &prefix,
})
for pager.More() {

View File

@ -64,16 +64,13 @@ const (
type StateStore struct {
state.DefaultBulkStore
containerClient *container.Client
json jsoniter.API
features []state.Feature
logger logger.Logger
logger logger.Logger
}
// Init the connection to blob storage, optionally creates a blob container if it doesn't exist.
func (r *StateStore) Init(metadata state.Metadata) error {
var err error
r.containerClient, _, err = storageinternal.CreateContainerStorageClient(r.logger, metadata.Properties)
r.containerClient, _, err = storageinternal.CreateContainerStorageClient(context.TODO(), r.logger, metadata.Properties)
if err != nil {
return err
}
@ -82,7 +79,7 @@ func (r *StateStore) Init(metadata state.Metadata) error {
// Features returns the features available in this state store.
func (r *StateStore) Features() []state.Feature {
return r.features
return []state.Feature{state.FeatureETag}
}
// Delete the state.
@ -118,9 +115,7 @@ func (r *StateStore) GetComponentMetadata() map[string]string {
// NewAzureBlobStorageStore instance.
func NewAzureBlobStorageStore(logger logger.Logger) state.Store {
s := &StateStore{
json: jsoniter.ConfigFastest,
features: []state.Feature{state.FeatureETag},
logger: logger,
logger: logger,
}
s.DefaultBulkStore = state.NewDefaultBulkStore(s)