tikv/kv: add the function NewTxnClient (#79)

Signed-off-by: shirly <AndreMouche@126.com>
This commit is contained in:
Shirly 2021-06-11 17:17:35 +08:00 committed by GitHub
parent d9b5c73d4e
commit 1cb7de4762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 59 additions and 0 deletions

View File

@ -16,6 +16,7 @@ package tikv
import (
"context"
"crypto/tls"
"fmt"
"math"
"math/rand"
"strconv"
@ -43,6 +44,8 @@ import (
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// DCLabelKey indicates the key of label which represents the dc for Store.
@ -156,6 +159,62 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
return store, nil
}
// NewTxnClient creates a txn client with pdAddrs.
func NewTxnClient(pdAddrs []string) (*KVStore, error) {
cfg := config.GetGlobalConfig()
pdClient, err := NewPDClient(pdAddrs)
if err != nil {
return nil, errors.Trace(err)
}
// init uuid
// FIXME: uuid will be a very long and ugly string, simplify it.
uuid := fmt.Sprintf("tikv-%v", pdClient.GetClusterID(context.TODO()))
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
spkv, err := NewEtcdSafePointKV(pdAddrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
s, err := NewKVStore(uuid, pdClient, spkv, NewRPCClient(cfg.Security))
if err != nil {
return nil, errors.Trace(err)
}
if cfg.TxnLocalLatches.Enabled {
s.EnableTxnLocalLatches(cfg.TxnLocalLatches.Capacity)
}
return s, nil
}
// NewPDClient creates pd.Client with pdAddrs.
func NewPDClient(pdAddrs []string) (pd.Client, error) {
cfg := config.GetGlobalConfig()
// init pd-client
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
CAPath: cfg.Security.ClusterSSLCA,
CertPath: cfg.Security.ClusterSSLCert,
KeyPath: cfg.Security.ClusterSSLKey,
},
pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
),
pd.WithCustomTimeoutOption(time.Duration(cfg.PDClient.PDServerTimeout)*time.Second),
pd.WithForwardingOption(config.GetGlobalConfig().EnableForwarding))
if err != nil {
return nil, errors.Trace(err)
}
pdClient := &CodecPDClient{Client: util.InterceptedPDClient{Client: pdCli}}
return pdClient, nil
}
// EnableTxnLocalLatches enables txn latch. It should be called before using
// the store to serve any requests.
func (s *KVStore) EnableTxnLocalLatches(size uint) {