Add blob storage as state store (#348)
* Add blob storage as state store * Updated comments for blob storage Co-authored-by: Young Bu Park <youngp@microsoft.com> Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
parent
cf3ee2a46a
commit
6085834d79
|
|
@ -0,0 +1,263 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
/*
|
||||
Azure Blob Storage state store.
|
||||
|
||||
Sample configuration in yaml:
|
||||
|
||||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: statestore
|
||||
spec:
|
||||
type: state.azure.blobstorage
|
||||
metadata:
|
||||
- name: accountName
|
||||
value: <storage account name>
|
||||
- name: accountKey
|
||||
value: <key>
|
||||
- name: containerName
|
||||
value: <container Name>
|
||||
|
||||
Concurrency is supported with ETags according to https://docs.microsoft.com/en-us/azure/storage/common/storage-concurrency#managing-concurrency-in-blob-storage
|
||||
*/
|
||||
|
||||
package blobstorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
const (
|
||||
keyDelimiter = "||"
|
||||
accountNameKey = "accountName"
|
||||
accountKeyKey = "accountKey"
|
||||
containerNameKey = "containerName"
|
||||
)
|
||||
|
||||
// StateStore Type
|
||||
type StateStore struct {
|
||||
containerURL azblob.ContainerURL
|
||||
json jsoniter.API
|
||||
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
type blobStorageMetadata struct {
|
||||
accountName string
|
||||
accountKey string
|
||||
containerName string
|
||||
}
|
||||
|
||||
// Init the connection to blob storage, optionally creates a blob container if it doesn't exist.
|
||||
func (r *StateStore) Init(metadata state.Metadata) error {
|
||||
meta, err := getBlobStorageMetadata(metadata.Properties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
credential, err := azblob.NewSharedKeyCredential(meta.accountName, meta.accountKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid credentials with error: %s", err.Error())
|
||||
}
|
||||
|
||||
p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
|
||||
|
||||
URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", meta.accountName, meta.containerName))
|
||||
containerURL := azblob.NewContainerURL(*URL, p)
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
|
||||
r.logger.Debugf("error creating container: %s", err)
|
||||
|
||||
r.containerURL = containerURL
|
||||
r.logger.Debugf("using container '%s'", meta.containerName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the state
|
||||
func (r *StateStore) Delete(req *state.DeleteRequest) error {
|
||||
r.logger.Debugf("delete %s", req.Key)
|
||||
return r.deleteFile(req)
|
||||
}
|
||||
|
||||
// BulkDelete the state
|
||||
func (r *StateStore) BulkDelete(req []state.DeleteRequest) error {
|
||||
r.logger.Debugf("bulk delete %v key(s)", len(req))
|
||||
for _, s := range req {
|
||||
err := r.Delete(&s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the state
|
||||
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
r.logger.Debugf("fetching %s", req.Key)
|
||||
data, etag, err := r.readFile(req)
|
||||
if err != nil {
|
||||
r.logger.Debugf("error %s", err)
|
||||
|
||||
if isNotFoundError(err) {
|
||||
return &state.GetResponse{}, nil
|
||||
}
|
||||
|
||||
return &state.GetResponse{}, err
|
||||
}
|
||||
|
||||
return &state.GetResponse{
|
||||
Data: data,
|
||||
ETag: etag,
|
||||
}, err
|
||||
}
|
||||
|
||||
// Set the state
|
||||
func (r *StateStore) Set(req *state.SetRequest) error {
|
||||
r.logger.Debugf("saving %s", req.Key)
|
||||
return r.writeFile(req)
|
||||
}
|
||||
|
||||
// BulkSet the state
|
||||
func (r *StateStore) BulkSet(req []state.SetRequest) error {
|
||||
r.logger.Debugf("bulk set %v key(s)", len(req))
|
||||
|
||||
for _, s := range req {
|
||||
err := r.Set(&s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewAzureBlobStorageStore instance
|
||||
func NewAzureBlobStorageStore(logger logger.Logger) *StateStore {
|
||||
return &StateStore{
|
||||
json: jsoniter.ConfigFastest,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func getBlobStorageMetadata(metadata map[string]string) (*blobStorageMetadata, error) {
|
||||
meta := blobStorageMetadata{}
|
||||
|
||||
if val, ok := metadata[accountNameKey]; ok && val != "" {
|
||||
meta.accountName = val
|
||||
} else {
|
||||
return nil, fmt.Errorf("missing or empty %s field from metadata", accountNameKey)
|
||||
}
|
||||
|
||||
if val, ok := metadata[accountKeyKey]; ok && val != "" {
|
||||
meta.accountKey = val
|
||||
} else {
|
||||
return nil, fmt.Errorf("missing of empty %s field from metadata", accountKeyKey)
|
||||
}
|
||||
|
||||
if val, ok := metadata[containerNameKey]; ok && val != "" {
|
||||
meta.containerName = val
|
||||
} else {
|
||||
return nil, fmt.Errorf("missing of empty %s field from metadata", containerNameKey)
|
||||
}
|
||||
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
func (r *StateStore) readFile(req *state.GetRequest) ([]byte, string, error) {
|
||||
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
|
||||
|
||||
resp, err := blobURL.Download(context.Background(), 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
|
||||
if err != nil {
|
||||
r.logger.Debugf("download file %s, err %s", req.Key, err)
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
bodyStream := resp.Body(azblob.RetryReaderOptions{})
|
||||
data := bytes.Buffer{}
|
||||
_, err = data.ReadFrom(bodyStream)
|
||||
|
||||
if err != nil {
|
||||
r.logger.Debugf("read file %s, err %s", req.Key, err)
|
||||
return nil, "", err
|
||||
}
|
||||
return data.Bytes(), string(resp.ETag()), nil
|
||||
}
|
||||
|
||||
func (r *StateStore) writeFile(req *state.SetRequest) error {
|
||||
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
|
||||
|
||||
accessConditions := azblob.BlobAccessConditions{}
|
||||
|
||||
if req.Options.Concurrency == state.LastWrite {
|
||||
accessConditions.IfMatch = azblob.ETag(req.ETag)
|
||||
}
|
||||
|
||||
_, err := azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
|
||||
Parallelism: 16,
|
||||
Metadata: req.Metadata,
|
||||
AccessConditions: accessConditions,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
r.logger.Debugf("write file %s, err %s", req.Key, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) deleteFile(req *state.DeleteRequest) error {
|
||||
blobURL := r.containerURL.NewBlockBlobURL(getFileName((req.Key)))
|
||||
accessConditions := azblob.BlobAccessConditions{}
|
||||
|
||||
if req.Options.Concurrency == state.LastWrite {
|
||||
accessConditions.IfMatch = azblob.ETag(req.ETag)
|
||||
}
|
||||
|
||||
_, err := blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, accessConditions)
|
||||
|
||||
if err != nil {
|
||||
r.logger.Debugf("delete file %s, err %s", req.Key, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getFileName(key string) string {
|
||||
pr := strings.Split(key, keyDelimiter)
|
||||
if len(pr) != 2 {
|
||||
return pr[0]
|
||||
}
|
||||
return pr[1]
|
||||
}
|
||||
|
||||
func (r *StateStore) marshal(req *state.SetRequest) []byte {
|
||||
var v string
|
||||
b, ok := req.Value.([]byte)
|
||||
if ok {
|
||||
v = string(b)
|
||||
} else {
|
||||
v, _ = jsoniter.MarshalToString(req.Value)
|
||||
}
|
||||
return []byte(v)
|
||||
}
|
||||
|
||||
func isNotFoundError(err error) bool {
|
||||
azureError, ok := err.(azblob.StorageError)
|
||||
return ok && azureError.ServiceCode() == azblob.ServiceCodeBlobNotFound
|
||||
}
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package blobstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
m := state.Metadata{}
|
||||
s := NewAzureBlobStorageStore(logger.NewLogger("logger"))
|
||||
t.Run("Init with valid metadata", func(t *testing.T) {
|
||||
m.Properties = map[string]string{
|
||||
"accountName": "acc",
|
||||
"accountKey": "e+Dnvl8EOxYxV94nurVaRQ==",
|
||||
"containerName": "dapr",
|
||||
}
|
||||
err := s.Init(m)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "acc.blob.core.windows.net", s.containerURL.URL().Host)
|
||||
assert.Equal(t, "/dapr", s.containerURL.URL().Path)
|
||||
})
|
||||
|
||||
t.Run("Init with missing metadata", func(t *testing.T) {
|
||||
m.Properties = map[string]string{
|
||||
"invalidValue": "a",
|
||||
}
|
||||
err := s.Init(m)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, err, fmt.Errorf("missing or empty accountName field from metadata"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetBlobStorageMetaData(t *testing.T) {
|
||||
t.Run("Nothing at all passed", func(t *testing.T) {
|
||||
m := make(map[string]string)
|
||||
_, err := getBlobStorageMetadata(m)
|
||||
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("All parameters passed and parsed", func(t *testing.T) {
|
||||
m := make(map[string]string)
|
||||
m["accountName"] = "acc"
|
||||
m["accountKey"] = "key"
|
||||
m["containerName"] = "dapr"
|
||||
meta, err := getBlobStorageMetadata(m)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, "acc", meta.accountName)
|
||||
assert.Equal(t, "key", meta.accountKey)
|
||||
assert.Equal(t, "dapr", meta.containerName)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFileName(t *testing.T) {
|
||||
t.Run("Valid composite key", func(t *testing.T) {
|
||||
key := getFileName("app_id||key")
|
||||
assert.Equal(t, "key", key)
|
||||
})
|
||||
|
||||
t.Run("No delimiter present", func(t *testing.T) {
|
||||
key := getFileName("key")
|
||||
assert.Equal(t, "key", key)
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue