components-contrib/state/azure/tablestorage/tablestorage.go

421 lines
12 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Azure Table Storage state store.

Sample configuration in yaml:

	apiVersion: dapr.io/v1alpha1
	kind: Component
	metadata:
	  name: statestore
	spec:
	  type: state.azure.tablestorage
	  metadata:
	  - name: accountName
	    value: <storage account name>
	  - name: accountKey
	    value: <key>
	  - name: tableName
	    value: <table name>

This store uses PartitionKey as service name, and RowKey as the rest of the composite key.

Concurrency is supported with ETags according to https://docs.microsoft.com/en-us/azure/storage/common/storage-concurrency#managing-concurrency-in-table-storage
*/
package tablestorage
import (
"context"
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/data/aztables"
jsoniter "github.com/json-iterator/go"
azauth "github.com/dapr/components-contrib/common/authentication/azure"
mdutils "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
kitmd "github.com/dapr/kit/metadata"
"github.com/dapr/kit/ptr"
)
// Constants shared by the state store implementation.
const (
// keyDelimiter is the separator used to split a state key into its
// partition-key and row-key parts (see getPartitionAndRowKey).
keyDelimiter = "||"
// valueEntityProperty is the name of the entity property in which the
// stored value is written and from which it is read back.
valueEntityProperty = "Value"
// cosmosDBModeKey is the name of the Cosmos DB mode metadata property.
// NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
cosmosDBModeKey = "cosmosDbMode"
// timeout is the deadline applied to each individual table operation
// (create table, get, add, update, delete) issued by this store.
timeout = 15 * time.Second
)
// StateStore is a state store backed by an Azure table (Azure Table Storage
// or, when cosmosDBMode is set, the Cosmos DB Table API).
type StateStore struct {
// BulkStore provides the bulk operations; the constructor sets it to the
// default bulk store wrapping this StateStore.
state.BulkStore
// client is the table-scoped client, created by Init.
client *aztables.Client
// json is the jsoniter configuration set by the constructor.
json jsoniter.API
// cosmosDBMode is copied from the component metadata in Init and changes the
// row key used for state keys that do not split into exactly two parts.
cosmosDBMode bool
// features is the list returned by Features.
features []state.Feature
logger logger.Logger
}
// tablesMetadata holds the component metadata properties understood by this
// state store. It is populated by getTablesMetadata and also used by
// GetComponentMetadata to describe the supported properties.
type tablesMetadata struct {
AccountName string
AccountKey string // optional, if not provided, will use Azure AD authentication
TableName string
CosmosDBMode bool // if true, use CosmosDB Table API, otherwise use Azure Table Storage
ServiceURL string // optional, if not provided, will use default Azure service URL
SkipCreateTable bool // skip attempt to create table - useful for fine grained AAD roles
}
// Init initialises the connection to table storage and, unless the
// SkipCreateTable metadata property is set, creates the table if it doesn't
// exist yet.
//
// Authentication uses the shared account key when one is configured and falls
// back to Azure AD credentials otherwise. When no service URL is configured,
// the default endpoint for the account is used (the Cosmos DB Table API
// endpoint in Cosmos DB mode, the Table Storage endpoint otherwise).
//
// It returns any error raised while parsing the metadata, building the
// credentials or the service client, or creating the table; a "table already
// exists" response is not treated as an error.
func (r *StateStore) Init(ctx context.Context, metadata state.Metadata) error {
	meta, err := getTablesMetadata(metadata.Properties)
	if err != nil {
		return err
	}

	r.cosmosDBMode = meta.CosmosDBMode

	serviceURL := meta.ServiceURL
	if serviceURL == "" {
		if r.cosmosDBMode {
			serviceURL = fmt.Sprintf("https://%s.table.cosmos.azure.com", meta.AccountName)
		} else {
			serviceURL = fmt.Sprintf("https://%s.table.core.windows.net", meta.AccountName)
		}
	}

	opts := aztables.ClientOptions{
		ClientOptions: policy.ClientOptions{
			Telemetry: policy.TelemetryOptions{
				ApplicationID: "dapr-" + logger.DaprVersion,
			},
		},
	}

	var client *aztables.ServiceClient
	if meta.AccountKey != "" {
		// use shared key authentication
		cred, innerErr := aztables.NewSharedKeyCredential(meta.AccountName, meta.AccountKey)
		if innerErr != nil {
			return innerErr
		}
		client, innerErr = aztables.NewServiceClientWithSharedKey(serviceURL, cred, &opts)
		if innerErr != nil {
			return innerErr
		}
	} else {
		// fallback to azure AD authentication
		settings, innerErr := azauth.NewEnvironmentSettings(metadata.Properties)
		if innerErr != nil {
			return innerErr
		}
		token, innerErr := settings.GetTokenCredential()
		if innerErr != nil {
			return innerErr
		}
		client, innerErr = aztables.NewServiceClient(serviceURL, token, &opts)
		// Check innerErr (not the outer err, which is always nil here) so that a
		// failed client creation is reported instead of leaving client nil.
		if innerErr != nil {
			return innerErr
		}
	}

	if !meta.SkipCreateTable {
		createContext, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		_, innerErr := client.CreateTable(createContext, meta.TableName, nil)
		if innerErr != nil {
			if !isTableAlreadyExistsError(innerErr) {
				return innerErr
			}
			// error creating table, but it already exists so we're fine
			r.logger.Debugf("table already exists")
		}
	}

	r.client = client.NewClient(meta.TableName)
	// CosmosDBMode is a bool, so it must be formatted with %t rather than %s.
	r.logger.Debugf("table initialised, account: %s, cosmosDbMode: %t, table: %s", meta.AccountName, meta.CosmosDBMode, meta.TableName)

	return nil
}
// Features returns the features available in this state store, as set by the
// constructor.
func (r *StateStore) Features() []state.Feature {
return r.features
}
// Delete removes the entity addressed by req.Key.
//
// When the request carries an ETag, any failure of the delete is reported as
// an ETag mismatch. Without an ETag, deleting an entity that does not exist is
// a no-op and every other failure is returned unchanged.
func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
	err := r.deleteRow(ctx, req)

	switch {
	case err == nil:
		return nil
	case req.HasETag():
		return state.NewETagError(state.ETagMismatch, err)
	case isNotFoundError(err):
		// deleting an item that doesn't exist without specifying an ETAG is a noop
		return nil
	default:
		return err
	}
}
// Get fetches the entity addressed by req.Key and returns its value and ETag.
//
// A missing entity yields an empty response and no error. Any other lookup
// failure yields an empty response together with the error. The lookup is
// bounded by the package-level timeout.
func (r *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
	partitionKey, rowKey := getPartitionAndRowKey(req.Key, r.cosmosDBMode)

	getCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	entity, err := r.client.GetEntity(getCtx, partitionKey, rowKey, nil)
	if err != nil {
		empty := &state.GetResponse{}
		if isNotFoundError(err) {
			return empty, nil
		}
		return empty, err
	}

	data, etag, err := r.unmarshal(&entity)
	result := &state.GetResponse{
		Data: data,
		ETag: etag,
	}
	return result, err
}
// Set stores the value carried by req under req.Key by delegating to writeRow,
// whose error (if any) is returned unchanged.
func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
return r.writeRow(ctx, req)
}
// GetComponentMetadata returns the metadata map derived from the fields of
// tablesMetadata for a state store component.
func (r *StateStore) GetComponentMetadata() (metadataInfo mdutils.MetadataMap) {
	structType := reflect.TypeOf(tablesMetadata{})
	mdutils.GetMetadataInfoFromStructType(structType, &metadataInfo, mdutils.StateStoreType)
	return metadataInfo
}
// NewAzureTablesStateStore returns a new, uninitialised Azure Tables state
// store that advertises the ETag feature and uses the default bulk store for
// bulk operations. Init must be called before the store is used.
func NewAzureTablesStateStore(logger logger.Logger) state.Store {
	store := new(StateStore)
	store.json = jsoniter.ConfigFastest
	store.features = []state.Feature{state.FeatureETag}
	store.logger = logger
	// The default bulk store wraps the store itself, so it is wired up last.
	store.BulkStore = state.NewDefaultBulkStore(store)
	return store
}
// getTablesMetadata decodes the component metadata properties into a
// tablesMetadata value.
//
// The account name and table name are mandatory and are looked up through the
// aliases registered in azauth.MetadataKeys; the account key is optional.
// It returns an error if decoding fails or if a mandatory property is missing
// or empty. On error the returned pointer is nil.
func getTablesMetadata(meta map[string]string) (*tablesMetadata, error) {
	m := tablesMetadata{}
	// Surface decoding problems immediately so they are neither masked by a
	// "missing field" error below nor returned alongside a populated struct.
	if err := kitmd.DecodeMetadata(meta, &m); err != nil {
		return nil, err
	}

	val, ok := mdutils.GetMetadataProperty(meta, azauth.MetadataKeys["StorageAccountName"]...)
	if !ok || val == "" {
		return nil, fmt.Errorf("missing or empty %s field from metadata", azauth.MetadataKeys["StorageAccountName"][0])
	}
	m.AccountName = val

	// Can be empty (such as when using Azure AD for auth)
	m.AccountKey, _ = mdutils.GetMetadataProperty(meta, azauth.MetadataKeys["StorageAccountKey"]...)

	val, ok = mdutils.GetMetadataProperty(meta, azauth.MetadataKeys["StorageTableName"]...)
	if !ok || val == "" {
		return nil, fmt.Errorf("missing or empty %s field from metadata", azauth.MetadataKeys["StorageTableName"][0])
	}
	m.TableName = val

	return &m, nil
}
// writeRow stores the request's value with upsert semantics.
//
// InsertOrReplace does not support ETag concurrency, so the entity is first
// inserted; only when the insert fails because the entity already exists is an
// update attempted:
//   - with an ETag, the update is conditional on that ETag and a failed
//     precondition is reported as an ETag mismatch;
//   - without an ETag but with FirstWrite concurrency, an ETag mismatch is
//     returned without contacting the service;
//   - otherwise the entity is replaced unconditionally.
//
// Each service call is bounded by its own timeout derived from ctx.
func (r *StateStore) writeRow(ctx context.Context, req *state.SetRequest) error {
	payload, err := r.marshal(req)
	if err != nil {
		return err
	}

	insertCtx, cancelInsert := context.WithTimeout(ctx, timeout)
	defer cancelInsert()

	_, err = r.client.AddEntity(insertCtx, payload, nil)
	if err == nil {
		return nil
	}
	if !isEntityAlreadyExistsError(err) {
		// Any other unexpected error on Insert is propagated to the caller
		return err
	}

	// Insert failed because the item already exists: Update instead per Upsert semantics.
	updateCtx, cancelUpdate := context.WithTimeout(ctx, timeout)
	defer cancelUpdate()

	switch {
	case req.HasETag():
		// Always Update using the etag when provided even if Concurrency != FirstWrite.
		// Today the presence of etag takes precedence over Concurrency.
		// In the future #2739 will impose a breaking change which must disallow the use of etag when not using FirstWrite.
		conditional := aztables.UpdateEntityOptions{
			IfMatch:    ptr.Of(azcore.ETag(*req.ETag)),
			UpdateMode: aztables.UpdateModeReplace,
		}
		_, err = r.client.UpdateEntity(updateCtx, payload, &conditional)
		if err != nil && isPreconditionFailedError(err) {
			return state.NewETagError(state.ETagMismatch, err)
		}
		return err

	case req.Options.Concurrency == state.FirstWrite:
		// FirstWrite was set, but no etag was provided for an Update operation:
		// explicitly flag it as an error.
		// entity.Update itself does not flag the test case as a mismatch as it does not distinguish
		// between nil and "" etags, the initial etag will always be "", which would match on update.
		return state.NewETagError(state.ETagMismatch, errors.New("update with Concurrency.FirstWrite without ETag"))

	default:
		// Last write semantics without ETag should always perform a force update.
		unconditional := aztables.UpdateEntityOptions{
			IfMatch:    nil, // this is the same as "*" matching all ETags
			UpdateMode: aztables.UpdateModeReplace,
		}
		_, err = r.client.UpdateEntity(updateCtx, payload, &unconditional)
		return err
	}
}
// isNotFoundError reports whether err wraps an Azure response error whose
// error code is ResourceNotFound or EntityNotFound.
func isNotFoundError(err error) bool {
	var respErr *azcore.ResponseError
	if !errors.As(err, &respErr) {
		return false
	}
	switch respErr.ErrorCode {
	case string(aztables.ResourceNotFound), string(aztables.EntityNotFound):
		return true
	default:
		return false
	}
}
// isPreconditionFailedError reports whether err wraps an Azure response error
// whose error code is UpdateConditionNotSatisfied.
func isPreconditionFailedError(err error) bool {
	var respErr *azcore.ResponseError
	if !errors.As(err, &respErr) {
		return false
	}
	code := string(aztables.UpdateConditionNotSatisfied)
	return respErr.ErrorCode == code
}
// isEntityAlreadyExistsError reports whether err wraps an Azure response error
// whose error code is EntityAlreadyExists.
func isEntityAlreadyExistsError(err error) bool {
	var respErr *azcore.ResponseError
	if !errors.As(err, &respErr) {
		return false
	}
	code := string(aztables.EntityAlreadyExists)
	return respErr.ErrorCode == code
}
// isTableAlreadyExistsError reports whether err wraps an Azure response error
// whose error code is TableAlreadyExists.
func isTableAlreadyExistsError(err error) bool {
	var respErr *azcore.ResponseError
	if !errors.As(err, &respErr) {
		return false
	}
	code := string(aztables.TableAlreadyExists)
	return respErr.ErrorCode == code
}
// deleteRow deletes the entity addressed by req.Key, bounded by the
// package-level timeout.
//
// With an ETag on the request the delete is conditional on that ETag, and a
// failed precondition is reported as an ETag mismatch. Without an ETag the
// delete matches any ETag and the service error, if any, is returned as is.
func (r *StateStore) deleteRow(ctx context.Context, req *state.DeleteRequest) error {
	partitionKey, rowKey := getPartitionAndRowKey(req.Key, r.cosmosDBMode)

	deleteCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if !req.HasETag() {
		matchAny := azcore.ETagAny
		opts := aztables.DeleteEntityOptions{IfMatch: &matchAny}
		_, err := r.client.DeleteEntity(deleteCtx, partitionKey, rowKey, &opts)
		return err
	}

	opts := aztables.DeleteEntityOptions{
		IfMatch: ptr.Of(azcore.ETag(*req.ETag)),
	}
	_, err := r.client.DeleteEntity(deleteCtx, partitionKey, rowKey, &opts)
	if err != nil && isPreconditionFailedError(err) {
		return state.NewETagError(state.ETagMismatch, err)
	}
	return err
}
// getPartitionAndRowKey splits a state key on keyDelimiter into a partition
// key and a row key.
//
// When the key splits into exactly two parts they are returned as
// (partition key, row key). In every other case the first part is the
// partition key and the row key is a fixed placeholder in Cosmos DB mode or
// the empty string otherwise; any parts after the first are discarded.
func getPartitionAndRowKey(key string, cosmosDBmode bool) (string, string) {
	parts := strings.Split(key, keyDelimiter)

	switch {
	case len(parts) == 2:
		return parts[0], parts[1]
	case cosmosDBmode:
		return parts[0], "_dapr_empty_row_key_value_"
	default:
		return parts[0], ""
	}
}
// marshal converts a set request into the JSON-encoded table entity expected
// by the table client.
//
// A []byte value is stored as-is (converted to a string); any other value is
// JSON-encoded first. The partition and row keys are derived from req.Key, and
// the request's ETag, when present, is copied onto the entity.
//
// It returns an error if the value or the entity cannot be JSON-encoded.
func (r *StateStore) marshal(req *state.SetRequest) ([]byte, error) {
	var value string
	if b, ok := req.Value.([]byte); ok {
		value = string(b)
	} else {
		var err error
		value, err = jsoniter.MarshalToString(req.Value)
		if err != nil {
			// Do not silently store an empty value when the payload cannot be serialised.
			return nil, fmt.Errorf("marshalling value for key %q: %w", req.Key, err)
		}
	}

	pk, rk := getPartitionAndRowKey(req.Key, r.cosmosDBMode)

	entity := aztables.EDMEntity{
		Entity: aztables.Entity{
			PartitionKey: pk,
			RowKey:       rk,
		},
		Properties: map[string]any{
			valueEntityProperty: value,
		},
	}
	if req.HasETag() {
		entity.ETag = *req.ETag
	}

	return jsoniter.Marshal(entity)
}
// unmarshal decodes the entity returned by a get call and extracts the stored
// value together with the entity's native ETag.
//
// It returns (nil, nil, nil) when the value property is absent, and an error
// when the entity cannot be decoded or the value property is not a string.
func (r *StateStore) unmarshal(row *aztables.GetEntityResponse) ([]byte, *string, error) {
	var entity aztables.EDMEntity
	if err := jsoniter.Unmarshal(row.Value, &entity); err != nil {
		return nil, nil, err
	}

	switch v := entity.Properties[valueEntityProperty].(type) {
	case nil:
		// value column not present
		return nil, nil, nil
	case string:
		// use native ETag
		return []byte(v), &entity.ETag, nil
	default:
		// must be a string
		return nil, nil, fmt.Errorf("expected string in column '%s'", valueEntityProperty)
	}
}