redis state store: implement state query API (#1488)
Signed-off-by: Dmitry Shmulevich <dmitry.shmulevich@gmail.com>
This commit is contained in:
parent
d6b969a825
commit
f819b290f7
|
@ -0,0 +1,8 @@
|
|||
version: '2'
|
||||
services:
|
||||
redis:
|
||||
image: redislabs/rejson:2.0.6
|
||||
ports:
|
||||
- "6379:6379"
|
||||
environment:
|
||||
- REDIS_REPLICATION_MODE=master
|
|
@ -225,9 +225,7 @@ jobs:
|
|||
done
|
||||
|
||||
- name: Start Redis
|
||||
uses: supercharge/redis-github-action@1.2.0
|
||||
with:
|
||||
redis-version: 6
|
||||
run: docker-compose -f ./.github/infrastructure/docker-compose-redisjson.yml -p redis up -d
|
||||
if: contains(matrix.component, 'redis')
|
||||
|
||||
- name: Start MongoDB
|
||||
|
|
|
@ -34,6 +34,9 @@ const (
|
|||
|
||||
// ContentType defines the metadata key for the content type.
|
||||
ContentType = "contentType"
|
||||
|
||||
// QueryIndexName defines the metadata key for the name of query indexing schema (for redis).
|
||||
QueryIndexName = "queryIndexName"
|
||||
)
|
||||
|
||||
// TryGetTTL tries to get the ttl as a time.Duration value for pubsub, binding and any other building block.
|
||||
|
@ -102,3 +105,11 @@ func TryGetContentType(props map[string]string) (string, bool) {
|
|||
|
||||
return "", false
|
||||
}
|
||||
|
||||
func TryGetQueryIndexName(props map[string]string) (string, bool) {
|
||||
if val, ok := props[QueryIndexName]; ok && val != "" {
|
||||
return val, true
|
||||
}
|
||||
|
||||
return "", false
|
||||
}
|
||||
|
|
|
@ -19,4 +19,5 @@ type metadata struct {
|
|||
maxRetries int
|
||||
maxRetryBackoff time.Duration
|
||||
ttlInSeconds *int
|
||||
queryIndexes string
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ package redis
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -25,20 +24,69 @@ import (
|
|||
"github.com/go-redis/redis/v8"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"github.com/dapr/components-contrib/contenttype"
|
||||
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
|
||||
daprmetadata "github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/components-contrib/state/query"
|
||||
"github.com/dapr/components-contrib/state/utils"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
setQuery = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if type(var1) == \"table\" then redis.call(\"DEL\", KEYS[1]); end; local var2 = redis.pcall(\"HGET\", KEYS[1], \"first-write\"); if not var1 or type(var1)==\"table\" or var1 == \"\" or var1 == ARGV[1] or (not var2 and ARGV[1] == \"0\") then redis.call(\"HSET\", KEYS[1], \"data\", ARGV[2]); if ARGV[3] == \"0\" then redis.call(\"HSET\", KEYS[1], \"first-write\", 0); end; return redis.call(\"HINCRBY\", KEYS[1], \"version\", 1) else return error(\"failed to set key \" .. KEYS[1]) end"
|
||||
delQuery = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if not var1 or type(var1)==\"table\" or var1 == ARGV[1] or var1 == \"\" or ARGV[1] == \"0\" then return redis.call(\"DEL\", KEYS[1]) else return error(\"failed to delete \" .. KEYS[1]) end"
|
||||
setDefaultQuery = `
|
||||
local etag = redis.pcall("HGET", KEYS[1], "version");
|
||||
if type(etag) == "table" then
|
||||
redis.call("DEL", KEYS[1]);
|
||||
end;
|
||||
local fwr = redis.pcall("HGET", KEYS[1], "first-write");
|
||||
if not etag or type(etag)=="table" or etag == "" or etag == ARGV[1] or (not fwr and ARGV[1] == "0") then
|
||||
redis.call("HSET", KEYS[1], "data", ARGV[2]);
|
||||
if ARGV[3] == "0" then
|
||||
redis.call("HSET", KEYS[1], "first-write", 0);
|
||||
end;
|
||||
return redis.call("HINCRBY", KEYS[1], "version", 1)
|
||||
else
|
||||
return error("failed to set key " .. KEYS[1])
|
||||
end`
|
||||
delDefaultQuery = `
|
||||
local etag = redis.pcall("HGET", KEYS[1], "version");
|
||||
if not etag or type(etag)=="table" or etag == ARGV[1] or etag == "" or ARGV[1] == "0" then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
else
|
||||
return error("failed to delete " .. KEYS[1])
|
||||
end`
|
||||
setJSONQuery = `
|
||||
local etag = redis.pcall("JSON.GET", KEYS[1], ".version");
|
||||
if type(etag) == "table" then
|
||||
redis.call("JSON.DEL", KEYS[1]);
|
||||
end;
|
||||
if not etag or type(etag)=="table" or etag == "" then
|
||||
etag = ARGV[1];
|
||||
end;
|
||||
local fwr = redis.pcall("JSON.GET", KEYS[1], ".first-write");
|
||||
if etag == ARGV[1] or ((not fwr or type(fwr) == "table") and ARGV[1] == "0") then
|
||||
redis.call("JSON.SET", KEYS[1], "$", ARGV[2]);
|
||||
if ARGV[3] == "0" then
|
||||
redis.call("JSON.SET", KEYS[1], ".first-write", 0);
|
||||
end;
|
||||
return redis.call("JSON.SET", KEYS[1], ".version", (etag+1))
|
||||
else
|
||||
return error("failed to set key " .. KEYS[1])
|
||||
end`
|
||||
delJSONQuery = `
|
||||
local etag = redis.pcall("JSON.GET", KEYS[1], ".version");
|
||||
if not etag or type(etag)=="table" or etag == ARGV[1] or etag == "" or ARGV[1] == "0" then
|
||||
return redis.call("JSON.DEL", KEYS[1])
|
||||
else
|
||||
return error("failed to delete " .. KEYS[1])
|
||||
end`
|
||||
connectedSlavesReplicas = "connected_slaves:"
|
||||
infoReplicationDelimiter = "\r\n"
|
||||
maxRetries = "maxRetries"
|
||||
maxRetryBackoff = "maxRetryBackoff"
|
||||
ttlInSeconds = "ttlInSeconds"
|
||||
queryIndexes = "queryIndexes"
|
||||
defaultBase = 10
|
||||
defaultBitSize = 0
|
||||
defaultDB = 0
|
||||
|
@ -54,6 +102,7 @@ type StateStore struct {
|
|||
json jsoniter.API
|
||||
metadata metadata
|
||||
replicas int
|
||||
querySchemas querySchemas
|
||||
|
||||
features []state.Feature
|
||||
logger logger.Logger
|
||||
|
@ -106,6 +155,10 @@ func parseRedisMetadata(meta state.Metadata) (metadata, error) {
|
|||
m.ttlInSeconds = nil
|
||||
}
|
||||
|
||||
if val, ok := meta.Properties[queryIndexes]; ok && val != "" {
|
||||
m.queryIndexes = val
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
|
@ -131,15 +184,26 @@ func (r *StateStore) Init(metadata state.Metadata) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// check for query schemas
|
||||
if r.querySchemas, err = parseQuerySchemas(m.queryIndexes); err != nil {
|
||||
return fmt.Errorf("redis store: error parsing query index schema: %v", err)
|
||||
}
|
||||
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
|
||||
if _, err = r.client.Ping(r.ctx).Result(); err != nil {
|
||||
return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err)
|
||||
return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err)
|
||||
}
|
||||
|
||||
r.replicas, err = r.getConnectedSlaves()
|
||||
if r.replicas, err = r.getConnectedSlaves(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
if err = r.registerSchemas(); err != nil {
|
||||
return fmt.Errorf("redis store: error registering query schemas: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Features returns the features available in this state store.
|
||||
|
@ -181,6 +245,13 @@ func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
|
|||
etag := "0"
|
||||
req.ETag = &etag
|
||||
}
|
||||
|
||||
var delQuery string
|
||||
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
|
||||
delQuery = delJSONQuery
|
||||
} else {
|
||||
delQuery = delDefaultQuery
|
||||
}
|
||||
_, err := r.client.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result()
|
||||
if err != nil {
|
||||
return state.NewETagError(state.ETagMismatch, err)
|
||||
|
@ -216,8 +287,7 @@ func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Get retrieves state from redis with a key.
|
||||
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
func (r *StateStore) getDefault(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
res, err := r.client.Do(r.ctx, "HGETALL", req.Key).Result() // Prefer values with ETags
|
||||
if err != nil {
|
||||
return r.directGet(req) // Falls back to original get for backward compats.
|
||||
|
@ -241,6 +311,57 @@ func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
res, err := r.client.Do(r.ctx, "JSON.GET", req.Key).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return &state.GetResponse{}, nil
|
||||
}
|
||||
|
||||
str, ok := res.(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid result")
|
||||
}
|
||||
|
||||
var entry jsonEntry
|
||||
if err = r.json.UnmarshalFromString(str, &entry); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var version *string
|
||||
if entry.Version != nil {
|
||||
version = new(string)
|
||||
*version = strconv.Itoa(*entry.Version)
|
||||
}
|
||||
|
||||
data, err := r.json.Marshal(entry.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &state.GetResponse{
|
||||
Data: data,
|
||||
ETag: version,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get retrieves state from redis with a key.
|
||||
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
|
||||
return r.getJSON(req)
|
||||
}
|
||||
|
||||
return r.getDefault(req)
|
||||
}
|
||||
|
||||
type jsonEntry struct {
|
||||
Data interface{} `json:"data"`
|
||||
Version *int `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
func (r *StateStore) setValue(req *state.SetRequest) error {
|
||||
err := state.CheckRequestOptions(req.Options)
|
||||
if err != nil {
|
||||
|
@ -259,13 +380,22 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
|
|||
ttl = r.metadata.ttlInSeconds
|
||||
}
|
||||
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
|
||||
firstWrite := 1
|
||||
if req.Options.Concurrency == state.FirstWrite {
|
||||
firstWrite = 0
|
||||
}
|
||||
_, err = r.client.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Result()
|
||||
|
||||
var bt []byte
|
||||
var setQuery string
|
||||
if contentType, ok := req.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
|
||||
setQuery = setJSONQuery
|
||||
bt, _ = utils.Marshal(&jsonEntry{Data: req.Value}, r.json.Marshal)
|
||||
} else {
|
||||
setQuery = setDefaultQuery
|
||||
bt, _ = utils.Marshal(req.Value, r.json.Marshal)
|
||||
}
|
||||
|
||||
err = r.client.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err()
|
||||
if err != nil {
|
||||
if req.ETag != nil {
|
||||
return state.NewETagError(state.ETagMismatch, err)
|
||||
|
@ -305,6 +435,17 @@ func (r *StateStore) Set(req *state.SetRequest) error {
|
|||
|
||||
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
|
||||
func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
||||
var setQuery, delQuery string
|
||||
var isJSON bool
|
||||
if contentType, ok := request.Metadata[daprmetadata.ContentType]; ok && contentType == contenttype.JSONContentType {
|
||||
isJSON = true
|
||||
setQuery = setJSONQuery
|
||||
delQuery = delJSONQuery
|
||||
} else {
|
||||
setQuery = setDefaultQuery
|
||||
delQuery = delDefaultQuery
|
||||
}
|
||||
|
||||
pipe := r.client.TxPipeline()
|
||||
for _, o := range request.Operations {
|
||||
//nolint:golint,nestif
|
||||
|
@ -322,8 +463,12 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
if ttl == nil {
|
||||
ttl = r.metadata.ttlInSeconds
|
||||
}
|
||||
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
var bt []byte
|
||||
if isJSON {
|
||||
bt, _ = utils.Marshal(&jsonEntry{Data: req.Value}, r.json.Marshal)
|
||||
} else {
|
||||
bt, _ = utils.Marshal(req.Value, r.json.Marshal)
|
||||
}
|
||||
pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt)
|
||||
if ttl != nil && *ttl > 0 {
|
||||
pipe.Do(r.ctx, "EXPIRE", req.Key, *ttl)
|
||||
|
@ -346,6 +491,26 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *StateStore) registerSchemas() error {
|
||||
for name, elem := range r.querySchemas {
|
||||
r.logger.Infof("redis: create query index %s", name)
|
||||
if err := r.client.Do(r.ctx, elem.schema...).Err(); err != nil {
|
||||
if err.Error() != "Index already exists" {
|
||||
return err
|
||||
}
|
||||
r.logger.Infof("redis: drop stale query index %s", name)
|
||||
if err = r.client.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = r.client.Do(r.ctx, elem.schema...).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version *string, err error) {
|
||||
seenData := false
|
||||
seenVersion := false
|
||||
|
@ -362,7 +527,7 @@ func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version *st
|
|||
}
|
||||
}
|
||||
if !seenData || !seenVersion {
|
||||
return "", nil, errors.New("required hash field 'data' or 'version' was not found")
|
||||
return "", nil, fmt.Errorf("required hash field 'data' or 'version' was not found")
|
||||
}
|
||||
|
||||
return data, version, nil
|
||||
|
@ -394,6 +559,33 @@ func (r *StateStore) parseTTL(req *state.SetRequest) (*int, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
// Query executes a query against store.
|
||||
func (r *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
|
||||
indexName, ok := daprmetadata.TryGetQueryIndexName(req.Metadata)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("query index not found")
|
||||
}
|
||||
elem, ok := r.querySchemas[indexName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("query index schema %q not found", indexName)
|
||||
}
|
||||
|
||||
q := NewQuery(indexName, elem.keys)
|
||||
qbuilder := query.NewQueryBuilder(q)
|
||||
if err := qbuilder.BuildQuery(&req.Query); err != nil {
|
||||
return &state.QueryResponse{}, err
|
||||
}
|
||||
data, token, err := q.execute(r.ctx, r.client)
|
||||
if err != nil {
|
||||
return &state.QueryResponse{}, err
|
||||
}
|
||||
|
||||
return &state.QueryResponse{
|
||||
Results: data,
|
||||
Token: token,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *StateStore) Close() error {
|
||||
r.cancel()
|
||||
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/components-contrib/state/query"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
schemaName string
|
||||
aliases map[string]string
|
||||
query []interface{}
|
||||
limit int
|
||||
offset int64
|
||||
}
|
||||
|
||||
func NewQuery(schemaName string, aliases map[string]string) *Query {
|
||||
return &Query{
|
||||
schemaName: schemaName,
|
||||
aliases: aliases,
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Query) getAlias(jsonPath string) (string, error) {
|
||||
alias, ok := q.aliases[jsonPath]
|
||||
if !ok {
|
||||
return "", fmt.Errorf("JSON path %q is not indexed", jsonPath)
|
||||
}
|
||||
return alias, nil
|
||||
}
|
||||
|
||||
func (q *Query) VisitEQ(f *query.EQ) (string, error) {
|
||||
// @<key>:(<val>)
|
||||
alias, err := q.getAlias(f.Key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return fmt.Sprintf("@%s:(%s)", alias, f.Val), nil
|
||||
}
|
||||
|
||||
func (q *Query) VisitIN(f *query.IN) (string, error) {
|
||||
// @<key>:(<val1>|<val2>...)
|
||||
if len(f.Vals) == 0 {
|
||||
return "", fmt.Errorf("empty IN operator for key %q", f.Key)
|
||||
}
|
||||
alias, err := q.getAlias(f.Key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
vals := make([]string, len(f.Vals))
|
||||
for i := range f.Vals {
|
||||
vals[i] = f.Vals[i].(string)
|
||||
}
|
||||
str := fmt.Sprintf("@%s:(%s)", alias, strings.Join(vals, "|"))
|
||||
|
||||
return str, nil
|
||||
}
|
||||
|
||||
func (q *Query) visitFilters(op string, filters []query.Filter) (string, error) {
|
||||
var (
|
||||
arr []string
|
||||
str string
|
||||
err error
|
||||
)
|
||||
for _, fil := range filters {
|
||||
switch f := fil.(type) {
|
||||
case *query.EQ:
|
||||
if str, err = q.VisitEQ(f); err != nil {
|
||||
return "", err
|
||||
}
|
||||
arr = append(arr, fmt.Sprintf("(%s)", str))
|
||||
case *query.IN:
|
||||
if str, err = q.VisitIN(f); err != nil {
|
||||
return "", err
|
||||
}
|
||||
arr = append(arr, fmt.Sprintf("(%s)", str))
|
||||
case *query.OR:
|
||||
if str, err = q.VisitOR(f); err != nil {
|
||||
return "", err
|
||||
}
|
||||
arr = append(arr, str)
|
||||
case *query.AND:
|
||||
if str, err = q.VisitAND(f); err != nil {
|
||||
return "", err
|
||||
}
|
||||
arr = append(arr, str)
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported filter type %#v", f)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf("(%s)", strings.Join(arr, op)), nil
|
||||
}
|
||||
|
||||
func (q *Query) VisitAND(f *query.AND) (string, error) {
|
||||
// ( <expression1> <expression2> ... <expressionN> )
|
||||
return q.visitFilters(" ", f.Filters)
|
||||
}
|
||||
|
||||
func (q *Query) VisitOR(f *query.OR) (string, error) {
|
||||
// ( <expression1> | <expression2> | ... | <expressionN> )
|
||||
return q.visitFilters("|", f.Filters)
|
||||
}
|
||||
|
||||
func (q *Query) Finalize(filters string, qq *query.Query) error {
|
||||
if len(filters) == 0 {
|
||||
filters = "*"
|
||||
}
|
||||
q.query = []interface{}{filters}
|
||||
|
||||
// sorting
|
||||
if len(qq.Sort) > 0 {
|
||||
if len(qq.Sort) != 1 {
|
||||
return errors.New("multiple SORTBY steps are not allowed. Sort multiple fields in a single step")
|
||||
}
|
||||
alias, err := q.getAlias(qq.Sort[0].Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.query = append(q.query, "SORTBY", alias)
|
||||
if qq.Sort[0].Order == query.DESC {
|
||||
q.query = append(q.query, "DESC")
|
||||
}
|
||||
}
|
||||
// pagination
|
||||
if qq.Page.Limit > 0 {
|
||||
q.limit = qq.Page.Limit
|
||||
q.offset = 0
|
||||
if len(qq.Page.Token) != 0 {
|
||||
var err error
|
||||
q.offset, err = strconv.ParseInt(qq.Page.Token, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.query = append(q.query, "LIMIT", qq.Page.Token, fmt.Sprintf("%d", q.limit))
|
||||
} else {
|
||||
q.offset = 0
|
||||
q.query = append(q.query, "LIMIT", "0", fmt.Sprintf("%d", q.limit))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *Query) execute(ctx context.Context, client redis.UniversalClient) ([]state.QueryItem, string, error) {
|
||||
query := append(append([]interface{}{"FT.SEARCH", q.schemaName}, q.query...), "RETURN", "2", "$.data", "$.version")
|
||||
ret, err := client.Do(ctx, query...).Result()
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
arr, ok := ret.([]interface{})
|
||||
if !ok {
|
||||
return nil, "", fmt.Errorf("invalid output")
|
||||
}
|
||||
// arr[0] = number of matching elements in DB (ignoring pagination)
|
||||
// arr[2n] = key
|
||||
// arr[2n+1][0] = "$.data"
|
||||
// arr[2n+1][1] = value
|
||||
// arr[2n+1][2] = "$.version"
|
||||
// arr[2n+1][3] = etag
|
||||
if len(arr)%2 != 1 {
|
||||
return nil, "", fmt.Errorf("invalid output")
|
||||
}
|
||||
res := []state.QueryItem{}
|
||||
for i := 1; i < len(arr); i += 2 {
|
||||
item := state.QueryItem{
|
||||
Key: arr[i].(string),
|
||||
}
|
||||
if data, ok := arr[i+1].([]interface{}); ok && len(data) == 4 && data[0] == "$.data" && data[2] == "$.version" {
|
||||
item.Data = []byte(data[1].(string))
|
||||
etag := data[3].(string)
|
||||
item.ETag = &etag
|
||||
} else {
|
||||
item.Error = fmt.Sprintf("%#v is not []interface{}", arr[i+1])
|
||||
}
|
||||
res = append(res, item)
|
||||
}
|
||||
// set next query token only if limit is specified
|
||||
var token string
|
||||
if q.limit > 0 && len(res) > 0 {
|
||||
token = strconv.FormatInt(q.offset+int64(len(res)), 10)
|
||||
}
|
||||
|
||||
return res, token, err
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type index struct {
|
||||
Key string `json:"key"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
type querySchema struct {
|
||||
Name string `json:"name"`
|
||||
Indexes []index `json:"indexes"`
|
||||
}
|
||||
|
||||
type querySchemaElem struct {
|
||||
schema []interface{}
|
||||
keys map[string]string
|
||||
}
|
||||
|
||||
type querySchemas map[string]*querySchemaElem
|
||||
|
||||
func parseQuerySchemas(content string) (querySchemas, error) {
|
||||
if len(content) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var schemas []querySchema
|
||||
if err := json.Unmarshal([]byte(content), &schemas); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ret := querySchemas{}
|
||||
for _, schema := range schemas {
|
||||
if len(schema.Name) == 0 {
|
||||
return nil, fmt.Errorf("empty query schema name")
|
||||
}
|
||||
if _, ok := ret[schema.Name]; ok {
|
||||
return nil, fmt.Errorf("duplicate schema name %s", schema.Name)
|
||||
}
|
||||
elem := &querySchemaElem{
|
||||
keys: make(map[string]string),
|
||||
schema: []interface{}{"FT.CREATE", schema.Name, "ON", "JSON", "SCHEMA"},
|
||||
}
|
||||
for id, indx := range schema.Indexes {
|
||||
if err := validateIndex(schema.Name, indx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
alias := fmt.Sprintf("var%d", id)
|
||||
elem.keys[indx.Key] = alias
|
||||
elem.schema = append(elem.schema, fmt.Sprintf("$.data.%s", indx.Key), "AS", alias, indx.Type, "SORTABLE")
|
||||
}
|
||||
ret[schema.Name] = elem
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func validateIndex(name string, indx index) error {
|
||||
if len(indx.Key) == 0 {
|
||||
return fmt.Errorf("empty key in query schema %s", name)
|
||||
}
|
||||
if len(indx.Type) == 0 {
|
||||
return fmt.Errorf("empty type in query schema %s", name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
package redis
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParsingEmptySchema(t *testing.T) {
|
||||
schemas, err := parseQuerySchemas("")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, len(schemas))
|
||||
}
|
||||
|
||||
func TestParsingSingleSchema(t *testing.T) {
|
||||
content := `
|
||||
[
|
||||
{
|
||||
"name": "schema1",
|
||||
"indexes": [
|
||||
{
|
||||
"key": "person.org",
|
||||
"type": "TEXT"
|
||||
},
|
||||
{
|
||||
"key": "person.id",
|
||||
"type": "NUMERIC"
|
||||
},
|
||||
{
|
||||
"key": "city",
|
||||
"type": "TEXT"
|
||||
},
|
||||
{
|
||||
"key": "state",
|
||||
"type": "TEXT"
|
||||
}
|
||||
]
|
||||
}
|
||||
]`
|
||||
schemas, err := parseQuerySchemas(content)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(schemas))
|
||||
assert.Equal(t,
|
||||
schemas["schema1"].keys,
|
||||
map[string]string{"person.org": "var0", "person.id": "var1", "city": "var2", "state": "var3"})
|
||||
assert.Equal(t,
|
||||
schemas["schema1"].schema,
|
||||
[]interface{}{
|
||||
"FT.CREATE", "schema1", "ON", "JSON", "SCHEMA",
|
||||
"$.data.person.org", "AS", "var0", "TEXT", "SORTABLE",
|
||||
"$.data.person.id", "AS", "var1", "NUMERIC", "SORTABLE",
|
||||
"$.data.city", "AS", "var2", "TEXT", "SORTABLE",
|
||||
"$.data.state", "AS", "var3", "TEXT", "SORTABLE",
|
||||
})
|
||||
}
|
||||
|
||||
func TestParsingMultiSchema(t *testing.T) {
|
||||
content := `
|
||||
[
|
||||
{
|
||||
"name": "schema1",
|
||||
"indexes": [
|
||||
{
|
||||
"key": "person.org",
|
||||
"type": "TEXT"
|
||||
},
|
||||
{
|
||||
"key": "city",
|
||||
"type": "TEXT"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "schema2",
|
||||
"indexes": [
|
||||
{
|
||||
"key": "person.id",
|
||||
"type": "NUMERIC"
|
||||
},
|
||||
{
|
||||
"key": "state",
|
||||
"type": "TEXT"
|
||||
}
|
||||
]
|
||||
}
|
||||
]`
|
||||
schemas, err := parseQuerySchemas(content)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 2, len(schemas))
|
||||
assert.Equal(t,
|
||||
schemas["schema1"].keys,
|
||||
map[string]string{"person.org": "var0", "city": "var1"})
|
||||
assert.Equal(t,
|
||||
schemas["schema1"].schema,
|
||||
[]interface{}{
|
||||
"FT.CREATE", "schema1", "ON", "JSON", "SCHEMA",
|
||||
"$.data.person.org", "AS", "var0", "TEXT", "SORTABLE",
|
||||
"$.data.city", "AS", "var1", "TEXT", "SORTABLE",
|
||||
})
|
||||
assert.Equal(t,
|
||||
schemas["schema2"].keys,
|
||||
map[string]string{"person.id": "var0", "state": "var1"})
|
||||
assert.Equal(t,
|
||||
schemas["schema2"].schema,
|
||||
[]interface{}{
|
||||
"FT.CREATE", "schema2", "ON", "JSON", "SCHEMA",
|
||||
"$.data.person.id", "AS", "var0", "NUMERIC", "SORTABLE",
|
||||
"$.data.state", "AS", "var1", "TEXT", "SORTABLE",
|
||||
})
|
||||
}
|
||||
|
||||
func TestParsingSchemaErrors(t *testing.T) {
|
||||
tests := []struct{ content, err string }{
|
||||
{
|
||||
content: `
|
||||
[
|
||||
{
|
||||
"name": "schema1",
|
||||
"indexes": [
|
||||
{
|
||||
"type": "TEXT"
|
||||
}
|
||||
]
|
||||
}
|
||||
]`,
|
||||
err: "empty key in query schema schema1",
|
||||
},
|
||||
{
|
||||
content: `
|
||||
[
|
||||
{
|
||||
"name": "schema3",
|
||||
"indexes": [
|
||||
{
|
||||
"key": "state"
|
||||
}
|
||||
]
|
||||
}
|
||||
]`,
|
||||
err: "empty type in query schema schema3",
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
_, err := parseQuerySchemas(test.content)
|
||||
assert.EqualError(t, err, test.err)
|
||||
}
|
||||
}
|
|
@ -9,3 +9,16 @@ spec:
|
|||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: queryIndexes
|
||||
value: |
|
||||
[
|
||||
{
|
||||
"name": "qIndx",
|
||||
"indexes": [
|
||||
{
|
||||
"key": "message",
|
||||
"type": "TEXT"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
|
|
@ -2,8 +2,7 @@
|
|||
componentType: state
|
||||
components:
|
||||
- component: redis
|
||||
allOperations: false
|
||||
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ]
|
||||
allOperations: true
|
||||
- component: mongodb
|
||||
allOperations: true
|
||||
- component: azure.cosmosdb
|
||||
|
|
|
@ -23,6 +23,8 @@ import (
|
|||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dapr/components-contrib/contenttype"
|
||||
"github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/components-contrib/tests/conformance/utils"
|
||||
)
|
||||
|
@ -38,6 +40,7 @@ type scenario struct {
|
|||
bulkOnly bool
|
||||
transactionOnly bool
|
||||
transactionGroup int
|
||||
contentType string
|
||||
}
|
||||
|
||||
type queryScenario struct {
|
||||
|
@ -90,8 +93,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
value: "hello world",
|
||||
},
|
||||
{
|
||||
key: fmt.Sprintf("%s-struct", key),
|
||||
value: ValueType{Message: fmt.Sprintf("%s-test", key)},
|
||||
key: fmt.Sprintf("%s-struct", key),
|
||||
value: ValueType{Message: fmt.Sprintf("test%s", key)},
|
||||
contentType: contenttype.JSONContentType,
|
||||
},
|
||||
{
|
||||
key: fmt.Sprintf("%s-to-be-deleted", key),
|
||||
|
@ -191,7 +195,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
"EQ": {"message": "dummy"}
|
||||
},
|
||||
{
|
||||
"IN": {"message": ["` + key + `-test", "dummy"]}
|
||||
"IN": {"message": ["test` + key + `", "dummy"]}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -200,7 +204,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
results: []state.QueryItem{
|
||||
{
|
||||
Key: fmt.Sprintf("%s-struct", key),
|
||||
Data: []byte(fmt.Sprintf("{\"message\":\"%s-test\"}", key)),
|
||||
Data: []byte(fmt.Sprintf("{\"message\":\"test%s\"}", key)),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -223,10 +227,14 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
for _, scenario := range scenarios {
|
||||
if !scenario.bulkOnly && !scenario.transactionOnly {
|
||||
t.Logf("Setting value for %s", scenario.key)
|
||||
err := statestore.Set(&state.SetRequest{
|
||||
req := &state.SetRequest{
|
||||
Key: scenario.key,
|
||||
Value: scenario.value,
|
||||
})
|
||||
}
|
||||
if len(scenario.contentType) != 0 {
|
||||
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
|
||||
}
|
||||
err := statestore.Set(req)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
@ -238,9 +246,13 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
for _, scenario := range scenarios {
|
||||
if !scenario.bulkOnly && !scenario.transactionOnly {
|
||||
t.Logf("Checking value presence for %s", scenario.key)
|
||||
res, err := statestore.Get(&state.GetRequest{
|
||||
req := &state.GetRequest{
|
||||
Key: scenario.key,
|
||||
})
|
||||
}
|
||||
if len(scenario.contentType) != 0 {
|
||||
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
|
||||
}
|
||||
res, err := statestore.Get(req)
|
||||
assert.Nil(t, err)
|
||||
assertEquals(t, scenario.value, res)
|
||||
}
|
||||
|
@ -257,6 +269,10 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
var req state.QueryRequest
|
||||
err := json.Unmarshal([]byte(scenario.query), &req.Query)
|
||||
assert.NoError(t, err)
|
||||
req.Metadata = map[string]string{
|
||||
metadata.ContentType: contenttype.JSONContentType,
|
||||
metadata.QueryIndexName: "qIndx",
|
||||
}
|
||||
resp, err := querier.Query(&req)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(scenario.results), len(resp.Results))
|
||||
|
@ -279,9 +295,13 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
if !scenario.bulkOnly && scenario.toBeDeleted {
|
||||
// this also deletes two keys that were not inserted in the set operation
|
||||
t.Logf("Deleting %s", scenario.key)
|
||||
err := statestore.Delete(&state.DeleteRequest{
|
||||
req := &state.DeleteRequest{
|
||||
Key: scenario.key,
|
||||
})
|
||||
}
|
||||
if len(scenario.contentType) != 0 {
|
||||
req.Metadata = map[string]string{metadata.ContentType: scenario.contentType}
|
||||
}
|
||||
err := statestore.Delete(req)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Logf("Checking value absence for %s", scenario.key)
|
||||
|
|
Loading…
Reference in New Issue