Update of amqp go library
Signed-off-by: TKTheTechie <thomas.kunnumpurath@solace.com>
This commit is contained in:
parent
55fed36a51
commit
5cd02e8c78
|
@ -92,7 +92,7 @@ func AddPrefixToAddress(t string) string {
|
|||
}
|
||||
|
||||
// Publish the topic to amqp pubsub
|
||||
func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
||||
func (a *amqpPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
|
||||
a.publishLock.Lock()
|
||||
defer a.publishLock.Unlock()
|
||||
|
||||
|
@ -115,14 +115,15 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
}
|
||||
}
|
||||
|
||||
sender, err := a.session.NewSender(
|
||||
amqp.LinkTargetAddress(AddPrefixToAddress(req.Topic)),
|
||||
sender, err := a.session.NewSender(ctx,
|
||||
AddPrefixToAddress(req.Topic),
|
||||
nil,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
a.logger.Errorf("Unable to create link to %s", req.Topic, err)
|
||||
} else {
|
||||
err = sender.Send(a.ctx, m)
|
||||
err = sender.Send(ctx, m)
|
||||
|
||||
// If the publish operation has failed, attempt to republish a maximum number of times
|
||||
// before giving up
|
||||
|
@ -131,7 +132,7 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
a.publishRetryCount++
|
||||
|
||||
// Send message
|
||||
err = sender.Send(a.ctx, m)
|
||||
err = sender.Send(ctx, m)
|
||||
|
||||
if err != nil {
|
||||
a.logger.Warnf("Failed to publish a message to the broker", err)
|
||||
|
@ -147,8 +148,9 @@ func (a *amqpPubSub) Publish(req *pubsub.PublishRequest) error {
|
|||
func (a *amqpPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
|
||||
prefixedTopic := AddPrefixToAddress(req.Topic)
|
||||
|
||||
receiver, err := a.session.NewReceiver(
|
||||
amqp.LinkSourceAddress(prefixedTopic),
|
||||
receiver, err := a.session.NewReceiver(a.ctx,
|
||||
prefixedTopic,
|
||||
nil,
|
||||
)
|
||||
|
||||
if err == nil {
|
||||
|
@ -214,13 +216,13 @@ func (a *amqpPubSub) connect() (*amqp.Session, error) {
|
|||
clientOpts := a.createClientOptions(uri)
|
||||
|
||||
a.logger.Infof("Attempting to connect to %s", a.metadata.url)
|
||||
client, err := amqp.Dial(a.metadata.url, clientOpts...)
|
||||
client, err := amqp.Dial(a.metadata.url, &clientOpts)
|
||||
if err != nil {
|
||||
a.logger.Fatal("Dialing AMQP server:", err)
|
||||
}
|
||||
|
||||
// Open a session
|
||||
session, err := client.NewSession()
|
||||
session, err := client.NewSession(a.ctx, nil)
|
||||
if err != nil {
|
||||
a.logger.Fatal("Creating AMQP session:", err)
|
||||
}
|
||||
|
@ -251,20 +253,21 @@ func (a *amqpPubSub) newTLSConfig() *tls.Config {
|
|||
return tlsConfig
|
||||
}
|
||||
|
||||
func (a *amqpPubSub) createClientOptions(uri *url.URL) []amqp.ConnOption {
|
||||
var opts []amqp.ConnOption
|
||||
func (a *amqpPubSub) createClientOptions(uri *url.URL) amqp.ConnOptions {
|
||||
var opts amqp.ConnOptions
|
||||
|
||||
scheme := uri.Scheme
|
||||
|
||||
switch scheme {
|
||||
case "amqp":
|
||||
if a.metadata.anonymous == true {
|
||||
opts = append(opts, amqp.ConnSASLAnonymous())
|
||||
opts.SASLType = amqp.SASLTypeAnonymous()
|
||||
} else {
|
||||
opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password))
|
||||
opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password)
|
||||
}
|
||||
case "amqps":
|
||||
opts = append(opts, amqp.ConnSASLPlain(a.metadata.username, a.metadata.password), amqp.ConnTLS((true)), amqp.ConnTLSConfig(a.newTLSConfig()))
|
||||
opts.SASLType = amqp.SASLTypePlain(a.metadata.username, a.metadata.password)
|
||||
opts.TLSConfig = a.newTLSConfig()
|
||||
}
|
||||
|
||||
return opts
|
||||
|
|
Loading…
Reference in New Issue