diff --git a/eventqueue/processor.go b/eventqueue/processor.go index 8248450..882343d 100644 --- a/eventqueue/processor.go +++ b/eventqueue/processor.go @@ -30,7 +30,7 @@ type Processor[T queueable] struct { executeFn func(r T) queue queue[T] clock kclock.Clock - queueLock sync.Mutex + lock sync.Mutex processorRunningCh chan struct{} stopCh chan struct{} resetCh chan struct{} @@ -59,14 +59,14 @@ func (p *Processor[T]) Enqueue(r T) error { // Insert or replace the item in the queue // If the item added or replaced is the first one in the queue, we need to know that - p.queueLock.Lock() + p.lock.Lock() peek, ok := p.queue.Peek() isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue p.queue.Insert(r, true) peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue p.process(isFirst) - p.queueLock.Unlock() + p.lock.Unlock() return nil } @@ -78,14 +78,14 @@ func (p *Processor[T]) Dequeue(key string) error { } // We need to check if this is the next item in the queue, as that requires stopping the processor - p.queueLock.Lock() + p.lock.Lock() peek, ok := p.queue.Peek() p.queue.Remove(key) if ok && peek.Key() == key { // If the item was the first one in the queue, restart the processor p.process(true) } - p.queueLock.Unlock() + p.lock.Unlock() return nil } @@ -147,9 +147,9 @@ func (p *Processor[T]) processLoop() { for { // Continue processing items until the queue is empty - p.queueLock.Lock() + p.lock.Lock() r, ok = p.queue.Peek() - p.queueLock.Unlock() + p.lock.Unlock() if !ok { return } @@ -203,16 +203,16 @@ func (p *Processor[T]) processLoop() { func (p *Processor[T]) execute(r T) { // Pop the item now that we're ready to process it // There's a small chance this is a different item than the one we peeked before - p.queueLock.Lock() + p.lock.Lock() // For safety, let's peek at the first item before popping it and make sure it's the same object // It's unlikely, but if it's a different object then restart the loop peek, ok := p.queue.Peek() if !ok || peek != r { - p.queueLock.Unlock() + p.lock.Unlock() return } r, ok = p.queue.Pop() - p.queueLock.Unlock() + p.lock.Unlock() if !ok { return }