mirror of https://github.com/tikv/client-go.git

*: fix batch-client wait too long and add some metrics (#973)

Signed-off-by: crazycs520 <crazycs520@gmail.com>
Co-authored-by: disksing <i@disksing.com>
parent 39084386b3
commit adb7db13c3
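The gist of the change: sendRequest now skips the batch path when the batch connection looks saturated, so a request no longer queues behind a full batchCommandsCh or behind a send loop that has been handing a batch off for more than a second, and the new TiKVBatchCmdDuration / TiKVBatchConnSendDuration histograms break the remaining wait into labeled phases. A minimal standalone sketch of the busy heuristic and the fallback it enables (the busyChecker type, its queue field, and the printed fallback are illustrative, not the client-go API):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// busyChecker mirrors the idea behind batchConn.isBusy in the diff below:
// the batch pipeline counts as busy when its queue is full, or when the send
// loop has been in the "handing" state for longer than one second.
type busyChecker struct {
    queue            chan struct{} // stands in for batchCommandsCh
    state            atomic.Int32  // 0 = idle, 1 = handing
    startHandingTime atomic.Int64  // unix nanoseconds when handing began
}

func (b *busyChecker) isBusy(now int64) bool {
    if len(b.queue) == cap(b.queue) {
        return true
    }
    if b.state.Load() == 1 && now-b.startHandingTime.Load() > int64(time.Second) {
        return true
    }
    return false
}

func main() {
    b := &busyChecker{queue: make(chan struct{}, 2)}
    b.state.Store(1)
    b.startHandingTime.Store(time.Now().Add(-2 * time.Second).UnixNano())

    // When busy, the caller (sendRequest in the diff) falls back to a plain
    // unary RPC instead of waiting on the batch connection.
    if b.isBusy(time.Now().UnixNano()) {
        fmt.Println("batch conn busy: fall back to non-batched RPC")
    }
}
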
@@ -620,7 +620,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request,
     // TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
     // request to TiDB is not high frequency.
-    if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
+    if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch && !connArray.batchConn.isBusy(start.UnixNano()) {
         if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
             defer trace.StartRegion(ctx, req.Type.String()).End()
             return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)
         }

@@ -70,6 +70,7 @@ type batchCommandsEntry struct {
     // canceled indicated the request is canceled or not.
     canceled int32
     err      error
+    start    time.Time
 }
 
 func (b *batchCommandsEntry) isCanceled() bool {

@@ -197,9 +198,16 @@ type batchConn struct {
     pendingRequests prometheus.Observer
     batchSize       prometheus.Observer
 
     index uint32
+    state            atomic.Int32
+    startHandingTime atomic.Int64
 }
 
+var (
+    batchConnIdle    = int32(0)
+    batchConnHanding = int32(1)
+)
+
 func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
     return &batchConn{
         batchCommandsCh:        make(chan *batchCommandsEntry, maxBatchSize),

@@ -216,6 +224,16 @@ func (a *batchConn) isIdle() bool {
     return atomic.LoadUint32(&a.idle) != 0
 }
 
+func (a *batchConn) isBusy(now int64) bool {
+    if len(a.batchCommandsCh) == cap(a.batchCommandsCh) {
+        return true
+    }
+    if a.state.Load() == batchConnHanding && (now-a.startHandingTime.Load()) > int64(time.Second) {
+        return true
+    }
+    return false
+}
+
 // fetchAllPendingRequests fetches all pending requests from the channel.
 func (a *batchConn) fetchAllPendingRequests(
     maxBatchSize int,

@@ -311,6 +329,7 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 
     bestBatchWaitSize := cfg.BatchWaitSize
     for {
+        a.state.Store(batchConnIdle)
         a.reqBuilder.reset()
 
         start := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))

@@ -322,6 +341,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
             }
         }
 
+        a.state.Store(batchConnHanding)
+        a.startHandingTime.Store(start.UnixNano())
         if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
             // If the target TiKV is overload, wait a while to collect more requests.
             if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {

@@ -378,11 +399,14 @@ func (a *batchConn) getClientAndSend() {
     }
     defer cli.unlockForSend()
 
+    now := time.Now()
+    batchCmdWaitToSendDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("wait-to-send", target)
     req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
         cli.batched.Store(id, e)
         if trace.IsEnabled() {
             trace.Log(e.ctx, "rpc", "send")
         }
+        batchCmdWaitToSendDuration.Observe(float64(now.Sub(e.start)))
     })
     if req != nil {
         cli.send("", req)

@@ -507,6 +531,14 @@ func (c *batchCommandsClient) isStopped() bool {
 }
 
 func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
+    start := time.Now()
+    defer func() {
+        if forwardedHost == "" {
+            metrics.TiKVBatchConnSendDuration.WithLabelValues(c.target).Observe(time.Since(start).Seconds())
+        } else {
+            metrics.TiKVBatchConnSendDuration.WithLabelValues(forwardedHost).Observe(time.Since(start).Seconds())
+        }
+    }()
     err := c.initBatchClient(forwardedHost)
     if err != nil {
         logutil.BgLogger().Warn(

@@ -612,6 +644,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64,
         }
     }()
 
+    batchCmdGotRespDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("got-resp", c.target)
     epoch := atomic.LoadUint64(&c.epoch)
     for {
         resp, err := streamClient.recv()

@@ -635,6 +668,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64,
         }
 
         responses := resp.GetResponses()
+        now := time.Now()
         for i, requestID := range resp.GetRequestIds() {
             value, ok := c.batched.Load(requestID)
             if !ok {

@@ -649,6 +683,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64,
                 trace.Log(entry.ctx, "rpc", "received")
             }
             logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
+            batchCmdGotRespDuration.Observe(float64(now.Sub(entry.start)))
             if atomic.LoadInt32(&entry.canceled) == 0 {
                 // Put the response only if the request is not canceled.
                 entry.res <- responses[i]

@@ -773,6 +808,7 @@ func sendBatchRequest(
     req *tikvpb.BatchCommandsRequest_Request,
     timeout time.Duration,
 ) (*tikvrpc.Response, error) {
+    start := time.Now()
     entry := &batchCommandsEntry{
         ctx:           ctx,
         req:           req,

@@ -780,11 +816,11 @@ func sendBatchRequest(
         forwardedHost: forwardedHost,
         canceled:      0,
         err:           nil,
+        start:         start,
     }
     timer := time.NewTimer(timeout)
     defer timer.Stop()
 
-    start := time.Now()
     select {
     case batchConn.batchCommandsCh <- entry:
     case <-ctx.Done():

@@ -795,7 +831,7 @@ func sendBatchRequest(
         return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
     }
     waitDuration := time.Since(start)
-    metrics.TiKVBatchWaitDuration.Observe(float64(waitDuration))
+    metrics.TiKVBatchCmdDuration.WithLabelValues("send-to-chan", addr).Observe(float64(waitDuration))
 
     select {
     case res, ok := <-entry.res:

@@ -62,7 +62,8 @@ var (
     TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
     TiKVStatusDuration              *prometheus.HistogramVec
     TiKVStatusCounter               *prometheus.CounterVec
-    TiKVBatchWaitDuration           prometheus.Histogram
+    TiKVBatchConnSendDuration       *prometheus.HistogramVec
+    TiKVBatchCmdDuration            *prometheus.HistogramVec
     TiKVBatchSendLatency            prometheus.Histogram
     TiKVBatchWaitOverLoad           prometheus.Counter
     TiKVBatchPendingRequests        *prometheus.HistogramVec

@@ -333,15 +334,25 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
             ConstLabels: constLabels,
         }, []string{LblResult})
 
-    TiKVBatchWaitDuration = prometheus.NewHistogram(
+    TiKVBatchConnSendDuration = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace:   namespace,
             Subsystem:   subsystem,
-            Name:        "batch_wait_duration",
-            Buckets:     prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
-            Help:        "batch wait duration",
+            Name:        "batch_conn_send_seconds",
+            Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 22), // 0.5ms ~ 1048s
+            Help:        "batch conn send duration",
             ConstLabels: constLabels,
-        })
+        }, []string{LblStore})
+
+    TiKVBatchCmdDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace:   namespace,
+            Subsystem:   subsystem,
+            Name:        "batch_cmd_duration",
+            Buckets:     prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
+            Help:        "batch cmd duration, unit is nanosecond",
+            ConstLabels: constLabels,
+        }, []string{LblType, LblStore})
 
     TiKVBatchSendLatency = prometheus.NewHistogram(
         prometheus.HistogramOpts{

@@ -767,7 +778,8 @@ func RegisterMetrics() {
     prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
     prometheus.MustRegister(TiKVStatusDuration)
     prometheus.MustRegister(TiKVStatusCounter)
-    prometheus.MustRegister(TiKVBatchWaitDuration)
+    prometheus.MustRegister(TiKVBatchConnSendDuration)
+    prometheus.MustRegister(TiKVBatchCmdDuration)
     prometheus.MustRegister(TiKVBatchSendLatency)
     prometheus.MustRegister(TiKVBatchRecvLatency)
     prometheus.MustRegister(TiKVBatchWaitOverLoad)
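
Taken together, the new instrumentation observes TiKVBatchCmdDuration with a phase label ("send-to-chan", "wait-to-send", "got-resp") plus the target store, in nanoseconds, while TiKVBatchConnSendDuration records the per-store cost of batchCommandsClient.send in seconds. A short sketch of the same HistogramVec pattern with plain prometheus types (the metric name and store address are illustrative, not the client-go objects):

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// batchCmdDuration follows the pattern of TiKVBatchCmdDuration above: one
// histogram labeled by phase and target store, observed in nanoseconds.
var batchCmdDuration = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name:    "example_batch_cmd_duration",
        Help:    "batch cmd duration by phase, unit is nanosecond",
        Buckets: prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
    }, []string{"type", "store"},
)

func main() {
    prometheus.MustRegister(batchCmdDuration)

    start := time.Now() // analogous to batchCommandsEntry.start
    // ... request is queued, batched, sent, and a response arrives ...
    batchCmdDuration.
        WithLabelValues("got-resp", "127.0.0.1:20160").
        Observe(float64(time.Since(start)))
}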