451 lines
12 KiB
Go
451 lines
12 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"errors"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/state"
|
|
stateutils "github.com/dapr/components-contrib/state/utils"
|
|
"github.com/dapr/kit/logger"
|
|
kitmd "github.com/dapr/kit/metadata"
|
|
"github.com/dapr/kit/ptr"
|
|
"github.com/dapr/kit/utils"
|
|
)
|
|
|
|
// Etcd is a state store implementation for Etcd.
|
|
type Etcd struct {
|
|
state.BulkStore
|
|
|
|
client *clientv3.Client
|
|
keyPrefixPath string
|
|
features []state.Feature
|
|
logger logger.Logger
|
|
schema schemaMarshaller
|
|
maxTxnOps int
|
|
}
|
|
|
|
type etcdConfig struct {
|
|
Endpoints string `json:"endpoints"`
|
|
KeyPrefixPath string `json:"keyPrefixPath"`
|
|
// TLS
|
|
TLSEnable string `json:"tlsEnable"`
|
|
CA string `json:"ca"`
|
|
Cert string `json:"cert"`
|
|
Key string `json:"key"`
|
|
// Transaction server options
|
|
MaxTxnOps int `mapstructure:"maxTxnOps"`
|
|
}
|
|
|
|
// NewEtcdStateStoreV1 returns a new etcd state store for schema V1.
|
|
func NewEtcdStateStoreV1(logger logger.Logger) state.Store {
|
|
return newETCD(logger, schemaV1{})
|
|
}
|
|
|
|
// NewEtcdStateStoreV2 returns a new etcd state store for schema V2.
|
|
func NewEtcdStateStoreV2(logger logger.Logger) state.Store {
|
|
return newETCD(logger, schemaV2{})
|
|
}
|
|
|
|
func newETCD(logger logger.Logger, schema schemaMarshaller) state.Store {
|
|
s := &Etcd{
|
|
schema: schema,
|
|
logger: logger,
|
|
features: []state.Feature{
|
|
state.FeatureETag,
|
|
state.FeatureTransactional,
|
|
state.FeatureTTL,
|
|
},
|
|
}
|
|
s.BulkStore = state.NewDefaultBulkStore(s)
|
|
return s
|
|
}
|
|
|
|
// Init does metadata and config parsing and initializes the
|
|
// Etcd client.
|
|
func (e *Etcd) Init(_ context.Context, metadata state.Metadata) error {
|
|
etcdConfig, err := metadataToConfig(metadata.Properties)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't convert metadata properties: %w", err)
|
|
}
|
|
|
|
e.client, err = e.ParseClientFromConfig(etcdConfig)
|
|
if err != nil {
|
|
return fmt.Errorf("initializing etcd client: %w", err)
|
|
}
|
|
|
|
e.keyPrefixPath = etcdConfig.KeyPrefixPath
|
|
e.maxTxnOps = etcdConfig.MaxTxnOps
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Etcd) ParseClientFromConfig(etcdConfig *etcdConfig) (*clientv3.Client, error) {
|
|
endpoints := strings.Split(etcdConfig.Endpoints, ",")
|
|
if len(endpoints) == 0 || endpoints[0] == "" {
|
|
return nil, errors.New("endpoints required")
|
|
}
|
|
|
|
var tlsConfig *tls.Config
|
|
if utils.IsTruthy(etcdConfig.TLSEnable) {
|
|
if etcdConfig.Cert != "" && etcdConfig.Key != "" && etcdConfig.CA != "" {
|
|
var err error
|
|
tlsConfig, err = NewTLSConfig(etcdConfig.Cert, etcdConfig.Key, etcdConfig.CA)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("tls authentication error: %w", err)
|
|
}
|
|
} else {
|
|
return nil, errors.New("tls authentication information is incomplete")
|
|
}
|
|
}
|
|
|
|
config := clientv3.Config{
|
|
Endpoints: endpoints,
|
|
DialTimeout: 5 * time.Second,
|
|
TLS: tlsConfig,
|
|
}
|
|
client, err := clientv3.New(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client, nil
|
|
}
|
|
|
|
// Features returns the features available in this state store.
|
|
func (e *Etcd) Features() []state.Feature {
|
|
return e.features
|
|
}
|
|
|
|
func metadataToConfig(connInfo map[string]string) (*etcdConfig, error) {
|
|
m := &etcdConfig{
|
|
// This is the default value for maximum ops per transaction, configurtable via etcd server flag --max-txn-ops.
|
|
MaxTxnOps: 128,
|
|
}
|
|
err := kitmd.DecodeMetadata(connInfo, m)
|
|
return m, err
|
|
}
|
|
|
|
// Get retrieves a Etcd KV item.
|
|
func (e *Etcd) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
|
|
keyWithPath := e.keyPrefixPath + "/" + req.Key
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
resp, err := e.client.Get(ctx, keyWithPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't get key %s: %w", keyWithPath, err)
|
|
}
|
|
|
|
if resp == nil || len(resp.Kvs) == 0 {
|
|
return &state.GetResponse{}, nil
|
|
}
|
|
|
|
data, metadata, err := e.schema.decode(resp.Kvs[0].Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &state.GetResponse{
|
|
Data: data,
|
|
ETag: ptr.Of(strconv.Itoa(int(resp.Kvs[0].ModRevision))),
|
|
Metadata: metadata,
|
|
}, nil
|
|
}
|
|
|
|
// Set saves a Etcd KV item.
|
|
func (e *Etcd) Set(ctx context.Context, req *state.SetRequest) error {
|
|
ttlInSeconds, err := e.doSetValidateParameters(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
keyWithPath := e.keyPrefixPath + "/" + req.Key
|
|
err = e.doValidateEtag(keyWithPath, req.ETag, req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.doSet(ctx, keyWithPath, req.Value, req.ETag, ttlInSeconds)
|
|
}
|
|
|
|
func (e *Etcd) doSet(ctx context.Context, key string, val any, etag *string, ttlInSeconds *int64) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
reqVal, err := e.schema.encode(val, ttlInSeconds)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var leaseID clientv3.LeaseID
|
|
if ttlInSeconds != nil {
|
|
var resp *clientv3.LeaseGrantResponse
|
|
resp, err = e.client.Grant(ctx, *ttlInSeconds)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't grant lease %s: %w", key, err)
|
|
}
|
|
leaseID = resp.ID
|
|
}
|
|
|
|
if etag != nil {
|
|
etag, _ := strconv.ParseInt(*etag, 10, 64)
|
|
_, err = e.client.Txn(ctx).
|
|
If(clientv3.Compare(clientv3.ModRevision(key), "=", etag)).
|
|
Then(clientv3.OpPut(key, reqVal, clientv3.WithLease(leaseID))).
|
|
Commit()
|
|
} else {
|
|
_, err = e.client.Put(ctx, key, reqVal, clientv3.WithLease(leaseID))
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't set key %s: %w", key, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Etcd) doSetValidateParameters(req *state.SetRequest) (*int64, error) {
|
|
err := state.CheckRequestOptions(req.Options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ttlInSeconds, err := stateutils.ParseTTL64(req.Metadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return ttlInSeconds, nil
|
|
}
|
|
|
|
// Delete performes a Etcd KV delete operation.
|
|
func (e *Etcd) Delete(ctx context.Context, req *state.DeleteRequest) error {
|
|
if err := state.CheckRequestOptions(req.Options); err != nil {
|
|
return err
|
|
}
|
|
|
|
keyWithPath := e.keyPrefixPath + "/" + req.Key
|
|
|
|
if err := e.doValidateEtag(keyWithPath, req.ETag, req.Options.Concurrency); err != nil {
|
|
return err
|
|
}
|
|
|
|
return e.doDelete(ctx, keyWithPath, req.ETag)
|
|
}
|
|
|
|
func (e *Etcd) doDelete(ctx context.Context, key string, etag *string) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
var err error
|
|
if etag != nil {
|
|
etag, _ := strconv.ParseInt(*etag, 10, 64)
|
|
_, err = e.client.Txn(ctx).
|
|
If(clientv3.Compare(clientv3.ModRevision(key), "=", etag)).
|
|
Then(clientv3.OpDelete(key)).
|
|
Commit()
|
|
} else {
|
|
_, err = e.client.Delete(ctx, key)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't delete key %s: %w", key, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Etcd) doValidateEtag(key string, etag *string, concurrency string) error {
|
|
hasEtag := etag != nil && *etag != ""
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if concurrency == state.FirstWrite && !hasEtag {
|
|
item, err := e.client.Get(ctx, key)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't get key %s: %w", key, err)
|
|
} else if item != nil && len(item.Kvs) != 0 {
|
|
return state.NewETagError(state.ETagMismatch, errors.New("item already exists and no etag was passed"))
|
|
} else {
|
|
return nil
|
|
}
|
|
} else if hasEtag {
|
|
item, err := e.client.Get(ctx, key)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't get key %s: %w", key, err)
|
|
}
|
|
if item == nil || len(item.Kvs) == 0 {
|
|
return state.NewETagError(state.ETagMismatch, fmt.Errorf("state not exist or expired for key=%s", key))
|
|
}
|
|
currentEtag := strconv.Itoa(int(item.Kvs[0].ModRevision))
|
|
if currentEtag != *etag {
|
|
return state.NewETagError(state.ETagMismatch, fmt.Errorf(
|
|
"state etag not match for key=%s: current=%s, expect=%s", key, currentEtag, *etag))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Etcd) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
metadataStruct := etcdConfig{}
|
|
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType)
|
|
return
|
|
}
|
|
|
|
func (e *Etcd) Close() error {
|
|
if e.client == nil {
|
|
return nil
|
|
}
|
|
|
|
return e.client.Close()
|
|
}
|
|
|
|
func (e *Etcd) Ping() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if _, err := e.client.Get(ctx, "health"); err != nil {
|
|
return fmt.Errorf("etcd store: error connecting to etcd at %s: %w", e.client.Endpoints(), err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MultiMaxSize returns the maximum number of operations allowed in a transaction.
|
|
// For Etcd the default is 128, but this can be configured via the server flag --max-txn-ops.
|
|
// As such we are using the component metadata value maxTxnOps.
|
|
func (e *Etcd) MultiMaxSize() int {
|
|
return e.maxTxnOps
|
|
}
|
|
|
|
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
|
|
func (e *Etcd) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
|
|
if len(request.Operations) == 0 {
|
|
return nil
|
|
}
|
|
|
|
ops := make([]clientv3.Op, 0, len(request.Operations))
|
|
|
|
for _, o := range request.Operations {
|
|
switch req := o.(type) {
|
|
case state.SetRequest:
|
|
ttlInSeconds, err := e.doSetValidateParameters(&req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
keyWithPath := e.keyPrefixPath + "/" + req.Key
|
|
err = e.doValidateEtag(keyWithPath, req.ETag, req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reqVal, err := e.schema.encode(req.Value, ttlInSeconds)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var cmp clientv3.Cmp
|
|
if req.HasETag() {
|
|
etag, _ := strconv.ParseInt(*req.ETag, 10, 64)
|
|
cmp = clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag)
|
|
}
|
|
if ttlInSeconds != nil {
|
|
resp, err := e.client.Grant(ctx, *ttlInSeconds)
|
|
if err != nil {
|
|
return fmt.Errorf("couldn't grant lease %s: %w", keyWithPath, err)
|
|
}
|
|
put := clientv3.OpPut(keyWithPath, reqVal, clientv3.WithLease(resp.ID))
|
|
if req.HasETag() {
|
|
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
|
|
} else {
|
|
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil))
|
|
}
|
|
} else {
|
|
put := clientv3.OpPut(keyWithPath, reqVal)
|
|
if req.HasETag() {
|
|
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{put}, nil))
|
|
} else {
|
|
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{put}, nil))
|
|
}
|
|
}
|
|
case state.DeleteRequest:
|
|
if err := state.CheckRequestOptions(req.Options); err != nil {
|
|
return err
|
|
}
|
|
|
|
keyWithPath := e.keyPrefixPath + "/" + req.Key
|
|
err := e.doValidateEtag(keyWithPath, req.ETag, req.Options.Concurrency)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
del := clientv3.OpDelete(keyWithPath)
|
|
if req.HasETag() {
|
|
etag, _ := strconv.ParseInt(*req.ETag, 10, 64)
|
|
cmp := clientv3.Compare(clientv3.ModRevision(keyWithPath), "=", etag)
|
|
ops = append(ops, clientv3.OpTxn([]clientv3.Cmp{cmp}, []clientv3.Op{del}, nil))
|
|
} else {
|
|
ops = append(ops, clientv3.OpTxn(nil, []clientv3.Op{del}, nil))
|
|
}
|
|
}
|
|
}
|
|
|
|
_, err := e.client.Txn(ctx).Then(ops...).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewTLSConfig(clientCert, clientKey, caCert string) (*tls.Config, error) {
|
|
valid := false
|
|
|
|
config := &tls.Config{
|
|
MinVersion: tls.VersionTLS12,
|
|
}
|
|
|
|
if clientCert != "" && clientKey != "" {
|
|
key := []byte(clientKey)
|
|
cert, err := tls.X509KeyPair([]byte(clientCert), key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parse X509KeyPair: %w", err)
|
|
}
|
|
config.Certificates = []tls.Certificate{cert}
|
|
valid = true
|
|
}
|
|
|
|
if caCert != "" {
|
|
caCertPool := x509.NewCertPool()
|
|
caCertPool.AppendCertsFromPEM([]byte(caCert))
|
|
config.RootCAs = caCertPool
|
|
valid = true
|
|
}
|
|
|
|
if !valid {
|
|
config = nil
|
|
}
|
|
|
|
return config, nil
|
|
}
|