From 77af8ac182795c2cd168b9998e517bc2b8d737eb Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Wed, 23 Apr 2025 11:28:21 -0300 Subject: [PATCH] events/loop & slices (#119) * events/loop & slices Adds a generic control loop implementation to `event/loop`. Adds a new `slices` package that provides a generic slice de-duplication func. Makes events batcher and queue processer taker in Options. Allows enqueuing multiple processor items in same func call. Signed-off-by: joshvanl * Lint Signed-off-by: joshvanl * lint Signed-off-by: joshvanl * lint Signed-off-by: joshvanl * Elements match Signed-off-by: joshvanl * Adds buffer size option to events loop Signed-off-by: joshvanl * nit Signed-off-by: joshvanl --------- Signed-off-by: joshvanl --- concurrency/slice/slice.go | 7 ++++ events/batcher/batcher.go | 27 ++++++++----- events/batcher/batcher_test.go | 27 ++++++++----- events/loop/loop.go | 72 +++++++++++++++++++++++++++++++++ events/queue/eventqueue_test.go | 4 +- events/queue/processor.go | 36 +++++++++++------ events/queue/processor_test.go | 16 +++++--- fswatcher/fswatcher.go | 4 +- fswatcher/fswatcher_test.go | 6 ++- fswatcher/unit_test.go | 4 +- slices/slices.go | 27 +++++++++++++ slices/slices_test.go | 55 +++++++++++++++++++++++++ 12 files changed, 240 insertions(+), 45 deletions(-) create mode 100644 events/loop/loop.go create mode 100644 slices/slices.go create mode 100644 slices/slices_test.go diff --git a/concurrency/slice/slice.go b/concurrency/slice/slice.go index 5917b86..d8a6542 100644 --- a/concurrency/slice/slice.go +++ b/concurrency/slice/slice.go @@ -20,6 +20,7 @@ type Slice[T any] interface { Append(items ...T) int Len() int Slice() []T + Store(items ...T) } type slice[T any] struct { @@ -49,3 +50,9 @@ func (s *slice[T]) Slice() []T { defer s.lock.RUnlock() return s.data } + +func (s *slice[T]) Store(items ...T) { + s.lock.Lock() + defer s.lock.Unlock() + s.data = items +} diff --git a/events/batcher/batcher.go b/events/batcher/batcher.go index 79348e5..e5bca87 100644 --- a/events/batcher/batcher.go +++ b/events/batcher/batcher.go @@ -29,6 +29,11 @@ type eventCh[T any] struct { ch chan<- T } +type Options struct { + Interval time.Duration + Clock clock.Clock +} + // Batcher is a one to many event batcher. It batches events and sends them to // the added event channel subscribers. Events are sent to the channels after // the interval has elapsed. If events with the same key are received within @@ -47,24 +52,26 @@ type Batcher[K comparable, T any] struct { } // New creates a new Batcher with the given interval and key type. -func New[K comparable, T any](interval time.Duration) *Batcher[K, T] { +func New[K comparable, T any](opts Options) *Batcher[K, T] { + cl := opts.Clock + if cl == nil { + cl = clock.RealClock{} + } + b := &Batcher[K, T]{ - interval: interval, - clock: clock.RealClock{}, + interval: opts.Interval, + clock: cl, closeCh: make(chan struct{}), } - b.queue = queue.NewProcessor[K, *item[K, T]](b.execute) + b.queue = queue.NewProcessor[K, *item[K, T]](queue.Options[K, *item[K, T]]{ + ExecuteFn: b.execute, + Clock: opts.Clock, + }) return b } -// WithClock sets the clock used by the batcher. Used for testing. -func (b *Batcher[K, T]) WithClock(clock clock.Clock) { - b.queue.WithClock(clock) - b.clock = clock -} - // Subscribe adds a new event channel subscriber. If the batcher is closed, the // subscriber is silently dropped. func (b *Batcher[K, T]) Subscribe(ctx context.Context, ch ...chan<- T) { diff --git a/events/batcher/batcher_test.go b/events/batcher/batcher_test.go index 4420207..8329662 100644 --- a/events/batcher/batcher_test.go +++ b/events/batcher/batcher_test.go @@ -26,22 +26,23 @@ func TestNew(t *testing.T) { t.Parallel() interval := time.Millisecond * 10 - b := New[string, struct{}](interval) - assert.Equal(t, interval, b.interval) + b := New[string, struct{}](Options{Interval: interval}) assert.False(t, b.closed.Load()) } func TestWithClock(t *testing.T) { - b := New[string, struct{}](time.Millisecond * 10) fakeClock := testingclock.NewFakeClock(time.Now()) - b.WithClock(fakeClock) + b := New[string, struct{}](Options{ + Interval: time.Millisecond * 10, + Clock: fakeClock, + }) assert.Equal(t, fakeClock, b.clock) } func TestSubscribe(t *testing.T) { t.Parallel() - b := New[string, struct{}](time.Millisecond * 10) + b := New[string, struct{}](Options{Interval: time.Millisecond * 10}) ch := make(chan struct{}) b.Subscribe(context.Background(), ch) assert.Len(t, b.eventChs, 1) @@ -51,8 +52,10 @@ func TestBatch(t *testing.T) { t.Parallel() fakeClock := testingclock.NewFakeClock(time.Now()) - b := New[string, struct{}](time.Millisecond * 10) - b.WithClock(fakeClock) + b := New[string, struct{}](Options{ + Interval: time.Millisecond * 10, + Clock: fakeClock, + }) ch1 := make(chan struct{}) ch2 := make(chan struct{}) ch3 := make(chan struct{}) @@ -104,8 +107,10 @@ func TestBatch(t *testing.T) { t.Run("ensure items are received in order with latest value", func(t *testing.T) { fakeClock := testingclock.NewFakeClock(time.Now()) - b := New[int, int](time.Millisecond * 10) - b.WithClock(fakeClock) + b := New[int, int](Options{ + Interval: time.Millisecond * 10, + Clock: fakeClock, + }) t.Cleanup(b.Close) ch1 := make(chan int, 10) ch2 := make(chan int, 10) @@ -136,7 +141,7 @@ func TestBatch(t *testing.T) { func TestClose(t *testing.T) { t.Parallel() - b := New[string, struct{}](time.Millisecond * 10) + b := New[string, struct{}](Options{Interval: time.Millisecond * 10}) ch := make(chan struct{}) b.Subscribe(context.Background(), ch) assert.Len(t, b.eventChs, 1) @@ -148,7 +153,7 @@ func TestClose(t *testing.T) { func TestSubscribeAfterClose(t *testing.T) { t.Parallel() - b := New[string, struct{}](time.Millisecond * 10) + b := New[string, struct{}](Options{Interval: time.Millisecond * 10}) b.Close() ch := make(chan struct{}) b.Subscribe(context.Background(), ch) diff --git a/events/loop/loop.go b/events/loop/loop.go new file mode 100644 index 0000000..b609579 --- /dev/null +++ b/events/loop/loop.go @@ -0,0 +1,72 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loop + +import ( + "context" +) + +type HandlerFunc[T any] func(context.Context, T) error + +type Options[T any] struct { + Handler HandlerFunc[T] + BufferSize *uint64 +} + +type Loop[T any] struct { + queue chan T + handler HandlerFunc[T] + + closeCh chan struct{} +} + +func New[T any](opts Options[T]) *Loop[T] { + size := 1 + if opts.BufferSize != nil { + size = int(*opts.BufferSize) + } + + return &Loop[T]{ + queue: make(chan T, size), + closeCh: make(chan struct{}), + handler: opts.Handler, + } +} + +func (l *Loop[T]) Run(ctx context.Context) error { + defer close(l.closeCh) + + for { + var req T + select { + case req = <-l.queue: + case <-ctx.Done(): + } + + if err := ctx.Err(); err != nil { + return err + } + + if err := l.handler(ctx, req); err != nil { + return err + } + } +} + +func (l *Loop[T]) Enqueue(req T) { + select { + case l.queue <- req: + case <-l.closeCh: + } +} diff --git a/events/queue/eventqueue_test.go b/events/queue/eventqueue_test.go index a072263..dc32b35 100644 --- a/events/queue/eventqueue_test.go +++ b/events/queue/eventqueue_test.go @@ -43,7 +43,9 @@ func ExampleProcessor() { } // Create the processor - processor := NewProcessor[string, *queueableItem](executeFn) + processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{ + ExecuteFn: executeFn, + }) // Add items to the processor, in any order, using Enqueue processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)}) diff --git a/events/queue/processor.go b/events/queue/processor.go index 81fcf6c..b562b17 100644 --- a/events/queue/processor.go +++ b/events/queue/processor.go @@ -21,6 +21,11 @@ import ( kclock "k8s.io/utils/clock" ) +type Options[K comparable, T Queueable[K]] struct { + ExecuteFn func(r T) + Clock kclock.Clock +} + // Processor manages the queue of items and processes them at the correct time. type Processor[K comparable, T Queueable[K]] struct { executeFn func(r T) @@ -36,40 +41,45 @@ type Processor[K comparable, T Queueable[K]] struct { // NewProcessor returns a new Processor object. // executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine. -func NewProcessor[K comparable, T Queueable[K]](executeFn func(r T)) *Processor[K, T] { +func NewProcessor[K comparable, T Queueable[K]](opts Options[K, T]) *Processor[K, T] { + cl := opts.Clock + if cl == nil { + cl = kclock.RealClock{} + } return &Processor[K, T]{ - executeFn: executeFn, + executeFn: opts.ExecuteFn, queue: newQueue[K, T](), processorRunningCh: make(chan struct{}, 1), stopCh: make(chan struct{}), resetCh: make(chan struct{}, 1), - clock: kclock.RealClock{}, + clock: cl, } } -// WithClock sets the clock used by the processor. Used for testing. -func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] { - p.clock = clock - return p -} - -// Enqueue adds a new item to the queue. +// Enqueue adds a new items to the queue. // If a item with the same ID already exists, it'll be replaced. -func (p *Processor[K, T]) Enqueue(r T) { +func (p *Processor[K, T]) Enqueue(rs ...T) { if p.stopped.Load() { return } + p.lock.Lock() + defer p.lock.Unlock() + + for _, r := range rs { + p.enqueue(r) + } +} + +func (p *Processor[K, T]) enqueue(r T) { // Insert or replace the item in the queue // If the item added or replaced is the first one in the queue, we need to know that - p.lock.Lock() peek, ok := p.queue.Peek() isFirst := (ok && peek.Key() == r.Key()) // This is going to be true if the item being replaced is the first one in the queue p.queue.Insert(r, true) peek, _ = p.queue.Peek() // No need to check for "ok" here because we know this will return an item isFirst = isFirst || (peek == r) // This is also going to be true if the item just added landed at the front of the queue p.process(isFirst) - p.lock.Unlock() } // Dequeue removes a item from the queue. diff --git a/events/queue/processor_test.go b/events/queue/processor_test.go index 73e03c2..bf0efd5 100644 --- a/events/queue/processor_test.go +++ b/events/queue/processor_test.go @@ -31,10 +31,12 @@ func TestProcessor(t *testing.T) { // Create the processor clock := clocktesting.NewFakeClock(time.Now()) executeCh := make(chan *queueableItem) - processor := NewProcessor[string](func(r *queueableItem) { - executeCh <- r + processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{ + ExecuteFn: func(r *queueableItem) { + executeCh <- r + }, + Clock: clock, }) - processor.clock = clock assertExecutedItem := func(t *testing.T) *queueableItem { t.Helper() @@ -347,10 +349,12 @@ func TestClose(t *testing.T) { // Create the processor clock := clocktesting.NewFakeClock(time.Now()) executeCh := make(chan *queueableItem) - processor := NewProcessor[string](func(r *queueableItem) { - executeCh <- r + processor := NewProcessor[string, *queueableItem](Options[string, *queueableItem]{ + ExecuteFn: func(r *queueableItem) { + executeCh <- r + }, + Clock: clock, }) - processor.clock = clock processor.Enqueue(newTestItem(1, clock.Now().Add(time.Second))) processor.Enqueue(newTestItem(2, clock.Now().Add(time.Second*2))) diff --git a/fswatcher/fswatcher.go b/fswatcher/fswatcher.go index 330444e..d192fff 100644 --- a/fswatcher/fswatcher.go +++ b/fswatcher/fswatcher.go @@ -71,7 +71,9 @@ func New(opts Options) (*FSWatcher, error) { w: w, // Often the case, writes to files are not atomic and involve multiple file system events. // We want to hold off on sending events until we are sure that the file has been written to completion. We do this by waiting for a period of time after the last event has been received for a file name. - batcher: batcher.New[string, struct{}](interval), + batcher: batcher.New[string, struct{}](batcher.Options{ + Interval: interval, + }), }, nil } diff --git a/fswatcher/fswatcher_test.go b/fswatcher/fswatcher_test.go index fb14eaf..da27a84 100644 --- a/fswatcher/fswatcher_test.go +++ b/fswatcher/fswatcher_test.go @@ -191,8 +191,10 @@ func TestFSWatcher(t *testing.T) { t.Run("should batch events of the same file for multiple events", func(t *testing.T) { clock := clocktesting.NewFakeClock(time.Time{}) - batcher := batcher.New[string, struct{}](time.Millisecond * 500) - batcher.WithClock(clock) + batcher := batcher.New[string, struct{}](batcher.Options{ + Interval: time.Millisecond * 500, + Clock: clock, + }) dir1 := t.TempDir() dir2 := t.TempDir() fp1 := filepath.Join(dir1, "test1.txt") diff --git a/fswatcher/unit_test.go b/fswatcher/unit_test.go index dc059ad..f4e49bc 100644 --- a/fswatcher/unit_test.go +++ b/fswatcher/unit_test.go @@ -27,7 +27,9 @@ import ( ) func TestWithBatcher(t *testing.T) { - b := batcher.New[string, struct{}](time.Millisecond * 10) + b := batcher.New[string, struct{}](batcher.Options{ + Interval: time.Millisecond * 10, + }) f, err := New(Options{}) require.NoError(t, err) f.WithBatcher(b) diff --git a/slices/slices.go b/slices/slices.go new file mode 100644 index 0000000..436c608 --- /dev/null +++ b/slices/slices.go @@ -0,0 +1,27 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slices + +// Deduplicate removes duplicate elements from a slice. +func Deduplicate[S ~[]E, E comparable](s S) S { + ded := make(map[E]struct{}, len(s)) + for _, v := range s { + ded[v] = struct{}{} + } + unique := make(S, 0, len(ded)) + for v := range ded { + unique = append(unique, v) + } + return unique +} diff --git a/slices/slices_test.go b/slices/slices_test.go new file mode 100644 index 0000000..da6ca62 --- /dev/null +++ b/slices/slices_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package slices + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_Deduplicate(t *testing.T) { + tests := []struct { + input []int + exp []int + }{ + { + input: []int{1, 2, 3}, + exp: []int{1, 2, 3}, + }, + { + input: []int{1, 2, 2, 3, 1}, + exp: []int{1, 2, 3}, + }, + { + input: []int{5, 5, 5, 5}, + exp: []int{5}, + }, + { + input: []int{}, + exp: []int{}, + }, + { + input: []int{42}, + exp: []int{42}, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%v", test.input), func(t *testing.T) { + assert.ElementsMatch(t, test.exp, Deduplicate(test.input)) + }) + } +}