Fix: add configuration unsubscribe api (#1440)
Signed-off-by: LaurenceLiZhixin <382673304@qq.com> Co-authored-by: Looong Dai <long.dai@intel.com>
This commit is contained in:
parent
db9e071449
commit
2bf7f68e0c
|
@ -20,6 +20,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
|
@ -52,10 +53,11 @@ const (
|
||||||
|
|
||||||
// ConfigurationStore is a Redis configuration store.
|
// ConfigurationStore is a Redis configuration store.
|
||||||
type ConfigurationStore struct {
|
type ConfigurationStore struct {
|
||||||
client redis.UniversalClient
|
client redis.UniversalClient
|
||||||
json jsoniter.API
|
json jsoniter.API
|
||||||
metadata metadata
|
metadata metadata
|
||||||
replicas int
|
replicas int
|
||||||
|
subscribeStopChanMap sync.Map
|
||||||
|
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
}
|
}
|
||||||
|
@ -258,21 +260,59 @@ func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
||||||
|
|
||||||
func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) error {
|
func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) error {
|
||||||
if len(req.Keys) == 0 {
|
if len(req.Keys) == 0 {
|
||||||
go r.doSubscribe(ctx, req, handler, keySpaceAny)
|
// subscribe all keys
|
||||||
|
stop := make(chan struct{})
|
||||||
|
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok {
|
||||||
|
// already exist subscription
|
||||||
|
close(oldStopChan.(chan struct{}))
|
||||||
|
}
|
||||||
|
r.subscribeStopChanMap.Store(keySpaceAny, stop)
|
||||||
|
go r.doSubscribe(ctx, req, handler, keySpaceAny, stop)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for _, k := range req.Keys {
|
for _, k := range req.Keys {
|
||||||
go r.doSubscribe(ctx, req, handler, keySpacePrefix+k)
|
// subscribe single key
|
||||||
|
stop := make(chan struct{})
|
||||||
|
keySpacePrefixAndKey := keySpacePrefix + k
|
||||||
|
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpacePrefixAndKey); ok {
|
||||||
|
// already exist subscription
|
||||||
|
close(oldStopChan.(chan struct{}))
|
||||||
|
}
|
||||||
|
r.subscribeStopChanMap.Store(keySpacePrefixAndKey, stop)
|
||||||
|
go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, stop)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string) {
|
func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnSubscribeRequest) error {
|
||||||
|
if len(req.Keys) == 0 {
|
||||||
|
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok {
|
||||||
|
// already exist subscription
|
||||||
|
r.subscribeStopChanMap.Delete(keySpaceAny)
|
||||||
|
close(oldStopChan.(chan struct{}))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, k := range req.Keys {
|
||||||
|
// subscribe single key
|
||||||
|
keySpacePrefixAndKey := keySpacePrefix + k
|
||||||
|
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpacePrefixAndKey); ok {
|
||||||
|
// already exist subscription
|
||||||
|
r.subscribeStopChanMap.Delete(keySpacePrefixAndKey)
|
||||||
|
close(oldStopChan.(chan struct{}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, stop chan struct{}) {
|
||||||
// enable notify-keyspace-events by redis Set command
|
// enable notify-keyspace-events by redis Set command
|
||||||
r.client.ConfigSet(ctx, "notify-keyspace-events", "KA")
|
r.client.ConfigSet(ctx, "notify-keyspace-events", "KA")
|
||||||
p := r.client.Subscribe(ctx, redisChannel4revision)
|
p := r.client.Subscribe(ctx, redisChannel4revision)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case msg := <-p.Channel():
|
case msg := <-p.Channel():
|
||||||
|
|
|
@ -33,6 +33,11 @@ type SubscribeRequest struct {
|
||||||
Metadata map[string]string `json:"metadata"`
|
Metadata map[string]string `json:"metadata"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnSubscribeRequest is the object describing a request to unsubscribe configuration.
|
||||||
|
type UnSubscribeRequest struct {
|
||||||
|
Keys []string `json:"keys"`
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateEvent is the object describing a configuration update event.
|
// UpdateEvent is the object describing a configuration update event.
|
||||||
type UpdateEvent struct {
|
type UpdateEvent struct {
|
||||||
Items []*Item `json:"items"`
|
Items []*Item `json:"items"`
|
||||||
|
|
|
@ -25,6 +25,9 @@ type Store interface {
|
||||||
|
|
||||||
// Subscribe configuration by update event.
|
// Subscribe configuration by update event.
|
||||||
Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) error
|
Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) error
|
||||||
|
|
||||||
|
// Unsubscribe configuration with keys
|
||||||
|
Unsubscribe(ctx context.Context, req *UnSubscribeRequest) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateHandler is the handler used to send event to daprd.
|
// UpdateHandler is the handler used to send event to daprd.
|
||||||
|
|
Loading…
Reference in New Issue