Mutexmap (#95)

* Adds Mutex Map

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds an atomic map

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* More work on atomic map and mutex map

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Fixes, improvements and more tests

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates interface

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Linter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Refactors atomic map to use generics

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* cleanups

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Apply suggestions from code review

Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
Signed-off-by: Elena Kolevska <elena-kolevska@users.noreply.github.com>

* small reorg

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds ItemCount()

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Removes atomicmap in favour of haxmap

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* formats fix and adds comment

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Update concurrency/mutexmap.go

Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Signed-off-by: Elena Kolevska <elena-kolevska@users.noreply.github.com>

* Uses built in `clear`

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Revert "Removes atomicmap in favour of haxmap"

This reverts commit 20ca9ad197.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Uses clear() for atomic map too

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
Signed-off-by: Elena Kolevska <elena-kolevska@users.noreply.github.com>
Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
This commit is contained in:
Elena Kolevska 2024-05-23 23:57:05 +01:00 committed by GitHub
parent ccffb60016
commit 106329e583
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 420 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
**/.DS_Store
.idea
.vscode
.vs

111
concurrency/atomicmap.go Normal file
View File

@ -0,0 +1,111 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package concurrency
import (
"sync"
"golang.org/x/exp/constraints"
)
type AtomicValue[T constraints.Integer] struct {
lock sync.RWMutex
value T
}
func (a *AtomicValue[T]) Load() T {
a.lock.RLock()
defer a.lock.RUnlock()
return a.value
}
func (a *AtomicValue[T]) Store(v T) {
a.lock.Lock()
defer a.lock.Unlock()
a.value = v
}
func (a *AtomicValue[T]) Add(v T) T {
a.lock.Lock()
defer a.lock.Unlock()
a.value += v
return a.value
}
type AtomicMap[K comparable, T constraints.Integer] interface {
Get(key K) (*AtomicValue[T], bool)
GetOrCreate(key K, createT T) *AtomicValue[T]
Delete(key K)
ForEach(fn func(key K, value *AtomicValue[T]))
Clear()
}
type atomicMap[K comparable, T constraints.Integer] struct {
lock sync.RWMutex
items map[K]*AtomicValue[T]
}
func NewAtomicMap[K comparable, T constraints.Integer]() AtomicMap[K, T] {
return &atomicMap[K, T]{
items: make(map[K]*AtomicValue[T]),
}
}
func (a *atomicMap[K, T]) Get(key K) (*AtomicValue[T], bool) {
a.lock.RLock()
defer a.lock.RUnlock()
item, ok := a.items[key]
if !ok {
return nil, false
}
return item, true
}
func (a *atomicMap[K, T]) GetOrCreate(key K, createT T) *AtomicValue[T] {
a.lock.RLock()
item, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
// Double-check the key exists to avoid race condition
item, ok = a.items[key]
if !ok {
item = &AtomicValue[T]{value: createT}
a.items[key] = item
}
a.lock.Unlock()
}
return item
}
func (a *atomicMap[K, T]) Delete(key K) {
a.lock.Lock()
delete(a.items, key)
a.lock.Unlock()
}
func (a *atomicMap[K, T]) ForEach(fn func(key K, value *AtomicValue[T])) {
a.lock.RLock()
defer a.lock.RUnlock()
for k, v := range a.items {
fn(k, v)
}
}
func (a *atomicMap[K, T]) Clear() {
a.lock.Lock()
defer a.lock.Unlock()
clear(a.items)
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package concurrency
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAtomicMapInt32_New_Get_Delete(t *testing.T) {
m := NewAtomicMap[string, int32]().(*atomicMap[string, int32])
require.NotNil(t, m)
require.NotNil(t, m.items)
require.Empty(t, m.items)
t.Run("basic operations", func(t *testing.T) {
key := "key1"
value := int32(10)
// Initially, the key should not exist
_, ok := m.Get(key)
require.False(t, ok)
// Add a value and check it
m.GetOrCreate(key, 0).Store(value)
result, ok := m.Get(key)
require.True(t, ok)
assert.Equal(t, value, result.Load())
// Delete the key and check it no longer exists
m.Delete(key)
_, ok = m.Get(key)
require.False(t, ok)
})
t.Run("concurrent access multiple keys", func(t *testing.T) {
var wg sync.WaitGroup
keys := []string{"key1", "key2", "key3"}
iterations := 100
wg.Add(len(keys) * 2)
for _, key := range keys {
go func(k string) {
defer wg.Done()
for i := 0; i < iterations; i++ {
m.GetOrCreate(k, 0).Add(1)
}
}(key)
go func(k string) {
defer wg.Done()
for i := 0; i < iterations; i++ {
m.GetOrCreate(k, 0).Add(-1)
}
}(key)
}
wg.Wait()
for _, key := range keys {
val, ok := m.Get(key)
require.True(t, ok)
require.Equal(t, int32(0), val.Load())
}
})
}

119
concurrency/mutexmap.go Normal file
View File

@ -0,0 +1,119 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package concurrency
import (
"sync"
)
// MutexMap is an interface that defines a thread-safe map with keys of type T associated to
// read-write mutexes (sync.RWMutex), allowing for granular locking on a per-key basis.
// This can be useful for scenarios where fine-grained concurrency control is needed.
//
// Methods:
// - Lock(key T): Acquires an exclusive lock on the mutex associated with the given key.
// - Unlock(key T): Releases the exclusive lock on the mutex associated with the given key.
// - RLock(key T): Acquires a read lock on the mutex associated with the given key.
// - RUnlock(key T): Releases the read lock on the mutex associated with the given key.
// - Delete(key T): Removes the mutex associated with the given key from the map.
// - Clear(): Removes all mutexes from the map.
// - ItemCount() int: Returns the number of items (mutexes) in the map.
type MutexMap[T comparable] interface {
Lock(key T)
Unlock(key T)
RLock(key T)
RUnlock(key T)
Delete(key T)
Clear()
ItemCount() int
}
type mutexMap[T comparable] struct {
lock sync.RWMutex
items map[T]*sync.RWMutex
}
func NewMutexMap[T comparable]() MutexMap[T] {
return &mutexMap[T]{
items: make(map[T]*sync.RWMutex),
}
}
func (a *mutexMap[T]) Lock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
mutex, ok = a.items[key]
if !ok {
mutex = &sync.RWMutex{}
a.items[key] = mutex
}
a.lock.Unlock()
}
mutex.Lock()
}
func (a *mutexMap[T]) Unlock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if ok {
mutex.Unlock()
}
}
func (a *mutexMap[T]) RLock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if !ok {
a.lock.Lock()
mutex, ok = a.items[key]
if !ok {
mutex = &sync.RWMutex{}
a.items[key] = mutex
}
a.lock.Unlock()
}
mutex.Lock()
}
func (a *mutexMap[T]) RUnlock(key T) {
a.lock.RLock()
mutex, ok := a.items[key]
a.lock.RUnlock()
if ok {
mutex.Unlock()
}
}
func (a *mutexMap[T]) Delete(key T) {
a.lock.Lock()
delete(a.items, key)
a.lock.Unlock()
}
func (a *mutexMap[T]) Clear() {
a.lock.Lock()
clear(a.items)
a.lock.Unlock()
}
func (a *mutexMap[T]) ItemCount() int {
a.lock.Lock()
defer a.lock.Unlock()
return len(a.items)
}

View File

@ -0,0 +1,107 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package concurrency
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
)
func TestNewMutexMap_Add_Delete(t *testing.T) {
mm := NewMutexMap[string]().(*mutexMap[string])
t.Run("New mutex map", func(t *testing.T) {
require.NotNil(t, mm)
require.NotNil(t, mm.items)
require.Empty(t, mm.items)
})
t.Run("Lock and unlock mutex", func(t *testing.T) {
mm.Lock("key1")
_, ok := mm.items["key1"]
require.True(t, ok)
mm.Unlock("key1")
})
t.Run("Concurrently lock and unlock mutexes", func(t *testing.T) {
var counter int
var wg sync.WaitGroup
numGoroutines := 10
wg.Add(numGoroutines)
// Concurrently lock and unlock for each key
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
mm.Lock("key1")
counter++
mm.Unlock("key1")
}()
}
wg.Wait()
require.Equal(t, 10, counter)
})
t.Run("RLock and RUnlock mutex", func(t *testing.T) {
mm.RLock("key1")
_, ok := mm.items["key1"]
require.True(t, ok)
mm.RUnlock("key1")
})
t.Run("Concurrently RLock and RUnlock mutexes", func(t *testing.T) {
var counter int
var wg sync.WaitGroup
numGoroutines := 10
wg.Add(numGoroutines)
// Concurrently RLock and RUnlock for each key
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
mm.RLock("key1")
counter++
mm.RUnlock("key1")
}()
}
wg.Wait()
require.Equal(t, 10, counter)
})
t.Run("Delete mutex", func(t *testing.T) {
mm.Lock("key1")
mm.Unlock("key1")
mm.Delete("key1")
_, ok := mm.items["key1"]
require.False(t, ok)
})
t.Run("Clear all mutexes, and check item count", func(t *testing.T) {
mm.Lock("key1")
mm.Unlock("key1")
mm.Lock("key2")
mm.Unlock("key2")
require.Equal(t, 2, mm.ItemCount())
mm.Clear()
require.Empty(t, mm.items)
})
}