From bdd41058aa13901e59b46b625aa73caa46e56b6d Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 6 Mar 2023 22:12:45 +0800 Subject: [PATCH] metrics: seperate metrics with source scope for txn command (#723) * seperate metrics with source scope Signed-off-by: cfzjywxk * fix async pessimistic rollback Signed-off-by: cfzjywxk * fix missing definition Signed-off-by: cfzjywxk --------- Signed-off-by: cfzjywxk --- internal/client/client.go | 33 +++++++++++++----- internal/locate/region_request.go | 6 +++- metrics/metrics.go | 26 +++++++------- metrics/shortcuts.go | 57 ++++++++++++++++++++++--------- txnkv/transaction/2pc.go | 15 ++++++-- txnkv/transaction/cleanup.go | 13 ++++--- txnkv/transaction/commit.go | 18 ++++++---- txnkv/transaction/pessimistic.go | 29 ++++++++++++---- txnkv/transaction/prewrite.go | 16 ++++++--- txnkv/transaction/txn.go | 1 + txnkv/txnsnapshot/snapshot.go | 6 +++- 11 files changed, 155 insertions(+), 65 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 168638d1..7954eff5 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -487,9 +487,10 @@ var ( ) type sendReqHistCacheKey struct { - tp tikvrpc.CmdType - id uint64 - staleRad bool + tp tikvrpc.CmdType + id uint64 + staleRad bool + isInternal bool } type sendReqCounterCacheKey struct { @@ -497,6 +498,11 @@ type sendReqCounterCacheKey struct { requestSource string } +type rpcNetLatencyCacheKey struct { + storeID uint64 + isInternal bool +} + type sendReqCounterCacheValue struct { counter prometheus.Counter timeCounter prometheus.Counter @@ -506,11 +512,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr elapsed := time.Since(start) secs := elapsed.Seconds() storeID := req.Context.GetPeer().GetStoreId() + isInternal := util.IsInternalRequest(req.GetRequestSource()) histKey := sendReqHistCacheKey{ req.Type, storeID, staleRead, + isInternal, } counterKey := sendReqCounterCacheKey{ histKey, @@ -525,7 +533,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr if len(storeIDStr) == 0 { storeIDStr = strconv.FormatUint(storeID, 10) } - hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead)) + hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, + strconv.FormatBool(staleRead), strconv.FormatBool(isInternal)) sendReqHistCache.Store(histKey, hist) } counter, ok := sendReqCounterCache.Load(counterKey) @@ -534,8 +543,10 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr storeIDStr = strconv.FormatUint(storeID, 10) } counter = sendReqCounterCacheValue{ - metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource), - metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource), + metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), + counterKey.requestSource, strconv.FormatBool(isInternal)), + metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, + strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)), } sendReqCounterCache.Store(counterKey, counter) } @@ -546,13 +557,17 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr if execDetail := resp.GetExecDetailsV2(); execDetail != nil && execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 { - latHist, ok := rpcNetLatencyHistCache.Load(storeID) + cacheKey := rpcNetLatencyCacheKey{ + storeID, + isInternal, + } + latHist, ok := rpcNetLatencyHistCache.Load(cacheKey) if !ok { if len(storeIDStr) == 0 { storeIDStr = strconv.FormatUint(storeID, 10) } - latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr) - rpcNetLatencyHistCache.Store(storeID, latHist) + latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal)) + rpcNetLatencyHistCache.Store(cacheKey, latHist) } latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond latHist.(prometheus.Observer).Observe(latency.Seconds()) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d3bd89c9..2a1f6615 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1571,7 +1571,11 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext } // NOTE: Please add the region error handler in the same order of errorpb.Error. - metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc() + isInternal := false + if req != nil { + isInternal = util.IsInternalRequest(req.GetRequestSource()) + } + metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc() if notLeader := regionErr.GetNotLeader(); notLeader != nil { // Retry if error is `NotLeader`. diff --git a/metrics/metrics.go b/metrics/metrics.go index 4e9a6278..b72cfb5d 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -50,8 +50,8 @@ var ( TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec - TiKVTxnWriteKVCountHistogram prometheus.Histogram - TiKVTxnWriteSizeHistogram prometheus.Histogram + TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec + TiKVTxnWriteSizeHistogram *prometheus.HistogramVec TiKVRawkvCmdHistogram *prometheus.HistogramVec TiKVRawkvSizeHistogram *prometheus.HistogramVec TiKVTxnRegionsNumHistogram *prometheus.HistogramVec @@ -150,7 +150,7 @@ func initMetrics(namespace, subsystem string) { Name: "request_seconds", Help: "Bucketed histogram of sending request duration.", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days - }, []string{LblType, LblStore, LblStaleRead}) + }, []string{LblType, LblStore, LblStaleRead, LblScope}) TiKVSendReqCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -158,7 +158,7 @@ func initMetrics(namespace, subsystem string) { Subsystem: subsystem, Name: "request_counter", Help: "Counter of sending request with multi dimensions.", - }, []string{LblType, LblStore, LblStaleRead, LblSource}) + }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope}) TiKVSendReqTimeCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -166,7 +166,7 @@ func initMetrics(namespace, subsystem string) { Subsystem: subsystem, Name: "request_time_counter", Help: "Counter of request time with multi dimensions.", - }, []string{LblType, LblStore, LblStaleRead, LblSource}) + }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope}) TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -175,7 +175,7 @@ func initMetrics(namespace, subsystem string) { Name: "rpc_net_latency_seconds", Help: "Bucketed histogram of time difference between TiDB and TiKV.", Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s - }, []string{LblStore}) + }, []string{LblStore, LblScope}) TiKVCoprocessorHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -184,7 +184,7 @@ func initMetrics(namespace, subsystem string) { Name: "cop_duration_seconds", Help: "Run duration of a single coprocessor task, includes backoff time.", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days - }, []string{LblStore, LblStaleRead}) + }, []string{LblStore, LblStaleRead, LblScope}) TiKVLockResolverCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -200,25 +200,25 @@ func initMetrics(namespace, subsystem string) { Subsystem: subsystem, Name: "region_err_total", Help: "Counter of region errors.", - }, []string{LblType}) + }, []string{LblType, LblScope}) - TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram( + TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, Name: "txn_write_kv_num", Help: "Count of kv pairs to write in a transaction.", Buckets: prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G - }) + }, []string{LblScope}) - TiKVTxnWriteSizeHistogram = prometheus.NewHistogram( + TiKVTxnWriteSizeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, Name: "txn_write_size_bytes", Help: "Size of kv pairs to write in a transaction.", Buckets: prometheus.ExponentialBuckets(16, 4, 17), // 16Bytes ~ 64GB - }) + }, []string{LblScope}) TiKVRawkvCmdHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -245,7 +245,7 @@ func initMetrics(namespace, subsystem string) { Name: "txn_regions_num", Help: "Number of regions in a transaction.", Buckets: prometheus.ExponentialBuckets(1, 2, 25), // 1 ~ 16M - }, []string{LblType}) + }, []string{LblType, LblScope}) TiKVLoadSafepointCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index ce97afee..d5bd41a2 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -74,14 +74,27 @@ var ( BackoffHistogramIsWitness prometheus.Observer BackoffHistogramEmpty prometheus.Observer - TxnRegionsNumHistogramWithSnapshot prometheus.Observer - TxnRegionsNumHistogramPrewrite prometheus.Observer - TxnRegionsNumHistogramCommit prometheus.Observer - TxnRegionsNumHistogramCleanup prometheus.Observer - TxnRegionsNumHistogramPessimisticLock prometheus.Observer - TxnRegionsNumHistogramPessimisticRollback prometheus.Observer - TxnRegionsNumHistogramWithCoprocessor prometheus.Observer - TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer + TxnRegionsNumHistogramWithSnapshotInternal prometheus.Observer + TxnRegionsNumHistogramWithSnapshot prometheus.Observer + TxnRegionsNumHistogramPrewriteInternal prometheus.Observer + TxnRegionsNumHistogramPrewrite prometheus.Observer + TxnRegionsNumHistogramCommitInternal prometheus.Observer + TxnRegionsNumHistogramCommit prometheus.Observer + TxnRegionsNumHistogramCleanupInternal prometheus.Observer + TxnRegionsNumHistogramCleanup prometheus.Observer + TxnRegionsNumHistogramPessimisticLockInternal prometheus.Observer + TxnRegionsNumHistogramPessimisticLock prometheus.Observer + TxnRegionsNumHistogramPessimisticRollbackInternal prometheus.Observer + TxnRegionsNumHistogramPessimisticRollback prometheus.Observer + TxnRegionsNumHistogramWithCoprocessorInternal prometheus.Observer + TxnRegionsNumHistogramWithCoprocessor prometheus.Observer + TxnRegionsNumHistogramWithBatchCoprocessorInternal prometheus.Observer + TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer + + TxnWriteKVCountHistogramInternal prometheus.Observer + TxnWriteKVCountHistogramGeneral prometheus.Observer + TxnWriteSizeHistogramInternal prometheus.Observer + TxnWriteSizeHistogramGeneral prometheus.Observer LockResolverCountWithBatchResolve prometheus.Counter LockResolverCountWithExpired prometheus.Counter @@ -185,14 +198,26 @@ func initShortcuts() { BackoffHistogramIsWitness = TiKVBackoffHistogram.WithLabelValues("isWitness") BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("") - TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot") - TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite") - TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit") - TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup") - TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock") - TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback") - TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor") - TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor") + TxnRegionsNumHistogramWithSnapshotInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblInternal) + TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblGeneral) + TxnRegionsNumHistogramPrewriteInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblInternal) + TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblGeneral) + TxnRegionsNumHistogramCommitInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblInternal) + TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblGeneral) + TxnRegionsNumHistogramCleanupInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblInternal) + TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblGeneral) + TxnRegionsNumHistogramPessimisticLockInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblInternal) + TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblGeneral) + TxnRegionsNumHistogramPessimisticRollbackInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblInternal) + TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblGeneral) + TxnRegionsNumHistogramWithCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal) + TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral) + TxnRegionsNumHistogramWithBatchCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal) + TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral) + TxnWriteKVCountHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal) + TxnWriteKVCountHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral) + TxnWriteSizeHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal) + TxnWriteSizeHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral) LockResolverCountWithBatchResolve = TiKVLockResolverCounter.WithLabelValues("batch_resolve") LockResolverCountWithExpired = TiKVLockResolverCounter.WithLabelValues("expired") diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 8e299b76..47277ab3 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -187,6 +187,10 @@ type twoPhaseCommitter struct { // assertion error happened when initializing mutations, could be false positive if pessimistic lock is lost stashedAssertionError error + + // isInternal means it's related to an internal transaction. It's only used by `asyncPessimisticRollback` as the + // committer may contain a nil `txn` pointer. + isInternal bool } type memBufferMutations struct { @@ -696,8 +700,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { WriteKeys: c.mutations.Len(), ResolveLock: util.ResolveLockDetail{}, } - metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) - metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) + + isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource()) + if isInternalReq { + metrics.TxnWriteKVCountHistogramInternal.Observe(float64(commitDetail.WriteKeys)) + metrics.TxnWriteSizeHistogramInternal.Observe(float64(commitDetail.WriteSize)) + } else { + metrics.TxnWriteKVCountHistogramGeneral.Observe(float64(commitDetail.WriteKeys)) + metrics.TxnWriteSizeHistogramGeneral.Observe(float64(commitDetail.WriteSize)) + } c.hasNoNeedCommitKeys = checkCnt > 0 c.lockTTL = txnLockTTL(txn.startTime, size) c.priority = txn.priority.ToPB() diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index 63b84ffc..03cf6d0c 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -46,19 +46,22 @@ import ( "go.uber.org/zap" ) -type actionCleanup struct{} +type actionCleanup struct{ isInternal bool } var _ twoPhaseCommitAction = actionCleanup{} -func (actionCleanup) String() string { +func (action actionCleanup) String() string { return "cleanup" } -func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer { +func (action actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer { + if action.isInternal { + return metrics.TxnRegionsNumHistogramCleanupInternal + } return metrics.TxnRegionsNumHistogramCleanup } -func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { +func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{ Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, @@ -99,5 +102,5 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer } func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error { - return c.doActionOnMutations(bo, actionCleanup{}, mutations) + return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations) } diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 15b61872..4871b07e 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -52,19 +52,25 @@ import ( "go.uber.org/zap" ) -type actionCommit struct{ retry bool } +type actionCommit struct { + retry bool + isInternal bool +} var _ twoPhaseCommitAction = actionCommit{} -func (actionCommit) String() string { +func (action actionCommit) String() string { return "commit" } -func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer { +func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer { + if action.isInternal { + return metrics.TxnRegionsNumHistogramCommitInternal + } return metrics.TxnRegionsNumHistogramCommit } -func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { +func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { keys := batch.mutations.GetKeys() req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{ StartVersion: c.startTS, @@ -132,7 +138,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, if same { continue } - return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations) + return c.doActionOnMutations(bo, actionCommit{true, action.isInternal}, batch.mutations) } if resp.Resp == nil { @@ -220,5 +226,5 @@ func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations Commi bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } - return c.doActionOnMutations(bo, actionCommit{}, mutations) + return c.doActionOnMutations(bo, actionCommit{isInternal: c.txn.isInternal()}, mutations) } diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index d166caa8..fbdda273 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -61,27 +61,36 @@ import ( type actionPessimisticLock struct { *kv.LockCtx wakeUpMode kvrpcpb.PessimisticLockWakeUpMode + isInternal bool +} +type actionPessimisticRollback struct { + isInternal bool } -type actionPessimisticRollback struct{} var ( _ twoPhaseCommitAction = actionPessimisticLock{} _ twoPhaseCommitAction = actionPessimisticRollback{} ) -func (actionPessimisticLock) String() string { +func (action actionPessimisticLock) String() string { return "pessimistic_lock" } -func (actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer { +func (action actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer { + if action.isInternal { + return metrics.TxnRegionsNumHistogramPessimisticLockInternal + } return metrics.TxnRegionsNumHistogramPessimisticLock } -func (actionPessimisticRollback) String() string { +func (action actionPessimisticRollback) String() string { return "pessimistic_rollback" } -func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer { +func (action actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer { + if action.isInternal { + return metrics.TxnRegionsNumHistogramPessimisticRollbackInternal + } return metrics.TxnRegionsNumHistogramPessimisticRollback } @@ -540,9 +549,15 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt } } } - return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode}, mutations) + return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations) } func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error { - return c.doActionOnMutations(bo, actionPessimisticRollback{}, mutations) + isInternal := false + if c.txn != nil { + isInternal = c.txn.isInternal() + } else { + isInternal = c.isInternal + } + return c.doActionOnMutations(bo, actionPessimisticRollback{isInternal: isInternal}, mutations) } diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index b74e06fe..5f6c385c 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -59,15 +59,21 @@ import ( "go.uber.org/zap" ) -type actionPrewrite struct{ retry bool } +type actionPrewrite struct { + retry bool + isInternal bool +} var _ twoPhaseCommitAction = actionPrewrite{} -func (actionPrewrite) String() string { +func (action actionPrewrite) String() string { return "prewrite" } -func (actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer { +func (action actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer { + if action.isInternal { + return metrics.TxnRegionsNumHistogramPrewriteInternal + } return metrics.TxnRegionsNumHistogramPrewrite } @@ -302,7 +308,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B if same { continue } - err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations) + err = c.doActionOnMutations(bo, actionPrewrite{true, action.isInternal}, batch.mutations) return err } @@ -436,5 +442,5 @@ func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations Com } // `doActionOnMutations` will unset `useOnePC` if the mutations is splitted into multiple batches. - return c.doActionOnMutations(bo, actionPrewrite{}, mutations) + return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations) } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 3a0fd049..de48d6e5 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1280,6 +1280,7 @@ func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte, s startTS: txn.committer.startTS, forUpdateTS: txn.committer.forUpdateTS, primaryKey: txn.committer.primaryKey, + isInternal: txn.isInternal(), } if specifiedForUpdateTS > committer.forUpdateTS { committer.forUpdateTS = specifiedForUpdateTS diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 462fd876..ba0b5efc 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -335,7 +335,11 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c return err } - metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups))) + if s.IsInternal() { + metrics.TxnRegionsNumHistogramWithSnapshotInternal.Observe(float64(len(groups))) + } else { + metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups))) + } var batches []batchKeys for id, g := range groups {