mirror of https://github.com/dapr/kit.git
Renamed lock
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
8452c3b19d
commit
76ae036b00
|
|
@ -30,7 +30,7 @@ type Processor[T queueable] struct {
|
|||
executeFn func(r T)
|
||||
queue queue[T]
|
||||
clock kclock.Clock
|
||||
queueLock sync.Mutex
|
||||
lock sync.Mutex
|
||||
processorRunningCh chan struct{}
|
||||
stopCh chan struct{}
|
||||
resetCh chan struct{}
|
||||
|
|
@ -59,14 +59,14 @@ func (p *Processor[T]) Enqueue(r T) error {
|
|||
|
||||
// Insert or replace the item in the queue
|
||||
// If the item added or replaced is the first one in the queue, we need to know that
|
||||
p.queueLock.Lock()
|
||||
p.lock.Lock()
|
||||
peek, ok := p.queue.Peek()
|
||||
isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue
|
||||
p.queue.Insert(r, true)
|
||||
peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item
|
||||
isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue
|
||||
p.process(isFirst)
|
||||
p.queueLock.Unlock()
|
||||
p.lock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -78,14 +78,14 @@ func (p *Processor[T]) Dequeue(key string) error {
|
|||
}
|
||||
|
||||
// We need to check if this is the next item in the queue, as that requires stopping the processor
|
||||
p.queueLock.Lock()
|
||||
p.lock.Lock()
|
||||
peek, ok := p.queue.Peek()
|
||||
p.queue.Remove(key)
|
||||
if ok && peek.Key() == key {
|
||||
// If the item was the first one in the queue, restart the processor
|
||||
p.process(true)
|
||||
}
|
||||
p.queueLock.Unlock()
|
||||
p.lock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -147,9 +147,9 @@ func (p *Processor[T]) processLoop() {
|
|||
|
||||
for {
|
||||
// Continue processing items until the queue is empty
|
||||
p.queueLock.Lock()
|
||||
p.lock.Lock()
|
||||
r, ok = p.queue.Peek()
|
||||
p.queueLock.Unlock()
|
||||
p.lock.Unlock()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
@ -203,16 +203,16 @@ func (p *Processor[T]) processLoop() {
|
|||
func (p *Processor[T]) execute(r T) {
|
||||
// Pop the item now that we're ready to process it
|
||||
// There's a small chance this is a different item than the one we peeked before
|
||||
p.queueLock.Lock()
|
||||
p.lock.Lock()
|
||||
// For safety, let's peek at the first item before popping it and make sure it's the same object
|
||||
// It's unlikely, but if it's a different object then restart the loop
|
||||
peek, ok := p.queue.Peek()
|
||||
if !ok || peek != r {
|
||||
p.queueLock.Unlock()
|
||||
p.lock.Unlock()
|
||||
return
|
||||
}
|
||||
r, ok = p.queue.Pop()
|
||||
p.queueLock.Unlock()
|
||||
p.lock.Unlock()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue