kmsv2: fix race in simpleCache.set when setting cache size metric

Signed-off-by: Monis Khan <mok@microsoft.com>

Kubernetes-commit: b10697c7880848d7ec110fd6b3e67015bbe74fa8
Monis Khan 2023-08-27 15:14:04 -04:00 committed by Kubernetes Publisher
parent 9998872cea
commit 49c6151dee
2 changed files with 60 additions and 4 deletions
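
Before this change, set stored the entry and then read c.cache.Len() for the metric with no serialization between callers, so two concurrent set calls could record the same size twice or publish sizes out of order. The standalone sketch below is illustrative only (not code from this commit): toyCache and record are hypothetical stand-ins for simpleCache and metrics.RecordDekSourceCacheSize, showing the same store-then-report pattern guarded by a per-cache mutex.

// Minimal sketch of the store-then-report pattern the commit serializes.
package main

import (
	"fmt"
	"sync"
)

// toyCache mirrors the shape of simpleCache.set: store an entry, then report
// the current size. The names here are made up for illustration.
type toyCache struct {
	mu      sync.Mutex // guards set, like the mu field added in the commit
	entries map[string]struct{}
	record  func(size int) // stand-in for the cache size metric recorder
}

func (c *toyCache) set(key string) {
	c.mu.Lock() // without this lock, the len() read below races with other writers
	defer c.mu.Unlock()
	c.entries[key] = struct{}{}
	c.record(len(c.entries))
}

func main() {
	sizes := make(chan int, 100)
	c := &toyCache{
		entries: map[string]struct{}{},
		record:  func(size int) { sizes <- size },
	}

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.set(fmt.Sprintf("key-%d", i))
		}(i)
	}
	wg.Wait()
	close(sizes)

	seen := map[int]bool{}
	for size := range sizes {
		if seen[size] {
			fmt.Println("duplicate size reported:", size) // possible only without the lock
		}
		seen[size] = true
	}
	fmt.Println("distinct sizes recorded:", len(seen)) // 100 when set is serialized
}

The new TestMetrics in the second file checks exactly these two properties through the recordCacheSize hook: each size from 1 to 100 is reported once, and the reported sizes never decrease.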


@@ -39,8 +39,10 @@ type simpleCache struct {
 	ttl time.Duration
 	// hashPool is a per cache pool of hash.Hash (to avoid allocations from building the Hash)
 	// SHA-256 is used to prevent collisions
-	hashPool     *sync.Pool
-	providerName string
+	hashPool        *sync.Pool
+	providerName    string
+	mu              sync.Mutex                          // guards call to set
+	recordCacheSize func(providerName string, size int) // for unit tests
 }
 
 func newSimpleCache(clock clock.Clock, ttl time.Duration, providerName string) *simpleCache {
@@ -54,7 +56,8 @@ func newSimpleCache(clock clock.Clock, ttl time.Duration, providerName string) *simpleCache {
 				return sha256.New()
 			},
 		},
-		providerName: providerName,
+		providerName:    providerName,
+		recordCacheSize: metrics.RecordDekSourceCacheSize,
 	}
 }
@@ -69,6 +72,8 @@ func (c *simpleCache) get(key []byte) value.Read {
 
 // set caches the record for the key
 func (c *simpleCache) set(key []byte, transformer value.Read) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	if len(key) == 0 {
 		panic("key must not be empty")
 	}
@@ -77,7 +82,7 @@ func (c *simpleCache) set(key []byte, transformer value.Read) {
 	}
 	c.cache.Set(c.keyFunc(key), transformer, c.ttl)
 	// Add metrics for cache size
-	metrics.RecordDekSourceCacheSize(c.providerName, c.cache.Len())
+	c.recordCacheSize(c.providerName, c.cache.Len())
 }
 
 // keyFunc generates a string key by hashing the inputs.


@@ -22,9 +22,11 @@ import (
 	"crypto/sha256"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/storage/value"
 	testingclock "k8s.io/utils/clock/testing"
 )
@@ -153,3 +155,52 @@ func generateKey(length int) (key []byte, err error) {
 	}
 	return key, nil
 }
+
+func TestMetrics(t *testing.T) {
+	fakeClock := testingclock.NewFakeClock(time.Now())
+	cache := newSimpleCache(fakeClock, 5*time.Second, "panda")
+	var record sync.Map
+	var cacheSize atomic.Uint64
+	cache.recordCacheSize = func(providerName string, size int) {
+		if providerName != "panda" {
+			t.Errorf(`expected "panda" as provider name, got %q`, providerName)
+		}
+		if _, loaded := record.LoadOrStore(size, nil); loaded {
+			t.Errorf("detected duplicated cache size metric for %d", size)
+		}
+		newSize := uint64(size)
+		oldSize := cacheSize.Swap(newSize)
+		if oldSize > newSize {
+			t.Errorf("cache size decreased from %d to %d", oldSize, newSize)
+		}
+	}
+
+	transformer := &envelopeTransformer{}
+	want := sets.NewInt()
+	startCh := make(chan struct{})
+	wg := sync.WaitGroup{}
+	for i := 0; i < 100; i++ {
+		want.Insert(i + 1)
+		k := fmt.Sprintf("key-%d", i)
+		wg.Add(1)
+		go func(key string) {
+			defer wg.Done()
+			<-startCh
+			cache.set([]byte(key), transformer)
+		}(k)
+	}
+	close(startCh)
+	wg.Wait()
+
+	got := sets.NewInt()
+	record.Range(func(key, value any) bool {
+		got.Insert(key.(int))
+		if value != nil {
+			t.Errorf("expected value to be nil but got %v", value)
+		}
+		return true
+	})
+	if !want.Equal(got) {
+		t.Errorf("cache size entries missing values: %v", want.SymmetricDifference(got).List())
+	}
+}