Added eventqueue package (#54)

* Added eventqueue package

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Changed per review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Renamed lock

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
Alessandro (Ale) Segala 2023-08-07 15:50:40 -07:00 committed by GitHub
parent bc1c1eee0a
commit b6b141aa3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1166 additions and 0 deletions

18
eventqueue/eventqueue.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package eventqueue implements a queue processor for delayed events.
// Events are maintained in an in-memory queue, where items are in the order of when they are to be executed.
// Users should interact with the Processor to process events in the queue.
// When the queue has at least 1 item, the processor uses a single background goroutine to wait on the next item to be executed.
package eventqueue

View File

@ -0,0 +1,72 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventqueue
import (
"fmt"
"time"
kclock "k8s.io/utils/clock"
)
// queueableItem is an item that can be queued and it's used for testing.
type queueableItem struct {
Name string
ExecutionTime time.Time
}
// Key returns the key for this unique item.
func (r queueableItem) Key() string {
return r.Name
}
// ScheduledTime returns the time the item is scheduled to be executed at.
// This is implemented to comply with the queueable interface.
func (r queueableItem) ScheduledTime() time.Time {
return r.ExecutionTime
}
func ExampleProcessor() {
// Init a clock using k8s.io/utils/clock
clock := kclock.RealClock{}
// Method invoked when an item is to be executed
executed := make(chan string, 3)
executeFn := func(r *queueableItem) {
executed <- "Executed: " + r.Name
}
// Create the processor
processor := NewProcessor[*queueableItem](executeFn, clock)
// Add items to the processor, in any order, using Enqueue
processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: clock.Now().Add(500 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item2", ExecutionTime: clock.Now().Add(200 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: clock.Now().Add(300 * time.Millisecond)})
processor.Enqueue(&queueableItem{Name: "item4", ExecutionTime: clock.Now().Add(time.Second)})
// Items with the same value returned by Key() are considered the same, so will be replaced
processor.Enqueue(&queueableItem{Name: "item3", ExecutionTime: clock.Now().Add(100 * time.Millisecond)})
// Using Dequeue allows removing an item from the queue
processor.Dequeue("item4")
for i := 0; i < 3; i++ {
fmt.Println(<-executed)
}
// Output:
// Executed: item3
// Executed: item2
// Executed: item1
}

221
eventqueue/processor.go Normal file
View File

@ -0,0 +1,221 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventqueue
import (
"errors"
"sync"
"sync/atomic"
"time"
kclock "k8s.io/utils/clock"
)
// ErrProcessorStopped is returned when the processor is not running.
var ErrProcessorStopped = errors.New("processor is stopped")
// Processor manages the queue of items and processes them at the correct time.
type Processor[T queueable] struct {
executeFn func(r T)
queue queue[T]
clock kclock.Clock
lock sync.Mutex
processorRunningCh chan struct{}
stopCh chan struct{}
resetCh chan struct{}
stopped atomic.Bool
}
// NewProcessor returns a new Processor object.
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func NewProcessor[T queueable](executeFn func(r T), clock kclock.Clock) *Processor[T] {
return &Processor[T]{
executeFn: executeFn,
queue: newQueue[T](),
processorRunningCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
resetCh: make(chan struct{}, 1),
clock: clock,
}
}
// Enqueue adds a new item to the queue.
// If a item with the same ID already exists, it'll be replaced.
func (p *Processor[T]) Enqueue(r T) error {
if p.stopped.Load() {
return ErrProcessorStopped
}
// Insert or replace the item in the queue
// If the item added or replaced is the first one in the queue, we need to know that
p.lock.Lock()
peek, ok := p.queue.Peek()
isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue
p.queue.Insert(r, true)
peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item
isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue
p.process(isFirst)
p.lock.Unlock()
return nil
}
// Dequeue removes a item from the queue.
func (p *Processor[T]) Dequeue(key string) error {
if p.stopped.Load() {
return ErrProcessorStopped
}
// We need to check if this is the next item in the queue, as that requires stopping the processor
p.lock.Lock()
peek, ok := p.queue.Peek()
p.queue.Remove(key)
if ok && peek.Key() == key {
// If the item was the first one in the queue, restart the processor
p.process(true)
}
p.lock.Unlock()
return nil
}
// Close stops the processor.
// This method blocks until the processor loop returns.
func (p *Processor[T]) Close() error {
if !p.stopped.CompareAndSwap(false, true) {
// Already stopped
return nil
}
// Send a signal to stop
close(p.stopCh)
// Blocks until processor loop ends
p.processorRunningCh <- struct{}{}
return nil
}
// Start the processing loop if it's not already running.
// This must be invoked while the caller has a lock.
func (p *Processor[T]) process(isNext bool) {
// Do not start a loop if it's already running
select {
case p.processorRunningCh <- struct{}{}:
// Nop - fallthrough
default:
// Already running
if isNext {
// If this is the next item, send a reset signal
// Use a select in case another goroutine is sending a reset signal too
select {
case p.resetCh <- struct{}{}:
default:
}
}
return
}
go p.processLoop()
}
// Processing loop.
func (p *Processor[T]) processLoop() {
defer func() {
// Release the channel when exiting
<-p.processorRunningCh
}()
var (
r T
ok bool
t kclock.Timer
scheduledTime time.Time
deadline time.Duration
)
for {
// Continue processing items until the queue is empty
p.lock.Lock()
r, ok = p.queue.Peek()
p.lock.Unlock()
if !ok {
return
}
// Check if after obtaining the lock we have a stop or reset signals
// Do this before we create a timer
select {
case <-p.stopCh:
// Exit on stop signals
return
case <-p.resetCh:
// Restart the loop on reset signals
continue
default:
// Nop, proceed
}
scheduledTime = r.ScheduledTime()
deadline = scheduledTime.Sub(p.clock.Now())
// If the deadline is less than 0.5ms away, execute it right away
// This is more efficient than creating a timer
if deadline < 500*time.Microsecond {
p.execute(r)
continue
}
t = p.clock.NewTimer(deadline)
select {
// Wait for when it's time to execute the item
case <-t.C():
p.execute(r)
// If we get a reset signal, restart the loop
case <-p.resetCh:
// Restart the loop
continue
// If we receive a stop signal, exit
case <-p.stopCh:
// Stop the timer and exit the loop
if !t.Stop() {
<-t.C()
}
return
}
}
}
// Executes a item when it's time.
func (p *Processor[T]) execute(r T) {
// Pop the item now that we're ready to process it
// There's a small chance this is a different item than the one we peeked before
p.lock.Lock()
// For safety, let's peek at the first item before popping it and make sure it's the same object
// It's unlikely, but if it's a different object then restart the loop
peek, ok := p.queue.Peek()
if !ok || peek != r {
p.lock.Unlock()
return
}
r, ok = p.queue.Pop()
p.lock.Unlock()
if !ok {
return
}
go p.executeFn(r)
}

View File

@ -0,0 +1,376 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventqueue
import (
"math/rand"
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clocktesting "k8s.io/utils/clock/testing"
)
func TestProcessor(t *testing.T) {
// Create the processor
clock := clocktesting.NewFakeClock(time.Now())
executeCh := make(chan *queueableItem)
processor := NewProcessor(func(r *queueableItem) {
executeCh <- r
}, clock)
assertExecutedItem := func(t *testing.T) *queueableItem {
t.Helper()
// The signal is sent in a background goroutine, so we need to use a wall clock here
runtime.Gosched()
select {
case r := <-executeCh:
return r
case <-time.After(700 * time.Millisecond):
t.Fatal("did not receive signal in 700ms")
}
return nil
}
assertNoExecutedItem := func(t *testing.T) {
t.Helper()
// The signal is sent in a background goroutine, so we need to use a wall clock here
runtime.Gosched()
select {
case r := <-executeCh:
t.Fatalf("received unexpected item: %s", r.Name)
case <-time.After(500 * time.Millisecond):
// all good
}
}
// Makes tickers advance
// Note that step must be > 500ms
advanceTickers := func(step time.Duration, count int) {
clock.Step(50 * time.Millisecond)
// Sleep on the wall clock for a few ms to allow the background goroutine to get in sync (especially when testing with -race)
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
for i := 0; i < count; i++ {
clock.Step(step)
// Sleep on the wall clock for a few ms to allow the background goroutine to get in sync (especially when testing with -race)
runtime.Gosched()
time.Sleep(50 * time.Millisecond)
}
}
t.Run("enqueue items", func(t *testing.T) {
for i := 1; i <= 5; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by 500ms to start
advanceTickers(500*time.Millisecond, 1)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 5; i++ {
t.Logf("Waiting for signal %d", i)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, strconv.Itoa(i), received.Name)
}
})
t.Run("enqueue item to be executed right away", func(t *testing.T) {
r := newTestItem(1, clock.Now())
err := processor.Enqueue(r)
require.NoError(t, err)
advanceTickers(500*time.Millisecond, 1)
received := assertExecutedItem(t)
assert.Equal(t, "1", received.Name)
})
t.Run("enqueue item at the front of the queue", func(t *testing.T) {
// Enqueue 4 items
for i := 1; i <= 4; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by 1500ms to trigger the first item
t.Log("Waiting for signal 1")
advanceTickers(500*time.Millisecond, 3)
received := assertExecutedItem(t)
assert.Equal(t, "1", received.Name)
// Add a new item at the front of the queue
err := processor.Enqueue(
newTestItem(99, clock.Now()),
)
require.NoError(t, err)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 4; i++ {
// First item should be 99
expect := strconv.Itoa(i)
if i == 1 {
expect = "99"
}
t.Logf("Waiting for signal %s", expect)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, expect, received.Name)
}
})
t.Run("dequeue item", func(t *testing.T) {
// Enqueue 5 items
for i := 1; i <= 5; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by a few ms to start
advanceTickers(0, 0)
// Dequeue items 2 and 4
// Note that this is a string because it's the key
err := processor.Dequeue("2")
require.NoError(t, err)
err = processor.Dequeue("4")
require.NoError(t, err)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 5; i++ {
if i == 2 || i == 4 {
// Skip items that have been removed
t.Logf("Should not receive signal %d", i)
advanceTickers(time.Second, 1)
assertNoExecutedItem(t)
continue
}
t.Logf("Waiting for signal %d", i)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, strconv.Itoa(i), received.Name)
}
})
t.Run("dequeue item from the front of the queue", func(t *testing.T) {
// Enqueue 6 items
for i := 1; i <= 6; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by a few ms to start
advanceTickers(0, 0)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 6; i++ {
// On messages 2 and 5, dequeue the item when it's at the front of the queue
if i == 2 || i == 5 {
// Dequeue the item at the front of the queue
// Note that this is a string because it's the key
err := processor.Dequeue(strconv.Itoa(i))
require.NoError(t, err)
// Skip items that have been removed
t.Logf("Should not receive signal %d", i)
advanceTickers(time.Second, 1)
assertNoExecutedItem(t)
continue
}
t.Logf("Waiting for signal %d", i)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, strconv.Itoa(i), received.Name)
}
})
t.Run("replace item", func(t *testing.T) {
// Enqueue 5 items
for i := 1; i <= 5; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Replace item 4, bumping its priority down
err := processor.Enqueue(newTestItem(4, clock.Now().Add(6*time.Second)))
require.NoError(t, err)
// Advance tickers by a few ms to start
advanceTickers(0, 0)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 6; i++ {
if i == 4 {
// This item has been pushed down
t.Logf("Should not receive signal %d now", i)
advanceTickers(time.Second, 1)
assertNoExecutedItem(t)
continue
}
expect := i
if i == 6 {
// Item 4 should come now
expect = 4
}
t.Logf("Waiting for signal %d", expect)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, strconv.Itoa(expect), received.Name)
}
})
t.Run("replace item at the front of the queue", func(t *testing.T) {
// Enqueue 5 items
for i := 1; i <= 5; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by a few ms to start
advanceTickers(0, 0)
// Advance tickers and assert messages are coming in order
for i := 1; i <= 6; i++ {
if i == 2 {
// Replace item 2, bumping its priority down, while it's at the front of the queue
err := processor.Enqueue(newTestItem(2, clock.Now().Add(5*time.Second)))
require.NoError(t, err)
// This item has been pushed down
t.Logf("Should not receive signal %d now", i)
advanceTickers(time.Second, 1)
assertNoExecutedItem(t)
continue
}
expect := i
if i == 6 {
// Item 2 should come now
expect = 2
}
t.Logf("Waiting for signal %d", expect)
advanceTickers(time.Second, 1)
received := assertExecutedItem(t)
assert.Equal(t, strconv.Itoa(expect), received.Name)
}
})
t.Run("enqueue multiple items concurrently that to be executed randomly", func(t *testing.T) {
const (
count = 150
maxDelay = 30
)
now := clock.Now()
wg := sync.WaitGroup{}
for i := 0; i < count; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
execTime := now.Add(time.Second * time.Duration(rand.Intn(maxDelay))) //nolint:gosec
err := processor.Enqueue(newTestItem(i, execTime))
require.NoError(t, err)
}(i)
}
wg.Wait()
// Collect
collected := make([]bool, count)
var collectedCount int64
doneCh := make(chan struct{})
go func() {
for {
select {
case <-doneCh:
return
case r := <-executeCh:
n, err := strconv.Atoi(r.Name)
if err == nil {
collected[n] = true
atomic.AddInt64(&collectedCount, 1)
}
}
}
}()
// Advance tickers and assert messages are coming in order
for i := 0; i <= maxDelay; i++ {
advanceTickers(time.Second, 1)
time.Sleep(50 * time.Millisecond)
}
// Allow for synchronization
assert.Eventually(t, func() bool {
return atomic.LoadInt64(&collectedCount) == count
}, 5*time.Second, 50*time.Millisecond)
close(doneCh)
// Ensure all items are true
for i := 0; i < count; i++ {
assert.Truef(t, collected[i], "item %d not received", i)
}
})
t.Run("stop processor", func(t *testing.T) {
// Enqueue 5 items
for i := 1; i <= 5; i++ {
err := processor.Enqueue(
newTestItem(i, clock.Now().Add(time.Second*time.Duration(i))),
)
require.NoError(t, err)
}
// Advance tickers by a few ms to start
advanceTickers(0, 0)
// Stop the processor
require.NoError(t, processor.Close())
// Queue should not be processed
advanceTickers(time.Second, 2)
assertNoExecutedItem(t)
// Enqueuing and dequeueing should fail
err := processor.Enqueue(newTestItem(99, clock.Now()))
require.ErrorIs(t, err, ErrProcessorStopped)
err = processor.Dequeue("99")
require.ErrorIs(t, err, ErrProcessorStopped)
// Stopping again is a nop (should not crash)
require.NoError(t, processor.Close())
})
}

163
eventqueue/queue.go Normal file
View File

@ -0,0 +1,163 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventqueue
import (
"container/heap"
"time"
)
// queueable is the interface for items that can be added to the queue.
type queueable interface {
comparable
Key() string
ScheduledTime() time.Time
}
// queue implements a queue for items that are scheduled to be executed at a later time.
// It acts as a "priority queue", in which items are added in order of when they're scheduled.
// Internally, it uses a heap (from container/heap) that allows Insert and Pop operations to be completed in O(log N) time (where N is the queue's length).
// Note: methods in this struct are not safe for concurrent use. Callers should use locks to ensure consistency.
type queue[T queueable] struct {
heap *queueHeap[T]
items map[string]*queueItem[T]
}
// newQueue creates a new queue.
func newQueue[T queueable]() queue[T] {
return queue[T]{
heap: new(queueHeap[T]),
items: make(map[string]*queueItem[T]),
}
}
// Len returns the number of items in the queue.
func (p *queue[T]) Len() int {
return p.heap.Len()
}
// Insert inserts a new item into the queue.
// If replace is true, existing items are replaced
func (p *queue[T]) Insert(r T, replace bool) {
key := r.Key()
// Check if the item already exists
item, ok := p.items[key]
if ok {
if replace {
item.value = r
heap.Fix(p.heap, item.index)
}
return
}
item = &queueItem[T]{
value: r,
}
heap.Push(p.heap, item)
p.items[key] = item
}
// Pop removes the next item in the queue and returns it.
// The returned boolean value will be "true" if an item was found.
func (p *queue[T]) Pop() (T, bool) {
if p.Len() == 0 {
var zero T
return zero, false
}
item, ok := heap.Pop(p.heap).(*queueItem[T])
if !ok || item == nil {
var zero T
return zero, false
}
delete(p.items, item.value.Key())
return item.value, true
}
// Peek returns the next item in the queue, without removing it.
// The returned boolean value will be "true" if an item was found.
func (p *queue[T]) Peek() (T, bool) {
if p.Len() == 0 {
var zero T
return zero, false
}
return (*p.heap)[0].value, true
}
// Remove an item from the queue.
func (p *queue[T]) Remove(key string) {
// If the item is not in the queue, this is a nop
item, ok := p.items[key]
if !ok {
return
}
heap.Remove(p.heap, item.index)
delete(p.items, key)
}
// Update an item in the queue.
func (p *queue[T]) Update(r T) {
// If the item is not in the queue, this is a nop
item, ok := p.items[r.Key()]
if !ok {
return
}
item.value = r
heap.Fix(p.heap, item.index)
}
type queueItem[T queueable] struct {
value T
// The index of the item in the heap. This is maintained by the heap.Interface methods.
index int
}
type queueHeap[T queueable] []*queueItem[T]
func (pq queueHeap[T]) Len() int {
return len(pq)
}
func (pq queueHeap[T]) Less(i, j int) bool {
return pq[i].value.ScheduledTime().Before(pq[j].value.ScheduledTime())
}
func (pq queueHeap[T]) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *queueHeap[T]) Push(x any) {
n := len(*pq)
item := x.(*queueItem[T])
item.index = n
*pq = append(*pq, item)
}
func (pq *queueHeap[T]) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // Avoid memory leak
item.index = -1 // For safety
*pq = old[0 : n-1]
return item
}

316
eventqueue/queue_test.go Normal file
View File

@ -0,0 +1,316 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package eventqueue
import (
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestQueue(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 5 items, which are not in order
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(3, "2023-03-03T03:03:03Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
queue.Insert(newTestItem(5, "2029-09-09T09:09:09Z"), false)
queue.Insert(newTestItem(4, "2024-04-04T04:04:04Z"), false)
require.Equal(t, 5, queue.Len())
i := 0
for {
// Pop an element from the queue
r, ok := queue.Pop()
if i < 5 {
require.True(t, ok)
require.NotNil(t, r)
} else {
require.False(t, ok)
break
}
i++
// Results should be in order
ri, err := strconv.Atoi(r.Name)
require.NoError(t, err)
assert.Equal(t, i, ri)
}
}
func TestQueueSkipDuplicates(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 2 items
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
require.Equal(t, 2, queue.Len())
// Add a duplicate item (same actor type, actor ID, name), but different time
queue.Insert(newTestItem(2, "2029-09-09T09:09:09Z"), false)
require.Equal(t, 2, queue.Len())
// Pop the items and check only the 2 original ones were in the queue
popAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
popAndCompare(t, &queue, 2, "2022-02-02T02:02:02Z")
_, ok := queue.Pop()
require.False(t, ok)
}
func TestQueueReplaceDuplicates(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 2 items
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
require.Equal(t, 2, queue.Len())
// Replace a item
queue.Insert(newTestItem(1, "2029-09-09T09:09:09Z"), true)
require.Equal(t, 2, queue.Len())
// Pop the items and validate the new order
popAndCompare(t, &queue, 2, "2022-02-02T02:02:02Z")
popAndCompare(t, &queue, 1, "2029-09-09T09:09:09Z")
_, ok := queue.Pop()
require.False(t, ok)
}
func TestAddToQueue(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 5 items, which are not in order
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(5, "2023-03-03T03:03:03Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
queue.Insert(newTestItem(8, "2029-09-09T09:09:09Z"), false)
queue.Insert(newTestItem(7, "2024-04-04T04:04:04Z"), false)
require.Equal(t, 5, queue.Len())
// Pop 2 elements from the queue
for i := 1; i <= 2; i++ {
r, ok := queue.Pop()
require.True(t, ok)
require.NotNil(t, r)
ri, err := strconv.Atoi(r.Name)
require.NoError(t, err)
assert.Equal(t, i, ri)
}
// Add 4 more elements
// Two are at the very beginning (including one that had the same time as one popped before)
// One is in the middle
// One is at the end
queue.Insert(newTestItem(3, "2021-01-01T01:01:01Z"), false)
queue.Insert(newTestItem(4, "2021-01-11T11:11:11Z"), false)
queue.Insert(newTestItem(6, "2023-03-13T13:13:13Z"), false)
queue.Insert(newTestItem(9, "2030-10-30T10:10:10Z"), false)
require.Equal(t, 7, queue.Len())
// Pop all the remaining elements and make sure they're in order
for i := 3; i <= 9; i++ {
r, ok := queue.Pop()
require.True(t, ok)
require.NotNil(t, r)
ri, err := strconv.Atoi(r.Name)
require.NoError(t, err)
assert.Equal(t, i, ri)
}
// Queue should be empty now
_, ok := queue.Pop()
require.False(t, ok)
require.Equal(t, 0, queue.Len())
}
func TestRemoveFromQueue(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 5 items, which are not in order
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(3, "2023-03-03T03:03:03Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
queue.Insert(newTestItem(5, "2029-09-09T09:09:09Z"), false)
queue.Insert(newTestItem(4, "2024-04-04T04:04:04Z"), false)
require.Equal(t, 5, queue.Len())
// Pop 2 elements from the queue
for i := 1; i <= 2; i++ {
r, ok := queue.Pop()
require.True(t, ok)
require.NotNil(t, r)
ri, err := strconv.Atoi(r.Name)
require.NoError(t, err)
assert.Equal(t, i, ri)
}
require.Equal(t, 3, queue.Len())
// Remove the item with number "4"
// Note that this is a string because it's the key
queue.Remove("4")
// Removing non-existing items is a nop
queue.Remove("10")
require.Equal(t, 2, queue.Len())
// Pop all the remaining elements and make sure they're in order
popAndCompare(t, &queue, 3, "2023-03-03T03:03:03Z")
popAndCompare(t, &queue, 5, "2029-09-09T09:09:09Z")
_, ok := queue.Pop()
require.False(t, ok)
}
func TestUpdateInQueue(t *testing.T) {
queue := newQueue[*queueableItem]()
// Add 5 items, which are not in order
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
queue.Insert(newTestItem(3, "2023-03-03T03:03:03Z"), false)
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
queue.Insert(newTestItem(5, "2029-09-09T09:09:09Z"), false)
queue.Insert(newTestItem(4, "2024-04-04T04:04:04Z"), false)
require.Equal(t, 5, queue.Len())
// Pop 2 elements from the queue
for i := 1; i <= 2; i++ {
r, ok := queue.Pop()
require.True(t, ok)
require.NotNil(t, r)
ri, err := strconv.Atoi(r.Name)
require.NoError(t, err)
assert.Equal(t, i, ri)
}
require.Equal(t, 3, queue.Len())
// Update the item with number "4" but maintain priority
queue.Update(newTestItem(4, "2024-04-04T14:14:14Z"))
// Update the item with number "5" and increase the priority
queue.Update(newTestItem(5, "2021-01-01T01:01:01Z"))
// Updating non-existing items is a nop
queue.Update(newTestItem(10, "2021-01-01T01:01:01Z"))
require.Equal(t, 3, queue.Len())
// Pop all the remaining elements and make sure they're in order
popAndCompare(t, &queue, 5, "2021-01-01T01:01:01Z") // 5 comes before 3 now
popAndCompare(t, &queue, 3, "2023-03-03T03:03:03Z")
popAndCompare(t, &queue, 4, "2024-04-04T14:14:14Z")
_, ok := queue.Pop()
require.False(t, ok)
}
func TestQueuePeek(t *testing.T) {
queue := newQueue[*queueableItem]()
// Peeking an empty queue returns false
_, ok := queue.Peek()
require.False(t, ok)
// Add 6 items, which are not in order
queue.Insert(newTestItem(2, "2022-02-02T02:02:02Z"), false)
require.Equal(t, 1, queue.Len())
peekAndCompare(t, &queue, 2, "2022-02-02T02:02:02Z")
queue.Insert(newTestItem(3, "2023-03-03T03:03:03Z"), false)
require.Equal(t, 2, queue.Len())
peekAndCompare(t, &queue, 2, "2022-02-02T02:02:02Z")
queue.Insert(newTestItem(1, "2021-01-01T01:01:01Z"), false)
require.Equal(t, 3, queue.Len())
peekAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
queue.Insert(newTestItem(5, "2029-09-09T09:09:09Z"), false)
require.Equal(t, 4, queue.Len())
peekAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
queue.Insert(newTestItem(4, "2024-04-04T04:04:04Z"), false)
require.Equal(t, 5, queue.Len())
peekAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
queue.Insert(newTestItem(6, "2019-01-19T01:01:01Z"), false)
require.Equal(t, 6, queue.Len())
peekAndCompare(t, &queue, 6, "2019-01-19T01:01:01Z")
// Pop from the queue
popAndCompare(t, &queue, 6, "2019-01-19T01:01:01Z")
peekAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
// Update a item to bring it to first
queue.Update(newTestItem(2, "2019-01-19T01:01:01Z"))
peekAndCompare(t, &queue, 2, "2019-01-19T01:01:01Z")
// Replace the first item to push it back
queue.Insert(newTestItem(2, "2039-01-19T01:01:01Z"), true)
peekAndCompare(t, &queue, 1, "2021-01-01T01:01:01Z")
}
func newTestItem(n int, dueTime any) *queueableItem {
r := &queueableItem{
Name: strconv.Itoa(n),
}
switch t := dueTime.(type) {
case time.Time:
r.ExecutionTime = t
case string:
r.ExecutionTime, _ = time.Parse(time.RFC3339, t)
case int64:
r.ExecutionTime = time.Unix(t, 0)
}
return r
}
func popAndCompare(t *testing.T, q *queue[*queueableItem], expectN int, expectDueTime string) {
r, ok := q.Pop()
require.True(t, ok)
require.NotNil(t, r)
assert.Equal(t, strconv.Itoa(expectN), r.Name)
assert.Equal(t, expectDueTime, r.ScheduledTime().Format(time.RFC3339))
}
func peekAndCompare(t *testing.T, q *queue[*queueableItem], expectN int, expectDueTime string) {
r, ok := q.Peek()
require.True(t, ok)
require.NotNil(t, r)
assert.Equal(t, strconv.Itoa(expectN), r.Name)
assert.Equal(t, expectDueTime, r.ScheduledTime().Format(time.RFC3339))
}