Fix: unsubscribe by id (#1483)
* Fix: unsubscribe by id Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: linter Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: UnSubscribe to Unsubscribe Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: mod tidy all Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: change uuid repo Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: mod tidy Signed-off-by: LaurenceLiZhixin <382673304@qq.com> Co-authored-by: Looong Dai <long.dai@intel.com>
This commit is contained in:
parent
14419b421a
commit
41a5a08894
|
@ -24,6 +24,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/uuid"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"github.com/dapr/components-contrib/configuration"
|
||||
|
@ -258,17 +259,14 @@ func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) error {
|
||||
func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) (string, error) {
|
||||
subscribeID := uuid.New().String()
|
||||
if len(req.Keys) == 0 {
|
||||
// subscribe all keys
|
||||
stop := make(chan struct{})
|
||||
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok {
|
||||
// already exist subscription
|
||||
close(oldStopChan.(chan struct{}))
|
||||
}
|
||||
r.subscribeStopChanMap.Store(keySpaceAny, stop)
|
||||
go r.doSubscribe(ctx, req, handler, keySpaceAny, stop)
|
||||
return nil
|
||||
r.subscribeStopChanMap.Store(subscribeID, stop)
|
||||
go r.doSubscribe(ctx, req, handler, keySpaceAny, subscribeID, stop)
|
||||
return subscribeID, nil
|
||||
}
|
||||
for _, k := range req.Keys {
|
||||
// subscribe single key
|
||||
|
@ -278,34 +276,22 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S
|
|||
// already exist subscription
|
||||
close(oldStopChan.(chan struct{}))
|
||||
}
|
||||
r.subscribeStopChanMap.Store(keySpacePrefixAndKey, stop)
|
||||
go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, stop)
|
||||
r.subscribeStopChanMap.Store(subscribeID, stop)
|
||||
go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, subscribeID, stop)
|
||||
}
|
||||
return subscribeID, nil
|
||||
}
|
||||
|
||||
func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnsubscribeRequest) error {
|
||||
if oldStopChan, ok := r.subscribeStopChanMap.Load(req.ID); ok {
|
||||
// already exist subscription
|
||||
r.subscribeStopChanMap.Delete(req.ID)
|
||||
close(oldStopChan.(chan struct{}))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnSubscribeRequest) error {
|
||||
if len(req.Keys) == 0 {
|
||||
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok {
|
||||
// already exist subscription
|
||||
r.subscribeStopChanMap.Delete(keySpaceAny)
|
||||
close(oldStopChan.(chan struct{}))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
for _, k := range req.Keys {
|
||||
// subscribe single key
|
||||
keySpacePrefixAndKey := keySpacePrefix + k
|
||||
if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpacePrefixAndKey); ok {
|
||||
// already exist subscription
|
||||
r.subscribeStopChanMap.Delete(keySpacePrefixAndKey)
|
||||
close(oldStopChan.(chan struct{}))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, stop chan struct{}) {
|
||||
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, id string, stop chan struct{}) {
|
||||
// enable notify-keyspace-events by redis Set command
|
||||
r.client.ConfigSet(ctx, "notify-keyspace-events", "KA")
|
||||
p := r.client.Subscribe(ctx, redisChannel4revision)
|
||||
|
@ -316,12 +302,12 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-p.Channel():
|
||||
r.handleSubscribedChange(ctx, req, handler, msg)
|
||||
r.handleSubscribedChange(ctx, req, handler, msg, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message) {
|
||||
func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message, id string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
r.logger.Errorf("panic in handleSubscribedChange()method and recovered: %s", err)
|
||||
|
@ -345,6 +331,7 @@ func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *co
|
|||
|
||||
e := &configuration.UpdateEvent{
|
||||
Items: getResponse.Items,
|
||||
ID: id,
|
||||
}
|
||||
err = handler(ctx, e)
|
||||
if err != nil {
|
||||
|
|
|
@ -33,12 +33,13 @@ type SubscribeRequest struct {
|
|||
Metadata map[string]string `json:"metadata"`
|
||||
}
|
||||
|
||||
// UnSubscribeRequest is the object describing a request to unsubscribe configuration.
|
||||
type UnSubscribeRequest struct {
|
||||
Keys []string `json:"keys"`
|
||||
// UnsubscribeRequest is the object describing a request to unsubscribe configuration.
|
||||
type UnsubscribeRequest struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
// UpdateEvent is the object describing a configuration update event.
|
||||
type UpdateEvent struct {
|
||||
ID string `json:"id"`
|
||||
Items []*Item `json:"items"`
|
||||
}
|
||||
|
|
|
@ -24,10 +24,10 @@ type Store interface {
|
|||
Get(ctx context.Context, req *GetRequest) (*GetResponse, error)
|
||||
|
||||
// Subscribe configuration by update event.
|
||||
Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) error
|
||||
Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) (string, error)
|
||||
|
||||
// Unsubscribe configuration with keys
|
||||
Unsubscribe(ctx context.Context, req *UnSubscribeRequest) error
|
||||
Unsubscribe(ctx context.Context, req *UnsubscribeRequest) error
|
||||
}
|
||||
|
||||
// UpdateHandler is the handler used to send event to daprd.
|
||||
|
|
Loading…
Reference in New Issue