components-contrib/pubsub/kubemq/kubemq.go

87 lines
2.0 KiB
Go

package kubemq
import (
"context"
"reflect"
"strconv"
"time"
"github.com/google/uuid"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
type kubeMQ struct {
metadata *kubemqMetadata
logger logger.Logger
eventsClient *kubeMQEvents
eventStoreClient *kubeMQEventStore
}
func NewKubeMQ(logger logger.Logger) pubsub.PubSub {
return &kubeMQ{
logger: logger,
}
}
func (k *kubeMQ) Init(_ context.Context, metadata pubsub.Metadata) error {
meta, err := createMetadata(metadata)
if err != nil {
k.logger.Errorf("error init kubemq client error: %s", err.Error())
return err
}
k.metadata = meta
if meta.IsStore {
k.eventStoreClient = newKubeMQEventsStore(k.logger)
return k.eventStoreClient.Init(meta)
} else {
k.eventsClient = newkubeMQEvents(k.logger)
return k.eventsClient.Init(meta)
}
}
func (k *kubeMQ) Features() []pubsub.Feature {
return nil
}
func (k *kubeMQ) Publish(_ context.Context, req *pubsub.PublishRequest) error {
if k.metadata.IsStore {
return k.eventStoreClient.Publish(req)
} else {
return k.eventsClient.Publish(req)
}
}
func (k *kubeMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
if k.metadata.IsStore {
return k.eventStoreClient.Subscribe(ctx, req, handler)
} else {
return k.eventsClient.Subscribe(ctx, req, handler)
}
}
func (k *kubeMQ) Close() error {
if k.metadata.IsStore {
return k.eventStoreClient.Close()
} else {
return k.eventsClient.Close()
}
}
func getRandomID() string {
randomUUID, err := uuid.NewRandom()
if err != nil {
return strconv.FormatInt(time.Now().UnixNano(), 10)
}
return randomUUID.String()
}
// GetComponentMetadata returns the metadata of the component.
func (k *kubeMQ) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
metadataStruct := &kubemqMetadata{}
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.PubSubType)
return
}