components-contrib/pubsub/rabbitmq/rabbitmq.go

748 lines
23 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rabbitmq
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
kitstrings "github.com/dapr/kit/strings"
)
const (
fanoutExchangeKind = "fanout"
logMessagePrefix = "rabbitmq pub/sub:"
errorMessagePrefix = "rabbitmq pub/sub error:"
errorChannelNotInitialized = "channel not initialized"
errorChannelConnection = "channel/connection is not open"
errorInvalidQueueType = "invalid queue type"
defaultDeadLetterExchangeFormat = "dlx-%s"
defaultDeadLetterQueueFormat = "dlq-%s"
publishMaxRetries = 3
publishRetryWaitSeconds = 2
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
argSingleActiveConsumer = "x-single-active-consumer"
propertyClientName = "connection_name"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues
reqMetadataSingleActiveConsumerKey = "singleActiveConsumer"
reqMetadataMaxLenKey = "maxLen"
reqMetadataMaxLenBytesKey = "maxLenBytes"
)
// RabbitMQ allows sending/receiving messages in pub/sub format.
type rabbitMQ struct {
connection rabbitMQConnectionBroker
channel rabbitMQChannelBroker
channelMutex sync.RWMutex
connectionCount int
metadata *rabbitmqMetadata
declaredExchanges map[string]bool
connectionDial func(protocol, uri, clientName string, heartBeat time.Duration, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error)
closeCh chan struct{}
closed atomic.Bool
wg sync.WaitGroup
logger logger.Logger
}
// interface used to allow unit testing.
//
//nolint:interfacebloat
type rabbitMQChannelBroker interface {
PublishWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error
PublishWithDeferredConfirmWithContext(ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error)
QueueBind(name string, key string, exchange string, noWait bool, args amqp.Table) error
Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Nack(tag uint64, multiple bool, requeue bool) error
Ack(tag uint64, multiple bool) error
ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) error
Qos(prefetchCount, prefetchSize int, global bool) error
Confirm(noWait bool) error
Close() error
IsClosed() bool
}
// interface used to allow unit testing.
type rabbitMQConnectionBroker interface {
Close() error
}
// NewRabbitMQ creates a new RabbitMQ pub/sub.
func NewRabbitMQ(logger logger.Logger) pubsub.PubSub {
return &rabbitMQ{
declaredExchanges: make(map[string]bool),
logger: logger,
connectionDial: dial,
closeCh: make(chan struct{}),
}
}
func dial(protocol, uri, clientName string, heartBeat time.Duration, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error) {
var (
conn *amqp.Connection
ch *amqp.Channel
err error
cfg = amqp.Config{Heartbeat: heartBeat, Locale: defaultLocale} // use default locale of amqp091-go
)
if len(clientName) > 0 {
cfg.Properties = map[string]interface{}{
propertyClientName: clientName,
}
}
if protocol == protocolAMQPS {
cfg.TLSClientConfig = tlsCfg
if externalSasl {
cfg.SASL = []amqp.Authentication{&amqp.ExternalAuth{}}
}
}
conn, err = amqp.DialConfig(uri, cfg)
if err != nil {
return nil, nil, err
}
ch, err = conn.Channel()
if err != nil {
conn.Close()
return nil, nil, err
}
return conn, ch, nil
}
// Init does metadata parsing and connection creation.
func (r *rabbitMQ) Init(_ context.Context, metadata pubsub.Metadata) error {
meta, err := createMetadata(metadata, r.logger)
if err != nil {
return err
}
r.metadata = meta
if err := r.reconnect(0); err != nil {
return err
}
return nil
}
func (r *rabbitMQ) reconnect(connectionCount int) error {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
if r.isStopped() {
// Do not reconnect on stopped service.
return errors.New("cannot connect after component is stopped")
}
r.logger.Infof("%s connectionCount: current=%d reference=%d", logMessagePrefix, r.connectionCount, connectionCount)
if connectionCount != r.connectionCount {
// Reconnection request is old.
r.logger.Infof("%s stale reconnect attempt", logMessagePrefix)
return nil
}
err := r.reset()
if err != nil {
return err
}
tlsCfg, err := pubsub.ConvertTLSPropertiesToTLSConfig(r.metadata.TLSProperties)
if err != nil {
return err
}
r.connection, r.channel, err = r.connectionDial(r.metadata.internalProtocol, r.metadata.connectionURI(), r.metadata.ClientName, r.metadata.HeartBeat, tlsCfg, r.metadata.SaslExternal)
if err != nil {
r.reset()
return err
}
if r.metadata.PublisherConfirm {
err = r.channel.Confirm(false)
if err != nil {
r.reset()
return err
}
}
r.connectionCount++
r.logger.Infof("%s connected with connectionCount=%d", logMessagePrefix, r.connectionCount)
return nil
}
func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest) (rabbitMQChannelBroker, int, error) {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
if r.channel == nil {
return r.channel, r.connectionCount, errors.New(errorChannelNotInitialized)
}
if err := r.ensureExchangeDeclared(r.channel, req.Topic, r.metadata.ExchangeKind, r.metadata.Durable, r.metadata.DeleteWhenUnused); err != nil {
r.logger.Errorf("%s publishing to %s failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, err)
return r.channel, r.connectionCount, err
}
routingKey := ""
if val, ok := req.Metadata[reqMetadataRoutingKey]; ok && val != "" {
routingKey = val
}
ttl, ok, err := metadata.TryGetTTL(req.Metadata)
if err != nil {
r.logger.Warnf("%s publishing to %s failed to parse TryGetTTL: %v, it is ignored.", logMessagePrefix, req.Topic, err)
}
var expiration string
if ok {
// RabbitMQ expects the duration in ms
expiration = strconv.FormatInt(ttl.Milliseconds(), 10)
} else if r.metadata.DefaultQueueTTL != nil {
expiration = strconv.FormatInt(r.metadata.DefaultQueueTTL.Milliseconds(), 10)
}
p := amqp.Publishing{
ContentType: "text/plain",
Body: req.Data,
DeliveryMode: r.metadata.DeliveryMode,
Expiration: expiration,
}
priority, ok, err := metadata.TryGetPriority(req.Metadata)
if err != nil {
r.logger.Warnf("%s publishing to %s failed to parse priority: %v, it is ignored.", logMessagePrefix, req.Topic, err)
}
if ok {
p.Priority = priority
}
confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p)
if err != nil {
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
return r.channel, r.connectionCount, err
}
// confirm will be nil if are not requesting publish confirmations
if confirm != nil {
// Blocks until the server confirms
ok := confirm.Wait()
if !ok {
err = errors.New("did not receive confirmation of publishing")
r.logger.Errorf("%s publishing to %s failed: %v", logMessagePrefix, req.Topic, err)
}
}
return r.channel, r.connectionCount, nil
}
func (r *rabbitMQ) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
if r.closed.Load() {
return errors.New("component is closed")
}
r.logger.Debugf("%s publishing message to %s", logMessagePrefix, req.Topic)
attempt := 0
for {
attempt++
channel, connectionCount, err := r.publishSync(ctx, req)
if err == nil {
return nil
}
if attempt >= publishMaxRetries {
r.logger.Errorf("%s publishing failed: %v", logMessagePrefix, err)
return err
}
if mustReconnect(channel, err) {
r.logger.Warnf("%s publisher is reconnecting in %s ...", logMessagePrefix, r.metadata.ReconnectWait.String())
select {
case <-time.After(r.metadata.ReconnectWait):
case <-ctx.Done():
return nil
}
r.reconnect(connectionCount)
} else {
r.logger.Warnf("%s publishing attempt (%d/%d) failed: %v", logMessagePrefix, attempt, publishMaxRetries, err)
select {
case <-time.After(publishRetryWaitSeconds * time.Second):
case <-ctx.Done():
return nil
}
}
}
}
func (r *rabbitMQ) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
if r.closed.Load() {
return errors.New("component is closed")
}
queueName := req.Metadata[metadataQueueNameKey]
if queueName == "" {
if r.metadata.ConsumerID == "" {
return errors.New("consumerID is required for subscriptions that don't specify a queue name")
}
queueName = fmt.Sprintf("%s-%s", r.metadata.ConsumerID, req.Topic)
}
r.logger.Infof("%s subscribe to topic/queue '%s/%s'", logMessagePrefix, req.Topic, queueName)
// Do not set a timeout on the context, as we're just waiting for the first ack; we're using a semaphore instead
ackCh := make(chan bool, 1)
defer close(ackCh)
subctx, cancel := context.WithCancel(ctx)
r.wg.Add(2)
go func() {
defer r.wg.Done()
r.subscribeForever(subctx, req, queueName, handler, ackCh)
}()
go func() {
defer r.wg.Done()
defer cancel()
select {
case <-subctx.Done():
case <-r.closeCh:
}
}()
// Wait for the ack for 1 minute or return an error
select {
case <-time.After(time.Minute):
return fmt.Errorf("failed to subscribe to %s", queueName)
case failed := <-ackCh:
if failed {
return fmt.Errorf("error not retriable for %s", queueName)
} else {
return nil
}
}
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub.SubscribeRequest, queueName string) (*amqp.Queue, error) {
err := r.ensureExchangeDeclared(channel, req.Topic, r.metadata.ExchangeKind, r.metadata.Durable, r.metadata.DeleteWhenUnused)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
r.logger.Infof("%s declaring queue '%s'", logMessagePrefix, queueName)
var args amqp.Table
if r.metadata.EnableDeadLetter {
// declare dead letter exchange
dlxName := fmt.Sprintf(defaultDeadLetterExchangeFormat, queueName)
dlqName := fmt.Sprintf(defaultDeadLetterQueueFormat, queueName)
// dead letter exchange is always durable
err = r.ensureExchangeDeclared(channel, dlxName, fanoutExchangeKind, true, r.metadata.DeleteWhenUnused)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in ensureExchangeDeclared: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
var q amqp.Queue
dlqArgs := r.metadata.formatQueueDeclareArgs(nil)
// dead letter queue use lazy mode, keeping as many messages as possible on disk to reduce RAM usage
dlqArgs[argQueueMode] = queueModeLazy
q, err = channel.QueueDeclare(dlqName, true, r.metadata.DeleteWhenUnused, false, false, dlqArgs)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
err = channel.QueueBind(q.Name, "", dlxName, false, nil)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueBind: %v", logMessagePrefix, req.Topic, dlqName, err)
return nil, err
}
r.logger.Infof("%s declared dead letter exchange for queue '%s' bind dead letter queue '%s' to dead letter exchange '%s'", logMessagePrefix, queueName, dlqName, dlxName)
args = amqp.Table{argDeadLetterExchange: dlxName}
}
args = r.metadata.formatQueueDeclareArgs(args)
// use priority queue if configured on subscription
if val, ok := req.Metadata[metadataMaxPriority]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse maxPriority %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, val, req.Topic, queueName, pErr)
return nil, pErr
}
mp := uint8(parsedVal)
if parsedVal > 255 {
mp = math.MaxUint8
}
args[argMaxPriority] = mp
}
// queue type is classic by default, but we allow user to create quorum queues if desired
if val := req.Metadata[reqMetadataQueueTypeKey]; val != "" {
if !queueTypeValid(val) {
return nil, fmt.Errorf("invalid queue type %s. Valid types are %s and %s", val, amqp.QueueTypeClassic, amqp.QueueTypeQuorum)
} else {
args[amqp.QueueTypeArg] = val
}
} else {
args[amqp.QueueTypeArg] = amqp.QueueTypeClassic
}
// Applying x-single-active-consumer if defined at subscription level
if val := req.Metadata[reqMetadataSingleActiveConsumerKey]; kitstrings.IsTruthy(val) {
args[argSingleActiveConsumer] = true
}
// Applying x-max-length-bytes if defined at subscription level
if val, ok := req.Metadata[reqMetadataMaxLenBytesKey]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLengthBytes, req.Topic, queueName, pErr)
return nil, pErr
}
args[argMaxLengthBytes] = parsedVal
}
// Applying x-max-length if defined at subscription level
if val, ok := req.Metadata[reqMetadataMaxLenKey]; ok && val != "" {
parsedVal, pErr := strconv.ParseUint(val, 10, 0)
if pErr != nil {
r.logger.Errorf("%s prepareSubscription error: can't parse %s value on subscription metadata for topic/queue `%s/%s`: %s", logMessagePrefix, argMaxLength, req.Topic, queueName, pErr)
return nil, pErr
}
args[argMaxLength] = parsedVal
}
q, err := channel.QueueDeclare(queueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, args)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueDeclare: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
if r.metadata.PrefetchCount > 0 {
r.logger.Infof("%s setting prefetch count to %s", logMessagePrefix, strconv.Itoa(int(r.metadata.PrefetchCount)))
err = channel.Qos(int(r.metadata.PrefetchCount), 0, false)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.Qos: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
}
metadataRoutingKey := ""
if val, ok := req.Metadata[reqMetadataRoutingKey]; ok && val != "" {
metadataRoutingKey = val
}
routingKeys := strings.Split(metadataRoutingKey, ",")
for i := range routingKeys {
routingKey := routingKeys[i]
r.logger.Debugf("%s binding queue '%s' to exchange '%s' with routing key '%s'", logMessagePrefix, q.Name, req.Topic, routingKey)
err = channel.QueueBind(q.Name, routingKey, req.Topic, false, nil)
if err != nil {
r.logger.Errorf("%s prepareSubscription for topic/queue '%s/%s' failed in channel.QueueBind: %v", logMessagePrefix, req.Topic, queueName, err)
return nil, err
}
}
return &q, nil
}
func (r *rabbitMQ) ensureSubscription(req pubsub.SubscribeRequest, queueName string) (rabbitMQChannelBroker, int, *amqp.Queue, error) {
r.channelMutex.RLock()
defer r.channelMutex.RUnlock()
if r.channel == nil {
return nil, r.connectionCount, nil, errors.New(errorChannelNotInitialized)
}
q, err := r.prepareSubscription(r.channel, req, queueName)
return r.channel, r.connectionCount, q, err
}
func (r *rabbitMQ) subscribeForever(ctx context.Context, req pubsub.SubscribeRequest, queueName string, handler pubsub.Handler, ackCh chan bool) {
for {
var (
err error
errFuncName string
connectionCount int
channel rabbitMQChannelBroker
q *amqp.Queue
msgs <-chan amqp.Delivery
)
for {
channel, connectionCount, q, err = r.ensureSubscription(req, queueName)
if err != nil {
errFuncName = "ensureSubscription"
break
}
msgs, err = channel.Consume(
q.Name,
queueName, // consumerID
r.metadata.AutoAck, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil,
)
if err != nil {
errFuncName = "channel.Consume"
break
}
// one-time notification on successful subscribe
if ackCh != nil {
ackCh <- false
ackCh = nil
}
err = r.listenMessages(ctx, channel, msgs, req.Topic, handler)
if err != nil {
errFuncName = "listenMessages"
break
}
}
if strings.Contains(err.Error(), errorInvalidQueueType) {
ackCh <- true
return
}
if err == context.Canceled || err == context.DeadlineExceeded {
// Subscription context was canceled
r.logger.Infof("%s subscription for %s has context canceled", logMessagePrefix, queueName)
return
}
if r.isStopped() {
r.logger.Infof("%s subscriber for %s is stopped", logMessagePrefix, queueName)
return
}
// print the error if the subscriber is running.
if err != nil {
r.logger.Errorf("%s error in subscriber for %s in %s: %v", logMessagePrefix, queueName, errFuncName, err)
}
if mustReconnect(channel, err) {
r.logger.Warnf("%s subscriber is reconnecting in %s ...", logMessagePrefix, r.metadata.ReconnectWait.String())
select {
case <-time.After(r.metadata.ReconnectWait):
case <-ctx.Done():
r.logger.Infof("%s subscription for %s has context canceled", logMessagePrefix, queueName)
return
}
r.reconnect(connectionCount)
}
}
}
func (r *rabbitMQ) listenMessages(ctx context.Context, channel rabbitMQChannelBroker, msgCh <-chan amqp.Delivery, topic string, handler pubsub.Handler) error {
var err error
for {
select {
case <-ctx.Done():
return ctx.Err()
case d, more := <-msgCh:
// Handle case of channel closed
if !more {
r.logger.Debugf("%s subscriber channel closed for topic %s", logMessagePrefix, topic)
return nil
}
switch r.metadata.Concurrency {
case pubsub.Single:
err = r.handleMessage(ctx, d, topic, handler)
if err != nil && mustReconnect(channel, err) {
return err
}
case pubsub.Parallel:
r.wg.Add(1)
go func(d amqp.Delivery) {
defer r.wg.Done()
if err := r.handleMessage(ctx, d, topic, handler); err != nil {
r.logger.Errorf("%s error handling message: %v", logMessagePrefix, err)
}
}(d)
}
}
}
}
func (r *rabbitMQ) handleMessage(ctx context.Context, d amqp.Delivery, topic string, handler pubsub.Handler) error {
pubsubMsg := &pubsub.NewMessage{
Data: d.Body,
Topic: topic,
}
err := handler(ctx, pubsubMsg)
if err != nil {
r.logger.Errorf("%s handling message from topic '%s', %s", errorMessagePrefix, topic, err)
if !r.metadata.AutoAck {
// if message is not auto acked we need to ack/nack
r.logger.Debugf("%s nacking message '%s' from topic '%s', requeue=%t", logMessagePrefix, d.MessageId, topic, r.metadata.RequeueInFailure)
if err = d.Nack(false, r.metadata.RequeueInFailure); err != nil {
r.logger.Errorf("%s error nacking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
}
}
} else if !r.metadata.AutoAck {
// if message is not auto acked we need to ack/nack
r.logger.Debugf("%s acking message '%s' from topic '%s'", logMessagePrefix, d.MessageId, topic)
if err = d.Ack(false); err != nil {
r.logger.Errorf("%s error acking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
}
}
return err
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) ensureExchangeDeclared(channel rabbitMQChannelBroker, exchange, exchangeKind string, durable bool, autoDelete bool) error {
if !r.containsExchange(exchange) {
r.logger.Debugf("%s declaring exchange '%s' of kind '%s'", logMessagePrefix, exchange, exchangeKind)
err := channel.ExchangeDeclare(exchange, exchangeKind, durable, autoDelete, false, false, nil)
if err != nil {
r.logger.Errorf("%s ensureExchangeDeclared: channel.ExchangeDeclare failed: %v", logMessagePrefix, err)
return err
}
r.putExchange(exchange)
}
return nil
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) containsExchange(exchange string) bool {
_, exists := r.declaredExchanges[exchange]
return exists
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) putExchange(exchange string) {
r.declaredExchanges[exchange] = true
}
// this function call should be wrapped by channelMutex.
func (r *rabbitMQ) reset() (err error) {
if len(r.declaredExchanges) > 0 {
r.declaredExchanges = make(map[string]bool)
}
if r.channel != nil {
if err = r.channel.Close(); err != nil {
r.logger.Errorf("%s reset: channel.Close() failed: %v", logMessagePrefix, err)
}
r.channel = nil
}
if r.connection != nil {
if err2 := r.connection.Close(); err2 != nil {
r.logger.Errorf("%s reset: connection.Close() failed: %v", logMessagePrefix, err2)
if err == nil {
err = err2
}
}
r.connection = nil
}
return
}
func (r *rabbitMQ) isStopped() bool {
return r.closed.Load()
}
// Close closes the rabbitMQ connection. Blocks until all go routines are done.
func (r *rabbitMQ) Close() error {
r.channelMutex.Lock()
defer r.channelMutex.Unlock()
if r.closed.CompareAndSwap(false, true) {
close(r.closeCh)
}
defer r.wg.Wait()
return r.reset()
}
func (r *rabbitMQ) Features() []pubsub.Feature {
return []pubsub.Feature{pubsub.FeatureMessageTTL}
}
func mustReconnect(channel rabbitMQChannelBroker, err error) bool {
if channel == nil {
return true
}
if err == nil {
return false
}
return strings.Contains(err.Error(), errorChannelConnection)
}
// GetComponentMetadata returns the metadata of the component.
func (r *rabbitMQ) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadataStruct := rabbitmqMetadata{}
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType)
return
}
func queueTypeValid(qType string) bool {
return qType == amqp.QueueTypeClassic || qType == amqp.QueueTypeQuorum
}