From 9e78671fc09d5f93a731fd783f2d8733db1417bf Mon Sep 17 00:00:00 2001 From: Francisco Beltrao Date: Wed, 27 Nov 2019 19:37:09 +0100 Subject: [PATCH] Add RabbitMQ pub/sub (#124) * Add RabbitMQ pub/sub * Fix lint error (global variable) * RabbitMQ pub/sub auto ack defaults to false Fix log message incorrect prefix * Remove extra line in go.mod * Revert extra line in go.mod --- pubsub/Readme.md | 1 + pubsub/rabbitmq/metadata.go | 70 +++++++++++ pubsub/rabbitmq/metadata_test.go | 207 +++++++++++++++++++++++++++++++ pubsub/rabbitmq/rabbitmq.go | 175 ++++++++++++++++++++++++++ pubsub/rabbitmq/rabbitmq_test.go | 45 +++++++ 5 files changed, 498 insertions(+) create mode 100644 pubsub/rabbitmq/metadata.go create mode 100644 pubsub/rabbitmq/metadata_test.go create mode 100644 pubsub/rabbitmq/rabbitmq.go create mode 100644 pubsub/rabbitmq/rabbitmq_test.go diff --git a/pubsub/Readme.md b/pubsub/Readme.md index b9875aeca..3252031f4 100644 --- a/pubsub/Readme.md +++ b/pubsub/Readme.md @@ -7,6 +7,7 @@ Currently supported pub-subs are: * Redis Streams * NATS * Azure Service Bus +* RabbitMQ ## Implementing a new Pub Sub diff --git a/pubsub/rabbitmq/metadata.go b/pubsub/rabbitmq/metadata.go new file mode 100644 index 000000000..a37613045 --- /dev/null +++ b/pubsub/rabbitmq/metadata.go @@ -0,0 +1,70 @@ +package rabbitmq + +import ( + "fmt" + "strconv" + + "github.com/dapr/components-contrib/pubsub" +) + +type metadata struct { + consumerID string + host string + durable bool + deleteWhenUnused bool + autoAck bool + requeueInFailure bool + deliveryMode uint8 // Transient (0 or 1) or Persistent (2) +} + +// createMetadata creates a new instance from the pubsub metadata +func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) { + result := metadata{deleteWhenUnused: true, autoAck: false} + + if val, found := pubSubMetadata.Properties[metadataHostKey]; found && val != "" { + result.host = val + } else { + return &result, fmt.Errorf("%s missing RabbitMQ host", errorMessagePrefix) + } + + if val, found := pubSubMetadata.Properties[metadataConsumerIDKey]; found && val != "" { + result.consumerID = val + } else { + return &result, fmt.Errorf("%s missing RabbitMQ consumerID", errorMessagePrefix) + } + + if val, found := pubSubMetadata.Properties[metadataDeliveryModeKey]; found && val != "" { + if intVal, err := strconv.Atoi(val); err == nil { + if intVal < 0 || intVal > 2 { + return &result, fmt.Errorf("%s invalid RabbitMQ delivery mode, accepted values are between 0 and 2", errorMessagePrefix) + } + result.deliveryMode = uint8(intVal) + } + } + + if val, found := pubSubMetadata.Properties[metadataDurableKey]; found && val != "" { + if boolVal, err := strconv.ParseBool(val); err == nil { + result.durable = boolVal + } + } + + if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" { + if boolVal, err := strconv.ParseBool(val); err == nil { + result.deleteWhenUnused = boolVal + } + } + + if val, found := pubSubMetadata.Properties[metadataAutoAckKey]; found && val != "" { + if boolVal, err := strconv.ParseBool(val); err == nil { + result.autoAck = boolVal + } + } + + if val, found := pubSubMetadata.Properties[metadataRequeueInFailureKey]; found && val != "" { + if boolVal, err := strconv.ParseBool(val); err == nil { + result.requeueInFailure = boolVal + } + } + + return &result, nil +} diff --git a/pubsub/rabbitmq/metadata_test.go b/pubsub/rabbitmq/metadata_test.go new file mode 100644 index 000000000..8fa501ab8 --- /dev/null +++ b/pubsub/rabbitmq/metadata_test.go @@ -0,0 +1,207 @@ +package rabbitmq + +import ( + "fmt" + "testing" + + "github.com/dapr/components-contrib/pubsub" + "github.com/stretchr/testify/assert" +) + +func getFakeProperties() map[string]string { + props := map[string]string{} + props[metadataHostKey] = "fakehost" + props[metadataConsumerIDKey] = "fakeConsumerID" + + return props +} + +func TestCreateMetadata(t *testing.T) { + + var booleanFlagTests = []struct { + in string + expected bool + }{ + {"true", true}, + {"TRUE", true}, + {"false", false}, + {"FALSE", false}, + } + + t.Run("metadata is correct", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, false, m.durable) + assert.Equal(t, false, m.autoAck) + assert.Equal(t, false, m.requeueInFailure) + assert.Equal(t, true, m.deleteWhenUnused) + assert.Equal(t, uint8(0), m.deliveryMode) + }) + + t.Run("host is not given", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataHostKey] = "" + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ host") + assert.Empty(t, m.host) + assert.Empty(t, m.consumerID) + }) + + t.Run("consumerID is not given", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataConsumerIDKey] = "" + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ consumerID") + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Empty(t, m.consumerID) + }) + + var invalidDeliveryModes = []string{"3", "10", "-1"} + + for _, deliveryMode := range invalidDeliveryModes { + t.Run(fmt.Sprintf("deliveryMode value=%s", deliveryMode), func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataDeliveryModeKey] = deliveryMode + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.EqualError(t, err, "rabbitmq pub/sub error: invalid RabbitMQ delivery mode, accepted values are between 0 and 2") + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, uint8(0), m.deliveryMode) + }) + } + + t.Run("deliveryMode is set", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataDeliveryModeKey] = "2" + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, uint8(2), m.deliveryMode) + }) + + for _, tt := range booleanFlagTests { + + t.Run(fmt.Sprintf("autoAck value=%s", tt.in), func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataAutoAckKey] = tt.in + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, tt.expected, m.autoAck) + }) + } + + for _, tt := range booleanFlagTests { + t.Run(fmt.Sprintf("requeueInFailure value=%s", tt.in), func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataRequeueInFailureKey] = tt.in + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, tt.expected, m.requeueInFailure) + }) + } + + for _, tt := range booleanFlagTests { + t.Run(fmt.Sprintf("deleteWhenUnused value=%s", tt.in), func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataDeleteWhenUnusedKey] = tt.in + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, tt.expected, m.deleteWhenUnused) + }) + } + + for _, tt := range booleanFlagTests { + t.Run(fmt.Sprintf("durable value=%s", tt.in), func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Properties: fakeProperties, + } + fakeMetaData.Properties[metadataDurableKey] = tt.in + + // act + m, err := createMetadata(fakeMetaData) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[metadataHostKey], m.host) + assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID) + assert.Equal(t, tt.expected, m.durable) + }) + } +} diff --git a/pubsub/rabbitmq/rabbitmq.go b/pubsub/rabbitmq/rabbitmq.go new file mode 100644 index 000000000..a04f0db90 --- /dev/null +++ b/pubsub/rabbitmq/rabbitmq.go @@ -0,0 +1,175 @@ +package rabbitmq + +import ( + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/dapr/components-contrib/pubsub" + "github.com/streadway/amqp" +) + +const ( + fanoutExchangeKind = "fanout" + logMessagePrefix = "rabbitmq pub/sub:" + errorMessagePrefix = "rabbitmq pub/sub error:" + + metadataHostKey = "host" + metadataConsumerIDKey = "consumerID" + metadataDurableKey = "durable" + metadataDeleteWhenUnusedKey = "deletedWhenUnused" + metadataAutoAckKey = "autoAck" + metadataDeliveryModeKey = "deliveryMode" + metadataRequeueInFailureKey = "requeueInFailure" +) + +// RabbitMQ allows sending/receiving messages in pub/sub format +type rabbitMQ struct { + connection *amqp.Connection + channel *amqp.Channel + metadata *metadata + declaredExchanges map[string]bool +} + +// NewRabbitMQ creates a new RabbitMQ pub/sub +func NewRabbitMQ() pubsub.PubSub { + return createRabbitMQ() +} + +func createRabbitMQ() *rabbitMQ { + return &rabbitMQ{declaredExchanges: make(map[string]bool)} +} + +// Init does metadata parsing and connection creation +func (r *rabbitMQ) Init(metadata pubsub.Metadata) error { + meta, err := createMetadata(metadata) + if err != nil { + return err + } + + r.metadata = meta + + conn, err := amqp.Dial(meta.host) + if err != nil { + return err + } + + ch, err := conn.Channel() + if err != nil { + return err + } + + r.connection = conn + r.channel = ch + return nil +} + +func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error { + err := r.ensureExchangeDeclared(req.Topic) + if err != nil { + return err + } + + log.Debugf("%s publishing message to topic '%s'", logMessagePrefix, req.Topic) + + err = r.channel.Publish(req.Topic, "", false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: req.Data, + DeliveryMode: r.metadata.deliveryMode, + }) + + if err != nil { + return err + } + + return nil +} + +func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error { + + err := r.ensureExchangeDeclared(req.Topic) + if err != nil { + return err + } + + queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic) + + log.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName) + q, err := r.channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, true, false, nil) + if err != nil { + return err + } + + log.Debugf("%s binding queue '%s' to exchange '%s'", logMessagePrefix, q.Name, req.Topic) + err = r.channel.QueueBind(q.Name, "", req.Topic, false, nil) + if err != nil { + return err + } + + msgs, err := r.channel.Consume( + q.Name, + queueName, // consumerId + r.metadata.autoAck, // autoAck + !r.metadata.durable, // exclusive + false, // noLocal + false, // noWait + nil, + ) + + if err != nil { + return err + } + + go r.listenMessages(msgs, req.Topic, handler) + + return nil +} + +func (r *rabbitMQ) listenMessages(msgs <-chan amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) { + for d := range msgs { + r.handleMessage(d, topic, handler) + } +} + +func (r *rabbitMQ) handleMessage(d amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) { + pubsubMsg := &pubsub.NewMessage{ + Data: d.Body, + Topic: topic, + } + + err := handler(pubsubMsg) + if err != nil { + log.Errorf("%s error handling message from topic '%s', %s", logMessagePrefix, topic, err) + } + + // if message is not auto acked we need to ack/nack + if !r.metadata.autoAck { + if err != nil { + requeue := r.metadata.requeueInFailure && !d.Redelivered + + log.Debugf("%s nacking message '%s' from topic '%s', requeue=%t", logMessagePrefix, d.MessageId, topic, requeue) + if err = r.channel.Nack(d.DeliveryTag, false, requeue); err != nil { + log.Errorf("%s error nacking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err) + } + } else { + log.Debugf("%s acking message '%s' from topic '%s'", logMessagePrefix, d.MessageId, topic) + if err = r.channel.Ack(d.DeliveryTag, false); err != nil { + log.Errorf("%s error acking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err) + } + } + } +} + +func (r *rabbitMQ) ensureExchangeDeclared(exchange string) error { + + if _, exists := r.declaredExchanges[exchange]; !exists { + log.Debugf("%s declaring exchange '%s' of kind '%s'", logMessagePrefix, exchange, fanoutExchangeKind) + err := r.channel.ExchangeDeclare(exchange, fanoutExchangeKind, true, false, false, false, nil) + if err != nil { + return err + } + + r.declaredExchanges[exchange] = true + } + + return nil +} diff --git a/pubsub/rabbitmq/rabbitmq_test.go b/pubsub/rabbitmq/rabbitmq_test.go new file mode 100644 index 000000000..68d7435fa --- /dev/null +++ b/pubsub/rabbitmq/rabbitmq_test.go @@ -0,0 +1,45 @@ +package rabbitmq + +import ( + "testing" + + "github.com/dapr/components-contrib/pubsub" + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" +) + +func createAmqpMessage(body string) amqp.Delivery { + return amqp.Delivery{Body: []byte(body)} +} + +func TestProcessSubscriberMessage(t *testing.T) { + testMetadata := &metadata{autoAck: true} + testRabbitMQSubscriber := createRabbitMQ() + testRabbitMQSubscriber.metadata = testMetadata + + const topic = "testTopic" + + ch := make(chan amqp.Delivery) + defer close(ch) + + messageCount := 0 + + fakeHandler := func(msg *pubsub.NewMessage) error { + messageCount++ + + assert.Equal(t, topic, msg.Topic) + assert.NotNil(t, msg.Data) + + return nil + } + + go testRabbitMQSubscriber.listenMessages(ch, topic, fakeHandler) + assert.Equal(t, messageCount, 0) + ch <- createAmqpMessage("{ \"msg\": \"1\"}") + ch <- createAmqpMessage("{ \"msg\": \"2\"}") + assert.GreaterOrEqual(t, messageCount, 1) + assert.LessOrEqual(t, messageCount, 2) + ch <- createAmqpMessage("{ \"msg\": \"3\"}") + assert.GreaterOrEqual(t, messageCount, 2) + assert.LessOrEqual(t, messageCount, 3) +}