Add conformance tests for etags in Bulk operations (#2843)

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Alessandro (Ale) Segala 2023-05-17 16:28:29 -07:00 committed by GitHub
parent e30d140479
commit 1f27fd0e8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1465 additions and 1084 deletions

View File

@ -15,7 +15,7 @@ npm ci
npm run build
```
> Important: do not publish this worker (e.g. with `npx wrangler publish`), as it should not use the config in the `wrangler.toml` file!
> Important: do not deploy this worker (e.g. with `npx wrangler deploy`), as it should not use the config in the `wrangler.toml` file!
## Develop locally

File diff suppressed because it is too large Load Diff

View File

@ -2,24 +2,24 @@
"private": true,
"name": "dapr-cfworkers-client",
"description": "Client code for Dapr to interact with Cloudflare Workers",
"version": "20230216",
"version": "20230517",
"main": "worker.ts",
"scripts": {
"build": "esbuild --bundle --minify --outfile=../workers/code/worker.js --format=esm --platform=browser --sourcemap worker.ts",
"start": "wrangler dev --local",
"start": "wrangler dev",
"format": "prettier --write ."
},
"author": "Dapr authors",
"license": "Apache2",
"devDependencies": {
"@cloudflare/workers-types": "^4.20230215.0",
"esbuild": "^0.17.8",
"prettier": "^2.8.4",
"typescript": "^4.9.5",
"wrangler": "^2.10.0"
"@cloudflare/workers-types": "^4.20230511.0",
"esbuild": "^0.17.19",
"prettier": "^2.8.8",
"typescript": "^5.0.4",
"wrangler": "^3.0.0"
},
"dependencies": {
"itty-router": "3.0.11",
"jose": "4.12.0"
"itty-router": "3.0.12",
"jose": "4.14.4"
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -193,7 +193,7 @@ func (p *PostgresDBAccess) doSet(parentCtx context.Context, db dbquerier, req *s
params []any
)
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
params = []any{req.Key, value, isBinary}
} else {
var etag64 uint64
@ -220,7 +220,7 @@ func (p *PostgresDBAccess) doSet(parentCtx context.Context, db dbquerier, req *s
return err
}
if result.RowsAffected() != 1 {
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, nil)
}
return errors.New("no item was updated")
@ -291,13 +291,39 @@ func (p *PostgresDBAccess) BulkGet(parentCtx context.Context, req []state.GetReq
// Scan all rows
var n int
res := make([]state.BulkGetResponse, len(req))
foundKeys := make(map[string]struct{}, len(req))
for ; rows.Next(); n++ {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
r := state.BulkGetResponse{}
r.Key, r.Data, r.ETag, err = readRow(rows)
if err != nil {
r.Error = err.Error()
}
res[n] = r
foundKeys[r.Key] = struct{}{}
}
// Populate missing keys with empty values
// This is to ensure consistency with the other state stores that implement BulkGet as a loop over Get, and with the Get method
if len(foundKeys) < len(req) {
var ok bool
for _, r := range req {
_, ok = foundKeys[r.Key]
if !ok {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
res[n] = state.BulkGetResponse{
Key: r.Key,
}
n++
}
}
}
return res[:n], nil
@ -352,7 +378,7 @@ func (p *PostgresDBAccess) doDelete(parentCtx context.Context, db dbquerier, req
ctx, cancel := context.WithTimeout(parentCtx, p.metadata.Timeout)
defer cancel()
var result pgconn.CommandTag
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
result, err = db.Exec(ctx, "DELETE FROM "+p.metadata.TableName+" WHERE key = $1", req.Key)
} else {
// Convert req.ETag to uint32 for postgres XID compatibility

View File

@ -129,7 +129,7 @@ func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
writePolicy := &as.WritePolicy{}
// not a new record
if req.ETag != nil {
if req.HasETag() {
var gen uint32
gen, err = convertETag(*req.ETag)
if err != nil {
@ -158,7 +158,7 @@ func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
}
err = aspike.client.Put(writePolicy, asKey, as.BinMap(data))
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
@ -210,7 +210,7 @@ func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) e
}
writePolicy := &as.WritePolicy{}
if req.ETag != nil {
if req.HasETag() {
var gen uint32
gen, err = convertETag(*req.ETag)
if err != nil {
@ -235,7 +235,7 @@ func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) e
_, err = aspike.client.Delete(writePolicy, asKey)
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}

View File

@ -170,7 +170,7 @@ func (s *AliCloudTableStore) updateRowChange(req *state.SetRequest) *tablestore.
value, _ := marshal(req.Value)
change.PutColumn(stateValue, value)
if req.ETag != nil {
if req.HasETag() {
change.PutColumn(sateEtag, *req.ETag)
}

View File

@ -138,9 +138,10 @@ func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.Get
Data: []byte(output),
}
var etag string
if etagVal, ok := result.Item["etag"]; ok {
if err = dynamodbattribute.Unmarshal(etagVal, &etag); err != nil {
if result.Item["etag"] != nil {
var etag string
err = dynamodbattribute.Unmarshal(result.Item["etag"], &etag)
if err != nil {
return nil, err
}
resp.ETag = &etag
@ -161,9 +162,7 @@ func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
TableName: &d.table,
}
haveEtag := false
if req.ETag != nil && *req.ETag != "" {
haveEtag = true
if req.HasETag() {
condExpr := "etag = :etag"
input.ConditionExpression = &condExpr
exprAttrValues := make(map[string]*dynamodb.AttributeValue)
@ -177,7 +176,7 @@ func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
}
_, err = d.client.PutItemWithContext(ctx, input)
if err != nil && haveEtag {
if err != nil && req.HasETag() {
switch cErr := err.(type) {
case *dynamodb.ConditionalCheckFailedException:
err = state.NewETagError(state.ETagMismatch, cErr)
@ -198,7 +197,7 @@ func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
TableName: aws.String(d.table),
}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
condExpr := "etag = :etag"
input.ConditionExpression = &condExpr
exprAttrValues := make(map[string]*dynamodb.AttributeValue)

View File

@ -152,10 +152,10 @@ func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*stat
func (r *StateStore) writeFile(ctx context.Context, req *state.SetRequest) error {
modifiedAccessConditions := blob.ModifiedAccessConditions{}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
if req.Options.Concurrency == state.FirstWrite && !req.HasETag() {
modifiedAccessConditions.IfNoneMatch = ptr.Of(azcore.ETagAny)
}
@ -179,7 +179,7 @@ func (r *StateStore) writeFile(ctx context.Context, req *state.SetRequest) error
if err != nil {
// Check if the error is due to ETag conflict
if req.ETag != nil && isETagConflictError(err) {
if req.HasETag() && isETagConflictError(err) {
return state.NewETagError(state.ETagMismatch, err)
}
@ -193,7 +193,7 @@ func (r *StateStore) deleteFile(ctx context.Context, req *state.DeleteRequest) e
blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))
modifiedAccessConditions := blob.ModifiedAccessConditions{}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
}
@ -206,7 +206,7 @@ func (r *StateStore) deleteFile(ctx context.Context, req *state.DeleteRequest) e
_, err := blockBlobClient.Delete(ctx, &deleteOptions)
if err != nil {
if req.ETag != nil && isETagConflictError(err) {
if req.HasETag() && isETagConflictError(err) {
return state.NewETagError(state.ETagMismatch, err)
} else if isNotFoundError(err) {
// deleting an item that doesn't exist without specifying an ETAG is a noop

View File

@ -385,7 +385,7 @@ func (c *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := azcosmos.ItemOptions{}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
etag := azcore.ETag(*req.ETag)
options.IfMatchEtag = &etag
} else if req.Options.Concurrency == state.FirstWrite {
@ -436,7 +436,7 @@ func (c *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := azcosmos.ItemOptions{}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
etag := azcore.ETag(*req.ETag)
options.IfMatchEtag = &etag
} else if req.Options.Concurrency == state.FirstWrite {
@ -494,7 +494,7 @@ func (c *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
}
doc.PartitionKey = partitionKey
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
etag := azcore.ETag(*req.ETag)
options.IfMatchETag = &etag
} else if req.Options.Concurrency == state.FirstWrite {
@ -514,7 +514,7 @@ func (c *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
batch.UpsertItem(marsh, options)
numOperations++
case state.DeleteRequest:
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
etag := azcore.ETag(*req.ETag)
options.IfMatchETag = &etag
} else if req.Options.Concurrency == state.FirstWrite {

View File

@ -54,6 +54,7 @@ import (
mdutils "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
const (
@ -168,7 +169,7 @@ func (r *StateStore) Features() []state.Feature {
func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
err := r.deleteRow(ctx, req)
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
} else if isNotFoundError(err) {
// deleting an item that doesn't exist without specifying an ETAG is a noop
@ -262,18 +263,16 @@ func (r *StateStore) writeRow(ctx context.Context, req *state.SetRequest) error
// Always Update using the etag when provided even if Concurrency != FirstWrite.
// Today the presence of etag takes precedence over Concurrency.
// In the future #2739 will impose a breaking change which must disallow the use of etag when not using FirstWrite.
if req.ETag != nil && *req.ETag != "" {
etag := azcore.ETag(*req.ETag)
_, uerr := r.client.UpdateEntity(updateContext, marshalledEntity, &aztables.UpdateEntityOptions{
IfMatch: &etag,
if req.HasETag() {
_, err = r.client.UpdateEntity(updateContext, marshalledEntity, &aztables.UpdateEntityOptions{
IfMatch: ptr.Of(azcore.ETag(*req.ETag)),
UpdateMode: aztables.UpdateModeReplace,
})
if uerr != nil {
if isNotFoundError(uerr) {
return state.NewETagError(state.ETagMismatch, uerr)
if err != nil {
if isPreconditionFailedError(err) {
return state.NewETagError(state.ETagMismatch, err)
}
return uerr
return err
}
} else if req.Options.Concurrency == state.FirstWrite {
// Otherwise, if FirstWrite was set, but no etag was provided for an Update operation
@ -283,12 +282,12 @@ func (r *StateStore) writeRow(ctx context.Context, req *state.SetRequest) error
return state.NewETagError(state.ETagMismatch, errors.New("update with Concurrency.FirstWrite without ETag"))
} else {
// Finally, last write semantics without ETag should always perform a force update.
_, uerr := r.client.UpdateEntity(updateContext, marshalledEntity, &aztables.UpdateEntityOptions{
_, err = r.client.UpdateEntity(updateContext, marshalledEntity, &aztables.UpdateEntityOptions{
IfMatch: nil, // this is the same as "*" matching all ETags
UpdateMode: aztables.UpdateModeReplace,
})
if uerr != nil {
return uerr
if err != nil {
return err
}
}
} else {
@ -308,6 +307,14 @@ func isNotFoundError(err error) bool {
return false
}
func isPreconditionFailedError(err error) bool {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) {
return respErr.ErrorCode == string(aztables.UpdateConditionNotSatisfied)
}
return false
}
func isEntityAlreadyExistsError(err error) bool {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) {
@ -329,10 +336,17 @@ func (r *StateStore) deleteRow(ctx context.Context, req *state.DeleteRequest) er
deleteContext, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
if req.ETag != nil {
azcoreETag := azcore.ETag(*req.ETag)
_, err := r.client.DeleteEntity(deleteContext, pk, rk, &aztables.DeleteEntityOptions{IfMatch: &azcoreETag})
return err
if req.HasETag() {
_, err := r.client.DeleteEntity(deleteContext, pk, rk, &aztables.DeleteEntityOptions{
IfMatch: ptr.Of(azcore.ETag(*req.ETag)),
})
if err != nil {
if isPreconditionFailedError(err) {
return state.NewETagError(state.ETagMismatch, err)
}
return err
}
return nil
}
all := azcore.ETagAny
_, err := r.client.DeleteEntity(deleteContext, pk, rk, &aztables.DeleteEntityOptions{IfMatch: &all})
@ -373,7 +387,7 @@ func (r *StateStore) marshal(req *state.SetRequest) ([]byte, error) {
},
}
if req.ETag != nil {
if req.HasETag() {
entity.ETag = *req.ETag
}

View File

@ -28,12 +28,17 @@ func New(logger logger.Logger) state.Store {
ETagColumn: "etag",
MigrateFn: ensureTables,
SetQueryFn: func(req *state.SetRequest, opts postgresql.SetQueryOptions) string {
// String concat is required for table name because sql.DB does not
// substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
if req.ETag == nil || *req.ETag == "" {
// Sprintf is required for table name because the driver does not substitute parameters for table names.
if !req.HasETag() {
// We do an upsert in both cases, even when concurrency is first-write, because the row may exist but be expired (and not yet garbage collected)
// The difference is that with concurrency as first-write, we'll update the row only if it's expired
var whereClause string
if req.Options.Concurrency == state.FirstWrite {
whereClause = " WHERE (t.expiredate IS NOT NULL AND t.expiredate < CURRENT_TIMESTAMP)"
}
return `
INSERT INTO ` + opts.TableName + `
INSERT INTO ` + opts.TableName + ` AS t
(key, value, isbinary, etag, expiredate)
VALUES
($1, $2, $3, 1, ` + opts.ExpireDateValue + `)
@ -42,7 +47,8 @@ ON CONFLICT (key) DO UPDATE SET
isbinary = $3,
updatedate = NOW(),
etag = EXCLUDED.etag + 1,
expiredate = ` + opts.ExpireDateValue + `;`
expiredate = ` + opts.ExpireDateValue +
whereClause
}
// When an etag is provided do an update - no insert.

View File

@ -173,7 +173,7 @@ func (cbs *Couchbase) Set(ctx context.Context, req *state.SetRequest) error {
//nolint:nestif
// key already exists (use Replace)
if req.ETag != nil {
if req.HasETag() {
// compare-and-swap (CAS) for managing concurrent modifications - https://docs.couchbase.com/go-sdk/current/concurrent-mutations-cluster.html
cas, cerr := eTagToCas(*req.ETag)
if cerr != nil {
@ -194,7 +194,7 @@ func (cbs *Couchbase) Set(ctx context.Context, req *state.SetRequest) error {
}
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
@ -231,7 +231,7 @@ func (cbs *Couchbase) Delete(ctx context.Context, req *state.DeleteRequest) erro
var cas gocb.Cas = 0
if req.ETag != nil {
if req.HasETag() {
cas, err = eTagToCas(*req.ETag)
if err != nil {
return err
@ -243,7 +243,7 @@ func (cbs *Couchbase) Delete(ctx context.Context, req *state.DeleteRequest) erro
_, err = cbs.bucket.Remove(req.Key, cas)
}
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}

View File

@ -344,7 +344,7 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque
return err
}
var cmp clientv3.Cmp
if req.ETag != nil {
if req.HasETag() {
etag, _ := strconv.ParseInt(*req.ETag, 10, 64)
cmp = clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag)
}
@ -354,14 +354,14 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque
return fmt.Errorf("couldn't grant lease %s: %w", keyWithPath, err)
}
put := clientv3.OpPut(keyWithPath, string(reqVal), clientv3.WithLease(resp.ID))
if req.ETag != nil {
if req.HasETag() {
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
} else {
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil))
}
} else {
put := clientv3.OpPut(keyWithPath, string(reqVal))
if req.ETag != nil {
if req.HasETag() {
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
} else {
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil))
@ -379,7 +379,7 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque
}
del := clientv3.OpDelete(keyWithPath)
if req.ETag != nil {
if req.HasETag() {
etag, _ := strconv.ParseInt(*req.ETag, 10, 64)
cmp := clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag)
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{del}, nil))

View File

@ -169,7 +169,6 @@ func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest,
store.lock.RLock()
defer store.lock.RUnlock()
n := 0
for i, r := range req {
item := store.items[r.Key]
if item != nil && !item.isExpired() {
@ -178,11 +177,14 @@ func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest,
Data: item.data,
ETag: item.etag,
}
n++
} else {
res[i] = state.BulkGetResponse{
Key: r.Key,
}
}
}
return res[:n], nil
return res, nil
}
func (store *inMemoryStore) getAndExpire(key string) *inMemStateStoreItem {

View File

@ -206,7 +206,7 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error
// create a document based on request key and value
filter := bson.M{id: req.Key}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
filter[etag] = *req.ETag
} else if req.Options.Concurrency == state.FirstWrite {
uuid, err := uuid.NewRandom()
@ -302,7 +302,7 @@ func (m *MongoDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetRes
func (m *MongoDB) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) {
// If nothing is being requested, short-circuit
if len(req) == 0 {
return []state.BulkGetResponse{}, nil
return nil, nil
}
// Get all the keys
@ -326,12 +326,13 @@ func (m *MongoDB) BulkGet(ctx context.Context, req []state.GetRequest, _ state.B
// No documents found, just return an empty list
err = nil
}
return []state.BulkGetResponse{}, err
return nil, err
}
defer cur.Close(ctx)
// Read all results
res := make([]state.BulkGetResponse, 0, len(keys))
foundKeys := make(map[string]struct{}, len(keys))
for cur.Next(ctx) {
var (
doc Item
@ -339,7 +340,7 @@ func (m *MongoDB) BulkGet(ctx context.Context, req []state.GetRequest, _ state.B
)
err = cur.Decode(&doc)
if err != nil {
return res, err
return nil, err
}
bgr := state.BulkGetResponse{
@ -356,12 +357,27 @@ func (m *MongoDB) BulkGet(ctx context.Context, req []state.GetRequest, _ state.B
bgr.Data = data
}
res = append(res, bgr)
foundKeys[bgr.Key] = struct{}{}
}
err = cur.Err()
if err != nil {
return res, err
}
// Populate missing keys with empty values
// This is to ensure consistency with the other state stores that implement BulkGet as a loop over Get, and with the Get method
if len(foundKeys) < len(req) {
var ok bool
for _, r := range req {
_, ok = foundKeys[r.Key]
if !ok {
res = append(res, state.BulkGetResponse{
Key: r.Key,
})
}
}
}
return res, nil
}
@ -427,7 +443,7 @@ func (m *MongoDB) Delete(ctx context.Context, req *state.DeleteRequest) error {
func (m *MongoDB) deleteInternal(ctx context.Context, req *state.DeleteRequest) error {
filter := bson.M{id: req.Key}
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
filter[etag] = *req.ETag
}
result, err := m.collection.DeleteOne(ctx, filter)
@ -436,7 +452,7 @@ func (m *MongoDB) deleteInternal(ctx context.Context, req *state.DeleteRequest)
}
if result.DeletedCount == 0 && req.ETag != nil && *req.ETag != "" {
return errors.New("key or etag not found")
return state.NewETagError(state.ETagMismatch, err)
}
return nil

View File

@ -495,7 +495,7 @@ func (m *MySQL) deleteValue(parentCtx context.Context, querier querier, req *sta
execCtx, cancel := context.WithTimeout(parentCtx, m.timeout)
defer cancel()
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
result, err = querier.ExecContext(execCtx,
`DELETE FROM `+m.tableName+` WHERE id = ?`,
req.Key)
@ -721,13 +721,39 @@ func (m *MySQL) BulkGet(parentCtx context.Context, req []state.GetRequest, _ sta
var n int
res := make([]state.BulkGetResponse, len(req))
foundKeys := make(map[string]struct{}, len(req))
for ; rows.Next(); n++ {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
r := state.BulkGetResponse{}
r.Key, r.Data, r.ETag, err = readRow(rows)
if err != nil {
r.Error = err.Error()
}
res[n] = r
foundKeys[r.Key] = struct{}{}
}
// Populate missing keys with empty values
// This is to ensure consistency with the other state stores that implement BulkGet as a loop over Get, and with the Get method
if len(foundKeys) < len(req) {
var ok bool
for _, r := range req {
_, ok = foundKeys[r.Key]
if !ok {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
res[n] = state.BulkGetResponse{
Key: r.Key,
}
n++
}
}
}
return res[:n], nil

View File

@ -162,7 +162,7 @@ func (o *oracleDatabaseAccess) doSet(ctx context.Context, db querier, req *state
etag := etagObj.String()
var result sql.Result
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
// Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
var stmt string
@ -201,7 +201,7 @@ func (o *oracleDatabaseAccess) doSet(ctx context.Context, db querier, req *state
return err
}
if rows != 1 {
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
return errors.New("no item was updated")
@ -262,7 +262,7 @@ func (o *oracleDatabaseAccess) doDelete(ctx context.Context, db querier, req *st
}
var result sql.Result
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
result, err = db.ExecContext(ctx, "DELETE FROM "+o.metadata.TableName+" WHERE key = :key", req.Key)
} else {
result, err = db.ExecContext(ctx, "DELETE FROM "+o.metadata.TableName+" WHERE key = :key AND etag = :etag", req.Key, *req.ETag)

View File

@ -25,10 +25,8 @@ func NewPostgreSQLStateStore(logger logger.Logger) state.Store {
ETagColumn: "xmin",
MigrateFn: performMigration,
SetQueryFn: func(req *state.SetRequest, opts postgresql.SetQueryOptions) string {
// Sprintf is required for table name because sql.DB does not
// substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
if req.ETag == nil || *req.ETag == "" {
// Sprintf is required for table name because the driver does not substitute parameters for table names.
if !req.HasETag() {
// We do an upsert in both cases, even when concurrency is first-write, because the row may exist but be expired (and not yet garbage collected)
// The difference is that with concurrency as first-write, we'll update the row only if it's expired
var whereClause string

View File

@ -102,8 +102,7 @@ type StateStore struct {
querySchemas querySchemas
suppressActorStateStoreWarning atomic.Bool
features []state.Feature
logger logger.Logger
logger logger.Logger
}
// NewRedisStateStore returns a new redis state store.
@ -116,7 +115,6 @@ func NewRedisStateStore(log logger.Logger) state.Store {
func newStateStore(log logger.Logger) *StateStore {
return &StateStore{
json: jsoniter.ConfigFastest,
features: []state.Feature{state.FeatureETag, state.FeatureTransactional, state.FeatureQueryAPI},
logger: log,
suppressActorStateStoreWarning: atomic.Bool{},
}
@ -162,7 +160,11 @@ func (r *StateStore) Init(ctx context.Context, metadata state.Metadata) error {
// Features returns the features available in this state store.
func (r *StateStore) Features() []state.Feature {
return r.features
if r.clientHasJSON {
return []state.Feature{state.FeatureETag, state.FeatureTransactional, state.FeatureQueryAPI}
} else {
return []state.Feature{state.FeatureETag, state.FeatureTransactional}
}
}
func (r *StateStore) getConnectedSlaves(ctx context.Context) (int, error) {
@ -201,9 +203,8 @@ func (r *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
return err
}
if req.ETag == nil {
etag := "0"
req.ETag = &etag
if !req.HasETag() {
req.ETag = ptr.Of("0")
}
if req.Metadata[daprmetadata.ContentType] == contenttype.JSONContentType && r.clientHasJSON {
@ -352,7 +353,7 @@ func (r *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
}
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
@ -426,9 +427,8 @@ func (r *StateStore) Multi(ctx context.Context, request *state.TransactionalStat
}
case state.DeleteRequest:
if req.ETag == nil {
etag := "0"
req.ETag = &etag
if !req.HasETag() {
req.ETag = ptr.Of("0")
}
isReqJSON := isJSON ||
(len(req.Metadata) > 0 && req.Metadata[daprmetadata.ContentType] == contenttype.JSONContentType)

View File

@ -57,6 +57,11 @@ func (r DeleteRequest) GetMetadata() map[string]string {
return r.Metadata
}
// HasETag returns true if the request has a non-empty ETag.
func (r DeleteRequest) HasETag() bool {
return r.ETag != nil && *r.ETag != ""
}
// Operation returns the operation type for DeleteRequest, implementing TransactionalStateOperationRequest.
func (r DeleteRequest) Operation() OperationType {
return OperationDelete
@ -88,6 +93,11 @@ func (r SetRequest) GetMetadata() map[string]string {
return r.Metadata
}
// HasETag returns true if the request has a non-empty ETag.
func (r SetRequest) HasETag() bool {
return r.ETag != nil && *r.ETag != ""
}
// Operation returns the operation type for SetRequest, implementing TransactionalStateOperationRequest.
func (r SetRequest) Operation() OperationType {
return OperationUpsert

View File

@ -299,13 +299,39 @@ func (a *sqliteDBAccess) BulkGet(parentCtx context.Context, req []state.GetReque
var n int
res := make([]state.BulkGetResponse, len(req))
foundKeys := make(map[string]struct{}, len(req))
for ; rows.Next(); n++ {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
r := state.BulkGetResponse{}
r.Key, r.Data, r.ETag, err = readRow(rows)
if err != nil {
r.Error = err.Error()
}
res[n] = r
foundKeys[r.Key] = struct{}{}
}
// Populate missing keys with empty values
// This is to ensure consistency with the other state stores that implement BulkGet as a loop over Get, and with the Get method
if len(foundKeys) < len(req) {
var ok bool
for _, r := range req {
_, ok = foundKeys[r.Key]
if !ok {
if n >= len(req) {
// Sanity check to prevent panics, which should never happen
return nil, fmt.Errorf("query returned more records than expected (expected %d)", len(req))
}
res[n] = state.BulkGetResponse{
Key: r.Key,
}
n++
}
}
}
return res[:n], nil
@ -393,7 +419,7 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
)
// Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// And the same is for DATETIME function's seconds parameter (which is from an integer anyways).
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
// If the operation uses first-write concurrency, we need to handle the special case of a row that has expired but hasn't been garbage collected yet
// In this case, the row should be considered as if it were deleted
// With SQLite, the only way we can handle that is by performing a SELECT query first
@ -457,7 +483,7 @@ func (a *sqliteDBAccess) doSet(parentCtx context.Context, db querier, req *state
return err
}
if rows == 0 {
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, nil)
}
return errors.New("no item was updated")
@ -530,7 +556,7 @@ func (a *sqliteDBAccess) doDelete(parentCtx context.Context, db querier, req *st
ctx, cancel := context.WithTimeout(parentCtx, a.metadata.timeout)
defer cancel()
var result sql.Result
if req.ETag == nil || *req.ETag == "" {
if !req.HasETag() {
// Concatenation is required for table name because sql.DB does not substitute parameters for table names.
result, err = db.ExecContext(ctx, "DELETE FROM "+a.metadata.TableName+" WHERE key = ?",
req.Key)

View File

@ -204,7 +204,7 @@ func (s *SQLServer) Delete(ctx context.Context, req *state.DeleteRequest) error
func (s *SQLServer) executeDelete(ctx context.Context, db dbExecutor, req *state.DeleteRequest) error {
var err error
var res sql.Result
if req.ETag != nil {
if req.HasETag() {
var b []byte
b, err = hex.DecodeString(*req.ETag)
if err != nil {
@ -292,7 +292,7 @@ func (s *SQLServer) executeSet(ctx context.Context, db dbExecutor, req *state.Se
return err
}
etag := sql.Named(rowVersionColumnName, nil)
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
var b []byte
b, err = hex.DecodeString(*req.ETag)
if err != nil {
@ -327,7 +327,7 @@ func (s *SQLServer) executeSet(ctx context.Context, db dbExecutor, req *state.Se
}
if rows != 1 {
if req.ETag != nil && *req.ETag != "" {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
return errors.New("no item was updated")

View File

@ -186,7 +186,7 @@ func (s *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
}
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
@ -209,7 +209,7 @@ func (s *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
}
if err != nil {
if req.ETag != nil {
if req.HasETag() {
return state.NewETagError(state.ETagMismatch, err)
}
@ -232,7 +232,7 @@ func (s *StateStore) newDeleteRequest(req *state.DeleteRequest) (*zk.DeleteReque
} else {
var etag string
if req.ETag != nil {
if req.HasETag() {
etag = *req.ETag
}
version = s.parseETag(etag)
@ -262,7 +262,7 @@ func (s *StateStore) newSetDataRequest(req *state.SetRequest) (*zk.SetDataReques
} else {
var etag string
if req.ETag != nil {
if req.HasETag() {
etag = *req.ETag
}
version = s.parseETag(etag)

View File

@ -1,83 +1,106 @@
# Supported operations: set, get, delete, bulkget, bulkset, bulkdelete, transaction, etag, first-write, query, ttl
# Supported operations: transaction, etag, first-write, query, ttl
# Supported config:
# - badEtag: string containing a value for the bad etag, for exaple if the component uses numeric etags (default: "bad-etag")
componentType: state
components:
- component: redis.v6
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ]
operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
config:
# This component requires etags to be numeric
badEtag: "9999999"
- component: redis.v7
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ]
# "query" is not included because redisjson hasn't been updated to Redis v7 yet
operations: [ "transaction", "etag", "first-write", "ttl" ]
config:
# This component requires etags to be numeric
badEtag: "9999999"
- component: mongodb
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ]
operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
- component: memcached
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "ttl" ]
operations: [ "ttl" ]
- component: azure.cosmosdb
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ]
operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
- component: azure.blobstorage
allOperations: false
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ]
operations: [ "etag", "first-write" ]
- component: azure.sql
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
config:
# This component requires etags to be hex-encoded numbers
badEtag: "FFFF"
- component: sqlserver
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
config:
# This component requires etags to be hex-encoded numbers
badEtag: "FFFF"
- component: postgresql
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ]
operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
config:
# This component requires etags to be numeric
badEtag: "1"
- component: sqlite
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: mysql.mysql
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: mysql.mariadb
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: azure.tablestorage.storage
allOperations: false
operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"]
operations: [ "etag", "first-write"]
config:
# This component requires etags to be in this format
badEtag: "W/\"datetime'2023-05-09T12%3A28%3A54.1442151Z'\""
- component: azure.tablestorage.cosmosdb
allOperations: false
operations: ["set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write"]
operations: [ "etag", "first-write"]
config:
# This component requires etags to be in this format
badEtag: "W/\"datetime'2023-05-09T12%3A28%3A54.1442151Z'\""
- component: oracledatabase
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: cassandra
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "ttl" ]
operations: [ "ttl" ]
- component: cloudflare.workerskv
allOperations: false
# Although this component supports TTLs, the minimum TTL is 60s, which makes it not suitable for our conformance tests
# TODO: perhaps create a special case `ttl60` operation for this component
# where the test would set a TTL of 60s for this particular operation.
# `ttl` and `ttl60` are mutually exclusive, and `allOperations` would
# exclude `ttl60`
operations: [ "set", "get", "delete", "bulkset", "bulkdelete"]
operations: []
- component: cockroachdb
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "query", "ttl" ]
operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
config:
# This component requires etags to be numeric
badEtag: "9999999"
- component: rethinkdb
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete"]
operations: []
- component: in-memory
allOperations: false
operations: [ "set", "get", "delete", "bulkget", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: aws.dynamodb.docker
allOperations: false
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "transaction", "first-write" ]
operations: [ "transaction", "etag", "first-write" ]
- component: aws.dynamodb.terraform
allOperations: false
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "transaction", "first-write" ]
operations: [ "transaction", "etag", "first-write" ]
- component: etcd
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "ttl" ]
operations: [ "transaction", "etag", "first-write", "ttl" ]
- component: gcp.firestore.docker
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete" ]
operations: []
- component: gcp.firestore.cloud
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete" ]
operations: []

View File

@ -30,7 +30,6 @@ import (
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
@ -366,97 +365,66 @@ func (tc *TestConfiguration) Run(t *testing.T) {
case "state":
filepath := fmt.Sprintf("../config/state/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
store := loadStateStore(comp)
assert.NotNil(t, store)
storeConfig := conf_state.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
require.NotNilf(t, store, "error running conformance test for component %s", comp.Component)
storeConfig, err := conf_state.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
conf_state.ConformanceTests(t, props, store, storeConfig)
case "secretstores":
filepath := fmt.Sprintf("../config/secretstores/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
store := loadSecretStore(comp)
assert.NotNil(t, store)
require.NotNilf(t, store, "error running conformance test for component %s", comp.Component)
storeConfig := conf_secret.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations)
conf_secret.ConformanceTests(t, props, store, storeConfig)
case "pubsub":
filepath := fmt.Sprintf("../config/pubsub/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
pubsub := loadPubSub(comp)
assert.NotNil(t, pubsub)
require.NotNil(t, pubsub, "error running conformance test for component %s", comp.Component)
pubsubConfig, err := conf_pubsub.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
conf_pubsub.ConformanceTests(t, props, pubsub, pubsubConfig)
case "bindings":
filepath := fmt.Sprintf("../config/bindings/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
inputBinding := loadInputBindings(comp)
outputBinding := loadOutputBindings(comp)
atLeastOne(t, func(item interface{}) bool {
return item != nil
}, inputBinding, outputBinding)
require.True(t, inputBinding != nil || outputBinding != nil)
bindingsConfig, err := conf_bindings.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
conf_bindings.ConformanceTests(t, props, inputBinding, outputBinding, bindingsConfig)
case "workflows":
filepath := fmt.Sprintf("../config/workflows/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
wf := loadWorkflow(comp)
wfConfig := conf_workflows.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
conf_workflows.ConformanceTests(t, props, wf, wfConfig)
case "crypto":
filepath := fmt.Sprintf("../config/crypto/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
component := loadCryptoProvider(comp)
require.NotNil(t, component)
require.NotNil(t, component, "error running conformance test for component %s", comp.Component)
cryptoConfig, err := conf_crypto.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
conf_crypto.ConformanceTests(t, props, component, cryptoConfig)
case "configuration":
filepath := fmt.Sprintf("../config/configuration/%s", componentConfigPath)
props, err := tc.loadComponentsAndProperties(t, filepath)
if err != nil {
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
break
}
require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
store, updater := loadConfigurationStore(comp)
require.NotNil(t, store)
require.NotNil(t, updater)
require.NotNil(t, store, "error running conformance test for component %s", comp.Component)
require.NotNil(t, updater, "error running conformance test for component %s", comp.Component)
configurationConfig := conf_configuration.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
conf_configuration.ConformanceTests(t, props, store, updater, configurationConfig, comp.Component)
default:
t.Errorf("unknown component type %s", tc.ComponentType)
t.Fatalf("unknown component type %s", tc.ComponentType)
}
})
}
@ -709,13 +677,3 @@ func loadWorkflow(tc TestComponent) workflows.Workflow {
return wf
}
func atLeastOne(t *testing.T, predicate func(interface{}) bool, items ...interface{}) {
met := false
for _, item := range items {
met = met || predicate(item)
}
assert.True(t, met)
}

View File

@ -26,11 +26,13 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/dapr/components-contrib/contenttype"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/tests/conformance/utils"
"github.com/dapr/kit/config"
"github.com/dapr/kit/ptr"
)
@ -59,19 +61,27 @@ type queryScenario struct {
type TestConfig struct {
utils.CommonConfig
BadEtag string `mapstructure:"badEtag"`
}
func NewTestConfig(component string, allOperations bool, operations []string, conf map[string]interface{}) TestConfig {
tc := TestConfig{
func NewTestConfig(component string, allOperations bool, operations []string, configMap map[string]interface{}) (TestConfig, error) {
testConfig := TestConfig{
CommonConfig: utils.CommonConfig{
ComponentType: "state",
ComponentName: component,
AllOperations: allOperations,
Operations: utils.NewStringSet(operations...),
},
BadEtag: "bad-etag",
}
return tc
err := config.Decode(configMap, &testConfig)
if err != nil {
return testConfig, err
}
return testConfig, nil
}
// ConformanceTests runs conf tests for state store.
@ -237,7 +247,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
// Don't run more tests if init failed
if t.Failed() {
t.Fatal("Init test failed, stopping further tests")
t.Fatal("Init failed, stopping further tests")
}
t.Run("ping", func(t *testing.T) {
@ -252,46 +262,46 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
}
})
if config.HasOperation("set") {
t.Run("set", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && !scenario.transactionOnly {
t.Logf("Setting value for %s", scenario.key)
req := &state.SetRequest{
Key: scenario.key,
Value: scenario.value,
}
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
err := statestore.Set(context.Background(), req)
assert.NoError(t, err)
t.Run("set", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && !scenario.transactionOnly {
t.Logf("Setting value for %s", scenario.key)
req := &state.SetRequest{
Key: scenario.key,
Value: scenario.value,
}
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
err := statestore.Set(context.Background(), req)
assert.NoError(t, err)
}
})
}
}
})
if config.HasOperation("get") {
t.Run("get", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && !scenario.transactionOnly {
t.Logf("Checking value presence for %s", scenario.key)
req := &state.GetRequest{
Key: scenario.key,
}
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
res, err := statestore.Get(context.Background(), req)
require.NoError(t, err)
assertEquals(t, scenario.value, res)
t.Run("get", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && !scenario.transactionOnly {
t.Logf("Checking value presence for %s", scenario.key)
req := &state.GetRequest{
Key: scenario.key,
}
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
res, err := statestore.Get(context.Background(), req)
require.NoError(t, err)
assertEquals(t, scenario.value, res)
}
})
}
}
})
if config.HasOperation("query") {
t.Run("query", func(t *testing.T) {
// Check if query feature is listed
features := statestore.Features()
require.True(t, state.FeatureQueryAPI.IsPresent(features))
querier, ok := statestore.(state.Querier)
assert.True(t, ok, "Querier interface is not implemented")
for _, scenario := range queryScenarios {
@ -317,116 +327,135 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
}
}
})
}
if config.HasOperation("delete") {
t.Run("delete", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && scenario.toBeDeleted {
// this also deletes two keys that were not inserted in the set operation
t.Logf("Deleting %s", scenario.key)
req := &state.DeleteRequest{
Key: scenario.key,
}
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
err := statestore.Delete(context.Background(), req)
assert.NoError(t, err, "no error expected while deleting %s", scenario.key)
t.Logf("Checking value absence for %s", scenario.key)
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: scenario.key,
})
assert.NoError(t, err, "no error expected while checking for absence for %s", scenario.key)
assert.Nil(t, res.Data, "no data expected while checking for absence for %s", scenario.key)
}
}
} else {
t.Run("query API feature not present", func(t *testing.T) {
features := statestore.Features()
assert.False(t, state.FeatureQueryAPI.IsPresent(features))
})
}
if config.HasOperation("bulkset") {
t.Run("bulkset", func(t *testing.T) {
var bulk []state.SetRequest
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Adding set request to bulk for %s", scenario.key)
bulk = append(bulk, state.SetRequest{
Key: scenario.key,
Value: scenario.value,
})
t.Run("delete", func(t *testing.T) {
for _, scenario := range scenarios {
if !scenario.bulkOnly && scenario.toBeDeleted {
// this also deletes two keys that were not inserted in the set operation
t.Logf("Deleting %s", scenario.key)
req := &state.DeleteRequest{
Key: scenario.key,
}
}
err := statestore.BulkSet(context.Background(), bulk, state.BulkStoreOpts{})
require.NoError(t, err)
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Checking value presence for %s", scenario.key)
// Data should have been inserted at this point
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: scenario.key,
})
require.NoError(t, err)
assertEquals(t, scenario.value, res)
if len(scenario.contentType) != 0 {
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
}
}
})
}
err := statestore.Delete(context.Background(), req)
assert.NoError(t, err, "no error expected while deleting %s", scenario.key)
if config.HasOperation("bulkget") {
t.Run("bulkget", func(t *testing.T) {
var req []state.GetRequest
expects := map[string]any{}
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Adding get request to bulk for %s", scenario.key)
req = append(req, state.GetRequest{
Key: scenario.key,
})
expects[scenario.key] = scenario.value
}
}
res, err := statestore.BulkGet(context.Background(), req, state.BulkGetOpts{})
require.NoError(t, err)
require.Len(t, res, len(expects))
for _, r := range res {
t.Logf("Checking value equality %s", r.Key)
_, ok := expects[r.Key]
if assert.Empty(t, r.Error) && assert.True(t, ok) {
assertDataEquals(t, expects[r.Key], r.Data)
}
}
})
}
if config.HasOperation("bulkdelete") {
t.Run("bulkdelete", func(t *testing.T) {
var bulk []state.DeleteRequest
for _, scenario := range scenarios {
if scenario.bulkOnly && scenario.toBeDeleted {
t.Logf("Adding delete request to bulk for %s", scenario.key)
bulk = append(bulk, state.DeleteRequest{
Key: scenario.key,
})
}
}
err := statestore.BulkDelete(context.Background(), bulk, state.BulkStoreOpts{})
assert.NoError(t, err)
for _, req := range bulk {
t.Logf("Checking value absence for %s", req.Key)
t.Logf("Checking value absence for %s", scenario.key)
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: req.Key,
Key: scenario.key,
})
assert.NoError(t, err)
assert.Nil(t, res.Data)
assert.NoError(t, err, "no error expected while checking for absence for %s", scenario.key)
assert.Nil(t, res.Data, "no data expected while checking for absence for %s", scenario.key)
}
})
}
}
})
t.Run("bulkset", func(t *testing.T) {
var bulk []state.SetRequest
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Adding set request to bulk for %s", scenario.key)
bulk = append(bulk, state.SetRequest{
Key: scenario.key,
Value: scenario.value,
})
}
}
err := statestore.BulkSet(context.Background(), bulk, state.BulkStoreOpts{})
require.NoError(t, err)
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Checking value presence for %s", scenario.key)
// Data should have been inserted at this point
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: scenario.key,
})
require.NoError(t, err)
assertEquals(t, scenario.value, res)
}
}
})
t.Run("bulkget", func(t *testing.T) {
tests := []struct {
name string
req []state.GetRequest
expect map[string]any
}{
{name: "scenario", req: []state.GetRequest{}, expect: map[string]any{}},
{name: "include non-existent key", req: []state.GetRequest{{Key: "doesnotexist"}}, expect: map[string]any{"doesnotexist": nil}},
}
// Build test cases
first := true
for _, scenario := range scenarios {
if scenario.bulkOnly {
t.Logf("Adding get request to bulk for %s", scenario.key)
tests[0].req = append(tests[0].req, state.GetRequest{
Key: scenario.key,
})
tests[0].expect[scenario.key] = scenario.value
if first {
tests[1].req = append(tests[1].req, state.GetRequest{
Key: scenario.key,
})
tests[1].expect[scenario.key] = scenario.value
first = false
}
}
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res, err := statestore.BulkGet(context.Background(), tt.req, state.BulkGetOpts{})
require.NoError(t, err)
require.Len(t, res, len(tt.expect))
for _, r := range res {
t.Logf("Checking value equality %s", r.Key)
val, ok := tt.expect[r.Key]
if assert.Empty(t, r.Error) && assert.True(t, ok) {
assertDataEquals(t, val, r.Data)
}
delete(tt.expect, r.Key)
}
})
}
})
t.Run("bulkdelete", func(t *testing.T) {
var bulk []state.DeleteRequest
for _, scenario := range scenarios {
if scenario.bulkOnly && scenario.toBeDeleted {
t.Logf("Adding delete request to bulk for %s", scenario.key)
bulk = append(bulk, state.DeleteRequest{
Key: scenario.key,
})
}
}
err := statestore.BulkDelete(context.Background(), bulk, state.BulkStoreOpts{})
assert.NoError(t, err)
for _, req := range bulk {
t.Logf("Checking value absence for %s", req.Key)
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: req.Key,
})
assert.NoError(t, err)
assert.Nil(t, res.Data)
}
})
//nolint:nestif
if config.HasOperation("transaction") {
t.Run("transaction", func(t *testing.T) {
// Check if transactional feature is listed
@ -512,11 +541,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
t.Run("transaction-order", func(t *testing.T) {
// Arrange
firstKey := "key1"
firstKey := key + "-key1"
firstValue := "value1"
secondKey := "key2"
secondKey := key + "-key2"
secondValue := "value2"
thirdKey := "key3"
thirdKey := key + "-key3"
thirdValue := "value3"
// for CosmosDB
@ -593,92 +622,254 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
}
})
} else {
// Check if transactional feature is NOT listed
features := statestore.Features()
assert.False(t, state.FeatureTransactional.IsPresent(features))
t.Run("transactional feature not present", func(t *testing.T) {
features := statestore.Features()
assert.False(t, state.FeatureTransactional.IsPresent(features))
})
}
// Supporting etags requires support for get, set, and delete so they are not checked individually
if config.HasOperation("etag") {
t.Run("etag", func(t *testing.T) {
testKey := "etagTest"
firstValue := []byte("testValue1")
secondValue := []byte("testValue2")
fakeEtag := "not-an-etag"
var (
etagErr *state.ETagError
bulkStoreErr state.BulkStoreError
testKeys = [4]string{key + "-etag1", key + "-etag2", key + "-etag3", key + "-etag4"}
etags [4]string
)
const (
firstValue = "first-value"
secondValue = "second-value"
thirdValue = "third-value"
)
// Check if eTag feature is listed
features := statestore.Features()
require.True(t, state.FeatureETag.IsPresent(features))
// Delete any potential object, it's important to start from a clean slate.
err := statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKey,
})
// Set some objects (no etag as they are new)
err := statestore.BulkSet(context.Background(), []state.SetRequest{
{Key: testKeys[0], Value: firstValue},
{Key: testKeys[1], Value: firstValue},
{Key: testKeys[2], Value: firstValue},
{Key: testKeys[3], Value: firstValue},
}, state.BulkStoreOpts{})
require.NoError(t, err)
// Set an object.
err = statestore.Set(context.Background(), &state.SetRequest{
Key: testKey,
Value: firstValue,
})
require.NoError(t, err)
// Validate the set.
// Validate the set, using both regular Get and BulkGet
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: testKey,
Key: testKeys[0],
})
require.NoError(t, err)
require.NotNil(t, res.ETag)
require.NotEmpty(t, *res.ETag)
assertEquals(t, firstValue, res)
etag := res.ETag
etags[0] = *res.ETag
// Try and update with wrong ETag, expect failure.
bulkRes, err := statestore.BulkGet(context.Background(), []state.GetRequest{
{Key: testKeys[1]},
{Key: testKeys[2]},
{Key: testKeys[3]},
}, state.BulkGetOpts{})
require.NoError(t, err)
require.Len(t, bulkRes, 3)
for i := 0; i < 3; i++ {
require.NotNil(t, bulkRes[i].ETag)
require.NotEmpty(t, *bulkRes[i].ETag)
assertDataEquals(t, firstValue, bulkRes[i].Data)
switch bulkRes[i].Key {
case testKeys[1]:
etags[1] = *bulkRes[i].ETag
case testKeys[2]:
etags[2] = *bulkRes[i].ETag
case testKeys[3]:
etags[3] = *bulkRes[i].ETag
}
}
// Try and update with wrong ETag, expect failure
err = statestore.Set(context.Background(), &state.SetRequest{
Key: testKey,
Key: testKeys[0],
Value: secondValue,
ETag: &fakeEtag,
ETag: &config.BadEtag,
})
require.Error(t, err)
require.ErrorAs(t, err, &etagErr)
assert.Equal(t, state.ETagMismatch, etagErr.Kind())
// Try and update with corect ETag, expect success.
// Try and update with Set and corect ETag, expect success
err = statestore.Set(context.Background(), &state.SetRequest{
Key: testKey,
Key: testKeys[0],
Value: secondValue,
ETag: etag,
ETag: &etags[0],
})
require.NoError(t, err)
// Validate the set.
// Validate the Set
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKey,
Key: testKeys[0],
})
require.NoError(t, err)
assertEquals(t, secondValue, res)
require.NotEqual(t, etag, res.ETag)
etag = res.ETag
require.NotNil(t, res.ETag)
require.NotEqual(t, etags[0], *res.ETag)
etags[0] = *res.ETag
// Try and delete with wrong ETag, expect failure.
err = statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKey,
ETag: &fakeEtag,
})
// Try and update bulk with one ETag wrong, expect partial success
err = statestore.BulkSet(context.Background(), []state.SetRequest{
{Key: testKeys[1], Value: secondValue, ETag: &config.BadEtag},
{Key: testKeys[2], Value: secondValue, ETag: &etags[2]},
}, state.BulkStoreOpts{})
require.Error(t, err)
unwrapErr, ok := err.(interface{ Unwrap() []error })
require.True(t, ok, "Returned error is not a joined error")
errs := unwrapErr.Unwrap()
require.Len(t, errs, 1)
require.ErrorAs(t, errs[0], &bulkStoreErr)
assert.Equal(t, testKeys[1], bulkStoreErr.Key())
etagErr = bulkStoreErr.ETagError()
require.NotNil(t, etagErr)
assert.Equal(t, state.ETagMismatch, etagErr.Kind())
// Try and delete with correct ETag, expect success.
err = statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKey,
ETag: etag,
// Validate: key 1 should be unchanged, and key 2 should be changed
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKeys[1],
})
require.NoError(t, err)
assertEquals(t, firstValue, res)
require.NotNil(t, res.ETag)
require.Equal(t, etags[1], *res.ETag)
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKeys[2],
})
require.NoError(t, err)
assertEquals(t, secondValue, res)
require.NotNil(t, res.ETag)
require.NotEqual(t, etags[2], *res.ETag)
etags[2] = *res.ETag
// Update bulk with valid etags
err = statestore.BulkSet(context.Background(), []state.SetRequest{
{Key: testKeys[1], Value: thirdValue, ETag: &etags[1]},
{Key: testKeys[2], Value: thirdValue, ETag: &etags[2]},
}, state.BulkStoreOpts{})
require.NoError(t, err)
// Validate
bulkRes, err = statestore.BulkGet(context.Background(), []state.GetRequest{
{Key: testKeys[1]},
{Key: testKeys[2]},
}, state.BulkGetOpts{})
require.NoError(t, err)
require.Len(t, bulkRes, 2)
for i := 0; i < 2; i++ {
require.NotNil(t, bulkRes[i].ETag)
require.NotEmpty(t, *bulkRes[i].ETag)
assertDataEquals(t, thirdValue, bulkRes[i].Data)
switch bulkRes[i].Key {
case testKeys[1]:
etags[1] = *bulkRes[i].ETag
case testKeys[2]:
etags[2] = *bulkRes[i].ETag
}
}
// Try and delete with wrong ETag, expect failure
err = statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKeys[0],
ETag: &config.BadEtag,
})
require.Error(t, err)
require.ErrorAs(t, err, &etagErr)
assert.NotEmpty(t, etagErr.Kind())
// Try and delete with correct ETag, expect success
err = statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKeys[0],
ETag: &etags[0],
})
require.NoError(t, err)
// Validate missing
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKeys[0],
})
require.NoError(t, err)
require.Empty(t, res.Data)
require.Empty(t, res.ETag)
// Try and delete bulk with two ETag's wrong, expect partial success
err = statestore.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: testKeys[1], ETag: &etags[1]},
{Key: testKeys[2], ETag: &config.BadEtag},
}, state.BulkStoreOpts{})
require.Error(t, err)
unwrapErr, ok = err.(interface{ Unwrap() []error })
require.True(t, ok, "Returned error is not a joined error")
errs = unwrapErr.Unwrap()
require.Len(t, errs, 1)
require.ErrorAs(t, errs[0], &bulkStoreErr)
assert.Equal(t, testKeys[2], bulkStoreErr.Key())
etagErr = bulkStoreErr.ETagError()
require.NotNil(t, etagErr)
assert.Equal(t, state.ETagMismatch, etagErr.Kind())
// Validate key 1 missing
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKeys[1],
})
require.NoError(t, err)
require.Empty(t, res.Data)
require.Empty(t, res.ETag)
// Validate key 2 unchanged
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: testKeys[2],
})
require.NoError(t, err)
assertEquals(t, thirdValue, res)
require.NotNil(t, res.ETag)
require.Equal(t, etags[2], *res.ETag)
// Try and delete bulk with valid ETags
err = statestore.BulkDelete(context.Background(), []state.DeleteRequest{
{Key: testKeys[2], ETag: &etags[2]},
{Key: testKeys[3], ETag: &etags[3]},
}, state.BulkStoreOpts{})
require.NoError(t, err)
// Validate keys missing
bulkRes, err = statestore.BulkGet(context.Background(), []state.GetRequest{
{Key: testKeys[2]},
{Key: testKeys[3]},
}, state.BulkGetOpts{})
require.NoError(t, err)
require.Len(t, bulkRes, 2)
foundKeys := []string{}
for i := 0; i < 2; i++ {
require.Empty(t, bulkRes[i].Data)
require.Empty(t, bulkRes[i].ETag)
foundKeys = append(foundKeys, bulkRes[i].Key)
}
expectKeys := []string{
testKeys[2],
testKeys[3],
}
slices.Sort(foundKeys)
slices.Sort(expectKeys)
assert.EqualValues(t, expectKeys, foundKeys)
})
} else {
// Check if eTag feature is NOT listed
features := statestore.Features()
require.False(t, state.FeatureETag.IsPresent(features))
t.Run("etag feature not present", func(t *testing.T) {
features := statestore.Features()
require.False(t, state.FeatureETag.IsPresent(features))
})
}
if config.HasOperation("first-write") {
t.Run("first-write without etag", func(t *testing.T) {
testKey := "first-writeTest"
testKey := key + "-firstwrite-test"
firstValue := []byte("testValue1")
secondValue := []byte("testValue2")
@ -731,10 +922,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
})
require.NoError(t, err)
// Set the value
err = statestore.Set(context.Background(), requestSet[0])
require.NoError(t, err)
// Validate the set.
// Validate the set
res, err := statestore.Get(context.Background(), &state.GetRequest{
Key: testKey,
})
@ -749,7 +941,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
})
t.Run("first-write with etag", func(t *testing.T) {
testKey := "first-writeTest"
testKey := key + "-firstwrite-etag-test"
firstValue := []byte("testValue1")
secondValue := []byte("testValue2")
@ -758,13 +950,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
Value: firstValue,
}
// Delete any potential object, it's important to start from a clean slate.
err := statestore.Delete(context.Background(), &state.DeleteRequest{
Key: testKey,
})
require.NoError(t, err)
err = statestore.Set(context.Background(), request)
err := statestore.Set(context.Background(), request)
require.NoError(t, err)
// Validate the set.
@ -862,6 +1048,8 @@ func assertDataEquals(t *testing.T, expect any, actual []byte) {
assert.Equal(t, expect, v)
case []byte:
assert.Equal(t, expect, actual)
case nil:
assert.Empty(t, actual)
default:
// Other golang primitive types (string, bool ...)
if err := json.Unmarshal(actual, &v); err != nil {