From 6254982cd73adca803cdf042b968c0c8da4ddaae Mon Sep 17 00:00:00 2001 From: spike Date: Tue, 6 Dec 2022 22:27:54 +0800 Subject: [PATCH] Raise error if protocol does not match connection string Signed-off-by: spike --- pubsub/rabbitmq/metadata.go | 3 +++ pubsub/rabbitmq/metadata_test.go | 39 +++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go index 13d730432..ca10d3282 100644 --- a/pubsub/rabbitmq/metadata.go +++ b/pubsub/rabbitmq/metadata.go @@ -110,6 +110,9 @@ func createMetadata(pubSubMetadata pubsub.Metadata, log logger.Logger) (*metadat } if val, found := pubSubMetadata.Properties[metadataProtocolKey]; found && val != "" { + if result.connectionString != "" && result.protocol != val { + return &result, fmt.Errorf("%s protocol does not match connection string, protocol: %s, connection string: %s", errorMessagePrefix, val, result.connectionString) + } result.protocol = val } diff --git a/pubsub/rabbitmq/metadata_test.go b/pubsub/rabbitmq/metadata_test.go index f23857ace..88618a8c2 100644 --- a/pubsub/rabbitmq/metadata_test.go +++ b/pubsub/rabbitmq/metadata_test.go @@ -30,7 +30,7 @@ import ( func getFakeProperties() map[string]string { props := map[string]string{} props[metadataConnectionStringKey] = "amqps://localhost:5671" - props[metadataProtocolKey] = "fakeprotocol" + props[metadataProtocolKey] = "amqps" props[metadataHostnameKey] = "fakehostname" props[metadataUsernameKey] = "fakeusername" props[metadataPasswordKey] = "fakepassword" @@ -79,6 +79,9 @@ func TestCreateMetadata(t *testing.T) { assert.Equal(t, uint8(0), m.prefetchCount) assert.Equal(t, int64(0), m.maxLen) assert.Equal(t, int64(0), m.maxLenBytes) + assert.Equal(t, "", m.ClientKey) + assert.Equal(t, "", m.ClientCert) + assert.Equal(t, "", m.CACert) assert.Equal(t, fanoutExchangeKind, m.exchangeKind) }) @@ -122,6 +125,40 @@ func TestCreateMetadata(t *testing.T) { assert.Equal(t, uint8(2), m.deliveryMode) }) + t.Run("protocol does not match connection string", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Base: mdata.Base{Properties: fakeProperties}, + } + fakeMetaData.Properties[metadataProtocolKey] = "fakeprotocol" + + // act + _, err := createMetadata(fakeMetaData, log) + + // assert + if assert.Error(t, err) { + assert.Equal(t, err.Error(), fmt.Sprintf("%s protocol does not match connection string, protocol: %s, connection string: %s", errorMessagePrefix, fakeMetaData.Properties[metadataProtocolKey], fakeMetaData.Properties[metadataConnectionStringKey])) + } + }) + + t.Run("connection string is empty, protocol is not empty", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Base: mdata.Base{Properties: fakeProperties}, + } + fakeMetaData.Properties[metadataProtocolKey] = "fakeprotocol" + fakeMetaData.Properties[metadataConnectionStringKey] = "" + + // act + m, err := createMetadata(fakeMetaData, log) + + // assert + assert.Nil(t, err) + assert.Equal(t, fakeProperties[metadataProtocolKey], m.protocol) + }) + t.Run("invalid concurrency", func(t *testing.T) { fakeProperties := getFakeProperties()