mirror of https://github.com/tikv/client-go.git
1051 lines
32 KiB
Go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/client.go
//

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package client provides TCP connections to kvserver.
package client

import (
	"context"
	"fmt"
	"io"
	"math"
	"runtime/trace"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/kvproto/pkg/coprocessor"
	"github.com/pingcap/kvproto/pkg/debugpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/mpp"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/config"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/apicodec"
	"github.com/tikv/client-go/v2/internal/logutil"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding/gzip"
	"google.golang.org/grpc/experimental"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
)

// MaxRecvMsgSize sets the max gRPC receive message size received from the server. If any message size is larger than
// the current value, an error will be reported by gRPC.
var MaxRecvMsgSize = math.MaxInt64 - 1

// Timeout durations.
const (
	dialTimeout       = 5 * time.Second
	ReadTimeoutShort  = 30 * time.Second // For requests that read/write several key-values.
	ReadTimeoutMedium = 60 * time.Second // For requests that may need to scan a region.

	// MaxWriteExecutionTime is the MaxExecutionDurationMs field for write requests.
	// Because the last deadline check is before proposing, let us give it 10 more seconds
	// after proposing.
	MaxWriteExecutionTime = ReadTimeoutShort - 10*time.Second
)

// forwardMetadataKey is the key of gRPC metadata which represents a forwarded request.
const forwardMetadataKey = "tikv-forwarded-host"

// Client is a client that sends RPCs.
// It should not be used after calling Close().
type Client interface {
	// Close should release all data.
	Close() error
	// CloseAddr closes gRPC connections to the address. The client will reconnect the next time the address is used.
	CloseAddr(addr string) error
	// SendRequest sends a Request.
	SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
	// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
	// previously set one will be replaced.
	SetEventListener(listener ClientEventListener)
}

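// A minimal usage sketch for the interface above, assuming a TiKV node at the
// illustrative address "127.0.0.1:20160"; error handling is elided and the
// key/version values are made up:
//
//	client := NewRPCClient()
//	defer client.Close()
//	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k"), Version: 1})
//	resp, err := client.SendRequest(context.Background(), "127.0.0.1:20160", req, ReadTimeoutShort)
//	if err == nil {
//		_ = resp.Resp.(*kvrpcpb.GetResponse)
//	}
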
// ClientEventListener is a listener to handle events produced by `Client`.
type ClientEventListener interface {
	// OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information.
	OnHealthFeedback(feedback *kvrpcpb.HealthFeedback)
}

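// A sketch of a possible listener implementation; the type and its behavior
// are hypothetical, not part of this package:
//
//	type logListener struct{}
//
//	func (logListener) OnHealthFeedback(fb *kvrpcpb.HealthFeedback) {
//		logutil.BgLogger().Info("health feedback", zap.Uint64("store", fb.GetStoreId()))
//	}
//
// It would be registered via client.SetEventListener(logListener{}).
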
// ClientExt is a client that has extended interfaces.
type ClientExt interface {
	// CloseAddrVer closes gRPC connections to the address with an additional `ver` parameter.
	// Each new connection will have an incremented `ver` value, and attempts to close a previous `ver` will be ignored.
	// Passing `math.MaxUint64` as the `ver` parameter will forcefully close all connections to the address.
	CloseAddrVer(addr string, ver uint64) error
}

// ErrConn wraps an error with the target address and version of the connection.
type ErrConn struct {
	Err  error
	Addr string
	Ver  uint64
}

func (e *ErrConn) Error() string {
	return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error())
}

func (e *ErrConn) Cause() error {
	return e.Err
}

func (e *ErrConn) Unwrap() error {
	return e.Err
}

func WrapErrConn(err error, conn *connArray) error {
	if err == nil {
		return nil
	}
	return &ErrConn{
		Err:  err,
		Addr: conn.target,
		Ver:  conn.ver,
	}
}

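// Because ErrConn implements Unwrap, callers can recover the failing address
// and connection version with the standard errors helpers. A sketch, assuming
// err came back from SendRequest:
//
//	var errConn *ErrConn
//	if errors.As(err, &errConn) {
//		fmt.Printf("request to %s (conn ver %d) failed\n", errConn.Addr, errConn.Ver)
//	}
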
type connArray struct {
	// The target host.
	target string
	// version of the connection array, increased by 1 on each reconnect.
	ver uint64

	index uint32
	v     []*monitoredConn
	// streamTimeout binds with a background goroutine to process coprocessor streaming timeout.
	streamTimeout chan *tikvrpc.Lease
	dialTimeout   time.Duration
	// batchConn is not nil when batch is enabled.
	*batchConn
	done chan struct{}

	monitor *connMonitor

	metrics struct {
		rpcLatHist        *rpcMetrics
		rpcSrcLatSum      sync.Map
		rpcNetLatExternal prometheus.Observer
		rpcNetLatInternal prometheus.Observer
	}
}

func newConnArray(maxSize uint, addr string, ver uint64, security config.Security,
	idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, eventListener *atomic.Pointer[ClientEventListener], opts []grpc.DialOption) (*connArray, error) {
	a := &connArray{
		ver:           ver,
		index:         0,
		v:             make([]*monitoredConn, maxSize),
		streamTimeout: make(chan *tikvrpc.Lease, 1024),
		done:          make(chan struct{}),
		dialTimeout:   dialTimeout,
		monitor:       m,
	}
	a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr}))
	a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false")
	a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true")
	if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
		return nil, err
	}
	return a, nil
}

type connMonitor struct {
	m        sync.Map
	loopOnce sync.Once
	stopOnce sync.Once
	stop     chan struct{}
}

func (c *connMonitor) AddConn(conn *monitoredConn) {
	c.m.Store(conn.Name, conn)
}

func (c *connMonitor) RemoveConn(conn *monitoredConn) {
	c.m.Delete(conn.Name)
	for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
		metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
	}
}

func (c *connMonitor) Start() {
	c.loopOnce.Do(
		func() {
			c.stop = make(chan struct{})
			go c.start()
		},
	)
}

func (c *connMonitor) Stop() {
	c.stopOnce.Do(
		func() {
			if c.stop != nil {
				close(c.stop)
			}
		},
	)
}

func (c *connMonitor) start() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.m.Range(func(_, value interface{}) bool {
				conn := value.(*monitoredConn)
				nowState := conn.GetState()
				for state := connectivity.Idle; state <= connectivity.Shutdown; state++ {
					if state == nowState {
						metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), nowState.String()).Set(1)
					} else {
						metrics.TiKVGrpcConnectionState.WithLabelValues(conn.Name, conn.Target(), state.String()).Set(0)
					}
				}
				return true
			})
		case <-c.stop:
			return
		}
	}
}

type monitoredConn struct {
	*grpc.ClientConn
	Name string
}

func (a *connArray) monitoredDial(ctx context.Context, connName, target string, opts ...grpc.DialOption) (conn *monitoredConn, err error) {
	conn = &monitoredConn{
		Name: connName,
	}
	conn.ClientConn, err = grpc.DialContext(ctx, target, opts...)
	if err != nil {
		return nil, err
	}
	a.monitor.AddConn(conn)
	return conn, nil
}

func (c *monitoredConn) Close() error {
	if c.ClientConn != nil {
		err := c.ClientConn.Close()
		logutil.BgLogger().Debug("close gRPC connection", zap.String("target", c.Name), zap.Error(err))
		return err
	}
	return nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, eventListener *atomic.Pointer[ClientEventListener], opts ...grpc.DialOption) error {
	a.target = addr

	opt := grpc.WithTransportCredentials(insecure.NewCredentials())
	if len(security.ClusterSSLCA) != 0 {
		tlsConfig, err := security.ToTLSConfig()
		if err != nil {
			return errors.WithStack(err)
		}
		opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
	}

	cfg := config.GetGlobalConfig()
	var (
		unaryInterceptor  grpc.UnaryClientInterceptor
		streamInterceptor grpc.StreamClientInterceptor
	)
	if cfg.OpenTracingEnable {
		unaryInterceptor = grpc_opentracing.UnaryClientInterceptor()
		streamInterceptor = grpc_opentracing.StreamClientInterceptor()
	}

	allowBatch := (cfg.TiKVClient.MaxBatchSize > 0) && enableBatch
	if allowBatch {
		a.batchConn = newBatchConn(uint(len(a.v)), cfg.TiKVClient.MaxBatchSize, idleNotify)
		a.batchConn.initMetrics(a.target)
	}
	keepAlive := cfg.TiKVClient.GrpcKeepAliveTime
	for i := range a.v {
		ctx, cancel := context.WithTimeout(context.Background(), a.dialTimeout)
		var callOptions []grpc.CallOption
		callOptions = append(callOptions, grpc.MaxCallRecvMsgSize(MaxRecvMsgSize))
		if cfg.TiKVClient.GrpcCompressionType == gzip.Name {
			callOptions = append(callOptions, grpc.UseCompressor(gzip.Name))
		}

		opts = append([]grpc.DialOption{
			opt,
			grpc.WithInitialWindowSize(cfg.TiKVClient.GrpcInitialWindowSize),
			grpc.WithInitialConnWindowSize(cfg.TiKVClient.GrpcInitialConnWindowSize),
			grpc.WithUnaryInterceptor(unaryInterceptor),
			grpc.WithStreamInterceptor(streamInterceptor),
			grpc.WithDefaultCallOptions(callOptions...),
			grpc.WithConnectParams(grpc.ConnectParams{
				Backoff: backoff.Config{
					BaseDelay:  100 * time.Millisecond, // Default was 1s.
					Multiplier: 1.6,                    // Default
					Jitter:     0.2,                    // Default
					MaxDelay:   3 * time.Second,        // Default was 120s.
				},
				MinConnectTimeout: a.dialTimeout,
			}),
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:    time.Duration(keepAlive) * time.Second,
				Timeout: cfg.TiKVClient.GetGrpcKeepAliveTimeout(),
			}),
		}, opts...)
		if cfg.TiKVClient.GrpcSharedBufferPool {
			opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
		}
		conn, err := a.monitoredDial(
			ctx,
			fmt.Sprintf("%s-%d", a.target, i),
			addr,
			opts...,
		)

		cancel()
		if err != nil {
			// Cleanup if the initialization fails.
			a.Close()
			return errors.WithStack(err)
		}
		a.v[i] = conn

		if allowBatch {
			batchClient := &batchCommandsClient{
				target:           a.target,
				conn:             conn.ClientConn,
				forwardedClients: make(map[string]*batchCommandsStream),
				batched:          sync.Map{},
				epoch:            0,
				closed:           0,
				tikvClientCfg:    cfg.TiKVClient,
				tikvLoad:         &a.tikvTransportLayerLoad,
				dialTimeout:      a.dialTimeout,
				tryLock:          tryLock{sync.NewCond(new(sync.Mutex)), false},
				eventListener:    eventListener,
				metrics:          &a.batchConn.metrics,
			}
			batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
			a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
		}
	}
	go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done)
	if allowBatch {
		go a.batchSendLoop(cfg.TiKVClient)
	}

	return nil
}

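// Because the caller-supplied opts are appended after the defaults built in
// Init, gRPC applies them last, so they can override default settings such as
// the keepalive parameters. A sketch (the 20s value is illustrative):
//
//	cli := NewRPCClient(WithGRPCDialOptions(
//		grpc.WithKeepaliveParams(keepalive.ClientParameters{Time: 20 * time.Second}),
//	))
//	defer cli.Close()
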
func (a *connArray) Get() *grpc.ClientConn {
	next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v))
	return a.v[next].ClientConn
}

func (a *connArray) Close() {
	if a.batchConn != nil {
		a.batchConn.Close()
	}

	for _, c := range a.v {
		if c != nil {
			err := c.Close()
			tikverr.Log(err)
			if err == nil {
				a.monitor.RemoveConn(c)
			}
		}
	}

	close(a.done)
}

func (a *connArray) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) {
	seconds := latency.Seconds()
	stale := req.GetStaleRead()
	source := req.GetRequestSource()
	internal := util.IsInternalRequest(req.GetRequestSource())

	a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds)

	srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source)
	if !ok {
		srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith(
			prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source}))
		a.metrics.rpcSrcLatSum.Store(source, srcLatSum)
	}
	srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds)

	if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
		var totalRpcWallTimeNs uint64
		if execDetail.TimeDetailV2 != nil {
			totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
		} else if execDetail.TimeDetail != nil {
			totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
		}
		if totalRpcWallTimeNs > 0 {
			lat := latency - time.Duration(totalRpcWallTimeNs)
			if internal {
				a.metrics.rpcNetLatInternal.Observe(lat.Seconds())
			} else {
				a.metrics.rpcNetLatExternal.Observe(lat.Seconds())
			}
		}
	}
}

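// The network-latency observation above is the client-observed latency minus
// the server-reported RPC wall time. A worked example with illustrative
// numbers: if the client measured latency = 12ms and TiKV reported
// TotalRpcWallTimeNs equivalent to 10ms, then roughly 12ms - 10ms = 2ms is
// attributed to the network (plus any queueing outside the server's
// measurement).
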
type option struct {
	gRPCDialOptions []grpc.DialOption
	security        config.Security
	dialTimeout     time.Duration
	codec           apicodec.Codec
}

// Opt is the option for the client.
type Opt func(*option)

// WithSecurity is used to set the security config.
func WithSecurity(security config.Security) Opt {
	return func(c *option) {
		c.security = security
	}
}

// WithGRPCDialOptions is used to set the grpc.DialOption.
func WithGRPCDialOptions(grpcDialOptions ...grpc.DialOption) Opt {
	return func(c *option) {
		c.gRPCDialOptions = grpcDialOptions
	}
}

// WithCodec is used to set RPCClient's codec.
func WithCodec(codec apicodec.Codec) Opt {
	return func(c *option) {
		c.codec = codec
	}
}

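// A composition sketch for the options above, assuming TLS material at the
// illustrative paths and relying on config.NewSecurity to build the Security
// value:
//
//	cli := NewRPCClient(
//		WithSecurity(config.NewSecurity("ca.pem", "cert.pem", "key.pem", nil)),
//		WithGRPCDialOptions(grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name))),
//	)
//	defer cli.Close()
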
// RPCClient is RPC client struct.
// TODO: Add flow control between RPC clients in TiDB and RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
// that there are too many concurrent requests which overload the service of TiKV.
type RPCClient struct {
	sync.RWMutex

	conns  map[string]*connArray
	vers   map[string]uint64
	option *option

	idleNotify uint32

	// Periodically check whether there is any connection that is idle and then close and remove these connections.
	// Implement background cleanup.
	isClosed bool

	connMonitor *connMonitor

	eventListener *atomic.Pointer[ClientEventListener]
}

var _ Client = &RPCClient{}

// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(opts ...Opt) *RPCClient {
	cli := &RPCClient{
		conns: make(map[string]*connArray),
		vers:  make(map[string]uint64),
		option: &option{
			dialTimeout: dialTimeout,
		},
		connMonitor:   &connMonitor{},
		eventListener: new(atomic.Pointer[ClientEventListener]),
	}
	for _, opt := range opts {
		opt(cli.option)
	}
	cli.connMonitor.Start()
	return cli
}

func (c *RPCClient) getConnArray(addr string, enableBatch bool, opt ...func(cfg *config.TiKVClient)) (*connArray, error) {
	c.RLock()
	if c.isClosed {
		c.RUnlock()
		return nil, errors.Errorf("rpcClient is closed")
	}
	array, ok := c.conns[addr]
	c.RUnlock()
	if !ok {
		var err error
		array, err = c.createConnArray(addr, enableBatch, opt...)
		if err != nil {
			return nil, err
		}
	}

	// An idle connArray will not change to active again; this avoids the race condition
	// where recycling an idle connection closes an active connection unexpectedly (idle -> active).
	if array.batchConn != nil && array.isIdle() {
		return nil, errors.Errorf("rpcClient is idle")
	}

	return array, nil
}

func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(cfg *config.TiKVClient)) (*connArray, error) {
	c.Lock()
	defer c.Unlock()
	array, ok := c.conns[addr]
	if !ok {
		var err error
		client := config.GetGlobalConfig().TiKVClient
		for _, opt := range opts {
			opt(&client)
		}
		ver := c.vers[addr] + 1
		array, err = newConnArray(
			client.GrpcConnectionCount,
			addr,
			ver,
			c.option.security,
			&c.idleNotify,
			enableBatch,
			c.option.dialTimeout,
			c.connMonitor,
			c.eventListener,
			c.option.gRPCDialOptions)

		if err != nil {
			return nil, err
		}
		c.conns[addr] = array
		c.vers[addr] = ver
	}
	return array, nil
}

func (c *RPCClient) closeConns() {
	c.Lock()
	if !c.isClosed {
		c.isClosed = true
		// close all connections
		for _, array := range c.conns {
			array.Close()
		}
	}
	c.Unlock()
}

func (c *RPCClient) recycleIdleConnArray() {
	start := time.Now()

	var addrs []string
	var vers []uint64
	c.RLock()
	for _, conn := range c.conns {
		if conn.batchConn != nil && conn.isIdle() {
			addrs = append(addrs, conn.target)
			vers = append(vers, conn.ver)
		}
	}
	c.RUnlock()

	for i, addr := range addrs {
		c.CloseAddrVer(addr, vers[i])
	}

	metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())
}

func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
	tikvrpc.AttachContext(req, req.Context)

	var spanRPC opentracing.Span
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		spanRPC = span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
		defer spanRPC.Finish()
		ctx = opentracing.ContextWithSpan(ctx, spanRPC)
	}

	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
		go c.recycleIdleConnArray()
	}

	// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
	// tiflash/tiflash_mpp/tidb don't use BatchCommand.
	enableBatch := req.StoreTp == tikvrpc.TiKV
	connArray, err := c.getConnArray(addr, enableBatch)
	if err != nil {
		return nil, err
	}

	wrapErrConn := func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
		return resp, WrapErrConn(err, connArray)
	}

	start := time.Now()
	defer func() {
		elapsed := time.Since(start)
		connArray.updateRPCMetrics(req, resp, elapsed)

		if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
			execDetails := stmtExec.(*util.ExecDetails)
			atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
			execNetworkCollector := networkCollector{}
			execNetworkCollector.onReq(req, execDetails)
			execNetworkCollector.onResp(req, resp, execDetails)
		}

		if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
			if si := buildSpanInfoFromResp(resp); si != nil {
				si.addTo(spanRPC, start)
			}
		}
	}()

	// The TiDB RPC server supports batch RPC, but a batch connection would send heartbeats,
	// which is unnecessary since requests to TiDB are not high-frequency.
	pri := req.GetResourceControlContext().GetOverridePriority()
	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
		if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
			defer trace.StartRegion(ctx, req.Type.String()).End()
			return wrapErrConn(sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout, pri))
		}
	}

	clientConn := connArray.Get()
	if state := clientConn.GetState(); state == connectivity.TransientFailure {
		storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
		metrics.TiKVGRPCConnTransientFailureCounter.WithLabelValues(addr, storeID).Inc()
	}

	if req.IsDebugReq() {
		client := debugpb.NewDebugClient(clientConn)
		ctx1, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		return wrapErrConn(tikvrpc.CallDebugRPC(ctx1, client, req))
	}

	client := tikvpb.NewTikvClient(clientConn)

	// Set metadata for request forwarding. Needn't forward DebugReq.
	if req.ForwardedHost != "" {
		ctx = metadata.AppendToOutgoingContext(ctx, forwardMetadataKey, req.ForwardedHost)
	}
	switch req.Type {
	case tikvrpc.CmdBatchCop:
		return wrapErrConn(c.getBatchCopStreamResponse(ctx, client, req, timeout, connArray))
	case tikvrpc.CmdCopStream:
		return wrapErrConn(c.getCopStreamResponse(ctx, client, req, timeout, connArray))
	case tikvrpc.CmdMPPConn:
		return wrapErrConn(c.getMPPStreamResponse(ctx, client, req, timeout, connArray))
	}
	// Otherwise, it's a unary call.
	ctx1, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return wrapErrConn(tikvrpc.CallRPC(ctx1, client, req))
}

// SendRequest sends a Request to server and receives Response.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	// In unit tests, the option or codec may be nil; in that case the encode/decode process should be skipped.
	if c.option == nil || c.option.codec == nil {
		return c.sendRequest(ctx, addr, req, timeout)
	}

	codec := c.option.codec
	req, err := codec.EncodeRequest(req)
	if err != nil {
		return nil, err
	}
	resp, err := c.sendRequest(ctx, addr, req, timeout)
	if err != nil {
		return nil, err
	}
	return codec.DecodeResponse(req, resp)
}

func (c *RPCClient) getCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) {
	// Coprocessor streaming request.
	// Use context to support timeout for grpc streaming client.
	ctx1, cancel := context.WithCancel(ctx)
	// Should NOT call defer cancel() here because it would cancel further stream.Recv() calls.
	// Instead, we store it in copStream.Lease.Cancel and call it at copStream.Close.
	// TODO: add unit test for SendRequest.
	resp, err := tikvrpc.CallRPC(ctx1, client, req)
	if err != nil {
		cancel()
		return nil, err
	}

	// Put the lease object into the timeout channel, so it will be checked periodically.
	copStream := resp.Resp.(*tikvrpc.CopStreamResponse)
	copStream.Timeout = timeout
	copStream.Lease.Cancel = cancel
	connArray.streamTimeout <- &copStream.Lease

	// Read the first streaming response to get CopStreamResponse.
	// This can make error handling much easier, because SendReq() retries on
	// region errors automatically.
	var first *coprocessor.Response
	first, err = copStream.Recv()
	if err != nil {
		if errors.Cause(err) != io.EOF {
			return nil, errors.WithStack(err)
		}
		logutil.BgLogger().Debug("copstream returns nothing for the request.")
	}
	copStream.Response = first
	return resp, nil
}

func (c *RPCClient) getBatchCopStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) {
	// Coprocessor streaming request.
	// Use context to support timeout for grpc streaming client.
	ctx1, cancel := context.WithCancel(ctx)
	// Should NOT call defer cancel() here because it would cancel further stream.Recv() calls.
	// Instead, we store it in copStream.Lease.Cancel and call it at copStream.Close.
	// TODO: add unit test for SendRequest.
	resp, err := tikvrpc.CallRPC(ctx1, client, req)
	if err != nil {
		cancel()
		return nil, err
	}

	// Put the lease object into the timeout channel, so it will be checked periodically.
	copStream := resp.Resp.(*tikvrpc.BatchCopStreamResponse)
	copStream.Timeout = timeout
	copStream.Lease.Cancel = cancel
	connArray.streamTimeout <- &copStream.Lease

	// Read the first streaming response to get BatchCopStreamResponse.
	// This can make error handling much easier, because SendReq() retries on
	// region errors automatically.
	var first *coprocessor.BatchResponse
	first, err = copStream.Recv()
	if err != nil {
		if errors.Cause(err) != io.EOF {
			return nil, errors.WithStack(err)
		}
		logutil.BgLogger().Debug("batch copstream returns nothing for the request.")
	}
	copStream.BatchResponse = first
	return resp, nil
}

func (c *RPCClient) getMPPStreamResponse(ctx context.Context, client tikvpb.TikvClient, req *tikvrpc.Request, timeout time.Duration, connArray *connArray) (*tikvrpc.Response, error) {
	// MPP streaming request.
	// Use context to support timeout for grpc streaming client.
	ctx1, cancel := context.WithCancel(ctx)
	// Should NOT call defer cancel() here because it would cancel further stream.Recv() calls.
	// Instead, we store it in copStream.Lease.Cancel and call it at copStream.Close.
	// TODO: add unit test for SendRequest.
	resp, err := tikvrpc.CallRPC(ctx1, client, req)
	if err != nil {
		cancel()
		return nil, err
	}

	// Put the lease object into the timeout channel, so it will be checked periodically.
	copStream := resp.Resp.(*tikvrpc.MPPStreamResponse)
	copStream.Timeout = timeout
	copStream.Lease.Cancel = cancel
	connArray.streamTimeout <- &copStream.Lease

	// Read the first streaming response to get MPPStreamResponse.
	// This can make error handling much easier, because SendReq() retries on
	// region errors automatically.
	var first *mpp.MPPDataPacket
	first, err = copStream.Recv()
	if err != nil {
		if errors.Cause(err) != io.EOF {
			return nil, errors.WithStack(err)
		}
	}
	copStream.MPPDataPacket = first
	return resp, nil
}

// Close closes all connections.
func (c *RPCClient) Close() error {
	// TODO: add a unit test for SendRequest after Close()
	c.closeConns()
	c.connMonitor.Stop()
	return nil
}

// CloseAddr closes gRPC connections to the address.
func (c *RPCClient) CloseAddr(addr string) error {
	return c.CloseAddrVer(addr, math.MaxUint64)
}

func (c *RPCClient) CloseAddrVer(addr string, ver uint64) error {
	c.Lock()
	if c.isClosed {
		c.Unlock()
		return nil
	}
	conn, ok := c.conns[addr]
	if ok {
		if conn.ver <= ver {
			delete(c.conns, addr)
			logutil.BgLogger().Debug("close connection", zap.String("target", addr), zap.Uint64("ver", ver), zap.Uint64("conn.ver", conn.ver))
		} else {
			logutil.BgLogger().Debug("ignore close connection", zap.String("target", addr), zap.Uint64("ver", ver), zap.Uint64("conn.ver", conn.ver))
			conn = nil
		}
	}
	c.Unlock()

	if conn != nil {
		conn.Close()
	}
	return nil
}

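// A sketch of the version-gating semantics above (ver values are illustrative):
//
//	// Suppose the live connection to addr currently has ver == 3.
//	_ = cli.CloseAddrVer(addr, 2) // ignored: conn.ver (3) > ver (2)
//	_ = cli.CloseAddrVer(addr, 3) // closes:  conn.ver (3) <= ver (3)
//	_ = cli.CloseAddr(addr)       // always closes (ver == math.MaxUint64)
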
// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
// previously set one will be replaced.
func (c *RPCClient) SetEventListener(listener ClientEventListener) {
	c.eventListener.Store(&listener)
}

type spanInfo struct {
	name     string
	dur      uint64
	async    bool
	children []spanInfo
}

func (si *spanInfo) calcDur() uint64 {
	if si.dur == 0 {
		for _, child := range si.children {
			if child.async {
				// TODO: Here we just skip the duration of async process, however there might be a sync point before a
				// specified span, in which case we should take the max(main_routine_duration, async_span_duration).
				// It's OK for now, since only persist-log is marked as async, whose duration is typically smaller than
				// commit-log.
				continue
			}
			si.dur += child.calcDur()
		}
	}
	return si.dur
}

func (si *spanInfo) addTo(parent opentracing.Span, start time.Time) time.Time {
	if parent == nil {
		return start
	}
	dur := si.calcDur()
	if dur == 0 {
		return start
	}
	end := start.Add(time.Duration(dur) * time.Nanosecond)
	tracer := parent.Tracer()
	span := tracer.StartSpan(si.name, opentracing.ChildOf(parent.Context()), opentracing.StartTime(start))
	t := start
	for _, child := range si.children {
		t = child.addTo(span, t)
	}
	span.FinishWithOptions(opentracing.FinishOptions{FinishTime: end})
	if si.async {
		span.SetTag("async", "true")
		return start
	}
	return end
}

func (si *spanInfo) printTo(out io.StringWriter) {
	out.WriteString(si.name)
	if si.async {
		out.WriteString("'")
	}
	if si.dur > 0 {
		out.WriteString("[")
		out.WriteString(time.Duration(si.dur).String())
		out.WriteString("]")
	}
	if len(si.children) > 0 {
		out.WriteString("{")
		for _, child := range si.children {
			out.WriteString(" ")
			child.printTo(out)
		}
		out.WriteString(" }")
	}
}

func (si *spanInfo) String() string {
	buf := new(strings.Builder)
	si.printTo(buf)
	return buf.String()
}

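// For reference, printTo renders the span tree in a compact one-line form.
// With illustrative durations, a read request might print as:
//
//	tikv.RPC[2ms]{ tikv.Wait[1ms]{ tikv.GetSnapshot[500µs] } tikv.Process[1ms] }
//
// An async child such as persist-log renders with a trailing apostrophe,
// e.g. tikv.PersistLog'[300µs].
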
func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {
	details := resp.GetExecDetailsV2()
	if details == nil {
		return nil
	}

	td := details.TimeDetailV2
	tdOld := details.TimeDetail
	sd := details.ScanDetailV2
	wd := details.WriteDetail

	if td == nil && tdOld == nil {
		return nil
	}

	var spanRPC, spanWait, spanProcess spanInfo
	if td != nil {
		spanRPC = spanInfo{name: "tikv.RPC", dur: td.TotalRpcWallTimeNs}
		spanWait = spanInfo{name: "tikv.Wait", dur: td.WaitWallTimeNs}
		spanProcess = spanInfo{name: "tikv.Process", dur: td.ProcessWallTimeNs}
	} else if tdOld != nil {
		// TimeDetail is deprecated and will be removed in a future version.
		spanRPC = spanInfo{name: "tikv.RPC", dur: tdOld.TotalRpcWallTimeNs}
		spanWait = spanInfo{name: "tikv.Wait", dur: tdOld.WaitWallTimeMs * uint64(time.Millisecond)}
		spanProcess = spanInfo{name: "tikv.Process", dur: tdOld.ProcessWallTimeMs * uint64(time.Millisecond)}
	}

	if sd != nil {
		spanWait.children = append(spanWait.children, spanInfo{name: "tikv.GetSnapshot", dur: sd.GetSnapshotNanos})
		if wd == nil {
			spanProcess.children = append(spanProcess.children, spanInfo{name: "tikv.RocksDBBlockRead", dur: sd.RocksdbBlockReadNanos})
		}
	}

	spanRPC.children = append(spanRPC.children, spanWait, spanProcess)
	if td != nil {
		spanSuspend := spanInfo{name: "tikv.Suspend", dur: td.ProcessSuspendWallTimeNs}
		spanRPC.children = append(spanRPC.children, spanSuspend)
	}

	if wd != nil {
		spanAsyncWrite := spanInfo{
			name: "tikv.AsyncWrite",
			children: []spanInfo{
				{name: "tikv.StoreBatchWait", dur: wd.StoreBatchWaitNanos},
				{name: "tikv.ProposeSendWait", dur: wd.ProposeSendWaitNanos},
				{name: "tikv.PersistLog", dur: wd.PersistLogNanos, async: true, children: []spanInfo{
					{name: "tikv.RaftDBWriteWait", dur: wd.RaftDbWriteLeaderWaitNanos}, // MutexLock + WriteLeader
					{name: "tikv.RaftDBWriteWAL", dur: wd.RaftDbSyncLogNanos},
					{name: "tikv.RaftDBWriteMemtable", dur: wd.RaftDbWriteMemtableNanos},
				}},
				{name: "tikv.CommitLog", dur: wd.CommitLogNanos},
				{name: "tikv.ApplyBatchWait", dur: wd.ApplyBatchWaitNanos},
				{name: "tikv.ApplyLog", dur: wd.ApplyLogNanos, children: []spanInfo{
					{name: "tikv.ApplyMutexLock", dur: wd.ApplyMutexLockNanos},
					{name: "tikv.ApplyWriteLeaderWait", dur: wd.ApplyWriteLeaderWaitNanos},
					{name: "tikv.ApplyWriteWAL", dur: wd.ApplyWriteWalNanos},
					{name: "tikv.ApplyWriteMemtable", dur: wd.ApplyWriteMemtableNanos},
				}},
			},
		}
		spanRPC.children = append(spanRPC.children, spanAsyncWrite)
	}

	return &spanRPC
}

func deriveRPCMetrics(root prometheus.ObserverVec) *rpcMetrics {
	return &rpcMetrics{
		root:        root,
		latGet:      root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
		latCop:      root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdCop.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
		latBatchGet: root.With(prometheus.Labels{metrics.LblType: tikvrpc.CmdBatchGet.String(), metrics.LblStaleRead: "false", metrics.LblScope: "false"}),
	}
}

type rpcMetrics struct {
	root prometheus.ObserverVec

	// static metrics
	latGet      prometheus.Observer
	latCop      prometheus.Observer
	latBatchGet prometheus.Observer

	latOther sync.Map
}

func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometheus.Observer {
	if !stale && !internal {
		switch cmd {
		case tikvrpc.CmdGet:
			return m.latGet
		case tikvrpc.CmdCop:
			return m.latCop
		case tikvrpc.CmdBatchGet:
			return m.latBatchGet
		}
	}
	key := uint64(cmd)
	if stale {
		key |= 1 << 16
	}
	if internal {
		key |= 1 << 17
	}
	lat, ok := m.latOther.Load(key)
	if !ok {
		lat = m.root.With(prometheus.Labels{
			metrics.LblType:      cmd.String(),
			metrics.LblStaleRead: strconv.FormatBool(stale),
			metrics.LblScope:     strconv.FormatBool(internal),
		})
		m.latOther.Store(key, lat)
	}
	return lat.(prometheus.Observer)
}
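// The latOther cache key above packs the 16-bit command type together with two
// flag bits, so each (cmd, stale, internal) combination maps to a distinct key.
// A sketch of the layout (CmdScan is just an example command):
//
//	key := uint64(tikvrpc.CmdScan) // bits 0-15: command type
//	key |= 1 << 16                 // bit 16: stale read
//	key |= 1 << 17                 // bit 17: internal scope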