components-contrib/state/rethinkdb/rethinkdb.go

322 lines
8.6 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rethinkdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"reflect"
"time"
r "github.com/dancannon/gorethink"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
kitmd "github.com/dapr/kit/metadata"
"github.com/dapr/kit/ptr"
)
const (
stateTableNameDefault = "daprstate"
stateTablePKName = "id"
stateArchiveTableName = "daprstate_archive"
stateArchiveTablePKName = "key"
)
// RethinkDB is a state store implementation for RethinkDB.
type RethinkDB struct {
session *r.Session
config *stateConfig
features []state.Feature
logger logger.Logger
}
type stateConfig struct {
r.ConnectOpts `mapstructure:",squash"`
Archive bool `json:"archive"`
Table string `json:"table"`
}
type stateRecord struct {
ID string `json:"id" rethinkdb:"id"`
TS int64 `json:"timestamp" rethinkdb:"timestamp"`
Hash string `json:"hash,omitempty" rethinkdb:"hash,omitempty"`
Data any `json:"data,omitempty" rethinkdb:"data,omitempty"`
}
// NewRethinkDBStateStore returns a new RethinkDB state store.
func NewRethinkDBStateStore(logger logger.Logger) state.Store {
s := &RethinkDB{
features: []state.Feature{},
logger: logger,
}
return s
}
// Init parses metadata, initializes the RethinkDB client, and ensures the state table exists.
func (s *RethinkDB) Init(ctx context.Context, metadata state.Metadata) error {
r.Log.Out = io.Discard
r.SetTags("rethinkdb", "json")
cfg, err := metadataToConfig(metadata.Properties, s.logger)
if err != nil {
return fmt.Errorf("unable to parse metadata properties: %w", err)
}
// in case someone runs Init multiple times
if s.session != nil && s.session.IsConnected() {
s.session.Close()
}
ses, err := r.Connect(cfg.ConnectOpts)
if err != nil {
return fmt.Errorf("error connecting to the database: %w", err)
}
s.session = ses
s.config = cfg
// check if table already exists
listContext, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
c, err := r.DB(s.config.Database).TableList().Run(s.session, r.RunOpts{Context: listContext})
if err != nil {
return fmt.Errorf("error checking for state table existence in DB: %w", err)
}
if c == nil {
return fmt.Errorf("invalid database response, cursor required: %w", err)
}
defer c.Close()
var list []string
err = c.All(&list)
if err != nil {
return fmt.Errorf("invalid database responsewhile listing tables: %w", err)
}
if !tableExists(list, s.config.Table) {
cctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_, err = r.DB(s.config.Database).TableCreate(s.config.Table, r.TableCreateOpts{
PrimaryKey: stateTablePKName,
}).RunWrite(s.session, r.RunOpts{Context: cctx})
if err != nil {
return fmt.Errorf("error creating state table in DB: %w", err)
}
}
if s.config.Archive && !tableExists(list, stateArchiveTableName) {
// create archive table with autokey to preserve state id
ctblCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_, err = r.DB(s.config.Database).TableCreate(stateArchiveTableName,
r.TableCreateOpts{PrimaryKey: stateArchiveTablePKName}).RunWrite(s.session, r.RunOpts{Context: ctblCtx})
if err != nil {
return fmt.Errorf("error creating state archive table in DB: %w", err)
}
// index archive table for id and timestamp
cindCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
_, err = r.DB(s.config.Database).Table(stateArchiveTableName).
IndexCreateFunc("state_index", func(row r.Term) interface{} {
return []interface{}{row.Field("id"), row.Field("timestamp")}
}).RunWrite(s.session, r.RunOpts{Context: cindCtx})
if err != nil {
return fmt.Errorf("error creating state archive index in DB: %w", err)
}
}
return nil
}
// Features returns the features available in this state store.
func (s *RethinkDB) Features() []state.Feature {
return s.features
}
func tableExists(arr []string, table string) bool {
for _, a := range arr {
if a == table {
return true
}
}
return false
}
// Get retrieves a RethinkDB KV item.
func (s *RethinkDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
if req == nil || req.Key == "" {
return nil, errors.New("invalid state request, missing key")
}
c, err := r.Table(s.config.Table).Get(req.Key).Run(s.session, r.RunOpts{Context: ctx})
if err != nil {
return nil, fmt.Errorf("error getting record from the database: %w", err)
}
if c == nil || c.IsNil() {
return &state.GetResponse{}, nil
}
if c != nil {
defer c.Close()
}
var doc stateRecord
err = c.One(&doc)
if err != nil {
return nil, fmt.Errorf("error parsing database content: %w", err)
}
resp := &state.GetResponse{ETag: ptr.Of(doc.Hash)}
b, ok := doc.Data.([]byte)
if ok {
resp.Data = b
} else {
data, err := json.Marshal(doc.Data)
if err != nil {
return nil, errors.New("error serializing data from database")
}
resp.Data = data
}
return resp, nil
}
func (s *RethinkDB) BulkGet(ctx context.Context, req []state.GetRequest, opts state.BulkGetOpts) ([]state.BulkGetResponse, error) {
return state.DoBulkGet(ctx, req, opts, s.Get)
}
// Set saves a state KV item.
func (s *RethinkDB) Set(ctx context.Context, req *state.SetRequest) error {
if req == nil || req.Key == "" || req.Value == nil {
return errors.New("invalid state request, key and value required")
}
return s.BulkSet(ctx, []state.SetRequest{*req}, state.BulkStoreOpts{})
}
// BulkSet performs a bulk save operation.
func (s *RethinkDB) BulkSet(ctx context.Context, req []state.SetRequest, _ state.BulkStoreOpts) error {
docs := make([]*stateRecord, len(req))
now := time.Now().UnixNano()
for i, v := range req {
var etag string
if v.ETag != nil {
etag = *v.ETag
}
docs[i] = &stateRecord{
ID: v.Key,
TS: now,
Data: v.Value,
Hash: etag,
}
}
resp, err := r.Table(s.config.Table).Insert(docs, r.InsertOpts{
Conflict: "replace",
ReturnChanges: true,
}).RunWrite(s.session, r.RunOpts{Context: ctx})
if err != nil {
return fmt.Errorf("error saving records to the database: %w", err)
}
if s.config.Archive && len(resp.Changes) > 0 {
s.archive(ctx, resp.Changes)
}
return nil
}
func (s *RethinkDB) archive(ctx context.Context, changes []r.ChangeResponse) error {
list := make([]map[string]interface{}, 0)
for _, c := range changes {
if c.NewValue != nil {
record, ok := c.NewValue.(map[string]interface{})
if !ok {
s.logger.Infof("invalid state DB change type: %T", c.NewValue)
continue
}
list = append(list, record)
}
}
if len(list) > 0 {
_, err := r.Table(stateArchiveTableName).Insert(list).RunWrite(s.session, r.RunOpts{Context: ctx})
if err != nil {
return fmt.Errorf("error archiving records to the database: %w", err)
}
}
return nil
}
// Delete performes a RethinkDB KV delete operation.
func (s *RethinkDB) Delete(ctx context.Context, req *state.DeleteRequest) error {
if req == nil || req.Key == "" {
return errors.New("invalid request, missing key")
}
return s.BulkDelete(ctx, []state.DeleteRequest{*req}, state.BulkStoreOpts{})
}
// BulkDelete performs a bulk delete operation.
func (s *RethinkDB) BulkDelete(ctx context.Context, req []state.DeleteRequest, _ state.BulkStoreOpts) error {
list := make([]string, len(req))
for i, d := range req {
list[i] = d.Key
}
c, err := r.Table(s.config.Table).GetAll(r.Args(list)).Delete().Run(s.session, r.RunOpts{Context: ctx})
if err != nil {
return fmt.Errorf("error deleting record from the database: %w", err)
}
defer c.Close()
return nil
}
func metadataToConfig(cfg map[string]string, logger logger.Logger) (*stateConfig, error) {
// defaults
c := stateConfig{
Table: stateTableNameDefault,
}
err := kitmd.DecodeMetadata(cfg, &c)
if err != nil {
return nil, err
}
return &c, nil
}
func (s *RethinkDB) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadataStruct := stateConfig{}
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType)
return
}
func (s *RethinkDB) Close() error {
if s.session == nil {
return nil
}
return s.session.Close()
}