From 63af15926ba3ffcd8d74d0927de818dc77c75fb8 Mon Sep 17 00:00:00 2001 From: Roberto Rojas Date: Tue, 22 Oct 2019 17:30:15 -0400 Subject: [PATCH] ETCD Implementation of State Store (#42) * etcd: initial commit * etcd: functionality and unit tests * etcd: fixes marshaling data sent to etcd * changes as per PR review --- go.mod | 10 +- go.sum | 14 +++ state/Readme.md | 1 + state/etcd/etcd.go | 197 ++++++++++++++++++++++++++++++++++++++++ state/etcd/etcd_test.go | 71 +++++++++++++++ 5 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 state/etcd/etcd.go create mode 100644 state/etcd/etcd_test.go diff --git a/go.mod b/go.mod index c7c8264ae..bd74548e0 100644 --- a/go.mod +++ b/go.mod @@ -21,13 +21,15 @@ require ( github.com/Sirupsen/logrus v1.0.6 github.com/a8m/documentdb v1.2.0 github.com/aws/aws-sdk-go v1.25.0 + github.com/coreos/etcd v3.3.17+incompatible // indirect + github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/creack/pty v1.1.9 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/frankban/quicktest v1.5.0 // indirect github.com/garyburd/redigo v1.6.0 // indirect github.com/go-redis/redis v6.15.5+incompatible - github.com/google/btree v1.0.0 // indirect github.com/google/pprof v0.0.0-20190908185732-236ed259b199 // indirect github.com/google/uuid v1.1.1 github.com/grpc-ecosystem/grpc-gateway v1.11.2 // indirect @@ -47,7 +49,11 @@ require ( github.com/satori/go.uuid v1.2.0 github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 github.com/stretchr/testify v1.4.0 + go.etcd.io/etcd v3.3.17+incompatible go.opencensus.io v0.22.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.2.0 // indirect + go.uber.org/zap v1.10.0 // indirect golang.org/x/crypto v0.0.0-20190927123631-a832865fa7ad golang.org/x/exp v0.0.0-20190927203820-447a159532ef // indirect golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a // indirect @@ -59,7 +65,7 @@ require ( google.golang.org/api v0.10.0 google.golang.org/appengine v1.6.4 // indirect google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c // indirect - google.golang.org/grpc v1.24.0 // indirect + google.golang.org/grpc v1.24.0 gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect diff --git a/go.sum b/go.sum index da2512bb6..385afb610 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,12 @@ github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.17+incompatible h1:f/Z3EoDSx1yjaIjLQGo1diYUlQYSBrrAQ5vP8NjwXwo= +github.com/coreos/etcd v3.3.17+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -285,12 +291,20 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.etcd.io/etcd v3.3.17+incompatible h1:g8iRku1SID8QAW8cDlV0L/PkZlw63LSiYEHYHoE6j/s= +go.etcd.io/etcd v3.3.17+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI= go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4= +go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/state/Readme.md b/state/Readme.md index 5b855263f..b8ee7682d 100644 --- a/state/Readme.md +++ b/state/Readme.md @@ -6,6 +6,7 @@ Currently supported state stores are: * Azure CosmosDB * Redis +* Etcd ## Implementing a new State Store diff --git a/state/etcd/etcd.go b/state/etcd/etcd.go new file mode 100644 index 000000000..839a530d7 --- /dev/null +++ b/state/etcd/etcd.go @@ -0,0 +1,197 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package etcd + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/dapr/components-contrib/state" + jsoniter "github.com/json-iterator/go" + "go.etcd.io/etcd/clientv3" + "google.golang.org/grpc" +) + +const defaultOperationTimeout = time.Duration(10 * time.Second) +const defaultSeparator = "," + +var errMissingEndpoints = errors.New("Endpoints are required") +var errInvalidDialTimeout = errors.New("DialTimeout is invalid") + +// ETCD is a state store +type ETCD struct { + json jsoniter.API + client *clientv3.Client + operationTimeout time.Duration +} + +type configProperties struct { + Endpoints string `json:"endpoints"` + DialTimeout string `json:"dialTimeout"` + OperationTimeout string `json:"operationTimeout"` +} + +//--- StateStore --- + +// NewETCD returns a new ETCD state store +func NewETCD() *ETCD { + return &ETCD{ + json: jsoniter.ConfigFastest, + } +} + +// Init does metadata and connection parsing +func (r *ETCD) Init(metadata state.Metadata) error { + cp, err := toConfigProperties(metadata.Properties) + if err != nil { + return err + } + err = validateRequired(cp) + if err != nil { + return err + } + + clientConfig, err := toEtcdConfig(cp) + if err != nil { + return err + } + + client, err := clientv3.New(*clientConfig) + if err != nil { + return err + } + + r.client = client + + ot := defaultOperationTimeout + newOt, err := time.ParseDuration(cp.OperationTimeout) + if err == nil { + r.operationTimeout = newOt + } + r.operationTimeout = ot + + return nil +} + +func toConfigProperties(properties map[string]string) (*configProperties, error) { + b, err := json.Marshal(properties) + if err != nil { + return nil, err + } + + var configProps configProperties + err = json.Unmarshal(b, &configProps) + if err != nil { + return nil, err + } + + return &configProps, nil +} + +func toEtcdConfig(configProps *configProperties) (*clientv3.Config, error) { + endpoints := strings.Split(configProps.Endpoints, defaultSeparator) + dialTimeout, err := time.ParseDuration(configProps.DialTimeout) + if err != nil { + return nil, err + } + + clientConfig := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + + return &clientConfig, nil +} + +func validateRequired(configProps *configProperties) error { + if len(configProps.Endpoints) == 0 { + return errMissingEndpoints + } + + _, err := time.ParseDuration(configProps.DialTimeout) + if err != nil { + return errInvalidDialTimeout + } + + return nil +} + +// Get retrieves state from ETCD with a key +func (r *ETCD) Get(req *state.GetRequest) (*state.GetResponse, error) { + ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout) + resp, err := r.client.Get(ctx, req.Key, clientv3.WithSort(clientv3.SortByVersion, clientv3.SortDescend)) + if err != nil { + return nil, err + } + + if resp.Count == 0 { + return &state.GetResponse{}, nil + } + + return &state.GetResponse{ + Data: resp.Kvs[0].Value, + ETag: fmt.Sprintf("%d", resp.Kvs[0].Version), + }, nil +} + +// Delete performs a delete operation +func (r *ETCD) Delete(req *state.DeleteRequest) error { + ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout) + _, err := r.client.Delete(ctx, req.Key) + if err != nil { + return err + } + + return nil +} + +// BulkDelete performs a bulk delete operation +func (r *ETCD) BulkDelete(req []state.DeleteRequest) error { + for _, re := range req { + err := r.Delete(&re) + if err != nil { + return err + } + } + + return nil +} + +// Set saves state into ETCD +func (r *ETCD) Set(req *state.SetRequest) error { + ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout) + + var vStr string + b, ok := req.Value.([]byte) + if ok { + vStr = string(b) + } else { + vStr, _ = r.json.MarshalToString(req.Value) + } + + _, err := r.client.Put(ctx, req.Key, vStr) + if err != nil { + return err + } + return nil +} + +// BulkSet performs a bulks save operation +func (r *ETCD) BulkSet(req []state.SetRequest) error { + for _, s := range req { + err := r.Set(&s) + if err != nil { + return err + } + } + + return nil +} diff --git a/state/etcd/etcd_test.go b/state/etcd/etcd_test.go new file mode 100644 index 000000000..e1535a044 --- /dev/null +++ b/state/etcd/etcd_test.go @@ -0,0 +1,71 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +// ------------------------------------------------------------ + +package etcd + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// toConfigProperties +func TestToConfigProperties(t *testing.T) { + t.Run("With all required fields", func(t *testing.T) { + properties := map[string]string{ + "endpoints": "localhost:6379", + "dialTimeout": "5s", + } + cp, err := toConfigProperties(properties) + assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err)) + assert.NotNil(t, cp, "failed to respond to missing data field") + assert.Equal(t, "localhost:6379", cp.Endpoints, "failed to get endpoints") + assert.Equal(t, "5s", cp.DialTimeout, "failed to get DialTimeout") + }) +} + +// toEtcdConfig +func TestToEtcdConfig(t *testing.T) { + t.Run("With valid fields", func(t *testing.T) { + cp := &configProperties{ + Endpoints: "localhost:6379", + DialTimeout: "5s", + } + _, err := toEtcdConfig(cp) + assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err)) + }) + t.Run("With invalid fields", func(t *testing.T) { + cp := &configProperties{} + _, err := toEtcdConfig(cp) + assert.NotNil(t, err, "Expected error due to invalid fields") + }) +} + +// validateRequired +func TestValidateRequired(t *testing.T) { + t.Run("With all required fields", func(t *testing.T) { + configProps := &configProperties{ + Endpoints: "localhost:6379", + DialTimeout: "5s", + } + err := validateRequired(configProps) + assert.Equal(t, nil, err, "failed to read all fields") + }) + t.Run("With missing endpoints", func(t *testing.T) { + configProps := &configProperties{ + DialTimeout: "5s", + } + err := validateRequired(configProps) + assert.NotNil(t, err, "failed to get missing endpoints error") + }) + t.Run("With missing dialTimeout", func(t *testing.T) { + configProps := &configProperties{ + DialTimeout: "5s", + } + err := validateRequired(configProps) + assert.NotNil(t, err, "failed to get invalid dialTimeout error") + }) +}