Update - Add new PubSub component - KubeMQ Kubernetes message broker
Signed-off-by: Lior Nabat <lior.nabat@gmail.clom>
This commit is contained in:
parent
04f4f697fd
commit
2e3a46e04d
|
@ -3,10 +3,11 @@ package kubemq
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type kubeMQ struct {
|
type kubeMQ struct {
|
||||||
|
@ -71,9 +72,9 @@ func (k *kubeMQ) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRandomID() string {
|
func getRandomID() string {
|
||||||
randomUuid, err := uuid.NewRandom()
|
randomUUID, err := uuid.NewRandom()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Sprintf("%d", time.Now().UnixNano())
|
return fmt.Sprintf("%d", time.Now().UnixNano())
|
||||||
}
|
}
|
||||||
return randomUuid.String()
|
return randomUUID.String()
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,12 @@ package kubemq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
"github.com/kubemq-io/kubemq-go"
|
"github.com/kubemq-io/kubemq-go"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type kubemqEventsClient interface {
|
type kubemqEventsClient interface {
|
||||||
|
@ -16,6 +17,7 @@ type kubemqEventsClient interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type kubeMQEvents struct {
|
type kubeMQEvents struct {
|
||||||
|
lock sync.RWMutex
|
||||||
client kubemqEventsClient
|
client kubemqEventsClient
|
||||||
metadata *metadata
|
metadata *metadata
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
|
@ -24,7 +26,7 @@ type kubeMQEvents struct {
|
||||||
waitForResultTimeout time.Duration
|
waitForResultTimeout time.Duration
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
isInitialized *atomic.Bool
|
isInitialized bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newkubeMQEvents(logger logger.Logger) *kubeMQEvents {
|
func newkubeMQEvents(logger logger.Logger) *kubeMQEvents {
|
||||||
|
@ -37,15 +39,19 @@ func newkubeMQEvents(logger logger.Logger) *kubeMQEvents {
|
||||||
waitForResultTimeout: 60 * time.Second,
|
waitForResultTimeout: 60 * time.Second,
|
||||||
ctx: nil,
|
ctx: nil,
|
||||||
ctxCancel: nil,
|
ctxCancel: nil,
|
||||||
isInitialized: atomic.NewBool(false),
|
isInitialized: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (k *kubeMQEvents) init() error {
|
func (k *kubeMQEvents) init() error {
|
||||||
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
|
k.lock.RLock()
|
||||||
if k.metadata.useMock {
|
isInit := k.isInitialized
|
||||||
k.isInitialized.Store(true)
|
k.lock.RUnlock()
|
||||||
|
if isInit {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
k.lock.Lock()
|
||||||
|
defer k.lock.Unlock()
|
||||||
|
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
|
||||||
clientID := k.metadata.clientID
|
clientID := k.metadata.clientID
|
||||||
if clientID == "" {
|
if clientID == "" {
|
||||||
clientID = getRandomID()
|
clientID = getRandomID()
|
||||||
|
@ -68,7 +74,7 @@ func (k *kubeMQEvents) init() error {
|
||||||
k.logger.Errorf("error init kubemq client error: %w", err.Error())
|
k.logger.Errorf("error init kubemq client error: %w", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
k.isInitialized.Store(true)
|
k.isInitialized = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (k *kubeMQEvents) Init(meta *metadata) error {
|
func (k *kubeMQEvents) Init(meta *metadata) error {
|
||||||
|
@ -87,10 +93,8 @@ func (k *kubeMQEvents) setPublishStream() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error {
|
func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error {
|
||||||
if !k.isInitialized.Load() {
|
if err := k.init(); err != nil {
|
||||||
if err := k.init(); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
|
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
|
||||||
event := &kubemq.Event{
|
event := &kubemq.Event{
|
||||||
|
@ -112,10 +116,8 @@ func (k *kubeMQEvents) Features() []pubsub.Feature {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||||
if !k.isInitialized.Load() {
|
if err := k.init(); err != nil {
|
||||||
if err := k.init(); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
clientID := k.metadata.clientID
|
clientID := k.metadata.clientID
|
||||||
if clientID == "" {
|
if clientID == "" {
|
||||||
|
@ -151,7 +153,6 @@ func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeReques
|
||||||
k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error())
|
k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.logger.Errorf("kubemq events pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error())
|
k.logger.Errorf("kubemq events pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error())
|
||||||
|
|
|
@ -3,11 +3,12 @@ package kubemq
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
"github.com/kubemq-io/kubemq-go"
|
"github.com/kubemq-io/kubemq-go"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// interface used to allow unit testing.
|
// interface used to allow unit testing.
|
||||||
|
@ -18,6 +19,7 @@ type kubemqEventsStoreClient interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type kubeMQEventStore struct {
|
type kubeMQEventStore struct {
|
||||||
|
lock sync.RWMutex
|
||||||
client kubemqEventsStoreClient
|
client kubemqEventsStoreClient
|
||||||
metadata *metadata
|
metadata *metadata
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
|
@ -26,7 +28,7 @@ type kubeMQEventStore struct {
|
||||||
waitForResultTimeout time.Duration
|
waitForResultTimeout time.Duration
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
isInitialized *atomic.Bool
|
isInitialized bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
|
func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
|
||||||
|
@ -39,15 +41,19 @@ func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
|
||||||
waitForResultTimeout: 60 * time.Second,
|
waitForResultTimeout: 60 * time.Second,
|
||||||
ctx: nil,
|
ctx: nil,
|
||||||
ctxCancel: nil,
|
ctxCancel: nil,
|
||||||
isInitialized: atomic.NewBool(false),
|
isInitialized: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (k *kubeMQEventStore) init() error {
|
func (k *kubeMQEventStore) init() error {
|
||||||
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
|
k.lock.RLock()
|
||||||
if k.metadata.useMock {
|
isInit := k.isInitialized
|
||||||
k.isInitialized.Store(true)
|
k.lock.RUnlock()
|
||||||
|
if isInit {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
k.lock.Lock()
|
||||||
|
defer k.lock.Unlock()
|
||||||
|
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
|
||||||
clientID := k.metadata.clientID
|
clientID := k.metadata.clientID
|
||||||
if clientID == "" {
|
if clientID == "" {
|
||||||
clientID = getRandomID()
|
clientID = getRandomID()
|
||||||
|
@ -70,25 +76,9 @@ func (k *kubeMQEventStore) init() error {
|
||||||
k.logger.Errorf("error init kubemq client error: %w", err.Error())
|
k.logger.Errorf("error init kubemq client error: %w", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
k.isInitialized.Store(true)
|
k.isInitialized = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (k *kubeMQEventStore) getSubscriberClient(clientID string) (kubemqEventsStoreClient, error) {
|
|
||||||
client, err := kubemq.NewEventsStoreClient(k.ctx,
|
|
||||||
kubemq.WithAddress(k.metadata.host, k.metadata.port),
|
|
||||||
kubemq.WithClientId(clientID),
|
|
||||||
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
|
|
||||||
kubemq.WithCheckConnection(true),
|
|
||||||
kubemq.WithAuthToken(k.metadata.authToken),
|
|
||||||
kubemq.WithAutoReconnect(true),
|
|
||||||
kubemq.WithReconnectInterval(time.Second))
|
|
||||||
if err != nil {
|
|
||||||
k.logger.Errorf("error init kubemq client error: %s", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return client, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (k *kubeMQEventStore) Init(meta *metadata) error {
|
func (k *kubeMQEventStore) Init(meta *metadata) error {
|
||||||
k.metadata = meta
|
k.metadata = meta
|
||||||
_ = k.init()
|
_ = k.init()
|
||||||
|
@ -108,10 +98,8 @@ func (k *kubeMQEventStore) setPublishStream() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
|
func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
|
||||||
if !k.isInitialized.Load() {
|
if err := k.init(); err != nil {
|
||||||
if err := k.init(); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
|
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
|
||||||
event := &kubemq.EventStore{
|
event := &kubemq.EventStore{
|
||||||
|
@ -141,8 +129,8 @@ func (k *kubeMQEventStore) Features() []pubsub.Feature {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||||
if k.metadata.useMock {
|
if err := k.init(); err != nil {
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
clientID := k.metadata.clientID
|
clientID := k.metadata.clientID
|
||||||
if clientID == "" {
|
if clientID == "" {
|
||||||
|
@ -182,7 +170,6 @@ func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRe
|
||||||
k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error())
|
k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
k.logger.Errorf("kubemq pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error())
|
k.logger.Errorf("kubemq pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error())
|
||||||
|
|
|
@ -14,7 +14,6 @@ type metadata struct {
|
||||||
authToken string
|
authToken string
|
||||||
group string
|
group string
|
||||||
isStore bool
|
isStore bool
|
||||||
useMock bool
|
|
||||||
disableReDelivery bool
|
disableReDelivery bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,11 +70,6 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
|
||||||
return nil, fmt.Errorf("invalid kubeMQ store value, store can be true or false")
|
return nil, fmt.Errorf("invalid kubeMQ store value, store can be true or false")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if val, found := pubSubMetadata.Properties["useMock"]; found && val != "" {
|
|
||||||
if val == "true" {
|
|
||||||
result.useMock = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if val, found := pubSubMetadata.Properties["disableReDelivery"]; found && val != "" {
|
if val, found := pubSubMetadata.Properties["disableReDelivery"]; found && val != "" {
|
||||||
if val == "true" {
|
if val == "true" {
|
||||||
result.disableReDelivery = true
|
result.disableReDelivery = true
|
||||||
|
|
|
@ -35,7 +35,6 @@ func Test_createMetadata(t *testing.T) {
|
||||||
authToken: "authToken",
|
authToken: "authToken",
|
||||||
group: "group",
|
group: "group",
|
||||||
isStore: true,
|
isStore: true,
|
||||||
useMock: true,
|
|
||||||
disableReDelivery: true,
|
disableReDelivery: true,
|
||||||
},
|
},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
|
|
Loading…
Reference in New Issue