462 lines
11 KiB
Go
462 lines
11 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package inmemory
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"k8s.io/utils/clock"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/state"
|
|
"github.com/dapr/components-contrib/state/utils"
|
|
"github.com/dapr/kit/logger"
|
|
"github.com/dapr/kit/ptr"
|
|
)
|
|
|
|
type inMemoryStore struct {
|
|
state.BulkStore
|
|
|
|
items map[string]*inMemStateStoreItem
|
|
lock sync.RWMutex
|
|
log logger.Logger
|
|
clock clock.Clock
|
|
closeCh chan struct{}
|
|
closed atomic.Bool
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewInMemoryStateStore(log logger.Logger) state.Store {
|
|
return newStateStore(log)
|
|
}
|
|
|
|
func newStateStore(log logger.Logger) *inMemoryStore {
|
|
s := &inMemoryStore{
|
|
items: map[string]*inMemStateStoreItem{},
|
|
log: log,
|
|
closeCh: make(chan struct{}),
|
|
clock: clock.RealClock{},
|
|
}
|
|
s.BulkStore = state.NewDefaultBulkStore(s)
|
|
return s
|
|
}
|
|
|
|
func (store *inMemoryStore) Init(ctx context.Context, metadata state.Metadata) error {
|
|
// start a background go routine to clean expired item
|
|
store.wg.Add(1)
|
|
go func() {
|
|
defer store.wg.Done()
|
|
store.startCleanThread()
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) Close() error {
|
|
if store.closed.CompareAndSwap(false, true) {
|
|
close(store.closeCh)
|
|
}
|
|
|
|
// release memory reference
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
for k := range store.items {
|
|
delete(store.items, k)
|
|
}
|
|
|
|
store.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) Features() []state.Feature {
|
|
return []state.Feature{
|
|
state.FeatureETag,
|
|
state.FeatureTransactional,
|
|
state.FeatureTTL,
|
|
state.FeatureDeleteWithPrefix,
|
|
}
|
|
}
|
|
|
|
func (store *inMemoryStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
|
|
// step1: validate parameters
|
|
if err := state.CheckRequestOptions(req.Options); err != nil {
|
|
return err
|
|
}
|
|
|
|
// step2 and step3 should be protected by write-lock
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
|
|
// step2: validate etag if needed
|
|
if err := store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency); err != nil {
|
|
return err
|
|
}
|
|
|
|
// step3: do really delete
|
|
// this operation won't fail
|
|
store.doDelete(ctx, req.Key)
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
|
|
// step1: validate parameters
|
|
err := req.Validate()
|
|
if err != nil {
|
|
return state.DeleteWithPrefixResponse{}, err
|
|
}
|
|
|
|
// step2 should be protected by write-lock
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
|
|
// step2: do really delete
|
|
// this operation won't fail
|
|
var count int64
|
|
|
|
for key := range store.items {
|
|
if strings.HasPrefix(key, req.Prefix) {
|
|
// The string contains the prefix, now we check to make sure there aren't more || after
|
|
longerPrefix := strings.Contains(key[len(req.Prefix):], "||")
|
|
if !longerPrefix {
|
|
delete(store.items, key)
|
|
count++
|
|
}
|
|
}
|
|
}
|
|
return state.DeleteWithPrefixResponse{Count: count}, nil
|
|
}
|
|
|
|
func (store *inMemoryStore) doValidateEtag(key string, etag *string, concurrency string) error {
|
|
hasEtag := etag != nil && *etag != ""
|
|
|
|
if concurrency == state.FirstWrite && !hasEtag {
|
|
item := store.items[key]
|
|
if item != nil {
|
|
return state.NewETagError(state.ETagMismatch, errors.New("item already exists and no etag was passed"))
|
|
} else {
|
|
return nil
|
|
}
|
|
} else if hasEtag {
|
|
item := store.items[key]
|
|
if item == nil {
|
|
return state.NewETagError(state.ETagMismatch, fmt.Errorf("state not exist or expired for key=%s", key))
|
|
}
|
|
if item.etag == nil {
|
|
return state.NewETagError(state.ETagMismatch, fmt.Errorf(
|
|
"state etag not match for key=%s: current=nil, expect=%s", key, *etag))
|
|
}
|
|
if *item.etag != *etag {
|
|
return state.NewETagError(state.ETagMismatch, fmt.Errorf(
|
|
"state etag not match for key=%s: current=%s, expect=%s", key, *item.etag, *etag))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) doDelete(ctx context.Context, key string) {
|
|
delete(store.items, key)
|
|
}
|
|
|
|
func (store *inMemoryStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
|
|
store.lock.RLock()
|
|
item := store.items[req.Key]
|
|
store.lock.RUnlock()
|
|
if item != nil && item.isExpired(store.clock.Now()) {
|
|
store.lock.Lock()
|
|
item = store.getAndExpire(req.Key)
|
|
store.lock.Unlock()
|
|
}
|
|
|
|
if item == nil {
|
|
return &state.GetResponse{}, nil
|
|
}
|
|
|
|
var metadata map[string]string
|
|
if item.expire != nil {
|
|
metadata = map[string]string{
|
|
state.GetRespMetaKeyTTLExpireTime: item.expire.UTC().Format(time.RFC3339),
|
|
}
|
|
}
|
|
|
|
return &state.GetResponse{Data: item.data, ETag: item.etag, Metadata: metadata}, nil
|
|
}
|
|
|
|
func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) {
|
|
res := make([]state.BulkGetResponse, len(req))
|
|
if len(req) == 0 {
|
|
return res, nil
|
|
}
|
|
|
|
// While working in bulk, we won't delete expired records we may encounter; we'll just let them stay until GC picks them up
|
|
store.lock.RLock()
|
|
defer store.lock.RUnlock()
|
|
|
|
for i, r := range req {
|
|
item := store.items[r.Key]
|
|
if item != nil && !item.isExpired(store.clock.Now()) {
|
|
res[i] = state.BulkGetResponse{
|
|
Key: r.Key,
|
|
Data: item.data,
|
|
ETag: item.etag,
|
|
}
|
|
|
|
if item.expire != nil {
|
|
res[i].Metadata = map[string]string{
|
|
state.GetRespMetaKeyTTLExpireTime: item.expire.UTC().Format(time.RFC3339),
|
|
}
|
|
}
|
|
} else {
|
|
res[i] = state.BulkGetResponse{
|
|
Key: r.Key,
|
|
}
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (store *inMemoryStore) getAndExpire(key string) *inMemStateStoreItem {
|
|
// get item and check expired again to avoid if item changed between we got this write-lock
|
|
item := store.items[key]
|
|
if item == nil {
|
|
return nil
|
|
}
|
|
if item.isExpired(store.clock.Now()) {
|
|
delete(store.items, key)
|
|
return nil
|
|
}
|
|
return item
|
|
}
|
|
|
|
func (store *inMemoryStore) marshal(v any) (bt []byte, err error) {
|
|
byteArray, isBinary := v.([]uint8)
|
|
if isBinary {
|
|
bt = byteArray
|
|
} else {
|
|
bt, err = utils.Marshal(v, json.Marshal)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return bt, nil
|
|
}
|
|
|
|
func (store *inMemoryStore) Set(ctx context.Context, req *state.SetRequest) error {
|
|
// step1: validate parameters
|
|
ttlInSeconds, err := store.doSetValidateParameters(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// step2 and step3 should be protected by write-lock
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
|
|
// step2: validate etag if needed
|
|
err = store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// step3: do really set
|
|
bt, err := store.marshal(req.Value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// this operation won't fail
|
|
store.doSet(ctx, req.Key, bt, ttlInSeconds)
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) doSetValidateParameters(req *state.SetRequest) (int, error) {
|
|
err := state.CheckRequestOptions(req.Options)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
ttlInSeconds, err := doParseTTLInSeconds(req.Metadata)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return ttlInSeconds, nil
|
|
}
|
|
|
|
func doParseTTLInSeconds(metadata map[string]string) (int, error) {
|
|
s := metadata["ttlInSeconds"]
|
|
if s == "" {
|
|
return 0, nil
|
|
}
|
|
|
|
i, err := strconv.Atoi(s)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if i < 0 {
|
|
i = 0
|
|
}
|
|
|
|
return i, nil
|
|
}
|
|
|
|
func (store *inMemoryStore) doSet(ctx context.Context, key string, data []byte, ttlInSeconds int) {
|
|
etag := uuid.New().String()
|
|
el := &inMemStateStoreItem{
|
|
data: data,
|
|
etag: &etag,
|
|
}
|
|
if ttlInSeconds > 0 {
|
|
el.expire = ptr.Of(store.clock.Now().Add(time.Duration(ttlInSeconds) * time.Second))
|
|
}
|
|
|
|
store.items[key] = el
|
|
}
|
|
|
|
// innerSetRequest is only used to pass ttlInSeconds and data with SetRequest.
|
|
type innerSetRequest struct {
|
|
req state.SetRequest
|
|
ttl int
|
|
data []byte
|
|
}
|
|
|
|
// Implements state.TransactionalStateOperation
|
|
func (innerSetRequest) Operation() state.OperationType {
|
|
return "_internal"
|
|
}
|
|
|
|
// Implements state.StateRequest
|
|
func (r innerSetRequest) GetKey() string {
|
|
return r.req.Key
|
|
}
|
|
|
|
func (r innerSetRequest) GetMetadata() map[string]string {
|
|
return r.req.Metadata
|
|
}
|
|
|
|
func (store *inMemoryStore) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
|
|
if len(request.Operations) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// step1: validate parameters
|
|
for i, o := range request.Operations {
|
|
switch req := o.(type) {
|
|
case state.SetRequest:
|
|
ttlInSeconds, err := store.doSetValidateParameters(&req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bt, err := store.marshal(req.Value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
innerSetRequest := &innerSetRequest{
|
|
req: req,
|
|
ttl: ttlInSeconds,
|
|
data: bt,
|
|
}
|
|
// replace with innerSetRequest
|
|
request.Operations[i] = innerSetRequest
|
|
case state.DeleteRequest:
|
|
err := state.CheckRequestOptions(&req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// step2 and step3 should be protected by write-lock
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
|
|
// step2: validate etag if needed
|
|
for _, o := range request.Operations {
|
|
switch req := o.(type) {
|
|
case *innerSetRequest:
|
|
err := store.doValidateEtag(req.req.Key, req.req.ETag, req.req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case state.DeleteRequest:
|
|
err := store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// step3: do really set
|
|
// these operations won't fail
|
|
for _, o := range request.Operations {
|
|
switch req := o.(type) {
|
|
case *innerSetRequest:
|
|
store.doSet(ctx, req.req.Key, req.data, req.ttl)
|
|
case state.DeleteRequest:
|
|
store.doDelete(ctx, req.Key)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (store *inMemoryStore) startCleanThread() {
|
|
for {
|
|
select {
|
|
case <-time.After(time.Second):
|
|
store.doCleanExpiredItems()
|
|
case <-store.closeCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (store *inMemoryStore) doCleanExpiredItems() {
|
|
store.lock.Lock()
|
|
defer store.lock.Unlock()
|
|
|
|
for key, item := range store.items {
|
|
if item.expire != nil && item.isExpired(store.clock.Now()) {
|
|
store.doDelete(context.Background(), key)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (store *inMemoryStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
// no metadata, hence no metadata struct to convert here
|
|
return
|
|
}
|
|
|
|
type inMemStateStoreItem struct {
|
|
data []byte
|
|
etag *string
|
|
expire *time.Time
|
|
}
|
|
|
|
func (item *inMemStateStoreItem) isExpired(now time.Time) bool {
|
|
if item == nil || item.expire == nil {
|
|
return false
|
|
}
|
|
return now.After(*item.expire)
|
|
}
|