From 2039a30de702ce1bf2bd3e0a5a3741fa87d41f5c Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Mon, 21 Nov 2022 14:49:02 -0500 Subject: [PATCH 01/10] AMQP Plugin Initial cut of the AMQP 1.0 Plugin Signed-off-by: TKTheTechie --- pubsub/amqp/amqp.go | 285 +++++++++++++++++++++++++++++++++++++++ pubsub/amqp/amqp_test.go | 138 +++++++++++++++++++ pubsub/amqp/metadata.go | 114 ++++++++++++++++ 3 files changed, 537 insertions(+) create mode 100644 pubsub/amqp/amqp.go create mode 100644 pubsub/amqp/amqp_test.go create mode 100644 pubsub/amqp/metadata.go diff --git a/pubsub/amqp/amqp.go b/pubsub/amqp/amqp.go new file mode 100644 index 000000000..9f3341b0b --- /dev/null +++ b/pubsub/amqp/amqp.go @@ -0,0 +1,285 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + amqp "github.com/Azure/go-amqp" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" + "net/url" + "strconv" + "strings" + "sync" + time "time" +) + +const ( + publishRetryWaitSeconds = 2 + publishMaxRetries = 3 +) + +// amqpPubSub type allows sending and receiving data to/from an AMQP 1.0 broker +type amqpPubSub struct { + session amqp.Session + metadata *metadata + logger logger.Logger + publishLock sync.RWMutex + publishRetryCount int + ctx context.Context + cancel context.CancelFunc +} + +// NewAMQPPubsub returns a new AMQPPubSub instance +func NewAMQPPubsub(logger logger.Logger) pubsub.PubSub { + return &amqpPubSub{ + logger: logger, + publishLock: sync.RWMutex{}, + } +} + +// Init parses the metadata and creates a new Pub Sub Client. +func (a *amqpPubSub) Init(metadata pubsub.Metadata) error { + amqpMeta, err := parseAMQPMetaData(metadata, a.logger) + if err != nil { + return err + } + + a.metadata = amqpMeta + + a.ctx, a.cancel = context.WithCancel(context.Background()) + + s, err := a.connect() + + if err != nil { + return err + } + + a.session = *s + + return err +} + +// Publish the topic to amqp pubsub +func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { + + a.publishLock.Lock() + defer a.publishLock.Unlock() + + a.publishRetryCount = 0 + + if req.Topic == "" { + return errors.New("topic name is empty") + } + + m := amqp.NewMessage(req.Data) + + //If the request has ttl specified, put it on the message header + ttlProp := req.Metadata["ttlInSeconds"] + if ttlProp != "" { + ttlInSeconds, err := strconv.Atoi(ttlProp) + if err != nil { + a.logger.Warnf("Invalid ttl received from message %s", ttlInSeconds) + } else { + m.Header.TTL = time.Second * time.Duration(ttlInSeconds) + } + } + + dest := req.Topic + + //Unless the request comes in to publish on a queue, publish directly on a topic + if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") { + dest = "topic://" + dest + } + + sender, err := a.session.NewSender( + amqp.LinkTargetAddress(dest), + ) + + if err != nil { + a.logger.Errorf("Unable to create link to %s", req.Topic, err) + } else { + + err = sender.Send(a.ctx, m) + + //If the publish operation has failed, attemp to republish a maximum number of times + // before giving up + if err != nil { + for a.publishRetryCount <= publishMaxRetries { + a.publishRetryCount++ + + // Send message + err = sender.Send(a.ctx, m) + + if err != nil { + a.logger.Warnf("Failed to publish a message to the broker", err) + } + time.Sleep(publishRetryWaitSeconds * time.Second) + } + } + } + + return err + +} + +// Set up a subscription directly to a queue. Subscriptions to topics are not currently supported +func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { + + receiver, err := a.session.NewReceiver( + amqp.LinkSourceAddress(req.Topic), + ) + + if err == nil { + a.logger.Infof("Attempting to subscribe to %s", req.Topic) + go a.subscribeForever(ctx, receiver, handler) + } else { + a.logger.Error("Unable to create a receiver:", err) + } + + return err + +} + +// function that subscribes to a queue in a tight loop +func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiver, handler pubsub.Handler) { + for { + // Receive next message + msg, err := receiver.Receive(ctx) + + a.logger.Debugf("Received a message %s", msg.GetData()) + + pubsubMsg := &pubsub.NewMessage{ + Data: msg.GetData(), + Topic: msg.LinkName(), + } + + if err != nil { + a.logger.Errorf("failed to establish receiver") + } + + err = handler(ctx, pubsubMsg) + + if err == nil { + err := receiver.AcceptMessage(ctx, msg) + a.logger.Debugf("ACKed a message") + if err != nil { + a.logger.Errorf("failed to acknowledge a message") + } + } else { + a.logger.Errorf("Error processing message from %s", msg.LinkName()) + a.logger.Debugf("NAKd a message") + err := receiver.RejectMessage(ctx, msg, nil) + if err != nil { + a.logger.Errorf("failed to NAK a message") + + } + } + + } +} + +// Connect to the AMQP broker +func (a *amqpPubSub) connect() (*amqp.Session, error) { + uri, err := url.Parse(a.metadata.url) + if err != nil { + return nil, err + } + + clientOpts := a.createClientOptions(uri) + + a.logger.Infof("Attempting to connect to %s", a.metadata.url) + client, err := amqp.Dial(a.metadata.url, clientOpts...) + if err != nil { + a.logger.Fatal("Dialing AMQP server:", err) + } + + // Open a session + session, err := client.NewSession() + if err != nil { + a.logger.Fatal("Creating AMQP session:", err) + } + + return session, nil + +} + +func (a *amqpPubSub) newTLSConfig() *tls.Config { + tlsConfig := new(tls.Config) + + if a.metadata.clientCert != "" && a.metadata.clientKey != "" { + cert, err := tls.X509KeyPair([]byte(a.metadata.clientCert), []byte(a.metadata.clientKey)) + if err != nil { + a.logger.Warnf("unable to load client certificate and key pair. Err: %v", err) + + return tlsConfig + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if a.metadata.caCert != "" { + tlsConfig.RootCAs = x509.NewCertPool() + if ok := tlsConfig.RootCAs.AppendCertsFromPEM([]byte(a.metadata.caCert)); !ok { + a.logger.Warnf("unable to load ca certificate.") + } + } + + return tlsConfig +} + +func (a *amqpPubSub) createClientOptions(uri *url.URL) []amqp.ConnOption { + + var opts []amqp.ConnOption + + scheme := uri.Scheme + + switch scheme { + case "amqp": + if a.metadata.anonymous == true { + opts = append(opts, amqp.ConnSASLAnonymous()) + + } else { + opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password)) + } + case "amqps": + opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password)) + opts = append(opts, amqp.ConnTLS(true)) + opts = append(opts, amqp.ConnTLSConfig(a.newTLSConfig())) + } + + return opts +} + +// Close the session +func (a *amqpPubSub) Close() error { + a.publishLock.Lock() + + defer a.publishLock.Unlock() + + err := a.session.Close(a.ctx) + + if err != nil { + a.logger.Warnf("failed to close the connection.", err) + } + return err + +} + +// Feature list for AMQP PubSub +func (a *amqpPubSub) Features() []pubsub.Feature { + return []pubsub.Feature{pubsub.FeatureSubscribeWildcards, pubsub.FeatureMessageTTL} +} diff --git a/pubsub/amqp/amqp_test.go b/pubsub/amqp/amqp_test.go new file mode 100644 index 000000000..41f7978b6 --- /dev/null +++ b/pubsub/amqp/amqp_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "crypto/x509" + "encoding/pem" + "errors" + mdata "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" + "github.com/stretchr/testify/assert" + "testing" +) + +func getFakeProperties() map[string]string { + return map[string]string{ + "consumerID": "client", + amqpUrl: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883", + anonymous: "false", + username: "default", + password: "default", + } +} + +func TestParseMetadata(t *testing.T) { + log := logger.NewLogger("test") + t.Run("metadata is correct", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + assert.Equal(t, fakeProperties[amqpUrl], m.url) + }) + + t.Run("url is not given", func(t *testing.T) { + fakeProperties := getFakeProperties() + + fakeMetaData := pubsub.Metadata{ + Base: mdata.Base{Properties: fakeProperties}, + } + fakeMetaData.Properties[amqpUrl] = "" + + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.EqualError(t, err, errors.New(errorMsgPrefix+" missing url").Error()) + assert.Equal(t, fakeProperties[amqpUrl], m.url) + }) + + t.Run("invalid ca certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpCACert] = "randomNonPEMBlockCA" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid caCert") + }) + + t.Run("valid ca certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpCACert] = "-----BEGIN CERTIFICATE-----\nMIICyDCCAbACCQDb8BtgvbqW5jANBgkqhkiG9w0BAQsFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1MzU4WhcNMjUw\nODEyMDY1MzU4WjAmMQswCQYDVQQGEwJJTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0\nQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDEXte1GBxFJaygsEnK\nHV2AxazZW6Vppv+i50AuURHcaGo0i8G5CTfHzSKrYtTFfBskUspl+2N8GPV5c8Eb\ng+PP6YFn1wiHVz+wRSk3BD35DcGOT2o4XsJw5tiAzJkbpAOYCYl7KAM+BtOf41uC\nd6TdqmawhRGtv1ND2WtyJOT6A3KcUfjhL4TFEhWoljPJVay4TQoJcZMAImD/Xcxw\n6urv6wmUJby3/RJ3I46ZNH3zxEw5vSq1TuzuXxQmfPJG0ZPKJtQZ2nkZ3PNZe4bd\nNUa83YgQap7nBhYdYMMsQyLES2qy3mPcemBVoBWRGODel4PMEcsQiOhAyloAF2d3\nhd+LAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAK13X5JYBy78vHYoP0Oq9fe5XBbL\nuRM8YLnet9b/bXTGG4SnCCOGqWz99swYK7SVyR5l2h8SAoLzeNV61PtaZ6fHrbar\noxSL7BoRXOhMH6LQATadyvwlJ71uqlagqya7soaPK09TtfzeebLT0QkRCWT9b9lQ\nDBvBVCaFidynJL1ts21m5yUdIY4JSu4sGZGb4FRGFdBv/hD3wH8LAkOppsSv3C/Q\nkfkDDSQzYbdMoBuXmafvi3He7Rv+e6Tj9or1rrWdx0MIKlZPzz4DOe5Rh112uRB9\n7xPHJt16c+Ya3DKpchwwdNcki0vFchlpV96HK8sMCoY9kBzPhkEQLdiBGv4=\n-----END CERTIFICATE-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + block, _ := pem.Decode([]byte(m.tlsCfg.caCert)) + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + t.Errorf("failed to parse ca certificate from metadata. %v", err) + } + assert.Equal(t, "daprMqttTestCA", cert.Subject.CommonName) + }) + + t.Run("invalid client certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientCert] = "randomNonPEMBlockClientCert" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid clientCert") + }) + + t.Run("valid client certificate", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientCert] = "-----BEGIN CERTIFICATE-----\nMIICzDCCAbQCCQDBKDMS3SHsDzANBgkqhkiG9w0BAQUFADAmMQswCQYDVQQGEwJJ\nTjEXMBUGA1UEAwwOZGFwck1xdHRUZXN0Q0EwHhcNMjAwODEyMDY1NTE1WhcNMjEw\nODA3MDY1NTE1WjAqMQswCQYDVQQGEwJJTjEbMBkGA1UEAwwSZGFwck1xdHRUZXN0\nQ2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA5IDfsGI2pb4W\nt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu++M6yi9ABaBiYChpxbylqIeAn/HT\n3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsMfzO9p1oMpTnRdJCHYinE+oqVced5\nHI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPvui8WzJq9TMkEhEME+5hs3VKyKZr2\nqjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMWwzj2G+Rj8PMcBjoKeCLQL9uQh7f1\nTWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+Ur2b5+7/hoqnTzReJ3XUe1jM3l44f\nl0rOf4hu2QIDAQABMA0GCSqGSIb3DQEBBQUAA4IBAQAT9yoIeX0LTsvx7/b+8V3a\nkP+j8u97QCc8n5xnMpivcMEk5cfqXX5Llv2EUJ9kBsynrJwT7ujhTJXSA/zb2UdC\nKH8PaSrgIlLwQNZMDofbz6+zPbjStkgne/ZQkTDIxY73sGpJL8LsQVO9p2KjOpdj\nSf9KuJhLzcHolh7ry3ZrkOg+QlMSvseeDRAxNhpkJrGQ6piXoUiEeKKNa0rWTMHx\nIP1Hqj+hh7jgqoQR48NL2jNng7I64HqTl6Mv2fiNfINiw+5xmXTB0QYkGU5NvPBO\naKcCRcGlU7ND89BogQPZsl/P04tAuQqpQWffzT4sEEOyWSVGda4N2Ys3GSQGBv8e\n-----END CERTIFICATE-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + block, _ := pem.Decode([]byte(m.tlsCfg.clientCert)) + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + t.Errorf("failed to parse client certificate from metadata. %v", err) + } + assert.Equal(t, "daprMqttTestClient", cert.Subject.CommonName) + }) + + t.Run("invalid client certificate key", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientKey] = "randomNonPEMBlockClientKey" + _, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.Contains(t, err.Error(), "invalid clientKey") + }) + + t.Run("valid client certificate key", func(t *testing.T) { + fakeProperties := getFakeProperties() + fakeMetaData := pubsub.Metadata{Base: mdata.Base{Properties: fakeProperties}} + fakeMetaData.Properties[amqpClientKey] = "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA5IDfsGI2pb4Wt3CjckrKuNeTrgmla3sXxSI5wfDgLGd/XkNu\n++M6yi9ABaBiYChpxbylqIeAn/HT3r/nhcb+bldMtEkU9tODHy/QDhvN2UGFjRsM\nfzO9p1oMpTnRdJCHYinE+oqVced5HI+UEofAU+1eiIXqJGKrdfn4gvaHst4QfVPv\nui8WzJq9TMkEhEME+5hs3VKyKZr2qjIxzr7nLVod3DBf482VjxRI06Ip3fPvNuMW\nwzj2G+Rj8PMcBjoKeCLQL9uQh7f1TWHuACqNIrmFEUQWdGETnRjHWIvw0NEL40+U\nr2b5+7/hoqnTzReJ3XUe1jM3l44fl0rOf4hu2QIDAQABAoIBAQCVMINb4TP20P55\n9IPyqlxjhPT563hijXK+lhMJyiBDPavOOs7qjLikq2bshYPVbm1o2jt6pkXXqAeB\n5t/d20fheQQurYyPfxecNBZuL78duwbcUy28m2aXLlcVRYO4zGhoMgdW4UajoNLV\nT/UIiDONWGyhTHXMHdP+6h9UOmvs3o4b225AuLrw9n6QO5I1Se8lcfOTIqR1fy4O\nGsUWEQPdW0X3Dhgpx7kDIuBTAQzbjD31PCR1U8h2wsCeEe6hPCrsMbo/D019weol\ndi40tbWR1/oNz0+vro2d9YDPJkXN0gmpT51Z4YJoexZBdyzO5z4DMSdn5yczzt6p\nQq8LsXAFAoGBAPYXRbC4OxhtuC+xr8KRkaCCMjtjUWFbFWf6OFgUS9b5uPz9xvdY\nXo7wBP1zp2dS8yFsdIYH5Six4Z5iOuDR4sVixzjabhwedL6bmS1zV5qcCWeASKX1\nURgSkfMmC4Tg3LBgZ9YxySFcVRjikxljkS3eK7Mp7Xmj5afe7qV73TJfAoGBAO20\nTtw2RGe02xnydZmmwf+NpQHOA9S0JsehZA6NRbtPEN/C8bPJIq4VABC5zcH+tfYf\nzndbDlGhuk+qpPA590rG5RSOUjYnQFq7njdSfFyok9dXSZQTjJwFnG2oy0LmgjCe\nROYnbCzD+a+gBKV4xlo2M80OLakQ3zOwPT0xNRnHAoGATLEj/tbrU8mdxP9TDwfe\nom7wyKFDE1wXZ7gLJyfsGqrog69y+lKH5XPXmkUYvpKTQq9SARMkz3HgJkPmpXnD\nelA2Vfl8pza2m1BShF+VxZErPR41hcLV6vKemXAZ1udc33qr4YzSaZskygSSYy8s\nZ2b9p3BBmc8CGzbWmKvpW3ECgYEAn7sFLxdMWj/+5221Nr4HKPn+wrq0ek9gq884\n1Ep8bETSOvrdvolPQ5mbBKJGsLC/h5eR/0Rx18sMzpIF6eOZ2GbU8z474mX36cCf\nrd9A8Gbbid3+9IE6gHGIz2uYwujw3UjNVbdyCpbahvjJhoQlDePUZVu8tRpAUpSA\nYklZvGsCgYBuIlOFTNGMVUnwfzrcS9a/31LSvWTZa8w2QFjsRPMYFezo2l4yWs4D\nPEpeuoJm+Gp6F6ayjoeyOw9mvMBH5hAZr4WjbiU6UodzEHREAsLAzCzcRyIpnDE6\nPW1c3j60r8AHVufkWTA+8B9WoLC5MqcYTV3beMGnNGGqS2PeBom63Q==\n-----END RSA PRIVATE KEY-----\n" + m, err := parseAMQPMetaData(fakeMetaData, log) + + // assert + assert.NoError(t, err) + assert.NotNil(t, m.tlsCfg.clientKey, "failed to parse valid client certificate key") + }) +} diff --git a/pubsub/amqp/metadata.go b/pubsub/amqp/metadata.go new file mode 100644 index 000000000..ae7878668 --- /dev/null +++ b/pubsub/amqp/metadata.go @@ -0,0 +1,114 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package amqp + +import ( + "encoding/pem" + "fmt" + "strconv" + "time" + + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" +) + +const ( + // errors. + errorMsgPrefix = "amqp pub sub error:" +) + +type metadata struct { + tlsCfg + url string + username string + password string + anonymous bool +} + +type tlsCfg struct { + caCert string + clientCert string + clientKey string +} + +const ( + // Keys + amqpUrl = "url" + anonymous = "anonymous" + username = "username" + password = "password" + amqpCACert = "caCert" + amqpClientCert = "clientCert" + amqpClientKey = "clientKey" + defaultWait = 30 * time.Second +) + +// isValidPEM validates the provided input has PEM formatted block. +func isValidPEM(val string) bool { + block, _ := pem.Decode([]byte(val)) + + return block != nil +} + +func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) { + m := metadata{} + + // required configuration settings + if val, ok := md.Properties[amqpUrl]; ok && val != "" { + m.url = val + } else { + return &m, fmt.Errorf("%s missing url", errorMsgPrefix) + } + + if val, ok := md.Properties[username]; ok && val != "" { + m.username = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + + if val, ok := md.Properties[password]; ok && val != "" { + m.password = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + + // optional configuration settings + if val, ok := md.Properties[anonymous]; ok && val != "" { + var err error + m.anonymous, err = strconv.ParseBool(val) + if err != nil { + return &m, fmt.Errorf("%s invalid anonymous %s, %s", errorMsgPrefix, val, err) + } + } + if val, ok := md.Properties[amqpCACert]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid caCert", errorMsgPrefix) + } + m.tlsCfg.caCert = val + } + if val, ok := md.Properties[amqpClientCert]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid clientCert", errorMsgPrefix) + } + m.tlsCfg.clientCert = val + } + if val, ok := md.Properties[amqpClientKey]; ok && val != "" { + if !isValidPEM(val) { + return &m, fmt.Errorf("%s invalid clientKey", errorMsgPrefix) + } + m.tlsCfg.clientKey = val + } + + return &m, nil +} From 619c35693e58287be967aae66d12919e9183f047 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Wed, 23 Nov 2022 13:27:13 -0500 Subject: [PATCH 02/10] Adding conformance tests Signed-off-by: TKTheTechie --- pubsub/amqp/amqp.go | 71 ++++++++++++---------- tests/config/pubsub/amqp/solace/pubsub.yml | 17 ++++++ tests/config/pubsub/tests.yml | 3 + tests/conformance/common.go | 3 + 4 files changed, 61 insertions(+), 33 deletions(-) create mode 100644 tests/config/pubsub/amqp/solace/pubsub.yml diff --git a/pubsub/amqp/amqp.go b/pubsub/amqp/amqp.go index 9f3341b0b..4775cf323 100644 --- a/pubsub/amqp/amqp.go +++ b/pubsub/amqp/amqp.go @@ -74,6 +74,17 @@ func (a *amqpPubSub) Init(metadata pubsub.Metadata) error { return err } +func AddPrefixToAddress(t string) string { + dest := t + + //Unless the request comes in to publish on a queue, publish directly on a topic + if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") { + dest = "topic://" + dest + } + + return dest +} + // Publish the topic to amqp pubsub func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { @@ -99,15 +110,8 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { } } - dest := req.Topic - - //Unless the request comes in to publish on a queue, publish directly on a topic - if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") { - dest = "topic://" + dest - } - sender, err := a.session.NewSender( - amqp.LinkTargetAddress(dest), + amqp.LinkTargetAddress(AddPrefixToAddress(req.Topic)), ) if err != nil { @@ -137,11 +141,10 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { } -// Set up a subscription directly to a queue. Subscriptions to topics are not currently supported func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { receiver, err := a.session.NewReceiver( - amqp.LinkSourceAddress(req.Topic), + amqp.LinkSourceAddress(AddPrefixToAddress(req.Topic)), ) if err == nil { @@ -161,32 +164,34 @@ func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiv // Receive next message msg, err := receiver.Receive(ctx) - a.logger.Debugf("Received a message %s", msg.GetData()) + if msg != nil { + a.logger.Debugf("Received a message %s", msg.GetData()) - pubsubMsg := &pubsub.NewMessage{ - Data: msg.GetData(), - Topic: msg.LinkName(), - } - - if err != nil { - a.logger.Errorf("failed to establish receiver") - } - - err = handler(ctx, pubsubMsg) - - if err == nil { - err := receiver.AcceptMessage(ctx, msg) - a.logger.Debugf("ACKed a message") - if err != nil { - a.logger.Errorf("failed to acknowledge a message") + pubsubMsg := &pubsub.NewMessage{ + Data: msg.GetData(), + Topic: msg.LinkName(), } - } else { - a.logger.Errorf("Error processing message from %s", msg.LinkName()) - a.logger.Debugf("NAKd a message") - err := receiver.RejectMessage(ctx, msg, nil) - if err != nil { - a.logger.Errorf("failed to NAK a message") + if err != nil { + a.logger.Errorf("failed to establish receiver") + } + + err = handler(ctx, pubsubMsg) + + if err == nil { + err := receiver.AcceptMessage(ctx, msg) + a.logger.Debugf("ACKed a message") + if err != nil { + a.logger.Errorf("failed to acknowledge a message") + } + } else { + a.logger.Errorf("Error processing message from %s", msg.LinkName()) + a.logger.Debugf("NAKd a message") + err := receiver.RejectMessage(ctx, msg, nil) + if err != nil { + a.logger.Errorf("failed to NAK a message") + + } } } diff --git a/tests/config/pubsub/amqp/solace/pubsub.yml b/tests/config/pubsub/amqp/solace/pubsub.yml new file mode 100644 index 000000000..efc51a9dc --- /dev/null +++ b/tests/config/pubsub/amqp/solace/pubsub.yml @@ -0,0 +1,17 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: solace +spec: + type: pubsub.amqp + version: v1 + metadata: + - name: url + value: 'amqp://localhost:5672' + - name: username + value: 'default' + - name: password + value: 'default' + - name: anonymous + value: true + diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 70ecceff8..2415b7f20 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -62,6 +62,9 @@ components: - component: mqtt profile: mosquitto operations: ["publish", "subscribe", "multiplehandlers"] + - component: amqp + profile: solace + operations: ["publish", "subscribe"] - component: mqtt profile: emqx operations: ["publish", "subscribe", "multiplehandlers"] diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 2229f0a9f..07e40fd6f 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -50,6 +50,7 @@ import ( b_postgres "github.com/dapr/components-contrib/bindings/postgres" b_rabbitmq "github.com/dapr/components-contrib/bindings/rabbitmq" b_redis "github.com/dapr/components-contrib/bindings/redis" + p_amqp "github.com/dapr/components-contrib/pubsub/amqp" p_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs" p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues" @@ -401,6 +402,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_snssqs.NewSnsSqs(testLogger) case "kubemq": pubsub = p_kubemq.NewKubeMQ(testLogger) + case "amqp": + pubsub = p_amqp.NewAMQPPubsub(testLogger) default: return nil } From d7f4933bbbad6e52c469ec8b6495ba0d8ee596e1 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Thu, 15 Dec 2022 10:26:58 -0600 Subject: [PATCH 03/10] Update amqp.go Support topic subscriptions Signed-off-by: TKTheTechie --- pubsub/amqp/amqp.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pubsub/amqp/amqp.go b/pubsub/amqp/amqp.go index 4775cf323..2a3c4a6b2 100644 --- a/pubsub/amqp/amqp.go +++ b/pubsub/amqp/amqp.go @@ -18,14 +18,15 @@ import ( "crypto/tls" "crypto/x509" "errors" - amqp "github.com/Azure/go-amqp" - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" "net/url" "strconv" "strings" "sync" time "time" + + amqp "github.com/Azure/go-amqp" + "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" ) const ( @@ -78,8 +79,12 @@ func AddPrefixToAddress(t string) string { dest := t //Unless the request comes in to publish on a queue, publish directly on a topic - if !strings.HasPrefix(dest, "queue://") && !strings.HasPrefix(dest, "topic://") { + if !strings.HasPrefix(dest, "queue:") && !strings.HasPrefix(dest, "topic:") { dest = "topic://" + dest + } else if strings.HasPrefix(dest, "queue:") { + dest = strings.Replace(dest, "queue:", "queue://", 1) + } else if strings.HasPrefix(dest, "topic:") { + dest = strings.Replace(dest, "topic:", "topic://", 1) } return dest @@ -143,13 +148,15 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { + prefixedTopic := AddPrefixToAddress(req.Topic) + receiver, err := a.session.NewReceiver( - amqp.LinkSourceAddress(AddPrefixToAddress(req.Topic)), + amqp.LinkSourceAddress(prefixedTopic), ) if err == nil { - a.logger.Infof("Attempting to subscribe to %s", req.Topic) - go a.subscribeForever(ctx, receiver, handler) + a.logger.Infof("Attempting to subscribe to %s", prefixedTopic) + go a.subscribeForever(ctx, receiver, handler, prefixedTopic) } else { a.logger.Error("Unable to create a receiver:", err) } @@ -159,7 +166,7 @@ func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, } // function that subscribes to a queue in a tight loop -func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiver, handler pubsub.Handler) { +func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiver, handler pubsub.Handler, t string) { for { // Receive next message msg, err := receiver.Receive(ctx) From 338bc03e9e750f9f8eeff57b41c46a50f4845305 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Mon, 19 Dec 2022 10:50:31 -0500 Subject: [PATCH 04/10] Update amqp.go Signed-off-by: TKTheTechie --- pubsub/amqp/amqp.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pubsub/amqp/amqp.go b/pubsub/amqp/amqp.go index 2a3c4a6b2..2e2d8e6d7 100644 --- a/pubsub/amqp/amqp.go +++ b/pubsub/amqp/amqp.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "crypto/x509" "errors" + "fmt" "net/url" "strconv" "strings" @@ -172,10 +173,16 @@ func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiv msg, err := receiver.Receive(ctx) if msg != nil { - a.logger.Debugf("Received a message %s", msg.GetData()) + + data := msg.GetData() + + //if data is empty, then check the value field for data + if data == nil || len(data) == 0 { + data = []byte(fmt.Sprint(msg.Value)) + } pubsubMsg := &pubsub.NewMessage{ - Data: msg.GetData(), + Data: data, Topic: msg.LinkName(), } From 57c2324670211a1f9f3b3fa9819a81736ac2c585 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Wed, 21 Dec 2022 13:18:35 -0500 Subject: [PATCH 05/10] Refactoring package names Signed-off-by: TKTheTechie --- pubsub/{ => solace}/amqp/amqp.go | 0 pubsub/{ => solace}/amqp/amqp_test.go | 3 +- pubsub/{ => solace}/amqp/metadata.go | 0 .../{amqp/solace => solace/amqp}/pubsub.yml | 5 +-- tests/config/pubsub/tests.yml | 31 +++++++++---------- tests/conformance/common.go | 6 ++-- 6 files changed, 21 insertions(+), 24 deletions(-) rename pubsub/{ => solace}/amqp/amqp.go (100%) rename pubsub/{ => solace}/amqp/amqp_test.go (99%) rename pubsub/{ => solace}/amqp/metadata.go (100%) rename tests/config/pubsub/{amqp/solace => solace/amqp}/pubsub.yml (84%) diff --git a/pubsub/amqp/amqp.go b/pubsub/solace/amqp/amqp.go similarity index 100% rename from pubsub/amqp/amqp.go rename to pubsub/solace/amqp/amqp.go diff --git a/pubsub/amqp/amqp_test.go b/pubsub/solace/amqp/amqp_test.go similarity index 99% rename from pubsub/amqp/amqp_test.go rename to pubsub/solace/amqp/amqp_test.go index 41f7978b6..6f2195764 100644 --- a/pubsub/amqp/amqp_test.go +++ b/pubsub/solace/amqp/amqp_test.go @@ -17,11 +17,12 @@ import ( "crypto/x509" "encoding/pem" "errors" + "testing" + mdata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/stretchr/testify/assert" - "testing" ) func getFakeProperties() map[string]string { diff --git a/pubsub/amqp/metadata.go b/pubsub/solace/amqp/metadata.go similarity index 100% rename from pubsub/amqp/metadata.go rename to pubsub/solace/amqp/metadata.go diff --git a/tests/config/pubsub/amqp/solace/pubsub.yml b/tests/config/pubsub/solace/amqp/pubsub.yml similarity index 84% rename from tests/config/pubsub/amqp/solace/pubsub.yml rename to tests/config/pubsub/solace/amqp/pubsub.yml index efc51a9dc..6be13c322 100644 --- a/tests/config/pubsub/amqp/solace/pubsub.yml +++ b/tests/config/pubsub/solace/amqp/pubsub.yml @@ -1,9 +1,7 @@ apiVersion: dapr.io/v1alpha1 kind: Component -metadata: - name: solace spec: - type: pubsub.amqp + type: pubsub.solace.amqp version: v1 metadata: - name: url @@ -14,4 +12,3 @@ spec: value: 'default' - name: anonymous value: true - diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 2415b7f20..cf51bd393 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -12,7 +12,7 @@ componentType: pubsub components: - component: azure.eventhubs - operations: ["publish", "subscribe", "multiplehandlers", "bulkpublish"] + operations: ['publish', 'subscribe', 'multiplehandlers', 'bulkpublish'] config: pubsubName: azure-eventhubs testTopicName: eventhubs-pubsub-topic @@ -42,13 +42,13 @@ components: testMultiTopic2Name: dapr-conf-queue-multi2 checkInOrderProcessing: false - component: redis - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] config: checkInOrderProcessing: false - component: natsstreaming - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: jetstream - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: kafka allOperations: true - component: kafka @@ -58,30 +58,29 @@ components: profile: confluent allOperations: true - component: pulsar - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: mqtt profile: mosquitto - operations: ["publish", "subscribe", "multiplehandlers"] - - component: amqp - profile: solace - operations: ["publish", "subscribe"] + operations: ['publish', 'subscribe', 'multiplehandlers'] + - component: solace.amqp + operations: ['publish', 'subscribe'] - component: mqtt profile: emqx - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: mqtt profile: vernemq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: hazelcast - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: rabbitmq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] config: checkInOrderProcessing: false - component: in-memory - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] - component: aws.snssqs - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] config: checkInOrderProcessing: false - component: kubemq - operations: ["publish", "subscribe", "multiplehandlers"] + operations: ['publish', 'subscribe', 'multiplehandlers'] diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 07e40fd6f..04bc29815 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -50,7 +50,6 @@ import ( b_postgres "github.com/dapr/components-contrib/bindings/postgres" b_rabbitmq "github.com/dapr/components-contrib/bindings/rabbitmq" b_redis "github.com/dapr/components-contrib/bindings/redis" - p_amqp "github.com/dapr/components-contrib/pubsub/amqp" p_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs" p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues" @@ -65,6 +64,7 @@ import ( p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq" p_redis "github.com/dapr/components-contrib/pubsub/redis" + p_solaceamqp "github.com/dapr/components-contrib/pubsub/solace/amqp" ss_azure "github.com/dapr/components-contrib/secretstores/azure/keyvault" ss_hashicorp_vault "github.com/dapr/components-contrib/secretstores/hashicorp/vault" ss_kubernetes "github.com/dapr/components-contrib/secretstores/kubernetes" @@ -402,8 +402,8 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_snssqs.NewSnsSqs(testLogger) case "kubemq": pubsub = p_kubemq.NewKubeMQ(testLogger) - case "amqp": - pubsub = p_amqp.NewAMQPPubsub(testLogger) + case "solace.amqp": + pubsub = p_solaceamqp.NewAMQPPubsub(testLogger) default: return nil } From a5dfcb80fa2fcae1af29ec0f9c3ce501f7949beb Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Wed, 21 Dec 2022 16:32:50 -0500 Subject: [PATCH 06/10] Allowing anonymous connections Updating rule to allow an anonymous connection without credentials Signed-off-by: TKTheTechie --- pubsub/solace/amqp/metadata.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/pubsub/solace/amqp/metadata.go b/pubsub/solace/amqp/metadata.go index ae7878668..849778eff 100644 --- a/pubsub/solace/amqp/metadata.go +++ b/pubsub/solace/amqp/metadata.go @@ -62,7 +62,7 @@ func isValidPEM(val string) bool { } func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) { - m := metadata{} + m := metadata{anonymous: false} // required configuration settings if val, ok := md.Properties[amqpUrl]; ok && val != "" { @@ -71,18 +71,6 @@ func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) return &m, fmt.Errorf("%s missing url", errorMsgPrefix) } - if val, ok := md.Properties[username]; ok && val != "" { - m.username = val - } else { - return &m, fmt.Errorf("%s missing username", errorMsgPrefix) - } - - if val, ok := md.Properties[password]; ok && val != "" { - m.password = val - } else { - return &m, fmt.Errorf("%s missing username", errorMsgPrefix) - } - // optional configuration settings if val, ok := md.Properties[anonymous]; ok && val != "" { var err error @@ -91,6 +79,22 @@ func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) return &m, fmt.Errorf("%s invalid anonymous %s, %s", errorMsgPrefix, val, err) } } + + if !m.anonymous { + + if val, ok := md.Properties[username]; ok && val != "" { + m.username = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + + if val, ok := md.Properties[password]; ok && val != "" { + m.password = val + } else { + return &m, fmt.Errorf("%s missing username", errorMsgPrefix) + } + } + if val, ok := md.Properties[amqpCACert]; ok && val != "" { if !isValidPEM(val) { return &m, fmt.Errorf("%s invalid caCert", errorMsgPrefix) From 9c28641594a1d5af33a4434216da82c13bbaa868 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Fri, 23 Dec 2022 08:58:41 -0500 Subject: [PATCH 07/10] Fixing linting issues, conformance infrastructure * Fixing linting issues * Adding infrastructure to .github/workflows * Fixing conformance tests config to account for default anonymous config on the event broker Signed-off-by: TKTheTechie --- .../infrastructure/docker-compose-solace.yml | 61 +++++++++++++++++++ pubsub/solace/amqp/amqp.go | 31 +++------- pubsub/solace/amqp/amqp_test.go | 10 +-- pubsub/solace/amqp/metadata.go | 5 +- tests/config/pubsub/solace/amqp/pubsub.yml | 4 -- 5 files changed, 77 insertions(+), 34 deletions(-) create mode 100644 .github/infrastructure/docker-compose-solace.yml diff --git a/.github/infrastructure/docker-compose-solace.yml b/.github/infrastructure/docker-compose-solace.yml new file mode 100644 index 000000000..96cb46abb --- /dev/null +++ b/.github/infrastructure/docker-compose-solace.yml @@ -0,0 +1,61 @@ +version: '3.3' + +services: + primary: + container_name: pubSubStandardSingleNode + image: solace/solace-pubsub-standard:latest + volumes: + - "storage-group:/var/lib/solace" + shm_size: 1g + ulimits: + core: -1 + nofile: + soft: 2448 + hard: 6592 + deploy: + restart_policy: + condition: on-failure + max_attempts: 1 + ports: + #Port Mappings: With the exception of SMF, ports are mapped straight + #through from host to container. This may result in port collisions on + #commonly used ports that will cause failure of the container to start. + #Web transport + - '8008:8008' + #Web transport over TLS + - '1443:1443' + #SEMP over TLS + - '1943:1943' + #MQTT Default VPN + - '1883:1883' + #AMQP Default VPN over TLS + - '5671:5671' + #AMQP Default VPN + - '5672:5672' + #MQTT Default VPN over WebSockets + - '8000:8000' + #MQTT Default VPN over WebSockets / TLS + - '8443:8443' + #MQTT Default VPN over TLS + - '8883:8883' + #SEMP / PubSub+ Manager + - '8080:8080' + #REST Default VPN + - '9000:9000' + #REST Default VPN over TLS + - '9443:9443' + #SMF + - '55554:55555' + #SMF Compressed + - '55003:55003' + #SMF over TLS + - '55443:55443' + #SSH connection to CLI + - '2222:2222' + environment: + - username_admin_globalaccesslevel=admin + - username_admin_password=admin + - system_scaling_maxconnectioncount=100 + +volumes: + storage-group: \ No newline at end of file diff --git a/pubsub/solace/amqp/amqp.go b/pubsub/solace/amqp/amqp.go index 2e2d8e6d7..c32c09515 100644 --- a/pubsub/solace/amqp/amqp.go +++ b/pubsub/solace/amqp/amqp.go @@ -26,6 +26,7 @@ import ( time "time" amqp "github.com/Azure/go-amqp" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" ) @@ -37,7 +38,7 @@ const ( // amqpPubSub type allows sending and receiving data to/from an AMQP 1.0 broker type amqpPubSub struct { - session amqp.Session + session *amqp.Session metadata *metadata logger logger.Logger publishLock sync.RWMutex @@ -66,12 +67,11 @@ func (a *amqpPubSub) Init(metadata pubsub.Metadata) error { a.ctx, a.cancel = context.WithCancel(context.Background()) s, err := a.connect() - if err != nil { return err } - a.session = *s + a.session = s return err } @@ -79,7 +79,7 @@ func (a *amqpPubSub) Init(metadata pubsub.Metadata) error { func AddPrefixToAddress(t string) string { dest := t - //Unless the request comes in to publish on a queue, publish directly on a topic + // Unless the request comes in to publish on a queue, publish directly on a topic if !strings.HasPrefix(dest, "queue:") && !strings.HasPrefix(dest, "topic:") { dest = "topic://" + dest } else if strings.HasPrefix(dest, "queue:") { @@ -93,7 +93,6 @@ func AddPrefixToAddress(t string) string { // Publish the topic to amqp pubsub func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { - a.publishLock.Lock() defer a.publishLock.Unlock() @@ -105,7 +104,7 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { m := amqp.NewMessage(req.Data) - //If the request has ttl specified, put it on the message header + // If the request has ttl specified, put it on the message header ttlProp := req.Metadata["ttlInSeconds"] if ttlProp != "" { ttlInSeconds, err := strconv.Atoi(ttlProp) @@ -123,10 +122,9 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { if err != nil { a.logger.Errorf("Unable to create link to %s", req.Topic, err) } else { - err = sender.Send(a.ctx, m) - //If the publish operation has failed, attemp to republish a maximum number of times + // If the publish operation has failed, attempt to republish a maximum number of times // before giving up if err != nil { for a.publishRetryCount <= publishMaxRetries { @@ -144,11 +142,9 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { } return err - } func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - prefixedTopic := AddPrefixToAddress(req.Topic) receiver, err := a.session.NewReceiver( @@ -163,7 +159,6 @@ func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, } return err - } // function that subscribes to a queue in a tight loop @@ -173,10 +168,9 @@ func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiv msg, err := receiver.Receive(ctx) if msg != nil { - data := msg.GetData() - //if data is empty, then check the value field for data + // if data is empty, then check the value field for data if data == nil || len(data) == 0 { data = []byte(fmt.Sprint(msg.Value)) } @@ -204,11 +198,9 @@ func (a *amqpPubSub) subscribeForever(ctx context.Context, receiver *amqp.Receiv err := receiver.RejectMessage(ctx, msg, nil) if err != nil { a.logger.Errorf("failed to NAK a message") - } } } - } } @@ -234,7 +226,6 @@ func (a *amqpPubSub) connect() (*amqp.Session, error) { } return session, nil - } func (a *amqpPubSub) newTLSConfig() *tls.Config { @@ -261,7 +252,6 @@ func (a *amqpPubSub) newTLSConfig() *tls.Config { } func (a *amqpPubSub) createClientOptions(uri *url.URL) []amqp.ConnOption { - var opts []amqp.ConnOption scheme := uri.Scheme @@ -270,14 +260,11 @@ func (a *amqpPubSub) createClientOptions(uri *url.URL) []amqp.ConnOption { case "amqp": if a.metadata.anonymous == true { opts = append(opts, amqp.ConnSASLAnonymous()) - } else { opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password)) } case "amqps": - opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password)) - opts = append(opts, amqp.ConnTLS(true)) - opts = append(opts, amqp.ConnTLSConfig(a.newTLSConfig())) + opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password), amqp.ConnTLS((true)), amqp.ConnTLSConfig(a.newTLSConfig())) } return opts @@ -290,12 +277,10 @@ func (a *amqpPubSub) Close() error { defer a.publishLock.Unlock() err := a.session.Close(a.ctx) - if err != nil { a.logger.Warnf("failed to close the connection.", err) } return err - } // Feature list for AMQP PubSub diff --git a/pubsub/solace/amqp/amqp_test.go b/pubsub/solace/amqp/amqp_test.go index 6f2195764..d2b26175b 100644 --- a/pubsub/solace/amqp/amqp_test.go +++ b/pubsub/solace/amqp/amqp_test.go @@ -20,15 +20,17 @@ import ( "testing" mdata "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" + "github.com/stretchr/testify/assert" ) func getFakeProperties() map[string]string { return map[string]string{ "consumerID": "client", - amqpUrl: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883", + amqpURL: "tcp://fakeUser:fakePassword@fake.mqtt.host:1883", anonymous: "false", username: "default", password: "default", @@ -46,7 +48,7 @@ func TestParseMetadata(t *testing.T) { // assert assert.NoError(t, err) - assert.Equal(t, fakeProperties[amqpUrl], m.url) + assert.Equal(t, fakeProperties[amqpURL], m.url) }) t.Run("url is not given", func(t *testing.T) { @@ -55,13 +57,13 @@ func TestParseMetadata(t *testing.T) { fakeMetaData := pubsub.Metadata{ Base: mdata.Base{Properties: fakeProperties}, } - fakeMetaData.Properties[amqpUrl] = "" + fakeMetaData.Properties[amqpURL] = "" m, err := parseAMQPMetaData(fakeMetaData, log) // assert assert.EqualError(t, err, errors.New(errorMsgPrefix+" missing url").Error()) - assert.Equal(t, fakeProperties[amqpUrl], m.url) + assert.Equal(t, fakeProperties[amqpURL], m.url) }) t.Run("invalid ca certificate", func(t *testing.T) { diff --git a/pubsub/solace/amqp/metadata.go b/pubsub/solace/amqp/metadata.go index 849778eff..bfbfc5261 100644 --- a/pubsub/solace/amqp/metadata.go +++ b/pubsub/solace/amqp/metadata.go @@ -44,7 +44,7 @@ type tlsCfg struct { const ( // Keys - amqpUrl = "url" + amqpURL = "url" anonymous = "anonymous" username = "username" password = "password" @@ -65,7 +65,7 @@ func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) m := metadata{anonymous: false} // required configuration settings - if val, ok := md.Properties[amqpUrl]; ok && val != "" { + if val, ok := md.Properties[amqpURL]; ok && val != "" { m.url = val } else { return &m, fmt.Errorf("%s missing url", errorMsgPrefix) @@ -81,7 +81,6 @@ func parseAMQPMetaData(md pubsub.Metadata, log logger.Logger) (*metadata, error) } if !m.anonymous { - if val, ok := md.Properties[username]; ok && val != "" { m.username = val } else { diff --git a/tests/config/pubsub/solace/amqp/pubsub.yml b/tests/config/pubsub/solace/amqp/pubsub.yml index 6be13c322..16a07593e 100644 --- a/tests/config/pubsub/solace/amqp/pubsub.yml +++ b/tests/config/pubsub/solace/amqp/pubsub.yml @@ -6,9 +6,5 @@ spec: metadata: - name: url value: 'amqp://localhost:5672' - - name: username - value: 'default' - - name: password - value: 'default' - name: anonymous value: true From bbab204ee9e6b81c34d4fcb98d17a84020cf7b73 Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Fri, 23 Dec 2022 09:04:29 -0500 Subject: [PATCH 08/10] Update conformance.yml with solace Signed-off-by: TKTheTechie thomas.kunnumpurath@solace.com --- .github/workflows/conformance.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index be728c034..7bf45c961 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -72,6 +72,7 @@ jobs: - pubsub.kafka-wurstmeister - pubsub.kafka-confluent - pubsub.kubemq + - pubsub.solace - secretstores.kubernetes - secretstores.localenv - secretstores.localfile @@ -364,6 +365,10 @@ jobs: - name: Start kubemq run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d if: contains(matrix.component, 'kubemq') + + - name: Start solace + run: docker-compose -f ./.github/infrastructure/docker-compose-solace.yml -p solace up -d + if: contains(matrix.component, 'solace') - name: Setup KinD test data if: contains(matrix.component, 'kubernetes') From 55fed36a516d661588efe14c137b2c35995291a9 Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Wed, 28 Dec 2022 17:43:51 -0800 Subject: [PATCH 09/10] fix workflow Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- .github/workflows/conformance.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/conformance.yml b/.github/workflows/conformance.yml index 53215dc77..1fecad380 100644 --- a/.github/workflows/conformance.yml +++ b/.github/workflows/conformance.yml @@ -482,7 +482,7 @@ jobs: run: docker-compose -f ./.github/infrastructure/docker-compose-kubemq.yml -p kubemq up -d if: contains(matrix.component, 'kubemq') - - name: Start solace + - name: Start solace run: docker-compose -f ./.github/infrastructure/docker-compose-solace.yml -p solace up -d if: contains(matrix.component, 'solace') From 5cd02e8c78cb49fa6216726c4ccf27f55d5bd40f Mon Sep 17 00:00:00 2001 From: TKTheTechie Date: Wed, 28 Dec 2022 23:28:25 -0500 Subject: [PATCH 10/10] Update of amqp go library Signed-off-by: TKTheTechie --- pubsub/solace/amqp/amqp.go | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/pubsub/solace/amqp/amqp.go b/pubsub/solace/amqp/amqp.go index c32c09515..137a77c18 100644 --- a/pubsub/solace/amqp/amqp.go +++ b/pubsub/solace/amqp/amqp.go @@ -92,7 +92,7 @@ func AddPrefixToAddress(t string) string { } // Publish the topic to amqp pubsub -func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { +func (a *amqpPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) error { a.publishLock.Lock() defer a.publishLock.Unlock() @@ -115,14 +115,15 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { } } - sender, err := a.session.NewSender( - amqp.LinkTargetAddress(AddPrefixToAddress(req.Topic)), + sender, err := a.session.NewSender(ctx, + AddPrefixToAddress(req.Topic), + nil, ) if err != nil { a.logger.Errorf("Unable to create link to %s", req.Topic, err) } else { - err = sender.Send(a.ctx, m) + err = sender.Send(ctx, m) // If the publish operation has failed, attempt to republish a maximum number of times // before giving up @@ -131,7 +132,7 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { a.publishRetryCount++ // Send message - err = sender.Send(a.ctx, m) + err = sender.Send(ctx, m) if err != nil { a.logger.Warnf("Failed to publish a message to the broker", err) @@ -147,8 +148,9 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error { func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { prefixedTopic := AddPrefixToAddress(req.Topic) - receiver, err := a.session.NewReceiver( - amqp.LinkSourceAddress(prefixedTopic), + receiver, err := a.session.NewReceiver(a.ctx, + prefixedTopic, + nil, ) if err == nil { @@ -214,13 +216,13 @@ func (a *amqpPubSub) connect() (*amqp.Session, error) { clientOpts := a.createClientOptions(uri) a.logger.Infof("Attempting to connect to %s", a.metadata.url) - client, err := amqp.Dial(a.metadata.url, clientOpts...) + client, err := amqp.Dial(a.metadata.url, &clientOpts) if err != nil { a.logger.Fatal("Dialing AMQP server:", err) } // Open a session - session, err := client.NewSession() + session, err := client.NewSession(a.ctx, nil) if err != nil { a.logger.Fatal("Creating AMQP session:", err) } @@ -251,20 +253,21 @@ func (a *amqpPubSub) newTLSConfig() *tls.Config { return tlsConfig } -func (a *amqpPubSub) createClientOptions(uri *url.URL) []amqp.ConnOption { - var opts []amqp.ConnOption +func (a *amqpPubSub) createClientOptions(uri *url.URL) amqp.ConnOptions { + var opts amqp.ConnOptions scheme := uri.Scheme switch scheme { case "amqp": if a.metadata.anonymous == true { - opts = append(opts, amqp.ConnSASLAnonymous()) + opts.SASLType = amqp.SASLTypeAnonymous() } else { - opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password)) + opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password) } case "amqps": - opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password), amqp.ConnTLS((true)), amqp.ConnTLSConfig(a.newTLSConfig())) + opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password) + opts.TLSConfig = a.newTLSConfig() } return opts