fix 'durable' setting in RabbitMQ (#1183)

This commit is contained in:
Dmitry Shmulevich 2021-10-01 17:27:36 -07:00 committed by GitHub
parent a9cf218c04
commit 494596be0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 4 deletions

View File

@ -11,6 +11,7 @@ import (
type metadata struct { type metadata struct {
consumerID string consumerID string
host string host string
durable bool
deleteWhenUnused bool deleteWhenUnused bool
autoAck bool autoAck bool
requeueInFailure bool requeueInFailure bool
@ -23,6 +24,7 @@ type metadata struct {
// createMetadata creates a new instance from the pubsub metadata. // createMetadata creates a new instance from the pubsub metadata.
func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
result := metadata{ result := metadata{
durable: true,
deleteWhenUnused: true, deleteWhenUnused: true,
autoAck: false, autoAck: false,
reconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second, reconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second,
@ -47,6 +49,12 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
} }
} }
if val, found := pubSubMetadata.Properties[metadataDurable]; found && val != "" {
if boolVal, err := strconv.ParseBool(val); err == nil {
result.durable = boolVal
}
}
if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" { if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" {
if boolVal, err := strconv.ParseBool(val); err == nil { if boolVal, err := strconv.ParseBool(val); err == nil {
result.deleteWhenUnused = boolVal result.deleteWhenUnused = boolVal
@ -71,7 +79,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
} }
} }
if val, found := pubSubMetadata.Properties[metadataprefetchCount]; found && val != "" { if val, found := pubSubMetadata.Properties[metadataPrefetchCount]; found && val != "" {
if intVal, err := strconv.Atoi(val); err == nil { if intVal, err := strconv.Atoi(val); err == nil {
result.prefetchCount = uint8(intVal) result.prefetchCount = uint8(intVal)
} }

View File

@ -127,7 +127,7 @@ func TestCreateMetadata(t *testing.T) {
fakeMetaData := pubsub.Metadata{ fakeMetaData := pubsub.Metadata{
Properties: fakeProperties, Properties: fakeProperties,
} }
fakeMetaData.Properties[metadataprefetchCount] = "1" fakeMetaData.Properties[metadataPrefetchCount] = "1"
// act // act
m, err := createMetadata(fakeMetaData) m, err := createMetadata(fakeMetaData)
@ -206,6 +206,7 @@ func TestCreateMetadata(t *testing.T) {
fakeMetaData := pubsub.Metadata{ fakeMetaData := pubsub.Metadata{
Properties: fakeProperties, Properties: fakeProperties,
} }
fakeMetaData.Properties[metadataDurable] = tt.in
// act // act
m, err := createMetadata(fakeMetaData) m, err := createMetadata(fakeMetaData)
@ -214,6 +215,7 @@ func TestCreateMetadata(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, fakeProperties[metadataHostKey], m.host) assert.Equal(t, fakeProperties[metadataHostKey], m.host)
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
assert.Equal(t, tt.expected, m.durable)
}) })
} }

View File

@ -24,6 +24,7 @@ const (
metadataHostKey = "host" metadataHostKey = "host"
metadataConsumerIDKey = "consumerID" metadataConsumerIDKey = "consumerID"
metadataDurable = "durable"
metadataDeleteWhenUnusedKey = "deletedWhenUnused" metadataDeleteWhenUnusedKey = "deletedWhenUnused"
metadataAutoAckKey = "autoAck" metadataAutoAckKey = "autoAck"
metadataDeliveryModeKey = "deliveryMode" metadataDeliveryModeKey = "deliveryMode"
@ -31,7 +32,7 @@ const (
metadataReconnectWaitSeconds = "reconnectWaitSeconds" metadataReconnectWaitSeconds = "reconnectWaitSeconds"
defaultReconnectWaitSeconds = 10 defaultReconnectWaitSeconds = 10
metadataprefetchCount = "prefetchCount" metadataPrefetchCount = "prefetchCount"
) )
// RabbitMQ allows sending/receiving messages in pub/sub format. // RabbitMQ allows sending/receiving messages in pub/sub format.
@ -228,7 +229,7 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub
} }
r.logger.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName) r.logger.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName)
q, err := channel.QueueDeclare(queueName, true, r.metadata.deleteWhenUnused, false, false, nil) q, err := channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, false, false, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }