Adding redisDB metadata

Signed-off-by: shivam <shivamkm07@gmail.com>
This commit is contained in:
shivam 2023-01-06 15:48:23 +05:30
parent 15394572dd
commit 874c11ac2a
5 changed files with 87 additions and 21 deletions

View File

@ -19,8 +19,8 @@ import (
) )
const ( const (
channelPrefix = "__keyspace@0__:" keySpacePrefix = "__keyspace@"
separator = "||" separator = "||"
) )
func GetRedisValueAndVersion(redisValue string) (string, string) { func GetRedisValueAndVersion(redisValue string) (string, string) {
@ -34,7 +34,8 @@ func GetRedisValueAndVersion(redisValue string) (string, string) {
return valueAndRevision[0], valueAndRevision[1] return valueAndRevision[0], valueAndRevision[1]
} }
func ParseRedisKeyFromEvent(eventChannel string) (string, error) { func ParseRedisKeyFromChannel(eventChannel string, redisDB int) (string, error) {
channelPrefix := keySpacePrefix + fmt.Sprint(redisDB) + "__:"
index := strings.Index(eventChannel, channelPrefix) index := strings.Index(eventChannel, channelPrefix)
if index == -1 { if index == -1 {
return "", fmt.Errorf("wrong format of event channel, it should start with '%s': eventChannel=%s", channelPrefix, eventChannel) return "", fmt.Errorf("wrong format of event channel, it should start with '%s': eventChannel=%s", channelPrefix, eventChannel)
@ -42,3 +43,8 @@ func ParseRedisKeyFromEvent(eventChannel string) (string, error) {
return eventChannel[len(channelPrefix):], nil return eventChannel[len(channelPrefix):], nil
} }
func GetRedisChannelFromKey(key string, redisDB int) string {
redisEvent := keySpacePrefix + fmt.Sprint(redisDB) + "__:" + key
return redisEvent
}

View File

@ -71,9 +71,10 @@ func TestGetRedisValueAndVersion(t *testing.T) {
} }
} }
func TestParseRedisKeyFromEvent(t *testing.T) { func TestParseRedisKeyFromChannel(t *testing.T) {
type args struct { type args struct {
eventChannel string eventChannel string
redisDB int
} }
tests := []struct { tests := []struct {
name string name string
@ -84,14 +85,24 @@ func TestParseRedisKeyFromEvent(t *testing.T) {
{ {
name: "invalid channel name", name: "invalid channel name",
args: args{ args: args{
eventChannel: "invalie channel name", eventChannel: "invalid channel name",
redisDB: 0,
}, },
want: "", want: "",
wantErr: true, wantErr: true,
}, { }, {
name: "valid channel name", name: "valid channel name with DB 0",
args: args{ args: args{
eventChannel: channelPrefix + "key", eventChannel: keySpacePrefix + "0__:key",
redisDB: 0,
},
want: "key",
wantErr: false,
}, {
name: "valid channel name with DB 1",
args: args{
eventChannel: keySpacePrefix + "1__:key",
redisDB: 1,
}, },
want: "key", want: "key",
wantErr: false, wantErr: false,
@ -99,13 +110,49 @@ func TestParseRedisKeyFromEvent(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := ParseRedisKeyFromEvent(tt.args.eventChannel) got, err := ParseRedisKeyFromChannel(tt.args.eventChannel, tt.args.redisDB)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("ParseRedisKeyFromEvent() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ParseRedisKeyFromChannel() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
if got != tt.want { if got != tt.want {
t.Errorf("ParseRedisKeyFromEvent() got = %v, want %v", got, tt.want) t.Errorf("ParseRedisKeyFromChannel() got = %v, want %v", got, tt.want)
}
})
}
}
func TestGetRedisChannelFromKey(t *testing.T) {
type args struct {
key string
redisDB int
}
tests := []struct {
name string
args args
want string
}{
{
name: "key with redisDB 0",
args: args{
key: "key",
redisDB: 0,
},
want: keySpacePrefix + "0__:key",
}, {
name: "key with redisDB 1",
args: args{
key: "key",
redisDB: 1,
},
want: keySpacePrefix + "1__:key",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetRedisChannelFromKey(tt.args.key, tt.args.redisDB)
if got != tt.want {
t.Errorf("GetRedisChannelFromKey() got = %v, want %v", got, tt.want)
} }
}) })
} }

View File

@ -23,4 +23,5 @@ type metadata struct {
MaxRetryBackoff time.Duration MaxRetryBackoff time.Duration
EnableTLS bool EnableTLS bool
Failover bool Failover bool
DB int
} }

View File

@ -42,14 +42,13 @@ const (
maxRetryBackoff = "maxRetryBackoff" maxRetryBackoff = "maxRetryBackoff"
failover = "failover" failover = "failover"
sentinelMasterName = "sentinelMasterName" sentinelMasterName = "sentinelMasterName"
redisDB = "redisDB"
defaultBase = 10 defaultBase = 10
defaultBitSize = 0 defaultBitSize = 0
defaultDB = 0 defaultDB = 0
defaultMaxRetries = 3 defaultMaxRetries = 3
defaultMaxRetryBackoff = time.Second * 2 defaultMaxRetryBackoff = time.Second * 2
defaultEnableTLS = false defaultEnableTLS = false
keySpacePrefix = "__keyspace@0__:"
keySpaceAny = "__keyspace@0__:*"
redisWrongTypeIdentifyStr = "WRONGTYPE" redisWrongTypeIdentifyStr = "WRONGTYPE"
) )
@ -131,6 +130,15 @@ func parseRedisMetadata(meta configuration.Metadata) (metadata, error) {
} }
} }
m.DB = defaultDB
if val, ok := meta.Properties[redisDB]; ok && val != "" {
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
if err != nil {
return m, fmt.Errorf("redis store error: can't parse redisDB field: %s", err)
}
m.DB = int(parsedVal)
}
return m, nil return m, nil
} }
@ -161,7 +169,7 @@ func (r *ConfigurationStore) newClient(m metadata) *redis.Client {
opts := &redis.Options{ opts := &redis.Options{
Addr: m.Host, Addr: m.Host,
Password: m.Password, Password: m.Password,
DB: defaultDB, DB: m.DB,
MaxRetries: m.MaxRetries, MaxRetries: m.MaxRetries,
MaxRetryBackoff: m.MaxRetryBackoff, MaxRetryBackoff: m.MaxRetryBackoff,
} }
@ -181,7 +189,7 @@ func (r *ConfigurationStore) newFailoverClient(m metadata) *redis.Client {
opts := &redis.FailoverOptions{ opts := &redis.FailoverOptions{
MasterName: r.metadata.SentinelMasterName, MasterName: r.metadata.SentinelMasterName,
SentinelAddrs: []string{r.metadata.Host}, SentinelAddrs: []string{r.metadata.Host},
DB: defaultDB, DB: m.DB,
MaxRetries: m.MaxRetries, MaxRetries: m.MaxRetries,
MaxRetryBackoff: m.MaxRetryBackoff, MaxRetryBackoff: m.MaxRetryBackoff,
} }
@ -273,8 +281,9 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S
if len(req.Keys) == 0 { if len(req.Keys) == 0 {
// subscribe all keys // subscribe all keys
stop := make(chan struct{}) stop := make(chan struct{})
keyStopChanMap[keySpaceAny] = stop allKeysChannel := internal.GetRedisChannelFromKey("*", r.metadata.DB)
go r.doSubscribe(ctx, req, handler, keySpaceAny, subscribeID, stop) keyStopChanMap[allKeysChannel] = stop
go r.doSubscribe(ctx, req, handler, allKeysChannel, subscribeID, stop)
r.subscribeStopChanMap.Store(subscribeID, keyStopChanMap) r.subscribeStopChanMap.Store(subscribeID, keyStopChanMap)
return subscribeID, nil return subscribeID, nil
} }
@ -282,9 +291,9 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S
for _, k := range req.Keys { for _, k := range req.Keys {
// subscribe single key // subscribe single key
stop := make(chan struct{}) stop := make(chan struct{})
keySpacePrefixAndKey := keySpacePrefix + k redisChannel := internal.GetRedisChannelFromKey(k, r.metadata.DB)
keyStopChanMap[keySpacePrefixAndKey] = stop keyStopChanMap[redisChannel] = stop
go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, subscribeID, stop) go r.doSubscribe(ctx, req, handler, redisChannel, subscribeID, stop)
} }
r.subscribeStopChanMap.Store(subscribeID, keyStopChanMap) r.subscribeStopChanMap.Store(subscribeID, keyStopChanMap)
return subscribeID, nil return subscribeID, nil
@ -306,7 +315,8 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
// enable notify-keyspace-events by redis Set command // enable notify-keyspace-events by redis Set command
r.client.ConfigSet(ctx, "notify-keyspace-events", "KA") r.client.ConfigSet(ctx, "notify-keyspace-events", "KA")
var p *redis.PubSub var p *redis.PubSub
if redisChannel4revision == keySpaceAny { allKeysChannel := internal.GetRedisChannelFromKey("*", r.metadata.DB)
if redisChannel4revision == allKeysChannel {
p = r.client.PSubscribe(ctx, redisChannel4revision) p = r.client.PSubscribe(ctx, redisChannel4revision)
} else { } else {
p = r.client.Subscribe(ctx, redisChannel4revision) p = r.client.Subscribe(ctx, redisChannel4revision)
@ -325,7 +335,7 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
} }
func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message, id string) { func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message, id string) {
targetKey, err := internal.ParseRedisKeyFromEvent(msg.Channel) targetKey, err := internal.ParseRedisKeyFromChannel(msg.Channel, r.metadata.DB)
if err != nil { if err != nil {
r.logger.Errorf("parse redis key failed: %s", err) r.logger.Errorf("parse redis key failed: %s", err)
return return

View File

@ -250,6 +250,7 @@ func Test_parseRedisMetadata(t *testing.T) {
testProperties[maxRetryBackoff] = "1000000000" testProperties[maxRetryBackoff] = "1000000000"
testProperties[failover] = "true" testProperties[failover] = "true"
testProperties[sentinelMasterName] = "tesSentinelMasterName" testProperties[sentinelMasterName] = "tesSentinelMasterName"
testProperties[redisDB] = "0"
tests := []struct { tests := []struct {
name string name string
args args args args
@ -270,6 +271,7 @@ func Test_parseRedisMetadata(t *testing.T) {
MaxRetryBackoff: time.Second, MaxRetryBackoff: time.Second,
Failover: true, Failover: true,
SentinelMasterName: "tesSentinelMasterName", SentinelMasterName: "tesSentinelMasterName",
DB: 0,
}, },
}, },
} }