// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
	"runtime"
	"runtime/trace"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pkg/errors"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/internal/logutil"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)
type batchConn struct {
	// An atomic flag indicating whether the batch conn is idle or not.
	// 0 for busy, non-zero for idle.
	idle uint32

	// batchCommandsCh buffers pending batch commands entries.
	batchCommandsCh        chan *batchCommandsEntry
	batchCommandsClients   []*batchCommandsClient
	tikvTransportLayerLoad uint64
	closed                 chan struct{}

	reqBuilder *batchCommandsBuilder

	// idleNotify notifies rpcClient to check the idle flag.
	idleNotify *uint32
	idleDetect *time.Timer

	fetchMoreTimer *time.Timer

	index   uint32
	metrics batchConnMetrics
}
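// newBatchConn creates a batchConn that can hold up to connCount clients and
// buffers at most maxBatchSize pending entries. idleNotify is shared with the
// owning rpcClient and is set when this conn turns idle.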
func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
	return &batchConn{
		batchCommandsCh:        make(chan *batchCommandsEntry, maxBatchSize),
		batchCommandsClients:   make([]*batchCommandsClient, 0, connCount),
		tikvTransportLayerLoad: 0,
		closed:                 make(chan struct{}),
		reqBuilder:             newBatchCommandsBuilder(maxBatchSize),
		idleNotify:             idleNotify,
		idleDetect:             time.NewTimer(idleTimeout),
	}
}
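// initMetrics binds the per-target metric observers once so the send/recv
// loops don't resolve metric labels on every observation.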
func (a *batchConn) initMetrics(target string) {
	a.metrics.pendingRequests = metrics.TiKVBatchPendingRequests.WithLabelValues(target)
	a.metrics.batchSize = metrics.TiKVBatchRequests.WithLabelValues(target)
	a.metrics.sendLoopWaitHeadDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-head")
	a.metrics.sendLoopWaitMoreDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-more")
	a.metrics.sendLoopSendDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "send")
	a.metrics.recvLoopRecvDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "recv")
	a.metrics.recvLoopProcessDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "process")
	a.metrics.batchSendTailLat = metrics.TiKVBatchSendTailLatency.WithLabelValues(target)
	a.metrics.batchRecvTailLat = metrics.TiKVBatchRecvTailLatency.WithLabelValues(target)
	a.metrics.headArrivalInterval = metrics.TiKVBatchHeadArrivalInterval.WithLabelValues(target)
	a.metrics.batchMoreRequests = metrics.TiKVBatchMoreRequests.WithLabelValues(target)
	a.metrics.bestBatchSize = metrics.TiKVBatchBestSize.WithLabelValues(target)
}
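// isIdle reports whether this batchConn has been flagged idle by the send loop.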
func (a *batchConn) isIdle() bool {
	return atomic.LoadUint32(&a.idle) != 0
}
// fetchAllPendingRequests fetches all pending requests from the channel.
func (a *batchConn) fetchAllPendingRequests(maxBatchSize int) (headRecvTime time.Time, headArrivalInterval time.Duration) {
	// Block on the first element.
	latestReqStartTime := a.reqBuilder.latestReqStartTime
	var headEntry *batchCommandsEntry
	select {
	case headEntry = <-a.batchCommandsCh:
		if !a.idleDetect.Stop() {
			<-a.idleDetect.C
		}
		a.idleDetect.Reset(idleTimeout)
	case <-a.idleDetect.C:
		a.idleDetect.Reset(idleTimeout)
		atomic.AddUint32(&a.idle, 1)
		atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
		// Mark this batchConn idle so that it can be recycled.
		return time.Now(), 0
	case <-a.closed:
		return time.Now(), 0
	}
	if headEntry == nil {
		return time.Now(), 0
	}
	headRecvTime = time.Now()
	if headEntry.start.After(latestReqStartTime) && !latestReqStartTime.IsZero() {
		headArrivalInterval = headEntry.start.Sub(latestReqStartTime)
	}
	a.reqBuilder.push(headEntry)

	// Best-effort, non-blocking loop to collect more pending requests.
	for a.reqBuilder.len() < maxBatchSize {
		select {
		case entry := <-a.batchCommandsCh:
			if entry == nil {
				return
			}
			a.reqBuilder.push(entry)
		default:
			return
		}
	}
	return
}
// fetchMorePendingRequests fetches more pending requests from the channel.
func (a *batchConn) fetchMorePendingRequests(
	maxBatchSize int,
	batchWaitSize int,
	maxWaitTime time.Duration,
) {
	// Try to collect `batchWaitSize` requests, or wait `maxWaitTime`.
	if a.fetchMoreTimer == nil {
		a.fetchMoreTimer = time.NewTimer(maxWaitTime)
	} else {
		a.fetchMoreTimer.Reset(maxWaitTime)
	}
	for a.reqBuilder.len() < batchWaitSize {
		select {
		case entry := <-a.batchCommandsCh:
			if entry == nil {
				if !a.fetchMoreTimer.Stop() {
					<-a.fetchMoreTimer.C
				}
				return
			}
			a.reqBuilder.push(entry)
		case <-a.fetchMoreTimer.C:
			return
		}
	}
	if !a.fetchMoreTimer.Stop() {
		<-a.fetchMoreTimer.C
	}

	// Do an additional non-blocking try. Here we test the length against `maxBatchSize`
	// instead of `batchWaitSize` because trying our best to fetch more requests is
	// necessary so that we can adjust `batchWaitSize` dynamically.
	yielded := false
	for a.reqBuilder.len() < maxBatchSize {
		select {
		case entry := <-a.batchCommandsCh:
			if entry == nil {
				return
			}
			a.reqBuilder.push(entry)
		default:
			if yielded {
				return
			}
			// Yield once to give other goroutines a chance to enqueue more requests.
			runtime.Gosched()
			yielded = true
		}
	}
}
// idleTimeout is how long a batchConn may stay without any traffic before it
// is flagged idle and offered for recycling.
const idleTimeout = 3 * time.Minute

// BatchSendLoopPanicCounter is only used for testing.
var BatchSendLoopPanicCounter int64 = 0

// initBatchPolicyWarn ensures the invalid-batch-policy warning is logged only once.
var initBatchPolicyWarn sync.Once
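
// batchSendLoop collects pending entries into batches and dispatches them via
// getClientAndSend. If it panics, it logs the error and restarts itself in a
// new goroutine.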
func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
	defer func() {
		if r := recover(); r != nil {
			metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc()
			logutil.BgLogger().Error("batchSendLoop",
				zap.Any("r", r),
				zap.Stack("stack"))
			atomic.AddInt64(&BatchSendLoopPanicCounter, 1)
			logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter)))
			go a.batchSendLoop(cfg)
		}
	}()

	trigger, ok := newTurboBatchTriggerFromPolicy(cfg.BatchPolicy)
	if !ok {
		initBatchPolicyWarn.Do(func() {
			logutil.BgLogger().Warn("fallback to default batch policy due to invalid value", zap.String("value", cfg.BatchPolicy))
		})
	}
	turboBatchWaitTime := trigger.turboWaitTime()

	avgBatchWaitSize := float64(cfg.BatchWaitSize)
	for {
		sendLoopStartTime := time.Now()
		a.reqBuilder.reset()

		headRecvTime, headArrivalInterval := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
		if a.reqBuilder.len() == 0 {
			// The conn is closed or recycled.
			return
		}

		// curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient
		if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil {
			if val.(bool) {
				time.Sleep(1 * time.Hour)
			}
		}

		if batchSize := a.reqBuilder.len(); batchSize < int(cfg.MaxBatchSize) {
			if cfg.MaxBatchWaitTime > 0 && atomic.LoadUint64(&a.tikvTransportLayerLoad) > uint64(cfg.OverloadThreshold) {
				// If the target TiKV is overloaded, wait a while to collect more requests.
				metrics.TiKVBatchWaitOverLoad.Inc()
				a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(cfg.BatchWaitSize), cfg.MaxBatchWaitTime)
			} else if turboBatchWaitTime > 0 && headArrivalInterval > 0 && trigger.needFetchMore(headArrivalInterval) {
				batchWaitSize := trigger.preferredBatchWaitSize(avgBatchWaitSize, int(cfg.BatchWaitSize))
				a.fetchMorePendingRequests(int(cfg.MaxBatchSize), batchWaitSize, turboBatchWaitTime)
				a.metrics.batchMoreRequests.Observe(float64(a.reqBuilder.len() - batchSize))
			}
		}
		length := a.reqBuilder.len()
		avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize
		a.metrics.pendingRequests.Observe(float64(len(a.batchCommandsCh) + length))
		a.metrics.bestBatchSize.Observe(avgBatchWaitSize)
		a.metrics.headArrivalInterval.Observe(headArrivalInterval.Seconds())
		a.metrics.sendLoopWaitHeadDur.Observe(headRecvTime.Sub(sendLoopStartTime).Seconds())
		a.metrics.sendLoopWaitMoreDur.Observe(time.Since(sendLoopStartTime).Seconds())

		a.getClientAndSend()

		sendLoopEndTime := time.Now()
		a.metrics.sendLoopSendDur.Observe(sendLoopEndTime.Sub(sendLoopStartTime).Seconds())
		if dur := sendLoopEndTime.Sub(headRecvTime); dur > batchSendTailLatThreshold {
			a.metrics.batchSendTailLat.Observe(dur.Seconds())
		}
	}
}
const (
	// SendFailedReasonNoAvailableLimit means the client's concurrency limit is exhausted.
	SendFailedReasonNoAvailableLimit = "concurrency limit exceeded"
	// SendFailedReasonTryLockForSendFail means the client is locked by another sender.
	SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
)
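
// getClientAndSend tries to acquire an available batchCommandsClient by
// round-robin and sends the built batch requests through it. If every client
// is unavailable, the pending entries may be canceled with an error.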
func (a *batchConn) getClientAndSend() {
	if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
		if timeout, ok := val.(int); ok && timeout > 0 {
			time.Sleep(time.Duration(timeout * int(time.Millisecond)))
		}
	}

	// Choose a connection by round-robin.
	var (
		cli    *batchCommandsClient
		target string
	)
	reasons := make([]string, 0)
	hasHighPriorityTask := a.reqBuilder.hasHighPriorityTask()
	for i := 0; i < len(a.batchCommandsClients); i++ {
		a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
		target = a.batchCommandsClients[a.index].target
		// The lock protects the batchCommandsClient from being closed while it's in use.
		c := a.batchCommandsClients[a.index]
		if hasHighPriorityTask || c.available() > 0 {
			if c.tryLockForSend() {
				cli = c
				break
			} else {
				reasons = append(reasons, SendFailedReasonTryLockForSendFail)
			}
		} else {
			reasons = append(reasons, SendFailedReasonNoAvailableLimit)
		}
	}
	if cli == nil {
		logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
		metrics.TiKVNoAvailableConnectionCounter.Inc()
		if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit {
			// Only cancel requests when the MaxConcurrencyRequestLimit feature is not enabled,
			// to stay compatible with the behavior of older versions.
			// TODO: When the MaxConcurrencyRequestLimit feature is enabled, requests won't be
			// canceled and will wait until timeout. This behavior may not be reasonable, as the
			// timeout is usually 40s or 60s, which is too long to retry in time.
			a.reqBuilder.cancel(errors.New("no available connections"))
		}
		return
	}
	defer cli.unlockForSend()

	available := cli.available()
	reqSendTime := time.Now()
	batch := 0
	req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) {
		cli.batched.Store(id, e)
		cli.sent.Add(1)
		atomic.StoreInt64(&e.sendLat, int64(reqSendTime.Sub(e.start)))
		if trace.IsEnabled() {
			trace.Log(e.ctx, "rpc", "send")
		}
	})
	if req != nil {
		batch += len(req.RequestIds)
		cli.send("", req)
	}
	for forwardedHost, req := range forwardingReqs {
		batch += len(req.RequestIds)
		cli.send(forwardedHost, req)
	}
	if batch > 0 {
		a.metrics.batchSize.Observe(float64(batch))
	}
}
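
// Close marks all batch commands clients as closed so their batchRecvLoops
// can exit, then closes the batchConn itself.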
func (a *batchConn) Close() {
	// Close all batchRecvLoops.
	for _, c := range a.batchCommandsClients {
		// After connections are closed, `batchRecvLoop`s will check the flag.
		atomic.StoreInt32(&c.closed, 1)
	}
	// Don't close(batchCommandsCh) because when Close() is called, someone may be
	// calling SendRequest and writing to batchCommandsCh; if we closed it here,
	// the writing goroutine would panic.
	close(a.closed)
}