diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index ca1521d79..f1eb010cc 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -39,6 +39,7 @@ type StateStore struct { client dynamodbiface.DynamoDBAPI table string ttlAttributeName string + partitionKey string } type dynamoDBMetadata struct { @@ -49,15 +50,19 @@ type dynamoDBMetadata struct { SessionToken string `json:"sessionToken"` Table string `json:"table"` TTLAttributeName string `json:"ttlAttributeName"` + PartitionKey string `json:"partitionKey"` } const ( - metadataPartitionKey = "partitionKey" + defaultPartitionKeyName = "key" + metadataPartitionKey = "partitionKey" ) // NewDynamoDBStateStore returns a new dynamoDB state store. func NewDynamoDBStateStore(_ logger.Logger) state.Store { - return &StateStore{} + return &StateStore{ + partitionKey: defaultPartitionKeyName, + } } // Init does metadata and connection parsing. @@ -75,6 +80,7 @@ func (d *StateStore) Init(metadata state.Metadata) error { d.client = client d.table = meta.Table d.ttlAttributeName = meta.TTLAttributeName + d.partitionKey = meta.PartitionKey return nil } @@ -90,8 +96,8 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong), TableName: aws.String(d.table), Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(populatePartitionMetadata(req.Key, req.Metadata)), + d.partitionKey: { + S: aws.String(req.Key), }, }, } @@ -227,8 +233,8 @@ func (d *StateStore) BulkSet(ctx context.Context, req []state.SetRequest) error func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error { input := &dynamodb.DeleteItemInput{ Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(populatePartitionMetadata(req.Key, req.Metadata)), + d.partitionKey: { + S: aws.String(req.Key), }, }, TableName: aws.String(d.table), @@ -271,8 +277,8 @@ func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) writeRequest := &dynamodb.WriteRequest{ DeleteRequest: &dynamodb.DeleteRequest{ Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(populatePartitionMetadata(r.Key, r.Metadata)), + d.partitionKey: { + S: aws.String(r.Key), }, }, }, @@ -303,6 +309,7 @@ func (d *StateStore) getDynamoDBMetadata(meta state.Metadata) (*dynamoDBMetadata if m.Table == "" { return nil, fmt.Errorf("missing dynamodb table name") } + m.PartitionKey = populatePartitionMetadata(meta.Properties, defaultPartitionKeyName) return &m, err } @@ -318,10 +325,9 @@ func (d *StateStore) getClient(metadata *dynamoDBMetadata) (*dynamodb.DynamoDB, // getItemFromReq converts a dapr state.SetRequest into an dynamodb item func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb.AttributeValue, error) { - partitionKey := populatePartitionMetadata(req.Key, req.Metadata) value, err := d.marshalToString(req.Value) if err != nil { - return nil, fmt.Errorf("dynamodb error: failed to set key %s: %s", partitionKey, err) + return nil, fmt.Errorf("dynamodb error: failed to marshal value for key %s: %s", req.Key, err) } ttl, err := d.parseTTL(req) @@ -335,8 +341,8 @@ func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb } item := map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(partitionKey), + d.partitionKey: { + S: aws.String(req.Key), }, "value": { S: aws.String(value), @@ -393,11 +399,11 @@ func (d *StateStore) parseTTL(req *state.SetRequest) (*int64, error) { } // This is a helper to return the partition key to use. If if metadata["partitionkey"] is present, -// use that, otherwise use what's in "key". -func populatePartitionMetadata(key string, requestMetadata map[string]string) string { +// use that, otherwise use default primay key "key". +func populatePartitionMetadata(requestMetadata map[string]string, defaultPartitionKeyName string) string { if val, found := requestMetadata[metadataPartitionKey]; found { return val } - return key + return defaultPartitionKeyName } diff --git a/state/aws/dynamodb/dynamodb_test.go b/state/aws/dynamodb/dynamodb_test.go index 426c869d0..ffd71a57c 100644 --- a/state/aws/dynamodb/dynamodb_test.go +++ b/state/aws/dynamodb/dynamodb_test.go @@ -70,6 +70,12 @@ func (m *mockedDynamoDB) BatchWriteItemWithContext(ctx context.Context, input *d func TestInit(t *testing.T) { m := state.Metadata{} s := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + + t.Run("NewDynamoDBStateStore Default Partition Key", func(t *testing.T) { + assert.NotNil(t, s) + assert.Equal(t, s.partitionKey, defaultPartitionKeyName) + }) + t.Run("Init with valid metadata", func(t *testing.T) { m.Properties = map[string]string{ "AccessKey": "a", @@ -100,29 +106,40 @@ func TestInit(t *testing.T) { err := s.Init(m) assert.Nil(t, err) }) + + t.Run("Init with partition key", func(t *testing.T) { + pkey := "pkey" + m.Properties = map[string]string{ + "Table": "a", + "partitionKey": pkey, + } + err := s.Init(m) + assert.Nil(t, err) + assert.Equal(t, s.partitionKey, pkey) + }) } func TestGet(t *testing.T) { t.Run("Successfully retrieve item", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - GetItemWithContextFn: func(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (output *dynamodb.GetItemOutput, err error) { - return &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("someKey"), - }, - "value": { - S: aws.String("some value"), - }, - "etag": { - S: aws.String("1bdead4badc0ffee"), - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + GetItemWithContextFn: func(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (output *dynamodb.GetItemOutput, err error) { + return &dynamodb.GetItemOutput{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("someKey"), }, - }, nil - }, + "value": { + S: aws.String("some value"), + }, + "etag": { + S: aws.String("1bdead4badc0ffee"), + }, + }, + }, nil }, } + req := &state.GetRequest{ Key: "someKey", Metadata: nil, @@ -273,45 +290,6 @@ func TestGet(t *testing.T) { assert.Nil(t, err) assert.Empty(t, out.Data) }) - t.Run("Successfully retrieve item with metadata partition key", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - GetItemWithContextFn: func(ctx context.Context, input *dynamodb.GetItemInput, op ...request.Option) (output *dynamodb.GetItemOutput, err error) { - if *input.Key["key"].S != pkey { - return &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{}, - }, nil - } - return &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: input.Key["key"].S, - }, - "value": { - S: aws.String("some value"), - }, - "etag": { - S: aws.String("1bdead4badc0ffee"), - }, - }, - }, nil - }, - }, - } - req := &state.GetRequest{ - Key: "someKey", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - Options: state.GetStateOption{ - Consistency: "strong", - }, - } - out, err := ss.Get(context.Background(), req) - assert.Nil(t, err) - assert.Equal(t, []byte("some value"), out.Data) - assert.Equal(t, "1bdead4badc0ffee", *out.ETag) - }) } func TestSet(t *testing.T) { @@ -320,25 +298,24 @@ func TestSet(t *testing.T) { } t.Run("Successfully set item", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("key"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("key"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"value"}`), + }, *input.Item["value"]) + assert.Equal(t, len(input.Item), 3) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, } req := &state.SetRequest{ @@ -352,29 +329,28 @@ func TestSet(t *testing.T) { }) t.Run("Successfully set item with matching etag", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("key"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, "etag = :etag", *input.ConditionExpression) - assert.Equal(t, &dynamodb.AttributeValue{ - S: aws.String("1bdead4badc0ffee"), - }, input.ExpressionAttributeValues[":etag"]) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("key"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"value"}`), + }, *input.Item["value"]) + assert.Equal(t, "etag = :etag", *input.ConditionExpression) + assert.Equal(t, &dynamodb.AttributeValue{ + S: aws.String("1bdead4badc0ffee"), + }, input.ExpressionAttributeValues[":etag"]) + assert.Equal(t, len(input.Item), 3) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, } etag := "1bdead4badc0ffee" @@ -390,24 +366,23 @@ func TestSet(t *testing.T) { }) t.Run("Unsuccessfully set item with mismatched etag", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("key"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, "etag = :etag", *input.ConditionExpression) - assert.Equal(t, &dynamodb.AttributeValue{ - S: aws.String("bogusetag"), - }, input.ExpressionAttributeValues[":etag"]) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("key"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"value"}`), + }, *input.Item["value"]) + assert.Equal(t, "etag = :etag", *input.ConditionExpression) + assert.Equal(t, &dynamodb.AttributeValue{ + S: aws.String("bogusetag"), + }, input.ExpressionAttributeValues[":etag"]) + assert.Equal(t, len(input.Item), 3) - var checkErr dynamodb.ConditionalCheckFailedException - return nil, &checkErr - }, + var checkErr dynamodb.ConditionalCheckFailedException + return nil, &checkErr }, } etag := "bogusetag" @@ -430,26 +405,25 @@ func TestSet(t *testing.T) { }) t.Run("Successfully set item with first-write-concurrency", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("key"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, "attribute_not_exists(etag)", *input.ConditionExpression) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("key"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"value"}`), + }, *input.Item["value"]) + assert.Equal(t, "attribute_not_exists(etag)", *input.ConditionExpression) + assert.Equal(t, len(input.Item), 3) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, } req := &state.SetRequest{ @@ -466,21 +440,20 @@ func TestSet(t *testing.T) { }) t.Run("Unsuccessfully set item with first-write-concurrency", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("key"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, "attribute_not_exists(etag)", *input.ConditionExpression) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("key"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"value"}`), + }, *input.Item["value"]) + assert.Equal(t, "attribute_not_exists(etag)", *input.ConditionExpression) + assert.Equal(t, len(input.Item), 3) - var checkErr dynamodb.ConditionalCheckFailedException - return nil, &checkErr - }, + var checkErr dynamodb.ConditionalCheckFailedException + return nil, &checkErr }, } req := &state.SetRequest{ @@ -502,28 +475,28 @@ func TestSet(t *testing.T) { }) t.Run("Successfully set item with ttl = -1", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, len(input.Item), 4) - result := DynamoDBItem{} - dynamodbattribute.UnmarshalMap(input.Item, &result) - assert.Equal(t, result.Key, "someKey") - assert.Equal(t, result.Value, "{\"Value\":\"someValue\"}") - assert.Greater(t, result.TestAttributeName, time.Now().Unix()-2) - assert.Less(t, result.TestAttributeName, time.Now().Unix()) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, len(input.Item), 4) + result := DynamoDBItem{} + dynamodbattribute.UnmarshalMap(input.Item, &result) + assert.Equal(t, result.Key, "someKey") + assert.Equal(t, result.Value, "{\"Value\":\"someValue\"}") + assert.Greater(t, result.TestAttributeName, time.Now().Unix()-2) + assert.Less(t, result.TestAttributeName, time.Now().Unix()) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, - ttlAttributeName: "testAttributeName", } + ss.ttlAttributeName = "testAttributeName" + req := &state.SetRequest{ Key: "someKey", Value: value{ @@ -537,28 +510,28 @@ func TestSet(t *testing.T) { assert.Nil(t, err) }) t.Run("Successfully set item with 'correct' ttl", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, len(input.Item), 4) - result := DynamoDBItem{} - dynamodbattribute.UnmarshalMap(input.Item, &result) - assert.Equal(t, result.Key, "someKey") - assert.Equal(t, result.Value, "{\"Value\":\"someValue\"}") - assert.Greater(t, result.TestAttributeName, time.Now().Unix()+180-1) - assert.Less(t, result.TestAttributeName, time.Now().Unix()+180+1) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, len(input.Item), 4) + result := DynamoDBItem{} + dynamodbattribute.UnmarshalMap(input.Item, &result) + assert.Equal(t, result.Key, "someKey") + assert.Equal(t, result.Value, "{\"Value\":\"someValue\"}") + assert.Greater(t, result.TestAttributeName, time.Now().Unix()+180-1) + assert.Less(t, result.TestAttributeName, time.Now().Unix()+180+1) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, - ttlAttributeName: "testAttributeName", } + ss.ttlAttributeName = "testAttributeName" + req := &state.SetRequest{ Key: "someKey", Value: value{ @@ -573,11 +546,10 @@ func TestSet(t *testing.T) { }) t.Run("Unsuccessfully set item", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - return nil, fmt.Errorf("unable to put item") - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + return nil, fmt.Errorf("unable to put item") }, } req := &state.SetRequest{ @@ -590,25 +562,24 @@ func TestSet(t *testing.T) { assert.NotNil(t, err) }) t.Run("Successfully set item with correct ttl but without component metadata", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String("someKey"), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"someValue"}`), - }, *input.Item["value"]) - assert.Equal(t, len(input.Item), 3) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String("someKey"), + }, *input.Item["key"]) + assert.Equal(t, dynamodb.AttributeValue{ + S: aws.String(`{"Value":"someValue"}`), + }, *input.Item["value"]) + assert.Equal(t, len(input.Item), 3) - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), }, - }, nil - }, + }, + }, nil }, } req := &state.SetRequest{ @@ -663,40 +634,6 @@ func TestSet(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, "dynamodb error: failed to parse ttlInSeconds: strconv.ParseInt: parsing \"invalidvalue\": invalid syntax", err.Error()) }) - t.Run("Successfully set item with metadata partition key", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) { - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(pkey), - }, *input.Item["key"]) - assert.Equal(t, dynamodb.AttributeValue{ - S: aws.String(`{"Value":"value"}`), - }, *input.Item["value"]) - assert.Equal(t, len(input.Item), 3) - - return &dynamodb.PutItemOutput{ - Attributes: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("value"), - }, - }, - }, nil - }, - }, - } - req := &state.SetRequest{ - Key: "key", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - Value: value{ - Value: "value", - }, - } - err := ss.Set(context.Background(), req) - assert.Nil(t, err) - }) } func TestBulkSet(t *testing.T) { @@ -705,54 +642,54 @@ func TestBulkSet(t *testing.T) { } t.Run("Successfully set items", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - expected[tableName] = []*dynamodb.WriteRequest{ - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key1"), - }, - "value": { - S: aws.String(`{"Value":"value1"}`), - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { + expected := map[string][]*dynamodb.WriteRequest{} + expected[tableName] = []*dynamodb.WriteRequest{ + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key1"), + }, + "value": { + S: aws.String(`{"Value":"value1"}`), }, }, }, - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key2"), - }, - "value": { - S: aws.String(`{"Value":"value2"}`), - }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key2"), + }, + "value": { + S: aws.String(`{"Value":"value2"}`), }, }, }, + }, + } + + for tbl := range expected { + for reqNum := range expected[tbl] { + expectedItem := expected[tbl][reqNum].PutRequest.Item + inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item + + assert.Equal(t, expectedItem["key"], inputItem["key"]) + assert.Equal(t, expectedItem["value"], inputItem["value"]) } + } - for tbl := range expected { - for reqNum := range expected[tbl] { - expectedItem := expected[tbl][reqNum].PutRequest.Item - inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item - - assert.Equal(t, expectedItem["key"], inputItem["key"]) - assert.Equal(t, expectedItem["value"], inputItem["value"]) - } - } - - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, + return &dynamodb.BatchWriteItemOutput{ + UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, + }, nil }, - table: tableName, } + ss.table = tableName + req := []state.SetRequest{ { Key: "key1", @@ -771,57 +708,57 @@ func TestBulkSet(t *testing.T) { assert.Nil(t, err) }) t.Run("Successfully set items with ttl = -1", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - expected[tableName] = []*dynamodb.WriteRequest{ - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key1"), - }, - "value": { - S: aws.String(`{"Value":"value1"}`), - }, - "testAttributeName": { - N: aws.String(strconv.FormatInt(time.Now().Unix()-1, 10)), - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { + expected := map[string][]*dynamodb.WriteRequest{} + expected[tableName] = []*dynamodb.WriteRequest{ + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key1"), + }, + "value": { + S: aws.String(`{"Value":"value1"}`), + }, + "testAttributeName": { + N: aws.String(strconv.FormatInt(time.Now().Unix()-1, 10)), }, }, }, - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key2"), - }, - "value": { - S: aws.String(`{"Value":"value2"}`), - }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key2"), + }, + "value": { + S: aws.String(`{"Value":"value2"}`), }, }, }, - } - for tbl := range expected { - for reqNum := range expected[tbl] { - expectedItem := expected[tbl][reqNum].PutRequest.Item - inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item + }, + } + for tbl := range expected { + for reqNum := range expected[tbl] { + expectedItem := expected[tbl][reqNum].PutRequest.Item + inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item - assert.Equal(t, expectedItem["key"], inputItem["key"]) - assert.Equal(t, expectedItem["value"], inputItem["value"]) - } + assert.Equal(t, expectedItem["key"], inputItem["key"]) + assert.Equal(t, expectedItem["value"], inputItem["value"]) } + } - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, + return &dynamodb.BatchWriteItemOutput{ + UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, + }, nil }, - table: tableName, - ttlAttributeName: "testAttributeName", } + ss.table = tableName + ss.ttlAttributeName = "testAttributeName" + req := []state.SetRequest{ { Key: "key1", @@ -843,59 +780,59 @@ func TestBulkSet(t *testing.T) { assert.Nil(t, err) }) t.Run("Successfully set items with ttl", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - // This might fail occasionally due to timestamp precision. - timestamp := time.Now().Unix() + 90 - expected[tableName] = []*dynamodb.WriteRequest{ - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key1"), - }, - "value": { - S: aws.String(`{"Value":"value1"}`), - }, - "testAttributeName": { - N: aws.String(strconv.FormatInt(timestamp, 10)), - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { + expected := map[string][]*dynamodb.WriteRequest{} + // This might fail occasionally due to timestamp precision. + timestamp := time.Now().Unix() + 90 + expected[tableName] = []*dynamodb.WriteRequest{ + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key1"), + }, + "value": { + S: aws.String(`{"Value":"value1"}`), + }, + "testAttributeName": { + N: aws.String(strconv.FormatInt(timestamp, 10)), }, }, }, - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key2"), - }, - "value": { - S: aws.String(`{"Value":"value2"}`), - }, + }, + { + PutRequest: &dynamodb.PutRequest{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key2"), + }, + "value": { + S: aws.String(`{"Value":"value2"}`), }, }, }, - } - for tbl := range expected { - for reqNum := range expected[tbl] { - expectedItem := expected[tbl][reqNum].PutRequest.Item - inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item + }, + } + for tbl := range expected { + for reqNum := range expected[tbl] { + expectedItem := expected[tbl][reqNum].PutRequest.Item + inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item - assert.Equal(t, expectedItem["key"], inputItem["key"]) - assert.Equal(t, expectedItem["value"], inputItem["value"]) - } + assert.Equal(t, expectedItem["key"], inputItem["key"]) + assert.Equal(t, expectedItem["value"], inputItem["value"]) } + } - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, + return &dynamodb.BatchWriteItemOutput{ + UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, + }, nil }, - table: tableName, - ttlAttributeName: "testAttributeName", } + ss.table = tableName + ss.ttlAttributeName = "testAttributeName" + req := []state.SetRequest{ { Key: "key1", @@ -941,78 +878,6 @@ func TestBulkSet(t *testing.T) { err := ss.BulkSet(context.Background(), req) assert.NotNil(t, err) }) - t.Run("Successfully set items with metadata partition key", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - expected[tableName] = []*dynamodb.WriteRequest{ - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(pkey), - }, - "value": { - S: aws.String(`{"Value":"value1"}`), - }, - }, - }, - }, - { - PutRequest: &dynamodb.PutRequest{ - Item: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(pkey), - }, - "value": { - S: aws.String(`{"Value":"value2"}`), - }, - }, - }, - }, - } - - for tbl := range expected { - for reqNum := range expected[tbl] { - expectedItem := expected[tbl][reqNum].PutRequest.Item - inputItem := input.RequestItems[tbl][reqNum].PutRequest.Item - - assert.Equal(t, expectedItem["key"], inputItem["key"]) - assert.Equal(t, expectedItem["value"], inputItem["value"]) - } - } - - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, - }, - table: tableName, - } - req := []state.SetRequest{ - { - Key: "key1", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - Value: value{ - Value: "value1", - }, - }, - { - Key: "key2", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - Value: value{ - Value: "value2", - }, - }, - } - err := ss.BulkSet(context.Background(), req) - assert.Nil(t, err) - }) } func TestDelete(t *testing.T) { @@ -1021,19 +886,19 @@ func TestDelete(t *testing.T) { Key: "key", } - ss := StateStore{ - client: &mockedDynamoDB{ - DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { - assert.Equal(t, map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(req.Key), - }, - }, input.Key) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { + assert.Equal(t, map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, input.Key) - return nil, nil - }, + return nil, nil }, } + err := ss.Delete(context.Background(), req) assert.Nil(t, err) }) @@ -1044,24 +909,23 @@ func TestDelete(t *testing.T) { ETag: &etag, Key: "key", } + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { + assert.Equal(t, map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, input.Key) + assert.Equal(t, "etag = :etag", *input.ConditionExpression) + assert.Equal(t, &dynamodb.AttributeValue{ + S: aws.String("1bdead4badc0ffee"), + }, input.ExpressionAttributeValues[":etag"]) - ss := StateStore{ - client: &mockedDynamoDB{ - DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { - assert.Equal(t, map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(req.Key), - }, - }, input.Key) - assert.Equal(t, "etag = :etag", *input.ConditionExpression) - assert.Equal(t, &dynamodb.AttributeValue{ - S: aws.String("1bdead4badc0ffee"), - }, input.ExpressionAttributeValues[":etag"]) - - return nil, nil - }, + return nil, nil }, } + err := ss.Delete(context.Background(), req) assert.Nil(t, err) }) @@ -1073,24 +937,24 @@ func TestDelete(t *testing.T) { Key: "key", } - ss := StateStore{ - client: &mockedDynamoDB{ - DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { - assert.Equal(t, map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(req.Key), - }, - }, input.Key) - assert.Equal(t, "etag = :etag", *input.ConditionExpression) - assert.Equal(t, &dynamodb.AttributeValue{ - S: aws.String("bogusetag"), - }, input.ExpressionAttributeValues[":etag"]) + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { + assert.Equal(t, map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, input.Key) + assert.Equal(t, "etag = :etag", *input.ConditionExpression) + assert.Equal(t, &dynamodb.AttributeValue{ + S: aws.String("bogusetag"), + }, input.ExpressionAttributeValues[":etag"]) - var checkErr dynamodb.ConditionalCheckFailedException - return nil, &checkErr - }, + var checkErr dynamodb.ConditionalCheckFailedException + return nil, &checkErr }, } + err := ss.Delete(context.Background(), req) assert.NotNil(t, err) switch tagErr := err.(type) { @@ -1115,68 +979,43 @@ func TestDelete(t *testing.T) { err := ss.Delete(context.Background(), req) assert.NotNil(t, err) }) - - t.Run("Successfully delete item with metadata partition key", func(t *testing.T) { - req := &state.DeleteRequest{ - Key: "key", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - } - - ss := StateStore{ - client: &mockedDynamoDB{ - DeleteItemWithContextFn: func(ctx context.Context, input *dynamodb.DeleteItemInput, op ...request.Option) (output *dynamodb.DeleteItemOutput, err error) { - assert.Equal(t, map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(pkey), - }, - }, input.Key) - - return nil, nil - }, - }, - } - err := ss.Delete(context.Background(), req) - assert.Nil(t, err) - }) } func TestBulkDelete(t *testing.T) { t.Run("Successfully delete items", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - expected[tableName] = []*dynamodb.WriteRequest{ - { - DeleteRequest: &dynamodb.DeleteRequest{ - Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key1"), - }, + ss := NewDynamoDBStateStore(logger.NewLogger("test")).(*StateStore) + ss.client = &mockedDynamoDB{ + BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { + expected := map[string][]*dynamodb.WriteRequest{} + expected[tableName] = []*dynamodb.WriteRequest{ + { + DeleteRequest: &dynamodb.DeleteRequest{ + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key1"), }, }, }, - { - DeleteRequest: &dynamodb.DeleteRequest{ - Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String("key2"), - }, + }, + { + DeleteRequest: &dynamodb.DeleteRequest{ + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key2"), }, }, }, - } - assert.Equal(t, expected, input.RequestItems) + }, + } + assert.Equal(t, expected, input.RequestItems) - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, + return &dynamodb.BatchWriteItemOutput{ + UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, + }, nil }, - table: tableName, } + ss.table = tableName + req := []state.DeleteRequest{ { Key: "key1", @@ -1207,55 +1046,4 @@ func TestBulkDelete(t *testing.T) { err := ss.BulkDelete(context.Background(), req) assert.NotNil(t, err) }) - t.Run("Successfully delete items with metadata partition key", func(t *testing.T) { - ss := StateStore{ - client: &mockedDynamoDB{ - BatchWriteItemWithContextFn: func(ctx context.Context, input *dynamodb.BatchWriteItemInput, op ...request.Option) (output *dynamodb.BatchWriteItemOutput, err error) { - expected := map[string][]*dynamodb.WriteRequest{} - expected[tableName] = []*dynamodb.WriteRequest{ - { - DeleteRequest: &dynamodb.DeleteRequest{ - Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(pkey), - }, - }, - }, - }, - { - DeleteRequest: &dynamodb.DeleteRequest{ - Key: map[string]*dynamodb.AttributeValue{ - "key": { - S: aws.String(pkey), - }, - }, - }, - }, - } - assert.Equal(t, expected, input.RequestItems) - - return &dynamodb.BatchWriteItemOutput{ - UnprocessedItems: map[string][]*dynamodb.WriteRequest{}, - }, nil - }, - }, - table: tableName, - } - req := []state.DeleteRequest{ - { - Key: "key1", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - }, - { - Key: "key2", - Metadata: map[string]string{ - metadataPartitionKey: pkey, - }, - }, - } - err := ss.BulkDelete(context.Background(), req) - assert.Nil(t, err) - }) }