diff --git a/events/loop/fake/fake.go b/events/loop/fake/fake.go new file mode 100644 index 0000000..5c78763 --- /dev/null +++ b/events/loop/fake/fake.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "context" + + "github.com/dapr/kit/events/loop" +) + +type Fake[T any] struct { + runFn func(context.Context) error + enqueueFn func(T) + closeFn func(T) +} + +func New[T any]() *Fake[T] { + return &Fake[T]{ + runFn: func(context.Context) error { return nil }, + enqueueFn: func(T) {}, + closeFn: func(T) {}, + } +} + +func (f *Fake[T]) WithRun(fn func(context.Context) error) *Fake[T] { + f.runFn = fn + return f +} + +func (f *Fake[T]) WithEnqueue(fn func(T)) *Fake[T] { + f.enqueueFn = fn + return f +} + +func (f *Fake[T]) WithClose(fn func(T)) *Fake[T] { + f.closeFn = fn + return f +} + +func (f *Fake[T]) Run(ctx context.Context) error { + return f.runFn(ctx) +} + +func (f *Fake[T]) Enqueue(t T) { + f.enqueueFn(t) +} + +func (f *Fake[T]) Close(t T) { + f.closeFn(t) +} + +func (f *Fake[T]) Reset(loop.Handler[T], uint64) loop.Interface[T] { + return f +} diff --git a/events/loop/fake/fake_test.go b/events/loop/fake/fake_test.go new file mode 100644 index 0000000..3bb1490 --- /dev/null +++ b/events/loop/fake/fake_test.go @@ -0,0 +1,24 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fake + +import ( + "testing" + + "github.com/dapr/kit/events/loop" +) + +func Test_Fake(*testing.T) { + var _ loop.Interface[int] = New[int]() +} diff --git a/events/loop/loop.go b/events/loop/loop.go index b609579..c7fb073 100644 --- a/events/loop/loop.go +++ b/events/loop/loop.go @@ -15,58 +15,91 @@ package loop import ( "context" + "sync" ) -type HandlerFunc[T any] func(context.Context, T) error - -type Options[T any] struct { - Handler HandlerFunc[T] - BufferSize *uint64 +type Handler[T any] interface { + Handle(ctx context.Context, t T) error } -type Loop[T any] struct { +type Interface[T any] interface { + Run(ctx context.Context) error + Enqueue(t T) + Close(t T) + Reset(h Handler[T], size uint64) Interface[T] +} + +type loop[T any] struct { queue chan T - handler HandlerFunc[T] + handler Handler[T] + closed bool closeCh chan struct{} + lock sync.RWMutex } -func New[T any](opts Options[T]) *Loop[T] { - size := 1 - if opts.BufferSize != nil { - size = int(*opts.BufferSize) - } - - return &Loop[T]{ +func New[T any](h Handler[T], size uint64) Interface[T] { + return &loop[T]{ queue: make(chan T, size), + handler: h, closeCh: make(chan struct{}), - handler: opts.Handler, } } -func (l *Loop[T]) Run(ctx context.Context) error { +func Empty[T any]() Interface[T] { + return new(loop[T]) +} + +func (l *loop[T]) Run(ctx context.Context) error { defer close(l.closeCh) for { - var req T - select { - case req = <-l.queue: - case <-ctx.Done(): + req, ok := <-l.queue + if !ok { + return nil } - if err := ctx.Err(); err != nil { - return err - } - - if err := l.handler(ctx, req); err != nil { + if err := l.handler.Handle(ctx, req); err != nil { return err } } } -func (l *Loop[T]) Enqueue(req T) { +func (l *loop[T]) Enqueue(req T) { + l.lock.RLock() + defer l.lock.RUnlock() + + if l.closed { + return + } + select { case l.queue <- req: case <-l.closeCh: } } + +func (l *loop[T]) Close(req T) { + l.lock.Lock() + l.closed = true + l.queue <- req + close(l.queue) + l.lock.Unlock() + <-l.closeCh +} + +func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] { + if l == nil { + return New[T](h, size) + } + + l.closed = false + l.closeCh = make(chan struct{}) + l.handler = h + + // TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and + // improve performance. + l.queue = make(chan T, size) + + return l +}