remove etcd state store to prepare to update grpc to new version (#499)
* remove etcd state store to prepare to update grpc to new version * update go.mod Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
parent
df5be19205
commit
4e77594a4d
|
|
@ -1,3 +1,4 @@
|
|||
/dist
|
||||
.idea
|
||||
.vscode
|
||||
.vscode
|
||||
/vendor
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -69,10 +69,8 @@ require (
|
|||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
|
||||
github.com/stretchr/testify v1.5.1
|
||||
github.com/tidwall/pretty v1.0.1 // indirect
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect
|
||||
github.com/valyala/fasthttp v1.6.0
|
||||
github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e
|
||||
go.etcd.io/etcd v3.3.17+incompatible
|
||||
go.mongodb.org/mongo-driver v1.1.2
|
||||
go.opencensus.io v0.22.3
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -802,8 +802,6 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
|
|||
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
|
||||
github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc h1:yUaosFVTJwnltaHbSNC3i82I92quFs+OFPRl8kNMVwo=
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
|
|
|
|||
|
|
@ -1,233 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/dapr/pkg/logger"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultOperationTimeout = 10 * time.Second
|
||||
defaultSeparator = ","
|
||||
)
|
||||
|
||||
var (
|
||||
errMissingEndpoints = errors.New("endpoints are required")
|
||||
errInvalidDialTimeout = errors.New("DialTimeout is invalid")
|
||||
)
|
||||
|
||||
// ETCD is a state store
|
||||
type ETCD struct {
|
||||
json jsoniter.API
|
||||
client *clientv3.Client
|
||||
operationTimeout time.Duration
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
type configProperties struct {
|
||||
Endpoints string `json:"endpoints"`
|
||||
DialTimeout string `json:"dialTimeout"`
|
||||
OperationTimeout string `json:"operationTimeout"`
|
||||
}
|
||||
|
||||
//--- StateStore ---
|
||||
|
||||
// NewETCD returns a new ETCD state store
|
||||
func NewETCD(logger logger.Logger) *ETCD {
|
||||
return &ETCD{
|
||||
json: jsoniter.ConfigFastest,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Init does metadata and connection parsing
|
||||
func (r *ETCD) Init(metadata state.Metadata) error {
|
||||
cp, err := toConfigProperties(metadata.Properties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = validateRequired(cp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientConfig, err := toEtcdConfig(cp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := clientv3.New(*clientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.client = client
|
||||
|
||||
ot := defaultOperationTimeout
|
||||
newOt, err := time.ParseDuration(cp.OperationTimeout)
|
||||
if err == nil {
|
||||
r.operationTimeout = newOt
|
||||
}
|
||||
r.operationTimeout = ot
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func toConfigProperties(properties map[string]string) (*configProperties, error) {
|
||||
b, err := json.Marshal(properties)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var configProps configProperties
|
||||
err = json.Unmarshal(b, &configProps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &configProps, nil
|
||||
}
|
||||
|
||||
func toEtcdConfig(configProps *configProperties) (*clientv3.Config, error) {
|
||||
endpoints := strings.Split(configProps.Endpoints, defaultSeparator)
|
||||
dialTimeout, err := time.ParseDuration(configProps.DialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientConfig := clientv3.Config{
|
||||
Endpoints: endpoints,
|
||||
DialTimeout: dialTimeout,
|
||||
DialOptions: []grpc.DialOption{grpc.WithBlock()},
|
||||
}
|
||||
|
||||
return &clientConfig, nil
|
||||
}
|
||||
|
||||
func validateRequired(configProps *configProperties) error {
|
||||
if len(configProps.Endpoints) == 0 {
|
||||
return errMissingEndpoints
|
||||
}
|
||||
|
||||
_, err := time.ParseDuration(configProps.DialTimeout)
|
||||
if err != nil {
|
||||
return errInvalidDialTimeout
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves state from ETCD with a key
|
||||
func (r *ETCD) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), r.operationTimeout)
|
||||
defer cancel()
|
||||
resp, err := r.client.Get(ctx, req.Key, clientv3.WithSort(clientv3.SortByVersion, clientv3.SortDescend))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.Count == 0 {
|
||||
return &state.GetResponse{}, nil
|
||||
}
|
||||
|
||||
return &state.GetResponse{
|
||||
Data: resp.Kvs[0].Value,
|
||||
ETag: fmt.Sprintf("%d", resp.Kvs[0].Version),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Delete performs a delete operation
|
||||
func (r *ETCD) Delete(req *state.DeleteRequest) error {
|
||||
err := state.CheckRequestOptions(req.Options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), r.operationTimeout)
|
||||
defer cancelFn()
|
||||
|
||||
version := req.ETag
|
||||
// honor client etag
|
||||
if version != "" {
|
||||
txn := r.client.KV.Txn(ctx)
|
||||
_, err = txn.If(clientv3.Compare(clientv3.Version(req.Key), "=", version)).Then(clientv3.OpDelete(req.Key)).Commit()
|
||||
} else {
|
||||
_, err = r.client.Delete(ctx, req.Key)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BulkDelete performs a bulk delete operation
|
||||
func (r *ETCD) BulkDelete(req []state.DeleteRequest) error {
|
||||
for i := range req {
|
||||
err := r.Delete(&req[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set saves state into ETCD
|
||||
func (r *ETCD) Set(req *state.SetRequest) error {
|
||||
err := state.CheckRequestOptions(req.Options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), r.operationTimeout)
|
||||
defer cancelFn()
|
||||
var vStr string
|
||||
b, ok := req.Value.([]byte)
|
||||
if ok {
|
||||
vStr = string(b)
|
||||
} else {
|
||||
vStr, _ = r.json.MarshalToString(req.Value)
|
||||
}
|
||||
|
||||
version := req.ETag
|
||||
// honor client etag
|
||||
if version != "" {
|
||||
txn := r.client.KV.Txn(ctx)
|
||||
_, err = txn.If(clientv3.Compare(clientv3.Version(req.Key), "=", version)).Then(clientv3.OpPut(req.Key, vStr)).Commit()
|
||||
} else {
|
||||
_, err = r.client.Put(ctx, req.Key, vStr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// BulkSet performs a bulks save operation
|
||||
func (r *ETCD) BulkSet(req []state.SetRequest) error {
|
||||
for i := range req {
|
||||
err := r.Set(&req[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,71 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// toConfigProperties
|
||||
func TestToConfigProperties(t *testing.T) {
|
||||
t.Run("With all required fields", func(t *testing.T) {
|
||||
properties := map[string]string{
|
||||
"endpoints": "localhost:6379",
|
||||
"dialTimeout": "5s",
|
||||
}
|
||||
cp, err := toConfigProperties(properties)
|
||||
assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err))
|
||||
assert.NotNil(t, cp, "failed to respond to missing data field")
|
||||
assert.Equal(t, "localhost:6379", cp.Endpoints, "failed to get endpoints")
|
||||
assert.Equal(t, "5s", cp.DialTimeout, "failed to get DialTimeout")
|
||||
})
|
||||
}
|
||||
|
||||
// toEtcdConfig
|
||||
func TestToEtcdConfig(t *testing.T) {
|
||||
t.Run("With valid fields", func(t *testing.T) {
|
||||
cp := &configProperties{
|
||||
Endpoints: "localhost:6379",
|
||||
DialTimeout: "5s",
|
||||
}
|
||||
_, err := toEtcdConfig(cp)
|
||||
assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err))
|
||||
})
|
||||
t.Run("With invalid fields", func(t *testing.T) {
|
||||
cp := &configProperties{}
|
||||
_, err := toEtcdConfig(cp)
|
||||
assert.NotNil(t, err, "Expected error due to invalid fields")
|
||||
})
|
||||
}
|
||||
|
||||
// validateRequired
|
||||
func TestValidateRequired(t *testing.T) {
|
||||
t.Run("With all required fields", func(t *testing.T) {
|
||||
configProps := &configProperties{
|
||||
Endpoints: "localhost:6379",
|
||||
DialTimeout: "5s",
|
||||
}
|
||||
err := validateRequired(configProps)
|
||||
assert.Equal(t, nil, err, "failed to read all fields")
|
||||
})
|
||||
t.Run("With missing endpoints", func(t *testing.T) {
|
||||
configProps := &configProperties{
|
||||
DialTimeout: "5s",
|
||||
}
|
||||
err := validateRequired(configProps)
|
||||
assert.NotNil(t, err, "failed to get missing endpoints error")
|
||||
})
|
||||
t.Run("With missing dialTimeout", func(t *testing.T) {
|
||||
configProps := &configProperties{
|
||||
DialTimeout: "5s",
|
||||
}
|
||||
err := validateRequired(configProps)
|
||||
assert.NotNil(t, err, "failed to get invalid dialTimeout error")
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue