components-contrib/state/in-memory/in_memory.go

462 lines
11 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package inmemory
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"k8s.io/utils/clock"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/utils"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
// inMemoryStore is a state store that keeps every item in a process-local map.
type inMemoryStore struct {
// BulkStore is assigned in newStateStore from state.NewDefaultBulkStore.
state.BulkStore
// items maps a state key to its stored item.
items map[string]*inMemStateStoreItem
// lock guards items.
lock sync.RWMutex
// log is the logger supplied to the constructor.
log logger.Logger
// clock is the time source used to compute and check item expiry.
clock clock.Clock
// closeCh is closed by Close to stop the background cleanup goroutine.
closeCh chan struct{}
// closed makes sure closeCh is closed at most once.
closed atomic.Bool
// wg tracks the cleanup goroutine started by Init.
wg sync.WaitGroup
}
// NewInMemoryStateStore returns a new in-memory state store that logs through log.
// The background cleanup goroutine is not started here; it is started by Init.
func NewInMemoryStateStore(log logger.Logger) state.Store {
return newStateStore(log)
}
// newStateStore builds the concrete store: an empty item map, the real wall
// clock, an open close-channel, and a default bulk store wrapping the store itself.
func newStateStore(log logger.Logger) *inMemoryStore {
	store := new(inMemoryStore)
	store.items = make(map[string]*inMemStateStoreItem)
	store.log = log
	store.closeCh = make(chan struct{})
	store.clock = clock.RealClock{}
	// The default bulk store needs a reference to the fully built store.
	store.BulkStore = state.NewDefaultBulkStore(store)
	return store
}
// Init starts the background goroutine that periodically removes expired
// items. The goroutine is registered on store.wg so Close can wait for it.
// It always returns nil.
func (store *inMemoryStore) Init(ctx context.Context, metadata state.Metadata) error {
	cleaner := func() {
		defer store.wg.Done()
		store.startCleanThread()
	}

	// Register before launching so a concurrent Wait cannot miss the goroutine.
	store.wg.Add(1)
	go cleaner()

	return nil
}
// Close stops the background cleanup goroutine, drops every stored item and
// waits for the goroutine to exit. It is safe to call more than once.
// It always returns nil.
func (store *inMemoryStore) Close() error {
	// Signal the cleanup goroutine exactly once; closing a closed channel panics.
	if store.closed.CompareAndSwap(false, true) {
		close(store.closeCh)
	}

	// Release memory references.
	store.lock.Lock()
	for k := range store.items {
		delete(store.items, k)
	}
	store.lock.Unlock()

	// Wait only after the write lock has been released: the cleanup goroutine
	// may be blocked acquiring that same lock in doCleanExpiredItems, and
	// waiting for it while still holding the lock would deadlock.
	store.wg.Wait()
	return nil
}
// Features lists the optional capabilities this store advertises: ETags,
// transactions, TTL and delete-with-prefix.
func (store *inMemoryStore) Features() []state.Feature {
	supported := make([]state.Feature, 0, 4)
	supported = append(supported,
		state.FeatureETag,
		state.FeatureTransactional,
		state.FeatureTTL,
		state.FeatureDeleteWithPrefix,
	)
	return supported
}
// Delete removes the item stored under req.Key.
//
// The request options are validated first; then, under the write lock, the
// ETag precondition is checked and the key is removed. Deleting a key that
// does not exist is not an error unless an ETag precondition fails.
func (store *inMemoryStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
	// Parameter validation happens outside the lock.
	err := state.CheckRequestOptions(req.Options)
	if err != nil {
		return err
	}

	// ETag validation and removal must be atomic with respect to other writers.
	store.lock.Lock()
	defer store.lock.Unlock()

	err = store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency)
	if err != nil {
		return err
	}

	// Removing from the map cannot fail.
	store.doDelete(ctx, req.Key)
	return nil
}
// DeleteWithPrefix removes every key that starts with req.Prefix and has no
// further "||" separator after the prefix. It returns how many keys were removed.
// An error is returned only when req.Validate fails.
func (store *inMemoryStore) DeleteWithPrefix(ctx context.Context, req state.DeleteWithPrefixRequest) (state.DeleteWithPrefixResponse, error) {
	// Validate the request before taking the lock.
	if err := req.Validate(); err != nil {
		return state.DeleteWithPrefixResponse{}, err
	}

	// The scan-and-delete pass runs under the write lock; it cannot fail.
	store.lock.Lock()
	defer store.lock.Unlock()

	var removed int64
	for key := range store.items {
		if !strings.HasPrefix(key, req.Prefix) {
			continue
		}
		// The key has the prefix; skip it when more "||" segments follow.
		remainder := key[len(req.Prefix):]
		if strings.Contains(remainder, "||") {
			continue
		}
		delete(store.items, key)
		removed++
	}

	return state.DeleteWithPrefixResponse{Count: removed}, nil
}
// doValidateEtag checks the ETag precondition for a write or delete of key.
// The caller must hold store.lock.
//
// Without an ETag (nil or empty) the check only fails when concurrency is
// state.FirstWrite and the key already exists. With an ETag, the key must
// exist and its current etag must equal the supplied one. Failures are
// reported as state.ETagMismatch errors.
func (store *inMemoryStore) doValidateEtag(key string, etag *string, concurrency string) error {
	current := store.items[key]

	if etag == nil || *etag == "" {
		if concurrency == state.FirstWrite && current != nil {
			return state.NewETagError(state.ETagMismatch, errors.New("item already exists and no etag was passed"))
		}
		return nil
	}

	switch {
	case current == nil:
		return state.NewETagError(state.ETagMismatch, fmt.Errorf("state not exist or expired for key=%s", key))
	case current.etag == nil:
		return state.NewETagError(state.ETagMismatch, fmt.Errorf(
			"state etag not match for key=%s: current=nil, expect=%s", key, *etag))
	case *current.etag != *etag:
		return state.NewETagError(state.ETagMismatch, fmt.Errorf(
			"state etag not match for key=%s: current=%s, expect=%s", key, *current.etag, *etag))
	}
	return nil
}
// doDelete removes key from the item map. It performs no locking of its own,
// so the caller must already hold store.lock for writing. ctx is unused.
func (store *inMemoryStore) doDelete(ctx context.Context, key string) {
delete(store.items, key)
}
// Get returns the item stored under req.Key.
//
// A missing or expired key yields an empty response and a nil error. An
// expired item found during the lookup is removed under the write lock. When
// the item has an expiry, it is reported in the response metadata under
// state.GetRespMetaKeyTTLExpireTime as an RFC 3339 UTC timestamp.
func (store *inMemoryStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
	// Fast path: look the item up under the read lock only.
	store.lock.RLock()
	found := store.items[req.Key]
	store.lock.RUnlock()

	// Slow path: the item looks expired, so re-check and evict under the write lock.
	if found != nil && found.isExpired(store.clock.Now()) {
		store.lock.Lock()
		found = store.getAndExpire(req.Key)
		store.lock.Unlock()
	}

	if found == nil {
		return &state.GetResponse{}, nil
	}

	resp := &state.GetResponse{Data: found.data, ETag: found.etag}
	if found.expire != nil {
		resp.Metadata = map[string]string{
			state.GetRespMetaKeyTTLExpireTime: found.expire.UTC().Format(time.RFC3339),
		}
	}
	return resp, nil
}
// BulkGet looks up every requested key under a single read lock and returns
// one response per request, in request order.
//
// Missing or expired keys produce a response that carries only the key.
// Expired records are not deleted here; they stay until the cleanup pass
// removes them.
func (store *inMemoryStore) BulkGet(ctx context.Context, req []state.GetRequest, _ state.BulkGetOpts) ([]state.BulkGetResponse, error) {
	out := make([]state.BulkGetResponse, len(req))
	if len(req) == 0 {
		return out, nil
	}

	store.lock.RLock()
	defer store.lock.RUnlock()

	for idx := range req {
		key := req[idx].Key
		// Every response carries its key, whether or not the item is found.
		out[idx].Key = key

		found := store.items[key]
		if found == nil || found.isExpired(store.clock.Now()) {
			continue
		}

		out[idx].Data = found.data
		out[idx].ETag = found.etag
		if found.expire != nil {
			out[idx].Metadata = map[string]string{
				state.GetRespMetaKeyTTLExpireTime: found.expire.UTC().Format(time.RFC3339),
			}
		}
	}

	return out, nil
}
// getAndExpire returns the live item stored under key, or nil when the key is
// absent or expired. An expired item is deleted from the map.
//
// The caller must hold the write lock. The expiry check is repeated here
// because the item may have changed before the caller acquired that lock.
func (store *inMemoryStore) getAndExpire(key string) *inMemStateStoreItem {
	current, ok := store.items[key]
	if !ok || current == nil {
		return nil
	}
	if !current.isExpired(store.clock.Now()) {
		return current
	}
	delete(store.items, key)
	return nil
}
// marshal converts v to the bytes that are stored. A byte slice is returned
// as-is (not copied); any other value is encoded through utils.Marshal with
// json.Marshal. On an encoding error the returned bytes are nil.
func (store *inMemoryStore) marshal(v any) (bt []byte, err error) {
	if raw, ok := v.([]byte); ok {
		return raw, nil
	}

	bt, err = utils.Marshal(v, json.Marshal)
	if err != nil {
		return nil, err
	}
	return bt, nil
}
// Set stores req.Value under req.Key, replacing any existing item.
//
// Options and TTL metadata are validated first. Then, under the write lock,
// the ETag precondition is checked, the value is serialized and the item is
// written with a new etag and, when a positive TTL was given, an expiry time.
func (store *inMemoryStore) Set(ctx context.Context, req *state.SetRequest) error {
	// Parameter validation happens outside the lock.
	ttl, err := store.doSetValidateParameters(req)
	if err != nil {
		return err
	}

	// ETag validation and the write must be atomic with respect to other writers.
	store.lock.Lock()
	defer store.lock.Unlock()

	if err = store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency); err != nil {
		return err
	}

	payload, err := store.marshal(req.Value)
	if err != nil {
		return err
	}

	// Writing to the map cannot fail.
	store.doSet(ctx, req.Key, payload, ttl)
	return nil
}
// doSetValidateParameters validates the options of a set request and parses
// its TTL metadata. It returns the TTL in seconds (0 when none was given), or
// 0 and the first validation error.
func (store *inMemoryStore) doSetValidateParameters(req *state.SetRequest) (int, error) {
	if optErr := state.CheckRequestOptions(req.Options); optErr != nil {
		return 0, optErr
	}

	ttl, ttlErr := doParseTTLInSeconds(req.Metadata)
	if ttlErr != nil {
		return 0, ttlErr
	}
	return ttl, nil
}
// doParseTTLInSeconds reads the "ttlInSeconds" entry from metadata.
//
// A missing or empty entry yields 0 and a negative value is clamped to 0.
// A value that is not a valid integer yields 0 together with the
// strconv.Atoi error.
func doParseTTLInSeconds(metadata map[string]string) (int, error) {
	raw, present := metadata["ttlInSeconds"]
	if !present || raw == "" {
		return 0, nil
	}

	ttl, err := strconv.Atoi(raw)
	switch {
	case err != nil:
		return 0, err
	case ttl < 0:
		// Negative TTLs are treated the same as no TTL.
		return 0, nil
	}
	return ttl, nil
}
// doSet writes data under key with a newly generated UUID etag, replacing any
// existing item. When ttlInSeconds is positive the item expires that many
// seconds after the store clock's current time; otherwise it has no expiry.
// The caller must hold the write lock. ctx is unused.
func (store *inMemoryStore) doSet(ctx context.Context, key string, data []byte, ttlInSeconds int) {
	newETag := uuid.New().String()

	var expiresAt *time.Time
	if ttlInSeconds > 0 {
		ttl := time.Duration(ttlInSeconds) * time.Second
		expiresAt = ptr.Of(store.clock.Now().Add(ttl))
	}

	store.items[key] = &inMemStateStoreItem{
		data:   data,
		etag:   &newETag,
		expire: expiresAt,
	}
}
// innerSetRequest wraps a state.SetRequest together with values computed
// while validating it inside Multi, so they can be reused when the write is
// applied.
type innerSetRequest struct {
// req is the original set request.
req state.SetRequest
// ttl is the parsed TTL in seconds (0 means no expiry).
ttl int
// data is the serialized form of req.Value.
data []byte
}
// Operation implements state.TransactionalStateOperation.
// It returns the fixed operation type "_internal".
func (innerSetRequest) Operation() state.OperationType {
return "_internal"
}
// GetKey implements state.StateRequest.
// It returns the key of the wrapped set request.
func (r innerSetRequest) GetKey() string {
return r.req.Key
}
// GetMetadata returns the metadata of the wrapped set request.
func (r innerSetRequest) GetMetadata() map[string]string {
return r.req.Metadata
}
// Multi applies all operations in request as one atomic transaction.
//
// It runs in three steps:
//  1. Validate every operation and serialize set values (no lock held).
//  2. Under the write lock, check the ETag precondition of every operation.
//  3. Still under the lock, apply every set and delete.
//
// Any error in step 1 or 2 aborts the transaction before the store is
// modified. An empty operation list is a no-op. The caller's request is not
// modified.
func (store *inMemoryStore) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
	if len(request.Operations) == 0 {
		return nil
	}

	// Work on a private copy so the internal innerSetRequest wrapper never
	// leaks into the caller's request.
	ops := append(request.Operations[:0:0], request.Operations...)

	// step1: validate parameters
	for i, o := range ops {
		switch req := o.(type) {
		case state.SetRequest:
			ttlInSeconds, err := store.doSetValidateParameters(&req)
			if err != nil {
				return err
			}
			bt, err := store.marshal(req.Value)
			if err != nil {
				return err
			}
			// Replace the set request with a wrapper that carries the parsed
			// TTL and serialized data, so neither is recomputed under the lock.
			ops[i] = &innerSetRequest{
				req:  req,
				ttl:  ttlInSeconds,
				data: bt,
			}
		case state.DeleteRequest:
			// Validate the options value, consistent with Delete and
			// doSetValidateParameters.
			if err := state.CheckRequestOptions(req.Options); err != nil {
				return err
			}
		}
	}

	// step2 and step3 should be protected by write-lock
	store.lock.Lock()
	defer store.lock.Unlock()

	// step2: validate etag if needed
	for _, o := range ops {
		switch req := o.(type) {
		case *innerSetRequest:
			err := store.doValidateEtag(req.req.Key, req.req.ETag, req.req.Options.Concurrency)
			if err != nil {
				return err
			}
		case state.DeleteRequest:
			err := store.doValidateEtag(req.Key, req.ETag, req.Options.Concurrency)
			if err != nil {
				return err
			}
		}
	}

	// step3: apply the operations; these cannot fail
	for _, o := range ops {
		switch req := o.(type) {
		case *innerSetRequest:
			store.doSet(ctx, req.req.Key, req.data, req.ttl)
		case state.DeleteRequest:
			store.doDelete(ctx, req.Key)
		}
	}
	return nil
}
// startCleanThread runs the expiry sweep once per second until closeCh is
// closed. It blocks, so it is meant to run in its own goroutine.
func (store *inMemoryStore) startCleanThread() {
	// One reusable ticker instead of allocating a new timer on every iteration.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			store.doCleanExpiredItems()
		case <-store.closeCh:
			return
		}
	}
}
// doCleanExpiredItems takes the write lock and removes every item whose
// expiry time has passed according to the store clock. Items without an
// expiry are never removed.
func (store *inMemoryStore) doCleanExpiredItems() {
	store.lock.Lock()
	defer store.lock.Unlock()

	for key, candidate := range store.items {
		if candidate.expire == nil {
			continue
		}
		if !candidate.isExpired(store.clock.Now()) {
			continue
		}
		store.doDelete(context.Background(), key)
	}
}
// GetComponentMetadata returns the zero-value metadata map: this component
// has no metadata struct to describe.
func (store *inMemoryStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
	return metadataInfo
}
// inMemStateStoreItem is one stored value with its etag and optional expiry.
type inMemStateStoreItem struct {
// data is the stored payload bytes.
data []byte
// etag is the version tag assigned when the item is written by doSet.
etag *string
// expire is the instant after which the item is considered expired;
// nil means the item never expires.
expire *time.Time
}
// isExpired reports whether now is strictly after the item's expiry time.
// A nil item, or an item without an expiry, is never expired.
func (item *inMemStateStoreItem) isExpired(now time.Time) bool {
	return item != nil && item.expire != nil && now.After(*item.expire)
}