Adding RabbitMQ to the conformance tests (#739)

* Initial pass at adding RabbitMQ to the conformance tests

* Fixing unit tests
This commit is contained in:
Phil Kedy 2021-03-04 14:56:35 -05:00 committed by GitHub
parent a8d5dea748
commit a1a0ad2c25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 56 additions and 24 deletions

View File

@ -0,0 +1,7 @@
version: '2'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- 5672:5672
- 15672:15672

View File

@ -40,6 +40,7 @@ jobs:
- pubsub.mqtt-emqx
- pubsub.mqtt-vernemq
- pubsub.hazelcast
- pubsub.rabbitmq
- secretstores.kubernetes
- secretstores.localenv
- secretstores.localfile
@ -200,6 +201,10 @@ jobs:
run: docker-compose -f ./.github/infrastructure/docker-compose-hazelcast.yml -p hazelcast up -d
if: contains(matrix.component, 'hazelcast')
- name: Start rabbitmq
run: docker-compose -f ./.github/infrastructure/docker-compose-rabbitmq.yml -p rabbitmq up -d
if: contains(matrix.component, 'rabbitmq')
- name: Start KinD
uses: helm/kind-action@v1.0.0
if: contains(matrix.component, 'kubernetes')

View File

@ -36,8 +36,6 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
if val, found := pubSubMetadata.Properties[metadataConsumerIDKey]; found && val != "" {
result.consumerID = val
} else {
return &result, fmt.Errorf("%s missing RabbitMQ consumerID", errorMessagePrefix)
}
if val, found := pubSubMetadata.Properties[metadataDeliveryModeKey]; found && val != "" {

View File

@ -65,24 +65,6 @@ func TestCreateMetadata(t *testing.T) {
assert.Empty(t, m.consumerID)
})
t.Run("consumerID is not given", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Properties: fakeProperties,
}
fakeMetaData.Properties[metadataConsumerIDKey] = ""
// act
m, err := createMetadata(fakeMetaData)
// assert
assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ consumerID")
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
assert.Empty(t, m.consumerID)
})
invalidDeliveryModes := []string{"3", "10", "-1"}
for _, deliveryMode := range invalidDeliveryModes {

View File

@ -193,6 +193,10 @@ func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
}
func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
if r.metadata.consumerID == "" {
return errors.New("consumerID is required for subscriptions")
}
queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic)
go r.subscribeForever(req, queueName, handler)
@ -321,12 +325,12 @@ func (r *rabbitMQ) handleMessage(channel rabbitMQChannelBroker, d amqp.Delivery,
requeue := r.metadata.requeueInFailure && !d.Redelivered
r.logger.Debugf("%s nacking message '%s' from topic '%s', requeue=%t", logMessagePrefix, d.MessageId, topic, requeue)
if err = channel.Nack(d.DeliveryTag, false, requeue); err != nil {
if err = d.Nack(false, requeue); err != nil {
r.logger.Errorf("%s error nacking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
}
} else {
r.logger.Debugf("%s acking message '%s' from topic '%s'", logMessagePrefix, d.MessageId, topic)
if err = channel.Ack(d.DeliveryTag, false); err != nil {
if err = d.Ack(false); err != nil {
r.logger.Errorf("%s error acking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
}
}

View File

@ -46,8 +46,9 @@ func TestNoConsumer(t *testing.T) {
},
}
err := pubsubRabbitMQ.Init(metadata)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "missing RabbitMQ consumerID")
assert.NoError(t, err)
err = pubsubRabbitMQ.Subscribe(pubsub.SubscribeRequest{}, nil)
assert.Contains(t, err.Error(), "consumerID is required for subscriptions")
}
func TestConcurrencyMode(t *testing.T) {

View File

@ -0,0 +1,28 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
- name: consumerID
value: "testConsumer"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: deliveryMode
value: "0"
- name: requeueInFailure
value: "true"
- name: prefetchCount
value: "0"
- name: reconnectWait
value: "0"
- name: concurrencyMode
value: single

View File

@ -36,3 +36,7 @@ components:
allOperations: true
- component: hazelcast
allOperations: true
- component: rabbitmq
allOperations: true
config:
checkInOrderProcessing: false

View File

@ -31,6 +31,7 @@ import (
p_mqtt "github.com/dapr/components-contrib/pubsub/mqtt"
p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming"
p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq"
p_redis "github.com/dapr/components-contrib/pubsub/redis"
"github.com/dapr/components-contrib/secretstores"
ss_azure "github.com/dapr/components-contrib/secretstores/azure/keyvault"
@ -323,6 +324,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
pubsub = p_mqtt.NewMQTTPubSub(testLogger)
case "hazelcast":
pubsub = p_hazelcast.NewHazelcastPubSub(testLogger)
case "rabbitmq":
pubsub = p_rabbitmq.NewRabbitMQ(testLogger)
default:
return nil
}