From 57c12f7c64f626a478374de3c405c5134ea3cd42 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 13 Jul 2022 16:56:47 +0800 Subject: [PATCH] Handle more detailed statistics from TiKV (#536) * support more detailed statistics returned from TiKV Signed-off-by: Yilin Chen * fix log formatting Signed-off-by: Yilin Chen * add to test Signed-off-by: Yilin Chen --- go.mod | 2 +- go.sum | 4 +- integration_tests/go.sum | 1 - integration_tests/snapshot_test.go | 3 + internal/client/client.go | 51 +++-- metrics/metrics.go | 11 + tikvrpc/tikvrpc.go | 19 ++ txnkv/transaction/commit.go | 5 +- txnkv/transaction/pessimistic.go | 8 +- txnkv/transaction/prewrite.go | 8 +- util/execdetails.go | 309 +++++++++++++++++++++++++++-- 11 files changed, 377 insertions(+), 44 deletions(-) diff --git a/go.mod b/go.mod index 44fd6431..c662d144 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 + github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index 4064e616..76b2bf9f 100644 --- a/go.sum +++ b/go.sum @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw= -github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE= +github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 634e18a5..53a36227 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -861,7 +861,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE= github.com/pingcap/kvproto 
v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 3f7515d1..a9880227 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -309,6 +309,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { ProcessedVersions: 10, ProcessedVersionsSize: 10, TotalVersions: 15, + GetSnapshotNanos: 500, RocksdbBlockReadCount: 20, RocksdbBlockReadByte: 15, RocksdbDeleteSkippedCount: 5, @@ -322,6 +323,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { "scan_detail: {total_process_keys: 10, " + "total_process_keys_size: 10, " + "total_keys: 15, " + + "get_snapshot_time: 500ns, " + "rocksdb: {delete_skipped_count: 5, " + "key_skipped_count: 1, " + "block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}" @@ -332,6 +334,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { "scan_detail: {total_process_keys: 20, " + "total_process_keys_size: 20, " + "total_keys: 30, " + + "get_snapshot_time: 1µs, " + "rocksdb: {delete_skipped_count: 10, " + "key_skipped_count: 2, " + "block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}" diff --git a/internal/client/client.go b/internal/client/client.go index 55cfabaf..a5592249 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -374,8 +374,9 @@ func (c *RPCClient) closeConns() { } var ( - sendReqHistCache sync.Map - sendReqCounterCache sync.Map + sendReqHistCache sync.Map + sendReqCounterCache sync.Map + rpcNetLatencyHistCache sync.Map ) type sendReqHistCacheKey struct { @@ -394,10 +395,14 @@ type sendReqCounterCacheValue struct { timeCounter prometheus.Counter } -func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) { +func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) { + elapsed := time.Since(start) + secs := elapsed.Seconds() + storeID := req.Context.GetPeer().GetStoreId() + histKey := sendReqHistCacheKey{ req.Type, - req.Context.GetPeer().GetStoreId(), + storeID, staleRead, } counterKey := sendReqCounterCacheKey{ @@ -405,31 +410,49 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time. 
req.GetRequestSource(), } + reqType := req.Type.String() + var storeIDStr string + hist, ok := sendReqHistCache.Load(histKey) if !ok { - reqType := req.Type.String() - storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) - hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead)) + if len(storeIDStr) == 0 { + storeIDStr = strconv.FormatUint(storeID, 10) + } + hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead)) sendReqHistCache.Store(histKey, hist) } counter, ok := sendReqCounterCache.Load(counterKey) if !ok { - reqType := req.Type.String() - storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) + if len(storeIDStr) == 0 { + storeIDStr = strconv.FormatUint(storeID, 10) + } counter = sendReqCounterCacheValue{ - metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource), - metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource), + metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource), + metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource), } sendReqCounterCache.Store(counterKey, counter) } - secs := time.Since(start).Seconds() hist.(prometheus.Observer).Observe(secs) counter.(sendReqCounterCacheValue).counter.Inc() counter.(sendReqCounterCacheValue).timeCounter.Add(secs) + + if execDetail, err := resp.GetExecDetailsV2(); err == nil && + execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 { + latHist, ok := rpcNetLatencyHistCache.Load(storeID) + if !ok { + if len(storeIDStr) == 0 { + storeIDStr = strconv.FormatUint(storeID, 10) + } + latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr) + rpcNetLatencyHistCache.Store(storeID, latHist) + } + latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond + latHist.(prometheus.Observer).Observe(latency.Seconds()) + } } -func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { +func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -456,7 +479,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R detail := stmtExec.(*util.ExecDetails) atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start))) } - c.updateTiKVSendReqHistogram(req, start, staleRead) + c.updateTiKVSendReqHistogram(req, resp, start, staleRead) }() // TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since diff --git a/metrics/metrics.go index 0d2c8711..38b2fcd9 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -46,6 +46,7 @@ var ( TiKVSendReqHistogram *prometheus.HistogramVec TiKVSendReqCounter *prometheus.CounterVec TiKVSendReqTimeCounter *prometheus.CounterVec + TiKVRPCNetLatencyHistogram *prometheus.HistogramVec TiKVCoprocessorHistogram 
*prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec @@ -161,6 +162,15 @@ func initMetrics(namespace, subsystem string) { Help: "Counter of request time with multi dimensions.", }, []string{LblType, LblStore, LblStaleRead, LblSource}) + TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "rpc_net_latency_seconds", + Help: "Bucketed histogram of time difference between TiDB and TiKV.", + Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s + }, []string{LblStore}) + TiKVCoprocessorHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, @@ -599,6 +609,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVSendReqHistogram) prometheus.MustRegister(TiKVSendReqCounter) prometheus.MustRegister(TiKVSendReqTimeCounter) + prometheus.MustRegister(TiKVRPCNetLatencyHistogram) prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 52a84549..cd565da2 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -926,6 +926,25 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { return err.GetRegionError(), nil } +type getExecDetailsV2 interface { + GetExecDetailsV2() *kvrpcpb.ExecDetailsV2 +} + +// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response. +func (resp *Response) GetExecDetailsV2() (*kvrpcpb.ExecDetailsV2, error) { + if resp == nil || resp.Resp == nil { + return nil, nil + } + details, ok := resp.Resp.(getExecDetailsV2) + if !ok { + if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty { + return nil, nil + } + return nil, errors.Errorf("invalid response type %v", resp) + } + return details.GetExecDetailsV2(), nil +} + // CallRPC launches a rpc call. // ch is needed to implement timeout for coprocessor streaming, the stream object's // cancel function will be sent to the channel, together with a lease checked by a background goroutine. diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 88ccc4a1..5e4d8282 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -88,7 +88,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient()) for { attempts++ - if time.Since(tBegin) > slowRequestThreshold { + reqBegin := time.Now() + if reqBegin.Sub(tBegin) > slowRequestThreshold { logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts)) tBegin = time.Now() } @@ -140,6 +141,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, // we can clean undetermined error. 
if batch.isPrimary && !c.isAsyncCommit() { c.setUndeterminedErr(nil) + reqDuration := time.Since(reqBegin) + c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2) } if keyErr := commitResp.GetError(); keyErr != nil { if rejected := keyErr.GetCommitTsExpired(); rejected != nil { diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 81fb1d69..e9705061 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -144,10 +144,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * time.Sleep(300 * time.Millisecond) return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil}) } + sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient()) startTime := time.Now() - resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) + resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort) + reqDuration := time.Since(startTime) if action.LockCtx.Stats != nil { - atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime))) + atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration)) atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1) } if err != nil { @@ -183,6 +185,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse) keyErrs := lockResp.GetErrors() if len(keyErrs) == 0 { + action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2) + if batch.isPrimary { // After locking the primary key, we should protect the primary lock from expiring // now in case locking the remaining keys take a long time. diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 738fb74e..8b4161d9 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -247,7 +247,8 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B if attempts > 1 || action.retry { req.IsRetryRequest = true } - if time.Since(tBegin) > slowRequestThreshold { + reqBegin := time.Now() + if reqBegin.Sub(tBegin) > slowRequestThreshold { logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts)) tBegin = time.Now() } @@ -305,6 +306,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B // Clear the RPC Error since the request is evaluated successfully. sender.SetRPCError(nil) + // Update CommitDetails + reqDuration := time.Since(reqBegin) + c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2) + if batch.isPrimary { // After writing the primary key, if the size of the transaction is larger than 32M, // start the ttlManager. The ttlManager will be closed in tikvTxn.Commit(). @@ -358,6 +363,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B c.mu.Unlock() } } + return nil } var locks []*txnlock.Lock diff --git a/util/execdetails.go b/util/execdetails.go index 708321fa..6c6d78f3 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -60,6 +60,54 @@ var ( ExecDetailsKey = execDetailsCtxKeyType{} ) +// TiKVExecDetails is the detail execution information at TiKV side. 
+type TiKVExecDetails struct { + TimeDetail *TimeDetail + ScanDetail *ScanDetail + WriteDetail *WriteDetail +} + +// NewTiKVExecDetails creates a TiKVExecDetails from a kvproto ExecDetailsV2. +func NewTiKVExecDetails(pb *kvrpcpb.ExecDetailsV2) TiKVExecDetails { + if pb == nil { + return TiKVExecDetails{} + } + td := &TimeDetail{} + td.MergeFromTimeDetail(pb.TimeDetail) + sd := &ScanDetail{} + sd.MergeFromScanDetailV2(pb.ScanDetailV2) + wd := &WriteDetail{} + wd.MergeFromWriteDetailPb(pb.WriteDetail) + return TiKVExecDetails{ + TimeDetail: td, + ScanDetail: sd, + WriteDetail: wd, + } +} + +func (ed *TiKVExecDetails) String() string { + if ed == nil { + return "" + } + buf := bytes.NewBuffer(make([]byte, 0, 16)) + if ed.TimeDetail != nil { + buf.WriteString(ed.TimeDetail.String()) + } + if ed.ScanDetail != nil { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString(ed.ScanDetail.String()) + } + if ed.WriteDetail != nil { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString(ed.WriteDetail.String()) + } + return buf.String() +} + // CommitDetails contains commit detail information. type CommitDetails struct { GetCommitTsTime time.Duration @@ -70,8 +118,12 @@ type CommitDetails struct { LocalLatchTime time.Duration Mu struct { sync.Mutex - CommitBackoffTime int64 - BackoffTypes []string + CommitBackoffTime int64 + BackoffTypes []string + SlowestReqTotalTime time.Duration + SlowestRegion uint64 + SlowestStoreAddr string + SlowestExecDetails TiKVExecDetails } WriteKeys int WriteSize int @@ -95,6 +147,27 @@ func (cd *CommitDetails) Merge(other *CommitDetails) { cd.TxnRetry += other.TxnRetry cd.Mu.CommitBackoffTime += other.Mu.CommitBackoffTime cd.Mu.BackoffTypes = append(cd.Mu.BackoffTypes, other.Mu.BackoffTypes...) + if cd.Mu.SlowestReqTotalTime < other.Mu.SlowestReqTotalTime { + cd.Mu.SlowestReqTotalTime = other.Mu.SlowestReqTotalTime + cd.Mu.SlowestRegion = other.Mu.SlowestRegion + cd.Mu.SlowestStoreAddr = other.Mu.SlowestStoreAddr + cd.Mu.SlowestExecDetails = other.Mu.SlowestExecDetails + } +} + +// MergeReqDetails merges ExecDetailsV2 into the current CommitDetails. +func (cd *CommitDetails) MergeReqDetails(reqDuration time.Duration, regionID uint64, addr string, execDetails *kvrpcpb.ExecDetailsV2) { + if cd == nil { + return + } + cd.Mu.Lock() + defer cd.Mu.Unlock() + if reqDuration > cd.Mu.SlowestReqTotalTime { + cd.Mu.SlowestReqTotalTime = reqDuration + cd.Mu.SlowestRegion = regionID + cd.Mu.SlowestStoreAddr = addr + cd.Mu.SlowestExecDetails = NewTiKVExecDetails(execDetails) + } } // Clone returns a deep copy of itself. @@ -114,6 +187,10 @@ func (cd *CommitDetails) Clone() *CommitDetails { } commit.Mu.BackoffTypes = append([]string{}, cd.Mu.BackoffTypes...) commit.Mu.CommitBackoffTime = cd.Mu.CommitBackoffTime + commit.Mu.SlowestReqTotalTime = cd.Mu.SlowestReqTotalTime + commit.Mu.SlowestRegion = cd.Mu.SlowestRegion + commit.Mu.SlowestStoreAddr = cd.Mu.SlowestStoreAddr + commit.Mu.SlowestExecDetails = cd.Mu.SlowestExecDetails return commit } @@ -126,7 +203,11 @@ type LockKeysDetails struct { BackoffTime int64 Mu struct { sync.Mutex - BackoffTypes []string + BackoffTypes []string + SlowestReqTotalTime time.Duration + SlowestRegion uint64 + SlowestStoreAddr string + SlowestExecDetails TiKVExecDetails } LockRPCTime int64 LockRPCCount int64 @@ -144,6 +225,27 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) { ld.LockRPCCount += ld.LockRPCCount ld.Mu.BackoffTypes = append(ld.Mu.BackoffTypes, lockKey.Mu.BackoffTypes...) 
ld.RetryCount++ + if ld.Mu.SlowestReqTotalTime < lockKey.Mu.SlowestReqTotalTime { + ld.Mu.SlowestReqTotalTime = lockKey.Mu.SlowestReqTotalTime + ld.Mu.SlowestRegion = lockKey.Mu.SlowestRegion + ld.Mu.SlowestStoreAddr = lockKey.Mu.SlowestStoreAddr + ld.Mu.SlowestExecDetails = lockKey.Mu.SlowestExecDetails + } +} + +// MergeReqDetails merges ExecDetailsV2 into the current LockKeysDetails. +func (ld *LockKeysDetails) MergeReqDetails(reqDuration time.Duration, regionID uint64, addr string, execDetails *kvrpcpb.ExecDetailsV2) { + if ld == nil { + return + } + ld.Mu.Lock() + defer ld.Mu.Unlock() + if reqDuration > ld.Mu.SlowestReqTotalTime { + ld.Mu.SlowestReqTotalTime = reqDuration + ld.Mu.SlowestRegion = regionID + ld.Mu.SlowestStoreAddr = addr + ld.Mu.SlowestExecDetails = NewTiKVExecDetails(execDetails) + } } // Clone returns a deep copy of itself. @@ -159,6 +261,10 @@ func (ld *LockKeysDetails) Clone() *LockKeysDetails { ResolveLock: ld.ResolveLock, } lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...) + lock.Mu.SlowestReqTotalTime = ld.Mu.SlowestReqTotalTime + lock.Mu.SlowestRegion = ld.Mu.SlowestRegion + lock.Mu.SlowestStoreAddr = ld.Mu.SlowestStoreAddr + lock.Mu.SlowestExecDetails = ld.Mu.SlowestExecDetails return lock } @@ -231,7 +337,12 @@ type ScanDetail struct { RocksdbBlockReadCount uint64 // RocksdbBlockReadByte is the total number of bytes from block reads. RocksdbBlockReadByte uint64 - ResolveLock *ResolveLockDetail + // RocksdbBlockReadDuration is the total time used for block reads. + RocksdbBlockReadDuration time.Duration + // GetSnapshotDuration is the time spent getting an engine snapshot. + GetSnapshotDuration time.Duration + + ResolveLock *ResolveLockDetail } // Merge merges scan detail execution details into self. @@ -244,6 +355,8 @@ func (sd *ScanDetail) Merge(scanDetail *ScanDetail) { atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount) atomic.AddUint64(&sd.RocksdbBlockReadCount, scanDetail.RocksdbBlockReadCount) atomic.AddUint64(&sd.RocksdbBlockReadByte, scanDetail.RocksdbBlockReadByte) + atomic.AddInt64((*int64)(&sd.RocksdbBlockReadDuration), int64(scanDetail.RocksdbBlockReadDuration)) + atomic.AddInt64((*int64)(&sd.GetSnapshotDuration), int64(scanDetail.GetSnapshotDuration)) } var zeroScanDetail = ScanDetail{} @@ -255,24 +368,60 @@ func (sd *ScanDetail) String() string { } buf := bytes.NewBuffer(make([]byte, 0, 16)) buf.WriteString("scan_detail: {") - buf.WriteString("total_process_keys: ") - buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10)) - buf.WriteString(", total_process_keys_size: ") - buf.WriteString(strconv.FormatInt(sd.ProcessedKeysSize, 10)) - buf.WriteString(", total_keys: ") - buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10)) - buf.WriteString(", rocksdb: {") - buf.WriteString("delete_skipped_count: ") - buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10)) - buf.WriteString(", key_skipped_count: ") - buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10)) - buf.WriteString(", block: {") - buf.WriteString("cache_hit_count: ") - buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10)) - buf.WriteString(", read_count: ") - buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10)) - buf.WriteString(", read_byte: ") - buf.WriteString(FormatBytes(int64(sd.RocksdbBlockReadByte))) + if sd.ProcessedKeys > 0 { + buf.WriteString("total_process_keys: ") + buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10)) + buf.WriteString(", ") + } + if sd.ProcessedKeysSize 
> 0 { + buf.WriteString("total_process_keys_size: ") + buf.WriteString(strconv.FormatInt(sd.ProcessedKeysSize, 10)) + buf.WriteString(", ") + } + if sd.TotalKeys > 0 { + buf.WriteString("total_keys: ") + buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10)) + buf.WriteString(", ") + } + if sd.GetSnapshotDuration > 0 { + buf.WriteString("get_snapshot_time: ") + buf.WriteString(FormatDuration(sd.GetSnapshotDuration)) + buf.WriteString(", ") + } + buf.WriteString("rocksdb: {") + if sd.RocksdbDeleteSkippedCount > 0 { + buf.WriteString("delete_skipped_count: ") + buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10)) + buf.WriteString(", ") + } + if sd.RocksdbKeySkippedCount > 0 { + buf.WriteString("key_skipped_count: ") + buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10)) + buf.WriteString(", ") + } + buf.WriteString("block: {") + if sd.RocksdbBlockCacheHitCount > 0 { + buf.WriteString("cache_hit_count: ") + buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10)) + buf.WriteString(", ") + } + if sd.RocksdbBlockReadCount > 0 { + buf.WriteString("read_count: ") + buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10)) + buf.WriteString(", ") + } + if sd.RocksdbBlockReadByte > 0 { + buf.WriteString("read_byte: ") + buf.WriteString(FormatBytes(int64(sd.RocksdbBlockReadByte))) + buf.WriteString(", ") + } + if sd.RocksdbBlockReadDuration > 0 { + buf.WriteString("read_time: ") + buf.WriteString(FormatDuration(sd.RocksdbBlockReadDuration)) + } + if buf.Bytes()[buf.Len()-2] == ',' { + buf.Truncate(buf.Len() - 2) + } buf.WriteString("}}}") return buf.String() } @@ -288,9 +437,115 @@ func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) { sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount sd.RocksdbBlockReadCount += scanDetail.RocksdbBlockReadCount sd.RocksdbBlockReadByte += scanDetail.RocksdbBlockReadByte + sd.RocksdbBlockReadDuration += time.Duration(scanDetail.RocksdbBlockReadNanos) * time.Nanosecond + sd.GetSnapshotDuration += time.Duration(scanDetail.GetSnapshotNanos) * time.Nanosecond } } +// WriteDetail contains the detailed time breakdown of a write operation. +type WriteDetail struct { + // StoreBatchWaitDuration is the wait duration in the store loop. + StoreBatchWaitDuration time.Duration + // ProposeSendWaitDuration is the duration before sending the proposal to peers. + ProposeSendWaitDuration time.Duration + // PersistLogDuration is the total time spent on persisting the log. + PersistLogDuration time.Duration + // RaftDbWriteLeaderWaitDuration is the wait time until the Raft log write leader begins to write. + RaftDbWriteLeaderWaitDuration time.Duration + // RaftDbSyncLogDuration is the time spent on synchronizing the Raft log to the disk. + RaftDbSyncLogDuration time.Duration + // RaftDbWriteMemtableDuration is the time spent on writing the Raft log to the Raft memtable. + RaftDbWriteMemtableDuration time.Duration + // CommitLogDuration is the time waiting for peers to confirm the proposal (counting from the instant when the leader sends the proposal message). + CommitLogDuration time.Duration + // ApplyBatchWaitDuration is the wait duration in the apply loop. + ApplyBatchWaitDuration time.Duration + // ApplyLogDuration is the total time spent on applying the log. + ApplyLogDuration time.Duration + // ApplyMutexLockDuration is the wait time until the KV RocksDB lock is acquired. 
+ ApplyMutexLockDuration time.Duration + // ApplyWriteLeaderWaitDuration is the wait time until becoming the KV RocksDB write leader. + ApplyWriteLeaderWaitDuration time.Duration + // ApplyWriteWalDuration is the time spent on writing the KV DB WAL to the disk. + ApplyWriteWalDuration time.Duration + // ApplyWriteMemtableDuration is the time spent on writing to the memtable of the KV RocksDB. + ApplyWriteMemtableDuration time.Duration +} + +// MergeFromWriteDetailPb merges a kvrpcpb.WriteDetail protobuf into the current WriteDetail. +func (wd *WriteDetail) MergeFromWriteDetailPb(pb *kvrpcpb.WriteDetail) { + if pb != nil { + wd.StoreBatchWaitDuration += time.Duration(pb.StoreBatchWaitNanos) * time.Nanosecond + wd.ProposeSendWaitDuration += time.Duration(pb.ProposeSendWaitNanos) * time.Nanosecond + wd.PersistLogDuration += time.Duration(pb.PersistLogNanos) * time.Nanosecond + wd.RaftDbWriteLeaderWaitDuration += time.Duration(pb.RaftDbWriteLeaderWaitNanos) * time.Nanosecond + wd.RaftDbSyncLogDuration += time.Duration(pb.RaftDbSyncLogNanos) * time.Nanosecond + wd.RaftDbWriteMemtableDuration += time.Duration(pb.RaftDbWriteMemtableNanos) * time.Nanosecond + wd.CommitLogDuration += time.Duration(pb.CommitLogNanos) * time.Nanosecond + wd.ApplyBatchWaitDuration += time.Duration(pb.ApplyBatchWaitNanos) * time.Nanosecond + wd.ApplyLogDuration += time.Duration(pb.ApplyLogNanos) * time.Nanosecond + wd.ApplyMutexLockDuration += time.Duration(pb.ApplyMutexLockNanos) * time.Nanosecond + wd.ApplyWriteLeaderWaitDuration += time.Duration(pb.ApplyWriteLeaderWaitNanos) * time.Nanosecond + wd.ApplyWriteWalDuration += time.Duration(pb.ApplyWriteWalNanos) * time.Nanosecond + wd.ApplyWriteMemtableDuration += time.Duration(pb.ApplyWriteMemtableNanos) * time.Nanosecond + } +} + +// Merge merges another WriteDetail into self. 
+func (wd *WriteDetail) Merge(writeDetail *WriteDetail) { + atomic.AddInt64((*int64)(&wd.StoreBatchWaitDuration), int64(writeDetail.StoreBatchWaitDuration)) + atomic.AddInt64((*int64)(&wd.ProposeSendWaitDuration), int64(writeDetail.ProposeSendWaitDuration)) + atomic.AddInt64((*int64)(&wd.PersistLogDuration), int64(writeDetail.PersistLogDuration)) + atomic.AddInt64((*int64)(&wd.RaftDbWriteLeaderWaitDuration), int64(writeDetail.RaftDbWriteLeaderWaitDuration)) + atomic.AddInt64((*int64)(&wd.RaftDbSyncLogDuration), int64(writeDetail.RaftDbSyncLogDuration)) + atomic.AddInt64((*int64)(&wd.RaftDbWriteMemtableDuration), int64(writeDetail.RaftDbWriteMemtableDuration)) + atomic.AddInt64((*int64)(&wd.CommitLogDuration), int64(writeDetail.CommitLogDuration)) + atomic.AddInt64((*int64)(&wd.ApplyBatchWaitDuration), int64(writeDetail.ApplyBatchWaitDuration)) + atomic.AddInt64((*int64)(&wd.ApplyLogDuration), int64(writeDetail.ApplyLogDuration)) + atomic.AddInt64((*int64)(&wd.ApplyMutexLockDuration), int64(writeDetail.ApplyMutexLockDuration)) + atomic.AddInt64((*int64)(&wd.ApplyWriteLeaderWaitDuration), int64(writeDetail.ApplyWriteLeaderWaitDuration)) + atomic.AddInt64((*int64)(&wd.ApplyWriteWalDuration), int64(writeDetail.ApplyWriteWalDuration)) + atomic.AddInt64((*int64)(&wd.ApplyWriteMemtableDuration), int64(writeDetail.ApplyWriteMemtableDuration)) +} + +var zeroWriteDetail = WriteDetail{} + +func (wd *WriteDetail) String() string { + if wd == nil || *wd == zeroWriteDetail { + return "" + } + buf := bytes.NewBuffer(make([]byte, 0, 64)) + buf.WriteString("write_detail: {") + buf.WriteString("store_batch_wait: ") + buf.WriteString(FormatDuration(wd.StoreBatchWaitDuration)) + buf.WriteString(", propose_send_wait: ") + buf.WriteString(FormatDuration(wd.ProposeSendWaitDuration)) + buf.WriteString(", persist_log: {total: ") + buf.WriteString(FormatDuration(wd.PersistLogDuration)) + buf.WriteString(", write_leader_wait: ") + buf.WriteString(FormatDuration(wd.RaftDbWriteLeaderWaitDuration)) + buf.WriteString(", sync_log: ") + buf.WriteString(FormatDuration(wd.RaftDbSyncLogDuration)) + buf.WriteString(", write_memtable: ") + buf.WriteString(FormatDuration(wd.RaftDbWriteMemtableDuration)) + buf.WriteString("}, commit_log: ") + buf.WriteString(FormatDuration(wd.CommitLogDuration)) + buf.WriteString(", apply_batch_wait: ") + buf.WriteString(FormatDuration(wd.ApplyBatchWaitDuration)) + buf.WriteString(", apply: {total:") + buf.WriteString(FormatDuration(wd.ApplyLogDuration)) + buf.WriteString(", mutex_lock: ") + buf.WriteString(FormatDuration(wd.ApplyMutexLockDuration)) + buf.WriteString(", write_leader_wait: ") + buf.WriteString(FormatDuration(wd.ApplyWriteLeaderWaitDuration)) + buf.WriteString(", write_wal: ") + buf.WriteString(FormatDuration(wd.ApplyWriteWalDuration)) + buf.WriteString(", write_memtable: ") + buf.WriteString(FormatDuration(wd.ApplyWriteMemtableDuration)) + buf.WriteString("}}") + return buf.String() +} + // TimeDetail contains coprocessor time detail information. type TimeDetail struct { // Off-cpu and on-cpu wall time elapsed to actually process the request payload. It does not @@ -304,6 +559,8 @@ type TimeDetail struct { WaitTime time.Duration // KvReadWallTimeMs is the time used in KV Scan/Get. KvReadWallTimeMs time.Duration + // TotalRPCWallTime is the total wall clock time spent on this RPC in TiKV. + TotalRPCWallTime time.Duration } // String implements the fmt.Stringer interface. 
@@ -323,6 +580,13 @@ func (td *TimeDetail) String() string { buf.WriteString("total_wait_time: ") buf.WriteString(FormatDuration(td.WaitTime)) } + if td.TotalRPCWallTime > 0 { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString("tikv_wall_time: ") + buf.WriteString(FormatDuration(td.TotalRPCWallTime)) + } return buf.String() } @@ -332,6 +596,7 @@ func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) { td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond td.KvReadWallTimeMs += time.Duration(timeDetail.KvReadWallTimeMs) * time.Millisecond + td.TotalRPCWallTime += time.Duration(timeDetail.TotalRpcWallTimeNs) * time.Nanosecond } }
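As a quick illustration of the metric wired up in internal/client/client.go above: the new rpc_net_latency_seconds histogram observes the client-side elapsed time of an RPC minus TiKV's self-reported TotalRpcWallTimeNs, which approximates the network and queueing overhead of the call. The standalone Go sketch below is not part of the patch and uses made-up input values; it only restates that calculation. Because both terms are durations measured independently on each side, the estimate does not depend on clock synchronization between the client and TiKV.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Client-side elapsed time of one RPC, i.e. what time.Since(start) returns
	// in updateTiKVSendReqHistogram (made-up value).
	elapsed := 3 * time.Millisecond
	// TiKV's self-reported wall time for the same RPC, taken from
	// ExecDetailsV2.TimeDetail.TotalRpcWallTimeNs (made-up value).
	totalRPCWallTimeNs := uint64(2_400_000)

	// The difference is what TiKVRPCNetLatencyHistogram observes, in seconds.
	latency := elapsed - time.Duration(totalRPCWallTimeNs)*time.Nanosecond
	fmt.Printf("estimated network + queueing overhead: %v (%.6fs)\n", latency, latency.Seconds())
}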