Rabbitmq pubsub add client name (#2933)

Signed-off-by: zhangchao <zchao9100@gmail.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Taction 2023-09-06 06:14:17 +08:00 committed by GitHub
parent 7dbfd40327
commit 0c2ce324b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 9 deletions

View File

@ -46,6 +46,8 @@ type rabbitmqMetadata struct {
MaxLen int64 `mapstructure:"maxLen"`
MaxLenBytes int64 `mapstructure:"maxLenBytes"`
ExchangeKind string `mapstructure:"exchangeKind"`
ClientName string `mapstructure:"clientName"`
HeartBeat time.Duration `mapstructure:"heartBeat"`
PublisherConfirm bool `mapstructure:"publisherConfirm"`
SaslExternal bool `mapstructure:"saslExternal"`
Concurrency pubsub.ConcurrencyMode `mapstructure:"concurrency"`
@ -77,6 +79,8 @@ const (
metadataPublisherConfirmKey = "publisherConfirm"
metadataSaslExternal = "saslExternal"
metadataMaxPriority = "maxPriority"
metadataClientNameKey = "clientName"
metadataHeartBeatKey = "heartBeat"
metadataQueueNameKey = "queueName"
defaultReconnectWaitSeconds = 3
@ -97,6 +101,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*rabbitm
ExchangeKind: fanoutExchangeKind,
PublisherConfirm: false,
SaslExternal: false,
HeartBeat: defaultHeartbeat,
}
// upgrade metadata

View File

@ -19,6 +19,7 @@ import (
"fmt"
"strings"
"testing"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
@ -141,6 +142,42 @@ func TestCreateMetadata(t *testing.T) {
assert.Equal(t, uint8(2), m.DeliveryMode)
})
t.Run("client name is set", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[metadataClientNameKey] = "fakeclientname"
// act
m, err := createMetadata(fakeMetaData, log)
// assert
assert.NoError(t, err)
assert.Equal(t, fakeProperties[metadataHostnameKey], m.Hostname)
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.ConsumerID)
assert.Equal(t, "fakeclientname", m.ClientName)
})
t.Run("heart beat is set", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[metadataHeartBeatKey] = "1m"
// act
m, err := createMetadata(fakeMetaData, log)
// assert
assert.NoError(t, err)
assert.Equal(t, fakeProperties[metadataHostnameKey], m.Hostname)
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.ConsumerID)
assert.Equal(t, time.Minute, m.HeartBeat)
})
t.Run("disable durable mode, disable delete when unused", func(t *testing.T) {
fakeProperties := getFakeProperties()

View File

@ -45,12 +45,15 @@ const (
publishMaxRetries = 3
publishRetryWaitSeconds = 2
defaultHeartbeat = 10 * time.Second
defaultLocale = "en_US"
argQueueMode = "x-queue-mode"
argMaxLength = "x-max-length"
argMaxLengthBytes = "x-max-length-bytes"
argDeadLetterExchange = "x-dead-letter-exchange"
argMaxPriority = "x-max-priority"
propertyClientName = "connection_name"
queueModeLazy = "lazy"
reqMetadataRoutingKey = "routingKey"
reqMetadataQueueTypeKey = "queueType" // at the moment, only supporting classic and quorum queues
@ -67,7 +70,7 @@ type rabbitMQ struct {
metadata *rabbitmqMetadata
declaredExchanges map[string]bool
connectionDial func(protocol, uri string, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error)
connectionDial func(protocol, uri, clientName string, heartBeat time.Duration, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error)
closeCh chan struct{}
closed atomic.Bool
wg sync.WaitGroup
@ -108,22 +111,27 @@ func NewRabbitMQ(logger logger.Logger) pubsub.PubSub {
}
}
func dial(protocol, uri string, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error) {
func dial(protocol, uri, clientName string, heartBeat time.Duration, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error) {
var (
conn *amqp.Connection
ch *amqp.Channel
err error
cfg = amqp.Config{Heartbeat: heartBeat, Locale: defaultLocale} // use default locale of amqp091-go
)
if len(clientName) > 0 {
cfg.Properties = map[string]interface{}{
propertyClientName: clientName,
}
}
if protocol == protocolAMQPS {
cfg.TLSClientConfig = tlsCfg
if externalSasl {
conn, err = amqp.DialTLS_ExternalAuth(uri, tlsCfg)
} else {
conn, err = amqp.DialTLS(uri, tlsCfg)
cfg.SASL = []amqp.Authentication{&amqp.ExternalAuth{}}
}
} else {
conn, err = amqp.Dial(uri)
}
conn, err = amqp.DialConfig(uri, cfg)
if err != nil {
return nil, nil, err
}
@ -180,7 +188,7 @@ func (r *rabbitMQ) reconnect(connectionCount int) error {
return err
}
r.connection, r.channel, err = r.connectionDial(r.metadata.internalProtocol, r.metadata.connectionURI(), tlsCfg, r.metadata.SaslExternal)
r.connection, r.channel, err = r.connectionDial(r.metadata.internalProtocol, r.metadata.connectionURI(), r.metadata.ClientName, r.metadata.HeartBeat, tlsCfg, r.metadata.SaslExternal)
if err != nil {
r.reset()

View File

@ -40,7 +40,7 @@ func newRabbitMQTest(broker *rabbitMQInMemoryBroker) *rabbitMQ {
return &rabbitMQ{
declaredExchanges: make(map[string]bool),
logger: logger.NewLogger("test"),
connectionDial: func(protocol, uri string, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error) {
connectionDial: func(protocol, uri, clientName string, heartBeat time.Duration, tlsCfg *tls.Config, externalSasl bool) (rabbitMQConnectionBroker, rabbitMQChannelBroker, error) {
broker.connectCount.Add(1)
return broker, broker, nil
},