parent
7ab8fde494
commit
de01000c9b
|
@ -542,7 +542,11 @@ const components = {
|
|||
'internal/component/sql',
|
||||
],
|
||||
},
|
||||
'state.etcd': {
|
||||
'state.etcd.v1': {
|
||||
conformance: true,
|
||||
conformanceSetup: 'docker-compose.sh etcd',
|
||||
},
|
||||
'state.etcd.v2': {
|
||||
conformance: true,
|
||||
conformanceSetup: 'docker-compose.sh etcd',
|
||||
},
|
||||
|
|
2
go.mod
2
go.mod
|
@ -119,6 +119,7 @@ require (
|
|||
golang.org/x/oauth2 v0.8.0
|
||||
google.golang.org/api v0.115.0
|
||||
google.golang.org/grpc v1.54.0
|
||||
google.golang.org/protobuf v1.30.0
|
||||
gopkg.in/couchbase/gocb.v1 v1.6.7
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
|
@ -371,7 +372,6 @@ require (
|
|||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||
gopkg.in/couchbase/gocbcore.v7 v7.1.18 // indirect
|
||||
gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4 // indirect
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
//
|
||||
//Copyright 2021 The Dapr Authors
|
||||
//Licensed under the Apache License, Version 2.0 (the "License");
|
||||
//you may not use this file except in compliance with the License.
|
||||
//You may obtain a copy of the License at
|
||||
//http://www.apache.org/licenses/LICENSE-2.0
|
||||
//Unless required by applicable law or agreed to in writing, software
|
||||
//distributed under the License is distributed on an "AS IS" BASIS,
|
||||
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//See the License for the specific language governing permissions and
|
||||
//limitations under the License.
|
||||
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.28.1
|
||||
// protoc v3.21.12
|
||||
// source: internal/proto/state/etcd/v2/value.proto
|
||||
|
||||
package v2
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
durationpb "google.golang.org/protobuf/types/known/durationpb"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||
)
|
||||
|
||||
// Value is the value of the state key item. It contains the underlying data as
|
||||
// well as necessary metadata.
|
||||
type Value struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// Required. The value of the state key item.
|
||||
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
|
||||
// Required. The creation time of the state key item. This is an
|
||||
// approximation by the components-contrib instance since ETCD does not
|
||||
// provide this information natively.
|
||||
Ts *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=ts,proto3" json:"ts,omitempty"`
|
||||
// Optional. The Time To Live of the state key item. The duration of the TTL
|
||||
// is from the creation time of the key (`ts`). If not specified, the key has
|
||||
// no TTL.
|
||||
Ttl *durationpb.Duration `protobuf:"bytes,3,opt,name=ttl,proto3,oneof" json:"ttl,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Value) Reset() {
|
||||
*x = Value{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_internal_proto_state_etcd_v2_value_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *Value) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*Value) ProtoMessage() {}
|
||||
|
||||
func (x *Value) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_internal_proto_state_etcd_v2_value_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use Value.ProtoReflect.Descriptor instead.
|
||||
func (*Value) Descriptor() ([]byte, []int) {
|
||||
return file_internal_proto_state_etcd_v2_value_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *Value) GetData() []byte {
|
||||
if x != nil {
|
||||
return x.Data
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Value) GetTs() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.Ts
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Value) GetTtl() *durationpb.Duration {
|
||||
if x != nil {
|
||||
return x.Ttl
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var File_internal_proto_state_etcd_v2_value_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_internal_proto_state_etcd_v2_value_proto_rawDesc = []byte{
|
||||
0x0a, 0x28, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x2f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x65, 0x74, 0x63, 0x64, 0x2f, 0x76, 0x32, 0x2f, 0x76,
|
||||
0x61, 0x6c, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1e, 0x64, 0x61, 0x70, 0x72,
|
||||
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74,
|
||||
0x73, 0x2e, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2e, 0x76, 0x32, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67,
|
||||
0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65,
|
||||
0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f,
|
||||
0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72,
|
||||
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x81, 0x01, 0x0a, 0x05,
|
||||
0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2a, 0x0a, 0x02, 0x74, 0x73, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
|
||||
0x70, 0x52, 0x02, 0x74, 0x73, 0x12, 0x30, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01,
|
||||
0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48, 0x00, 0x52,
|
||||
0x03, 0x74, 0x74, 0x6c, 0x88, 0x01, 0x01, 0x42, 0x06, 0x0a, 0x04, 0x5f, 0x74, 0x74, 0x6c, 0x42,
|
||||
0x41, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x61,
|
||||
0x70, 0x72, 0x2f, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x2d, 0x63, 0x6f,
|
||||
0x6e, 0x74, 0x72, 0x69, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2f, 0x65, 0x74, 0x63, 0x64, 0x2f,
|
||||
0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
file_internal_proto_state_etcd_v2_value_proto_rawDescOnce sync.Once
|
||||
file_internal_proto_state_etcd_v2_value_proto_rawDescData = file_internal_proto_state_etcd_v2_value_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_internal_proto_state_etcd_v2_value_proto_rawDescGZIP() []byte {
|
||||
file_internal_proto_state_etcd_v2_value_proto_rawDescOnce.Do(func() {
|
||||
file_internal_proto_state_etcd_v2_value_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_proto_state_etcd_v2_value_proto_rawDescData)
|
||||
})
|
||||
return file_internal_proto_state_etcd_v2_value_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_internal_proto_state_etcd_v2_value_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
|
||||
var file_internal_proto_state_etcd_v2_value_proto_goTypes = []interface{}{
|
||||
(*Value)(nil), // 0: dapr.proto.components.state.v2.Value
|
||||
(*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp
|
||||
(*durationpb.Duration)(nil), // 2: google.protobuf.Duration
|
||||
}
|
||||
var file_internal_proto_state_etcd_v2_value_proto_depIdxs = []int32{
|
||||
1, // 0: dapr.proto.components.state.v2.Value.ts:type_name -> google.protobuf.Timestamp
|
||||
2, // 1: dapr.proto.components.state.v2.Value.ttl:type_name -> google.protobuf.Duration
|
||||
2, // [2:2] is the sub-list for method output_type
|
||||
2, // [2:2] is the sub-list for method input_type
|
||||
2, // [2:2] is the sub-list for extension type_name
|
||||
2, // [2:2] is the sub-list for extension extendee
|
||||
0, // [0:2] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_internal_proto_state_etcd_v2_value_proto_init() }
|
||||
func file_internal_proto_state_etcd_v2_value_proto_init() {
|
||||
if File_internal_proto_state_etcd_v2_value_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_internal_proto_state_etcd_v2_value_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*Value); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
file_internal_proto_state_etcd_v2_value_proto_msgTypes[0].OneofWrappers = []interface{}{}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_internal_proto_state_etcd_v2_value_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 1,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
GoTypes: file_internal_proto_state_etcd_v2_value_proto_goTypes,
|
||||
DependencyIndexes: file_internal_proto_state_etcd_v2_value_proto_depIdxs,
|
||||
MessageInfos: file_internal_proto_state_etcd_v2_value_proto_msgTypes,
|
||||
}.Build()
|
||||
File_internal_proto_state_etcd_v2_value_proto = out.File
|
||||
file_internal_proto_state_etcd_v2_value_proto_rawDesc = nil
|
||||
file_internal_proto_state_etcd_v2_value_proto_goTypes = nil
|
||||
file_internal_proto_state_etcd_v2_value_proto_depIdxs = nil
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package dapr.proto.components.state.v2;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "google/protobuf/duration.proto";
|
||||
|
||||
option go_package = "github.com/dapr/components-contrib/internal/proto/state/etcd/v2";
|
||||
|
||||
|
||||
// Value is the value of the state key item. It contains the underlying data as
|
||||
// well as necessary metadata.
|
||||
message Value {
|
||||
// Required. The value of the state key item.
|
||||
bytes data = 1;
|
||||
|
||||
// Required. The creation time of the state key item. This is an
|
||||
// approximation by the components-contrib instance since ETCD does not
|
||||
// provide this information natively.
|
||||
google.protobuf.Timestamp ts = 2;
|
||||
|
||||
// Optional. The Time To Live of the state key item. The duration of the TTL
|
||||
// is from the creation time of the key (`ts`). If not specified, the key has
|
||||
// no TTL.
|
||||
optional google.protobuf.Duration ttl = 3;
|
||||
}
|
|
@ -17,7 +17,6 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
@ -43,6 +42,7 @@ type Etcd struct {
|
|||
keyPrefixPath string
|
||||
features []state.Feature
|
||||
logger logger.Logger
|
||||
schema schemaMarshaller
|
||||
}
|
||||
|
||||
type etcdConfig struct {
|
||||
|
@ -55,9 +55,19 @@ type etcdConfig struct {
|
|||
Key string `json:"key"`
|
||||
}
|
||||
|
||||
// NewEtcdStateStore returns a new etcd state store.
|
||||
func NewEtcdStateStore(logger logger.Logger) state.Store {
|
||||
// NewEtcdStateStoreV1 returns a new etcd state store for schema V1.
|
||||
func NewEtcdStateStoreV1(logger logger.Logger) state.Store {
|
||||
return newETCD(logger, schemaV1{})
|
||||
}
|
||||
|
||||
// NewEtcdStateStoreV2 returns a new etcd state store for schema V2.
|
||||
func NewEtcdStateStoreV2(logger logger.Logger) state.Store {
|
||||
return newETCD(logger, schemaV2{})
|
||||
}
|
||||
|
||||
func newETCD(logger logger.Logger, schema schemaMarshaller) state.Store {
|
||||
s := &Etcd{
|
||||
schema: schema,
|
||||
logger: logger,
|
||||
features: []state.Feature{state.FeatureETag, state.FeatureTransactional},
|
||||
}
|
||||
|
@ -141,9 +151,15 @@ func (e *Etcd) Get(ctx context.Context, req *state.GetRequest) (*state.GetRespon
|
|||
return &state.GetResponse{}, nil
|
||||
}
|
||||
|
||||
data, metadata, err := e.schema.decode(resp.Kvs[0].Value)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &state.GetResponse{
|
||||
Data: resp.Kvs[0].Value,
|
||||
ETag: ptr.Of(strconv.Itoa(int(resp.Kvs[0].ModRevision))),
|
||||
Data: data,
|
||||
ETag: ptr.Of(strconv.Itoa(int(resp.Kvs[0].ModRevision))),
|
||||
Metadata: metadata,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -160,20 +176,20 @@ func (e *Etcd) Set(ctx context.Context, req *state.SetRequest) error {
|
|||
return err
|
||||
}
|
||||
|
||||
reqVal, err := stateutils.Marshal(req.Value, json.Marshal)
|
||||
return e.doSet(ctx, keyWithPath, req.Value, req.ETag, ttlInSeconds)
|
||||
}
|
||||
|
||||
func (e *Etcd) doSet(ctx context.Context, key string, val any, etag *string, ttlInSeconds *int64) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
reqVal, err := e.schema.encode(val, ttlInSeconds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.doSet(ctx, keyWithPath, string(reqVal), req.ETag, ttlInSeconds)
|
||||
}
|
||||
|
||||
func (e *Etcd) doSet(ctx context.Context, key, reqVal string, etag *string, ttlInSeconds int64) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if ttlInSeconds > 0 {
|
||||
resp, err := e.client.Grant(ctx, ttlInSeconds)
|
||||
if ttlInSeconds != nil {
|
||||
resp, err := e.client.Grant(ctx, *ttlInSeconds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't grant lease %s: %w", key, err)
|
||||
}
|
||||
|
@ -207,22 +223,18 @@ func (e *Etcd) doSet(ctx context.Context, key, reqVal string, etag *string, ttlI
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *Etcd) doSetValidateParameters(req *state.SetRequest) (int64, error) {
|
||||
func (e *Etcd) doSetValidateParameters(req *state.SetRequest) (*int64, error) {
|
||||
err := state.CheckRequestOptions(req.Options)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ttlVal int64
|
||||
ttlInSeconds, err := stateutils.ParseTTL(req.Metadata)
|
||||
ttlInSeconds, err := stateutils.ParseTTL64(req.Metadata)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if ttlInSeconds != nil {
|
||||
ttlVal = int64(*ttlInSeconds)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ttlVal, nil
|
||||
return ttlInSeconds, nil
|
||||
}
|
||||
|
||||
// Delete performes a Etcd KV delete operation.
|
||||
|
@ -339,7 +351,7 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque
|
|||
return err
|
||||
}
|
||||
|
||||
reqVal, err := stateutils.Marshal(req.Value, json.Marshal)
|
||||
reqVal, err := e.schema.encode(req.Value, ttlInSeconds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -348,19 +360,19 @@ func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateReque
|
|||
etag, _ := strconv.ParseInt(*req.ETag, 10, 64)
|
||||
cmp = clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag)
|
||||
}
|
||||
if ttlInSeconds > 0 {
|
||||
resp, err := e.client.Grant(ctx, ttlInSeconds)
|
||||
if ttlInSeconds != nil {
|
||||
resp, err := e.client.Grant(ctx, *ttlInSeconds)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't grant lease %s: %w", keyWithPath, err)
|
||||
}
|
||||
put := clientv3.OpPut(keyWithPath, string(reqVal), clientv3.WithLease(resp.ID))
|
||||
put := clientv3.OpPut(keyWithPath, reqVal, clientv3.WithLease(resp.ID))
|
||||
if req.HasETag() {
|
||||
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
|
||||
} else {
|
||||
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil))
|
||||
}
|
||||
} else {
|
||||
put := clientv3.OpPut(keyWithPath, string(reqVal))
|
||||
put := clientv3.OpPut(keyWithPath, reqVal)
|
||||
if req.HasETag() {
|
||||
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
pbv2 "github.com/dapr/components-contrib/internal/proto/state/etcd/v2"
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/components-contrib/state/utils"
|
||||
)
|
||||
|
||||
// schemaMarshaller is an interface for encoding and decoding values which are
|
||||
// written and read from ETCD. Different storage schema versions store values
|
||||
// in different formats or envelopes.
|
||||
type schemaMarshaller interface {
|
||||
// encode the value in the correct storage schema.
|
||||
encode(data any, ttlInSeconds *int64) (string, error)
|
||||
|
||||
// decode the value from the correct storage schema, optionally returning
|
||||
// metadata extracted from the envelope.
|
||||
decode(data []byte) ([]byte, map[string]string, error)
|
||||
}
|
||||
|
||||
type schemaV1 struct{}
|
||||
|
||||
func (schemaV1) encode(data any, _ *int64) (string, error) {
|
||||
reqVal, err := utils.Marshal(data, json.Marshal)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(reqVal), nil
|
||||
}
|
||||
|
||||
func (schemaV1) decode(data []byte) ([]byte, map[string]string, error) {
|
||||
return data, nil, nil
|
||||
}
|
||||
|
||||
type schemaV2 struct{}
|
||||
|
||||
func (schemaV2) encode(data any, ttlInSeconds *int64) (string, error) {
|
||||
dataB, err := utils.JSONStringify(data)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var duration durationpb.Duration
|
||||
if ttlInSeconds != nil {
|
||||
duration = durationpb.Duration{Seconds: *ttlInSeconds}
|
||||
}
|
||||
|
||||
value, err := proto.Marshal(&pbv2.Value{
|
||||
Data: dataB,
|
||||
Ts: timestamppb.New(time.Now().UTC()),
|
||||
Ttl: &duration,
|
||||
})
|
||||
|
||||
return string(value), err
|
||||
}
|
||||
|
||||
func (schemaV2) decode(data []byte) ([]byte, map[string]string, error) {
|
||||
var value pbv2.Value
|
||||
if err := proto.Unmarshal(data, &value); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var metadata map[string]string
|
||||
if value.Ttl != nil {
|
||||
metadata = map[string]string{
|
||||
state.GetRespMetaKeyTTLExpireTime: value.Ts.AsTime().Add(value.Ttl.AsDuration()).Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
|
||||
return value.Data, metadata, nil
|
||||
}
|
|
@ -24,8 +24,7 @@ const MetadataTTLKey = "ttlInSeconds"
|
|||
|
||||
// ParseTTL parses the "ttlInSeconds" metadata property.
|
||||
func ParseTTL(requestMetadata map[string]string) (*int, error) {
|
||||
val, found := requestMetadata[MetadataTTLKey]
|
||||
if found && val != "" {
|
||||
if val := requestMetadata[MetadataTTLKey]; val != "" {
|
||||
parsedVal, err := strconv.ParseInt(val, 10, 0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("incorrect value for metadata '%s': %w", MetadataTTLKey, err)
|
||||
|
@ -38,3 +37,18 @@ func ParseTTL(requestMetadata map[string]string) (*int, error) {
|
|||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ParseTTL64 parses the "ttlInSeconds" metadata property.
|
||||
func ParseTTL64(requestMetadata map[string]string) (*int64, error) {
|
||||
if val := requestMetadata[MetadataTTLKey]; val != "" {
|
||||
parsedVal, err := strconv.ParseInt(val, 10, 0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("incorrect value for metadata '%s': %w", MetadataTTLKey, err)
|
||||
}
|
||||
if parsedVal < -1 || parsedVal > math.MaxInt32 {
|
||||
return nil, fmt.Errorf("incorrect value for metadata '%s': must be -1 or greater", MetadataTTLKey)
|
||||
}
|
||||
return &parsedVal, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,13 @@ limitations under the License.
|
|||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func Marshal(val interface{}, marshaler func(interface{}) ([]byte, error)) ([]byte, error) {
|
||||
var err error = nil
|
||||
bt, ok := val.([]byte)
|
||||
|
@ -22,3 +29,46 @@ func Marshal(val interface{}, marshaler func(interface{}) ([]byte, error)) ([]by
|
|||
|
||||
return bt, err
|
||||
}
|
||||
|
||||
func JSONStringify(value any) ([]byte, error) {
|
||||
switch value := value.(type) {
|
||||
case []byte:
|
||||
return value, nil
|
||||
case int:
|
||||
return []byte(strconv.FormatInt(int64(value), 10)), nil
|
||||
case int8:
|
||||
return []byte(strconv.FormatInt(int64(value), 10)), nil
|
||||
case int16:
|
||||
return []byte(strconv.FormatInt(int64(value), 10)), nil
|
||||
case int32:
|
||||
return []byte(strconv.FormatInt(int64(value), 10)), nil
|
||||
case int64:
|
||||
return []byte(strconv.FormatInt(value, 10)), nil
|
||||
case uint:
|
||||
return []byte(strconv.FormatUint(uint64(value), 10)), nil
|
||||
case uint16:
|
||||
return []byte(strconv.FormatUint(uint64(value), 10)), nil
|
||||
case uint32:
|
||||
return []byte(strconv.FormatUint(uint64(value), 10)), nil
|
||||
case uint64:
|
||||
return []byte(strconv.FormatUint(value, 10)), nil
|
||||
case float32:
|
||||
return []byte(strconv.FormatFloat(float64(value), 'f', -1, 64)), nil
|
||||
case float64:
|
||||
return []byte(strconv.FormatFloat(value, 'f', -1, 64)), nil
|
||||
case bool:
|
||||
if value {
|
||||
return []byte("true"), nil
|
||||
}
|
||||
return []byte("false"), nil
|
||||
case string:
|
||||
return []byte(`"` + strings.ReplaceAll(value, `"`, `\"`) + `"`), nil
|
||||
default:
|
||||
var buf bytes.Buffer
|
||||
enc := json.NewEncoder(&buf)
|
||||
enc.SetEscapeHTML(false)
|
||||
err := enc.Encode(value)
|
||||
// Trim newline.
|
||||
return bytes.TrimSuffix(buf.Bytes(), []byte{0xa}), err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,4 +11,4 @@ spec:
|
|||
- name: keyPrefixPath
|
||||
value: "dapr"
|
||||
- name: tlsEnable
|
||||
value: "false"
|
||||
value: "false"
|
|
@ -0,0 +1,14 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: statestore
|
||||
spec:
|
||||
type: state.etcd
|
||||
version: v2
|
||||
metadata:
|
||||
- name: endpoints
|
||||
value: "localhost:12379"
|
||||
- name: keyPrefixPath
|
||||
value: "dapr"
|
||||
- name: tlsEnable
|
||||
value: "false"
|
|
@ -73,9 +73,11 @@ components:
|
|||
operations: [ "transaction", "etag", "first-write" ]
|
||||
- component: aws.dynamodb.terraform
|
||||
operations: [ "transaction", "etag", "first-write" ]
|
||||
- component: etcd
|
||||
- component: etcd.v1
|
||||
operations: [ "transaction", "etag", "first-write", "ttl" ]
|
||||
- component: etcd.v2
|
||||
operations: [ "transaction", "etag", "first-write", "ttl" ]
|
||||
- component: gcp.firestore.docker
|
||||
operations: []
|
||||
- component: gcp.firestore.cloud
|
||||
operations: []
|
||||
operations: []
|
||||
|
|
|
@ -126,17 +126,17 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin
|
|||
// Check for an output binding specific operation before init
|
||||
if config.HasOperation("operations") {
|
||||
testLogger.Info("Init output binding ...")
|
||||
err := outputBinding.Init(context.Background(), bindings.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := outputBinding.Init(context.Background(), bindings.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err, "expected no error setting up output binding")
|
||||
}
|
||||
// Check for an input binding specific operation before init
|
||||
if config.HasOperation("read") {
|
||||
testLogger.Info("Init input binding ...")
|
||||
err := inputBinding.Init(context.Background(), bindings.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := inputBinding.Init(context.Background(), bindings.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err, "expected no error setting up input binding")
|
||||
}
|
||||
testLogger.Info("Init test done.")
|
||||
|
|
|
@ -329,7 +329,6 @@ func (tc *TestConfiguration) loadComponentsAndProperties(t *testing.T, filepath
|
|||
require.Equal(t, 1, len(comps)) // We only expect a single component per file
|
||||
c := comps[0]
|
||||
props, err := ConvertMetadataToProperties(c.Spec.Metadata)
|
||||
|
||||
return props, err
|
||||
}
|
||||
|
||||
|
@ -577,8 +576,10 @@ func loadStateStore(tc TestComponent) state.Store {
|
|||
store = s_awsdynamodb.NewDynamoDBStateStore(testLogger)
|
||||
case "aws.dynamodb.terraform":
|
||||
store = s_awsdynamodb.NewDynamoDBStateStore(testLogger)
|
||||
case "etcd":
|
||||
store = s_etcd.NewEtcdStateStore(testLogger)
|
||||
case "etcd.v1":
|
||||
store = s_etcd.NewEtcdStateStoreV1(testLogger)
|
||||
case "etcd.v2":
|
||||
store = s_etcd.NewEtcdStateStoreV2(testLogger)
|
||||
case "gcp.firestore.docker":
|
||||
store = s_gcpfirestore.NewFirestoreStateStore(testLogger)
|
||||
case "gcp.firestore.cloud":
|
||||
|
|
|
@ -159,7 +159,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration
|
|||
|
||||
// Initializing store
|
||||
err = store.Init(context.Background(), configuration.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
Base: metadata.Base{
|
||||
Properties: props,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
|
|
@ -114,9 +114,9 @@ func ConformanceTests(t *testing.T, props map[string]string, component contribCr
|
|||
|
||||
// Init
|
||||
t.Run("Init", func(t *testing.T) {
|
||||
err := component.Init(context.Background(), contribCrypto.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := component.Init(context.Background(), contribCrypto.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
require.NoError(t, err, "expected no error on initializing store")
|
||||
})
|
||||
|
||||
|
|
|
@ -105,9 +105,9 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
|
|||
|
||||
// Init
|
||||
t.Run("init", func(t *testing.T) {
|
||||
err := ps.Init(context.Background(), pubsub.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := ps.Init(context.Background(), pubsub.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err, "expected no error on setting up pubsub")
|
||||
})
|
||||
|
||||
|
|
|
@ -49,9 +49,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store secretstores.
|
|||
|
||||
// Init
|
||||
t.Run("init", func(t *testing.T) {
|
||||
err := store.Init(context.Background(), secretstores.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := store.Init(context.Background(), secretstores.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err, "expected no error on initializing store")
|
||||
})
|
||||
|
||||
|
|
|
@ -238,9 +238,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
|
|||
}
|
||||
|
||||
t.Run("init", func(t *testing.T) {
|
||||
err := statestore.Init(context.Background(), state.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := statestore.Init(context.Background(), state.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
|
|
|
@ -51,9 +51,9 @@ func NewTestConfig(component string, operations []string, conf map[string]interf
|
|||
func ConformanceTests(t *testing.T, props map[string]string, workflowItem workflows.Workflow, config TestConfig) {
|
||||
// Test vars
|
||||
t.Run("init", func(t *testing.T) {
|
||||
err := workflowItem.Init(workflows.Metadata{
|
||||
Base: metadata.Base{Properties: props},
|
||||
})
|
||||
err := workflowItem.Init(workflows.Metadata{Base: metadata.Base{
|
||||
Properties: props,
|
||||
}})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
|
|
Loading…
Reference in New Issue