AzBlob State: Migrate to Track2 SDK

Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
Bernd Verst 2022-11-17 19:55:02 -08:00
parent 17d38d7f65
commit 7ccca4def8
4 changed files with 140 additions and 85 deletions


@@ -36,16 +36,21 @@ Concurrency is supported with ETags according to https://docs.microsoft.com/en-u
 package blobstorage

 import (
+	"bytes"
 	"context"
 	b64 "encoding/base64"
 	"fmt"
-	"io"
-	"net"
 	"net/url"
 	"reflect"
 	"strings"
+	"time"

-	"github.com/Azure/azure-storage-blob-go/azblob"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	jsoniter "github.com/json-iterator/go"

 	azauth "github.com/dapr/components-contrib/internal/authentication/azure"
@@ -63,12 +68,13 @@ const (
 	contentLanguage    = "ContentLanguage"
 	contentDisposition = "ContentDisposition"
 	cacheControl       = "CacheControl"
-	endpointKey        = "endpoint"
 )

 // StateStore Type.
 type StateStore struct {
 	state.DefaultBulkStore
-	containerURL azblob.ContainerURL
+	containerClient *container.Client
 	json jsoniter.API

 	features []state.Feature
@@ -78,49 +84,77 @@ type StateStore struct {
 type blobStorageMetadata struct {
 	AccountName   string
 	ContainerName string
+	AccountKey    string
 }

 // Init the connection to blob storage, optionally creates a blob container if it doesn't exist.
 func (r *StateStore) Init(metadata state.Metadata) error {
-	meta, err := getBlobStorageMetadata(metadata.Properties)
+	m, err := getBlobStorageMetadata(metadata.Properties)
 	if err != nil {
 		return err
 	}

-	credential, env, err := azauth.GetAzureStorageBlobCredentials(r.logger, meta.AccountName, metadata.Properties)
-	if err != nil {
-		return fmt.Errorf("invalid credentials with error: %s", err.Error())
-	}
-
 	userAgent := "dapr-" + logger.DaprVersion
-	options := azblob.PipelineOptions{
-		Telemetry: azblob.TelemetryOptions{Value: userAgent},
+	options := container.ClientOptions{
+		ClientOptions: azcore.ClientOptions{
+			Telemetry: policy.TelemetryOptions{
+				ApplicationID: userAgent,
+			},
+		},
 	}
-	p := azblob.NewPipeline(credential, options)

-	customEndpoint, ok := metadata.Properties[endpointKey]
+	settings, err := azauth.NewEnvironmentSettings("storage", metadata.Properties)
+	if err != nil {
+		return err
+	}
+
 	var URL *url.URL
+	customEndpoint, ok := mdutils.GetMetadataProperty(metadata.Properties, azauth.StorageEndpointKeys...)
 	if ok && customEndpoint != "" {
-		URL, err = url.Parse(fmt.Sprintf("%s/%s/%s", customEndpoint, meta.AccountName, meta.ContainerName))
+		var parseErr error
+		URL, parseErr = url.Parse(fmt.Sprintf("%s/%s/%s", customEndpoint, m.AccountName, m.ContainerName))
+		if parseErr != nil {
+			return parseErr
+		}
 	} else {
-		URL, err = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", meta.AccountName, env.StorageEndpointSuffix, meta.ContainerName))
+		env := settings.AzureEnvironment
+		URL, _ = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", m.AccountName, env.StorageEndpointSuffix, m.ContainerName))
 	}
-	if err != nil {
-		return err
-	}
-	containerURL := azblob.NewContainerURL(*URL, p)

-	_, err = net.LookupHost(URL.Hostname())
-	if err != nil {
-		return err
+	var clientErr error
+	var client *container.Client
+	// Try using shared key credentials first
+	if m.AccountKey != "" {
+		credential, newSharedKeyErr := azblob.NewSharedKeyCredential(m.AccountName, m.AccountKey)
+		if newSharedKeyErr != nil {
+			return fmt.Errorf("invalid credentials with error: %w", newSharedKeyErr)
+		}
+		client, clientErr = container.NewClientWithSharedKeyCredential(URL.String(), credential, &options)
+		if clientErr != nil {
+			return fmt.Errorf("cannot init Blobstorage container client: %w", clientErr)
+		}
+		r.containerClient = client
+	} else {
+		// fallback to AAD
+		credential, tokenErr := settings.GetTokenCredential()
+		if tokenErr != nil {
+			return fmt.Errorf("invalid credentials with error: %w", tokenErr)
+		}
+		client, clientErr = container.NewClient(URL.String(), credential, &options)
+	}
+	if clientErr != nil {
+		return fmt.Errorf("cannot init Blobstorage client: %w", clientErr)
 	}

-	ctx := context.Background()
-	_, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
-	r.logger.Debugf("error creating container: %s", err)
+	createContainerOptions := container.CreateOptions{
+		Access: nil,
+	}
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	_, err = client.Create(timeoutCtx, &createContainerOptions)
+	cancel()
+	// Don't return error, container might already exist
+	r.logger.Debugf("error creating container: %v", err)

-	r.containerURL = containerURL
-	r.logger.Debugf("using container '%s'", meta.ContainerName)
+	r.containerClient = client

 	return nil
 }
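
As a reading aid for the hunk above, here is a minimal, self-contained sketch of the Track2 client-construction pattern, assuming azblob v0.5.x. The account, key, and container values are placeholders, and azidentity.NewDefaultAzureCredential stands in for the component's azauth environment settings; the helper names are illustrative, not part of this commit.

package blobexample

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

// newContainerClient mirrors the two branches above: shared key when an
// account key is configured, otherwise an AAD token credential.
func newContainerClient(account, key, containerName string) (*container.Client, error) {
	url := fmt.Sprintf("https://%s.blob.core.windows.net/%s", account, containerName)
	if key != "" {
		cred, err := azblob.NewSharedKeyCredential(account, key)
		if err != nil {
			return nil, err
		}
		return container.NewClientWithSharedKeyCredential(url, cred, nil)
	}
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		return nil, err
	}
	return container.NewClient(url, cred, nil)
}

// ensureContainer attempts to create the container and ignores the error,
// since the container may already exist (as the Init code above does).
func ensureContainer(c *container.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	_, _ = c.Create(ctx, nil)
}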
@@ -149,10 +183,11 @@ func (r *StateStore) Set(req *state.SetRequest) error {
 }

 func (r *StateStore) Ping() error {
-	accessConditions := azblob.BlobAccessConditions{}
-
-	if _, err := r.containerURL.GetProperties(context.Background(), accessConditions.LeaseAccessConditions); err != nil {
-		return fmt.Errorf("blob storage: error connecting to Blob storage at %s: %s", r.containerURL.URL().Host, err)
+	getPropertiesOptions := container.GetPropertiesOptions{
+		LeaseAccessConditions: &container.LeaseAccessConditions{},
+	}
+	if _, err := r.containerClient.GetProperties(context.Background(), &getPropertiesOptions); err != nil {
+		return fmt.Errorf("blob storage: error connecting to Blob storage at %s: %s", r.containerClient.URL(), err)
 	}

 	return nil
@@ -197,9 +232,13 @@ func getBlobStorageMetadata(meta map[string]string) (*blobStorageMetadata, error
 }

 func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
-	blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
-	resp, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
+	blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))
+	downloadOptions := azblob.DownloadStreamOptions{
+		AccessConditions: &blob.AccessConditions{},
+	}
+	blobDownloadResponse, err := blockBlobClient.DownloadStream(ctx, &downloadOptions)
 	if err != nil {
 		r.logger.Debugf("download file %s, err %s", req.Key, err)
@@ -210,107 +249,124 @@ func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*stat
 		return &state.GetResponse{}, err
 	}

-	bodyStream := resp.Body(azblob.RetryReaderOptions{})
-	data, err := io.ReadAll(bodyStream)
+	blobData := &bytes.Buffer{}
+	reader := blobDownloadResponse.Body
+	_, err = blobData.ReadFrom(reader)
 	if err != nil {
-		r.logger.Debugf("read file %s, err %s", req.Key, err)
-		return &state.GetResponse{}, err
+		return &state.GetResponse{}, fmt.Errorf("error reading az blob: %w", err)
+	}
+	err = reader.Close()
+	if err != nil {
+		return &state.GetResponse{}, fmt.Errorf("error closing az blob reader: %w", err)
 	}

-	contentType := resp.ContentType()
+	contentType := blobDownloadResponse.ContentType

 	return &state.GetResponse{
-		Data:        data,
-		ETag:        ptr.Of(string(resp.ETag())),
-		ContentType: &contentType,
+		Data:        blobData.Bytes(),
+		ETag:        ptr.Of(string(*blobDownloadResponse.ETag)),
+		ContentType: contentType,
 	}, nil
 }

 func (r *StateStore) writeFile(ctx context.Context, req *state.SetRequest) error {
-	accessConditions := azblob.BlobAccessConditions{}
+	modifiedAccessConditions := blob.ModifiedAccessConditions{}

 	if req.ETag != nil && *req.ETag != "" {
-		accessConditions.IfMatch = azblob.ETag(*req.ETag)
+		modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
 	}
 	if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
-		accessConditions.IfNoneMatch = azblob.ETag("*")
+		modifiedAccessConditions.IfNoneMatch = ptr.Of(azcore.ETagAny)
 	}

-	blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
+	accessConditions := blob.AccessConditions{
+		ModifiedAccessConditions: &modifiedAccessConditions,
+	}

 	blobHTTPHeaders, err := r.createBlobHTTPHeadersFromRequest(req)
 	if err != nil {
 		return err
 	}

-	_, err = azblob.UploadBufferToBlockBlob(ctx, r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
-		Metadata:         req.Metadata,
-		AccessConditions: accessConditions,
-		BlobHTTPHeaders:  blobHTTPHeaders,
-	})
-	if err != nil {
-		r.logger.Debugf("write file %s, err %s", req.Key, err)
+	uploadOptions := azblob.UploadBufferOptions{
+		AccessConditions: &accessConditions,
+		Metadata:         req.Metadata,
+		HTTPHeaders:      &blobHTTPHeaders,
+		Concurrency:      16,
+	}

+	blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))
+	_, err = blockBlobClient.UploadBuffer(ctx, r.marshal(req), &uploadOptions)
+	if err != nil {
 		// Check if the error is due to ETag conflict
 		if req.ETag != nil && isETagConflictError(err) {
 			return state.NewETagError(state.ETagMismatch, err)
 		}

-		return err
+		return fmt.Errorf("error uploading az blob: %w", err)
 	}

 	return nil
 }

-func (r *StateStore) createBlobHTTPHeadersFromRequest(req *state.SetRequest) (azblob.BlobHTTPHeaders, error) {
-	var blobHTTPHeaders azblob.BlobHTTPHeaders
+func (r *StateStore) createBlobHTTPHeadersFromRequest(req *state.SetRequest) (blob.HTTPHeaders, error) {
+	blobHTTPHeaders := blob.HTTPHeaders{}
 	if val, ok := req.Metadata[contentType]; ok && val != "" {
-		blobHTTPHeaders.ContentType = val
+		blobHTTPHeaders.BlobContentType = &val
 		delete(req.Metadata, contentType)
 	}

 	if req.ContentType != nil {
-		if blobHTTPHeaders.ContentType != "" {
-			r.logger.Warnf("ContentType received from request Metadata %s, as well as ContentType property %s, choosing value from contentType property", blobHTTPHeaders.ContentType, *req.ContentType)
+		if blobHTTPHeaders.BlobContentType != nil {
+			r.logger.Warnf("ContentType received from request Metadata %s, as well as ContentType property %s, choosing value from contentType property", blobHTTPHeaders.BlobContentType, req.ContentType)
 		}
-		blobHTTPHeaders.ContentType = *req.ContentType
+		blobHTTPHeaders.BlobContentType = req.ContentType
 	}

 	if val, ok := req.Metadata[contentMD5]; ok && val != "" {
 		sDec, err := b64.StdEncoding.DecodeString(val)
 		if err != nil || len(sDec) != 16 {
-			return azblob.BlobHTTPHeaders{}, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
+			return blob.HTTPHeaders{}, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
 		}
-		blobHTTPHeaders.ContentMD5 = sDec
+		blobHTTPHeaders.BlobContentMD5 = sDec
 		delete(req.Metadata, contentMD5)
 	}
 	if val, ok := req.Metadata[contentEncoding]; ok && val != "" {
-		blobHTTPHeaders.ContentEncoding = val
+		blobHTTPHeaders.BlobContentEncoding = &val
 		delete(req.Metadata, contentEncoding)
 	}
 	if val, ok := req.Metadata[contentLanguage]; ok && val != "" {
-		blobHTTPHeaders.ContentLanguage = val
+		blobHTTPHeaders.BlobContentLanguage = &val
 		delete(req.Metadata, contentLanguage)
 	}
 	if val, ok := req.Metadata[contentDisposition]; ok && val != "" {
-		blobHTTPHeaders.ContentDisposition = val
+		blobHTTPHeaders.BlobContentDisposition = &val
 		delete(req.Metadata, contentDisposition)
 	}
 	if val, ok := req.Metadata[cacheControl]; ok && val != "" {
-		blobHTTPHeaders.CacheControl = val
+		blobHTTPHeaders.BlobCacheControl = &val
 		delete(req.Metadata, cacheControl)
 	}

 	return blobHTTPHeaders, nil
 }

 func (r *StateStore) deleteFile(ctx context.Context, req *state.DeleteRequest) error {
-	blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
-	accessConditions := azblob.BlobAccessConditions{}
+	blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))
+	modifiedAccessConditions := blob.ModifiedAccessConditions{}

 	if req.ETag != nil && *req.ETag != "" {
-		accessConditions.IfMatch = azblob.ETag(*req.ETag)
+		modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
 	}

-	_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, accessConditions)
+	deleteOptions := blob.DeleteOptions{
+		DeleteSnapshots: nil,
+		AccessConditions: &blob.AccessConditions{
+			ModifiedAccessConditions: &modifiedAccessConditions,
+		},
+	}
+	_, err := blockBlobClient.Delete(ctx, &deleteOptions)
 	if err != nil {
 		r.logger.Debugf("delete file %s, err %s", req.Key, err)
@@ -349,13 +405,9 @@ func (r *StateStore) marshal(req *state.SetRequest) []byte {
 }

 func isNotFoundError(err error) bool {
-	azureError, ok := err.(azblob.StorageError)
-
-	return ok && azureError.ServiceCode() == azblob.ServiceCodeBlobNotFound
+	return bloberror.HasCode(err, bloberror.BlobNotFound)
 }

 func isETagConflictError(err error) bool {
-	azureError, ok := err.(azblob.StorageError)
-
-	return ok && azureError.ServiceCode() == azblob.ServiceCodeConditionNotMet
+	return bloberror.HasCode(err, bloberror.ConditionNotMet)
 }
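
A hedged sketch of the error-classification change above: Track1's azblob.StorageError type assertion becomes a bloberror.HasCode lookup. The classify helper is illustrative, not part of the commit.

package blobexample

import "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"

// classify maps service errors to the two cases the state store cares about.
func classify(err error) string {
	switch {
	case bloberror.HasCode(err, bloberror.BlobNotFound):
		return "not found"
	case bloberror.HasCode(err, bloberror.ConditionNotMet):
		return "etag conflict"
	default:
		return "other"
	}
}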


@@ -34,8 +34,7 @@ func TestInit(t *testing.T) {
 		}
 		err := s.Init(m)
 		assert.Nil(t, err)
-		assert.Equal(t, "acc.blob.core.windows.net", s.containerURL.URL().Host)
-		assert.Equal(t, "/dapr", s.containerURL.URL().Path)
+		assert.Equal(t, "https://acc.blob.core.windows.net/dapr", s.containerClient.URL())
 	})
t.Run("Init with missing metadata", func(t *testing.T) { t.Run("Init with missing metadata", func(t *testing.T) {
@@ -53,7 +52,8 @@
 			"accountKey":    "e+Dnvl8EOxYxV94nurVaRQ==",
 			"containerName": "dapr",
 		}
-		err := s.Init(m)
+		s.Init(m)
+		err := s.Ping()
 		assert.NotNil(t, err)
 	})
 }
@@ -100,7 +100,7 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {
 		blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
 		assert.Nil(t, err)
-		assert.Equal(t, "application/json", blobHeaders.ContentType)
+		assert.Equal(t, "application/json", *blobHeaders.BlobContentType)
 	})
 	t.Run("Content type and metadata provided (conflict), content type chosen", func(t *testing.T) {
 		contentType := "application/json"
@@ -113,7 +113,7 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {
 		blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
 		assert.Nil(t, err)
-		assert.Equal(t, "application/json", blobHeaders.ContentType)
+		assert.Equal(t, "application/json", *blobHeaders.BlobContentType)
 	})
 	t.Run("ContentType not provided, metadata provided set backward compatibility", func(t *testing.T) {
 		req := &state.SetRequest{
@@ -124,6 +124,6 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {
 		blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
 		assert.Nil(t, err)
-		assert.Equal(t, "text/plain", blobHeaders.ContentType)
+		assert.Equal(t, "text/plain", *blobHeaders.BlobContentType)
 	})
 }


@@ -19,6 +19,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 // indirect
+	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1 // indirect
 	github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
 	github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
 	github.com/Azure/go-autorest v14.2.0+incompatible // indirect


@@ -47,6 +47,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
 github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1 h1:XUNQ4mw+zJmaA2KXzP9JlQiecy1SI+Eog7xVkPiqIbg=
 github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
+github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1 h1:BMTdr+ib5ljLa9MxTJK8x/Ds0MbBb4MfuW5BL0zMJnI=
+github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.5.1/go.mod h1:c6WvOhtmjNUWbLfOG1qxM/q0SPvQNSVJvolm+C52dIU=
 github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
 github.com/Azure/azure-storage-blob-go v0.10.0/go.mod h1:ep1edmW+kNQx4UfWM9heESNmQdijykocJ0YOxmMX8SE=
 github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=