Revert "*: fix batch-client wait too long and add some metrics (#973)" (#984)

This reverts commit adb7db13c3.

Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs 2023-09-19 11:15:11 +08:00 committed by GitHub
parent 330fc8d843
commit be2b4c78a9
3 changed files with 11 additions and 59 deletions


@@ -620,7 +620,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 	// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
 	// request to TiDB is not high frequency.
-	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch && !connArray.batchConn.isBusy(start.UnixNano()) {
+	if config.GetGlobalConfig().TiKVClient.MaxBatchSize > 0 && enableBatch {
 		if batchReq := req.ToBatchCommandsRequest(); batchReq != nil {
 			defer trace.StartRegion(ctx, req.Type.String()).End()
 			return sendBatchRequest(ctx, addr, req.ForwardedHost, connArray.batchConn, batchReq, timeout)


@@ -70,7 +70,6 @@ type batchCommandsEntry struct {
 	// canceled indicated the request is canceled or not.
 	canceled int32
 	err      error
-	start    time.Time
 }

 func (b *batchCommandsEntry) isCanceled() bool {
@@ -199,15 +198,8 @@ type batchConn struct {
 	batchSize prometheus.Observer

 	index uint32
-
-	state            atomic.Int32
-	startHandingTime atomic.Int64
 }

-var (
-	batchConnIdle    = int32(0)
-	batchConnHanding = int32(1)
-)
-
 func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
 	return &batchConn{
 		batchCommandsCh: make(chan *batchCommandsEntry, maxBatchSize),
@@ -224,16 +216,6 @@ func (a *batchConn) isIdle() bool {
 	return atomic.LoadUint32(&a.idle) != 0
 }

-func (a *batchConn) isBusy(now int64) bool {
-	if len(a.batchCommandsCh) == cap(a.batchCommandsCh) {
-		return true
-	}
-	if a.state.Load() == batchConnHanding && (now-a.startHandingTime.Load()) > int64(time.Second) {
-		return true
-	}
-	return false
-}
-
 // fetchAllPendingRequests fetches all pending requests from the channel.
 func (a *batchConn) fetchAllPendingRequests(
 	maxBatchSize int,
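For reference, the check removed by this revert (the deleted isBusy above) treated a batch connection as busy when its command channel was already full, or when the send loop had spent more than a second in the handing state. A minimal standalone sketch of that heuristic, using simplified stand-in fields rather than the real batchConn type:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	connIdle    = int32(0)
	connHanding = int32(1)
)

// conn is a simplified stand-in for the reverted batchConn fields.
type conn struct {
	commandsCh       chan struct{} // stands in for batchCommandsCh
	state            atomic.Int32
	startHandingTime atomic.Int64 // unix nanoseconds
}

// isBusy mirrors the reverted check: a full channel, or more than one
// second spent handing the current batch, marks the connection as busy.
func (c *conn) isBusy(now int64) bool {
	if len(c.commandsCh) == cap(c.commandsCh) {
		return true
	}
	if c.state.Load() == connHanding && now-c.startHandingTime.Load() > int64(time.Second) {
		return true
	}
	return false
}

func main() {
	c := &conn{commandsCh: make(chan struct{}, 2)}
	c.state.Store(connHanding)
	c.startHandingTime.Store(time.Now().Add(-2 * time.Second).UnixNano())
	fmt.Println(c.isBusy(time.Now().UnixNano())) // true: handing for ~2s
}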
@@ -329,7 +311,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
 	bestBatchWaitSize := cfg.BatchWaitSize
 	for {
-		a.state.Store(batchConnIdle)
 		a.reqBuilder.reset()

 		start := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
@ -341,8 +322,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
} }
} }
a.state.Store(batchConnHanding)
a.startHandingTime.Store(start.UnixNano())
if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 { if a.reqBuilder.len() < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
// If the target TiKV is overload, wait a while to collect more requests. // If the target TiKV is overload, wait a while to collect more requests.
if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) { if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
@@ -399,14 +378,11 @@ func (a *batchConn) getClientAndSend() {
 	}
 	defer cli.unlockForSend()

-	now := time.Now()
-	batchCmdWaitToSendDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("wait-to-send", target)
 	req, forwardingReqs := a.reqBuilder.build(func(id uint64, e *batchCommandsEntry) {
 		cli.batched.Store(id, e)
 		if trace.IsEnabled() {
 			trace.Log(e.ctx, "rpc", "send")
 		}
-		batchCmdWaitToSendDuration.Observe(float64(now.Sub(e.start)))
 	})
 	if req != nil {
 		cli.send("", req)
@@ -531,14 +507,6 @@ func (c *batchCommandsClient) isStopped() bool {
 }

 func (c *batchCommandsClient) send(forwardedHost string, req *tikvpb.BatchCommandsRequest) {
-	start := time.Now()
-	defer func() {
-		if forwardedHost == "" {
-			metrics.TiKVBatchConnSendDuration.WithLabelValues(c.target).Observe(time.Since(start).Seconds())
-		} else {
-			metrics.TiKVBatchConnSendDuration.WithLabelValues(forwardedHost).Observe(time.Since(start).Seconds())
-		}
-	}()
 	err := c.initBatchClient(forwardedHost)
 	if err != nil {
 		logutil.BgLogger().Warn(
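The instrumentation deleted from send above is the common defer-based duration pattern: capture a start time, then observe the elapsed seconds in a histogram when the function returns. A hedged, self-contained sketch of that pattern; the histogram below is a local stand-in, not the real TiKVBatchConnSendDuration:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// sendSeconds is a stand-in histogram, labeled by target store address.
var sendSeconds = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "example_batch_conn_send_seconds",
		Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), // 0.5ms ~ 1048s
	}, []string{"store"})

func send(target string) {
	start := time.Now()
	defer func() {
		// Observe the full send duration in seconds, as the reverted code did.
		sendSeconds.WithLabelValues(target).Observe(time.Since(start).Seconds())
	}()
	time.Sleep(10 * time.Millisecond) // placeholder for the real send work
}

func main() {
	prometheus.MustRegister(sendSeconds)
	send("127.0.0.1:20160")
}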
@@ -644,7 +612,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}
 	}()

-	batchCmdGotRespDuration := metrics.TiKVBatchCmdDuration.WithLabelValues("got-resp", c.target)
 	epoch := atomic.LoadUint64(&c.epoch)
 	for {
 		resp, err := streamClient.recv()
@@ -668,7 +635,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 		}

 		responses := resp.GetResponses()
-		now := time.Now()
 		for i, requestID := range resp.GetRequestIds() {
 			value, ok := c.batched.Load(requestID)
 			if !ok {
@@ -683,7 +649,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
 				trace.Log(entry.ctx, "rpc", "received")
 			}
 			logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
-			batchCmdGotRespDuration.Observe(float64(now.Sub(entry.start)))
 			if atomic.LoadInt32(&entry.canceled) == 0 {
 				// Put the response only if the request is not canceled.
 				entry.res <- responses[i]
@@ -808,7 +773,6 @@ func sendBatchRequest(
 	req *tikvpb.BatchCommandsRequest_Request,
 	timeout time.Duration,
 ) (*tikvrpc.Response, error) {
-	start := time.Now()
 	entry := &batchCommandsEntry{
 		ctx:           ctx,
 		req:           req,
@@ -816,11 +780,11 @@ func sendBatchRequest(
 		forwardedHost: forwardedHost,
 		canceled:      0,
 		err:           nil,
-		start:         start,
 	}
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()

+	start := time.Now()
 	select {
 	case batchConn.batchCommandsCh <- entry:
 	case <-ctx.Done():
@@ -831,7 +795,7 @@ func sendBatchRequest(
 		return nil, errors.WithMessage(context.DeadlineExceeded, "wait sendLoop")
 	}
 	waitDuration := time.Since(start)
-	metrics.TiKVBatchCmdDuration.WithLabelValues("send-to-chan", addr).Observe(float64(waitDuration))
+	metrics.TiKVBatchWaitDuration.Observe(float64(waitDuration))

 	select {
 	case res, ok := <-entry.res:
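After the revert, sendBatchRequest is back to timing only the enqueue step: the clock starts just before the select that hands the entry to the send loop, and the elapsed nanoseconds are observed in TiKVBatchWaitDuration. A simplified sketch of that flow, with local stand-ins for the entry, channel, and metric:

package main

import (
	"context"
	"fmt"
	"time"
)

type entry struct{ res chan string }

// enqueueWithTimeout mirrors the restored shape of sendBatchRequest: the wait
// duration covers only the time spent handing the entry to the send loop.
func enqueueWithTimeout(ctx context.Context, ch chan *entry, e *entry, timeout time.Duration) (time.Duration, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	start := time.Now()
	select {
	case ch <- e:
	case <-ctx.Done():
		return 0, ctx.Err()
	case <-timer.C:
		return 0, context.DeadlineExceeded
	}
	// The real code observes float64(waitDuration) (nanoseconds) in
	// metrics.TiKVBatchWaitDuration at this point.
	return time.Since(start), nil
}

func main() {
	ch := make(chan *entry, 1)
	wait, err := enqueueWithTimeout(context.Background(), ch, &entry{res: make(chan string, 1)}, time.Second)
	fmt.Println(wait, err)
}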


@@ -62,8 +62,7 @@ var (
 	TiKVLocalLatchWaitTimeHistogram prometheus.Histogram
 	TiKVStatusDuration              *prometheus.HistogramVec
 	TiKVStatusCounter               *prometheus.CounterVec
-	TiKVBatchConnSendDuration       *prometheus.HistogramVec
-	TiKVBatchCmdDuration            *prometheus.HistogramVec
+	TiKVBatchWaitDuration           prometheus.Histogram
 	TiKVBatchSendLatency            prometheus.Histogram
 	TiKVBatchWaitOverLoad           prometheus.Counter
 	TiKVBatchPendingRequests        *prometheus.HistogramVec
@@ -334,25 +333,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			ConstLabels: constLabels,
 		}, []string{LblResult})

-	TiKVBatchConnSendDuration = prometheus.NewHistogramVec(
+	TiKVBatchWaitDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
-			Name:        "batch_conn_send_seconds",
-			Buckets:     prometheus.ExponentialBuckets(0.0005, 2, 22), // 0.5ms ~ 1048s
-			Help:        "batch conn send duration",
+			Name:        "batch_wait_duration",
+			Buckets:     prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
+			Help:        "batch wait duration",
 			ConstLabels: constLabels,
-		}, []string{LblStore})
-	TiKVBatchCmdDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace:   namespace,
-			Subsystem:   subsystem,
-			Name:        "batch_cmd_duration",
-			Buckets:     prometheus.ExponentialBuckets(16, 2, 36), // 16ns ~ 549s
-			Help:        "batch cmd duration, unit is nanosecond",
-			ConstLabels: constLabels,
-		}, []string{LblType, LblStore})
+		})

 	TiKVBatchSendLatency = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
@@ -778,8 +767,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVLocalLatchWaitTimeHistogram)
 	prometheus.MustRegister(TiKVStatusDuration)
 	prometheus.MustRegister(TiKVStatusCounter)
-	prometheus.MustRegister(TiKVBatchConnSendDuration)
-	prometheus.MustRegister(TiKVBatchCmdDuration)
+	prometheus.MustRegister(TiKVBatchWaitDuration)
 	prometheus.MustRegister(TiKVBatchSendLatency)
 	prometheus.MustRegister(TiKVBatchRecvLatency)
 	prometheus.MustRegister(TiKVBatchWaitOverLoad)
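For completeness, the restored TiKVBatchWaitDuration is a plain, unlabeled prometheus.Histogram whose observations are in nanoseconds. A standalone sketch of an equivalent definition and registration; the namespace and subsystem values here are illustrative, not the library's actual ones:

package main

import "github.com/prometheus/client_golang/prometheus"

// batchWaitDuration mirrors the restored TiKVBatchWaitDuration: a single
// histogram with exponential buckets from 1ns up to roughly 8s (2^33 ns).
var batchWaitDuration = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: "tikv",      // illustrative
		Subsystem: "client_go", // illustrative
		Name:      "batch_wait_duration",
		Buckets:   prometheus.ExponentialBuckets(1, 2, 34), // 1ns ~ 8s
		Help:      "batch wait duration",
	})

func main() {
	prometheus.MustRegister(batchWaitDuration)
	batchWaitDuration.Observe(float64(1500)) // a 1.5µs wait, observed in nanoseconds
}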