From 494596be0ee829eb8edfea497e6713f0f10d23a0 Mon Sep 17 00:00:00 2001 From: Dmitry Shmulevich Date: Fri, 1 Oct 2021 17:27:36 -0700 Subject: [PATCH] fix 'durable' setting in RabbitMQ (#1183) --- pubsub/rabbitmq/metadata.go | 10 +++++++++- pubsub/rabbitmq/metadata_test.go | 4 +++- pubsub/rabbitmq/rabbitmq.go | 5 +++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index c080c9ab1..cb83db8d7 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -11,6 +11,7 @@ import ( type metadata struct { consumerID string host string + durable bool deleteWhenUnused bool autoAck bool requeueInFailure bool @@ -23,6 +24,7 @@ type metadata struct { // createMetadata creates a new instance from the pubsub metadata. func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { result := metadata{ + durable: true, deleteWhenUnused: true, autoAck: false, reconnectWait: time.Duration(defaultReconnectWaitSeconds) * time.Second, @@ -47,6 +49,12 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { } } + if val, found := pubSubMetadata.Properties[metadataDurable]; found && val != "" { + if boolVal, err := strconv.ParseBool(val); err == nil { + result.durable = boolVal + } + } + if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" { if boolVal, err := strconv.ParseBool(val); err == nil { result.deleteWhenUnused = boolVal @@ -71,7 +79,7 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { } } - if val, found := pubSubMetadata.Properties[metadataprefetchCount]; found && val != "" { + if val, found := pubSubMetadata.Properties[metadataPrefetchCount]; found && val != "" { if intVal, err := strconv.Atoi(val); err == nil { result.prefetchCount = uint8(intVal) } diff --git a/pubsub/rabbitmq/metadata_test.go b/pubsub/rabbitmq/metadata_test.go index a7f4ee7a4..21aa36b29 100644 --- a/pubsub/rabbitmq/metadata_test.go +++ b/pubsub/rabbitmq/metadata_test.go @@ -127,7 +127,7 @@ func TestCreateMetadata(t *testing.T) { fakeMetaData := pubsub.Metadata{ Properties: fakeProperties, } - fakeMetaData.Properties[metadataprefetchCount] = "1" + fakeMetaData.Properties[metadataPrefetchCount] = "1" // act m, err := createMetadata(fakeMetaData) @@ -206,6 +206,7 @@ func TestCreateMetadata(t *testing.T) { fakeMetaData := pubsub.Metadata{ Properties: fakeProperties, } + fakeMetaData.Properties[metadataDurable] = tt.in // act m, err := createMetadata(fakeMetaData) @@ -214,6 +215,7 @@ func TestCreateMetadata(t *testing.T) { assert.NoError(t, err) assert.Equal(t, fakeProperties[metadataHostKey], m.host) assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, tt.expected, m.durable) }) } diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go index ac4f55aa9..3d6734d4a 100644 --- a/pubsub/rabbitmq/rabbitmq.go +++ b/pubsub/rabbitmq/rabbitmq.go @@ -24,6 +24,7 @@ const ( metadataHostKey = "host" metadataConsumerIDKey = "consumerID" + metadataDurable = "durable" metadataDeleteWhenUnusedKey = "deletedWhenUnused" metadataAutoAckKey = "autoAck" metadataDeliveryModeKey = "deliveryMode" @@ -31,7 +32,7 @@ const ( metadataReconnectWaitSeconds = "reconnectWaitSeconds" defaultReconnectWaitSeconds = 10 - metadataprefetchCount = "prefetchCount" + metadataPrefetchCount = "prefetchCount" ) // RabbitMQ allows sending/receiving messages in pub/sub format. @@ -228,7 +229,7 @@ func (r *rabbitMQ) prepareSubscription(channel rabbitMQChannelBroker, req pubsub } r.logger.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName) - q, err := channel.QueueDeclare(queueName, true, r.metadata.deleteWhenUnused, false, false, nil) + q, err := channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, false, false, nil) if err != nil { return nil, err }