diff --git a/configuration/metadata.go b/configuration/metadata.go new file mode 100644 index 000000000..6a225fc91 --- /dev/null +++ b/configuration/metadata.go @@ -0,0 +1,11 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package configuration + +// Metadata contains a configuration store specific set of metadata property. +type Metadata struct { + Properties map[string]string `json:"properties"` +} diff --git a/configuration/redis/internal/redis_value.go b/configuration/redis/internal/redis_value.go new file mode 100644 index 000000000..8a7d83ade --- /dev/null +++ b/configuration/redis/internal/redis_value.go @@ -0,0 +1,36 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package internal + +import ( + "fmt" + "strings" +) + +const ( + channelPrefix = "__keyspace@0__:" + separator = "||" +) + +func GetRedisValueAndVersion(redisValue string) (string, string) { + valueAndRevision := strings.Split(redisValue, separator) + if len(valueAndRevision) == 0 { + return "", "" + } + if len(valueAndRevision) == 1 { + return valueAndRevision[0], "" + } + return valueAndRevision[0], valueAndRevision[1] +} + +func ParseRedisKeyFromEvent(eventChannel string) (string, error) { + index := strings.Index(eventChannel, channelPrefix) + if index == -1 { + return "", fmt.Errorf("wrong format of event channel, it should start with '%s': eventChannel=%s", channelPrefix, eventChannel) + } + + return eventChannel[len(channelPrefix):], nil +} diff --git a/configuration/redis/internal/redis_value_test.go b/configuration/redis/internal/redis_value_test.go new file mode 100644 index 000000000..4d59fff82 --- /dev/null +++ b/configuration/redis/internal/redis_value_test.go @@ -0,0 +1,104 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package internal + +import "testing" + +func TestGetRedisValueAndVersion(t *testing.T) { + type args struct { + redisValue string + } + tests := []struct { + name string + args args + want string + want1 string + }{ + { + name: "empty value", + args: args{ + redisValue: "", + }, + want: "", + want1: "", + }, + { + name: "value without version", + args: args{ + redisValue: "mockValue", + }, + want: "mockValue", + want1: "", + }, + { + name: "value without version", + args: args{ + redisValue: "mockValue||", + }, + want: "mockValue", + want1: "", + }, + { + name: "value with version", + args: args{ + redisValue: "mockValue||v1.0.0", + }, + want: "mockValue", + want1: "v1.0.0", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := GetRedisValueAndVersion(tt.args.redisValue) + if got != tt.want { + t.Errorf("GetRedisValueAndVersion() got = %v, want %v", got, tt.want) + } + if got1 != tt.want1 { + t.Errorf("GetRedisValueAndVersion() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} + +func TestParseRedisKeyFromEvent(t *testing.T) { + type args struct { + eventChannel string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "invalid channel name", + args: args{ + eventChannel: "invalie channel name", + }, + want: "", + wantErr: true, + }, { + name: "valid channel name", + args: args{ + eventChannel: channelPrefix + "key", + }, + want: "key", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseRedisKeyFromEvent(tt.args.eventChannel) + if (err != nil) != tt.wantErr { + t.Errorf("ParseRedisKeyFromEvent() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("ParseRedisKeyFromEvent() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/configuration/redis/metadata.go b/configuration/redis/metadata.go new file mode 100644 index 000000000..53b57226b --- /dev/null +++ b/configuration/redis/metadata.go @@ -0,0 +1,18 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package redis + +import "time" + +type metadata struct { + host string + password string + sentinelMasterName string + maxRetries int + maxRetryBackoff time.Duration + enableTLS bool + failover bool +} diff --git a/configuration/redis/redis.go b/configuration/redis/redis.go new file mode 100644 index 000000000..9f2d58cbe --- /dev/null +++ b/configuration/redis/redis.go @@ -0,0 +1,300 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package redis + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/go-redis/redis/v8" + jsoniter "github.com/json-iterator/go" + + "github.com/dapr/components-contrib/configuration" + "github.com/dapr/components-contrib/configuration/redis/internal" + "github.com/dapr/kit/logger" +) + +const ( + connectedSlavesReplicas = "connected_slaves:" + infoReplicationDelimiter = "\r\n" + host = "redisHost" + password = "redisPassword" + enableTLS = "enableTLS" + maxRetries = "maxRetries" + maxRetryBackoff = "maxRetryBackoff" + failover = "failover" + sentinelMasterName = "sentinelMasterName" + defaultBase = 10 + defaultBitSize = 0 + defaultDB = 0 + defaultMaxRetries = 3 + defaultMaxRetryBackoff = time.Second * 2 + defaultEnableTLS = false + keySpacePrefix = "__keyspace@0__:" + keySpaceAny = "__keyspace@0__:*" +) + +// ConfigurationStore is a Redis configuration store. +type ConfigurationStore struct { + client redis.UniversalClient + json jsoniter.API + metadata metadata + replicas int + + logger logger.Logger +} + +// NewRedisConfigurationStore returns a new redis state store. +func NewRedisConfigurationStore(logger logger.Logger) configuration.Store { + s := &ConfigurationStore{ + json: jsoniter.ConfigFastest, + logger: logger, + } + + return s +} + +func parseRedisMetadata(meta configuration.Metadata) (metadata, error) { + m := metadata{} + + if val, ok := meta.Properties[host]; ok && val != "" { + m.host = val + } else { + return m, errors.New("redis store error: missing host address") + } + + if val, ok := meta.Properties[password]; ok && val != "" { + m.password = val + } + + m.enableTLS = defaultEnableTLS + if val, ok := meta.Properties[enableTLS]; ok && val != "" { + tls, err := strconv.ParseBool(val) + if err != nil { + return m, fmt.Errorf("redis store error: can't parse enableTLS field: %s", err) + } + m.enableTLS = tls + } + + m.maxRetries = defaultMaxRetries + if val, ok := meta.Properties[maxRetries]; ok && val != "" { + parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) + if err != nil { + return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err) + } + m.maxRetries = int(parsedVal) + } + + m.maxRetryBackoff = defaultMaxRetryBackoff + if val, ok := meta.Properties[maxRetryBackoff]; ok && val != "" { + parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) + if err != nil { + return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err) + } + m.maxRetryBackoff = time.Duration(parsedVal) + } + + if val, ok := meta.Properties[failover]; ok && val != "" { + failover, err := strconv.ParseBool(val) + if err != nil { + return m, fmt.Errorf("redis store error: can't parse failover field: %s", err) + } + m.failover = failover + } + + // set the sentinelMasterName only with failover == true. + if m.failover { + if val, ok := meta.Properties[sentinelMasterName]; ok && val != "" { + m.sentinelMasterName = val + } else { + return m, errors.New("redis store error: missing sentinelMasterName") + } + } + + return m, nil +} + +// Init does metadata and connection parsing. +func (r *ConfigurationStore) Init(metadata configuration.Metadata) error { + m, err := parseRedisMetadata(metadata) + if err != nil { + return err + } + r.metadata = m + + if r.metadata.failover { + r.client = r.newFailoverClient(m) + } else { + r.client = r.newClient(m) + } + + if _, err = r.client.Ping(context.TODO()).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %s", m.host, err) + } + + r.replicas, err = r.getConnectedSlaves() + + return err +} + +func (r *ConfigurationStore) newClient(m metadata) *redis.Client { + opts := &redis.Options{ + Addr: m.host, + Password: m.password, + DB: defaultDB, + MaxRetries: m.maxRetries, + MaxRetryBackoff: m.maxRetryBackoff, + } + + // tell the linter to skip a check here. + /* #nosec */ + if m.enableTLS { + opts.TLSConfig = &tls.Config{ + InsecureSkipVerify: m.enableTLS, + } + } + + return redis.NewClient(opts) +} + +func (r *ConfigurationStore) newFailoverClient(m metadata) *redis.Client { + opts := &redis.FailoverOptions{ + MasterName: r.metadata.sentinelMasterName, + SentinelAddrs: []string{r.metadata.host}, + DB: defaultDB, + MaxRetries: m.maxRetries, + MaxRetryBackoff: m.maxRetryBackoff, + } + + /* #nosec */ + if m.enableTLS { + opts.TLSConfig = &tls.Config{ + InsecureSkipVerify: m.enableTLS, + } + } + + return redis.NewFailoverClient(opts) +} + +func (r *ConfigurationStore) getConnectedSlaves() (int, error) { + res, err := r.client.Do(context.Background(), "INFO", "replication").Result() + if err != nil { + return 0, err + } + + // Response example: https://redis.io/commands/info#return-value + // # Replication\r\nrole:master\r\nconnected_slaves:1\r\n + s, _ := strconv.Unquote(fmt.Sprintf("%q", res)) + if len(s) == 0 { + return 0, nil + } + + return r.parseConnectedSlaves(s), nil +} + +func (r *ConfigurationStore) parseConnectedSlaves(res string) int { + infos := strings.Split(res, infoReplicationDelimiter) + for _, info := range infos { + if strings.Contains(info, connectedSlavesReplicas) { + parsedReplicas, _ := strconv.ParseUint(info[len(connectedSlavesReplicas):], 10, 32) + return int(parsedReplicas) + } + } + + return 0 +} + +func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) { + keys := req.Keys + var err error + if len(keys) == 0 { + if keys, err = r.client.Keys(ctx, "*").Result(); err != nil { + r.logger.Errorf("failed to all keys, error is %s", err) + } + } + + items := make([]*configuration.Item, 0, 16) + + // query by keys + for _, redisKey := range keys { + item := &configuration.Item{ + Metadata: map[string]string{}, + } + + redisValue, err := r.client.Get(ctx, redisKey).Result() + if err != nil { + return &configuration.GetResponse{}, fmt.Errorf("fail to get configuration for redis key=%s, error is %s", redisKey, err) + } + val, version := internal.GetRedisValueAndVersion(redisValue) + item.Key = redisKey + item.Version = version + item.Value = val + + if item.Value != "" { + items = append(items, item) + } + } + + return &configuration.GetResponse{ + Items: items, + }, nil +} + +func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) error { + if len(req.Keys) == 0 { + go r.doSubscribe(ctx, req, handler, keySpaceAny) + return nil + } + for _, k := range req.Keys { + go r.doSubscribe(ctx, req, handler, keySpacePrefix+k) + } + return nil +} + +func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string) { + // enable notify-keyspace-events by redis Set command + r.client.ConfigSet(ctx, "notify-keyspace-events", "KA") + p := r.client.Subscribe(ctx, redisChannel4revision) + for msg := range p.Channel() { + r.handleSubscribedChange(ctx, req, handler, msg) + } +} + +func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message) { + defer func() { + if err := recover(); err != nil { + r.logger.Errorf("panic in handleSubscribedChange()method and recovered: %s", err) + } + }() + targetKey, err := internal.ParseRedisKeyFromEvent(msg.Channel) + if err != nil { + r.logger.Errorf("parse redis key failed: %s", err) + return + } + + // get all keys if only one is changed + getResponse, err := r.Get(ctx, &configuration.GetRequest{ + Metadata: req.Metadata, + Keys: []string{targetKey}, + }) + if err != nil { + r.logger.Errorf("get response from redis failed: %s", err) + return + } + + e := &configuration.UpdateEvent{ + Items: getResponse.Items, + } + err = handler(ctx, e) + if err != nil { + r.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %s", err) + } +} diff --git a/configuration/redis/redis_test.go b/configuration/redis/redis_test.go new file mode 100644 index 000000000..e107c7eb9 --- /dev/null +++ b/configuration/redis/redis_test.go @@ -0,0 +1,257 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package redis + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/go-redis/redis/v8" + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/assert" + + "github.com/dapr/components-contrib/configuration" + "github.com/dapr/kit/logger" +) + +func TestConfigurationStore_Get(t *testing.T) { + s, c := setupMiniredis() + defer s.Close() + assert.Nil(t, s.Set("testKey", "testValue")) + assert.Nil(t, s.Set("testKey2", "testValue2")) + + type fields struct { + client *redis.Client + json jsoniter.API + metadata metadata + replicas int + logger logger.Logger + } + type args struct { + ctx context.Context + req *configuration.GetRequest + } + tests := []struct { + name string + fields fields + args args + want *configuration.GetResponse + wantErr bool + }{ + { + name: "normal get redis value", + fields: fields{ + client: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + }, + args: args{ + req: &configuration.GetRequest{ + Keys: []string{"testKey"}, + }, + ctx: context.Background(), + }, + want: &configuration.GetResponse{ + Items: []*configuration.Item{ + { + Key: "testKey", + Value: "testValue", + Metadata: make(map[string]string), + }, + }, + }, + }, + { + name: "get with no request key", + fields: fields{ + client: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + }, + args: args{ + req: &configuration.GetRequest{}, + ctx: context.Background(), + }, + want: &configuration.GetResponse{ + Items: []*configuration.Item{ + { + Key: "testKey", + Value: "testValue", + Metadata: make(map[string]string), + }, { + Key: "testKey2", + Value: "testValue2", + Metadata: make(map[string]string), + }, + }, + }, + }, + { + name: "get with not exists key", + fields: fields{ + client: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + }, + args: args{ + req: &configuration.GetRequest{ + Keys: []string{"notExistKey"}, + }, + ctx: context.Background(), + }, + want: &configuration.GetResponse{ + Items: []*configuration.Item{}, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &ConfigurationStore{ + client: tt.fields.client, + json: tt.fields.json, + metadata: tt.fields.metadata, + replicas: tt.fields.replicas, + logger: tt.fields.logger, + } + got, err := r.Get(tt.args.ctx, tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got == nil { + t.Errorf("Get() got configuration response is nil") + return + } + + if len(got.Items) != len(tt.want.Items) { + t.Errorf("Get() got len = %v, want len = %v", len(got.Items), len(tt.want.Items)) + return + } + + if len(got.Items) == 0 { + return + } + + for k := range got.Items { + assert.Equal(t, tt.want.Items[k], got.Items[k]) + } + }) + } +} + +func TestParseConnectedSlaves(t *testing.T) { + store := &ConfigurationStore{logger: logger.NewLogger("test")} + + t.Run("Empty info", func(t *testing.T) { + slaves := store.parseConnectedSlaves("") + assert.Equal(t, 0, slaves, "connected slaves must be 0") + }) + + t.Run("connectedSlaves property is not included", func(t *testing.T) { + slaves := store.parseConnectedSlaves("# Replication\r\nrole:master\r\n") + assert.Equal(t, 0, slaves, "connected slaves must be 0") + }) + + t.Run("connectedSlaves is 2", func(t *testing.T) { + slaves := store.parseConnectedSlaves("# Replication\r\nrole:master\r\nconnected_slaves:2\r\n") + assert.Equal(t, 2, slaves, "connected slaves must be 2") + }) + + t.Run("connectedSlaves is 1", func(t *testing.T) { + slaves := store.parseConnectedSlaves("# Replication\r\nrole:master\r\nconnected_slaves:1") + assert.Equal(t, 1, slaves, "connected slaves must be 1") + }) +} + +func TestNewRedisConfigurationStore(t *testing.T) { + type args struct { + logger logger.Logger + } + tests := []struct { + name string + args args + want configuration.Store + }{ + { + args: args{ + logger: logger.NewLogger("test"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewRedisConfigurationStore(tt.args.logger) + assert.NotNil(t, got) + }) + } +} + +func Test_parseRedisMetadata(t *testing.T) { + type args struct { + meta configuration.Metadata + } + testProperties := make(map[string]string) + testProperties[host] = "testHost" + testProperties[password] = "testPassword" + testProperties[enableTLS] = "true" + testProperties[maxRetries] = "10" + testProperties[maxRetryBackoff] = "1000000000" + testProperties[failover] = "true" + testProperties[sentinelMasterName] = "tesSentinelMasterName" + tests := []struct { + name string + args args + want metadata + wantErr bool + }{ + { + args: args{ + meta: configuration.Metadata{ + Properties: testProperties, + }, + }, + want: metadata{ + host: "testHost", + password: "testPassword", + enableTLS: true, + maxRetries: 10, + maxRetryBackoff: time.Second, + failover: true, + sentinelMasterName: "tesSentinelMasterName", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseRedisMetadata(tt.args.meta) + if (err != nil) != tt.wantErr { + t.Errorf("parseRedisMetadata() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseRedisMetadata() got = %v, want %v", got, tt.want) + } + }) + } +} + +func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { + s, err := miniredis.Run() + if err != nil { + panic(err) + } + opts := &redis.Options{ + Addr: s.Addr(), + DB: defaultDB, + } + + return s, redis.NewClient(opts) +} diff --git a/configuration/requests.go b/configuration/requests.go new file mode 100644 index 000000000..9525c81ef --- /dev/null +++ b/configuration/requests.go @@ -0,0 +1,31 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package configuration + +// ConfigurationItem represents a configuration item with name, content and other information. +type Item struct { + Key string `json:"key"` + Value string `json:"value,omitempty"` + Version string `json:"version,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` +} + +// GetRequest is the object describing a request to get configuration. +type GetRequest struct { + Keys []string `json:"keys"` + Metadata map[string]string `json:"metadata"` +} + +// SubscribeRequest is the object describing a request to subscribe configuration. +type SubscribeRequest struct { + Keys []string `json:"keys"` + Metadata map[string]string `json:"metadata"` +} + +// UpdateEvent is the object describing a configuration update event. +type UpdateEvent struct { + Items []*Item `json:"items"` +} diff --git a/configuration/responses.go b/configuration/responses.go new file mode 100644 index 000000000..15f1d691a --- /dev/null +++ b/configuration/responses.go @@ -0,0 +1,11 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package configuration + +// GetResponse is the request object for getting configuration. +type GetResponse struct { + Items []*Item `json:"items"` +} diff --git a/configuration/store.go b/configuration/store.go new file mode 100644 index 000000000..74459f800 --- /dev/null +++ b/configuration/store.go @@ -0,0 +1,23 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation and Dapr Contributors. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package configuration + +import "context" + +// Store is an interface to perform operations on store. +type Store interface { + // Init configuration store. + Init(metadata Metadata) error + + // Get configuration. + Get(ctx context.Context, req *GetRequest) (*GetResponse, error) + + // Subscribe configuration by update event. + Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) error +} + +// UpdateHandler is the handler used to send event to daprd. +type UpdateHandler func(ctx context.Context, e *UpdateEvent) error