Merge branch 'master' into aws-pubsub-cert-tests-1339

This commit is contained in:
Roberto Rojas 2022-12-29 09:04:08 -05:00 committed by GitHub
commit c1a89423f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 641 additions and 10 deletions

View File

@ -0,0 +1,61 @@
version: '3.3'
services:
primary:
container_name: pubSubStandardSingleNode
image: solace/solace-pubsub-standard:latest
volumes:
- "storage-group:/var/lib/solace"
shm_size: 1g
ulimits:
core: -1
nofile:
soft: 2448
hard: 6592
deploy:
restart_policy:
condition: on-failure
max_attempts: 1
ports:
#Port Mappings: With the exception of SMF, ports are mapped straight
#through from host to container. This may result in port collisions on
#commonly used ports that will cause failure of the container to start.
#Web transport
- '8008:8008'
#Web transport over TLS
- '1443:1443'
#SEMP over TLS
- '1943:1943'
#MQTT Default VPN
- '1883:1883'
#AMQP Default VPN over TLS
- '5671:5671'
#AMQP Default VPN
- '5672:5672'
#MQTT Default VPN over WebSockets
- '8000:8000'
#MQTT Default VPN over WebSockets / TLS
- '8443:8443'
#MQTT Default VPN over TLS
- '8883:8883'
#SEMP / PubSub+ Manager
- '8080:8080'
#REST Default VPN
- '9000:9000'
#REST Default VPN over TLS
- '9443:9443'
#SMF
- '55554:55555'
#SMF Compressed
- '55003:55003'
#SMF over TLS
- '55443:55443'
#SSH connection to CLI
- '2222:2222'
environment:
- username_admin_globalaccesslevel=admin
- username_admin_password=admin
- system_scaling_maxconnectioncount=100
volumes:
storage-group:

View File

@ -74,6 +74,7 @@ jobs:
- pubsub.kafka-wurstmeister
- pubsub.kafka-confluent
- pubsub.kubemq
- pubsub.solace
- secretstores.kubernetes
- secretstores.localenv
- secretstores.localfile
@ -481,6 +482,10 @@ jobs:
run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d
if: contains(matrix.component, 'kubemq')
- name: Start solace
run: docker-compose -f ./.github/infrastructure/docker-compose-solace.yml -p solace up -d
if: contains(matrix.component, 'solace')
- name: Start nats with JetStream
run: |
docker-compose -f ./.github/infrastructure/docker-compose-jetstream.yml up -p jetstream -d

292
pubsub/solace/amqp/amqp.go Normal file
View File

@ -0,0 +1,292 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package amqp
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
time "time"
amqp "github.com/Azure/go-amqp"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
const (
publishRetryWaitSeconds = 2
publishMaxRetries = 3
)
// amqpPubSub type allows sending and receiving data to/from an AMQP 1.0 broker
type amqpPubSub struct {
session *amqp.Session
metadata *metadata
logger logger.Logger
publishLock sync.RWMutex
publishRetryCount int
ctx context.Context
cancel context.CancelFunc
}
// NewAMQPPubsub returns a new AMQPPubSub instance
func NewAMQPPubsub(logger logger.Logger) pubsub.PubSub {
return &amqpPubSub{
logger: logger,
publishLock: sync.RWMutex{},
}
}
// Init parses the metadata and creates a new Pub Sub Client.
func (a *amqpPubSub) Init(metadata pubsub.Metadata) error {
amqpMeta, err := parseAMQPMetaData(metadata, a.logger)
if err != nil {
return err
}
a.metadata = amqpMeta
a.ctx, a.cancel = context.WithCancel(context.Background())
s, err := a.connect()
if err != nil {
return err
}
a.session = s
return err
}
func AddPrefixToAddress(t string) string {
dest := t
// Unless the request comes in to publish on a queue, publish directly on a topic
if !strings.HasPrefix(dest, "queue:") && !strings.HasPrefix(dest, "topic:") {
dest = "topic://" + dest
} else if strings.HasPrefix(dest, "queue:") {
dest = strings.Replace(dest, "queue:", "queue://", 1)
} else if strings.HasPrefix(dest, "topic:") {
dest = strings.Replace(dest, "topic:", "topic://", 1)
}
return dest
}
// Publish the topic to amqp pubsub
func (a *amqpPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
a.publishLock.Lock()
defer a.publishLock.Unlock()
a.publishRetryCount = 0
if req.Topic == "" {
return errors.New("topic name is empty")
}
m := amqp.NewMessage(req.Data)
// If the request has ttl specified, put it on the message header
ttlProp := req.Metadata["ttlInSeconds"]
if ttlProp != "" {
ttlInSeconds, err := strconv.Atoi(ttlProp)
if err != nil {
a.logger.Warnf("Invalid ttl received from message %s", ttlInSeconds)
} else {
m.Header.TTL = time.Second * time.Duration(ttlInSeconds)
}
}
sender, err := a.session.NewSender(ctx,
AddPrefixToAddress(req.Topic),
nil,
)
if err != nil {
a.logger.Errorf("Unable to create link to %s", req.Topic, err)
} else {
err = sender.Send(ctx, m)
// If the publish operation has failed, attempt to republish a maximum number of times
// before giving up
if err != nil {
for a.publishRetryCount <= publishMaxRetries {
a.publishRetryCount++
// Send message
err = sender.Send(ctx, m)
if err != nil {
a.logger.Warnf("Failed to publish a message to the broker", err)
}
time.Sleep(publishRetryWaitSeconds * time.Second)
}
}
}
return err
}
func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
prefixedTopic := AddPrefixToAddress(req.Topic)
receiver, err := a.session.NewReceiver(a.ctx,
prefixedTopic,
nil,
)
if err == nil {
a.logger.Infof("Attempting to subscribe to %s", prefixedTopic)
go a.subscribeForever(ctx, receiver, handler, prefixedTopic)
} else {
a.logger.Error("Unable to create a receiver:", err)
}
return err
}
// function that subscribes to a queue in a tight loop
func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiver, handler pubsub.Handler, t string) {
for {
// Receive next message
msg, err := receiver.Receive(ctx)
if msg != nil {
data := msg.GetData()
// if data is empty, then check the value field for data
if data == nil || len(data) == 0 {
data = []byte(fmt.Sprint(msg.Value))
}
pubsubMsg := &pubsub.NewMessage{
Data: data,
Topic: msg.LinkName(),
}
if err != nil {
a.logger.Errorf("failed to establish receiver")
}
err = handler(ctx, pubsubMsg)
if err == nil {
err := receiver.AcceptMessage(ctx, msg)
a.logger.Debugf("ACKed a message")
if err != nil {
a.logger.Errorf("failed to acknowledge a message")
}
} else {
a.logger.Errorf("Error processing message from %s", msg.LinkName())
a.logger.Debugf("NAKd a message")
err := receiver.RejectMessage(ctx, msg, nil)
if err != nil {
a.logger.Errorf("failed to NAK a message")
}
}
}
}
}
// Connect to the AMQP broker
func (a *amqpPubSub) connect() (*amqp.Session, error) {
uri, err := url.Parse(a.metadata.url)
if err != nil {
return nil, err
}
clientOpts := a.createClientOptions(uri)
a.logger.Infof("Attempting to connect to %s", a.metadata.url)
client, err := amqp.Dial(a.metadata.url, &clientOpts)
if err != nil {
a.logger.Fatal("Dialing AMQP server:", err)
}
// Open a session
session, err := client.NewSession(a.ctx, nil)
if err != nil {
a.logger.Fatal("Creating AMQP session:", err)
}
return session, nil
}
func (a *amqpPubSub) newTLSConfig() *tls.Config {
tlsConfig := new(tls.Config)
if a.metadata.clientCert != "" && a.metadata.clientKey != "" {
cert, err := tls.X509KeyPair([]byte(a.metadata.clientCert), []byte(a.metadata.clientKey))
if err != nil {
a.logger.Warnf("unable to load client certificate and key pair. Err: %v", err)
return tlsConfig
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if a.metadata.caCert != "" {
tlsConfig.RootCAs = x509.NewCertPool()
if ok := tlsConfig.RootCAs.AppendCertsFromPEM([]byte(a.metadata.caCert)); !ok {
a.logger.Warnf("unable to load ca certificate.")
}
}
return tlsConfig
}
func (a *amqpPubSub) createClientOptions(uri *url.URL) amqp.ConnOptions {
var opts amqp.ConnOptions
scheme := uri.Scheme
switch scheme {
case "amqp":
if a.metadata.anonymous == true {
opts.SASLType = amqp.SASLTypeAnonymous()
} else {
opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password)
}
case "amqps":
opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password)
opts.TLSConfig = a.newTLSConfig()
}
return opts
}
// Close the session
func (a *amqpPubSub) Close() error {
a.publishLock.Lock()
defer a.publishLock.Unlock()
err := a.session.Close(a.ctx)
if err != nil {
a.logger.Warnf("failed to close the connection.", err)
}
return err
}
// Feature list for AMQP PubSub
func (a *amqpPubSub) Features() []pubsub.Feature {
return []pubsub.Feature{pubsub.FeatureSubscribeWildcards, pubsub.FeatureMessageTTL}
}

View File

@ -0,0 +1,141 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package amqp
import (
"crypto/x509"
"encoding/pem"
"errors"
"testing"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
)
func getFakeProperties() map[string]string {
return map[string]string{
"consumerID": "client",
amqpURL: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883",
anonymous: "false",
username: "default",
password: "default",
}
}
func TestParseMetadata(t *testing.T) {
log := logger.NewLogger("test")
t.Run("metadata is correct", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
m, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.NoError(t, err)
assert.Equal(t, fakeProperties[amqpURL], m.url)
})
t.Run("url is not given", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{
Base: mdata.Base{Properties: fakeProperties},
}
fakeMetaData.Properties[amqpURL] = ""
m, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.EqualError(t, err, errors.New(errorMsgPrefix+" missing url").Error())
assert.Equal(t, fakeProperties[amqpURL], m.url)
})
t.Run("invalid ca certificate", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpCACert] = "randomNonPEMBlockCA"
_, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.Contains(t, err.Error(), "invalid caCert")
})
t.Run("valid ca certificate", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpCACert] = "-----BEGIN CERTIFICATE-----\nMIICyDCCAbACCQDb8BtgvbqW5jANBgkqhkiG9w0BAQsFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1MzU4WhcNMjUw\nODEyMDY1MzU4WjAmMQswCQYDVQQGEwJJTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0\nQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEXte1GBxFJaygsEnK\nHV2AxazZW6Vppv+i50AuURHcaGo0i8G5CTfHzSKrYtTFfBskUspl+2N8GPV5c8Eb\ng+PP6YFn1wiHVz+wRSk3BD35DcGOT2o4XsJw5tiAzJkbpAOYCYl7KAM+BtOf41uC\nd6TdqmawhRGtv1ND2WtyJOT6A3KcUfjhL4TFEhWoljPJVay4TQoJcZMAImD/Xcxw\n6urv6wmUJby3/RJ3I46ZNH3zxEw5vSq1TuzuXxQmfPJG0ZPKJtQZ2nkZ3PNZe4bd\nNUa83YgQap7nBhYdYMMsQyLES2qy3mPcemBVoBWRGODel4PMEcsQiOhAyloAF2d3\nhd+LAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAK13X5JYBy78vHYoP0Oq9fe5XBbL\nuRM8YLnet9b/bXTGG4SnCCOGqWz99swYK7SVyR5l2h8SAoLzeNV61PtaZ6fHrbar\noxSL7BoRXOhMH6LQATadyvwlJ71uqlagqya7soaPK09TtfzeebLT0QkRCWT9b9lQ\nDBvBVCaFidynJL1ts21m5yUdIY4JSu4sGZGb4FRGFdBv/hD3wH8LAkOppsSv3C/Q\nkfkDDSQzYbdMoBuXmafvi3He7Rv+e6Tj9or1rrWdx0MIKlZPzz4DOe5Rh112uRB9\n7xPHJt16c+Ya3DKpchwwdNcki0vFchlpV96HK8sMCoY9kBzPhkEQLdiBGv4=\n-----END CERTIFICATE-----\n"
m, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.NoError(t, err)
block, _ := pem.Decode([]byte(m.tlsCfg.caCert))
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
t.Errorf("failed to parse ca certificate from metadata. %v", err)
}
assert.Equal(t, "daprMqttTestCA", cert.Subject.CommonName)
})
t.Run("invalid client certificate", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpClientCert] = "randomNonPEMBlockClientCert"
_, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.Contains(t, err.Error(), "invalid clientCert")
})
t.Run("valid client certificate", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpClientCert] = "-----BEGIN CERTIFICATE-----\nMIICzDCCAbQCCQDBKDMS3SHsDzANBgkqhkiG9w0BAQUFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1NTE1WhcNMjEw\nODA3MDY1NTE1WjAqMQswCQYDVQQGEwJJTjEbMBkGA1UEAwwSZGFwck1xdHRUZXN0\nQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5IDfsGI2pb4W\nt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu++M6yi9ABaBiYChpxbylqIeAn/HT\n3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsMfzO9p1oMpTnRdJCHYinE+oqVced5\nHI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPvui8WzJq9TMkEhEME+5hs3VKyKZr2\nqjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMWwzj2G+Rj8PMcBjoKeCLQL9uQh7f1\nTWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+Ur2b5+7/hoqnTzReJ3XUe1jM3l44f\nl0rOf4hu2QIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAT9yoIeX0LTsvx7/b+8V3a\nkP+j8u97QCc8n5xnMpivcMEk5cfqXX5Llv2EUJ9kBsynrJwT7ujhTJXSA/zb2UdC\nKH8PaSrgIlLwQNZMDofbz6+zPbjStkgne/ZQkTDIxY73sGpJL8LsQVO9p2KjOpdj\nSf9KuJhLzcHolh7ry3ZrkOg+QlMSvseeDRAxNhpkJrGQ6piXoUiEeKKNa0rWTMHx\nIP1Hqj+hh7jgqoQR48NL2jNng7I64HqTl6Mv2fiNfINiw+5xmXTB0QYkGU5NvPBO\naKcCRcGlU7ND89BogQPZsl/P04tAuQqpQWffzT4sEEOyWSVGda4N2Ys3GSQGBv8e\n-----END CERTIFICATE-----\n"
m, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.NoError(t, err)
block, _ := pem.Decode([]byte(m.tlsCfg.clientCert))
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
t.Errorf("failed to parse client certificate from metadata. %v", err)
}
assert.Equal(t, "daprMqttTestClient", cert.Subject.CommonName)
})
t.Run("invalid client certificate key", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpClientKey] = "randomNonPEMBlockClientKey"
_, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.Contains(t, err.Error(), "invalid clientKey")
})
t.Run("valid client certificate key", func(t *testing.T) {
fakeProperties := getFakeProperties()
fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}}
fakeMetaData.Properties[amqpClientKey] = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA5IDfsGI2pb4Wt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu\n++M6yi9ABaBiYChpxbylqIeAn/HT3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsM\nfzO9p1oMpTnRdJCHYinE+oqVced5HI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPv\nui8WzJq9TMkEhEME+5hs3VKyKZr2qjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMW\nwzj2G+Rj8PMcBjoKeCLQL9uQh7f1TWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+U\nr2b5+7/hoqnTzReJ3XUe1jM3l44fl0rOf4hu2QIDAQABAoIBAQCVMINb4TP20P55\n9IPyqlxjhPT563hijXK+lhMJyiBDPavOOs7qjLikq2bshYPVbm1o2jt6pkXXqAeB\n5t/d20fheQQurYyPfxecNBZuL78duwbcUy28m2aXLlcVRYO4zGhoMgdW4UajoNLV\nT/UIiDONWGyhTHXMHdP+6h9UOmvs3o4b225AuLrw9n6QO5I1Se8lcfOTIqR1fy4O\nGsUWEQPdW0X3Dhgpx7kDIuBTAQzbjD31PCR1U8h2wsCeEe6hPCrsMbo/D019weol\ndi40tbWR1/oNz0+vro2d9YDPJkXN0gmpT51Z4YJoexZBdyzO5z4DMSdn5yczzt6p\nQq8LsXAFAoGBAPYXRbC4OxhtuC+xr8KRkaCCMjtjUWFbFWf6OFgUS9b5uPz9xvdY\nXo7wBP1zp2dS8yFsdIYH5Six4Z5iOuDR4sVixzjabhwedL6bmS1zV5qcCWeASKX1\nURgSkfMmC4Tg3LBgZ9YxySFcVRjikxljkS3eK7Mp7Xmj5afe7qV73TJfAoGBAO20\nTtw2RGe02xnydZmmwf+NpQHOA9S0JsehZA6NRbtPEN/C8bPJIq4VABC5zcH+tfYf\nzndbDlGhuk+qpPA590rG5RSOUjYnQFq7njdSfFyok9dXSZQTjJwFnG2oy0LmgjCe\nROYnbCzD+a+gBKV4xlo2M80OLakQ3zOwPT0xNRnHAoGATLEj/tbrU8mdxP9TDwfe\nom7wyKFDE1wXZ7gLJyfsGqrog69y+lKH5XPXmkUYvpKTQq9SARMkz3HgJkPmpXnD\nelA2Vfl8pza2m1BShF+VxZErPR41hcLV6vKemXAZ1udc33qr4YzSaZskygSSYy8s\nZ2b9p3BBmc8CGzbWmKvpW3ECgYEAn7sFLxdMWj/+5221Nr4HKPn+wrq0ek9gq884\n1Ep8bETSOvrdvolPQ5mbBKJGsLC/h5eR/0Rx18sMzpIF6eOZ2GbU8z474mX36cCf\nrd9A8Gbbid3+9IE6gHGIz2uYwujw3UjNVbdyCpbahvjJhoQlDePUZVu8tRpAUpSA\nYklZvGsCgYBuIlOFTNGMVUnwfzrcS9a/31LSvWTZa8w2QFjsRPMYFezo2l4yWs4D\nPEpeuoJm+Gp6F6ayjoeyOw9mvMBH5hAZr4WjbiU6UodzEHREAsLAzCzcRyIpnDE6\nPW1c3j60r8AHVufkWTA+8B9WoLC5MqcYTV3beMGnNGGqS2PeBom63Q==\n-----END RSA PRIVATE KEY-----\n"
m, err := parseAMQPMetaData(fakeMetaData, log)
// assert
assert.NoError(t, err)
assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key")
})
}

View File

@ -0,0 +1,117 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package amqp
import (
"encoding/pem"
"fmt"
"strconv"
"time"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
)
const (
// errors.
errorMsgPrefix = "amqp pub sub error:"
)
type metadata struct {
tlsCfg
url string
username string
password string
anonymous bool
}
type tlsCfg struct {
caCert string
clientCert string
clientKey string
}
const (
// Keys
amqpURL = "url"
anonymous = "anonymous"
username = "username"
password = "password"
amqpCACert = "caCert"
amqpClientCert = "clientCert"
amqpClientKey = "clientKey"
defaultWait = 30 * time.Second
)
// isValidPEM validates the provided input has PEM formatted block.
func isValidPEM(val string) bool {
block, _ := pem.Decode([]byte(val))
return block != nil
}
func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) {
m := metadata{anonymous: false}
// required configuration settings
if val, ok := md.Properties[amqpURL]; ok && val != "" {
m.url = val
} else {
return &m, fmt.Errorf("%s missing url", errorMsgPrefix)
}
// optional configuration settings
if val, ok := md.Properties[anonymous]; ok && val != "" {
var err error
m.anonymous, err = strconv.ParseBool(val)
if err != nil {
return &m, fmt.Errorf("%s invalid anonymous %s, %s", errorMsgPrefix, val, err)
}
}
if !m.anonymous {
if val, ok := md.Properties[username]; ok && val != "" {
m.username = val
} else {
return &m, fmt.Errorf("%s missing username", errorMsgPrefix)
}
if val, ok := md.Properties[password]; ok && val != "" {
m.password = val
} else {
return &m, fmt.Errorf("%s missing username", errorMsgPrefix)
}
}
if val, ok := md.Properties[amqpCACert]; ok && val != "" {
if !isValidPEM(val) {
return &m, fmt.Errorf("%s invalid caCert", errorMsgPrefix)
}
m.tlsCfg.caCert = val
}
if val, ok := md.Properties[amqpClientCert]; ok && val != "" {
if !isValidPEM(val) {
return &m, fmt.Errorf("%s invalid clientCert", errorMsgPrefix)
}
m.tlsCfg.clientCert = val
}
if val, ok := md.Properties[amqpClientKey]; ok && val != "" {
if !isValidPEM(val) {
return &m, fmt.Errorf("%s invalid clientKey", errorMsgPrefix)
}
m.tlsCfg.clientKey = val
}
return &m, nil
}

View File

@ -0,0 +1,10 @@
apiVersion: dapr.io/v1alpha1
kind: Component
spec:
type: pubsub.solace.amqp
version: v1
metadata:
- name: url
value: 'amqp://localhost:5672'
- name: anonymous
value: true

View File

@ -12,7 +12,7 @@
componentType: pubsub
components:
- component: azure.eventhubs
operations: ["publish", "subscribe", "multiplehandlers", "bulkpublish"]
operations: ['publish', 'subscribe', 'multiplehandlers', 'bulkpublish']
config:
pubsubName: azure-eventhubs
testTopicName: eventhubs-pubsub-topic
@ -50,9 +50,9 @@ components:
config:
checkInOrderProcessing: false
- component: natsstreaming
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: jetstream
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: kafka
allOperations: true
- component: kafka
@ -62,20 +62,22 @@ components:
profile: confluent
allOperations: true
- component: pulsar
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: mqtt
profile: mosquitto
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: solace.amqp
operations: ['publish', 'subscribe']
- component: mqtt
profile: emqx
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: mqtt
profile: vernemq
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: hazelcast
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: rabbitmq
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']
config:
checkInOrderProcessing: false
- component: in-memory
@ -94,4 +96,4 @@ components:
pubsubName: aws-snssqs
checkInOrderProcessing: false
- component: kubemq
operations: ["publish", "subscribe", "multiplehandlers"]
operations: ['publish', 'subscribe', 'multiplehandlers']

View File

@ -70,6 +70,7 @@ import (
p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq"
p_redis "github.com/dapr/components-contrib/pubsub/redis"
p_solaceamqp "github.com/dapr/components-contrib/pubsub/solace/amqp"
ss_azure "github.com/dapr/components-contrib/secretstores/azure/keyvault"
ss_hashicorp_vault "github.com/dapr/components-contrib/secretstores/hashicorp/vault"
ss_kubernetes "github.com/dapr/components-contrib/secretstores/kubernetes"
@ -452,6 +453,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
pubsub = p_snssqs.NewSnsSqs(testLogger)
case "kubemq":
pubsub = p_kubemq.NewKubeMQ(testLogger)
case "solace.amqp":
pubsub = p_solaceamqp.NewAMQPPubsub(testLogger)
default:
return nil
}