inmemory return ttlExpiryTime in GetResponse (#2870)
Signed-off-by: joshvanl <me@joshvanl.dev> Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
ff06ae6b33
commit
638b106bdb
|
@ -24,6 +24,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"k8s.io/utils/clock"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/state"
|
"github.com/dapr/components-contrib/state"
|
||||||
"github.com/dapr/components-contrib/state/utils"
|
"github.com/dapr/components-contrib/state/utils"
|
||||||
|
@ -37,6 +38,7 @@ type inMemoryStore struct {
|
||||||
items map[string]*inMemStateStoreItem
|
items map[string]*inMemStateStoreItem
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
log logger.Logger
|
log logger.Logger
|
||||||
|
clock clock.Clock
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
@ -51,6 +53,7 @@ func newStateStore(log logger.Logger) *inMemoryStore {
|
||||||
items: map[string]*inMemStateStoreItem{},
|
items: map[string]*inMemStateStoreItem{},
|
||||||
log: log,
|
log: log,
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
|
clock: clock.RealClock{},
|
||||||
}
|
}
|
||||||
s.BulkStore = state.NewDefaultBulkStore(s)
|
s.BulkStore = state.NewDefaultBulkStore(s)
|
||||||
return s
|
return s
|
||||||
|
@ -146,7 +149,7 @@ func (store *inMemoryStore) Get(ctx context.Context, req *state.GetRequest) (*st
|
||||||
store.lock.RLock()
|
store.lock.RLock()
|
||||||
item := store.items[req.Key]
|
item := store.items[req.Key]
|
||||||
store.lock.RUnlock()
|
store.lock.RUnlock()
|
||||||
if item != nil && item.isExpired() {
|
if item != nil && item.isExpired(store.clock.Now()) {
|
||||||
store.lock.Lock()
|
store.lock.Lock()
|
||||||
item = store.getAndExpire(req.Key)
|
item = store.getAndExpire(req.Key)
|
||||||
store.lock.Unlock()
|
store.lock.Unlock()
|
||||||
|
@ -156,7 +159,14 @@ func (store *inMemoryStore) Get(ctx context.Context, req *state.GetRequest) (*st
|
||||||
return &state.GetResponse{}, nil
|
return &state.GetResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return &state.GetResponse{Data: item.data, ETag: item.etag}, nil
|
var metadata map[string]string
|
||||||
|
if item.expire != nil {
|
||||||
|
metadata = map[string]string{
|
||||||
|
state.GetRespMetaKeyTTLExpireTime: item.expire.UTC().Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &state.GetResponse{Data: item.data, ETag: item.etag, Metadata: metadata}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) {
|
func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) {
|
||||||
|
@ -171,12 +181,18 @@ func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest,
|
||||||
|
|
||||||
for i, r := range req {
|
for i, r := range req {
|
||||||
item := store.items[r.Key]
|
item := store.items[r.Key]
|
||||||
if item != nil && !item.isExpired() {
|
if item != nil && !item.isExpired(store.clock.Now()) {
|
||||||
res[i] = state.BulkGetResponse{
|
res[i] = state.BulkGetResponse{
|
||||||
Key: r.Key,
|
Key: r.Key,
|
||||||
Data: item.data,
|
Data: item.data,
|
||||||
ETag: item.etag,
|
ETag: item.etag,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if item.expire != nil {
|
||||||
|
res[i].Metadata = map[string]string{
|
||||||
|
state.GetRespMetaKeyTTLExpireTime: item.expire.UTC().Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
res[i] = state.BulkGetResponse{
|
res[i] = state.BulkGetResponse{
|
||||||
Key: r.Key,
|
Key: r.Key,
|
||||||
|
@ -193,7 +209,7 @@ func (store *inMemoryStore) getAndExpire(key string) *inMemStateStoreItem {
|
||||||
if item == nil {
|
if item == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if item.isExpired() {
|
if item.isExpired(store.clock.Now()) {
|
||||||
delete(store.items, key)
|
delete(store.items, key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -280,7 +296,7 @@ func (store *inMemoryStore) doSet(ctx context.Context, key string, data []byte,
|
||||||
etag: &etag,
|
etag: &etag,
|
||||||
}
|
}
|
||||||
if ttlInSeconds > 0 {
|
if ttlInSeconds > 0 {
|
||||||
el.expire = ptr.Of(time.Now().UnixMilli() + int64(ttlInSeconds)*1000)
|
el.expire = ptr.Of(store.clock.Now().Add(time.Duration(ttlInSeconds) * time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
store.items[key] = el
|
store.items[key] = el
|
||||||
|
@ -388,7 +404,7 @@ func (store *inMemoryStore) doCleanExpiredItems() {
|
||||||
defer store.lock.Unlock()
|
defer store.lock.Unlock()
|
||||||
|
|
||||||
for key, item := range store.items {
|
for key, item := range store.items {
|
||||||
if item.expire != nil && item.isExpired() {
|
if item.expire != nil && item.isExpired(store.clock.Now()) {
|
||||||
store.doDelete(context.Background(), key)
|
store.doDelete(context.Background(), key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -402,12 +418,12 @@ func (store *inMemoryStore) GetComponentMetadata() map[string]string {
|
||||||
type inMemStateStoreItem struct {
|
type inMemStateStoreItem struct {
|
||||||
data []byte
|
data []byte
|
||||||
etag *string
|
etag *string
|
||||||
expire *int64
|
expire *time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (item *inMemStateStoreItem) isExpired() bool {
|
func (item *inMemStateStoreItem) isExpired(now time.Time) bool {
|
||||||
if item == nil || item.expire == nil {
|
if item == nil || item.expire == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return time.Now().UnixMilli() > *item.expire
|
return now.After(*item.expire)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,14 @@ package inmemory
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
clocktesting "k8s.io/utils/clock/testing"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/state"
|
"github.com/dapr/components-contrib/state"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
|
@ -30,7 +33,9 @@ func TestReadAndWrite(t *testing.T) {
|
||||||
|
|
||||||
defer ctl.Finish()
|
defer ctl.Finish()
|
||||||
|
|
||||||
store := NewInMemoryStateStore(logger.NewLogger("test"))
|
store := NewInMemoryStateStore(logger.NewLogger("test")).(*inMemoryStore)
|
||||||
|
fakeClock := clocktesting.NewFakeClock(time.Now())
|
||||||
|
store.clock = fakeClock
|
||||||
store.Init(context.Background(), state.Metadata{})
|
store.Init(context.Background(), state.Metadata{})
|
||||||
|
|
||||||
keyA := "theFirstKey"
|
keyA := "theFirstKey"
|
||||||
|
@ -65,7 +70,7 @@ func TestReadAndWrite(t *testing.T) {
|
||||||
err := store.Set(context.Background(), setReq)
|
err := store.Set(context.Background(), setReq)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
// simulate expiration
|
// simulate expiration
|
||||||
time.Sleep(2 * time.Second)
|
fakeClock.Step(2 * time.Second)
|
||||||
// get
|
// get
|
||||||
getReq := &state.GetRequest{
|
getReq := &state.GetRequest{
|
||||||
Key: keyA,
|
Key: keyA,
|
||||||
|
@ -77,6 +82,64 @@ func TestReadAndWrite(t *testing.T) {
|
||||||
assert.Nil(t, resp.ETag)
|
assert.Nil(t, resp.ETag)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("return expire time when ttlInSeconds set with Get", func(t *testing.T) {
|
||||||
|
now := fakeClock.Now()
|
||||||
|
|
||||||
|
// set with LWW
|
||||||
|
setReq := &state.SetRequest{
|
||||||
|
Key: keyA,
|
||||||
|
Value: valueA,
|
||||||
|
Metadata: map[string]string{"ttlInSeconds": "1000"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := store.Set(context.Background(), setReq)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// get
|
||||||
|
getReq := &state.GetRequest{
|
||||||
|
Key: keyA,
|
||||||
|
}
|
||||||
|
resp, err := store.Get(context.Background(), getReq)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, resp)
|
||||||
|
assert.Equal(t, `"value of key"`, string(resp.Data))
|
||||||
|
assert.Len(t, resp.Metadata, 1)
|
||||||
|
require.Contains(t, resp.Metadata, "ttlExpireTime")
|
||||||
|
assert.Equal(t, now.Add(time.Second*1000).UTC().Format(time.RFC3339), resp.Metadata["ttlExpireTime"])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("return expire time when ttlInSeconds set with GetBulk", func(t *testing.T) {
|
||||||
|
assert.NoError(t, store.Set(context.Background(), &state.SetRequest{
|
||||||
|
Key: "a",
|
||||||
|
Value: "123",
|
||||||
|
Metadata: map[string]string{"ttlInSeconds": "1000"},
|
||||||
|
}))
|
||||||
|
assert.NoError(t, store.Set(context.Background(), &state.SetRequest{
|
||||||
|
Key: "b",
|
||||||
|
Value: "456",
|
||||||
|
Metadata: map[string]string{"ttlInSeconds": "2001"},
|
||||||
|
}))
|
||||||
|
|
||||||
|
resp, err := store.BulkGet(context.Background(), []state.GetRequest{
|
||||||
|
{Key: "a"},
|
||||||
|
{Key: "b"},
|
||||||
|
}, state.BulkGetOpts{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotNil(t, resp)
|
||||||
|
require.Len(t, resp, 2)
|
||||||
|
sort.Slice(resp, func(i, j int) bool {
|
||||||
|
return resp[i].Key < resp[j].Key
|
||||||
|
})
|
||||||
|
assert.Equal(t, `"123"`, string(resp[0].Data))
|
||||||
|
assert.Equal(t, `"456"`, string(resp[1].Data))
|
||||||
|
assert.Len(t, resp[0].Metadata, 1)
|
||||||
|
require.Contains(t, resp[0].Metadata, "ttlExpireTime")
|
||||||
|
assert.Len(t, resp[1].Metadata, 1)
|
||||||
|
require.Contains(t, resp[1].Metadata, "ttlExpireTime")
|
||||||
|
assert.Equal(t, fakeClock.Now().Add(time.Second*1000).UTC().Format(time.RFC3339), resp[0].Metadata["ttlExpireTime"])
|
||||||
|
assert.Equal(t, fakeClock.Now().Add(time.Second*2001).UTC().Format(time.RFC3339), resp[1].Metadata["ttlExpireTime"])
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("set and get the second key successfully", func(t *testing.T) {
|
t.Run("set and get the second key successfully", func(t *testing.T) {
|
||||||
// set
|
// set
|
||||||
setReq := &state.SetRequest{
|
setReq := &state.SetRequest{
|
||||||
|
|
Loading…
Reference in New Issue