From 1cb7de47621f861ff3e906203c0837461a58ad64 Mon Sep 17 00:00:00 2001 From: Shirly Date: Fri, 11 Jun 2021 17:17:35 +0800 Subject: [PATCH] tikv/kv: add the function NewTxnClient (#79) Signed-off-by: shirly --- tikv/kv.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tikv/kv.go b/tikv/kv.go index 518f8ed2..e87c2269 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -16,6 +16,7 @@ package tikv import ( "context" "crypto/tls" + "fmt" "math" "math/rand" "strconv" @@ -43,6 +44,8 @@ import ( pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) // DCLabelKey indicates the key of label which represents the dc for Store. @@ -156,6 +159,62 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl return store, nil } +// NewTxnClient creates a txn client with pdAddrs. +func NewTxnClient(pdAddrs []string) (*KVStore, error) { + + cfg := config.GetGlobalConfig() + pdClient, err := NewPDClient(pdAddrs) + if err != nil { + return nil, errors.Trace(err) + } + // init uuid + // FIXME: uuid will be a very long and ugly string, simplify it. + uuid := fmt.Sprintf("tikv-%v", pdClient.GetClusterID(context.TODO())) + tlsConfig, err := cfg.Security.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + + spkv, err := NewEtcdSafePointKV(pdAddrs, tlsConfig) + if err != nil { + return nil, errors.Trace(err) + } + + s, err := NewKVStore(uuid, pdClient, spkv, NewRPCClient(cfg.Security)) + if err != nil { + return nil, errors.Trace(err) + } + if cfg.TxnLocalLatches.Enabled { + s.EnableTxnLocalLatches(cfg.TxnLocalLatches.Capacity) + } + return s, nil +} + +// NewPDClient creates pd.Client with pdAddrs. +func NewPDClient(pdAddrs []string) (pd.Client, error) { + cfg := config.GetGlobalConfig() + // init pd-client + pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ + CAPath: cfg.Security.ClusterSSLCA, + CertPath: cfg.Security.ClusterSSLCert, + KeyPath: cfg.Security.ClusterSSLKey, + }, + pd.WithGRPCDialOptions( + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second, + Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second, + }), + ), + pd.WithCustomTimeoutOption(time.Duration(cfg.PDClient.PDServerTimeout)*time.Second), + pd.WithForwardingOption(config.GetGlobalConfig().EnableForwarding)) + + if err != nil { + return nil, errors.Trace(err) + } + pdClient := &CodecPDClient{Client: util.InterceptedPDClient{Client: pdCli}} + return pdClient, nil +} + // EnableTxnLocalLatches enables txn latch. It should be called before using // the store to serve any requests. func (s *KVStore) EnableTxnLocalLatches(size uint) {