Rabbitmq pubsub component stop has not be well handled (#860)
* fix pubsub rabbitmq stop * fix lint * Add read lock while reading stopped * fix lint Co-authored-by: Phil Kedy <phil.kedy@gmail.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
parent
a28587ed98
commit
7512c77a28
|
|
@ -279,6 +279,10 @@ func (r *rabbitMQ) subscribeForever(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.isStopped() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
r.logger.Errorf("%s error in subscription for %s, %s", logMessagePrefix, queueName, err)
|
r.logger.Errorf("%s error in subscription for %s, %s", logMessagePrefix, queueName, err)
|
||||||
|
|
||||||
if mustReconnect(channel, err) {
|
if mustReconnect(channel, err) {
|
||||||
|
|
@ -385,6 +389,13 @@ func (r *rabbitMQ) reset() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *rabbitMQ) isStopped() bool {
|
||||||
|
r.channelMutex.RLock()
|
||||||
|
defer r.channelMutex.RUnlock()
|
||||||
|
|
||||||
|
return r.stopped
|
||||||
|
}
|
||||||
|
|
||||||
func (r *rabbitMQ) Close() error {
|
func (r *rabbitMQ) Close() error {
|
||||||
r.channelMutex.Lock()
|
r.channelMutex.Lock()
|
||||||
defer r.channelMutex.Unlock()
|
defer r.channelMutex.Unlock()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue