Initial implementation (#116)
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
		
							parent
							
								
									b210ea0e7f
								
							
						
					
					
						commit
						703f9bf04e
					
				|  | @ -4,6 +4,7 @@ State Stores provide a common way to interact with different data store implemen | |||
| 
 | ||||
| Currently supported state stores are: | ||||
| 
 | ||||
| * AWS DynamoDB | ||||
| * Azure CosmosDB | ||||
| * Azure Table Storage | ||||
| * Cassandra | ||||
|  | @ -18,6 +19,8 @@ Currently supported state stores are: | |||
| * Redis | ||||
| * SQL Server | ||||
| * Zookeeper | ||||
| * Cloud Firestore (Datastore mode) | ||||
| * Couchbase | ||||
| 
 | ||||
| ## Implementing a new State Store | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,168 @@ | |||
| package dynamodb | ||||
| 
 | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" | ||||
| 	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" | ||||
| 
 | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/aws/credentials" | ||||
| 	"github.com/aws/aws-sdk-go/aws/session" | ||||
| 	"github.com/aws/aws-sdk-go/service/dynamodb" | ||||
| 	"github.com/dapr/components-contrib/state" | ||||
| ) | ||||
| 
 | ||||
| // StateStore is a DynamoDB state store
 | ||||
| type StateStore struct { | ||||
| 	client dynamodbiface.DynamoDBAPI | ||||
| 	table  string | ||||
| } | ||||
| 
 | ||||
| type dynamoDBMetadata struct { | ||||
| 	Region       string `json:"region"` | ||||
| 	AccessKey    string `json:"accessKey"` | ||||
| 	SecretKey    string `json:"secretKey"` | ||||
| 	SessionToken string `json:"sessionToken"` | ||||
| 	Table        string `json:"table"` | ||||
| } | ||||
| 
 | ||||
| // NewDynamoDBStateStore returns a new dynamoDB state store
 | ||||
| func NewDynamoDBStateStore() *StateStore { | ||||
| 	return &StateStore{} | ||||
| } | ||||
| 
 | ||||
| // Init does metadata and connection parsing
 | ||||
| func (d *StateStore) Init(metadata state.Metadata) error { | ||||
| 	meta, err := d.getDynamoDBMetadata(metadata) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	client, err := d.getClient(meta) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	d.client = client | ||||
| 	d.table = meta.Table | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Get retrieves a dynamoDB item
 | ||||
| func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) { | ||||
| 	input := &dynamodb.GetItemInput{ | ||||
| 		ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong), | ||||
| 		TableName:      aws.String(d.table), | ||||
| 		Key: map[string]*dynamodb.AttributeValue{ | ||||
| 			"key": { | ||||
| 				S: aws.String(req.Key), | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	result, err := d.client.GetItem(input) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if len(result.Item) == 0 { | ||||
| 		return &state.GetResponse{}, nil | ||||
| 	} | ||||
| 
 | ||||
| 	var output string | ||||
| 	if err = dynamodbattribute.Unmarshal(result.Item["value"], &output); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &state.GetResponse{ | ||||
| 		Data: []byte(output), | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| // Set saves a dynamoDB item
 | ||||
| func (d *StateStore) Set(req *state.SetRequest) error { | ||||
| 	item := map[string]*dynamodb.AttributeValue{ | ||||
| 		"key": { | ||||
| 			S: aws.String(req.Key), | ||||
| 		}, | ||||
| 		"value": { | ||||
| 			S: aws.String(fmt.Sprintf("%v", req.Value)), | ||||
| 		}, | ||||
| 	} | ||||
| 
 | ||||
| 	input := &dynamodb.PutItemInput{ | ||||
| 		Item:      item, | ||||
| 		TableName: &d.table, | ||||
| 	} | ||||
| 
 | ||||
| 	_, e := d.client.PutItem(input) | ||||
| 	return e | ||||
| } | ||||
| 
 | ||||
| // BulkSet performs a bulk set operation
 | ||||
| func (d *StateStore) BulkSet(req []state.SetRequest) error { | ||||
| 	for _, r := range req { | ||||
| 		err := d.Set(&r) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Delete performs a delete operation
 | ||||
| func (d *StateStore) Delete(req *state.DeleteRequest) error { | ||||
| 	input := &dynamodb.DeleteItemInput{ | ||||
| 		Key: map[string]*dynamodb.AttributeValue{ | ||||
| 			"key": { | ||||
| 				S: aws.String(req.Key), | ||||
| 			}, | ||||
| 		}, | ||||
| 		TableName: aws.String(d.table), | ||||
| 	} | ||||
| 	_, err := d.client.DeleteItem(input) | ||||
| 	return err | ||||
| } | ||||
| 
 | ||||
| // BulkDelete performs a bulk delete operation
 | ||||
| func (d *StateStore) BulkDelete(req []state.DeleteRequest) error { | ||||
| 	for _, r := range req { | ||||
| 		err := d.Delete(&r) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *StateStore) getDynamoDBMetadata(metadata state.Metadata) (*dynamoDBMetadata, error) { | ||||
| 	b, err := json.Marshal(metadata.Properties) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	var meta dynamoDBMetadata | ||||
| 	err = json.Unmarshal(b, &meta) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if meta.SecretKey == "" || meta.AccessKey == "" || meta.Region == "" || meta.SessionToken == "" { | ||||
| 		return nil, fmt.Errorf("missing aws credentials in metadata") | ||||
| 	} | ||||
| 
 | ||||
| 	return &meta, nil | ||||
| } | ||||
| 
 | ||||
| func (d *StateStore) getClient(meta *dynamoDBMetadata) (*dynamodb.DynamoDB, error) { | ||||
| 	sess, err := session.NewSession(&aws.Config{ | ||||
| 		Region:      aws.String(meta.Region), | ||||
| 		Credentials: credentials.NewStaticCredentials(meta.AccessKey, meta.SecretKey, meta.SessionToken), | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	c := dynamodb.New(sess) | ||||
| 	return c, nil | ||||
| } | ||||
|  | @ -0,0 +1,247 @@ | |||
| // ------------------------------------------------------------
 | ||||
| // Copyright (c) Microsoft Corporation.
 | ||||
| // Licensed under the MIT License.
 | ||||
| // ------------------------------------------------------------
 | ||||
| package dynamodb | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"github.com/aws/aws-sdk-go/aws" | ||||
| 	"github.com/aws/aws-sdk-go/service/dynamodb" | ||||
| 	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" | ||||
| 	"github.com/dapr/components-contrib/state" | ||||
| 	"testing" | ||||
| 
 | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
| 
 | ||||
| type mockedDynamoDB struct { | ||||
| 	GetItemFn    func(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) | ||||
| 	PutItemFn    func(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) | ||||
| 	DeleteItemFn func(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) | ||||
| 	dynamodbiface.DynamoDBAPI | ||||
| } | ||||
| 
 | ||||
| func (m *mockedDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { | ||||
| 	return m.GetItemFn(input) | ||||
| } | ||||
| 
 | ||||
| func (m *mockedDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { | ||||
| 	return m.PutItemFn(input) | ||||
| } | ||||
| 
 | ||||
| func (m *mockedDynamoDB) DeleteItem(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) { | ||||
| 	return m.DeleteItemFn(input) | ||||
| } | ||||
| 
 | ||||
| func TestInit(t *testing.T) { | ||||
| 	m := state.Metadata{} | ||||
| 	s := NewDynamoDBStateStore() | ||||
| 	t.Run("Init with valid metadata", func(t *testing.T) { | ||||
| 		m.Properties = map[string]string{ | ||||
| 			"AccessKey":    "a", | ||||
| 			"Region":       "a", | ||||
| 			"SecretKey":    "a", | ||||
| 			"SessionToken": "a", | ||||
| 		} | ||||
| 		err := s.Init(m) | ||||
| 		assert.Nil(t, err) | ||||
| 	}) | ||||
| 
 | ||||
| 	t.Run("Init with missing metadata", func(t *testing.T) { | ||||
| 		m.Properties = map[string]string{ | ||||
| 			"Dummy": "a", | ||||
| 		} | ||||
| 		err := s.Init(m) | ||||
| 		assert.NotNil(t, err) | ||||
| 		assert.Equal(t, err, fmt.Errorf("missing aws credentials in metadata")) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func TestGet(t *testing.T) { | ||||
| 	t.Run("Successfully retrieve item", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { | ||||
| 					return &dynamodb.GetItemOutput{ | ||||
| 						Item: map[string]*dynamodb.AttributeValue{ | ||||
| 							"key": { | ||||
| 								S: aws.String("key"), | ||||
| 							}, | ||||
| 							"value": { | ||||
| 								S: aws.String("value"), | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, nil | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.GetRequest{ | ||||
| 			Key:      "key", | ||||
| 			Metadata: nil, | ||||
| 			Options: state.GetStateOption{ | ||||
| 				Consistency: "strong", | ||||
| 			}, | ||||
| 		} | ||||
| 		out, err := ss.Get(req) | ||||
| 		assert.Nil(t, err) | ||||
| 		assert.Equal(t, []byte("value"), out.Data) | ||||
| 	}) | ||||
| 	t.Run("Unsuccessfully get item", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { | ||||
| 					return nil, fmt.Errorf("failed to retrieve data") | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.GetRequest{ | ||||
| 			Key:      "key", | ||||
| 			Metadata: nil, | ||||
| 			Options: state.GetStateOption{ | ||||
| 				Consistency: "strong", | ||||
| 			}, | ||||
| 		} | ||||
| 		out, err := ss.Get(req) | ||||
| 		assert.NotNil(t, err) | ||||
| 		assert.Nil(t, out) | ||||
| 	}) | ||||
| 	t.Run("Unsuccessfully with empty response", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { | ||||
| 					return &dynamodb.GetItemOutput{ | ||||
| 						Item: map[string]*dynamodb.AttributeValue{}, | ||||
| 					}, nil | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.GetRequest{ | ||||
| 			Key:      "key", | ||||
| 			Metadata: nil, | ||||
| 			Options: state.GetStateOption{ | ||||
| 				Consistency: "strong", | ||||
| 			}, | ||||
| 		} | ||||
| 		out, err := ss.Get(req) | ||||
| 		assert.Nil(t, err) | ||||
| 		assert.Nil(t, out.Data) | ||||
| 	}) | ||||
| 	t.Run("Unsuccessfully with no required key", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { | ||||
| 					return &dynamodb.GetItemOutput{ | ||||
| 						Item: map[string]*dynamodb.AttributeValue{ | ||||
| 							"value2": { | ||||
| 								S: aws.String("value"), | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, nil | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.GetRequest{ | ||||
| 			Key:      "key", | ||||
| 			Metadata: nil, | ||||
| 			Options: state.GetStateOption{ | ||||
| 				Consistency: "strong", | ||||
| 			}, | ||||
| 		} | ||||
| 		out, err := ss.Get(req) | ||||
| 		assert.Nil(t, err) | ||||
| 		assert.Empty(t, out.Data) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func TestSet(t *testing.T) { | ||||
| 	type value struct { | ||||
| 		Value string | ||||
| 	} | ||||
| 
 | ||||
| 	t.Run("Successfully set item", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) { | ||||
| 					assert.Equal(t, map[string]*dynamodb.AttributeValue{ | ||||
| 						"key": { | ||||
| 							S: aws.String("key"), | ||||
| 						}, | ||||
| 						"value": { | ||||
| 							S: aws.String("{value}"), | ||||
| 						}}, input.Item) | ||||
| 					return &dynamodb.PutItemOutput{ | ||||
| 						Attributes: map[string]*dynamodb.AttributeValue{ | ||||
| 							"key": { | ||||
| 								S: aws.String("value"), | ||||
| 							}, | ||||
| 						}, | ||||
| 					}, nil | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.SetRequest{ | ||||
| 			Key: "key", | ||||
| 			Value: value{ | ||||
| 				Value: "value", | ||||
| 			}, | ||||
| 		} | ||||
| 		err := ss.Set(req) | ||||
| 		assert.Nil(t, err) | ||||
| 	}) | ||||
| 	t.Run("Un-successfully set item", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) { | ||||
| 					return nil, fmt.Errorf("unable to put item") | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.SetRequest{ | ||||
| 			Key: "key", | ||||
| 			Value: value{ | ||||
| 				Value: "value", | ||||
| 			}, | ||||
| 		} | ||||
| 		err := ss.Set(req) | ||||
| 		assert.NotNil(t, err) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func TestDelete(t *testing.T) { | ||||
| 	t.Run("Successfully delete item", func(t *testing.T) { | ||||
| 		req := &state.DeleteRequest{ | ||||
| 			Key: "key", | ||||
| 		} | ||||
| 
 | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) { | ||||
| 					assert.Equal(t, map[string]*dynamodb.AttributeValue{ | ||||
| 						"key": { | ||||
| 							S: aws.String(req.Key), | ||||
| 						}, | ||||
| 					}, input.Key) | ||||
| 					return nil, nil | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		err := ss.Delete(req) | ||||
| 		assert.Nil(t, err) | ||||
| 	}) | ||||
| 
 | ||||
| 	t.Run("Un-successfully delete item", func(t *testing.T) { | ||||
| 		ss := StateStore{ | ||||
| 			client: &mockedDynamoDB{ | ||||
| 				DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) { | ||||
| 					return nil, fmt.Errorf("unable to delete item") | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| 		req := &state.DeleteRequest{ | ||||
| 			Key: "key", | ||||
| 		} | ||||
| 		err := ss.Delete(req) | ||||
| 		assert.NotNil(t, err) | ||||
| 	}) | ||||
| } | ||||
		Loading…
	
		Reference in New Issue