diff --git a/.github/infrastructure/docker-compose-solace.yml b/.github/infrastructure/docker-compose-solace.yml new file mode 100644 index 000000000..96cb46abb --- /dev/null +++ b/.github/infrastructure/docker-compose-solace.yml @@ -0,0 +1,61 @@ +version: '3.3' + +services: + primary: + container_name: pubSubStandardSingleNode + image: solace/solace-pubsub-standard:latest + volumes: + - "storage-group:/var/lib/solace" + shm_size: 1g + ulimits: + core: -1 + nofile: + soft: 2448 + hard: 6592 + deploy: + restart_policy: + condition: on-failure + max_attempts: 1 + ports: + #Port Mappings: With the exception of SMF, ports are mapped straight + #through from host to container. This may result in port collisions on + #commonly used ports that will cause failure of the container to start. + #Web transport + - '8008:8008' + #Web transport over TLS + - '1443:1443' + #SEMP over TLS + - '1943:1943' + #MQTT Default VPN + - '1883:1883' + #AMQP Default VPN over TLS + - '5671:5671' + #AMQP Default VPN + - '5672:5672' + #MQTT Default VPN over WebSockets + - '8000:8000' + #MQTT Default VPN over WebSockets / TLS + - '8443:8443' + #MQTT Default VPN over TLS + - '8883:8883' + #SEMP / PubSub+ Manager + - '8080:8080' + #REST Default VPN + - '9000:9000' + #REST Default VPN over TLS + - '9443:9443' + #SMF + - '55554:55555' + #SMF Compressed + - '55003:55003' + #SMF over TLS + - '55443:55443' + #SSH connection to CLI + - '2222:2222' + environment: + - username_admin_globalaccesslevel=admin + - username_admin_password=admin + - system_scaling_maxconnectioncount=100 + +volumes: + storage-group: \ No newline at end of file diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index abb22eefb..1fecad380 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -74,6 +74,7 @@ jobs: - pubsub.kafka-wurstmeister - pubsub.kafka-confluent - pubsub.kubemq + - pubsub.solace - secretstores.kubernetes - secretstores.localenv - secretstores.localfile @@ -480,6 +481,10 @@ jobs: - name: Start kubemq run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d if: contains(matrix.component, 'kubemq') + + - name: Start solace + run: docker-compose -f ./.github/infrastructure/docker-compose-solace.yml -p solace up -d + if: contains(matrix.component, 'solace') - name: Start nats with JetStream run: | diff --git a/pubsub/solace/amqp/amqp.go b/pubsub/solace/amqp/amqp.go new file mode 100644 index 000000000..137a77c18 --- /dev/null +++ b/pubsub/solace/amqp/amqp.go @@ -0,0 +1,292 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net/url" + "strconv" + "strings" + "sync" + time "time" + + amqp "github.com/Azure/go-amqp" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" +) + +const ( + publishRetryWaitSeconds = 2 + publishMaxRetries = 3 +) + +// amqpPubSub type allows sending and receiving data to/from an AMQP 1.0 broker +type amqpPubSub struct { + session *amqp.Session + metadata *metadata + logger logger.Logger + publishLock sync.RWMutex + publishRetryCount int + ctx context.Context + cancel context.CancelFunc +} + +// NewAMQPPubsub returns a new AMQPPubSub instance +func NewAMQPPubsub(logger logger.Logger) pubsub.PubSub { + return &amqpPubSub{ + logger: logger, + publishLock: sync.RWMutex{}, + } +} + +// Init parses the metadata and creates a new Pub Sub Client. +func (a *amqpPubSub) Init(metadata pubsub.Metadata) error { + amqpMeta, err := parseAMQPMetaData(metadata, a.logger) + if err != nil { + return err + } + + a.metadata = amqpMeta + + a.ctx, a.cancel = context.WithCancel(context.Background()) + + s, err := a.connect() + if err != nil { + return err + } + + a.session = s + + return err +} + +func AddPrefixToAddress(t string) string { + dest := t + + // Unless the request comes in to publish on a queue, publish directly on a topic + if !strings.HasPrefix(dest, "queue:") && !strings.HasPrefix(dest, "topic:") { + dest = "topic://" + dest + } else if strings.HasPrefix(dest, "queue:") { + dest = strings.Replace(dest, "queue:", "queue://", 1) + } else if strings.HasPrefix(dest, "topic:") { + dest = strings.Replace(dest, "topic:", "topic://", 1) + } + + return dest +} + +// Publish the topic to amqp pubsub +func (a *amqpPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) error { + a.publishLock.Lock() + defer a.publishLock.Unlock() + + a.publishRetryCount = 0 + + if req.Topic == "" { + return errors.New("topic name is empty") + } + + m := amqp.NewMessage(req.Data) + + // If the request has ttl specified, put it on the message header + ttlProp := req.Metadata["ttlInSeconds"] + if ttlProp != "" { + ttlInSeconds, err := strconv.Atoi(ttlProp) + if err != nil { + a.logger.Warnf("Invalid ttl received from message %s", ttlInSeconds) + } else { + m.Header.TTL = time.Second * time.Duration(ttlInSeconds) + } + } + + sender, err := a.session.NewSender(ctx, + AddPrefixToAddress(req.Topic), + nil, + ) + + if err != nil { + a.logger.Errorf("Unable to create link to %s", req.Topic, err) + } else { + err = sender.Send(ctx, m) + + // If the publish operation has failed, attempt to republish a maximum number of times + // before giving up + if err != nil { + for a.publishRetryCount <= publishMaxRetries { + a.publishRetryCount++ + + // Send message + err = sender.Send(ctx, m) + + if err != nil { + a.logger.Warnf("Failed to publish a message to the broker", err) + } + time.Sleep(publishRetryWaitSeconds * time.Second) + } + } + } + + return err +} + +func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { + prefixedTopic := AddPrefixToAddress(req.Topic) + + receiver, err := a.session.NewReceiver(a.ctx, + prefixedTopic, + nil, + ) + + if err == nil { + a.logger.Infof("Attempting to subscribe to %s", prefixedTopic) + go a.subscribeForever(ctx, receiver, handler, prefixedTopic) + } else { + a.logger.Error("Unable to create a receiver:", err) + } + + return err +} + +// function that subscribes to a queue in a tight loop +func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiver, handler pubsub.Handler, t string) { + for { + // Receive next message + msg, err := receiver.Receive(ctx) + + if msg != nil { + data := msg.GetData() + + // if data is empty, then check the value field for data + if data == nil || len(data) == 0 { + data = []byte(fmt.Sprint(msg.Value)) + } + + pubsubMsg := &pubsub.NewMessage{ + Data: data, + Topic: msg.LinkName(), + } + + if err != nil { + a.logger.Errorf("failed to establish receiver") + } + + err = handler(ctx, pubsubMsg) + + if err == nil { + err := receiver.AcceptMessage(ctx, msg) + a.logger.Debugf("ACKed a message") + if err != nil { + a.logger.Errorf("failed to acknowledge a message") + } + } else { + a.logger.Errorf("Error processing message from %s", msg.LinkName()) + a.logger.Debugf("NAKd a message") + err := receiver.RejectMessage(ctx, msg, nil) + if err != nil { + a.logger.Errorf("failed to NAK a message") + } + } + } + } +} + +// Connect to the AMQP broker +func (a *amqpPubSub) connect() (*amqp.Session, error) { + uri, err := url.Parse(a.metadata.url) + if err != nil { + return nil, err + } + + clientOpts := a.createClientOptions(uri) + + a.logger.Infof("Attempting to connect to %s", a.metadata.url) + client, err := amqp.Dial(a.metadata.url, &clientOpts) + if err != nil { + a.logger.Fatal("Dialing AMQP server:", err) + } + + // Open a session + session, err := client.NewSession(a.ctx, nil) + if err != nil { + a.logger.Fatal("Creating AMQP session:", err) + } + + return session, nil +} + +func (a *amqpPubSub) newTLSConfig() *tls.Config { + tlsConfig := new(tls.Config) + + if a.metadata.clientCert != "" && a.metadata.clientKey != "" { + cert, err := tls.X509KeyPair([]byte(a.metadata.clientCert), []byte(a.metadata.clientKey)) + if err != nil { + a.logger.Warnf("unable to load client certificate and key pair. Err: %v", err) + + return tlsConfig + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if a.metadata.caCert != "" { + tlsConfig.RootCAs = x509.NewCertPool() + if ok := tlsConfig.RootCAs.AppendCertsFromPEM([]byte(a.metadata.caCert)); !ok { + a.logger.Warnf("unable to load ca certificate.") + } + } + + return tlsConfig +} + +func (a *amqpPubSub) createClientOptions(uri *url.URL) amqp.ConnOptions { + var opts amqp.ConnOptions + + scheme := uri.Scheme + + switch scheme { + case "amqp": + if a.metadata.anonymous == true { + opts.SASLType = amqp.SASLTypeAnonymous() + } else { + opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password) + } + case "amqps": + opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password) + opts.TLSConfig = a.newTLSConfig() + } + + return opts +} + +// Close the session +func (a *amqpPubSub) Close() error { + a.publishLock.Lock() + + defer a.publishLock.Unlock() + + err := a.session.Close(a.ctx) + if err != nil { + a.logger.Warnf("failed to close the connection.", err) + } + return err +} + +// Feature list for AMQP PubSub +func (a *amqpPubSub) Features() []pubsub.Feature { + return []pubsub.Feature{pubsub.FeatureSubscribeWildcards, pubsub.FeatureMessageTTL} +} diff --git a/pubsub/solace/amqp/amqp_test.go b/pubsub/solace/amqp/amqp_test.go new file mode 100644 index 000000000..d2b26175b --- /dev/null +++ b/pubsub/solace/amqp/amqp_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "crypto/x509" + "encoding/pem" + "errors" + "testing" + + mdata "github.com/dapr/components-contrib/metadata" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" + + "github.com/stretchr/testify/assert" +) + +func getFakeProperties() map[string]string { + return map[string]string{ + "consumerID": "client", + amqpURL: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883", + anonymous: "false", + username: "default", + password: "default", + } +} + +func TestParseMetadata(t *testing.T) { + log := logger.NewLogger("test") + t.Run("metadata is correct", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[amqpURL], m.url) + }) + + t.Run("url is not given", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Base: mdata.Base{Properties: fakeProperties}, + } + fakeMetaData.Properties[amqpURL] = "" + + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.EqualError(t, err, errors.New(errorMsgPrefix+" missing url").Error()) + assert.Equal(t, fakeProperties[amqpURL], m.url) + }) + + t.Run("invalid ca certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpCACert] = "randomNonPEMBlockCA" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid caCert") + }) + + t.Run("valid ca certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpCACert] = "-----BEGIN CERTIFICATE-----\nMIICyDCCAbACCQDb8BtgvbqW5jANBgkqhkiG9w0BAQsFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1MzU4WhcNMjUw\nODEyMDY1MzU4WjAmMQswCQYDVQQGEwJJTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0\nQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEXte1GBxFJaygsEnK\nHV2AxazZW6Vppv+i50AuURHcaGo0i8G5CTfHzSKrYtTFfBskUspl+2N8GPV5c8Eb\ng+PP6YFn1wiHVz+wRSk3BD35DcGOT2o4XsJw5tiAzJkbpAOYCYl7KAM+BtOf41uC\nd6TdqmawhRGtv1ND2WtyJOT6A3KcUfjhL4TFEhWoljPJVay4TQoJcZMAImD/Xcxw\n6urv6wmUJby3/RJ3I46ZNH3zxEw5vSq1TuzuXxQmfPJG0ZPKJtQZ2nkZ3PNZe4bd\nNUa83YgQap7nBhYdYMMsQyLES2qy3mPcemBVoBWRGODel4PMEcsQiOhAyloAF2d3\nhd+LAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAK13X5JYBy78vHYoP0Oq9fe5XBbL\nuRM8YLnet9b/bXTGG4SnCCOGqWz99swYK7SVyR5l2h8SAoLzeNV61PtaZ6fHrbar\noxSL7BoRXOhMH6LQATadyvwlJ71uqlagqya7soaPK09TtfzeebLT0QkRCWT9b9lQ\nDBvBVCaFidynJL1ts21m5yUdIY4JSu4sGZGb4FRGFdBv/hD3wH8LAkOppsSv3C/Q\nkfkDDSQzYbdMoBuXmafvi3He7Rv+e6Tj9or1rrWdx0MIKlZPzz4DOe5Rh112uRB9\n7xPHJt16c+Ya3DKpchwwdNcki0vFchlpV96HK8sMCoY9kBzPhkEQLdiBGv4=\n-----END CERTIFICATE-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + block, _ := pem.Decode([]byte(m.tlsCfg.caCert)) + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + t.Errorf("failed to parse ca certificate from metadata. %v", err) + } + assert.Equal(t, "daprMqttTestCA", cert.Subject.CommonName) + }) + + t.Run("invalid client certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientCert] = "randomNonPEMBlockClientCert" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid clientCert") + }) + + t.Run("valid client certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientCert] = "-----BEGIN CERTIFICATE-----\nMIICzDCCAbQCCQDBKDMS3SHsDzANBgkqhkiG9w0BAQUFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1NTE1WhcNMjEw\nODA3MDY1NTE1WjAqMQswCQYDVQQGEwJJTjEbMBkGA1UEAwwSZGFwck1xdHRUZXN0\nQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5IDfsGI2pb4W\nt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu++M6yi9ABaBiYChpxbylqIeAn/HT\n3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsMfzO9p1oMpTnRdJCHYinE+oqVced5\nHI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPvui8WzJq9TMkEhEME+5hs3VKyKZr2\nqjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMWwzj2G+Rj8PMcBjoKeCLQL9uQh7f1\nTWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+Ur2b5+7/hoqnTzReJ3XUe1jM3l44f\nl0rOf4hu2QIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAT9yoIeX0LTsvx7/b+8V3a\nkP+j8u97QCc8n5xnMpivcMEk5cfqXX5Llv2EUJ9kBsynrJwT7ujhTJXSA/zb2UdC\nKH8PaSrgIlLwQNZMDofbz6+zPbjStkgne/ZQkTDIxY73sGpJL8LsQVO9p2KjOpdj\nSf9KuJhLzcHolh7ry3ZrkOg+QlMSvseeDRAxNhpkJrGQ6piXoUiEeKKNa0rWTMHx\nIP1Hqj+hh7jgqoQR48NL2jNng7I64HqTl6Mv2fiNfINiw+5xmXTB0QYkGU5NvPBO\naKcCRcGlU7ND89BogQPZsl/P04tAuQqpQWffzT4sEEOyWSVGda4N2Ys3GSQGBv8e\n-----END CERTIFICATE-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + block, _ := pem.Decode([]byte(m.tlsCfg.clientCert)) + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + t.Errorf("failed to parse client certificate from metadata. %v", err) + } + assert.Equal(t, "daprMqttTestClient", cert.Subject.CommonName) + }) + + t.Run("invalid client certificate key", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientKey] = "randomNonPEMBlockClientKey" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid clientKey") + }) + + t.Run("valid client certificate key", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientKey] = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA5IDfsGI2pb4Wt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu\n++M6yi9ABaBiYChpxbylqIeAn/HT3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsM\nfzO9p1oMpTnRdJCHYinE+oqVced5HI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPv\nui8WzJq9TMkEhEME+5hs3VKyKZr2qjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMW\nwzj2G+Rj8PMcBjoKeCLQL9uQh7f1TWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+U\nr2b5+7/hoqnTzReJ3XUe1jM3l44fl0rOf4hu2QIDAQABAoIBAQCVMINb4TP20P55\n9IPyqlxjhPT563hijXK+lhMJyiBDPavOOs7qjLikq2bshYPVbm1o2jt6pkXXqAeB\n5t/d20fheQQurYyPfxecNBZuL78duwbcUy28m2aXLlcVRYO4zGhoMgdW4UajoNLV\nT/UIiDONWGyhTHXMHdP+6h9UOmvs3o4b225AuLrw9n6QO5I1Se8lcfOTIqR1fy4O\nGsUWEQPdW0X3Dhgpx7kDIuBTAQzbjD31PCR1U8h2wsCeEe6hPCrsMbo/D019weol\ndi40tbWR1/oNz0+vro2d9YDPJkXN0gmpT51Z4YJoexZBdyzO5z4DMSdn5yczzt6p\nQq8LsXAFAoGBAPYXRbC4OxhtuC+xr8KRkaCCMjtjUWFbFWf6OFgUS9b5uPz9xvdY\nXo7wBP1zp2dS8yFsdIYH5Six4Z5iOuDR4sVixzjabhwedL6bmS1zV5qcCWeASKX1\nURgSkfMmC4Tg3LBgZ9YxySFcVRjikxljkS3eK7Mp7Xmj5afe7qV73TJfAoGBAO20\nTtw2RGe02xnydZmmwf+NpQHOA9S0JsehZA6NRbtPEN/C8bPJIq4VABC5zcH+tfYf\nzndbDlGhuk+qpPA590rG5RSOUjYnQFq7njdSfFyok9dXSZQTjJwFnG2oy0LmgjCe\nROYnbCzD+a+gBKV4xlo2M80OLakQ3zOwPT0xNRnHAoGATLEj/tbrU8mdxP9TDwfe\nom7wyKFDE1wXZ7gLJyfsGqrog69y+lKH5XPXmkUYvpKTQq9SARMkz3HgJkPmpXnD\nelA2Vfl8pza2m1BShF+VxZErPR41hcLV6vKemXAZ1udc33qr4YzSaZskygSSYy8s\nZ2b9p3BBmc8CGzbWmKvpW3ECgYEAn7sFLxdMWj/+5221Nr4HKPn+wrq0ek9gq884\n1Ep8bETSOvrdvolPQ5mbBKJGsLC/h5eR/0Rx18sMzpIF6eOZ2GbU8z474mX36cCf\nrd9A8Gbbid3+9IE6gHGIz2uYwujw3UjNVbdyCpbahvjJhoQlDePUZVu8tRpAUpSA\nYklZvGsCgYBuIlOFTNGMVUnwfzrcS9a/31LSvWTZa8w2QFjsRPMYFezo2l4yWs4D\nPEpeuoJm+Gp6F6ayjoeyOw9mvMBH5hAZr4WjbiU6UodzEHREAsLAzCzcRyIpnDE6\nPW1c3j60r8AHVufkWTA+8B9WoLC5MqcYTV3beMGnNGGqS2PeBom63Q==\n-----END RSA PRIVATE KEY-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key") + }) +} diff --git a/pubsub/solace/amqp/metadata.go b/pubsub/solace/amqp/metadata.go new file mode 100644 index 000000000..bfbfc5261 --- /dev/null +++ b/pubsub/solace/amqp/metadata.go @@ -0,0 +1,117 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "encoding/pem" + "fmt" + "strconv" + "time" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" +) + +const ( + // errors. + errorMsgPrefix = "amqp pub sub error:" +) + +type metadata struct { + tlsCfg + url string + username string + password string + anonymous bool +} + +type tlsCfg struct { + caCert string + clientCert string + clientKey string +} + +const ( + // Keys + amqpURL = "url" + anonymous = "anonymous" + username = "username" + password = "password" + amqpCACert = "caCert" + amqpClientCert = "clientCert" + amqpClientKey = "clientKey" + defaultWait = 30 * time.Second +) + +// isValidPEM validates the provided input has PEM formatted block. +func isValidPEM(val string) bool { + block, _ := pem.Decode([]byte(val)) + + return block != nil +} + +func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) { + m := metadata{anonymous: false} + + // required configuration settings + if val, ok := md.Properties[amqpURL]; ok && val != "" { + m.url = val + } else { + return &m, fmt.Errorf("%s missing url", errorMsgPrefix) + } + + // optional configuration settings + if val, ok := md.Properties[anonymous]; ok && val != "" { + var err error + m.anonymous, err = strconv.ParseBool(val) + if err != nil { + return &m, fmt.Errorf("%s invalid anonymous %s, %s", errorMsgPrefix, val, err) + } + } + + if !m.anonymous { + if val, ok := md.Properties[username]; ok && val != "" { + m.username = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + + if val, ok := md.Properties[password]; ok && val != "" { + m.password = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + } + + if val, ok := md.Properties[amqpCACert]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid caCert", errorMsgPrefix) + } + m.tlsCfg.caCert = val + } + if val, ok := md.Properties[amqpClientCert]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid clientCert", errorMsgPrefix) + } + m.tlsCfg.clientCert = val + } + if val, ok := md.Properties[amqpClientKey]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid clientKey", errorMsgPrefix) + } + m.tlsCfg.clientKey = val + } + + return &m, nil +} diff --git a/tests/config/pubsub/solace/amqp/pubsub.yml b/tests/config/pubsub/solace/amqp/pubsub.yml new file mode 100644 index 000000000..16a07593e --- /dev/null +++ b/tests/config/pubsub/solace/amqp/pubsub.yml @@ -0,0 +1,10 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +spec: + type: pubsub.solace.amqp + version: v1 + metadata: + - name: url + value: 'amqp://localhost:5672' + - name: anonymous + value: true diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 62f42b4c8..18acc9e82 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -12,7 +12,7 @@ componentType: pubsub components: - component: azure.eventhubs - operations: ["publish", "subscribe", "multiplehandlers", "bulkpublish"] + operations: ['publish', 'subscribe', 'multiplehandlers', 'bulkpublish'] config: pubsubName: azure-eventhubs testTopicName: eventhubs-pubsub-topic @@ -50,9 +50,9 @@ components: config: checkInOrderProcessing: false - component: natsstreaming - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: jetstream - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: kafka allOperations: true - component: kafka @@ -62,20 +62,22 @@ components: profile: confluent allOperations: true - component: pulsar - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: mqtt profile: mosquitto - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] + - component: solace.amqp + operations: ['publish', 'subscribe'] - component: mqtt profile: emqx - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: mqtt profile: vernemq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: hazelcast - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: rabbitmq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] config: checkInOrderProcessing: false - component: in-memory @@ -94,4 +96,4 @@ components: pubsubName: aws-snssqs checkInOrderProcessing: false - component: kubemq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] diff --git a/tests/conformance/common.go b/tests/conformance/common.go index d79e0fe28..5d08462f8 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -70,6 +70,7 @@ import ( p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq" p_redis "github.com/dapr/components-contrib/pubsub/redis" + p_solaceamqp "github.com/dapr/components-contrib/pubsub/solace/amqp" ss_azure "github.com/dapr/components-contrib/secretstores/azure/keyvault" ss_hashicorp_vault "github.com/dapr/components-contrib/secretstores/hashicorp/vault" ss_kubernetes "github.com/dapr/components-contrib/secretstores/kubernetes" @@ -452,6 +453,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_snssqs.NewSnsSqs(testLogger) case "kubemq": pubsub = p_kubemq.NewKubeMQ(testLogger) + case "solace.amqp": + pubsub = p_solaceamqp.NewAMQPPubsub(testLogger) default: return nil }