metrics: separate metrics with source scope for txn command (#723)

* separate metrics with source scope

Signed-off-by: cfzjywxk <lsswxrxr@163.com>

* fix async pessimistic rollback

Signed-off-by: cfzjywxk <lsswxrxr@163.com>

* fix missing definition

Signed-off-by: cfzjywxk <lsswxrxr@163.com>

---------

Signed-off-by: cfzjywxk <lsswxrxr@163.com>
cfzjywxk 2023-03-06 22:12:45 +08:00 committed by GitHub
parent 31d55e522c
commit bdd41058aa
11 changed files with 155 additions and 65 deletions


@@ -487,9 +487,10 @@ var (
 )
 
 type sendReqHistCacheKey struct {
     tp         tikvrpc.CmdType
     id         uint64
     staleRead  bool
+    isInternal bool
 }
 
 type sendReqCounterCacheKey struct {
@@ -497,6 +498,11 @@ type sendReqCounterCacheKey struct {
     requestSource string
 }
 
+type rpcNetLatencyCacheKey struct {
+    storeID    uint64
+    isInternal bool
+}
+
 type sendReqCounterCacheValue struct {
     counter     prometheus.Counter
     timeCounter prometheus.Counter
@@ -506,11 +512,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
     elapsed := time.Since(start)
     secs := elapsed.Seconds()
     storeID := req.Context.GetPeer().GetStoreId()
+    isInternal := util.IsInternalRequest(req.GetRequestSource())
 
     histKey := sendReqHistCacheKey{
         req.Type,
         storeID,
         staleRead,
+        isInternal,
     }
     counterKey := sendReqCounterCacheKey{
         histKey,
@@ -525,7 +533,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
         if len(storeIDStr) == 0 {
             storeIDStr = strconv.FormatUint(storeID, 10)
         }
-        hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
+        hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
+            strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
         sendReqHistCache.Store(histKey, hist)
     }
     counter, ok := sendReqCounterCache.Load(counterKey)
@@ -534,8 +543,10 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
             storeIDStr = strconv.FormatUint(storeID, 10)
         }
         counter = sendReqCounterCacheValue{
-            metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
-            metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
+            metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
+                counterKey.requestSource, strconv.FormatBool(isInternal)),
+            metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
+                strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
         }
         sendReqCounterCache.Store(counterKey, counter)
     }
@@ -546,13 +557,17 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
     if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
         execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
-        latHist, ok := rpcNetLatencyHistCache.Load(storeID)
+        cacheKey := rpcNetLatencyCacheKey{
+            storeID,
+            isInternal,
+        }
+        latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
         if !ok {
             if len(storeIDStr) == 0 {
                 storeIDStr = strconv.FormatUint(storeID, 10)
             }
-            latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
-            rpcNetLatencyHistCache.Store(storeID, latHist)
+            latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
+            rpcNetLatencyHistCache.Store(cacheKey, latHist)
         }
         latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
         latHist.(prometheus.Observer).Observe(latency.Seconds())
    }
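
The hot path above looks each observer up once per label combination and memoizes it, so repeated requests skip the label hashing inside WithLabelValues. A standalone sketch of the same pattern — names such as reqHistKey and observeSendReq are illustrative, not the client-go API:

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// reqHistKey mirrors the idea of sendReqHistCacheKey: one cache entry per
// (type, store, staleRead, scope) combination.
type reqHistKey struct {
    tp         string
    storeID    uint64
    staleRead  bool
    isInternal bool
}

var (
    sendReqHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "request_seconds",
        Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
    }, []string{"type", "store", "stale_read", "scope"})

    histCache sync.Map // reqHistKey -> prometheus.Observer
)

func observeSendReq(tp string, storeID uint64, staleRead, isInternal bool, elapsed time.Duration) {
    key := reqHistKey{tp, storeID, staleRead, isInternal}
    obs, ok := histCache.Load(key)
    if !ok {
        // Resolve the labeled child once, then reuse it for this key.
        obs = sendReqHist.WithLabelValues(tp, strconv.FormatUint(storeID, 10),
            strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
        histCache.Store(key, obs)
    }
    obs.(prometheus.Observer).Observe(elapsed.Seconds())
}

func main() {
    prometheus.MustRegister(sendReqHist)
    observeSendReq("Get", 1, false, true, 3*time.Millisecond)
    fmt.Println("observed one internal Get")
}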


@@ -1571,7 +1571,11 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
     }
 
     // NOTE: Please add the region error handler in the same order of errorpb.Error.
-    metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
+    isInternal := false
+    if req != nil {
+        isInternal = util.IsInternalRequest(req.GetRequestSource())
+    }
+    metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc()
 
     if notLeader := regionErr.GetNotLeader(); notLeader != nil {
         // Retry if error is `NotLeader`.
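
Region errors can surface on paths where no request is attached, hence the nil guard before classifying the scope: the label defaults to "false" (general) unless a request source proves otherwise. A minimal sketch of that guard, with isInternalSource standing in for util.IsInternalRequest under an assumed prefix convention:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

type request struct{ source string }

// isInternalSource mimics the idea of util.IsInternalRequest: classify a
// request-source string as internal traffic (prefix convention assumed here).
func isInternalSource(source string) bool {
    return strings.HasPrefix(source, "internal")
}

// scopeLabel is nil-safe: no request means we cannot prove the traffic is
// internal, so it falls back to the general scope.
func scopeLabel(req *request) string {
    isInternal := false
    if req != nil {
        isInternal = isInternalSource(req.source)
    }
    return strconv.FormatBool(isInternal)
}

func main() {
    fmt.Println(scopeLabel(nil))                              // false
    fmt.Println(scopeLabel(&request{source: "internal_ddl"})) // true
}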


@@ -50,8 +50,8 @@ var (
     TiKVCoprocessorHistogram     *prometheus.HistogramVec
     TiKVLockResolverCounter      *prometheus.CounterVec
     TiKVRegionErrorCounter       *prometheus.CounterVec
-    TiKVTxnWriteKVCountHistogram prometheus.Histogram
-    TiKVTxnWriteSizeHistogram    prometheus.Histogram
+    TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
+    TiKVTxnWriteSizeHistogram    *prometheus.HistogramVec
     TiKVRawkvCmdHistogram        *prometheus.HistogramVec
     TiKVRawkvSizeHistogram       *prometheus.HistogramVec
     TiKVTxnRegionsNumHistogram   *prometheus.HistogramVec
@@ -150,7 +150,7 @@ func initMetrics(namespace, subsystem string) {
             Name:      "request_seconds",
             Help:      "Bucketed histogram of sending request duration.",
             Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
-        }, []string{LblType, LblStore, LblStaleRead})
+        }, []string{LblType, LblStore, LblStaleRead, LblScope})
 
     TiKVSendReqCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -158,7 +158,7 @@ func initMetrics(namespace, subsystem string) {
             Subsystem: subsystem,
             Name:      "request_counter",
             Help:      "Counter of sending request with multi dimensions.",
-        }, []string{LblType, LblStore, LblStaleRead, LblSource})
+        }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
 
     TiKVSendReqTimeCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -166,7 +166,7 @@ func initMetrics(namespace, subsystem string) {
             Subsystem: subsystem,
             Name:      "request_time_counter",
             Help:      "Counter of request time with multi dimensions.",
-        }, []string{LblType, LblStore, LblStaleRead, LblSource})
+        }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
 
     TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -175,7 +175,7 @@ func initMetrics(namespace, subsystem string) {
             Name:      "rpc_net_latency_seconds",
             Help:      "Bucketed histogram of time difference between TiDB and TiKV.",
             Buckets:   prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
-        }, []string{LblStore})
+        }, []string{LblStore, LblScope})
 
     TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -184,7 +184,7 @@ func initMetrics(namespace, subsystem string) {
             Name:      "cop_duration_seconds",
             Help:      "Run duration of a single coprocessor task, includes backoff time.",
             Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
-        }, []string{LblStore, LblStaleRead})
+        }, []string{LblStore, LblStaleRead, LblScope})
 
     TiKVLockResolverCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -200,25 +200,25 @@ func initMetrics(namespace, subsystem string) {
             Subsystem: subsystem,
             Name:      "region_err_total",
             Help:      "Counter of region errors.",
-        }, []string{LblType})
+        }, []string{LblType, LblScope})
 
-    TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram(
+    TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: namespace,
             Subsystem: subsystem,
             Name:      "txn_write_kv_num",
             Help:      "Count of kv pairs to write in a transaction.",
             Buckets:   prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G
-        })
+        }, []string{LblScope})
 
-    TiKVTxnWriteSizeHistogram = prometheus.NewHistogram(
+    TiKVTxnWriteSizeHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: namespace,
             Subsystem: subsystem,
             Name:      "txn_write_size_bytes",
             Help:      "Size of kv pairs to write in a transaction.",
             Buckets:   prometheus.ExponentialBuckets(16, 4, 17), // 16Bytes ~ 64GB
-        })
+        }, []string{LblScope})
 
     TiKVRawkvCmdHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
@@ -245,7 +245,7 @@ func initMetrics(namespace, subsystem string) {
             Name:      "txn_regions_num",
             Help:      "Number of regions in a transaction.",
             Buckets:   prometheus.ExponentialBuckets(1, 2, 25), // 1 ~ 16M
-        }, []string{LblType, LblScope})
 
     TiKVLoadSafepointCounter = prometheus.NewCounterVec(
         prometheus.CounterOpts{
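
Every vec above gains a trailing scope label. A self-contained sketch of one such declaration and the resulting series split, assuming LblScope/LblInternal/LblGeneral resolve to "scope"/"internal"/"general" (the constant values are inferred, not quoted from this diff):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

const (
    LblScope    = "scope"
    LblInternal = "internal"
    LblGeneral  = "general"
)

// txnWriteKVCount mirrors the txn_write_kv_num histogram after this change:
// the plain Histogram becomes a HistogramVec keyed by scope.
var txnWriteKVCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Namespace: "tikv",
    Subsystem: "client_go",
    Name:      "txn_write_kv_num",
    Help:      "Count of kv pairs to write in a transaction.",
    Buckets:   prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G
}, []string{LblScope})

func main() {
    reg := prometheus.NewRegistry()
    reg.MustRegister(txnWriteKVCount)
    txnWriteKVCount.WithLabelValues(LblInternal).Observe(128)
    txnWriteKVCount.WithLabelValues(LblGeneral).Observe(16)
    mfs, _ := reg.Gather()
    for _, mf := range mfs {
        // One family, two series: internal and general traffic stay separate.
        fmt.Println(mf.GetName(), len(mf.GetMetric()), "series")
    }
}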


@@ -74,14 +74,27 @@ var (
     BackoffHistogramIsWitness prometheus.Observer
     BackoffHistogramEmpty     prometheus.Observer
 
-    TxnRegionsNumHistogramWithSnapshot         prometheus.Observer
-    TxnRegionsNumHistogramPrewrite             prometheus.Observer
-    TxnRegionsNumHistogramCommit               prometheus.Observer
-    TxnRegionsNumHistogramCleanup              prometheus.Observer
-    TxnRegionsNumHistogramPessimisticLock      prometheus.Observer
-    TxnRegionsNumHistogramPessimisticRollback  prometheus.Observer
-    TxnRegionsNumHistogramWithCoprocessor      prometheus.Observer
-    TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer
+    TxnRegionsNumHistogramWithSnapshotInternal         prometheus.Observer
+    TxnRegionsNumHistogramWithSnapshot                 prometheus.Observer
+    TxnRegionsNumHistogramPrewriteInternal             prometheus.Observer
+    TxnRegionsNumHistogramPrewrite                     prometheus.Observer
+    TxnRegionsNumHistogramCommitInternal               prometheus.Observer
+    TxnRegionsNumHistogramCommit                       prometheus.Observer
+    TxnRegionsNumHistogramCleanupInternal              prometheus.Observer
+    TxnRegionsNumHistogramCleanup                      prometheus.Observer
+    TxnRegionsNumHistogramPessimisticLockInternal      prometheus.Observer
+    TxnRegionsNumHistogramPessimisticLock              prometheus.Observer
+    TxnRegionsNumHistogramPessimisticRollbackInternal  prometheus.Observer
+    TxnRegionsNumHistogramPessimisticRollback          prometheus.Observer
+    TxnRegionsNumHistogramWithCoprocessorInternal      prometheus.Observer
+    TxnRegionsNumHistogramWithCoprocessor              prometheus.Observer
+    TxnRegionsNumHistogramWithBatchCoprocessorInternal prometheus.Observer
+    TxnRegionsNumHistogramWithBatchCoprocessor         prometheus.Observer
+    TxnWriteKVCountHistogramInternal                   prometheus.Observer
+    TxnWriteKVCountHistogramGeneral                    prometheus.Observer
+    TxnWriteSizeHistogramInternal                      prometheus.Observer
+    TxnWriteSizeHistogramGeneral                       prometheus.Observer
 
     LockResolverCountWithBatchResolve prometheus.Counter
     LockResolverCountWithExpired      prometheus.Counter
@@ -185,14 +198,26 @@ func initShortcuts() {
     BackoffHistogramIsWitness = TiKVBackoffHistogram.WithLabelValues("isWitness")
     BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("")
 
-    TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot")
-    TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite")
-    TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit")
-    TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup")
-    TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock")
-    TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback")
-    TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
-    TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
+    TxnRegionsNumHistogramWithSnapshotInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblInternal)
+    TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblGeneral)
+    TxnRegionsNumHistogramPrewriteInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblInternal)
+    TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblGeneral)
+    TxnRegionsNumHistogramCommitInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblInternal)
+    TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblGeneral)
+    TxnRegionsNumHistogramCleanupInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblInternal)
+    TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblGeneral)
+    TxnRegionsNumHistogramPessimisticLockInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblInternal)
+    TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblGeneral)
+    TxnRegionsNumHistogramPessimisticRollbackInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblInternal)
+    TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblGeneral)
+    TxnRegionsNumHistogramWithCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal)
+    TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblGeneral)
+    TxnRegionsNumHistogramWithBatchCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblInternal)
+    TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral)
+    TxnWriteKVCountHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal)
+    TxnWriteKVCountHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral)
+    TxnWriteSizeHistogramInternal = TiKVTxnWriteSizeHistogram.WithLabelValues(LblInternal)
+    TxnWriteSizeHistogramGeneral = TiKVTxnWriteSizeHistogram.WithLabelValues(LblGeneral)
 
     LockResolverCountWithBatchResolve = TiKVLockResolverCounter.WithLabelValues("batch_resolve")
     LockResolverCountWithExpired = TiKVLockResolverCounter.WithLabelValues("expired")
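
These shortcuts bind WithLabelValues once at init time, so the hot path observes through a pre-bound observer (a plain atomic update) instead of re-hashing label values on every call. A sketch of the idea with illustrative names:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

var (
    txnWriteSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "txn_write_size_bytes",
        Buckets: prometheus.ExponentialBuckets(16, 4, 17),
    }, []string{"scope"})

    // Bound once at package init, reused on every observation.
    txnWriteSizeInternal = txnWriteSize.WithLabelValues("internal")
    txnWriteSizeGeneral  = txnWriteSize.WithLabelValues("general")
)

// observeWriteSize picks the scope with a branch rather than a vec lookup.
func observeWriteSize(isInternal bool, bytes float64) {
    if isInternal {
        txnWriteSizeInternal.Observe(bytes)
    } else {
        txnWriteSizeGeneral.Observe(bytes)
    }
}

func main() {
    prometheus.MustRegister(txnWriteSize)
    observeWriteSize(true, 4096)
    fmt.Println("ok")
}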


@@ -187,6 +187,10 @@ type twoPhaseCommitter struct {
     // assertion error happened when initializing mutations, could be false positive if pessimistic lock is lost
     stashedAssertionError error
+
+    // isInternal means it's related to an internal transaction. It's only used by `asyncPessimisticRollback` as the
+    // committer may contain a nil `txn` pointer.
+    isInternal bool
 }
 
 type memBufferMutations struct {
@@ -696,8 +700,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
         WriteKeys:   c.mutations.Len(),
         ResolveLock: util.ResolveLockDetail{},
     }
-    metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
-    metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
+
+    isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource())
+    if isInternalReq {
+        metrics.TxnWriteKVCountHistogramInternal.Observe(float64(commitDetail.WriteKeys))
+        metrics.TxnWriteSizeHistogramInternal.Observe(float64(commitDetail.WriteSize))
+    } else {
+        metrics.TxnWriteKVCountHistogramGeneral.Observe(float64(commitDetail.WriteKeys))
+        metrics.TxnWriteSizeHistogramGeneral.Observe(float64(commitDetail.WriteSize))
+    }
     c.hasNoNeedCommitKeys = checkCnt > 0
     c.lockTTL = txnLockTTL(txn.startTime, size)
     c.priority = txn.priority.ToPB()


@@ -46,19 +46,22 @@ import (
     "go.uber.org/zap"
 )
 
-type actionCleanup struct{}
+type actionCleanup struct{ isInternal bool }
 
 var _ twoPhaseCommitAction = actionCleanup{}
 
-func (actionCleanup) String() string {
+func (action actionCleanup) String() string {
     return "cleanup"
 }
 
-func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+func (action actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+    if action.isInternal {
+        return metrics.TxnRegionsNumHistogramCleanupInternal
+    }
     return metrics.TxnRegionsNumHistogramCleanup
 }
 
-func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
     req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
         Keys:         batch.mutations.GetKeys(),
         StartVersion: c.startTS,
@@ -99,5 +102,5 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
 }
 
 func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
-    return c.doActionOnMutations(bo, actionCleanup{}, mutations)
+    return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
 }
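
cleanup, commit, prewrite, and the pessimistic actions below all follow the same shape: the action value carries isInternal, and its histogram accessor branches to the pre-bound observer for that scope. A simplified, self-contained sketch of the pattern — the interface and names are stand-ins, not the client-go types:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

var txnRegionsNum = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name:    "txn_regions_num",
    Buckets: prometheus.ExponentialBuckets(1, 2, 25),
}, []string{"type", "scope"})

// Pre-bound observers, one per scope, as in metrics/shortcuts.go.
var (
    regionsCleanupInternal = txnRegionsNum.WithLabelValues("2pc_cleanup", "internal")
    regionsCleanupGeneral  = txnRegionsNum.WithLabelValues("2pc_cleanup", "general")
)

type commitAction interface {
    String() string
    regionsHistogram() prometheus.Observer
}

type actionCleanup struct{ isInternal bool }

func (actionCleanup) String() string { return "cleanup" }

// regionsHistogram picks the scope-specific observer with a simple branch.
func (a actionCleanup) regionsHistogram() prometheus.Observer {
    if a.isInternal {
        return regionsCleanupInternal
    }
    return regionsCleanupGeneral
}

func main() {
    var a commitAction = actionCleanup{isInternal: true}
    a.regionsHistogram().Observe(3) // three regions touched by an internal cleanup
    fmt.Println(a.String(), "recorded")
}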


@@ -52,19 +52,25 @@ import (
     "go.uber.org/zap"
 )
 
-type actionCommit struct{ retry bool }
+type actionCommit struct {
+    retry      bool
+    isInternal bool
+}
 
 var _ twoPhaseCommitAction = actionCommit{}
 
-func (actionCommit) String() string {
+func (action actionCommit) String() string {
     return "commit"
 }
 
-func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+    if action.isInternal {
+        return metrics.TxnRegionsNumHistogramCommitInternal
+    }
     return metrics.TxnRegionsNumHistogramCommit
 }
 
-func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
     keys := batch.mutations.GetKeys()
     req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
         StartVersion: c.startTS,
@@ -132,7 +138,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
         if same {
             continue
         }
-        return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
+        return c.doActionOnMutations(bo, actionCommit{true, action.isInternal}, batch.mutations)
     }
 
     if resp.Resp == nil {
@@ -220,5 +226,5 @@ func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations Commi
         bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
     }
 
-    return c.doActionOnMutations(bo, actionCommit{}, mutations)
+    return c.doActionOnMutations(bo, actionCommit{isInternal: c.txn.isInternal()}, mutations)
 }


@@ -61,27 +61,36 @@ import (
 type actionPessimisticLock struct {
     *kv.LockCtx
     wakeUpMode kvrpcpb.PessimisticLockWakeUpMode
+    isInternal bool
+}
+
+type actionPessimisticRollback struct {
+    isInternal bool
 }
-type actionPessimisticRollback struct{}
 
 var (
     _ twoPhaseCommitAction = actionPessimisticLock{}
     _ twoPhaseCommitAction = actionPessimisticRollback{}
 )
 
-func (actionPessimisticLock) String() string {
+func (action actionPessimisticLock) String() string {
     return "pessimistic_lock"
 }
 
-func (actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+func (action actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+    if action.isInternal {
+        return metrics.TxnRegionsNumHistogramPessimisticLockInternal
+    }
     return metrics.TxnRegionsNumHistogramPessimisticLock
 }
 
-func (actionPessimisticRollback) String() string {
+func (action actionPessimisticRollback) String() string {
     return "pessimistic_rollback"
 }
 
-func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+func (action actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+    if action.isInternal {
+        return metrics.TxnRegionsNumHistogramPessimisticRollbackInternal
+    }
     return metrics.TxnRegionsNumHistogramPessimisticRollback
 }
@@ -540,9 +549,15 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
             }
         }
     }
-    return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode}, mutations)
+    return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations)
 }
 
 func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
-    return c.doActionOnMutations(bo, actionPessimisticRollback{}, mutations)
+    isInternal := false
+    if c.txn != nil {
+        isInternal = c.txn.isInternal()
+    } else {
+        isInternal = c.isInternal
+    }
+    return c.doActionOnMutations(bo, actionPessimisticRollback{isInternal: isInternal}, mutations)
 }
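
The scope is resolved from the transaction when one is attached, and falls back to the committer's own flag on the async-rollback path, where txn is nil. A small sketch of that precedence with illustrative types:

package main

import "fmt"

type kvTxn struct{ internal bool }

func (t *kvTxn) isInternal() bool { return t.internal }

type committer struct {
    txn        *kvTxn
    isInternal bool // set explicitly when txn is nil (async rollback path)
}

// scopeInternal prefers the live transaction's answer; without one it trusts
// the flag copied onto the committer when it was built.
func (c *committer) scopeInternal() bool {
    if c.txn != nil {
        return c.txn.isInternal()
    }
    return c.isInternal
}

func main() {
    withTxn := &committer{txn: &kvTxn{internal: true}}
    asyncRollback := &committer{txn: nil, isInternal: true}
    fmt.Println(withTxn.scopeInternal(), asyncRollback.scopeInternal()) // true true
}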


@@ -59,15 +59,21 @@ import (
     "go.uber.org/zap"
 )
 
-type actionPrewrite struct{ retry bool }
+type actionPrewrite struct {
+    retry      bool
+    isInternal bool
+}
 
 var _ twoPhaseCommitAction = actionPrewrite{}
 
-func (actionPrewrite) String() string {
+func (action actionPrewrite) String() string {
     return "prewrite"
 }
 
-func (actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+func (action actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+    if action.isInternal {
+        return metrics.TxnRegionsNumHistogramPrewriteInternal
+    }
     return metrics.TxnRegionsNumHistogramPrewrite
 }
 
@@ -302,7 +308,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
         if same {
             continue
         }
-        err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations)
+        err = c.doActionOnMutations(bo, actionPrewrite{true, action.isInternal}, batch.mutations)
         return err
     }
 
@@ -436,5 +442,5 @@ func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations Com
     }
     // `doActionOnMutations` will unset `useOnePC` if the mutations are split into multiple batches.
-    return c.doActionOnMutations(bo, actionPrewrite{}, mutations)
+    return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
 }


@@ -1280,6 +1280,7 @@ func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte, s
         startTS:     txn.committer.startTS,
         forUpdateTS: txn.committer.forUpdateTS,
         primaryKey:  txn.committer.primaryKey,
+        isInternal:  txn.isInternal(),
     }
     if specifiedForUpdateTS > committer.forUpdateTS {
         committer.forUpdateTS = specifiedForUpdateTS


@@ -335,7 +335,11 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
         return err
     }
 
-    metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups)))
+    if s.IsInternal() {
+        metrics.TxnRegionsNumHistogramWithSnapshotInternal.Observe(float64(len(groups)))
+    } else {
+        metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups)))
+    }
 
     var batches []batchKeys
     for id, g := range groups {