diff --git a/pubsub/kubemq/kubemq.go b/pubsub/kubemq/kubemq.go index 4296a3519..7d998a602 100644 --- a/pubsub/kubemq/kubemq.go +++ b/pubsub/kubemq/kubemq.go @@ -3,10 +3,11 @@ package kubemq import ( "context" "fmt" + "time" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/google/uuid" - "time" ) type kubeMQ struct { @@ -71,9 +72,9 @@ func (k *kubeMQ) Close() error { } func getRandomID() string { - randomUuid, err := uuid.NewRandom() + randomUUID, err := uuid.NewRandom() if err != nil { return fmt.Sprintf("%d", time.Now().UnixNano()) } - return randomUuid.String() + return randomUUID.String() } diff --git a/pubsub/kubemq/kubemq_events.go b/pubsub/kubemq/kubemq_events.go index cf1a3bbf7..aa47bbc06 100644 --- a/pubsub/kubemq/kubemq_events.go +++ b/pubsub/kubemq/kubemq_events.go @@ -2,11 +2,12 @@ package kubemq import ( "context" + "sync" + "time" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/kubemq-io/kubemq-go" - "go.uber.org/atomic" - "time" ) type kubemqEventsClient interface { @@ -16,6 +17,7 @@ type kubemqEventsClient interface { } type kubeMQEvents struct { + lock sync.RWMutex client kubemqEventsClient metadata *metadata logger logger.Logger @@ -24,7 +26,7 @@ type kubeMQEvents struct { waitForResultTimeout time.Duration ctx context.Context ctxCancel context.CancelFunc - isInitialized *atomic.Bool + isInitialized bool } func newkubeMQEvents(logger logger.Logger) *kubeMQEvents { @@ -37,15 +39,19 @@ func newkubeMQEvents(logger logger.Logger) *kubeMQEvents { waitForResultTimeout: 60 * time.Second, ctx: nil, ctxCancel: nil, - isInitialized: atomic.NewBool(false), + isInitialized: false, } } func (k *kubeMQEvents) init() error { - k.ctx, k.ctxCancel = context.WithCancel(context.Background()) - if k.metadata.useMock { - k.isInitialized.Store(true) + k.lock.RLock() + isInit := k.isInitialized + k.lock.RUnlock() + if isInit { return nil } + k.lock.Lock() + defer k.lock.Unlock() + k.ctx, k.ctxCancel = context.WithCancel(context.Background()) clientID := k.metadata.clientID if clientID == "" { clientID = getRandomID() @@ -68,7 +74,7 @@ func (k *kubeMQEvents) init() error { k.logger.Errorf("error init kubemq client error: %w", err.Error()) return err } - k.isInitialized.Store(true) + k.isInitialized = true return nil } func (k *kubeMQEvents) Init(meta *metadata) error { @@ -87,10 +93,8 @@ func (k *kubeMQEvents) setPublishStream() error { return err } func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error { - if !k.isInitialized.Load() { - if err := k.init(); err != nil { - return err - } + if err := k.init(); err != nil { + return err } k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic) event := &kubemq.Event{ @@ -112,10 +116,8 @@ func (k *kubeMQEvents) Features() []pubsub.Feature { } func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - if !k.isInitialized.Load() { - if err := k.init(); err != nil { - return err - } + if err := k.init(); err != nil { + return err } clientID := k.metadata.clientID if clientID == "" { @@ -151,7 +153,6 @@ func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeReques k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error()) } } - }) if err != nil { k.logger.Errorf("kubemq events pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error()) diff --git a/pubsub/kubemq/kubemq_eventstore.go b/pubsub/kubemq/kubemq_eventstore.go index e2eec5447..093c64586 100644 --- a/pubsub/kubemq/kubemq_eventstore.go +++ b/pubsub/kubemq/kubemq_eventstore.go @@ -3,11 +3,12 @@ package kubemq import ( "context" "fmt" + "sync" + "time" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/kubemq-io/kubemq-go" - "go.uber.org/atomic" - "time" ) // interface used to allow unit testing. @@ -18,6 +19,7 @@ type kubemqEventsStoreClient interface { } type kubeMQEventStore struct { + lock sync.RWMutex client kubemqEventsStoreClient metadata *metadata logger logger.Logger @@ -26,7 +28,7 @@ type kubeMQEventStore struct { waitForResultTimeout time.Duration ctx context.Context ctxCancel context.CancelFunc - isInitialized *atomic.Bool + isInitialized bool } func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore { @@ -39,15 +41,19 @@ func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore { waitForResultTimeout: 60 * time.Second, ctx: nil, ctxCancel: nil, - isInitialized: atomic.NewBool(false), + isInitialized: false, } } func (k *kubeMQEventStore) init() error { - k.ctx, k.ctxCancel = context.WithCancel(context.Background()) - if k.metadata.useMock { - k.isInitialized.Store(true) + k.lock.RLock() + isInit := k.isInitialized + k.lock.RUnlock() + if isInit { return nil } + k.lock.Lock() + defer k.lock.Unlock() + k.ctx, k.ctxCancel = context.WithCancel(context.Background()) clientID := k.metadata.clientID if clientID == "" { clientID = getRandomID() @@ -70,25 +76,9 @@ func (k *kubeMQEventStore) init() error { k.logger.Errorf("error init kubemq client error: %w", err.Error()) return err } - k.isInitialized.Store(true) + k.isInitialized = true return nil } -func (k *kubeMQEventStore) getSubscriberClient(clientID string) (kubemqEventsStoreClient, error) { - client, err := kubemq.NewEventsStoreClient(k.ctx, - kubemq.WithAddress(k.metadata.host, k.metadata.port), - kubemq.WithClientId(clientID), - kubemq.WithTransportType(kubemq.TransportTypeGRPC), - kubemq.WithCheckConnection(true), - kubemq.WithAuthToken(k.metadata.authToken), - kubemq.WithAutoReconnect(true), - kubemq.WithReconnectInterval(time.Second)) - if err != nil { - k.logger.Errorf("error init kubemq client error: %s", err.Error()) - return nil, err - } - return client, nil -} - func (k *kubeMQEventStore) Init(meta *metadata) error { k.metadata = meta _ = k.init() @@ -108,10 +98,8 @@ func (k *kubeMQEventStore) setPublishStream() error { return nil } func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error { - if !k.isInitialized.Load() { - if err := k.init(); err != nil { - return err - } + if err := k.init(); err != nil { + return err } k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic) event := &kubemq.EventStore{ @@ -141,8 +129,8 @@ func (k *kubeMQEventStore) Features() []pubsub.Feature { } func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - if k.metadata.useMock { - return nil + if err := k.init(); err != nil { + return err } clientID := k.metadata.clientID if clientID == "" { @@ -182,7 +170,6 @@ func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRe k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error()) } } - }) if err != nil { k.logger.Errorf("kubemq pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error()) diff --git a/pubsub/kubemq/metadata.go b/pubsub/kubemq/metadata.go index 123d57108..5fdc4d676 100644 --- a/pubsub/kubemq/metadata.go +++ b/pubsub/kubemq/metadata.go @@ -14,7 +14,6 @@ type metadata struct { authToken string group string isStore bool - useMock bool disableReDelivery bool } @@ -71,11 +70,6 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { return nil, fmt.Errorf("invalid kubeMQ store value, store can be true or false") } } - if val, found := pubSubMetadata.Properties["useMock"]; found && val != "" { - if val == "true" { - result.useMock = true - } - } if val, found := pubSubMetadata.Properties["disableReDelivery"]; found && val != "" { if val == "true" { result.disableReDelivery = true diff --git a/pubsub/kubemq/metadata_test.go b/pubsub/kubemq/metadata_test.go index 63ee30d5b..a5a81bb19 100644 --- a/pubsub/kubemq/metadata_test.go +++ b/pubsub/kubemq/metadata_test.go @@ -35,7 +35,6 @@ func Test_createMetadata(t *testing.T) { authToken: "authToken", group: "group", isStore: true, - useMock: true, disableReDelivery: true, }, wantErr: false,