mirror of https://github.com/dapr/go-sdk.git
308 lines
7.6 KiB
Go
308 lines
7.6 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
v1 "github.com/dapr/go-sdk/dapr/proto/common/v1"
|
|
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
|
duration "github.com/golang/protobuf/ptypes/duration"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
const (
|
|
// StateConsistencyUndefined is the undefined value for state consistency
|
|
StateConsistencyUndefined StateConsistency = 0
|
|
// StateConsistencyEventual represents eventual state consistency value
|
|
StateConsistencyEventual StateConsistency = 1
|
|
// StateConsistencyStrong represents strong state consistency value
|
|
StateConsistencyStrong StateConsistency = 2
|
|
|
|
// StateConcurrencyUndefined is the undefined value for state concurrency
|
|
StateConcurrencyUndefined StateConcurrency = 0
|
|
// StateConcurrencyFirstWrite represents first write concurrency value
|
|
StateConcurrencyFirstWrite StateConcurrency = 1
|
|
// StateConcurrencyLastWrite represents last write concurrency value
|
|
StateConcurrencyLastWrite StateConcurrency = 2
|
|
|
|
// RetryPatternUndefined is the undefined value for retry pattern
|
|
RetryPatternUndefined RetryPattern = 0
|
|
// RetryPatternLinear represents the linear retry pattern value
|
|
RetryPatternLinear RetryPattern = 1
|
|
// RetryPatternExponential represents the exponential retry pattern value
|
|
RetryPatternExponential RetryPattern = 2
|
|
)
|
|
|
|
// StateConsistency is the consistency enum type.
type StateConsistency int

// StateConcurrency is the concurrency enum type.
type StateConcurrency int

// RetryPattern is the retry pattern enum type.
type RetryPattern int
|
|
|
|
func (c StateConsistency) String() string {
|
|
names := [...]string{
|
|
"Undefined",
|
|
"Strong",
|
|
"Eventual",
|
|
}
|
|
if c < StateConsistencyStrong || c > StateConsistencyEventual {
|
|
return "Undefined"
|
|
}
|
|
|
|
return names[c]
|
|
}
|
|
|
|
// END Consistency
|
|
|
|
func (c StateConcurrency) String() string {
|
|
names := [...]string{
|
|
"Undefined",
|
|
"FirstWrite",
|
|
"LastWrite",
|
|
}
|
|
if c < StateConcurrencyFirstWrite || c > StateConcurrencyLastWrite {
|
|
return "Undefined"
|
|
}
|
|
|
|
return names[c]
|
|
}
|
|
|
|
// END Concurrency
|
|
func (c RetryPattern) String() string {
|
|
names := [...]string{
|
|
"Undefined",
|
|
"Linear",
|
|
"Exponential",
|
|
}
|
|
if c < RetryPatternLinear || c > RetryPatternExponential {
|
|
return "Undefined"
|
|
}
|
|
|
|
return names[c]
|
|
}
|
|
|
|
// END Retry Pattern
|
|
|
|
var (
|
|
stateOptionRetryPolicyDefault = &v1.StateRetryPolicy{
|
|
Threshold: 3,
|
|
Pattern: v1.StateRetryPolicy_RETRY_EXPONENTIAL,
|
|
}
|
|
|
|
stateOptionDefault = &v1.StateOptions{
|
|
Concurrency: v1.StateOptions_CONCURRENCY_LAST_WRITE,
|
|
Consistency: v1.StateOptions_CONSISTENCY_STRONG,
|
|
RetryPolicy: stateOptionRetryPolicyDefault,
|
|
}
|
|
)
|
|
|
|
// State is a collection of StateItems with a store name
|
|
type State struct {
|
|
StoreName string
|
|
States []*StateItem
|
|
}
|
|
|
|
// StateItem represents the state to be persisted
|
|
type StateItem struct {
|
|
Key string
|
|
Value []byte
|
|
Etag string
|
|
Metadata map[string]string
|
|
Options *StateOptions
|
|
}
|
|
|
|
// StateOptions represents the state store persistence policy
|
|
type StateOptions struct {
|
|
Concurrency StateConcurrency
|
|
Consistency StateConsistency
|
|
RetryPolicy *StateRetryPolicy
|
|
}
|
|
|
|
// StateRetryPolicy represents the state store invocation retry policy
|
|
type StateRetryPolicy struct {
|
|
Threshold int32
|
|
Pattern RetryPattern
|
|
Interval time.Duration
|
|
}
|
|
|
|
// *** Converters ***
|
|
|
|
func toProtoSaveStateRequest(s *State) (req *pb.SaveStateRequest) {
|
|
r := &pb.SaveStateRequest{
|
|
StoreName: s.StoreName,
|
|
States: make([]*v1.StateItem, 0),
|
|
}
|
|
|
|
for _, si := range s.States {
|
|
item := toProtoSaveStateItem(si)
|
|
r.States = append(r.States, item)
|
|
}
|
|
return r
|
|
}
|
|
|
|
func toProtoSaveStateItem(si *StateItem) (item *v1.StateItem) {
|
|
return &v1.StateItem{
|
|
Etag: si.Etag,
|
|
Key: si.Key,
|
|
Metadata: si.Metadata,
|
|
Value: si.Value,
|
|
Options: toProtoStateOptions(si.Options),
|
|
}
|
|
}
|
|
|
|
func toProtoStateOptions(so *StateOptions) (opts *v1.StateOptions) {
|
|
if so == nil {
|
|
return stateOptionDefault
|
|
}
|
|
return &v1.StateOptions{
|
|
Concurrency: (v1.StateOptions_StateConcurrency(so.Concurrency)),
|
|
Consistency: (v1.StateOptions_StateConsistency(so.Consistency)),
|
|
RetryPolicy: &v1.StateRetryPolicy{
|
|
Interval: toProtoDuration(so.RetryPolicy.Interval),
|
|
Pattern: (v1.StateRetryPolicy_RetryPattern(so.RetryPolicy.Pattern)),
|
|
Threshold: so.RetryPolicy.Threshold,
|
|
},
|
|
}
|
|
}
|
|
|
|
func toProtoDuration(d time.Duration) *duration.Duration {
|
|
nanos := d.Nanoseconds()
|
|
secs := nanos / 1e9
|
|
nanos -= secs * 1e9
|
|
return &duration.Duration{
|
|
Seconds: int64(secs),
|
|
Nanos: int32(nanos),
|
|
}
|
|
}
|
|
|
|
// *** Save State ***
|
|
|
|
// SaveState saves the fully loaded save state request
|
|
func (c *Client) SaveState(ctx context.Context, s *State) error {
|
|
if s == nil || s.StoreName == "" || s.States == nil || len(s.States) < 1 {
|
|
return errors.New("nil or invalid state")
|
|
}
|
|
req := toProtoSaveStateRequest(s)
|
|
_, err := c.protoClient.SaveState(authContext(ctx), req)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error saving state")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SaveStateItem saves a single state item
|
|
func (c *Client) SaveStateItem(ctx context.Context, store string, item *StateItem) error {
|
|
if store == "" {
|
|
return errors.New("nil store")
|
|
}
|
|
if item == nil {
|
|
return errors.New("nil item")
|
|
}
|
|
|
|
req := &State{
|
|
StoreName: store,
|
|
States: []*StateItem{item},
|
|
}
|
|
|
|
return c.SaveState(ctx, req)
|
|
}
|
|
|
|
// SaveStateWithData saves the data into store using default state options
|
|
func (c *Client) SaveStateWithData(ctx context.Context, store, key string, data []byte) error {
|
|
if store == "" {
|
|
return errors.New("nil store")
|
|
}
|
|
if key == "" {
|
|
return errors.New("nil key")
|
|
}
|
|
|
|
req := &State{
|
|
StoreName: store,
|
|
States: []*StateItem{
|
|
{
|
|
Key: key,
|
|
Value: data,
|
|
},
|
|
},
|
|
}
|
|
|
|
return c.SaveState(ctx, req)
|
|
}
|
|
|
|
// SaveStateJSON saves the JSON serialized in into store using default state options
|
|
func (c *Client) SaveStateJSON(ctx context.Context, store, key string, in interface{}) error {
|
|
if in == nil {
|
|
return errors.New("nil data to save")
|
|
}
|
|
b, err := json.Marshal(in)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error marshaling content")
|
|
}
|
|
return c.SaveStateWithData(ctx, store, key, b)
|
|
}
|
|
|
|
// *** Get State ***
|
|
|
|
// GetStateWithConsistency retreaves state from specific store using provided request
|
|
func (c *Client) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (out []byte, err error) {
|
|
if store == "" {
|
|
return nil, errors.New("nil store")
|
|
}
|
|
if key == "" {
|
|
return nil, errors.New("nil key")
|
|
}
|
|
|
|
req := &pb.GetStateRequest{
|
|
StoreName: store,
|
|
Key: key,
|
|
Consistency: (v1.StateOptions_StateConsistency(sc)),
|
|
}
|
|
|
|
result, err := c.protoClient.GetState(authContext(ctx), req)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "error getting state")
|
|
}
|
|
|
|
return result.Data, nil
|
|
}
|
|
|
|
// GetState retreaves state from specific store using default consistency option
|
|
func (c *Client) GetState(ctx context.Context, store, key string) (out []byte, err error) {
|
|
return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
|
|
}
|
|
|
|
// *** Delete State ***
|
|
|
|
// DeleteStateWithOptions deletes content from store using provided state options and etag
|
|
func (c *Client) DeleteStateWithOptions(ctx context.Context, store, key, etag string, opts *StateOptions) error {
|
|
if store == "" {
|
|
return errors.New("nil store")
|
|
}
|
|
if key == "" {
|
|
return errors.New("nil key")
|
|
}
|
|
|
|
req := &pb.DeleteStateRequest{
|
|
StoreName: store,
|
|
Key: key,
|
|
Etag: etag,
|
|
Options: toProtoStateOptions(opts),
|
|
}
|
|
|
|
_, err := c.protoClient.DeleteState(authContext(ctx), req)
|
|
if err != nil {
|
|
return errors.Wrap(err, "error deleting state")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteState deletes content from store using default state options
|
|
func (c *Client) DeleteState(ctx context.Context, store, key string) error {
|
|
return c.DeleteStateWithOptions(ctx, store, key, "", nil)
|
|
}
|