Provide distinction between missing and empty etags (#600)

* provide distinction between missing and empty etags

* linter

* linter 2
This commit is contained in:
Yaron Schneider 2021-01-14 10:41:38 -08:00 committed by GitHub
parent fe9faadb46
commit 99256e6bec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 166 additions and 97 deletions

View File

@ -104,9 +104,9 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
writePolicy := &as.WritePolicy{} writePolicy := &as.WritePolicy{}
// not a new record // not a new record
if req.ETag != "" { if req.ETag != nil {
var gen uint32 var gen uint32
gen, err = convertETag(req.ETag) gen, err = convertETag(*req.ETag)
if err != nil { if err != nil {
return err return err
} }
@ -133,7 +133,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
} }
err = aspike.client.Put(writePolicy, asKey, as.BinMap(data)) err = aspike.client.Put(writePolicy, asKey, as.BinMap(data))
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -183,9 +183,9 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
} }
writePolicy := &as.WritePolicy{} writePolicy := &as.WritePolicy{}
if req.ETag != "" { if req.ETag != nil {
var gen uint32 var gen uint32
gen, err = convertETag(req.ETag) gen, err = convertETag(*req.ETag)
if err != nil { if err != nil {
return err return err
} }
@ -208,7 +208,7 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
_, err = aspike.client.Delete(writePolicy, asKey) _, err = aspike.client.Delete(writePolicy, asKey)
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }

View File

@ -186,8 +186,12 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
accessConditions := azblob.BlobAccessConditions{} accessConditions := azblob.BlobAccessConditions{}
if req.Options.Concurrency == state.LastWrite { if req.Options.Concurrency == state.FirstWrite && req.ETag != nil {
accessConditions.IfMatch = azblob.ETag(req.ETag) var etag string
if req.ETag != nil {
etag = *req.ETag
}
accessConditions.IfMatch = azblob.ETag(etag)
} }
_, err := azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{ _, err := azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
@ -198,7 +202,7 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
if err != nil { if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err) r.logger.Debugf("write file %s, err %s", req.Key, err)
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -212,15 +216,19 @@ func (r *StateStore) deleteFile(req *state.DeleteRequest) error {
blobURL := r.containerURL.NewBlockBlobURL(getFileName((req.Key))) blobURL := r.containerURL.NewBlockBlobURL(getFileName((req.Key)))
accessConditions := azblob.BlobAccessConditions{} accessConditions := azblob.BlobAccessConditions{}
if req.Options.Concurrency == state.LastWrite { if req.Options.Concurrency == state.FirstWrite && req.ETag != nil {
accessConditions.IfMatch = azblob.ETag(req.ETag) var etag string
if req.ETag != nil {
etag = *req.ETag
}
accessConditions.IfMatch = azblob.ETag(etag)
} }
_, err := blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, accessConditions) _, err := blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, accessConditions)
if err != nil { if err != nil {
r.logger.Debugf("delete file %s, err %s", req.Key, err) r.logger.Debugf("delete file %s, err %s", req.Key, err)
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }

View File

@ -200,8 +200,12 @@ func (c *StateStore) Set(req *state.SetRequest) error {
partitionKey := populatePartitionMetadata(req.Key, req.Metadata) partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)} options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
if req.ETag != "" { if req.ETag != nil {
options = append(options, documentdb.IfMatch((req.ETag))) var etag string
if req.ETag != nil {
etag = *req.ETag
}
options = append(options, documentdb.IfMatch((etag)))
} }
if req.Options.Consistency == state.Strong { if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong)) options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
@ -226,7 +230,7 @@ func (c *StateStore) Set(req *state.SetRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -259,8 +263,12 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
return nil return nil
} }
if req.ETag != "" { if req.ETag != nil {
options = append(options, documentdb.IfMatch((req.ETag))) var etag string
if req.ETag != nil {
etag = *req.ETag
}
options = append(options, documentdb.IfMatch((etag)))
} }
if req.Options.Consistency == state.Strong { if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong)) options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
@ -275,7 +283,7 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
} }

View File

@ -97,7 +97,7 @@ func (r *StateStore) Delete(req *state.DeleteRequest) error {
err := r.deleteRow(req) err := r.deleteRow(req)
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
} }
@ -131,7 +131,7 @@ func (r *StateStore) Set(req *state.SetRequest) error {
err := r.writeRow(req) err := r.writeRow(req)
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
} }
@ -179,7 +179,12 @@ func (r *StateStore) writeRow(req *state.SetRequest) error {
entity.Properties = map[string]interface{}{ entity.Properties = map[string]interface{}{
valueEntityProperty: r.marshal(req), valueEntityProperty: r.marshal(req),
} }
entity.OdataEtag = req.ETag
var etag string
if req.ETag != nil {
etag = *req.ETag
}
entity.OdataEtag = etag
// InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first // InsertOrReplace does not support ETag concurrency, therefore we will try to use Update method first
// as it's more frequent, and then Insert // as it's more frequent, and then Insert
@ -212,7 +217,12 @@ func isTableAlreadyExistsError(err error) bool {
func (r *StateStore) deleteRow(req *state.DeleteRequest) error { func (r *StateStore) deleteRow(req *state.DeleteRequest) error {
pk, rk := getPartitionAndRowKey(req.Key) pk, rk := getPartitionAndRowKey(req.Key)
entity := r.table.GetEntityReference(pk, rk) entity := r.table.GetEntityReference(pk, rk)
entity.OdataEtag = req.ETag
var etag string
if req.ETag != nil {
etag = *req.ETag
}
entity.OdataEtag = etag
return entity.Delete(true, nil) return entity.Delete(true, nil)
} }

View File

@ -534,9 +534,14 @@ func (c *CRDT) Delete(req *state.DeleteRequest) error {
defer cancel() defer cancel()
client := c.getClient() client := c.getClient()
var etag string
if req.ETag != nil {
etag = *req.ETag
}
_, err = client.DeleteState(ctx, &kvstore_pb.DeleteStateEnvelope{ _, err = client.DeleteState(ctx, &kvstore_pb.DeleteStateEnvelope{
Key: req.Key, Key: req.Key,
Etag: req.ETag, Etag: etag,
}) })
return err return err
@ -578,9 +583,14 @@ func (c *CRDT) Set(req *state.SetRequest) error {
} }
client := c.getClient() client := c.getClient()
var etag string
if req.ETag != nil {
etag = *req.ETag
}
_, err = client.SaveState(ctx, &kvstore_pb.SaveStateEnvelope{ _, err = client.SaveState(ctx, &kvstore_pb.SaveStateEnvelope{
Key: req.Key, Key: req.Key,
Etag: req.ETag, Etag: etag,
Value: &any.Any{ Value: &any.Any{
Value: bt, Value: bt,
}, },

View File

@ -139,9 +139,9 @@ func (cbs *Couchbase) Set(req *state.SetRequest) error {
// nolint:nestif // nolint:nestif
// key already exists (use Replace) // key already exists (use Replace)
if req.ETag != "" { if req.ETag != nil {
// compare-and-swap (CAS) for managing concurrent modifications - https://docs.couchbase.com/go-sdk/current/concurrent-mutations-cluster.html // compare-and-swap (CAS) for managing concurrent modifications - https://docs.couchbase.com/go-sdk/current/concurrent-mutations-cluster.html
cas, cerr := eTagToCas(req.ETag) cas, cerr := eTagToCas(*req.ETag)
if cerr != nil { if cerr != nil {
return err return err
} }
@ -160,7 +160,7 @@ func (cbs *Couchbase) Set(req *state.SetRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -197,8 +197,8 @@ func (cbs *Couchbase) Delete(req *state.DeleteRequest) error {
var cas gocb.Cas = 0 var cas gocb.Cas = 0
if req.ETag != "" { if req.ETag != nil {
cas, err = eTagToCas(req.ETag) cas, err = eTagToCas(*req.ETag)
if err != nil { if err != nil {
return err return err
} }
@ -209,7 +209,7 @@ func (cbs *Couchbase) Delete(req *state.DeleteRequest) error {
_, err = cbs.bucket.Remove(req.Key, cas) _, err = cbs.bucket.Remove(req.Key, cas)
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }

View File

@ -104,7 +104,7 @@ func (p *postgresDBAccess) setValue(req *state.SetRequest) error {
// Sprintf is required for table name because sql.DB does not substitute parameters for table names. // Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// Other parameters use sql.DB parameter substitution. // Other parameters use sql.DB parameter substitution.
if req.ETag == "" { if req.ETag == nil {
result, err = p.db.Exec(fmt.Sprintf( result, err = p.db.Exec(fmt.Sprintf(
`INSERT INTO %s (key, value) VALUES ($1, $2) `INSERT INTO %s (key, value) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = $2, updatedate = NOW();`, ON CONFLICT (key) DO UPDATE SET value = $2, updatedate = NOW();`,
@ -112,7 +112,7 @@ func (p *postgresDBAccess) setValue(req *state.SetRequest) error {
} else { } else {
// Convert req.ETag to integer for postgres compatibility // Convert req.ETag to integer for postgres compatibility
var etag int var etag int
etag, err = strconv.Atoi(req.ETag) etag, err = strconv.Atoi(*req.ETag)
if err != nil { if err != nil {
return state.NewETagError(state.ETagInvalid, err) return state.NewETagError(state.ETagInvalid, err)
} }
@ -170,11 +170,11 @@ func (p *postgresDBAccess) deleteValue(req *state.DeleteRequest) error {
var result sql.Result var result sql.Result
var err error var err error
if req.ETag == "" { if req.ETag == nil {
result, err = p.db.Exec("DELETE FROM state WHERE key = $1", req.Key) result, err = p.db.Exec("DELETE FROM state WHERE key = $1", req.Key)
} else { } else {
// Convert req.ETag to integer for postgres compatibility // Convert req.ETag to integer for postgres compatibility
etag, conversionError := strconv.Atoi(req.ETag) etag, conversionError := strconv.Atoi(*req.ETag)
if conversionError != nil { if conversionError != nil {
return state.NewETagError(state.ETagInvalid, err) return state.NewETagError(state.ETagInvalid, err)
} }

View File

@ -293,10 +293,11 @@ func deleteWithInvalidEtagFails(t *testing.T, pgs *PostgreSQL) {
value := &fakeItem{Color: "mauve"} value := &fakeItem{Color: "mauve"}
setItem(t, pgs, key, value, "") setItem(t, pgs, key, value, "")
etag := "1234"
// Delete the item with a fake etag // Delete the item with a fake etag
deleteReq := &state.DeleteRequest{ deleteReq := &state.DeleteRequest{
Key: key, Key: key,
ETag: "1234", ETag: &etag,
} }
err := pgs.Delete(deleteReq) err := pgs.Delete(deleteReq)
assert.NotNil(t, err) assert.NotNil(t, err)
@ -317,7 +318,7 @@ func newItemWithEtagFails(t *testing.T, pgs *PostgreSQL) {
setReq := &state.SetRequest{ setReq := &state.SetRequest{
Key: randomKey(), Key: randomKey(),
ETag: invalidEtag, ETag: &invalidEtag,
Value: value, Value: value,
} }
@ -344,7 +345,7 @@ func updateWithOldEtagFails(t *testing.T, pgs *PostgreSQL) {
newValue = &fakeItem{Color: "maroon"} newValue = &fakeItem{Color: "maroon"}
setReq := &state.SetRequest{ setReq := &state.SetRequest{
Key: key, Key: key,
ETag: originalEtag, ETag: &originalEtag,
Value: newValue, Value: newValue,
} }
err := pgs.Set(setReq) err := pgs.Set(setReq)
@ -504,7 +505,7 @@ func getConnectionString() string {
func setItem(t *testing.T, pgs *PostgreSQL, key string, value interface{}, etag string) { func setItem(t *testing.T, pgs *PostgreSQL, key string, value interface{}, etag string) {
setReq := &state.SetRequest{ setReq := &state.SetRequest{
Key: key, Key: key,
ETag: etag, ETag: &etag,
Value: value, Value: value,
} }
@ -532,7 +533,7 @@ func getItem(t *testing.T, pgs *PostgreSQL, key string) (*state.GetResponse, *fa
func deleteItem(t *testing.T, pgs *PostgreSQL, key string, etag string) { func deleteItem(t *testing.T, pgs *PostgreSQL, key string, etag string) {
deleteReq := &state.DeleteRequest{ deleteReq := &state.DeleteRequest{
Key: key, Key: key,
ETag: etag, ETag: &etag,
Options: state.DeleteStateOption{}, Options: state.DeleteStateOption{},
} }

View File

@ -215,10 +215,11 @@ func (r *StateStore) parseConnectedSlaves(res string) int {
} }
func (r *StateStore) deleteValue(req *state.DeleteRequest) error { func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
if req.ETag == "" { if req.ETag == nil {
req.ETag = "0" etag := "0"
req.ETag = &etag
} }
_, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, req.ETag).Result() _, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, *req.ETag).Result()
if err != nil { if err != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -292,7 +293,7 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
_, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result() _, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -328,10 +329,11 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
pipe.Do("EVAL", setQuery, 1, req.Key, ver, bt) pipe.Do("EVAL", setQuery, 1, req.Key, ver, bt)
} else if o.Operation == state.Delete { } else if o.Operation == state.Delete {
req := o.Request.(state.DeleteRequest) req := o.Request.(state.DeleteRequest)
if req.ETag == "" { if req.ETag == nil {
req.ETag = "0" etag := "0"
req.ETag = &etag
} }
pipe.Do("EVAL", delQuery, 1, req.Key, req.ETag) pipe.Do("EVAL", delQuery, 1, req.Key, *req.ETag)
} }
} }
@ -362,10 +364,10 @@ func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version str
} }
func (r *StateStore) parseETag(req *state.SetRequest) (int, error) { func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
if req.Options.Concurrency == state.LastWrite || req.ETag == "" { if req.Options.Concurrency == state.LastWrite || req.ETag == nil || (req.ETag != nil && *req.ETag == "") {
return 0, nil return 0, nil
} }
ver, err := strconv.Atoi(req.ETag) ver, err := strconv.Atoi(*req.ETag)
if err != nil { if err != nil {
return -1, state.NewETagError(state.ETagInvalid, err) return -1, state.NewETagError(state.ETagInvalid, err)
} }

View File

@ -52,31 +52,35 @@ func TestGetKeyVersion(t *testing.T) {
func TestParseEtag(t *testing.T) { func TestParseEtag(t *testing.T) {
store := NewRedisStateStore(logger.NewLogger("test")) store := NewRedisStateStore(logger.NewLogger("test"))
t.Run("Empty ETag", func(t *testing.T) { t.Run("Empty ETag", func(t *testing.T) {
etag := ""
ver, err := store.parseETag(&state.SetRequest{ ver, err := store.parseETag(&state.SetRequest{
ETag: "", ETag: &etag,
}) })
assert.Equal(t, nil, err, "failed to parse ETag") assert.Equal(t, nil, err, "failed to parse ETag")
assert.Equal(t, 0, ver, "default version should be 0") assert.Equal(t, 0, ver, "default version should be 0")
}) })
t.Run("Number ETag", func(t *testing.T) { t.Run("Number ETag", func(t *testing.T) {
etag := "354"
ver, err := store.parseETag(&state.SetRequest{ ver, err := store.parseETag(&state.SetRequest{
ETag: "354", ETag: &etag,
}) })
assert.Equal(t, nil, err, "failed to parse ETag") assert.Equal(t, nil, err, "failed to parse ETag")
assert.Equal(t, 354, ver, "version should be 254") assert.Equal(t, 354, ver, "version should be 254")
}) })
t.Run("String ETag", func(t *testing.T) { t.Run("String ETag", func(t *testing.T) {
etag := "dragon"
_, err := store.parseETag(&state.SetRequest{ _, err := store.parseETag(&state.SetRequest{
ETag: "dragon", ETag: &etag,
}) })
assert.NotNil(t, err, "shouldn't recognize string ETag") assert.NotNil(t, err, "shouldn't recognize string ETag")
}) })
t.Run("Concurrency=LastWrite", func(t *testing.T) { t.Run("Concurrency=LastWrite", func(t *testing.T) {
etag := "dragon"
ver, err := store.parseETag(&state.SetRequest{ ver, err := store.parseETag(&state.SetRequest{
Options: state.SetStateOption{ Options: state.SetStateOption{
Concurrency: state.LastWrite, Concurrency: state.LastWrite,
}, },
ETag: "dragon", ETag: &etag,
}) })
assert.Equal(t, nil, err, "failed to parse ETag") assert.Equal(t, nil, err, "failed to parse ETag")
assert.Equal(t, 0, ver, "version should be 0") assert.Equal(t, 0, ver, "version should be 0")
@ -154,12 +158,13 @@ func TestTransactionalDelete(t *testing.T) {
Value: "deathstar", Value: "deathstar",
}) })
etag := "1"
err := ss.Multi(&state.TransactionalStateRequest{ err := ss.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{{ Operations: []state.TransactionalStateOperation{{
Operation: state.Delete, Operation: state.Delete,
Request: state.DeleteRequest{ Request: state.DeleteRequest{
Key: "weapon", Key: "weapon",
ETag: "1", ETag: &etag,
}, },
}}, }},
}) })

View File

@ -20,7 +20,7 @@ type GetStateOption struct {
// DeleteRequest is the object describing a delete state request // DeleteRequest is the object describing a delete state request
type DeleteRequest struct { type DeleteRequest struct {
Key string `json:"key"` Key string `json:"key"`
ETag string `json:"etag,omitempty"` ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
Options DeleteStateOption `json:"options,omitempty"` Options DeleteStateOption `json:"options,omitempty"`
} }
@ -45,7 +45,7 @@ type DeleteStateOption struct {
type SetRequest struct { type SetRequest struct {
Key string `json:"key"` Key string `json:"key"`
Value interface{} `json:"value"` Value interface{} `json:"value"`
ETag string `json:"etag,omitempty"` ETag *string `json:"etag,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"` Metadata map[string]string `json:"metadata,omitempty"`
Options SetStateOption `json:"options,omitempty"` Options SetStateOption `json:"options,omitempty"`
} }

View File

@ -188,11 +188,16 @@ func (s *RethinkDB) Set(req *state.SetRequest) error {
func (s *RethinkDB) BulkSet(req []state.SetRequest) error { func (s *RethinkDB) BulkSet(req []state.SetRequest) error {
docs := make([]*stateRecord, len(req)) docs := make([]*stateRecord, len(req))
for i, v := range req { for i, v := range req {
var etag string
if v.ETag != nil {
etag = *v.ETag
}
docs[i] = &stateRecord{ docs[i] = &stateRecord{
ID: v.Key, ID: v.Key,
TS: time.Now().UTC().UnixNano(), TS: time.Now().UTC().UnixNano(),
Hash: v.ETag,
Data: v.Value, Data: v.Value,
Hash: etag,
} }
} }

View File

@ -93,7 +93,7 @@ func TestRethinkDBStateStore(t *testing.T) {
d2.F2 = 2 d2.F2 = 2
d2.F3 = time.Now().UTC() d2.F3 = time.Now().UTC()
tag := fmt.Sprintf("hash-%d", time.Now().UnixNano()) tag := fmt.Sprintf("hash-%d", time.Now().UnixNano())
if err = db.Set(&state.SetRequest{Key: k, Value: d2, ETag: tag}); err != nil { if err = db.Set(&state.SetRequest{Key: k, Value: d2, ETag: &tag}); err != nil {
t.Fatalf("error setting data to db: %v", err) t.Fatalf("error setting data to db: %v", err)
} }

View File

@ -323,9 +323,9 @@ func (s *SQLServer) executeMulti(sets []state.SetRequest, deletes []state.Delete
func (s *SQLServer) Delete(req *state.DeleteRequest) error { func (s *SQLServer) Delete(req *state.DeleteRequest) error {
var err error var err error
var res sql.Result var res sql.Result
if req.ETag != "" { if req.ETag != nil {
var b []byte var b []byte
b, err = hex.DecodeString(req.ETag) b, err = hex.DecodeString(*req.ETag)
if err != nil { if err != nil {
return state.NewETagError(state.ETagInvalid, err) return state.NewETagError(state.ETagInvalid, err)
} }
@ -336,7 +336,7 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -385,8 +385,8 @@ func (s *SQLServer) executeBulkDelete(db dbExecutor, req []state.DeleteRequest)
for i, d := range req { for i, d := range req {
var etag []byte var etag []byte
var err error var err error
if d.ETag != "" { if d.ETag != nil {
etag, err = hex.DecodeString(d.ETag) etag, err = hex.DecodeString(*d.ETag)
if err != nil { if err != nil {
return state.NewETagError(state.ETagInvalid, err) return state.NewETagError(state.ETagInvalid, err)
} }
@ -473,9 +473,9 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
return err return err
} }
etag := sql.Named(rowVersionColumnName, nil) etag := sql.Named(rowVersionColumnName, nil)
if req.ETag != "" { if req.ETag != nil {
var b []byte var b []byte
b, err = hex.DecodeString(req.ETag) b, err = hex.DecodeString(*req.ETag)
if err != nil { if err != nil {
return state.NewETagError(state.ETagInvalid, err) return state.NewETagError(state.ETagInvalid, err)
} }
@ -483,7 +483,7 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
} }
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag) res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag)
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil && *req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }

View File

@ -27,10 +27,10 @@ const (
// connectionStringEnvKey defines the key containing the integration test connection string // connectionStringEnvKey defines the key containing the integration test connection string
// To use docker, server=localhost;user id=sa;password=Pass@Word1;port=1433; // To use docker, server=localhost;user id=sa;password=Pass@Word1;port=1433;
// To use Azure SQL, server=<your-db-server-name>.database.windows.net;user id=<your-db-user>;port=1433;password=<your-password>;database=dapr_test; // To use Azure SQL, server=<your-db-server-name>.database.windows.net;user id=<your-db-user>;port=1433;password=<your-password>;database=dapr_test;
connectionStringEnvKey = "DAPR_TEST_SQL_CONNSTRING" connectionStringEnvKey = "DAPR_TEST_SQL_CONNSTRING"
usersTableName = "Users" usersTableName = "Users"
invalidEtag = "FFFFFFFFFFFFFFFF" beverageTea = "tea"
beverageTea = "tea" invalidEtag string = "FFFFFFFFFFFFFFFF"
) )
type user struct { type user struct {
@ -215,6 +215,8 @@ func (n uuidKeyGenerator) NextKey() string {
} }
func testSingleOperations(t *testing.T) { func testSingleOperations(t *testing.T) {
invEtag := invalidEtag
tests := []struct { tests := []struct {
name string name string
kt KeyType kt KeyType
@ -241,7 +243,7 @@ func testSingleOperations(t *testing.T) {
// Update with ETAG // Update with ETAG
waterJohn := johnV1 waterJohn := johnV1
waterJohn.FavoriteBeverage = "Water" waterJohn.FavoriteBeverage = "Water"
err = store.Set(&state.SetRequest{Key: waterJohn.ID, Value: waterJohn, ETag: etagFromInsert}) err = store.Set(&state.SetRequest{Key: waterJohn.ID, Value: waterJohn, ETag: &etagFromInsert})
assert.Nil(t, err) assert.Nil(t, err)
// Get updated // Get updated
@ -259,17 +261,17 @@ func testSingleOperations(t *testing.T) {
// 8. Update with invalid ETAG should fail // 8. Update with invalid ETAG should fail
failedJohn := johnV3 failedJohn := johnV3
failedJohn.FavoriteBeverage = "Will not work" failedJohn.FavoriteBeverage = "Will not work"
err = store.Set(&state.SetRequest{Key: failedJohn.ID, Value: failedJohn, ETag: etagFromInsert}) err = store.Set(&state.SetRequest{Key: failedJohn.ID, Value: failedJohn, ETag: &etagFromInsert})
assert.NotNil(t, err) assert.NotNil(t, err)
_, etag := assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) _, etag := assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3)
// 9. Delete with invalid ETAG should fail // 9. Delete with invalid ETAG should fail
err = store.Delete(&state.DeleteRequest{Key: johnV3.ID, ETag: invalidEtag}) err = store.Delete(&state.DeleteRequest{Key: johnV3.ID, ETag: &invEtag})
assert.NotNil(t, err) assert.NotNil(t, err)
assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3) assertLoadedUserIsEqual(t, store, johnV3.ID, johnV3)
// 10. Delete with valid ETAG // 10. Delete with valid ETAG
err = store.Delete(&state.DeleteRequest{Key: johnV2.ID, ETag: etag}) err = store.Delete(&state.DeleteRequest{Key: johnV2.ID, ETag: &etag})
assert.Nil(t, err) assert.Nil(t, err)
assertUserDoesNotExist(t, store, johnV2.ID) assertUserDoesNotExist(t, store, johnV2.ID)
@ -282,7 +284,8 @@ func testSetNewRecordWithInvalidEtagShouldFail(t *testing.T) {
u := user{uuid.New().String(), "John", "Coffee"} u := user{uuid.New().String(), "John", "Coffee"}
err := store.Set(&state.SetRequest{Key: u.ID, Value: u, ETag: invalidEtag}) invEtag := invalidEtag
err := store.Set(&state.SetRequest{Key: u.ID, Value: u, ETag: &invEtag})
assert.NotNil(t, err) assert.NotNil(t, err)
} }
@ -397,8 +400,8 @@ func testMultiOperations(t *testing.T) {
err = store.Multi(&state.TransactionalStateRequest{ err = store.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{ Operations: []state.TransactionalStateOperation{
{Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: toDelete.etag}}, {Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag}},
{Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: toModify.etag}}, {Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}},
{Operation: state.Upsert, Request: state.SetRequest{Key: toInsert.ID, Value: toInsert}}, {Operation: state.Upsert, Request: state.SetRequest{Key: toInsert.ID, Value: toInsert}},
}, },
}) })
@ -421,8 +424,8 @@ func testMultiOperations(t *testing.T) {
err = store.Multi(&state.TransactionalStateRequest{ err = store.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{ Operations: []state.TransactionalStateOperation{
{Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: toDelete.etag}}, {Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: &toDelete.etag}},
{Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: toModify.etag}}, {Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: &toModify.etag}},
}, },
}) })
assert.Nil(t, err) assert.Nil(t, err)
@ -439,9 +442,10 @@ func testMultiOperations(t *testing.T) {
toDelete := loadedUsers[userIndex] toDelete := loadedUsers[userIndex]
toInsert := user{keyGen.NextKey(), "Wont-be-inserted", "Beer"} toInsert := user{keyGen.NextKey(), "Wont-be-inserted", "Beer"}
invEtag := invalidEtag
err = store.Multi(&state.TransactionalStateRequest{ err = store.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{ Operations: []state.TransactionalStateOperation{
{Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: invalidEtag}}, {Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}},
{Operation: state.Upsert, Request: state.SetRequest{Key: toInsert.ID, Value: toInsert}}, {Operation: state.Upsert, Request: state.SetRequest{Key: toInsert.ID, Value: toInsert}},
}, },
}) })
@ -459,9 +463,10 @@ func testMultiOperations(t *testing.T) {
modified := toModify.user modified := toModify.user
modified.FavoriteBeverage = beverageTea modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
err = store.Multi(&state.TransactionalStateRequest{ err = store.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{ Operations: []state.TransactionalStateOperation{
{Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: invalidEtag}}, {Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID, ETag: &invEtag}},
{Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified}}, {Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified}},
}, },
}) })
@ -478,10 +483,11 @@ func testMultiOperations(t *testing.T) {
modified := toModify.user modified := toModify.user
modified.FavoriteBeverage = beverageTea modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
err = store.Multi(&state.TransactionalStateRequest{ err = store.Multi(&state.TransactionalStateRequest{
Operations: []state.TransactionalStateOperation{ Operations: []state.TransactionalStateOperation{
{Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID}}, {Operation: state.Delete, Request: state.DeleteRequest{Key: toDelete.ID}},
{Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: invalidEtag}}, {Operation: state.Upsert, Request: state.SetRequest{Key: modified.ID, Value: modified, ETag: &invEtag}},
}, },
}) })
@ -539,7 +545,7 @@ func testBulkSet(t *testing.T) {
toInsert := user{keyGen.NextKey(), "Maria", "Wine"} toInsert := user{keyGen.NextKey(), "Maria", "Wine"}
err := store.BulkSet([]state.SetRequest{ err := store.BulkSet([]state.SetRequest{
{Key: modified.ID, Value: modified, ETag: toModifyETag}, {Key: modified.ID, Value: modified, ETag: &toModifyETag},
{Key: toInsert.ID, Value: toInsert}, {Key: toInsert.ID, Value: toInsert},
}) })
assert.Nil(t, err) assert.Nil(t, err)
@ -577,10 +583,11 @@ func testBulkSet(t *testing.T) {
modified := toModify modified := toModify
modified.FavoriteBeverage = beverageTea modified.FavoriteBeverage = beverageTea
invEtag := invalidEtag
sets := []state.SetRequest{ sets := []state.SetRequest{
{Key: toInsert1.ID, Value: toInsert1}, {Key: toInsert1.ID, Value: toInsert1},
{Key: toInsert2.ID, Value: toInsert2}, {Key: toInsert2.ID, Value: toInsert2},
{Key: modified.ID, Value: modified, ETag: invalidEtag}, {Key: modified.ID, Value: modified, ETag: &invEtag},
} }
err := store.BulkSet(sets) err := store.BulkSet(sets)
@ -654,8 +661,8 @@ func testBulkDelete(t *testing.T) {
deleted2, deleted2Etag := assertUserExists(t, store, initialUsers[userIndex+1].ID) deleted2, deleted2Etag := assertUserExists(t, store, initialUsers[userIndex+1].ID)
err := store.BulkDelete([]state.DeleteRequest{ err := store.BulkDelete([]state.DeleteRequest{
{Key: deleted1.ID, ETag: deleted1Etag}, {Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: deleted2Etag}, {Key: deleted2.ID, ETag: &deleted2Etag},
}) })
assert.Nil(t, err) assert.Nil(t, err)
totalUsers -= 2 totalUsers -= 2
@ -671,7 +678,7 @@ func testBulkDelete(t *testing.T) {
deleted2 := initialUsers[userIndex+1] deleted2 := initialUsers[userIndex+1]
err := store.BulkDelete([]state.DeleteRequest{ err := store.BulkDelete([]state.DeleteRequest{
{Key: deleted1.ID, ETag: deleted1Etag}, {Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID}, {Key: deleted2.ID},
}) })
assert.Nil(t, err) assert.Nil(t, err)
@ -687,9 +694,10 @@ func testBulkDelete(t *testing.T) {
deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID) deleted1, deleted1Etag := assertUserExists(t, store, initialUsers[userIndex].ID)
deleted2 := initialUsers[userIndex+1] deleted2 := initialUsers[userIndex+1]
invEtag := invalidEtag
err := store.BulkDelete([]state.DeleteRequest{ err := store.BulkDelete([]state.DeleteRequest{
{Key: deleted1.ID, ETag: deleted1Etag}, {Key: deleted1.ID, ETag: &deleted1Etag},
{Key: deleted2.ID, ETag: invalidEtag}, {Key: deleted2.ID, ETag: &invEtag},
}) })
assert.NotNil(t, err) assert.NotNil(t, err)
assert.NotNil(t, err) assert.NotNil(t, err)
@ -770,7 +778,7 @@ func testConcurrentSets(t *testing.T) {
defer wc.Done() defer wc.Done()
modified := user{"1", "John", beverageTea} modified := user{"1", "John", beverageTea}
err := store.Set(&state.SetRequest{Key: id, Value: modified, ETag: etag}) err := store.Set(&state.SetRequest{Key: id, Value: modified, ETag: &etag})
if err != nil { if err != nil {
atomic.AddInt32(&totalErrors, 1) atomic.AddInt32(&totalErrors, 1)
} else { } else {

View File

@ -178,7 +178,7 @@ func (s *StateStore) Delete(req *state.DeleteRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -231,7 +231,7 @@ func (s *StateStore) Set(req *state.SetRequest) error {
} }
if err != nil { if err != nil {
if req.ETag != "" { if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err) return state.NewETagError(state.ETagMismatch, err)
} }
@ -299,7 +299,12 @@ func (s *StateStore) newDeleteRequest(req *state.DeleteRequest) (*zk.DeleteReque
if req.Options.Concurrency == state.LastWrite { if req.Options.Concurrency == state.LastWrite {
version = anyVersion version = anyVersion
} else { } else {
version = s.parseETag(req.ETag) var etag string
if req.ETag != nil {
etag = *req.ETag
}
version = s.parseETag(etag)
} }
return &zk.DeleteRequest{ return &zk.DeleteRequest{
@ -324,7 +329,12 @@ func (s *StateStore) newSetDataRequest(req *state.SetRequest) (*zk.SetDataReques
if req.Options.Concurrency == state.LastWrite { if req.Options.Concurrency == state.LastWrite {
version = anyVersion version = anyVersion
} else { } else {
version = s.parseETag(req.ETag) var etag string
if req.ETag != nil {
etag = *req.ETag
}
version = s.parseETag(etag)
} }
return &zk.SetDataRequest{ return &zk.SetDataRequest{

View File

@ -94,6 +94,7 @@ func TestDelete(t *testing.T) {
conn := NewMockConn(ctrl) conn := NewMockConn(ctrl)
s := StateStore{conn: conn} s := StateStore{conn: conn}
etag := "123"
t.Run("With key", func(t *testing.T) { t.Run("With key", func(t *testing.T) {
conn.EXPECT().Delete("foo", int32(anyVersion)).Return(nil).Times(1) conn.EXPECT().Delete("foo", int32(anyVersion)).Return(nil).Times(1)
@ -104,7 +105,7 @@ func TestDelete(t *testing.T) {
t.Run("With key and version", func(t *testing.T) { t.Run("With key and version", func(t *testing.T) {
conn.EXPECT().Delete("foo", int32(123)).Return(nil).Times(1) conn.EXPECT().Delete("foo", int32(123)).Return(nil).Times(1)
err := s.Delete(&state.DeleteRequest{Key: "foo", ETag: "123"}) err := s.Delete(&state.DeleteRequest{Key: "foo", ETag: &etag})
assert.NoError(t, err, "Key must be exists") assert.NoError(t, err, "Key must be exists")
}) })
@ -113,7 +114,7 @@ func TestDelete(t *testing.T) {
err := s.Delete(&state.DeleteRequest{ err := s.Delete(&state.DeleteRequest{
Key: "foo", Key: "foo",
ETag: "123", ETag: &etag,
Options: state.DeleteStateOption{Concurrency: state.LastWrite}, Options: state.DeleteStateOption{Concurrency: state.LastWrite},
}) })
assert.NoError(t, err, "Key must be exists") assert.NoError(t, err, "Key must be exists")
@ -186,6 +187,7 @@ func TestSet(t *testing.T) {
stat := &zk.Stat{} stat := &zk.Stat{}
etag := "123"
t.Run("With key", func(t *testing.T) { t.Run("With key", func(t *testing.T) {
conn.EXPECT().Set("foo", []byte("\"bar\""), int32(anyVersion)).Return(stat, nil).Times(1) conn.EXPECT().Set("foo", []byte("\"bar\""), int32(anyVersion)).Return(stat, nil).Times(1)
@ -195,7 +197,7 @@ func TestSet(t *testing.T) {
t.Run("With key and version", func(t *testing.T) { t.Run("With key and version", func(t *testing.T) {
conn.EXPECT().Set("foo", []byte("\"bar\""), int32(123)).Return(stat, nil).Times(1) conn.EXPECT().Set("foo", []byte("\"bar\""), int32(123)).Return(stat, nil).Times(1)
err := s.Set(&state.SetRequest{Key: "foo", Value: "bar", ETag: "123"}) err := s.Set(&state.SetRequest{Key: "foo", Value: "bar", ETag: &etag})
assert.NoError(t, err, "Key must be set") assert.NoError(t, err, "Key must be set")
}) })
t.Run("With key and concurrency", func(t *testing.T) { t.Run("With key and concurrency", func(t *testing.T) {
@ -204,7 +206,7 @@ func TestSet(t *testing.T) {
err := s.Set(&state.SetRequest{ err := s.Set(&state.SetRequest{
Key: "foo", Key: "foo",
Value: "bar", Value: "bar",
ETag: "123", ETag: &etag,
Options: state.SetStateOption{Concurrency: state.LastWrite}, Options: state.SetStateOption{Concurrency: state.LastWrite},
}) })
assert.NoError(t, err, "Key must be set") assert.NoError(t, err, "Key must be set")