180 lines
4.2 KiB
Go
180 lines
4.2 KiB
Go
package kubemq
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
qs "github.com/kubemq-io/kubemq-go/queues_stream"
|
|
|
|
"github.com/dapr/components-contrib/bindings"
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
// interface used to allow unit testing.
|
|
type Kubemq interface {
|
|
bindings.InputBinding
|
|
bindings.OutputBinding
|
|
}
|
|
|
|
type kubeMQ struct {
|
|
client *qs.QueuesStreamClient
|
|
opts *options
|
|
logger logger.Logger
|
|
closed atomic.Bool
|
|
closeCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewKubeMQ(logger logger.Logger) Kubemq {
|
|
return &kubeMQ{
|
|
client: nil,
|
|
opts: nil,
|
|
logger: logger,
|
|
closeCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (k *kubeMQ) Init(ctx context.Context, metadata bindings.Metadata) error {
|
|
opts, err := createOptions(metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
k.opts = opts
|
|
client, err := qs.NewQueuesStreamClient(ctx,
|
|
qs.WithAddress(opts.internalHost, opts.internalPort),
|
|
qs.WithCheckConnection(true),
|
|
qs.WithAuthToken(opts.AuthToken),
|
|
qs.WithAutoReconnect(true),
|
|
qs.WithReconnectInterval(time.Second))
|
|
if err != nil {
|
|
k.logger.Errorf("error init kubemq client error: %s", err.Error())
|
|
return err
|
|
}
|
|
k.client = client
|
|
return nil
|
|
}
|
|
|
|
func (k *kubeMQ) Read(ctx context.Context, handler bindings.Handler) error {
|
|
if k.closed.Load() {
|
|
return errors.New("binding is closed")
|
|
}
|
|
k.wg.Add(2)
|
|
processCtx, cancel := context.WithCancel(ctx)
|
|
go func() {
|
|
defer k.wg.Done()
|
|
defer cancel()
|
|
select {
|
|
case <-k.closeCh:
|
|
case <-processCtx.Done():
|
|
}
|
|
}()
|
|
go func() {
|
|
defer k.wg.Done()
|
|
for {
|
|
err := k.processQueueMessage(processCtx, handler)
|
|
if err != nil {
|
|
k.logger.Error(err.Error())
|
|
}
|
|
// If context cancelled or kubeMQ closed, exit. Otherwise, continue
|
|
// after a second.
|
|
select {
|
|
case <-time.After(time.Second):
|
|
continue
|
|
case <-processCtx.Done():
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (k *kubeMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
queueMessage := qs.NewQueueMessage().
|
|
SetChannel(k.opts.Channel).
|
|
SetBody(req.Data).
|
|
SetPolicyDelaySeconds(parsePolicyDelaySeconds(req.Metadata)).
|
|
SetPolicyExpirationSeconds(parsePolicyExpirationSeconds(req.Metadata)).
|
|
SetPolicyMaxReceiveCount(parseSetPolicyMaxReceiveCount(req.Metadata)).
|
|
SetPolicyMaxReceiveQueue(parsePolicyMaxReceiveQueue(req.Metadata))
|
|
result, err := k.client.Send(ctx, queueMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(result.Results) > 0 {
|
|
if result.Results[0].GetIsError() {
|
|
return nil, fmt.Errorf("error sending queue message: %s", result.Results[0].GetError())
|
|
}
|
|
}
|
|
return &bindings.InvokeResponse{
|
|
Data: nil,
|
|
Metadata: nil,
|
|
}, nil
|
|
}
|
|
|
|
func (k *kubeMQ) Operations() []bindings.OperationKind {
|
|
return []bindings.OperationKind{bindings.CreateOperation}
|
|
}
|
|
|
|
func (k *kubeMQ) Close() error {
|
|
if k.closed.CompareAndSwap(false, true) {
|
|
close(k.closeCh)
|
|
}
|
|
defer k.wg.Wait()
|
|
return k.client.Close()
|
|
}
|
|
|
|
func (k *kubeMQ) processQueueMessage(ctx context.Context, handler bindings.Handler) error {
|
|
pr := qs.NewPollRequest().
|
|
SetChannel(k.opts.Channel).
|
|
SetMaxItems(k.opts.PollMaxItems).
|
|
SetWaitTimeout(k.opts.PollTimeoutSeconds).
|
|
SetAutoAck(k.opts.AutoAcknowledged)
|
|
|
|
pollResp, err := k.client.Poll(ctx, pr)
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "timout waiting response") {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
if !pollResp.HasMessages() {
|
|
return nil
|
|
}
|
|
|
|
for _, message := range pollResp.Messages {
|
|
_, err := handler(ctx, &bindings.ReadResponse{
|
|
Data: message.Body,
|
|
})
|
|
if err != nil {
|
|
k.logger.Errorf("error received from response handler: %s", err.Error())
|
|
err := message.NAck()
|
|
if err != nil {
|
|
k.logger.Errorf("error processing nack message error: %s", err.Error())
|
|
}
|
|
time.Sleep(time.Second)
|
|
continue
|
|
} else {
|
|
err := message.Ack()
|
|
if err != nil {
|
|
k.logger.Errorf("error processing ack queue message error: %s", err.Error())
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetComponentMetadata returns the metadata of the component.
|
|
func (k *kubeMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
metadataStruct := options{}
|
|
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
|
|
return
|
|
}
|