mirror of https://github.com/tikv/client-go.git
Handle more detailed statistics from TiKV (#536)
* support more detailed statistics returned from TiKV

  Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* fix log formatting

  Signed-off-by: Yilin Chen <sticnarf@gmail.com>

* add to test

  Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent 1c198aab95
commit 57c12f7c64
go.mod (2 changed lines)

@@ -16,7 +16,7 @@ require (
 	github.com/opentracing/opentracing-go v1.2.0
 	github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-	github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305
+	github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
 	github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.11.0
go.sum (4 changed lines)

@@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw=
-github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
+github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
+github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=

@@ -861,7 +861,6 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
 github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
@@ -309,6 +309,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
 		ProcessedVersions:         10,
 		ProcessedVersionsSize:     10,
 		TotalVersions:             15,
+		GetSnapshotNanos:          500,
 		RocksdbBlockReadCount:     20,
 		RocksdbBlockReadByte:      15,
 		RocksdbDeleteSkippedCount: 5,
@@ -322,6 +323,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
 		"scan_detail: {total_process_keys: 10, " +
 		"total_process_keys_size: 10, " +
 		"total_keys: 15, " +
+		"get_snapshot_time: 500ns, " +
 		"rocksdb: {delete_skipped_count: 5, " +
 		"key_skipped_count: 1, " +
 		"block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}"
@@ -332,6 +334,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
 		"scan_detail: {total_process_keys: 20, " +
 		"total_process_keys_size: 20, " +
 		"total_keys: 30, " +
+		"get_snapshot_time: 1µs, " +
 		"rocksdb: {delete_skipped_count: 10, " +
 		"key_skipped_count: 2, " +
 		"block: {cache_hit_count: 20, read_count: 40, read_byte: 30 Bytes}}}"
@@ -376,6 +376,7 @@ func (c *RPCClient) closeConns() {
 var (
 	sendReqHistCache       sync.Map
 	sendReqCounterCache    sync.Map
+	rpcNetLatencyHistCache sync.Map
 )
 
 type sendReqHistCacheKey struct {
@@ -394,10 +395,14 @@ type sendReqCounterCacheValue struct {
 	timeCounter prometheus.Counter
 }
 
-func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) {
+func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvrpc.Response, start time.Time, staleRead bool) {
+	elapsed := time.Since(start)
+	secs := elapsed.Seconds()
+	storeID := req.Context.GetPeer().GetStoreId()
+
 	histKey := sendReqHistCacheKey{
 		req.Type,
-		req.Context.GetPeer().GetStoreId(),
+		storeID,
 		staleRead,
 	}
 	counterKey := sendReqCounterCacheKey{
@@ -405,31 +410,49 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.
 		req.GetRequestSource(),
 	}
 
+	reqType := req.Type.String()
+	var storeIDStr string
+
 	hist, ok := sendReqHistCache.Load(histKey)
 	if !ok {
-		reqType := req.Type.String()
-		storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
-		hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead))
+		if len(storeIDStr) == 0 {
+			storeIDStr = strconv.FormatUint(storeID, 10)
+		}
+		hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
 		sendReqHistCache.Store(histKey, hist)
 	}
 	counter, ok := sendReqCounterCache.Load(counterKey)
 	if !ok {
-		reqType := req.Type.String()
-		storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
+		if len(storeIDStr) == 0 {
+			storeIDStr = strconv.FormatUint(storeID, 10)
+		}
 		counter = sendReqCounterCacheValue{
-			metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
-			metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource),
+			metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
+			metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
 		}
 		sendReqCounterCache.Store(counterKey, counter)
 	}
 
-	secs := time.Since(start).Seconds()
 	hist.(prometheus.Observer).Observe(secs)
 	counter.(sendReqCounterCacheValue).counter.Inc()
 	counter.(sendReqCounterCacheValue).timeCounter.Add(secs)
+
+	if execDetail, err := resp.GetExecDetailsV2(); err == nil &&
+		execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
+		latHist, ok := rpcNetLatencyHistCache.Load(storeID)
+		if !ok {
+			if len(storeIDStr) == 0 {
+				storeIDStr = strconv.FormatUint(storeID, 10)
+			}
+			latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
+			sendReqHistCache.Store(storeID, latHist)
+		}
+		latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
+		latHist.(prometheus.Observer).Observe(latency.Seconds())
+	}
 }
 
-func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -456,7 +479,7 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 			detail := stmtExec.(*util.ExecDetails)
 			atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
 		}
-		c.updateTiKVSendReqHistogram(req, start, staleRead)
+		c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
 	}()
 
 	// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
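The interesting part of this change is how the per-store network latency observation is derived: the client-side elapsed time of the RPC minus the wall time TiKV itself reports in ExecDetailsV2.TimeDetail. Below is a minimal, self-contained sketch of that calculation; the helper name netLatency and the sample values are illustrative, not part of the commit.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// netLatency mirrors the computation above: client-observed elapsed time minus
// the server-reported wall time approximates network plus queuing overhead.
// It returns false when TiKV did not report a wall time, in which case the
// histogram observation is skipped.
func netLatency(elapsed time.Duration, detail *kvrpcpb.ExecDetailsV2) (time.Duration, bool) {
	if detail == nil || detail.TimeDetail == nil || detail.TimeDetail.TotalRpcWallTimeNs == 0 {
		return 0, false
	}
	return elapsed - time.Duration(detail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond, true
}

func main() {
	detail := &kvrpcpb.ExecDetailsV2{
		TimeDetail: &kvrpcpb.TimeDetail{TotalRpcWallTimeNs: 1_500_000}, // 1.5ms spent inside TiKV
	}
	if lat, ok := netLatency(2*time.Millisecond, detail); ok {
		fmt.Println(lat) // 500µs spent outside TiKV
	}
}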
@@ -46,6 +46,7 @@ var (
 	TiKVSendReqHistogram       *prometheus.HistogramVec
 	TiKVSendReqCounter         *prometheus.CounterVec
 	TiKVSendReqTimeCounter     *prometheus.CounterVec
+	TiKVRPCNetLatencyHistogram *prometheus.HistogramVec
 	TiKVCoprocessorHistogram   *prometheus.HistogramVec
 	TiKVLockResolverCounter    *prometheus.CounterVec
 	TiKVRegionErrorCounter     *prometheus.CounterVec
@@ -161,6 +162,15 @@ func initMetrics(namespace, subsystem string) {
 			Help:      "Counter of request time with multi dimensions.",
 		}, []string{LblType, LblStore, LblStaleRead, LblSource})
 
+	TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "rpc_net_latency_seconds",
+			Help:      "Bucketed histogram of time difference between TiDB and TiKV.",
+			Buckets:   prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
+		}, []string{LblStore})
+
 	TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: namespace,
@@ -599,6 +609,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVSendReqHistogram)
 	prometheus.MustRegister(TiKVSendReqCounter)
 	prometheus.MustRegister(TiKVSendReqTimeCounter)
+	prometheus.MustRegister(TiKVRPCNetLatencyHistogram)
 	prometheus.MustRegister(TiKVCoprocessorHistogram)
 	prometheus.MustRegister(TiKVLockResolverCounter)
 	prometheus.MustRegister(TiKVRegionErrorCounter)
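For reference, the bucket layout chosen for the new histogram can be inspected directly. This short sketch (not part of the commit) prints the 18 exponential buckets, which run from 50µs up to roughly 6.5s as the inline comment states.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Start at 5e-5s (50µs) and double 17 times: the last bucket upper bound
	// is 5e-5 * 2^17 ≈ 6.55s, matching the "50us ~ 6.5s" comment above.
	fmt.Println(prometheus.ExponentialBuckets(5e-5, 2, 18))
}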
@@ -926,6 +926,25 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
 	return err.GetRegionError(), nil
 }
 
+type getExecDetailsV2 interface {
+	GetExecDetailsV2() *kvrpcpb.ExecDetailsV2
+}
+
+// GetExecDetailsV2 returns the ExecDetailsV2 of the underlying concrete response.
+func (resp *Response) GetExecDetailsV2() (*kvrpcpb.ExecDetailsV2, error) {
+	if resp == nil || resp.Resp == nil {
+		return nil, nil
+	}
+	details, ok := resp.Resp.(getExecDetailsV2)
+	if !ok {
+		if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty {
+			return nil, nil
+		}
+		return nil, errors.Errorf("invalid response type %v", resp)
+	}
+	return details.GetExecDetailsV2(), nil
+}
+
 // CallRPC launches a rpc call.
 // ch is needed to implement timeout for coprocessor streaming, the stream object's
 // cancel function will be sent to the channel, together with a lease checked by a background goroutine.
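A hedged sketch of how a caller might consume the new accessor follows; the helper name serverWallTime and the surrounding package are illustrative. Any concrete response type that carries ExecDetailsV2 is handled uniformly through the getExecDetailsV2 interface, and unknown types surface an error rather than a panic.

package stats

import (
	"time"

	"github.com/tikv/client-go/v2/tikvrpc"
)

// serverWallTime extracts the TiKV-reported wall clock time from a response,
// if the underlying response type carries ExecDetailsV2.
func serverWallTime(resp *tikvrpc.Response) (time.Duration, bool) {
	detail, err := resp.GetExecDetailsV2()
	if err != nil || detail == nil || detail.TimeDetail == nil {
		return 0, false
	}
	return time.Duration(detail.TimeDetail.TotalRpcWallTimeNs) * time.Nanosecond, true
}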
@@ -88,7 +88,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
 	sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
 	for {
 		attempts++
-		if time.Since(tBegin) > slowRequestThreshold {
+		reqBegin := time.Now()
+		if reqBegin.Sub(tBegin) > slowRequestThreshold {
 			logutil.BgLogger().Warn("slow commit request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
 			tBegin = time.Now()
 		}
@@ -140,6 +141,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
 		// we can clean undetermined error.
 		if batch.isPrimary && !c.isAsyncCommit() {
 			c.setUndeterminedErr(nil)
+			reqDuration := time.Since(reqBegin)
+			c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), commitResp.ExecDetailsV2)
 		}
 		if keyErr := commitResp.GetError(); keyErr != nil {
 			if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
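The pattern used here, and again in the pessimistic-lock and prewrite paths below, is: time each region RPC, then let CommitDetails (or LockKeysDetails) remember only the slowest one. A rough, illustrative sketch of the producer side, with placeholder arguments standing in for the real sender and response:

package stats

import (
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/util"
)

// recordCommitRPC shows the bookkeeping added around a commit RPC: measure the
// round trip, then record it in CommitDetails if it is the slowest seen so far.
// regionID, storeAddr and execDetails would come from the real request flow.
func recordCommitRPC(cd *util.CommitDetails, regionID uint64, storeAddr string,
	start time.Time, execDetails *kvrpcpb.ExecDetailsV2) {
	reqDuration := time.Since(start)
	cd.MergeReqDetails(reqDuration, regionID, storeAddr, execDetails)
}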
@@ -144,10 +144,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 			time.Sleep(300 * time.Millisecond)
 			return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
 		}
+		sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
 		startTime := time.Now()
-		resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
+		resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
+		reqDuration := time.Since(startTime)
 		if action.LockCtx.Stats != nil {
-			atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
+			atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
 			atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
 		}
 		if err != nil {
@@ -183,6 +185,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
 		keyErrs := lockResp.GetErrors()
 		if len(keyErrs) == 0 {
+			action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)
+
 			if batch.isPrimary {
 				// After locking the primary key, we should protect the primary lock from expiring
 				// now in case locking the remaining keys take a long time.
@@ -247,7 +247,8 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 		if attempts > 1 || action.retry {
 			req.IsRetryRequest = true
 		}
-		if time.Since(tBegin) > slowRequestThreshold {
+		reqBegin := time.Now()
+		if reqBegin.Sub(tBegin) > slowRequestThreshold {
 			logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
 			tBegin = time.Now()
 		}
@@ -305,6 +306,10 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 			// Clear the RPC Error since the request is evaluated successfully.
 			sender.SetRPCError(nil)
 
+			// Update CommitDetails
+			reqDuration := time.Since(reqBegin)
+			c.getDetail().MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), prewriteResp.ExecDetailsV2)
+
 			if batch.isPrimary {
 				// After writing the primary key, if the size of the transaction is larger than 32M,
 				// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
@@ -358,6 +363,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
 				c.mu.Unlock()
 			}
 		}
+
 		return nil
 	}
 	var locks []*txnlock.Lock
@@ -60,6 +60,54 @@ var (
 	ExecDetailsKey = execDetailsCtxKeyType{}
 )
 
+// TiKVExecDetails is the detail execution information at TiKV side.
+type TiKVExecDetails struct {
+	TimeDetail  *TimeDetail
+	ScanDetail  *ScanDetail
+	WriteDetail *WriteDetail
+}
+
+// NewTiKVExecDetails creates a TiKVExecDetails from a kvproto ExecDetailsV2.
+func NewTiKVExecDetails(pb *kvrpcpb.ExecDetailsV2) TiKVExecDetails {
+	if pb == nil {
+		return TiKVExecDetails{}
+	}
+	td := &TimeDetail{}
+	td.MergeFromTimeDetail(pb.TimeDetail)
+	sd := &ScanDetail{}
+	sd.MergeFromScanDetailV2(pb.ScanDetailV2)
+	wd := &WriteDetail{}
+	wd.MergeFromWriteDetailPb(pb.WriteDetail)
+	return TiKVExecDetails{
+		TimeDetail:  td,
+		ScanDetail:  sd,
+		WriteDetail: wd,
+	}
+}
+
+func (ed *TiKVExecDetails) String() string {
+	if ed == nil {
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 16))
+	if ed.TimeDetail != nil {
+		buf.WriteString(ed.TimeDetail.String())
+	}
+	if ed.ScanDetail != nil {
+		if buf.Len() > 0 {
+			buf.WriteString(", ")
+		}
+		buf.WriteString(ed.ScanDetail.String())
+	}
+	if ed.WriteDetail != nil {
+		if buf.Len() > 0 {
+			buf.WriteString(", ")
+		}
+		buf.WriteString(ed.WriteDetail.String())
+	}
+	return buf.String()
+}
+
 // CommitDetails contains commit detail information.
 type CommitDetails struct {
 	GetCommitTsTime time.Duration
@@ -72,6 +120,10 @@ type CommitDetails struct {
 		sync.Mutex
 		CommitBackoffTime   int64
 		BackoffTypes        []string
+		SlowestReqTotalTime time.Duration
+		SlowestRegion       uint64
+		SlowestStoreAddr    string
+		SlowestExecDetails  TiKVExecDetails
 	}
 	WriteKeys int
 	WriteSize int
@@ -95,6 +147,27 @@ func (cd *CommitDetails) Merge(other *CommitDetails) {
 	cd.TxnRetry += other.TxnRetry
 	cd.Mu.CommitBackoffTime += other.Mu.CommitBackoffTime
 	cd.Mu.BackoffTypes = append(cd.Mu.BackoffTypes, other.Mu.BackoffTypes...)
+	if cd.Mu.SlowestReqTotalTime < other.Mu.SlowestReqTotalTime {
+		cd.Mu.SlowestReqTotalTime = other.Mu.SlowestReqTotalTime
+		cd.Mu.SlowestRegion = other.Mu.SlowestRegion
+		cd.Mu.SlowestStoreAddr = other.Mu.SlowestStoreAddr
+		cd.Mu.SlowestExecDetails = other.Mu.SlowestExecDetails
+	}
 }
 
+// MergeReqDetails merges ExecDetailsV2 into the current CommitDetails.
+func (cd *CommitDetails) MergeReqDetails(reqDuration time.Duration, regionID uint64, addr string, execDetails *kvrpcpb.ExecDetailsV2) {
+	if cd == nil {
+		return
+	}
+	cd.Mu.Lock()
+	defer cd.Mu.Unlock()
+	if reqDuration > cd.Mu.SlowestReqTotalTime {
+		cd.Mu.SlowestReqTotalTime = reqDuration
+		cd.Mu.SlowestRegion = regionID
+		cd.Mu.SlowestStoreAddr = addr
+		cd.Mu.SlowestExecDetails = NewTiKVExecDetails(execDetails)
+	}
+}
+
 // Clone returns a deep copy of itself.
@@ -114,6 +187,10 @@ func (cd *CommitDetails) Clone() *CommitDetails {
 	}
 	commit.Mu.BackoffTypes = append([]string{}, cd.Mu.BackoffTypes...)
 	commit.Mu.CommitBackoffTime = cd.Mu.CommitBackoffTime
+	commit.Mu.SlowestReqTotalTime = cd.Mu.SlowestReqTotalTime
+	commit.Mu.SlowestRegion = cd.Mu.SlowestRegion
+	commit.Mu.SlowestStoreAddr = cd.Mu.SlowestStoreAddr
+	commit.Mu.SlowestExecDetails = cd.Mu.SlowestExecDetails
 	return commit
 }
 
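Once requests have been merged this way, the slowest entry can be surfaced later, for example in a slow log. A small illustrative sketch (the helper name and log format are assumptions; FormatDuration is the formatting helper already used elsewhere in this file):

package stats

import (
	"fmt"

	"github.com/tikv/client-go/v2/util"
)

// slowestCommitRPC formats the slowest commit RPC recorded in CommitDetails,
// e.g. for a slow-query log line.
func slowestCommitRPC(cd *util.CommitDetails) string {
	cd.Mu.Lock()
	defer cd.Mu.Unlock()
	return fmt.Sprintf("slowest_commit_rpc: {total: %s, region_id: %d, store: %s, %s}",
		util.FormatDuration(cd.Mu.SlowestReqTotalTime),
		cd.Mu.SlowestRegion,
		cd.Mu.SlowestStoreAddr,
		cd.Mu.SlowestExecDetails.String())
}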
@@ -127,6 +204,10 @@ type LockKeysDetails struct {
 	Mu struct {
 		sync.Mutex
 		BackoffTypes        []string
+		SlowestReqTotalTime time.Duration
+		SlowestRegion       uint64
+		SlowestStoreAddr    string
+		SlowestExecDetails  TiKVExecDetails
 	}
 	LockRPCTime  int64
 	LockRPCCount int64
@@ -144,6 +225,27 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) {
 	ld.LockRPCCount += ld.LockRPCCount
 	ld.Mu.BackoffTypes = append(ld.Mu.BackoffTypes, lockKey.Mu.BackoffTypes...)
 	ld.RetryCount++
+	if ld.Mu.SlowestReqTotalTime < lockKey.Mu.SlowestReqTotalTime {
+		ld.Mu.SlowestReqTotalTime = lockKey.Mu.SlowestReqTotalTime
+		ld.Mu.SlowestRegion = lockKey.Mu.SlowestRegion
+		ld.Mu.SlowestStoreAddr = lockKey.Mu.SlowestStoreAddr
+		ld.Mu.SlowestExecDetails = lockKey.Mu.SlowestExecDetails
+	}
 }
 
+// MergeReqDetails merges ExecDetailsV2 into the current LockKeysDetails.
+func (ld *LockKeysDetails) MergeReqDetails(reqDuration time.Duration, regionID uint64, addr string, execDetails *kvrpcpb.ExecDetailsV2) {
+	if ld == nil {
+		return
+	}
+	ld.Mu.Lock()
+	defer ld.Mu.Unlock()
+	if reqDuration > ld.Mu.SlowestReqTotalTime {
+		ld.Mu.SlowestReqTotalTime = reqDuration
+		ld.Mu.SlowestRegion = regionID
+		ld.Mu.SlowestStoreAddr = addr
+		ld.Mu.SlowestExecDetails = NewTiKVExecDetails(execDetails)
+	}
+}
+
 // Clone returns a deep copy of itself.
@@ -159,6 +261,10 @@ func (ld *LockKeysDetails) Clone() *LockKeysDetails {
 		ResolveLock: ld.ResolveLock,
 	}
 	lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...)
+	lock.Mu.SlowestReqTotalTime = ld.Mu.SlowestReqTotalTime
+	lock.Mu.SlowestRegion = ld.Mu.SlowestRegion
+	lock.Mu.SlowestStoreAddr = ld.Mu.SlowestStoreAddr
+	lock.Mu.SlowestExecDetails = ld.Mu.SlowestExecDetails
 	return lock
 }
 
@@ -231,6 +337,11 @@ type ScanDetail struct {
 	RocksdbBlockReadCount uint64
 	// RocksdbBlockReadByte is the total number of bytes from block reads.
 	RocksdbBlockReadByte uint64
+	// RocksdbBlockReadDuration is the total time used for block reads.
+	RocksdbBlockReadDuration time.Duration
+	// GetSnapshotDuration is the time spent getting an engine snapshot.
+	GetSnapshotDuration time.Duration
+
 	ResolveLock *ResolveLockDetail
 }
 
@@ -244,6 +355,8 @@ func (sd *ScanDetail) Merge(scanDetail *ScanDetail) {
 	atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount)
 	atomic.AddUint64(&sd.RocksdbBlockReadCount, scanDetail.RocksdbBlockReadCount)
 	atomic.AddUint64(&sd.RocksdbBlockReadByte, scanDetail.RocksdbBlockReadByte)
+	atomic.AddInt64((*int64)(&sd.RocksdbBlockReadDuration), int64(scanDetail.RocksdbBlockReadDuration))
+	atomic.AddInt64((*int64)(&sd.GetSnapshotDuration), int64(scanDetail.GetSnapshotDuration))
 }
 
 var zeroScanDetail = ScanDetail{}
@@ -255,24 +368,60 @@ func (sd *ScanDetail) String() string {
 	}
 	buf := bytes.NewBuffer(make([]byte, 0, 16))
 	buf.WriteString("scan_detail: {")
+	if sd.ProcessedKeys > 0 {
 		buf.WriteString("total_process_keys: ")
 		buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10))
-	buf.WriteString(", total_process_keys_size: ")
+		buf.WriteString(", ")
+	}
+	if sd.ProcessedKeysSize > 0 {
+		buf.WriteString("total_process_keys_size: ")
 		buf.WriteString(strconv.FormatInt(sd.ProcessedKeysSize, 10))
-	buf.WriteString(", total_keys: ")
+		buf.WriteString(", ")
+	}
+	if sd.TotalKeys > 0 {
+		buf.WriteString("total_keys: ")
 		buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10))
-	buf.WriteString(", rocksdb: {")
+		buf.WriteString(", ")
+	}
+	if sd.GetSnapshotDuration > 0 {
+		buf.WriteString("get_snapshot_time: ")
+		buf.WriteString(FormatDuration(sd.GetSnapshotDuration))
+		buf.WriteString(", ")
+	}
+	buf.WriteString("rocksdb: {")
+	if sd.RocksdbDeleteSkippedCount > 0 {
 		buf.WriteString("delete_skipped_count: ")
 		buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10))
-	buf.WriteString(", key_skipped_count: ")
+		buf.WriteString(", ")
+	}
+	if sd.RocksdbKeySkippedCount > 0 {
+		buf.WriteString("key_skipped_count: ")
 		buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10))
-	buf.WriteString(", block: {")
+		buf.WriteString(", ")
+	}
+	buf.WriteString("block: {")
+	if sd.RocksdbBlockCacheHitCount > 0 {
 		buf.WriteString("cache_hit_count: ")
 		buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10))
-	buf.WriteString(", read_count: ")
+		buf.WriteString(", ")
+	}
+	if sd.RocksdbBlockReadCount > 0 {
+		buf.WriteString("read_count: ")
 		buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10))
-	buf.WriteString(", read_byte: ")
+		buf.WriteString(", ")
+	}
+	if sd.RocksdbBlockReadByte > 0 {
+		buf.WriteString("read_byte: ")
 		buf.WriteString(FormatBytes(int64(sd.RocksdbBlockReadByte)))
+		buf.WriteString(", ")
+	}
+	if sd.RocksdbBlockReadDuration > 0 {
+		buf.WriteString("read_time: ")
+		buf.WriteString(FormatDuration(sd.RocksdbBlockReadDuration))
+	}
+	if buf.Bytes()[buf.Len()-2] == ',' {
+		buf.Truncate(buf.Len() - 2)
+	}
 	buf.WriteString("}}}")
 	return buf.String()
 }
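The reworked String now emits only the fields that are non-zero and trims the trailing comma, so empty counters no longer clutter the output. An illustrative sketch of the resulting format, reusing the same values as the snapshot test earlier in this commit (the helper name is an assumption):

package stats

import (
	"fmt"

	"github.com/tikv/client-go/v2/util"
)

// printScanDetail demonstrates the conditional formatting: only non-zero
// fields are printed and dangling ", " separators are trimmed.
func printScanDetail() {
	sd := &util.ScanDetail{
		ProcessedKeys:             10,
		ProcessedKeysSize:         10,
		TotalKeys:                 15,
		GetSnapshotDuration:       500, // nanoseconds, printed as "get_snapshot_time: 500ns"
		RocksdbDeleteSkippedCount: 5,
		RocksdbKeySkippedCount:    1,
		RocksdbBlockCacheHitCount: 10,
		RocksdbBlockReadCount:     20,
		RocksdbBlockReadByte:      15,
	}
	// Prints the same string the snapshot test above expects:
	// scan_detail: {total_process_keys: 10, total_process_keys_size: 10, total_keys: 15,
	//   get_snapshot_time: 500ns, rocksdb: {delete_skipped_count: 5, key_skipped_count: 1,
	//   block: {cache_hit_count: 10, read_count: 20, read_byte: 15 Bytes}}}
	fmt.Println(sd.String())
}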
@@ -288,9 +437,115 @@ func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) {
 		sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount
 		sd.RocksdbBlockReadCount += scanDetail.RocksdbBlockReadCount
 		sd.RocksdbBlockReadByte += scanDetail.RocksdbBlockReadByte
+		sd.RocksdbBlockReadDuration += time.Duration(scanDetail.RocksdbBlockReadNanos) * time.Nanosecond
+		sd.GetSnapshotDuration += time.Duration(scanDetail.GetSnapshotNanos) * time.Nanosecond
 	}
 }
 
+// WriteDetail contains the detailed time breakdown of a write operation.
+type WriteDetail struct {
+	// StoreBatchWaitDuration is the wait duration in the store loop.
+	StoreBatchWaitDuration time.Duration
+	// ProposeSendWaitDuration is the duration before sending proposal to peers.
+	ProposeSendWaitDuration time.Duration
+	// PersistLogDuration is the total time spent on persisting the log.
+	PersistLogDuration time.Duration
+	// RaftDbWriteLeaderWaitDuration is the wait time until the Raft log write leader begins to write.
+	RaftDbWriteLeaderWaitDuration time.Duration
+	// RaftDbSyncLogDuration is the time spent on synchronizing the Raft log to the disk.
+	RaftDbSyncLogDuration time.Duration
+	// RaftDbWriteMemtableDuration is the time spent on writing the Raft log to the Raft memtable.
+	RaftDbWriteMemtableDuration time.Duration
+	// CommitLogDuration is the time waiting for peers to confirm the proposal (counting from the instant when the leader sends the proposal message).
+	CommitLogDuration time.Duration
+	// ApplyBatchWaitDuration is the wait duration in the apply loop.
+	ApplyBatchWaitDuration time.Duration
+	// ApplyLogDuration is the total time spend to applying the log.
+	ApplyLogDuration time.Duration
+	// ApplyMutexLockDuration is the wait time until the KV RocksDB lock is acquired.
+	ApplyMutexLockDuration time.Duration
+	// ApplyWriteLeaderWaitDuration is the wait time until becoming the KV RocksDB write leader.
+	ApplyWriteLeaderWaitDuration time.Duration
+	// ApplyWriteWalDuration is the time spent on writing the KV DB WAL to the disk.
+	ApplyWriteWalDuration time.Duration
+	// ApplyWriteMemtableNanos is the time spent on writing to the memtable of the KV RocksDB.
+	ApplyWriteMemtableDuration time.Duration
+}
+
+// MergeFromWriteDetailPb merges WriteDetail protobuf into the current WriteDetail
+func (wd *WriteDetail) MergeFromWriteDetailPb(pb *kvrpcpb.WriteDetail) {
+	if pb != nil {
+		wd.StoreBatchWaitDuration += time.Duration(pb.StoreBatchWaitNanos) * time.Nanosecond
+		wd.ProposeSendWaitDuration += time.Duration(pb.ProposeSendWaitNanos) * time.Nanosecond
+		wd.PersistLogDuration += time.Duration(pb.PersistLogNanos) * time.Nanosecond
+		wd.RaftDbWriteLeaderWaitDuration += time.Duration(pb.RaftDbWriteLeaderWaitNanos) * time.Nanosecond
+		wd.RaftDbSyncLogDuration += time.Duration(pb.RaftDbSyncLogNanos) * time.Nanosecond
+		wd.RaftDbWriteMemtableDuration += time.Duration(pb.RaftDbWriteMemtableNanos) * time.Nanosecond
+		wd.CommitLogDuration += time.Duration(pb.CommitLogNanos) * time.Nanosecond
+		wd.ApplyBatchWaitDuration += time.Duration(pb.ApplyBatchWaitNanos) * time.Nanosecond
+		wd.ApplyLogDuration += time.Duration(pb.ApplyLogNanos) * time.Nanosecond
+		wd.ApplyMutexLockDuration += time.Duration(pb.ApplyMutexLockNanos) * time.Nanosecond
+		wd.ApplyWriteLeaderWaitDuration += time.Duration(pb.ApplyWriteLeaderWaitNanos) * time.Nanosecond
+		wd.ApplyWriteWalDuration += time.Duration(pb.ApplyWriteWalNanos) * time.Nanosecond
+		wd.ApplyWriteMemtableDuration += time.Duration(pb.ApplyWriteMemtableNanos) * time.Nanosecond
+	}
+}
+
+// Merge merges another WriteDetail protobuf into self.
+func (wd *WriteDetail) Merge(writeDetail *WriteDetail) {
+	atomic.AddInt64((*int64)(&wd.StoreBatchWaitDuration), int64(writeDetail.StoreBatchWaitDuration))
+	atomic.AddInt64((*int64)(&wd.ProposeSendWaitDuration), int64(writeDetail.ProposeSendWaitDuration))
+	atomic.AddInt64((*int64)(&wd.PersistLogDuration), int64(writeDetail.PersistLogDuration))
+	atomic.AddInt64((*int64)(&wd.RaftDbWriteLeaderWaitDuration), int64(writeDetail.RaftDbWriteLeaderWaitDuration))
+	atomic.AddInt64((*int64)(&wd.RaftDbSyncLogDuration), int64(writeDetail.RaftDbSyncLogDuration))
+	atomic.AddInt64((*int64)(&wd.RaftDbWriteMemtableDuration), int64(writeDetail.RaftDbWriteMemtableDuration))
+	atomic.AddInt64((*int64)(&wd.CommitLogDuration), int64(writeDetail.CommitLogDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyBatchWaitDuration), int64(writeDetail.ApplyBatchWaitDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyLogDuration), int64(writeDetail.ApplyLogDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyMutexLockDuration), int64(writeDetail.ApplyMutexLockDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyWriteLeaderWaitDuration), int64(writeDetail.ApplyWriteLeaderWaitDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyWriteWalDuration), int64(writeDetail.ApplyWriteWalDuration))
+	atomic.AddInt64((*int64)(&wd.ApplyWriteMemtableDuration), int64(writeDetail.ApplyWriteMemtableDuration))
+}
+
+var zeroWriteDetail = WriteDetail{}
+
+func (wd *WriteDetail) String() string {
+	if wd == nil || *wd == zeroWriteDetail {
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 64))
+	buf.WriteString("write_detail: {")
+	buf.WriteString("store_batch_wait: ")
+	buf.WriteString(FormatDuration(wd.StoreBatchWaitDuration))
+	buf.WriteString(", propose_send_wait: ")
+	buf.WriteString(FormatDuration(wd.ProposeSendWaitDuration))
+	buf.WriteString(", persist_log: {total: ")
+	buf.WriteString(FormatDuration(wd.PersistLogDuration))
+	buf.WriteString(", write_leader_wait: ")
+	buf.WriteString(FormatDuration(wd.RaftDbWriteLeaderWaitDuration))
+	buf.WriteString(", sync_log: ")
+	buf.WriteString(FormatDuration(wd.RaftDbSyncLogDuration))
+	buf.WriteString(", write_memtable: ")
+	buf.WriteString(FormatDuration(wd.RaftDbWriteMemtableDuration))
+	buf.WriteString("}, commit_log: ")
+	buf.WriteString(FormatDuration(wd.CommitLogDuration))
+	buf.WriteString(", apply_batch_wait: ")
+	buf.WriteString(FormatDuration(wd.ApplyBatchWaitDuration))
+	buf.WriteString(", apply: {total:")
+	buf.WriteString(FormatDuration(wd.ApplyLogDuration))
+	buf.WriteString(", mutex_lock: ")
+	buf.WriteString(FormatDuration(wd.ApplyMutexLockDuration))
+	buf.WriteString(", write_leader_wait: ")
+	buf.WriteString(FormatDuration(wd.ApplyWriteLeaderWaitDuration))
+	buf.WriteString(", write_wal: ")
+	buf.WriteString(FormatDuration(wd.ApplyWriteWalDuration))
+	buf.WriteString(", write_memtable: ")
+	buf.WriteString(FormatDuration(wd.ApplyWriteMemtableDuration))
+	buf.WriteString("}}")
+	return buf.String()
+}
+
 // TimeDetail contains coprocessor time detail information.
 type TimeDetail struct {
 	// Off-cpu and on-cpu wall time elapsed to actually process the request payload. It does not
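WriteDetail is populated from the kvproto WriteDetail message and rendered as a nested write_detail block. A hedged usage sketch follows; the values are made up and the surrounding package is illustrative.

package stats

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/util"
)

// printWriteDetail converts a kvproto WriteDetail into the client-side
// WriteDetail and prints the nested breakdown added by this commit.
func printWriteDetail() {
	wd := &util.WriteDetail{}
	wd.MergeFromWriteDetailPb(&kvrpcpb.WriteDetail{
		StoreBatchWaitNanos: 1000,
		PersistLogNanos:     250000,
		CommitLogNanos:      400000,
		ApplyLogNanos:       120000,
	})
	// Prints a single nested line along the lines of:
	// write_detail: {store_batch_wait: 1µs, ..., persist_log: {total: 250µs, ...}, commit_log: 400µs, ...}
	fmt.Println(wd.String())
}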
@@ -304,6 +559,8 @@ type TimeDetail struct {
 	WaitTime time.Duration
 	// KvReadWallTimeMs is the time used in KV Scan/Get.
 	KvReadWallTimeMs time.Duration
+	// TotalRPCWallTime is Total wall clock time spent on this RPC in TiKV.
+	TotalRPCWallTime time.Duration
 }
 
 // String implements the fmt.Stringer interface.
@@ -323,6 +580,13 @@ func (td *TimeDetail) String() string {
 		buf.WriteString("total_wait_time: ")
 		buf.WriteString(FormatDuration(td.WaitTime))
 	}
+	if td.TotalRPCWallTime > 0 {
+		if buf.Len() > 0 {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("tikv_wall_time: ")
+		buf.WriteString(FormatDuration(td.TotalRPCWallTime))
+	}
 	return buf.String()
 }
 
@@ -332,6 +596,7 @@ func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) {
 		td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
 		td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
 		td.KvReadWallTimeMs += time.Duration(timeDetail.KvReadWallTimeMs) * time.Millisecond
+		td.TotalRPCWallTime += time.Duration(timeDetail.TotalRpcWallTimeNs) * time.Nanosecond
 	}
 }
 
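Finally, the merged TimeDetail now carries the server-side wall time. A short hedged sketch of the round trip from the protobuf to the tikv_wall_time fragment in the formatted output (package name, helper name, and sample numbers are assumptions):

package stats

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/util"
)

// printTimeDetail shows TotalRpcWallTimeNs flowing from the protobuf into
// TimeDetail.TotalRPCWallTime and then into the "tikv_wall_time" field.
func printTimeDetail() {
	td := &util.TimeDetail{}
	td.MergeFromTimeDetail(&kvrpcpb.TimeDetail{
		ProcessWallTimeMs:  2,
		WaitWallTimeMs:     1,
		TotalRpcWallTimeNs: 3_500_000, // 3.5ms observed inside TiKV
	})
	// Prints the wait/process fields plus the new "tikv_wall_time: 3.5ms" fragment.
	fmt.Println(td.String())
}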