Adding conformance tests
Signed-off-by: TKTheTechie <thomas.kunnumpurath@solace.com>
This commit is contained in:
parent
2039a30de7
commit
619c35693e
|
@ -74,6 +74,17 @@ func (a *amqpPubSub) Init(metadata pubsub.Metadata) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func AddPrefixToAddress(t string) string {
|
||||
dest := t
|
||||
|
||||
//Unless the request comes in to publish on a queue, publish directly on a topic
|
||||
if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") {
|
||||
dest = "topic://" + dest
|
||||
}
|
||||
|
||||
return dest
|
||||
}
|
||||
|
||||
// Publish the topic to amqp pubsub
|
||||
func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
||||
|
||||
|
@ -99,15 +110,8 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
}
|
||||
}
|
||||
|
||||
dest := req.Topic
|
||||
|
||||
//Unless the request comes in to publish on a queue, publish directly on a topic
|
||||
if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") {
|
||||
dest = "topic://" + dest
|
||||
}
|
||||
|
||||
sender, err := a.session.NewSender(
|
||||
amqp.LinkTargetAddress(dest),
|
||||
amqp.LinkTargetAddress(AddPrefixToAddress(req.Topic)),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -137,11 +141,10 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
|
||||
}
|
||||
|
||||
// Set up a subscription directly to a queue. Subscriptions to topics are not currently supported
|
||||
func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
|
||||
receiver, err := a.session.NewReceiver(
|
||||
amqp.LinkSourceAddress(req.Topic),
|
||||
amqp.LinkSourceAddress(AddPrefixToAddress(req.Topic)),
|
||||
)
|
||||
|
||||
if err == nil {
|
||||
|
@ -161,32 +164,34 @@ func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiv
|
|||
// Receive next message
|
||||
msg, err := receiver.Receive(ctx)
|
||||
|
||||
a.logger.Debugf("Received a message %s", msg.GetData())
|
||||
if msg != nil {
|
||||
a.logger.Debugf("Received a message %s", msg.GetData())
|
||||
|
||||
pubsubMsg := &pubsub.NewMessage{
|
||||
Data: msg.GetData(),
|
||||
Topic: msg.LinkName(),
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to establish receiver")
|
||||
}
|
||||
|
||||
err = handler(ctx, pubsubMsg)
|
||||
|
||||
if err == nil {
|
||||
err := receiver.AcceptMessage(ctx, msg)
|
||||
a.logger.Debugf("ACKed a message")
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to acknowledge a message")
|
||||
pubsubMsg := &pubsub.NewMessage{
|
||||
Data: msg.GetData(),
|
||||
Topic: msg.LinkName(),
|
||||
}
|
||||
} else {
|
||||
a.logger.Errorf("Error processing message from %s", msg.LinkName())
|
||||
a.logger.Debugf("NAKd a message")
|
||||
err := receiver.RejectMessage(ctx, msg, nil)
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to NAK a message")
|
||||
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to establish receiver")
|
||||
}
|
||||
|
||||
err = handler(ctx, pubsubMsg)
|
||||
|
||||
if err == nil {
|
||||
err := receiver.AcceptMessage(ctx, msg)
|
||||
a.logger.Debugf("ACKed a message")
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to acknowledge a message")
|
||||
}
|
||||
} else {
|
||||
a.logger.Errorf("Error processing message from %s", msg.LinkName())
|
||||
a.logger.Debugf("NAKd a message")
|
||||
err := receiver.RejectMessage(ctx, msg, nil)
|
||||
if err != nil {
|
||||
a.logger.Errorf("failed to NAK a message")
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: solace
|
||||
spec:
|
||||
type: pubsub.amqp
|
||||
version: v1
|
||||
metadata:
|
||||
- name: url
|
||||
value: 'amqp://localhost:5672'
|
||||
- name: username
|
||||
value: 'default'
|
||||
- name: password
|
||||
value: 'default'
|
||||
- name: anonymous
|
||||
value: true
|
||||
|
|
@ -62,6 +62,9 @@ components:
|
|||
- component: mqtt
|
||||
profile: mosquitto
|
||||
operations: ["publish", "subscribe", "multiplehandlers"]
|
||||
- component: amqp
|
||||
profile: solace
|
||||
operations: ["publish", "subscribe"]
|
||||
- component: mqtt
|
||||
profile: emqx
|
||||
operations: ["publish", "subscribe", "multiplehandlers"]
|
||||
|
|
|
@ -50,6 +50,7 @@ import (
|
|||
b_postgres "github.com/dapr/components-contrib/bindings/postgres"
|
||||
b_rabbitmq "github.com/dapr/components-contrib/bindings/rabbitmq"
|
||||
b_redis "github.com/dapr/components-contrib/bindings/redis"
|
||||
p_amqp "github.com/dapr/components-contrib/pubsub/amqp"
|
||||
p_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs"
|
||||
p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs"
|
||||
p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues"
|
||||
|
@ -401,6 +402,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
|
|||
pubsub = p_snssqs.NewSnsSqs(testLogger)
|
||||
case "kubemq":
|
||||
pubsub = p_kubemq.NewKubeMQ(testLogger)
|
||||
case "amqp":
|
||||
pubsub = p_amqp.NewAMQPPubsub(testLogger)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue