events/loop: add reset (#120)

* events/loop: add reset

Update loop implementation is include functionality for Reset which is
useful when caching the loop struct for future use to reduce
allocations.

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
Josh van Leeuwen 2025-05-15 23:23:38 +01:00 committed by GitHub
parent e3d4a8f1b4
commit 98fe567235
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 148 additions and 26 deletions

65
events/loop/fake/fake.go Normal file
View File

@ -0,0 +1,65 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"context"
"github.com/dapr/kit/events/loop"
)
type Fake[T any] struct {
runFn func(context.Context) error
enqueueFn func(T)
closeFn func(T)
}
func New[T any]() *Fake[T] {
return &Fake[T]{
runFn: func(context.Context) error { return nil },
enqueueFn: func(T) {},
closeFn: func(T) {},
}
}
func (f *Fake[T]) WithRun(fn func(context.Context) error) *Fake[T] {
f.runFn = fn
return f
}
func (f *Fake[T]) WithEnqueue(fn func(T)) *Fake[T] {
f.enqueueFn = fn
return f
}
func (f *Fake[T]) WithClose(fn func(T)) *Fake[T] {
f.closeFn = fn
return f
}
func (f *Fake[T]) Run(ctx context.Context) error {
return f.runFn(ctx)
}
func (f *Fake[T]) Enqueue(t T) {
f.enqueueFn(t)
}
func (f *Fake[T]) Close(t T) {
f.closeFn(t)
}
func (f *Fake[T]) Reset(loop.Handler[T], uint64) loop.Interface[T] {
return f
}

View File

@ -0,0 +1,24 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"testing"
"github.com/dapr/kit/events/loop"
)
func Test_Fake(*testing.T) {
var _ loop.Interface[int] = New[int]()
}

View File

@ -15,58 +15,91 @@ package loop
import ( import (
"context" "context"
"sync"
) )
type HandlerFunc[T any] func(context.Context, T) error type Handler[T any] interface {
Handle(ctx context.Context, t T) error
type Options[T any] struct {
Handler HandlerFunc[T]
BufferSize *uint64
} }
type Loop[T any] struct { type Interface[T any] interface {
Run(ctx context.Context) error
Enqueue(t T)
Close(t T)
Reset(h Handler[T], size uint64) Interface[T]
}
type loop[T any] struct {
queue chan T queue chan T
handler HandlerFunc[T] handler Handler[T]
closed bool
closeCh chan struct{} closeCh chan struct{}
lock sync.RWMutex
} }
func New[T any](opts Options[T]) *Loop[T] { func New[T any](h Handler[T], size uint64) Interface[T] {
size := 1 return &loop[T]{
if opts.BufferSize != nil {
size = int(*opts.BufferSize)
}
return &Loop[T]{
queue: make(chan T, size), queue: make(chan T, size),
handler: h,
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
handler: opts.Handler,
} }
} }
func (l *Loop[T]) Run(ctx context.Context) error { func Empty[T any]() Interface[T] {
return new(loop[T])
}
func (l *loop[T]) Run(ctx context.Context) error {
defer close(l.closeCh) defer close(l.closeCh)
for { for {
var req T req, ok := <-l.queue
select { if !ok {
case req = <-l.queue: return nil
case <-ctx.Done():
} }
if err := ctx.Err(); err != nil { if err := l.handler.Handle(ctx, req); err != nil {
return err
}
if err := l.handler(ctx, req); err != nil {
return err return err
} }
} }
} }
func (l *Loop[T]) Enqueue(req T) { func (l *loop[T]) Enqueue(req T) {
l.lock.RLock()
defer l.lock.RUnlock()
if l.closed {
return
}
select { select {
case l.queue <- req: case l.queue <- req:
case <-l.closeCh: case <-l.closeCh:
} }
} }
func (l *loop[T]) Close(req T) {
l.lock.Lock()
l.closed = true
l.queue <- req
close(l.queue)
l.lock.Unlock()
<-l.closeCh
}
func (l *loop[T]) Reset(h Handler[T], size uint64) Interface[T] {
if l == nil {
return New[T](h, size)
}
l.closed = false
l.closeCh = make(chan struct{})
l.handler = h
// TODO: @joshvanl: use a ring buffer so that we don't need to reallocate and
// improve performance.
l.queue = make(chan T, size)
return l
}