Ensure proper handling of binary data in DynamoDB state store (#3658)

Signed-off-by: distkloc <1193849+distkloc@users.noreply.github.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Co-authored-by: Cassie Coyle <cassie@diagrid.io>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Yuichi Kanbayashi 2025-06-26 22:52:48 +09:00 committed by GitHub
parent 849f139ff6
commit e4a8a3e868
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 128 additions and 45 deletions

View File

@ -61,6 +61,13 @@ type dynamoDBMetadata struct {
PartitionKey string `json:"partitionKey"`
}
type putData struct {
ConditionExpression *string
ExpressionAttributeValues map[string]*dynamodb.AttributeValue
Item map[string]*dynamodb.AttributeValue
TableName *string
}
const (
defaultPartitionKeyName = "key"
metadataPartitionKey = "partitionKey"
@ -164,9 +171,9 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get
return &state.GetResponse{}, nil
}
var output string
if err = dynamodbattribute.Unmarshal(result.Item["value"], &output); err != nil {
return nil, err
data, err := unmarshalValue(result.Item["value"])
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to unmarshal value for key %s: %w", req.Key, err)
}
var metadata map[string]string
@ -187,7 +194,7 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get
}
resp := &state.GetResponse{
Data: []byte(output),
Data: data,
Metadata: metadata,
}
@ -205,29 +212,12 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get
// Set saves a dynamoDB item.
func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
item, err := d.getItemFromReq(req)
pd, err := d.createPutData(req)
if err != nil {
return err
}
input := &dynamodb.PutItemInput{
Item: item,
TableName: &d.table,
}
if req.HasETag() {
condExpr := "etag = :etag"
input.ConditionExpression = &condExpr
exprAttrValues := make(map[string]*dynamodb.AttributeValue)
exprAttrValues[":etag"] = &dynamodb.AttributeValue{
S: req.ETag,
}
input.ExpressionAttributeValues = exprAttrValues
} else if req.Options.Concurrency == state.FirstWrite {
condExpr := "attribute_not_exists(etag)"
input.ConditionExpression = &condExpr
}
_, err = d.authProvider.DynamoDB().DynamoDB.PutItemWithContext(ctx, input)
_, err = d.authProvider.DynamoDB().DynamoDB.PutItemWithContext(ctx, pd.ToPutItemInput())
if err != nil && req.HasETag() {
switch cErr := err.(type) {
case *dynamodb.ConditionalCheckFailedException:
@ -292,9 +282,55 @@ func (d *StateStore) getDynamoDBMetadata(meta state.Metadata) (*dynamoDBMetadata
return &m, err
}
// getItemFromReq converts a dapr state.SetRequest into an dynamodb item
func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb.AttributeValue, error) {
value, err := d.marshalToString(req.Value)
// createPutData creates a DynamoDB put request data from a SetRequest.
func (d *StateStore) createPutData(req *state.SetRequest) (putData, error) {
item, err := d.createItem(req)
if err != nil {
return putData{}, err
}
pd := putData{
Item: item,
TableName: ptr.Of(d.table),
}
if req.HasETag() {
condExpr := "etag = :etag"
pd.ConditionExpression = &condExpr
exprAttrValues := make(map[string]*dynamodb.AttributeValue)
exprAttrValues[":etag"] = &dynamodb.AttributeValue{
S: req.ETag,
}
pd.ExpressionAttributeValues = exprAttrValues
} else if req.Options.Concurrency == state.FirstWrite {
condExpr := "attribute_not_exists(etag)"
pd.ConditionExpression = &condExpr
}
return pd, nil
}
func (d putData) ToPutItemInput() *dynamodb.PutItemInput {
return &dynamodb.PutItemInput{
ConditionExpression: d.ConditionExpression,
ExpressionAttributeValues: d.ExpressionAttributeValues,
Item: d.Item,
TableName: d.TableName,
}
}
func (d putData) ToPut() *dynamodb.Put {
return &dynamodb.Put{
ConditionExpression: d.ConditionExpression,
ExpressionAttributeValues: d.ExpressionAttributeValues,
Item: d.Item,
TableName: d.TableName,
}
}
// createItem creates a DynamoDB item from a SetRequest.
func (d *StateStore) createItem(req *state.SetRequest) (map[string]*dynamodb.AttributeValue, error) {
value, err := marshalValue(req.Value)
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to marshal value for key %s: %w", req.Key, err)
}
@ -313,9 +349,7 @@ func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb
d.partitionKey: {
S: ptr.Of(req.Key),
},
"value": {
S: ptr.Of(value),
},
"value": value,
"etag": {
S: ptr.Of(strconv.FormatUint(newEtag, 16)),
},
@ -340,12 +374,27 @@ func getRand64() (uint64, error) {
return binary.LittleEndian.Uint64(randBuf), nil
}
func (d *StateStore) marshalToString(v interface{}) (string, error) {
if buf, ok := v.([]byte); ok {
return string(buf), nil
func marshalValue(v interface{}) (*dynamodb.AttributeValue, error) {
if bt, ok := v.([]byte); ok {
return &dynamodb.AttributeValue{B: bt}, nil
}
return jsoniterator.ConfigFastest.MarshalToString(v)
str, err := jsoniterator.ConfigFastest.MarshalToString(v)
if err != nil {
return nil, err
}
return &dynamodb.AttributeValue{S: ptr.Of(str)}, nil
}
func unmarshalValue(value *dynamodb.AttributeValue) ([]byte, error) {
if value == nil {
return []byte(nil), nil
}
if value.B != nil {
return value.B, nil
}
return []byte(*value.S), nil
}
// Parse and process ttlInSeconds.
@ -404,21 +453,11 @@ func (d *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
twi := &dynamodb.TransactWriteItem{}
switch req := o.(type) {
case state.SetRequest:
value, err := d.marshalToString(req.Value)
pd, err := d.createPutData(&req)
if err != nil {
return fmt.Errorf("dynamodb error: failed to marshal value for key %s: %w", req.Key, err)
}
twi.Put = &dynamodb.Put{
TableName: ptr.Of(d.table),
Item: map[string]*dynamodb.AttributeValue{
d.partitionKey: {
S: ptr.Of(req.Key),
},
"value": {
S: ptr.Of(value),
},
},
}
twi.Put = pd.ToPut()
case state.DeleteRequest:
twi.Delete = &dynamodb.Delete{

View File

@ -452,6 +452,50 @@ func TestSet(t *testing.T) {
require.NoError(t, err)
})
t.Run("Successfully set item with binary value", func(t *testing.T) {
mockedDB := &awsAuth.MockDynamoDB{
PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) {
assert.Equal(t, dynamodb.AttributeValue{
S: aws.String("key"),
}, *input.Item["key"])
assert.Equal(t, dynamodb.AttributeValue{
B: []byte("value"),
}, *input.Item["value"])
assert.Len(t, input.Item, 3)
return &dynamodb.PutItemOutput{
Attributes: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("value"),
},
},
}, nil
},
}
dynamo := awsAuth.DynamoDBClients{
DynamoDB: mockedDB,
}
mockedClients := awsAuth.Clients{
Dynamo: &dynamo,
}
mockAuthProvider := &awsAuth.StaticAuth{}
mockAuthProvider.WithMockClients(&mockedClients)
s := StateStore{
authProvider: mockAuthProvider,
partitionKey: defaultPartitionKeyName,
}
req := &state.SetRequest{
Key: "key",
Value: []byte("value"),
}
err := s.Set(t.Context(), req)
require.NoError(t, err)
})
t.Run("Successfully set item with matching etag", func(t *testing.T) {
mockedDB := &awsAuth.MockDynamoDB{
PutItemWithContextFn: func(ctx context.Context, input *dynamodb.PutItemInput, op ...request.Option) (output *dynamodb.PutItemOutput, err error) {