Merge pull request #2389 from robertojrojas/metada-partition-key-2388

[AWS DynamoDB] Adds Support for Partition Key Metadata Field
This commit is contained in:
Bernd Verst 2023-01-03 15:49:53 -08:00 committed by GitHub
commit 6c3ca196fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 247 additions and 9 deletions

View File

@ -51,6 +51,10 @@ type dynamoDBMetadata struct {
TTLAttributeName string `json:"ttlAttributeName"`
}
const (
metadataPartitionKey = "partitionKey"
)
// NewDynamoDBStateStore returns a new dynamoDB state store.
func NewDynamoDBStateStore(_ logger.Logger) state.Store {
return &StateStore{}
@ -87,7 +91,7 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get
TableName: aws.String(d.table),
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
S: aws.String(populatePartitionMetadata(req.Key, req.Metadata)),
},
},
}
@ -224,7 +228,7 @@ func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
S: aws.String(populatePartitionMetadata(req.Key, req.Metadata)),
},
},
TableName: aws.String(d.table),
@ -268,7 +272,7 @@ func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest)
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(r.Key),
S: aws.String(populatePartitionMetadata(r.Key, r.Metadata)),
},
},
},
@ -314,9 +318,10 @@ func (d *StateStore) getClient(metadata *dynamoDBMetadata) (*dynamodb.DynamoDB,
// getItemFromReq converts a dapr state.SetRequest into an dynamodb item
func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb.AttributeValue, error) {
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
value, err := d.marshalToString(req.Value)
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
return nil, fmt.Errorf("dynamodb error: failed to set key %s: %s", partitionKey, err)
}
ttl, err := d.parseTTL(req)
@ -328,9 +333,10 @@ func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to generate etag: %w", err)
}
item := map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
S: aws.String(partitionKey),
},
"value": {
S: aws.String(value),
@ -385,3 +391,13 @@ func (d *StateStore) parseTTL(req *state.SetRequest) (*int64, error) {
return nil, nil
}
// This is a helper to return the partition key to use. If if metadata["partitionkey"] is present,
// use that, otherwise use what's in "key".
func populatePartitionMetadata(key string, requestMetadata map[string]string) string {
if val, found := requestMetadata[metadataPartitionKey]; found {
return val
}
return key
}

View File

@ -46,6 +46,11 @@ type DynamoDBItem struct {
TestAttributeName int64 `json:"testAttributeName"`
}
const (
tableName = "table_name"
pkey = "partitionKey"
)
func (m *mockedDynamoDB) GetItemWithContext(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (*dynamodb.GetItemOutput, error) {
return m.GetItemWithContextFn(ctx, input, op...)
}
@ -268,6 +273,45 @@ func TestGet(t *testing.T) {
assert.Nil(t, err)
assert.Empty(t, out.Data)
})
t.Run("Successfully retrieve item with metadata partition key", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
GetItemWithContextFn: func(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (output *dynamodb.GetItemOutput, err error) {
if *input.Key["key"].S != pkey {
return &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{},
}, nil
}
return &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: input.Key["key"].S,
},
"value": {
S: aws.String("some value"),
},
"etag": {
S: aws.String("1bdead4badc0ffee"),
},
},
}, nil
},
},
}
req := &state.GetRequest{
Key: "someKey",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
Options: state.GetStateOption{
Consistency: "strong",
},
}
out, err := ss.Get(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, []byte("some value"), out.Data)
assert.Equal(t, "1bdead4badc0ffee", *out.ETag)
})
}
func TestSet(t *testing.T) {
@ -619,6 +663,40 @@ func TestSet(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, "dynamodb error: failed to parse ttlInSeconds: strconv.ParseInt: parsing \"invalidvalue\": invalid syntax", err.Error())
})
t.Run("Successfully set item with metadata partition key", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) {
assert.Equal(t, dynamodb.AttributeValue{
S: aws.String(pkey),
}, *input.Item["key"])
assert.Equal(t, dynamodb.AttributeValue{
S: aws.String(`{"Value":"value"}`),
}, *input.Item["value"])
assert.Equal(t, len(input.Item), 3)
return &dynamodb.PutItemOutput{
Attributes: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("value"),
},
},
}, nil
},
},
}
req := &state.SetRequest{
Key: "key",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
Value: value{
Value: "value",
},
}
err := ss.Set(context.Background(), req)
assert.Nil(t, err)
})
}
func TestBulkSet(t *testing.T) {
@ -627,7 +705,6 @@ func TestBulkSet(t *testing.T) {
}
t.Run("Successfully set items", func(t *testing.T) {
tableName := "table_name"
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
@ -694,7 +771,6 @@ func TestBulkSet(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Successfully set items with ttl = -1", func(t *testing.T) {
tableName := "table_name"
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
@ -767,7 +843,6 @@ func TestBulkSet(t *testing.T) {
assert.Nil(t, err)
})
t.Run("Successfully set items with ttl", func(t *testing.T) {
tableName := "table_name"
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
@ -866,6 +941,78 @@ func TestBulkSet(t *testing.T) {
err := ss.BulkSet(context.Background(), req)
assert.NotNil(t, err)
})
t.Run("Successfully set items with metadata partition key", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
expected := map[string][]*dynamodb.WriteRequest{}
expected[tableName] = []*dynamodb.WriteRequest{
{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(pkey),
},
"value": {
S: aws.String(`{"Value":"value1"}`),
},
},
},
},
{
PutRequest: &dynamodb.PutRequest{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(pkey),
},
"value": {
S: aws.String(`{"Value":"value2"}`),
},
},
},
},
}
for tbl := range expected {
for reqNum := range expected[tbl] {
expectedItem := expected[tbl][reqNum].PutRequest.Item
inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item
assert.Equal(t, expectedItem["key"], inputItem["key"])
assert.Equal(t, expectedItem["value"], inputItem["value"])
}
}
return &dynamodb.BatchWriteItemOutput{
UnprocessedItems: map[string][]*dynamodb.WriteRequest{},
}, nil
},
},
table: tableName,
}
req := []state.SetRequest{
{
Key: "key1",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
Value: value{
Value: "value1",
},
},
{
Key: "key2",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
Value: value{
Value: "value2",
},
},
}
err := ss.BulkSet(context.Background(), req)
assert.Nil(t, err)
})
}
func TestDelete(t *testing.T) {
@ -968,11 +1115,35 @@ func TestDelete(t *testing.T) {
err := ss.Delete(context.Background(), req)
assert.NotNil(t, err)
})
t.Run("Successfully delete item with metadata partition key", func(t *testing.T) {
req := &state.DeleteRequest{
Key: "key",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
}
ss := StateStore{
client: &mockedDynamoDB{
DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) {
assert.Equal(t, map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(pkey),
},
}, input.Key)
return nil, nil
},
},
}
err := ss.Delete(context.Background(), req)
assert.Nil(t, err)
})
}
func TestBulkDelete(t *testing.T) {
t.Run("Successfully delete items", func(t *testing.T) {
tableName := "table_name"
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
@ -1036,4 +1207,55 @@ func TestBulkDelete(t *testing.T) {
err := ss.BulkDelete(context.Background(), req)
assert.NotNil(t, err)
})
t.Run("Successfully delete items with metadata partition key", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) {
expected := map[string][]*dynamodb.WriteRequest{}
expected[tableName] = []*dynamodb.WriteRequest{
{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(pkey),
},
},
},
},
{
DeleteRequest: &dynamodb.DeleteRequest{
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(pkey),
},
},
},
},
}
assert.Equal(t, expected, input.RequestItems)
return &dynamodb.BatchWriteItemOutput{
UnprocessedItems: map[string][]*dynamodb.WriteRequest{},
}, nil
},
},
table: tableName,
}
req := []state.DeleteRequest{
{
Key: "key1",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
},
{
Key: "key2",
Metadata: map[string]string{
metadataPartitionKey: pkey,
},
},
}
err := ss.BulkDelete(context.Background(), req)
assert.Nil(t, err)
})
}