Revert "Merge branch 'master' of https://github.com/dapr/components-contrib into dapr-master"
This reverts commit9a91bccc4b, reversing changes made to7e0cb1fa2d.
This commit is contained in:
parent
9a91bccc4b
commit
110ef82681
|
|
@ -0,0 +1,182 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
rabbitMQQueueMessageTTLKey = "x-message-ttl"
|
||||
)
|
||||
|
||||
// RabbitMQ allows sending/receiving data to/from RabbitMQ
|
||||
type RabbitMQ struct {
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
metadata rabbitMQMetadata
|
||||
logger logger.Logger
|
||||
queue amqp.Queue
|
||||
}
|
||||
|
||||
// Metadata is the rabbitmq config
|
||||
type rabbitMQMetadata struct {
|
||||
QueueName string `json:"queueName"`
|
||||
Host string `json:"host"`
|
||||
Durable bool `json:"durable,string"`
|
||||
DeleteWhenUnused bool `json:"deleteWhenUnused,string"`
|
||||
PrefetchCount string `json:"prefetchCount"`
|
||||
defaultQueueTTL *time.Duration
|
||||
prefetchCount int
|
||||
}
|
||||
|
||||
// NewRabbitMQ returns a new rabbitmq instance
|
||||
func NewRabbitMQ(logger logger.Logger) *RabbitMQ {
|
||||
return &RabbitMQ{logger: logger}
|
||||
}
|
||||
|
||||
// Init does metadata parsing and connection creation
|
||||
func (r *RabbitMQ) Init(metadata bindings.Metadata) error {
|
||||
err := r.parseMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := amqp.Dial(r.metadata.Host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ch.Qos(r.metadata.prefetchCount, 0, true)
|
||||
r.connection = conn
|
||||
r.channel = ch
|
||||
|
||||
q, err := r.declareQueue()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.queue = q
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Operations() []bindings.OperationKind {
|
||||
return []bindings.OperationKind{bindings.CreateOperation}
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
pub := amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: "text/plain",
|
||||
Body: req.Data,
|
||||
}
|
||||
|
||||
ttl, ok, err := bindings.TryGetTTL(req.Metadata)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The default time to live has been set in the queue
|
||||
// We allow overriding on each call, by setting a value in request metadata
|
||||
if ok {
|
||||
// RabbitMQ expects the duration in ms
|
||||
pub.Expiration = strconv.FormatInt(ttl.Milliseconds(), 10)
|
||||
}
|
||||
|
||||
err = r.channel.Publish("", r.metadata.QueueName, false, false, pub)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) parseMetadata(metadata bindings.Metadata) error {
|
||||
b, err := json.Marshal(metadata.Properties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var m rabbitMQMetadata
|
||||
err = json.Unmarshal(b, &m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.PrefetchCount != "" {
|
||||
m.prefetchCount, err = strconv.Atoi(m.PrefetchCount)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "%s value must be a valid integer: actual is '%s'", "prefetchCount", m.PrefetchCount)
|
||||
}
|
||||
}
|
||||
|
||||
ttl, ok, err := bindings.TryGetTTL(metadata.Properties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ok {
|
||||
m.defaultQueueTTL = &ttl
|
||||
}
|
||||
|
||||
r.metadata = m
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) declareQueue() (amqp.Queue, error) {
|
||||
args := amqp.Table{}
|
||||
if r.metadata.defaultQueueTTL != nil {
|
||||
// Value in ms
|
||||
ttl := *r.metadata.defaultQueueTTL / time.Millisecond
|
||||
args[rabbitMQQueueMessageTTLKey] = int(ttl)
|
||||
}
|
||||
|
||||
return r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, args)
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) error) error {
|
||||
msgs, err := r.channel.Consume(
|
||||
r.queue.Name,
|
||||
"",
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
forever := make(chan bool)
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
err := handler(&bindings.ReadResponse{
|
||||
Data: d.Body,
|
||||
})
|
||||
if err == nil {
|
||||
r.channel.Ack(d.DeliveryTag, false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
<-forever
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParseMetadata(t *testing.T) {
|
||||
const queueName = "test-queue"
|
||||
const host = "test-host"
|
||||
var oneSecondTTL time.Duration = time.Second
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
properties map[string]string
|
||||
expectedDeleteWhenUnused bool
|
||||
expectedDurable bool
|
||||
expectedTTL *time.Duration
|
||||
expectedStringPrefetchCount string
|
||||
expectedIntPrefetchCount int
|
||||
}{
|
||||
{
|
||||
name: "Delete / Durable",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, "DeleteWhenUnused": "true", "Durable": "true"},
|
||||
expectedDeleteWhenUnused: true,
|
||||
expectedDurable: true,
|
||||
},
|
||||
{
|
||||
name: "Not Delete / Not Durable",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, "DeleteWhenUnused": "false", "Durable": "false"},
|
||||
expectedDeleteWhenUnused: false,
|
||||
expectedDurable: false,
|
||||
},
|
||||
{
|
||||
name: "With one second TTL",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, "DeleteWhenUnused": "false", "Durable": "false", bindings.TTLMetadataKey: "1"},
|
||||
expectedDeleteWhenUnused: false,
|
||||
expectedDurable: false,
|
||||
expectedTTL: &oneSecondTTL,
|
||||
},
|
||||
{
|
||||
name: "Empty TTL",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, "DeleteWhenUnused": "false", "Durable": "false", bindings.TTLMetadataKey: ""},
|
||||
expectedDeleteWhenUnused: false,
|
||||
expectedDurable: false,
|
||||
},
|
||||
{
|
||||
name: "With one PrefetchCount",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, "DeleteWhenUnused": "false", "Durable": "false", "PrefetchCount": "1"},
|
||||
expectedDeleteWhenUnused: false,
|
||||
expectedDurable: false,
|
||||
expectedStringPrefetchCount: "1",
|
||||
expectedIntPrefetchCount: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = tt.properties
|
||||
r := RabbitMQ{logger: logger.NewLogger("test")}
|
||||
err := r.parseMetadata(m)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, queueName, r.metadata.QueueName)
|
||||
assert.Equal(t, host, r.metadata.Host)
|
||||
assert.Equal(t, tt.expectedDeleteWhenUnused, r.metadata.DeleteWhenUnused)
|
||||
assert.Equal(t, tt.expectedDurable, r.metadata.Durable)
|
||||
assert.Equal(t, tt.expectedTTL, r.metadata.defaultQueueTTL)
|
||||
assert.Equal(t, tt.expectedStringPrefetchCount, r.metadata.PrefetchCount)
|
||||
assert.Equal(t, tt.expectedIntPrefetchCount, r.metadata.prefetchCount)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseMetadataWithInvalidTTL(t *testing.T) {
|
||||
const queueName = "test-queue"
|
||||
const host = "test-host"
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
properties map[string]string
|
||||
}{
|
||||
{
|
||||
name: "Whitespaces TTL",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, bindings.TTLMetadataKey: " "},
|
||||
},
|
||||
{
|
||||
name: "Negative ttl",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, bindings.TTLMetadataKey: "-1"},
|
||||
},
|
||||
{
|
||||
name: "Non-numeric ttl",
|
||||
properties: map[string]string{"QueueName": queueName, "Host": host, bindings.TTLMetadataKey: "abc"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = tt.properties
|
||||
r := RabbitMQ{logger: logger.NewLogger("test")}
|
||||
err := r.parseMetadata(m)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue