Merge branch 'master' into exporters

This commit is contained in:
Yaron Schneider 2019-10-22 21:40:33 -07:00 committed by GitHub
commit 0b93c5e585
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 291 additions and 1 deletions

9
go.mod
View File

@ -22,6 +22,9 @@ require (
github.com/Sirupsen/logrus v1.0.6
github.com/a8m/documentdb v1.2.0
github.com/aws/aws-sdk-go v1.25.0
github.com/coreos/etcd v3.3.17+incompatible // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creack/pty v1.1.9 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
@ -49,6 +52,10 @@ require (
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.1
go.etcd.io/etcd v3.3.17+incompatible
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.2.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20190927123631-a832865fa7ad
golang.org/x/exp v0.0.0-20190927203820-447a159532ef // indirect
golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a // indirect
@ -60,7 +67,7 @@ require (
google.golang.org/api v0.10.0
google.golang.org/appengine v1.6.4 // indirect
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c // indirect
google.golang.org/grpc v1.24.0 // indirect
google.golang.org/grpc v1.24.0
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect

14
go.sum
View File

@ -96,6 +96,12 @@ github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.17+incompatible h1:f/Z3EoDSx1yjaIjLQGo1diYUlQYSBrrAQ5vP8NjwXwo=
github.com/coreos/etcd v3.3.17+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -300,12 +306,20 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.etcd.io/etcd v3.3.17+incompatible h1:g8iRku1SID8QAW8cDlV0L/PkZlw63LSiYEHYHoE6j/s=
go.etcd.io/etcd v3.3.17+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50=
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4=
go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181001203147-e3636079e1a4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

View File

@ -6,6 +6,7 @@ Currently supported state stores are:
* Azure CosmosDB
* Redis
* Etcd
## Implementing a new State Store

197
state/etcd/etcd.go Normal file
View File

@ -0,0 +1,197 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package etcd
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/dapr/components-contrib/state"
jsoniter "github.com/json-iterator/go"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
const defaultOperationTimeout = time.Duration(10 * time.Second)
const defaultSeparator = ","
var errMissingEndpoints = errors.New("Endpoints are required")
var errInvalidDialTimeout = errors.New("DialTimeout is invalid")
// ETCD is a state store
type ETCD struct {
json jsoniter.API
client *clientv3.Client
operationTimeout time.Duration
}
type configProperties struct {
Endpoints string `json:"endpoints"`
DialTimeout string `json:"dialTimeout"`
OperationTimeout string `json:"operationTimeout"`
}
//--- StateStore ---
// NewETCD returns a new ETCD state store
func NewETCD() *ETCD {
return &ETCD{
json: jsoniter.ConfigFastest,
}
}
// Init does metadata and connection parsing
func (r *ETCD) Init(metadata state.Metadata) error {
cp, err := toConfigProperties(metadata.Properties)
if err != nil {
return err
}
err = validateRequired(cp)
if err != nil {
return err
}
clientConfig, err := toEtcdConfig(cp)
if err != nil {
return err
}
client, err := clientv3.New(*clientConfig)
if err != nil {
return err
}
r.client = client
ot := defaultOperationTimeout
newOt, err := time.ParseDuration(cp.OperationTimeout)
if err == nil {
r.operationTimeout = newOt
}
r.operationTimeout = ot
return nil
}
func toConfigProperties(properties map[string]string) (*configProperties, error) {
b, err := json.Marshal(properties)
if err != nil {
return nil, err
}
var configProps configProperties
err = json.Unmarshal(b, &configProps)
if err != nil {
return nil, err
}
return &configProps, nil
}
func toEtcdConfig(configProps *configProperties) (*clientv3.Config, error) {
endpoints := strings.Split(configProps.Endpoints, defaultSeparator)
dialTimeout, err := time.ParseDuration(configProps.DialTimeout)
if err != nil {
return nil, err
}
clientConfig := clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
return &clientConfig, nil
}
func validateRequired(configProps *configProperties) error {
if len(configProps.Endpoints) == 0 {
return errMissingEndpoints
}
_, err := time.ParseDuration(configProps.DialTimeout)
if err != nil {
return errInvalidDialTimeout
}
return nil
}
// Get retrieves state from ETCD with a key
func (r *ETCD) Get(req *state.GetRequest) (*state.GetResponse, error) {
ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout)
resp, err := r.client.Get(ctx, req.Key, clientv3.WithSort(clientv3.SortByVersion, clientv3.SortDescend))
if err != nil {
return nil, err
}
if resp.Count == 0 {
return &state.GetResponse{}, nil
}
return &state.GetResponse{
Data: resp.Kvs[0].Value,
ETag: fmt.Sprintf("%d", resp.Kvs[0].Version),
}, nil
}
// Delete performs a delete operation
func (r *ETCD) Delete(req *state.DeleteRequest) error {
ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout)
_, err := r.client.Delete(ctx, req.Key)
if err != nil {
return err
}
return nil
}
// BulkDelete performs a bulk delete operation
func (r *ETCD) BulkDelete(req []state.DeleteRequest) error {
for _, re := range req {
err := r.Delete(&re)
if err != nil {
return err
}
}
return nil
}
// Set saves state into ETCD
func (r *ETCD) Set(req *state.SetRequest) error {
ctx, _ := context.WithTimeout(context.Background(), r.operationTimeout)
var vStr string
b, ok := req.Value.([]byte)
if ok {
vStr = string(b)
} else {
vStr, _ = r.json.MarshalToString(req.Value)
}
_, err := r.client.Put(ctx, req.Key, vStr)
if err != nil {
return err
}
return nil
}
// BulkSet performs a bulks save operation
func (r *ETCD) BulkSet(req []state.SetRequest) error {
for _, s := range req {
err := r.Set(&s)
if err != nil {
return err
}
}
return nil
}

71
state/etcd/etcd_test.go Normal file
View File

@ -0,0 +1,71 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package etcd
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
// toConfigProperties
func TestToConfigProperties(t *testing.T) {
t.Run("With all required fields", func(t *testing.T) {
properties := map[string]string{
"endpoints": "localhost:6379",
"dialTimeout": "5s",
}
cp, err := toConfigProperties(properties)
assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err))
assert.NotNil(t, cp, "failed to respond to missing data field")
assert.Equal(t, "localhost:6379", cp.Endpoints, "failed to get endpoints")
assert.Equal(t, "5s", cp.DialTimeout, "failed to get DialTimeout")
})
}
// toEtcdConfig
func TestToEtcdConfig(t *testing.T) {
t.Run("With valid fields", func(t *testing.T) {
cp := &configProperties{
Endpoints: "localhost:6379",
DialTimeout: "5s",
}
_, err := toEtcdConfig(cp)
assert.Equal(t, err, nil, fmt.Sprintf("Unexpected error: %v", err))
})
t.Run("With invalid fields", func(t *testing.T) {
cp := &configProperties{}
_, err := toEtcdConfig(cp)
assert.NotNil(t, err, "Expected error due to invalid fields")
})
}
// validateRequired
func TestValidateRequired(t *testing.T) {
t.Run("With all required fields", func(t *testing.T) {
configProps := &configProperties{
Endpoints: "localhost:6379",
DialTimeout: "5s",
}
err := validateRequired(configProps)
assert.Equal(t, nil, err, "failed to read all fields")
})
t.Run("With missing endpoints", func(t *testing.T) {
configProps := &configProperties{
DialTimeout: "5s",
}
err := validateRequired(configProps)
assert.NotNil(t, err, "failed to get missing endpoints error")
})
t.Run("With missing dialTimeout", func(t *testing.T) {
configProps := &configProperties{
DialTimeout: "5s",
}
err := validateRequired(configProps)
assert.NotNil(t, err, "failed to get invalid dialTimeout error")
})
}