mirror of https://github.com/dapr/kit.git
231 lines
5.9 KiB
Go
231 lines
5.9 KiB
Go
/*
|
|
Copyright 2023 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package queue
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
kclock "k8s.io/utils/clock"
|
|
)
|
|
|
|
// ErrProcessorStopped is returned when the processor is not running.
|
|
var ErrProcessorStopped = errors.New("processor is stopped")
|
|
|
|
// Processor manages the queue of items and processes them at the correct time.
|
|
type Processor[T queueable] struct {
|
|
executeFn func(r T)
|
|
queue queue[T]
|
|
clock kclock.Clock
|
|
lock sync.Mutex
|
|
wg sync.WaitGroup
|
|
processorRunningCh chan struct{}
|
|
stopCh chan struct{}
|
|
resetCh chan struct{}
|
|
stopped atomic.Bool
|
|
}
|
|
|
|
// NewProcessor returns a new Processor object.
|
|
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
|
|
func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
|
|
return &Processor[T]{
|
|
executeFn: executeFn,
|
|
queue: newQueue[T](),
|
|
processorRunningCh: make(chan struct{}, 1),
|
|
stopCh: make(chan struct{}),
|
|
resetCh: make(chan struct{}, 1),
|
|
clock: kclock.RealClock{},
|
|
}
|
|
}
|
|
|
|
// WithClock sets the clock used by the processor. Used for testing.
|
|
func (p *Processor[T]) WithClock(clock kclock.Clock) *Processor[T] {
|
|
p.clock = clock
|
|
return p
|
|
}
|
|
|
|
// Enqueue adds a new item to the queue.
|
|
// If a item with the same ID already exists, it'll be replaced.
|
|
func (p *Processor[T]) Enqueue(r T) error {
|
|
if p.stopped.Load() {
|
|
return ErrProcessorStopped
|
|
}
|
|
|
|
// Insert or replace the item in the queue
|
|
// If the item added or replaced is the first one in the queue, we need to know that
|
|
p.lock.Lock()
|
|
peek, ok := p.queue.Peek()
|
|
isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue
|
|
p.queue.Insert(r, true)
|
|
peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item
|
|
isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue
|
|
p.process(isFirst)
|
|
p.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Dequeue removes a item from the queue.
|
|
func (p *Processor[T]) Dequeue(key string) error {
|
|
if p.stopped.Load() {
|
|
return ErrProcessorStopped
|
|
}
|
|
|
|
// We need to check if this is the next item in the queue, as that requires stopping the processor
|
|
p.lock.Lock()
|
|
peek, ok := p.queue.Peek()
|
|
p.queue.Remove(key)
|
|
if ok && peek.Key() == key {
|
|
// If the item was the first one in the queue, restart the processor
|
|
p.process(true)
|
|
}
|
|
p.lock.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close stops the processor.
|
|
// This method blocks until the processor loop returns.
|
|
func (p *Processor[T]) Close() error {
|
|
defer p.wg.Wait()
|
|
if p.stopped.CompareAndSwap(false, true) {
|
|
// Send a signal to stop
|
|
close(p.stopCh)
|
|
// Blocks until processor loop ends
|
|
p.processorRunningCh <- struct{}{}
|
|
return nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start the processing loop if it's not already running.
|
|
// This must be invoked while the caller has a lock.
|
|
func (p *Processor[T]) process(isNext bool) {
|
|
// Do not start a loop if it's already running
|
|
select {
|
|
case p.processorRunningCh <- struct{}{}:
|
|
// Nop - fallthrough
|
|
default:
|
|
// Already running
|
|
if isNext {
|
|
// If this is the next item, send a reset signal
|
|
// Use a select in case another goroutine is sending a reset signal too
|
|
select {
|
|
case p.resetCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
p.processLoop()
|
|
}()
|
|
}
|
|
|
|
// Processing loop.
|
|
func (p *Processor[T]) processLoop() {
|
|
defer func() {
|
|
// Release the channel when exiting
|
|
<-p.processorRunningCh
|
|
}()
|
|
|
|
var (
|
|
r T
|
|
ok bool
|
|
t kclock.Timer
|
|
scheduledTime time.Time
|
|
deadline time.Duration
|
|
)
|
|
|
|
for {
|
|
// Continue processing items until the queue is empty
|
|
p.lock.Lock()
|
|
r, ok = p.queue.Peek()
|
|
p.lock.Unlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Check if after obtaining the lock we have a stop or reset signals
|
|
// Do this before we create a timer
|
|
select {
|
|
case <-p.stopCh:
|
|
// Exit on stop signals
|
|
return
|
|
case <-p.resetCh:
|
|
// Restart the loop on reset signals
|
|
continue
|
|
default:
|
|
// Nop, proceed
|
|
}
|
|
|
|
scheduledTime = r.ScheduledTime()
|
|
deadline = scheduledTime.Sub(p.clock.Now())
|
|
|
|
// If the deadline is less than 0.5ms away, execute it right away
|
|
// This is more efficient than creating a timer
|
|
if deadline < 500*time.Microsecond {
|
|
p.execute(r)
|
|
continue
|
|
}
|
|
|
|
t = p.clock.NewTimer(deadline)
|
|
select {
|
|
// Wait for when it's time to execute the item
|
|
case <-t.C():
|
|
p.execute(r)
|
|
|
|
// If we get a reset signal, restart the loop
|
|
case <-p.resetCh:
|
|
// Restart the loop
|
|
continue
|
|
|
|
// If we receive a stop signal, exit
|
|
case <-p.stopCh:
|
|
// Stop the timer and exit the loop
|
|
if !t.Stop() {
|
|
<-t.C()
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Executes a item when it's time.
|
|
func (p *Processor[T]) execute(r T) {
|
|
// Pop the item now that we're ready to process it
|
|
// There's a small chance this is a different item than the one we peeked before
|
|
p.lock.Lock()
|
|
// For safety, let's peek at the first item before popping it and make sure it's the same object
|
|
// It's unlikely, but if it's a different object then restart the loop
|
|
peek, ok := p.queue.Peek()
|
|
if !ok || peek != r {
|
|
p.lock.Unlock()
|
|
return
|
|
}
|
|
r, ok = p.queue.Pop()
|
|
p.lock.Unlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
go p.executeFn(r)
|
|
}
|