From 41a5a088941410e99ee41ec7bc7dfe536c986c57 Mon Sep 17 00:00:00 2001 From: Laurence <45508533+LaurenceLiZhixin@users.noreply.github.com> Date: Thu, 17 Feb 2022 22:41:36 +0800 Subject: [PATCH] Fix: unsubscribe by id (#1483) * Fix: unsubscribe by id Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: linter Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: UnSubscribe to Unsubscribe Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * fix: mod tidy all Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: change uuid repo Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: mod tidy Signed-off-by: LaurenceLiZhixin <382673304@qq.com> Co-authored-by: Looong Dai --- configuration/redis/redis.go | 55 ++++++++++++++---------------------- configuration/requests.go | 7 +++-- configuration/store.go | 4 +-- 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/configuration/redis/redis.go b/configuration/redis/redis.go index 698a219e9..a3b98d4eb 100644 --- a/configuration/redis/redis.go +++ b/configuration/redis/redis.go @@ -24,6 +24,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/google/uuid" jsoniter "github.com/json-iterator/go" "github.com/dapr/components-contrib/configuration" @@ -258,17 +259,14 @@ func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ }, nil } -func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) error { +func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) (string, error) { + subscribeID := uuid.New().String() if len(req.Keys) == 0 { // subscribe all keys stop := make(chan struct{}) - if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok { - // already exist subscription - close(oldStopChan.(chan struct{})) - } - r.subscribeStopChanMap.Store(keySpaceAny, stop) - go r.doSubscribe(ctx, req, handler, keySpaceAny, stop) - return nil + r.subscribeStopChanMap.Store(subscribeID, stop) + go r.doSubscribe(ctx, req, handler, keySpaceAny, subscribeID, stop) + return subscribeID, nil } for _, k := range req.Keys { // subscribe single key @@ -278,34 +276,22 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S // already exist subscription close(oldStopChan.(chan struct{})) } - r.subscribeStopChanMap.Store(keySpacePrefixAndKey, stop) - go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, stop) + r.subscribeStopChanMap.Store(subscribeID, stop) + go r.doSubscribe(ctx, req, handler, keySpacePrefixAndKey, subscribeID, stop) + } + return subscribeID, nil +} + +func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnsubscribeRequest) error { + if oldStopChan, ok := r.subscribeStopChanMap.Load(req.ID); ok { + // already exist subscription + r.subscribeStopChanMap.Delete(req.ID) + close(oldStopChan.(chan struct{})) } return nil } -func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnSubscribeRequest) error { - if len(req.Keys) == 0 { - if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpaceAny); ok { - // already exist subscription - r.subscribeStopChanMap.Delete(keySpaceAny) - close(oldStopChan.(chan struct{})) - } - return nil - } - for _, k := range req.Keys { - // subscribe single key - keySpacePrefixAndKey := keySpacePrefix + k - if oldStopChan, ok := r.subscribeStopChanMap.Load(keySpacePrefixAndKey); ok { - // already exist subscription - r.subscribeStopChanMap.Delete(keySpacePrefixAndKey) - close(oldStopChan.(chan struct{})) - } - } - return nil -} - -func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, stop chan struct{}) { +func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, id string, stop chan struct{}) { // enable notify-keyspace-events by redis Set command r.client.ConfigSet(ctx, "notify-keyspace-events", "KA") p := r.client.Subscribe(ctx, redisChannel4revision) @@ -316,12 +302,12 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration case <-ctx.Done(): return case msg := <-p.Channel(): - r.handleSubscribedChange(ctx, req, handler, msg) + r.handleSubscribedChange(ctx, req, handler, msg, id) } } } -func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message) { +func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, msg *redis.Message, id string) { defer func() { if err := recover(); err != nil { r.logger.Errorf("panic in handleSubscribedChange()method and recovered: %s", err) @@ -345,6 +331,7 @@ func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *co e := &configuration.UpdateEvent{ Items: getResponse.Items, + ID: id, } err = handler(ctx, e) if err != nil { diff --git a/configuration/requests.go b/configuration/requests.go index 2d8962d7c..a6b86997e 100644 --- a/configuration/requests.go +++ b/configuration/requests.go @@ -33,12 +33,13 @@ type SubscribeRequest struct { Metadata map[string]string `json:"metadata"` } -// UnSubscribeRequest is the object describing a request to unsubscribe configuration. -type UnSubscribeRequest struct { - Keys []string `json:"keys"` +// UnsubscribeRequest is the object describing a request to unsubscribe configuration. +type UnsubscribeRequest struct { + ID string `json:"id"` } // UpdateEvent is the object describing a configuration update event. type UpdateEvent struct { + ID string `json:"id"` Items []*Item `json:"items"` } diff --git a/configuration/store.go b/configuration/store.go index c743b5808..4622b0d73 100644 --- a/configuration/store.go +++ b/configuration/store.go @@ -24,10 +24,10 @@ type Store interface { Get(ctx context.Context, req *GetRequest) (*GetResponse, error) // Subscribe configuration by update event. - Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) error + Subscribe(ctx context.Context, req *SubscribeRequest, handler UpdateHandler) (string, error) // Unsubscribe configuration with keys - Unsubscribe(ctx context.Context, req *UnSubscribeRequest) error + Unsubscribe(ctx context.Context, req *UnsubscribeRequest) error } // UpdateHandler is the handler used to send event to daprd.