Update KubeMQ components (Fix panic and small production issues) (#2821)

Signed-off-by: Lior Nabat <lior.nabat@kubemq.io>
Signed-off-by: Lior Nabat <lior.nabat@gmail.clom>
Co-authored-by: Lior Nabat <lior.nabat@gmail.clom>
This commit is contained in:
KubeMQ 2023-05-01 20:53:23 +03:00 committed by GitHub
parent 20edac1b82
commit 9e5e142db9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 15 deletions

View File

@ -35,12 +35,11 @@ func (k *kubeMQ) Init(_ context.Context, metadata pubsub.Metadata) error {
k.metadata = meta
if meta.IsStore {
k.eventStoreClient = newKubeMQEventsStore(k.logger)
_ = k.eventStoreClient.Init(meta)
return k.eventStoreClient.Init(meta)
} else {
k.eventsClient = newkubeMQEvents(k.logger)
_ = k.eventsClient.Init(meta)
return k.eventsClient.Init(meta)
}
return nil
}
func (k *kubeMQ) Features() []pubsub.Feature {

View File

@ -2,6 +2,8 @@ package kubemq
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -101,14 +103,31 @@ func (k *kubeMQEvents) Publish(req *pubsub.PublishRequest) error {
if err := k.init(); err != nil {
return err
}
if req.Topic == "" {
return fmt.Errorf("kubemq pub/sub error: topic is required")
}
metadata := ""
if req.Metadata != nil {
data, err := json.Marshal(req.Metadata)
if err != nil {
return fmt.Errorf("kubemq pub/sub error: failed to marshal metadata: %s", err.Error())
}
metadata = string(data)
}
contentType := ""
if req.ContentType != nil {
contentType = *req.ContentType
}
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
event := &kubemq.Event{
Id: "",
Channel: req.Topic,
Metadata: "",
Metadata: metadata,
Body: req.Data,
ClientId: k.metadata.ClientID,
Tags: map[string]string{},
Tags: map[string]string{
"ContentType": contentType,
},
}
if err := k.publishFunc(event); err != nil {
k.logger.Errorf("kubemq pub/sub error: publishing to %s failed with error: %s", req.Topic, err.Error())
@ -142,9 +161,21 @@ func (k *kubeMQEvents) Subscribe(ctx context.Context, req pubsub.SubscribeReques
if ctx.Err() != nil {
return
}
var metadata map[string]string
if event.Metadata != "" {
_ = json.Unmarshal([]byte(event.Metadata), &metadata)
}
var contentType *string
if event.Tags != nil {
if event.Tags["ContentType"] != "" {
*contentType = event.Tags["ContentType"]
}
}
msg := &pubsub.NewMessage{
Data: event.Body,
Topic: req.Topic,
Data: event.Body,
Topic: req.Topic,
Metadata: metadata,
ContentType: contentType,
}
if err := handler(k.ctx, msg); err != nil {
@ -173,5 +204,8 @@ func (k *kubeMQEvents) Close() error {
if k.ctxCancel != nil {
k.ctxCancel()
}
if k.client == nil {
return nil
}
return k.client.Close()
}

View File

@ -2,6 +2,7 @@ package kubemq
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -39,7 +40,7 @@ func newKubeMQEventsStore(logger logger.Logger) *kubeMQEventStore {
logger: logger,
publishFunc: nil,
resultChan: make(chan *kubemq.EventStoreResult, 1),
waitForResultTimeout: 60 * time.Second,
waitForResultTimeout: 10 * time.Second,
ctx: nil,
ctxCancel: nil,
isInitialized: false,
@ -106,14 +107,31 @@ func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
if err := k.init(); err != nil {
return err
}
if req.Topic == "" {
return fmt.Errorf("kubemq pub/sub error: topic is required")
}
k.logger.Debugf("kubemq pub/sub: publishing message to %s", req.Topic)
metadata := ""
if req.Metadata != nil {
data, err := json.Marshal(req.Metadata)
if err != nil {
return fmt.Errorf("kubemq pub/sub error: failed to marshal metadata: %s", err.Error())
}
metadata = string(data)
}
contentType := ""
if req.ContentType != nil {
contentType = *req.ContentType
}
event := &kubemq.EventStore{
Id: "",
Channel: req.Topic,
Metadata: "",
Metadata: metadata,
Body: req.Data,
ClientId: k.metadata.ClientID,
Tags: map[string]string{},
Tags: map[string]string{
"ContentType": contentType,
},
}
if err := k.publishFunc(event); err != nil {
k.logger.Errorf("kubemq pub/sub error: publishing to %s failed with error: %s", req.Topic, err.Error())
@ -121,7 +139,7 @@ func (k *kubeMQEventStore) Publish(req *pubsub.PublishRequest) error {
}
select {
case res := <-k.resultChan:
if res.Err != nil {
if res != nil && res.Err != nil {
return res.Err
}
case <-time.After(k.waitForResultTimeout):
@ -142,7 +160,6 @@ func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRe
if clientID == "" {
clientID = getRandomID()
}
k.logger.Debugf("kubemq pub/sub: subscribing to %s", req.Topic)
err := k.client.Subscribe(ctx, &kubemq.EventsStoreSubscription{
Channel: req.Topic,
@ -157,13 +174,22 @@ func (k *kubeMQEventStore) Subscribe(ctx context.Context, req pubsub.SubscribeRe
if ctx.Err() != nil {
return
}
var metadata map[string]string
if event.Metadata != "" {
_ = json.Unmarshal([]byte(event.Metadata), &metadata)
}
var contentType *string
if event.Tags != nil {
if event.Tags["ContentType"] != "" {
*contentType = event.Tags["ContentType"]
}
}
msg := &pubsub.NewMessage{
Data: event.Body,
Topic: req.Topic,
Metadata: nil,
ContentType: nil,
Metadata: metadata,
ContentType: contentType,
}
if err := handler(ctx, msg); err != nil {
k.logger.Errorf("kubemq pub/sub error: error handling message from topic '%s', %s, resending...", req.Topic, err.Error())
if k.metadata.DisableReDelivery {
@ -190,5 +216,8 @@ func (k *kubeMQEventStore) Close() error {
if k.ctxCancel != nil {
k.ctxCancel()
}
if k.client == nil {
return nil
}
return k.client.Close()
}