dynamodb state: refactor common Set() & BulkSet() logic

Both Set() and BulkSet() contained duplicate logic to convert a dapr
state.SetRequest into a map[string]*dynamodb.AttributeValue. This
change refactors that logic into a new utility function
getItemFromReq(). This will ensure that subsequent changes to add
missing features (e.g. etag) will only have to be expressed once.

Signed-off-by: Mike Brown <github@torvosoft.com>
This commit is contained in:
Mike Brown 2022-08-17 16:29:54 -04:00 committed by Bernd Verst
parent 5aae9c1126
commit ad84226cf4
1 changed files with 34 additions and 61 deletions

View File

@ -128,38 +128,9 @@ func (d *StateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetRespo
// Set saves a dynamoDB item.
func (d *StateStore) Set(req *state.SetRequest) error {
value, err := d.marshalToString(req.Value)
item, err := d.getItemFromReq(req)
if err != nil {
return fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
}
ttl, err := d.parseTTL(req)
if err != nil {
return fmt.Errorf("dynamodb error: failed to parse ttlInSeconds: %s", err)
}
var item map[string]*dynamodb.AttributeValue
if ttl != nil {
item = map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
"value": {
S: aws.String(value),
},
d.ttlAttributeName: {
N: aws.String(strconv.FormatInt(*ttl, 10)),
},
}
} else {
item = map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
"value": {
S: aws.String(value),
},
}
return err
}
input := &dynamodb.PutItemInput{
@ -178,37 +149,9 @@ func (d *StateStore) BulkSet(req []state.SetRequest) error {
for _, r := range req {
r := r // avoid G601.
value, err := d.marshalToString(r.Value)
item, err := d.getItemFromReq(&r)
if err != nil {
return fmt.Errorf("dynamodb error: failed to set key %s: %s", r.Key, err)
}
ttl, err := d.parseTTL(&r)
if err != nil {
return fmt.Errorf("dynamodb error: failed to parse ttlInSeconds: %s", err)
}
var item map[string]*dynamodb.AttributeValue
if ttl != nil {
item = map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(r.Key),
},
"value": {
S: aws.String(value),
},
d.ttlAttributeName: {
N: aws.String(strconv.FormatInt(*ttl, 10)),
},
}
} else {
item = map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(r.Key),
},
"value": {
S: aws.String(value),
},
}
return err
}
writeRequest := &dynamodb.WriteRequest{
@ -300,6 +243,36 @@ func (d *StateStore) getClient(metadata *dynamoDBMetadata) (*dynamodb.DynamoDB,
return c, nil
}
// getItemFromReq converts a dapr state.SetRequest into an dynamodb item
func (d *StateStore) getItemFromReq(req *state.SetRequest) (map[string]*dynamodb.AttributeValue, error) {
value, err := d.marshalToString(req.Value)
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to set key %s: %s", req.Key, err)
}
ttl, err := d.parseTTL(req)
if err != nil {
return nil, fmt.Errorf("dynamodb error: failed to parse ttlInSeconds: %s", err)
}
item := map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
"value": {
S: aws.String(value),
},
}
if ttl != nil {
item[d.ttlAttributeName] = &dynamodb.AttributeValue{
N: aws.String(strconv.FormatInt(*ttl, 10)),
}
}
return item, nil
}
func (d *StateStore) marshalToString(v interface{}) (string, error) {
if buf, ok := v.([]byte); ok {
return string(buf), nil