mirror of https://github.com/dapr/kit.git
Adds generic ring
Adds a generic implementation of the stdblib ring buffer so that each ring `Value` can be a concrete type. https://pkg.go.dev/container/ring Adds `Len() int` and `Keys() []K` func to the generic Map cmap. Changes `events/queue` Processor `Queueable` to be an exported type. No functional change, but consumed types should be exported. Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
parent
d37dc603d0
commit
2423331f9c
|
@ -26,6 +26,8 @@ type Map[K comparable, T any] interface {
|
|||
LoadAndDelete(key K) (T, bool)
|
||||
Range(fn func(key K, value T) bool)
|
||||
Store(key K, value T)
|
||||
Len() int
|
||||
Keys() []K
|
||||
}
|
||||
|
||||
type mapimpl[K comparable, T any] struct {
|
||||
|
@ -79,3 +81,19 @@ func (m *mapimpl[K, T]) Store(k K, v T) {
|
|||
defer m.lock.Unlock()
|
||||
m.m[k] = v
|
||||
}
|
||||
|
||||
func (m *mapimpl[K, T]) Len() int {
|
||||
m.lock.RLock()
|
||||
defer m.lock.RUnlock()
|
||||
return len(m.m)
|
||||
}
|
||||
|
||||
func (m *mapimpl[K, T]) Keys() []K {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
keys := make([]K, 0, len(m.m))
|
||||
for k := range m.m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
)
|
||||
|
||||
// Processor manages the queue of items and processes them at the correct time.
|
||||
type Processor[K comparable, T queueable[K]] struct {
|
||||
type Processor[K comparable, T Queueable[K]] struct {
|
||||
executeFn func(r T)
|
||||
queue queue[K, T]
|
||||
clock kclock.Clock
|
||||
|
@ -36,7 +36,7 @@ type Processor[K comparable, T queueable[K]] struct {
|
|||
|
||||
// NewProcessor returns a new Processor object.
|
||||
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
|
||||
func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T] {
|
||||
func NewProcessor[K comparable, T Queueable[K]](executeFn func(r T)) *Processor[K, T] {
|
||||
return &Processor[K, T]{
|
||||
executeFn: executeFn,
|
||||
queue: newQueue[K, T](),
|
||||
|
|
|
@ -18,8 +18,8 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// queueable is the interface for items that can be added to the queue.
|
||||
type queueable[T comparable] interface {
|
||||
// Queueable is the interface for items that can be added to the queue.
|
||||
type Queueable[T comparable] interface {
|
||||
comparable
|
||||
Key() T
|
||||
ScheduledTime() time.Time
|
||||
|
@ -29,13 +29,13 @@ type queueable[T comparable] interface {
|
|||
// It acts as a "priority queue", in which items are added in order of when they're scheduled.
|
||||
// Internally, it uses a heap (from container/heap) that allows Insert and Pop operations to be completed in O(log N) time (where N is the queue's length).
|
||||
// Note: methods in this struct are not safe for concurrent use. Callers should use locks to ensure consistency.
|
||||
type queue[K comparable, T queueable[K]] struct {
|
||||
type queue[K comparable, T Queueable[K]] struct {
|
||||
heap *queueHeap[K, T]
|
||||
items map[K]*queueItem[K, T]
|
||||
}
|
||||
|
||||
// newQueue creates a new queue.
|
||||
func newQueue[K comparable, T queueable[K]]() queue[K, T] {
|
||||
func newQueue[K comparable, T Queueable[K]]() queue[K, T] {
|
||||
return queue[K, T]{
|
||||
heap: new(queueHeap[K, T]),
|
||||
items: make(map[K]*queueItem[K, T]),
|
||||
|
@ -122,14 +122,14 @@ func (p *queue[K, T]) Update(r T) {
|
|||
heap.Fix(p.heap, item.index)
|
||||
}
|
||||
|
||||
type queueItem[K comparable, T queueable[K]] struct {
|
||||
type queueItem[K comparable, T Queueable[K]] struct {
|
||||
value T
|
||||
|
||||
// The index of the item in the heap. This is maintained by the heap.Interface methods.
|
||||
index int
|
||||
}
|
||||
|
||||
type queueHeap[K comparable, T queueable[K]] []*queueItem[K, T]
|
||||
type queueHeap[K comparable, T Queueable[K]] []*queueItem[K, T]
|
||||
|
||||
func (pq queueHeap[K, T]) Len() int {
|
||||
return len(pq)
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
// Copyright 2009 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package ring implements operations on circular lists.
|
||||
// Edited to be generic.
|
||||
package ring
|
||||
|
||||
// A Ring is an element of a circular list, or ring.
|
||||
// Rings do not have a beginning or end; a pointer to any ring element
|
||||
// serves as reference to the entire ring. Empty rings are represented
|
||||
// as nil Ring pointers. The zero value for a Ring is a one-element
|
||||
// ring with a nil Value.
|
||||
type Ring[T any] struct {
|
||||
next, prev *Ring[T]
|
||||
Value T // for use by client; untouched by this library
|
||||
}
|
||||
|
||||
func (r *Ring[T]) init() *Ring[T] {
|
||||
r.next = r
|
||||
r.prev = r
|
||||
return r
|
||||
}
|
||||
|
||||
// Next returns the next ring element. r must not be empty.
|
||||
func (r *Ring[T]) Next() *Ring[T] {
|
||||
if r.next == nil {
|
||||
return r.init()
|
||||
}
|
||||
return r.next
|
||||
}
|
||||
|
||||
// Prev returns the previous ring element. r must not be empty.
|
||||
func (r *Ring[T]) Prev() *Ring[T] {
|
||||
if r.next == nil {
|
||||
return r.init()
|
||||
}
|
||||
return r.prev
|
||||
}
|
||||
|
||||
// Move moves n % r.Len() elements backward (n < 0) or forward (n >= 0)
|
||||
// in the ring and returns that ring element. r must not be empty.
|
||||
func (r *Ring[T]) Move(n int) *Ring[T] {
|
||||
if r.next == nil {
|
||||
return r.init()
|
||||
}
|
||||
switch {
|
||||
case n < 0:
|
||||
for ; n < 0; n++ {
|
||||
r = r.prev
|
||||
}
|
||||
case n > 0:
|
||||
for ; n > 0; n-- {
|
||||
r = r.next
|
||||
}
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// New creates a ring of n elements.
|
||||
func New[T any](n int) *Ring[T] {
|
||||
if n <= 0 {
|
||||
return nil
|
||||
}
|
||||
r := new(Ring[T])
|
||||
p := r
|
||||
for i := 1; i < n; i++ {
|
||||
p.next = &Ring[T]{prev: p}
|
||||
p = p.next
|
||||
}
|
||||
p.next = r
|
||||
r.prev = p
|
||||
return r
|
||||
}
|
||||
|
||||
// Link connects ring r with ring s such that r.Next()
|
||||
// becomes s and returns the original value for r.Next().
|
||||
// r must not be empty.
|
||||
//
|
||||
// If r and s point to the same ring, linking
|
||||
// them removes the elements between r and s from the ring.
|
||||
// The removed elements form a subring and the result is a
|
||||
// reference to that subring (if no elements were removed,
|
||||
// the result is still the original value for r.Next(),
|
||||
// and not nil).
|
||||
//
|
||||
// If r and s point to different rings, linking
|
||||
// them creates a single ring with the elements of s inserted
|
||||
// after r. The result points to the element following the
|
||||
// last element of s after insertion.
|
||||
func (r *Ring[T]) Link(s *Ring[T]) *Ring[T] {
|
||||
n := r.Next()
|
||||
if s != nil {
|
||||
p := s.Prev()
|
||||
// Note: Cannot use multiple assignment because
|
||||
// evaluation order of LHS is not specified.
|
||||
r.next = s
|
||||
s.prev = r
|
||||
n.prev = p
|
||||
p.next = n
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Unlink removes n % r.Len() elements from the ring r, starting
|
||||
// at r.Next(). If n % r.Len() == 0, r remains unchanged.
|
||||
// The result is the removed subring. r must not be empty.
|
||||
func (r *Ring[T]) Unlink(n int) *Ring[T] {
|
||||
if n <= 0 {
|
||||
return nil
|
||||
}
|
||||
return r.Link(r.Move(n + 1))
|
||||
}
|
||||
|
||||
// Len computes the number of elements in ring r.
|
||||
// It executes in time proportional to the number of elements.
|
||||
func (r *Ring[T]) Len() int {
|
||||
n := 0
|
||||
if r != nil {
|
||||
n = 1
|
||||
for p := r.Next(); p != r; p = p.next {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Do calls function f on each element of the ring, in forward order.
|
||||
// The behavior of Do is undefined if f changes *r.
|
||||
func (r *Ring[T]) Do(f func(any)) {
|
||||
if r != nil {
|
||||
f(r.Value)
|
||||
for p := r.Next(); p != r; p = p.next {
|
||||
f(p.Value)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue