From 4e77594a4d0f2687757590c13e2de4a457828d15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sky/=E6=95=96=E5=B0=8F=E5=89=91?= Date: Fri, 16 Oct 2020 00:53:49 +0800 Subject: [PATCH] remove etcd state store to prepare to update grpc to new version (#499) * remove etcd state store to prepare to update grpc to new version * update go.mod Co-authored-by: Yaron Schneider --- .gitignore | 3 +- go.mod | 2 - go.sum | 2 - state/etcd/etcd.go | 233 ---------------------------------------- state/etcd/etcd_test.go | 71 ------------ 5 files changed, 2 insertions(+), 309 deletions(-) delete mode 100644 state/etcd/etcd.go delete mode 100644 state/etcd/etcd_test.go diff --git a/.gitignore b/.gitignore index 9350d27e2..c40cb2c7a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /dist .idea -.vscode \ No newline at end of file +.vscode +/vendor diff --git a/go.mod b/go.mod index 63db2d83c..2c637e38d 100644 --- a/go.mod +++ b/go.mod @@ -69,10 +69,8 @@ require ( github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 github.com/stretchr/testify v1.5.1 github.com/tidwall/pretty v1.0.1 // indirect - github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect github.com/valyala/fasthttp v1.6.0 github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e - go.etcd.io/etcd v3.3.17+incompatible go.mongodb.org/mongo-driver v1.1.2 go.opencensus.io v0.22.3 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 diff --git a/go.sum b/go.sum index 2eb73a46a..9a9da8716 100644 --- a/go.sum +++ b/go.sum @@ -802,8 +802,6 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8= github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc h1:yUaosFVTJwnltaHbSNC3i82I92quFs+OFPRl8kNMVwo= -github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/state/etcd/etcd.go b/state/etcd/etcd.go deleted file mode 100644 index 5b3ad8fc4..000000000 --- a/state/etcd/etcd.go +++ /dev/null @@ -1,233 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package etcd - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "strings" - "time" - - "github.com/dapr/components-contrib/state" - "github.com/dapr/dapr/pkg/logger" - jsoniter "github.com/json-iterator/go" - "go.etcd.io/etcd/clientv3" - "google.golang.org/grpc" -) - -const ( - defaultOperationTimeout = 10 * time.Second - defaultSeparator = "," -) - -var ( - errMissingEndpoints = errors.New("endpoints are required") - errInvalidDialTimeout = errors.New("DialTimeout is invalid") -) - -// ETCD is a state store -type ETCD struct { - json jsoniter.API - client *clientv3.Client - operationTimeout time.Duration - logger logger.Logger -} - -type configProperties struct { - Endpoints string `json:"endpoints"` - DialTimeout string `json:"dialTimeout"` - OperationTimeout string `json:"operationTimeout"` -} - -//--- StateStore --- - -// NewETCD returns a new ETCD state store -func NewETCD(logger logger.Logger) *ETCD { - return &ETCD{ - json: jsoniter.ConfigFastest, - logger: logger, - } -} - -// Init does metadata and connection parsing -func (r *ETCD) Init(metadata state.Metadata) error { - cp, err := toConfigProperties(metadata.Properties) - if err != nil { - return err - } - err = validateRequired(cp) - if err != nil { - return err - } - - clientConfig, err := toEtcdConfig(cp) - if err != nil { - return err - } - - client, err := clientv3.New(*clientConfig) - if err != nil { - return err - } - - r.client = client - - ot := defaultOperationTimeout - newOt, err := time.ParseDuration(cp.OperationTimeout) - if err == nil { - r.operationTimeout = newOt - } - r.operationTimeout = ot - - return nil -} - -func toConfigProperties(properties map[string]string) (*configProperties, error) { - b, err := json.Marshal(properties) - if err != nil { - return nil, err - } - - var configProps configProperties - err = json.Unmarshal(b, &configProps) - if err != nil { - return nil, err - } - - return &configProps, nil -} - -func toEtcdConfig(configProps *configProperties) (*clientv3.Config, error) { - endpoints := strings.Split(configProps.Endpoints, defaultSeparator) - dialTimeout, err := time.ParseDuration(configProps.DialTimeout) - if err != nil { - return nil, err - } - - clientConfig := clientv3.Config{ - Endpoints: endpoints, - DialTimeout: dialTimeout, - DialOptions: []grpc.DialOption{grpc.WithBlock()}, - } - - return &clientConfig, nil -} - -func validateRequired(configProps *configProperties) error { - if len(configProps.Endpoints) == 0 { - return errMissingEndpoints - } - - _, err := time.ParseDuration(configProps.DialTimeout) - if err != nil { - return errInvalidDialTimeout - } - - return nil -} - -// Get retrieves state from ETCD with a key -func (r *ETCD) Get(req *state.GetRequest) (*state.GetResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), r.operationTimeout) - defer cancel() - resp, err := r.client.Get(ctx, req.Key, clientv3.WithSort(clientv3.SortByVersion, clientv3.SortDescend)) - if err != nil { - return nil, err - } - - if resp.Count == 0 { - return &state.GetResponse{}, nil - } - - return &state.GetResponse{ - Data: resp.Kvs[0].Value, - ETag: fmt.Sprintf("%d", resp.Kvs[0].Version), - }, nil -} - -// Delete performs a delete operation -func (r *ETCD) Delete(req *state.DeleteRequest) error { - err := state.CheckRequestOptions(req.Options) - if err != nil { - return err - } - - ctx, cancelFn := context.WithTimeout(context.Background(), r.operationTimeout) - defer cancelFn() - - version := req.ETag - // honor client etag - if version != "" { - txn := r.client.KV.Txn(ctx) - _, err = txn.If(clientv3.Compare(clientv3.Version(req.Key), "=", version)).Then(clientv3.OpDelete(req.Key)).Commit() - } else { - _, err = r.client.Delete(ctx, req.Key) - } - - if err != nil { - return err - } - - return nil -} - -// BulkDelete performs a bulk delete operation -func (r *ETCD) BulkDelete(req []state.DeleteRequest) error { - for i := range req { - err := r.Delete(&req[i]) - if err != nil { - return err - } - } - - return nil -} - -// Set saves state into ETCD -func (r *ETCD) Set(req *state.SetRequest) error { - err := state.CheckRequestOptions(req.Options) - if err != nil { - return err - } - ctx, cancelFn := context.WithTimeout(context.Background(), r.operationTimeout) - defer cancelFn() - var vStr string - b, ok := req.Value.([]byte) - if ok { - vStr = string(b) - } else { - vStr, _ = r.json.MarshalToString(req.Value) - } - - version := req.ETag - // honor client etag - if version != "" { - txn := r.client.KV.Txn(ctx) - _, err = txn.If(clientv3.Compare(clientv3.Version(req.Key), "=", version)).Then(clientv3.OpPut(req.Key, vStr)).Commit() - } else { - _, err = r.client.Put(ctx, req.Key, vStr) - } - - if err != nil { - return err - } - - return nil -} - -// BulkSet performs a bulks save operation -func (r *ETCD) BulkSet(req []state.SetRequest) error { - for i := range req { - err := r.Set(&req[i]) - if err != nil { - return err - } - } - - return nil -} diff --git a/state/etcd/etcd_test.go b/state/etcd/etcd_test.go deleted file mode 100644 index e1535a044..000000000 --- a/state/etcd/etcd_test.go +++ /dev/null @@ -1,71 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. -// ------------------------------------------------------------ - -package etcd - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -// toConfigProperties -func TestToConfigProperties(t *testing.T) { - t.Run("With all required fields", func(t *testing.T) { - properties := map[string]string{ - "endpoints": "localhost:6379", - "dialTimeout": "5s", - } - cp, err := toConfigProperties(properties) - assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err)) - assert.NotNil(t, cp, "failed to respond to missing data field") - assert.Equal(t, "localhost:6379", cp.Endpoints, "failed to get endpoints") - assert.Equal(t, "5s", cp.DialTimeout, "failed to get DialTimeout") - }) -} - -// toEtcdConfig -func TestToEtcdConfig(t *testing.T) { - t.Run("With valid fields", func(t *testing.T) { - cp := &configProperties{ - Endpoints: "localhost:6379", - DialTimeout: "5s", - } - _, err := toEtcdConfig(cp) - assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err)) - }) - t.Run("With invalid fields", func(t *testing.T) { - cp := &configProperties{} - _, err := toEtcdConfig(cp) - assert.NotNil(t, err, "Expected error due to invalid fields") - }) -} - -// validateRequired -func TestValidateRequired(t *testing.T) { - t.Run("With all required fields", func(t *testing.T) { - configProps := &configProperties{ - Endpoints: "localhost:6379", - DialTimeout: "5s", - } - err := validateRequired(configProps) - assert.Equal(t, nil, err, "failed to read all fields") - }) - t.Run("With missing endpoints", func(t *testing.T) { - configProps := &configProperties{ - DialTimeout: "5s", - } - err := validateRequired(configProps) - assert.NotNil(t, err, "failed to get missing endpoints error") - }) - t.Run("With missing dialTimeout", func(t *testing.T) { - configProps := &configProperties{ - DialTimeout: "5s", - } - err := validateRequired(configProps) - assert.NotNil(t, err, "failed to get invalid dialTimeout error") - }) -}