components-contrib/pubsub/kubemq/kubemq_eventstore.go

225 lines
6.0 KiB
Go

package kubemq
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/kubemq-io/kubemq-go"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
// interface used to allow unit testing.
type kubemqEventsStoreClient interface {
Stream(ctx context.Context, onResult func(result *kubemq.EventStoreResult, err error)) (func(msg *kubemq.EventStore) error, error)
Subscribe(ctx context.Context, request *kubemq.EventsStoreSubscription, onEvent func(msg *kubemq.EventStoreReceive, err error)) error
Close() error
}
type kubeMQEventStore struct {
lock sync.RWMutex
client kubemqEventsStoreClient
metadata *kubemqMetadata
logger logger.Logger
publishFunc func(msg *kubemq.EventStore) error
resultChan chan *kubemq.EventStoreResult
waitForResultTimeout time.Duration
ctx context.Context
ctxCancel context.CancelFunc
isInitialized bool
}
func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
return &kubeMQEventStore{
client: nil,
metadata: nil,
logger: logger,
publishFunc: nil,
resultChan: make(chan *kubemq.EventStoreResult, 1),
waitForResultTimeout: 10 * time.Second,
ctx: nil,
ctxCancel: nil,
isInitialized: false,
}
}
func (k *kubeMQEventStore) init() error {
k.lock.RLock()
isInit := k.isInitialized
k.lock.RUnlock()
if isInit {
return nil
}
k.lock.Lock()
defer k.lock.Unlock()
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
clientID := k.metadata.ClientID
if clientID == "" {
clientID = getRandomID()
}
client, err := kubemq.NewEventsStoreClient(k.ctx,
kubemq.WithAddress(k.metadata.internalHost, k.metadata.internalPort),
kubemq.WithClientId(clientID),
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
kubemq.WithCheckConnection(true),
kubemq.WithAuthToken(k.metadata.AuthToken),
kubemq.WithAutoReconnect(true),
kubemq.WithReconnectInterval(time.Second))
if err != nil {
k.logger.Errorf("error init kubemq client error: %s", err.Error())
return err
}
k.ctx, k.ctxCancel = context.WithCancel(context.Background())
k.client = client
if err := k.setPublishStream(); err != nil {
k.logger.Errorf("error init kubemq client error: %w", err.Error())
return err
}
k.isInitialized = true
return nil
}
func (k *kubeMQEventStore) Init(meta *kubemqMetadata) error {
k.metadata = meta
_ = k.init()
return nil
}
func (k *kubeMQEventStore) setPublishStream() error {
var err error
k.publishFunc, err = k.client.Stream(k.ctx, func(result *kubemq.EventStoreResult, err error) {
select {
case k.resultChan <- result:
default:
}
})
if err != nil {
return err
}
return nil
}
func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
if err := k.init(); err != nil {
return err
}
if req.Topic == "" {
return errors.New("kubemq pub/sub error: topic is required")
}
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
metadata := ""
if req.Metadata != nil {
data, err := json.Marshal(req.Metadata)
if err != nil {
return fmt.Errorf("kubemq pub/sub error: failed to marshal metadata: %s", err.Error())
}
metadata = string(data)
}
contentType := ""
if req.ContentType != nil {
contentType = *req.ContentType
}
event := &kubemq.EventStore{
Id: "",
Channel: req.Topic,
Metadata: metadata,
Body: req.Data,
ClientId: k.metadata.ClientID,
Tags: map[string]string{
"ContentType": contentType,
},
}
if err := k.publishFunc(event); err != nil {
k.logger.Errorf("kubemq pub/sub error: publishing to %s failed with error: %s", req.Topic, err.Error())
return err
}
select {
case res := <-k.resultChan:
if res != nil && res.Err != nil {
return res.Err
}
case <-time.After(k.waitForResultTimeout):
return errors.New("kubemq pub/sub error: timeout waiting for response")
}
return nil
}
func (k *kubeMQEventStore) Features() []pubsub.Feature {
return nil
}
func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
if err := k.init(); err != nil {
return err
}
clientID := k.metadata.ClientID
if clientID == "" {
clientID = getRandomID()
}
k.logger.Debugf("kubemq pub/sub: subscribing to %s", req.Topic)
err := k.client.Subscribe(ctx, &kubemq.EventsStoreSubscription{
Channel: req.Topic,
Group: k.metadata.Group,
ClientId: clientID,
SubscriptionType: kubemq.StartFromNewEvents(),
}, func(event *kubemq.EventStoreReceive, err error) {
if err != nil {
k.logger.Errorf("kubemq pub/sub error: subscribing to %s failed with error: %s", req.Topic, err.Error())
return
}
if ctx.Err() != nil {
return
}
var metadata map[string]string
if event.Metadata != "" {
_ = json.Unmarshal([]byte(event.Metadata), &metadata)
}
var contentType *string
if event.Tags != nil {
if event.Tags["ContentType"] != "" {
*contentType = event.Tags["ContentType"]
}
}
msg := &pubsub.NewMessage{
Data: event.Body,
Topic: req.Topic,
Metadata: metadata,
ContentType: contentType,
}
if err := handler(ctx, msg); err != nil {
k.logger.Errorf("kubemq pub/sub error: error handling message from topic '%s', %s, resending...", req.Topic, err.Error())
if k.metadata.DisableReDelivery {
return
}
if err := k.Publish(&pubsub.PublishRequest{
Data: msg.Data,
Topic: msg.Topic,
}); err != nil {
k.logger.Errorf("kubemq pub/sub error: error resending message from topic '%s', %s", req.Topic, err.Error())
}
}
})
if err != nil {
k.logger.Errorf("kubemq pub/sub error: error subscribing to topic '%s', %s", req.Topic, err.Error())
return err
}
time.Sleep(1 * time.Second)
k.logger.Debugf("kubemq pub/sub: subscribed to %s completed", req.Topic)
return nil
}
func (k *kubeMQEventStore) Close() error {
if k.ctxCancel != nil {
k.ctxCancel()
}
if k.client == nil {
return nil
}
return k.client.Close()
}