events/loop & slices (#119)

* events/loop & slices

Adds a generic control loop implementation to `event/loop`.

Adds a new `slices` package that provides a generic slice de-duplication
func.

Makes events batcher and queue processer taker in Options.

Allows enqueuing multiple processor items in same func call.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* Elements match

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds buffer size option to events loop

Signed-off-by: joshvanl <me@joshvanl.dev>

* nit

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
Josh van Leeuwen 2025-04-23 11:28:21 -03:00 committed by GitHub
parent a3f06e444a
commit 77af8ac182
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 240 additions and 45 deletions

View File

@ -20,6 +20,7 @@ type Slice[T any] interface {
Append(items ...T) int Append(items ...T) int
Len() int Len() int
Slice() []T Slice() []T
Store(items ...T)
} }
type slice[T any] struct { type slice[T any] struct {
@ -49,3 +50,9 @@ func (s *slice[T]) Slice() []T {
defer s.lock.RUnlock() defer s.lock.RUnlock()
return s.data return s.data
} }
func (s *slice[T]) Store(items ...T) {
s.lock.Lock()
defer s.lock.Unlock()
s.data = items
}

View File

@ -29,6 +29,11 @@ type eventCh[T any] struct {
ch chan<- T ch chan<- T
} }
type Options struct {
Interval time.Duration
Clock clock.Clock
}
// Batcher is a one to many event batcher. It batches events and sends them to // Batcher is a one to many event batcher. It batches events and sends them to
// the added event channel subscribers. Events are sent to the channels after // the added event channel subscribers. Events are sent to the channels after
// the interval has elapsed. If events with the same key are received within // the interval has elapsed. If events with the same key are received within
@ -47,24 +52,26 @@ type Batcher[K comparable, T any] struct {
} }
// New creates a new Batcher with the given interval and key type. // New creates a new Batcher with the given interval and key type.
func New[K comparable, T any](interval time.Duration) *Batcher[K, T] { func New[K comparable, T any](opts Options) *Batcher[K, T] {
cl := opts.Clock
if cl == nil {
cl = clock.RealClock{}
}
b := &Batcher[K, T]{ b := &Batcher[K, T]{
interval: interval, interval: opts.Interval,
clock: clock.RealClock{}, clock: cl,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
} }
b.queue = queue.NewProcessor[K, *item[K, T]](b.execute) b.queue = queue.NewProcessor[K, *item[K, T]](queue.Options[K, *item[K, T]]{
ExecuteFn: b.execute,
Clock: opts.Clock,
})
return b return b
} }
// WithClock sets the clock used by the batcher. Used for testing.
func (b *Batcher[K, T]) WithClock(clock clock.Clock) {
b.queue.WithClock(clock)
b.clock = clock
}
// Subscribe adds a new event channel subscriber. If the batcher is closed, the // Subscribe adds a new event channel subscriber. If the batcher is closed, the
// subscriber is silently dropped. // subscriber is silently dropped.
func (b *Batcher[K, T]) Subscribe(ctx context.Context, ch ...chan<- T) { func (b *Batcher[K, T]) Subscribe(ctx context.Context, ch ...chan<- T) {

View File

@ -26,22 +26,23 @@ func TestNew(t *testing.T) {
t.Parallel() t.Parallel()
interval := time.Millisecond * 10 interval := time.Millisecond * 10
b := New[string, struct{}](interval) b := New[string, struct{}](Options{Interval: interval})
assert.Equal(t, interval, b.interval)
assert.False(t, b.closed.Load()) assert.False(t, b.closed.Load())
} }
func TestWithClock(t *testing.T) { func TestWithClock(t *testing.T) {
b := New[string, struct{}](time.Millisecond * 10)
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
b.WithClock(fakeClock) b := New[string, struct{}](Options{
Interval: time.Millisecond * 10,
Clock: fakeClock,
})
assert.Equal(t, fakeClock, b.clock) assert.Equal(t, fakeClock, b.clock)
} }
func TestSubscribe(t *testing.T) { func TestSubscribe(t *testing.T) {
t.Parallel() t.Parallel()
b := New[string, struct{}](time.Millisecond * 10) b := New[string, struct{}](Options{Interval: time.Millisecond * 10})
ch := make(chan struct{}) ch := make(chan struct{})
b.Subscribe(context.Background(), ch) b.Subscribe(context.Background(), ch)
assert.Len(t, b.eventChs, 1) assert.Len(t, b.eventChs, 1)
@ -51,8 +52,10 @@ func TestBatch(t *testing.T) {
t.Parallel() t.Parallel()
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
b := New[string, struct{}](time.Millisecond * 10) b := New[string, struct{}](Options{
b.WithClock(fakeClock) Interval: time.Millisecond * 10,
Clock: fakeClock,
})
ch1 := make(chan struct{}) ch1 := make(chan struct{})
ch2 := make(chan struct{}) ch2 := make(chan struct{})
ch3 := make(chan struct{}) ch3 := make(chan struct{})
@ -104,8 +107,10 @@ func TestBatch(t *testing.T) {
t.Run("ensure items are received in order with latest value", func(t *testing.T) { t.Run("ensure items are received in order with latest value", func(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now()) fakeClock := testingclock.NewFakeClock(time.Now())
b := New[int, int](time.Millisecond * 10) b := New[int, int](Options{
b.WithClock(fakeClock) Interval: time.Millisecond * 10,
Clock: fakeClock,
})
t.Cleanup(b.Close) t.Cleanup(b.Close)
ch1 := make(chan int, 10) ch1 := make(chan int, 10)
ch2 := make(chan int, 10) ch2 := make(chan int, 10)
@ -136,7 +141,7 @@ func TestBatch(t *testing.T) {
func TestClose(t *testing.T) { func TestClose(t *testing.T) {
t.Parallel() t.Parallel()
b := New[string, struct{}](time.Millisecond * 10) b := New[string, struct{}](Options{Interval: time.Millisecond * 10})
ch := make(chan struct{}) ch := make(chan struct{})
b.Subscribe(context.Background(), ch) b.Subscribe(context.Background(), ch)
assert.Len(t, b.eventChs, 1) assert.Len(t, b.eventChs, 1)
@ -148,7 +153,7 @@ func TestClose(t *testing.T) {
func TestSubscribeAfterClose(t *testing.T) { func TestSubscribeAfterClose(t *testing.T) {
t.Parallel() t.Parallel()
b := New[string, struct{}](time.Millisecond * 10) b := New[string, struct{}](Options{Interval: time.Millisecond * 10})
b.Close() b.Close()
ch := make(chan struct{}) ch := make(chan struct{})
b.Subscribe(context.Background(), ch) b.Subscribe(context.Background(), ch)

72
events/loop/loop.go Normal file
View File

@ -0,0 +1,72 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package loop
import (
"context"
)
type HandlerFunc[T any] func(context.Context, T) error
type Options[T any] struct {
Handler HandlerFunc[T]
BufferSize *uint64
}
type Loop[T any] struct {
queue chan T
handler HandlerFunc[T]
closeCh chan struct{}
}
func New[T any](opts Options[T]) *Loop[T] {
size := 1
if opts.BufferSize != nil {
size = int(*opts.BufferSize)
}
return &Loop[T]{
queue: make(chan T, size),
closeCh: make(chan struct{}),
handler: opts.Handler,
}
}
func (l *Loop[T]) Run(ctx context.Context) error {
defer close(l.closeCh)
for {
var req T
select {
case req = <-l.queue:
case <-ctx.Done():
}
if err := ctx.Err(); err != nil {
return err
}
if err := l.handler(ctx, req); err != nil {
return err
}
}
}
func (l *Loop[T]) Enqueue(req T) {
select {
case l.queue <- req:
case <-l.closeCh:
}
}

View File

@ -43,7 +43,9 @@ func ExampleProcessor() {
} }
// Create the processor // Create the processor
processor := NewProcessor[string, *queueableItem](executeFn) processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{
ExecuteFn: executeFn,
})
// Add items to the processor, in any order, using Enqueue // Add items to the processor, in any order, using Enqueue
processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)}) processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})

View File

@ -21,6 +21,11 @@ import (
kclock "k8s.io/utils/clock" kclock "k8s.io/utils/clock"
) )
type Options[K comparable, T Queueable[K]] struct {
ExecuteFn func(r T)
Clock kclock.Clock
}
// Processor manages the queue of items and processes them at the correct time. // Processor manages the queue of items and processes them at the correct time.
type Processor[K comparable, T Queueable[K]] struct { type Processor[K comparable, T Queueable[K]] struct {
executeFn func(r T) executeFn func(r T)
@ -36,40 +41,45 @@ type Processor[K comparable, T Queueable[K]] struct {
// NewProcessor returns a new Processor object. // NewProcessor returns a new Processor object.
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine. // executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func NewProcessor[K comparable, T Queueable[K]](executeFn func(r T)) *Processor[K, T] { func NewProcessor[K comparable, T Queueable[K]](opts Options[K, T]) *Processor[K, T] {
cl := opts.Clock
if cl == nil {
cl = kclock.RealClock{}
}
return &Processor[K, T]{ return &Processor[K, T]{
executeFn: executeFn, executeFn: opts.ExecuteFn,
queue: newQueue[K, T](), queue: newQueue[K, T](),
processorRunningCh: make(chan struct{}, 1), processorRunningCh: make(chan struct{}, 1),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
resetCh: make(chan struct{}, 1), resetCh: make(chan struct{}, 1),
clock: kclock.RealClock{}, clock: cl,
} }
} }
// WithClock sets the clock used by the processor. Used for testing. // Enqueue adds a new items to the queue.
func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] {
p.clock = clock
return p
}
// Enqueue adds a new item to the queue.
// If a item with the same ID already exists, it'll be replaced. // If a item with the same ID already exists, it'll be replaced.
func (p *Processor[K, T]) Enqueue(r T) { func (p *Processor[K, T]) Enqueue(rs ...T) {
if p.stopped.Load() { if p.stopped.Load() {
return return
} }
p.lock.Lock()
defer p.lock.Unlock()
for _, r := range rs {
p.enqueue(r)
}
}
func (p *Processor[K, T]) enqueue(r T) {
// Insert or replace the item in the queue // Insert or replace the item in the queue
// If the item added or replaced is the first one in the queue, we need to know that // If the item added or replaced is the first one in the queue, we need to know that
p.lock.Lock()
peek, ok := p.queue.Peek() peek, ok := p.queue.Peek()
isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue
p.queue.Insert(r, true) p.queue.Insert(r, true)
peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item
isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue
p.process(isFirst) p.process(isFirst)
p.lock.Unlock()
} }
// Dequeue removes a item from the queue. // Dequeue removes a item from the queue.

View File

@ -31,10 +31,12 @@ func TestProcessor(t *testing.T) {
// Create the processor // Create the processor
clock := clocktesting.NewFakeClock(time.Now()) clock := clocktesting.NewFakeClock(time.Now())
executeCh := make(chan *queueableItem) executeCh := make(chan *queueableItem)
processor := NewProcessor[string](func(r *queueableItem) { processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{
ExecuteFn: func(r *queueableItem) {
executeCh <- r executeCh <- r
},
Clock: clock,
}) })
processor.clock = clock
assertExecutedItem := func(t *testing.T) *queueableItem { assertExecutedItem := func(t *testing.T) *queueableItem {
t.Helper() t.Helper()
@ -347,10 +349,12 @@ func TestClose(t *testing.T) {
// Create the processor // Create the processor
clock := clocktesting.NewFakeClock(time.Now()) clock := clocktesting.NewFakeClock(time.Now())
executeCh := make(chan *queueableItem) executeCh := make(chan *queueableItem)
processor := NewProcessor[string](func(r *queueableItem) { processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{
ExecuteFn: func(r *queueableItem) {
executeCh <- r executeCh <- r
},
Clock: clock,
}) })
processor.clock = clock
processor.Enqueue(newTestItem(1, clock.Now().Add(time.Second))) processor.Enqueue(newTestItem(1, clock.Now().Add(time.Second)))
processor.Enqueue(newTestItem(2, clock.Now().Add(time.Second*2))) processor.Enqueue(newTestItem(2, clock.Now().Add(time.Second*2)))

View File

@ -71,7 +71,9 @@ func New(opts Options) (*FSWatcher, error) {
w: w, w: w,
// Often the case, writes to files are not atomic and involve multiple file system events. // Often the case, writes to files are not atomic and involve multiple file system events.
// We want to hold off on sending events until we are sure that the file has been written to completion. We do this by waiting for a period of time after the last event has been received for a file name. // We want to hold off on sending events until we are sure that the file has been written to completion. We do this by waiting for a period of time after the last event has been received for a file name.
batcher: batcher.New[string, struct{}](interval), batcher: batcher.New[string, struct{}](batcher.Options{
Interval: interval,
}),
}, nil }, nil
} }

View File

@ -191,8 +191,10 @@ func TestFSWatcher(t *testing.T) {
t.Run("should batch events of the same file for multiple events", func(t *testing.T) { t.Run("should batch events of the same file for multiple events", func(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Time{}) clock := clocktesting.NewFakeClock(time.Time{})
batcher := batcher.New[string, struct{}](time.Millisecond * 500) batcher := batcher.New[string, struct{}](batcher.Options{
batcher.WithClock(clock) Interval: time.Millisecond * 500,
Clock: clock,
})
dir1 := t.TempDir() dir1 := t.TempDir()
dir2 := t.TempDir() dir2 := t.TempDir()
fp1 := filepath.Join(dir1, "test1.txt") fp1 := filepath.Join(dir1, "test1.txt")

View File

@ -27,7 +27,9 @@ import (
) )
func TestWithBatcher(t *testing.T) { func TestWithBatcher(t *testing.T) {
b := batcher.New[string, struct{}](time.Millisecond * 10) b := batcher.New[string, struct{}](batcher.Options{
Interval: time.Millisecond * 10,
})
f, err := New(Options{}) f, err := New(Options{})
require.NoError(t, err) require.NoError(t, err)
f.WithBatcher(b) f.WithBatcher(b)

27
slices/slices.go Normal file
View File

@ -0,0 +1,27 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slices
// Deduplicate removes duplicate elements from a slice.
func Deduplicate[S ~[]E, E comparable](s S) S {
ded := make(map[E]struct{}, len(s))
for _, v := range s {
ded[v] = struct{}{}
}
unique := make(S, 0, len(ded))
for v := range ded {
unique = append(unique, v)
}
return unique
}

55
slices/slices_test.go Normal file
View File

@ -0,0 +1,55 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slices
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_Deduplicate(t *testing.T) {
tests := []struct {
input []int
exp []int
}{
{
input: []int{1, 2, 3},
exp: []int{1, 2, 3},
},
{
input: []int{1, 2, 2, 3, 1},
exp: []int{1, 2, 3},
},
{
input: []int{5, 5, 5, 5},
exp: []int{5},
},
{
input: []int{},
exp: []int{},
},
{
input: []int{42},
exp: []int{42},
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("%v", test.input), func(t *testing.T) {
assert.ElementsMatch(t, test.exp, Deduplicate(test.input))
})
}
}