Merge branch 'master' into support_engine_role

guo-shaoge 2023-03-07 15:47:49 +08:00 committed by GitHub
commit bd24a9e443
12 changed files with 177 additions and 65 deletions

View File

@@ -487,9 +487,10 @@ var (
)
type sendReqHistCacheKey struct {
- tp        tikvrpc.CmdType
- id        uint64
- staleRead bool
+ tp         tikvrpc.CmdType
+ id         uint64
+ staleRead  bool
+ isInternal bool
}
type sendReqCounterCacheKey struct {
@@ -497,6 +498,11 @@ type sendReqCounterCacheKey struct {
requestSource string
}
+ type rpcNetLatencyCacheKey struct {
+ storeID    uint64
+ isInternal bool
+ }
type sendReqCounterCacheValue struct {
counter prometheus.Counter
timeCounter prometheus.Counter
@@ -506,11 +512,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
+ isInternal := util.IsInternalRequest(req.GetRequestSource())
histKey := sendReqHistCacheKey{
req.Type,
storeID,
staleRead,
+ isInternal,
}
counterKey := sendReqCounterCacheKey{
histKey,
@@ -525,7 +533,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
- hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
+ hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
+ strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
@@ -534,8 +543,10 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
- metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
- metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
+ metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
+ counterKey.requestSource, strconv.FormatBool(isInternal)),
+ metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
+ strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
}
sendReqCounterCache.Store(counterKey, counter)
}
@@ -546,13 +557,17 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
- latHist, ok := rpcNetLatencyHistCache.Load(storeID)
+ cacheKey := rpcNetLatencyCacheKey{
+ storeID,
+ isInternal,
+ }
+ latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
- latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
- rpcNetLatencyHistCache.Store(storeID, latHist)
+ latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
+ rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
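The histogram and counter caches in this file are keyed by comparable structs, which is why the scope flag has to become part of each cache key rather than only an extra argument to WithLabelValues: a key that ignored isInternal would hand internal and general requests the same cached observer and mislabel one of them. Below is a minimal, self-contained sketch of that caching pattern; demoHist, demoHistKey, and observeDemo are illustrative names, not identifiers from client-go.

package main

import (
	"strconv"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// demoHist is an illustrative vector with the same shape of label set.
var demoHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name: "demo_request_seconds",
	Help: "Bucketed histogram of demo request duration.",
}, []string{"type", "scope"})

// The key is a comparable struct, so every label dimension -- including the
// scope flag -- must be part of it; otherwise internal and general requests
// would share one cached observer carrying the wrong label.
type demoHistKey struct {
	tp         string
	isInternal bool
}

var demoHistCache sync.Map // demoHistKey -> prometheus.Observer

func observeDemo(tp string, isInternal bool, seconds float64) {
	key := demoHistKey{tp, isInternal}
	obs, ok := demoHistCache.Load(key)
	if !ok {
		// WithLabelValues is resolved once per distinct key and cached,
		// keeping label formatting off the per-request path.
		obs, _ = demoHistCache.LoadOrStore(key, demoHist.WithLabelValues(tp, strconv.FormatBool(isInternal)))
	}
	obs.(prometheus.Observer).Observe(seconds)
}

func main() {
	prometheus.MustRegister(demoHist)
	observeDemo("Get", true, 0.002)  // internal traffic
	observeDemo("Get", false, 0.004) // user traffic
}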

View File

@@ -950,6 +950,20 @@ func (c *RegionCache) LocateKey(bo *retry.Backoffer, key []byte) (*KeyLocation,
}, nil
}
+ // TryLocateKey searches for the region and range that the key is located in, but returns nil if the region is missing or invalid.
+ func (c *RegionCache) TryLocateKey(key []byte) *KeyLocation {
+ r := c.tryFindRegionByKey(key, false)
+ if r == nil {
+ return nil
+ }
+ return &KeyLocation{
+ Region: r.VerID(),
+ StartKey: r.StartKey(),
+ EndKey: r.EndKey(),
+ Buckets: r.getStore().buckets,
+ }
+ }
// LocateEndKey searches for the region and range that the key is located.
// Unlike LocateKey, start key of a region is exclusive and end key is inclusive.
func (c *RegionCache) LocateEndKey(bo *retry.Backoffer, key []byte) (*KeyLocation, error) {
@@ -997,6 +1011,14 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
return r, nil
}
+ func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) {
+ r = c.searchCachedRegion(key, isEndKey)
+ if r == nil || r.checkNeedReloadAndMarkUpdated() {
+ return nil
+ }
+ return r
+ }
// OnSendFailForTiFlash handles send request fail logic for tiflash.
func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error, skipSwitchPeerLog bool) {
r := c.GetCachedRegionWithRLock(region)
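TryLocateKey deliberately takes no Backoffer: it only consults the local cache through tryFindRegionByKey and reports a miss (or a region marked for reload) as nil instead of going to PD. A hypothetical caller, sketched below and not part of this commit, would treat nil as "fall back to the loading path":

// locateOrFallback is a hypothetical helper for illustration only.
func locateOrFallback(cache *RegionCache, bo *retry.Backoffer, key []byte) (*KeyLocation, error) {
	if loc := cache.TryLocateKey(key); loc != nil {
		return loc, nil // served purely from the local cache, no PD round trip
	}
	// Cache miss or region pending reload: use the loading path, which may
	// query PD and therefore needs the Backoffer.
	return cache.LocateKey(bo, key)
}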

View File

@@ -1560,7 +1560,11 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}
// NOTE: Please add the region error handler in the same order of errorpb.Error.
- metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
+ isInternal := false
+ if req != nil {
+ isInternal = util.IsInternalRequest(req.GetRequestSource())
+ }
+ metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc()
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.

View File

@@ -50,8 +50,8 @@ var (
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
- TiKVTxnWriteKVCountHistogram prometheus.Histogram
- TiKVTxnWriteSizeHistogram prometheus.Histogram
+ TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
+ TiKVTxnWriteSizeHistogram *prometheus.HistogramVec
TiKVRawkvCmdHistogram *prometheus.HistogramVec
TiKVRawkvSizeHistogram *prometheus.HistogramVec
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
@@ -150,7 +150,7 @@ func initMetrics(namespace, subsystem string) {
Name: "request_seconds",
Help: "Bucketed histogram of sending request duration.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
- }, []string{LblType, LblStore, LblStaleRead})
+ }, []string{LblType, LblStore, LblStaleRead, LblScope})
TiKVSendReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -158,7 +158,7 @@ func initMetrics(namespace, subsystem string) {
Subsystem: subsystem,
Name: "request_counter",
Help: "Counter of sending request with multi dimensions.",
- }, []string{LblType, LblStore, LblStaleRead, LblSource})
+ }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
TiKVSendReqTimeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -166,7 +166,7 @@ func initMetrics(namespace, subsystem string) {
Subsystem: subsystem,
Name: "request_time_counter",
Help: "Counter of request time with multi dimensions.",
- }, []string{LblType, LblStore, LblStaleRead, LblSource})
+ }, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})
TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -175,7 +175,7 @@ func initMetrics(namespace, subsystem string) {
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
- }, []string{LblStore})
+ }, []string{LblStore, LblScope})
TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -184,7 +184,7 @@ func initMetrics(namespace, subsystem string) {
Name: "cop_duration_seconds",
Help: "Run duration of a single coprocessor task, includes backoff time.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
- }, []string{LblStore, LblStaleRead})
+ }, []string{LblStore, LblStaleRead, LblScope})
TiKVLockResolverCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -200,25 +200,25 @@ func initMetrics(namespace, subsystem string) {
Subsystem: subsystem,
Name: "region_err_total",
Help: "Counter of region errors.",
- }, []string{LblType})
+ }, []string{LblType, LblScope})
- TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram(
+ TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_kv_num",
Help: "Count of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G
- })
+ }, []string{LblScope})
- TiKVTxnWriteSizeHistogram = prometheus.NewHistogram(
+ TiKVTxnWriteSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_size_bytes",
Help: "Size of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(16, 4, 17), // 16Bytes ~ 64GB
- })
+ }, []string{LblScope})
TiKVRawkvCmdHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -245,7 +245,7 @@ func initMetrics(namespace, subsystem string) {
Name: "txn_regions_num",
Help: "Number of regions in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 25), // 1 ~ 16M
- }, []string{LblType})
+ }, []string{LblType, LblScope})
TiKVLoadSafepointCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
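Every vector above that gained LblScope now requires one more value in each WithLabelValues call, and each former time series fans out into one series per scope value. A small illustrative sketch of that effect follows; the metric name and the literal label values are assumptions for the example, not the exact client-go names.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Previously the label set was just {"type"}; adding "scope" splits each
	// error type into an internal and a general series.
	regionErr := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_region_err_total",
		Help: "Counter of region errors.",
	}, []string{"type", "scope"})
	prometheus.MustRegister(regionErr)

	regionErr.WithLabelValues("not_leader", "internal").Inc()
	regionErr.WithLabelValues("not_leader", "general").Add(2)

	// GetMetricWithLabelValues reports the mismatch instead of panicking,
	// which is what a caller still passing only the old label would hit.
	if _, err := regionErr.GetMetricWithLabelValues("not_leader"); err != nil {
		fmt.Println("missing scope label:", err)
	}
}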

View File

@@ -74,14 +74,27 @@ var (
BackoffHistogramIsWitness prometheus.Observer
BackoffHistogramEmpty prometheus.Observer
- TxnRegionsNumHistogramWithSnapshot prometheus.Observer
- TxnRegionsNumHistogramPrewrite prometheus.Observer
- TxnRegionsNumHistogramCommit prometheus.Observer
- TxnRegionsNumHistogramCleanup prometheus.Observer
- TxnRegionsNumHistogramPessimisticLock prometheus.Observer
- TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
- TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
- TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer
+ TxnRegionsNumHistogramWithSnapshotInternal prometheus.Observer
+ TxnRegionsNumHistogramWithSnapshot prometheus.Observer
+ TxnRegionsNumHistogramPrewriteInternal prometheus.Observer
+ TxnRegionsNumHistogramPrewrite prometheus.Observer
+ TxnRegionsNumHistogramCommitInternal prometheus.Observer
+ TxnRegionsNumHistogramCommit prometheus.Observer
+ TxnRegionsNumHistogramCleanupInternal prometheus.Observer
+ TxnRegionsNumHistogramCleanup prometheus.Observer
+ TxnRegionsNumHistogramPessimisticLockInternal prometheus.Observer
+ TxnRegionsNumHistogramPessimisticLock prometheus.Observer
+ TxnRegionsNumHistogramPessimisticRollbackInternal prometheus.Observer
+ TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
+ TxnRegionsNumHistogramWithCoprocessorInternal prometheus.Observer
+ TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
+ TxnRegionsNumHistogramWithBatchCoprocessorInternal prometheus.Observer
+ TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer
+ TxnWriteKVCountHistogramInternal prometheus.Observer
+ TxnWriteKVCountHistogramGeneral prometheus.Observer
+ TxnWriteSizeHistogramInternal prometheus.Observer
+ TxnWriteSizeHistogramGeneral prometheus.Observer
LockResolverCountWithBatchResolve prometheus.Counter
LockResolverCountWithExpired prometheus.Counter
@@ -185,14 +198,26 @@ func initShortcuts() {
BackoffHistogramIsWitness = TiKVBackoffHistogram.WithLabelValues("isWitness")
BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("")
- TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot")
- TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite")
- TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit")
- TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup")
- TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock")
- TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback")
- TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
- TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
+ TxnRegionsNumHistogramWithSnapshotInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblInternal)
+ TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblGeneral)
+ TxnRegionsNumHistogramPrewriteInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblInternal)
+ TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblGeneral)
+ TxnRegionsNumHistogramCommitInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblInternal)
+ TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblGeneral)
+ TxnRegionsNumHistogramCleanupInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblInternal)
+ TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblGeneral)
+ TxnRegionsNumHistogramPessimisticLockInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblInternal)
+ TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblGeneral)
+ TxnRegionsNumHistogramPessimisticRollbackInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblInternal)
+ TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblGeneral)
+ TxnRegionsNumHistogramWithCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal)
+ TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblGeneral)
+ TxnRegionsNumHistogramWithBatchCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblInternal)
+ TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral)
+ TxnWriteKVCountHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal)
+ TxnWriteKVCountHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral)
+ TxnWriteSizeHistogramInternal = TiKVTxnWriteSizeHistogram.WithLabelValues(LblInternal)
+ TxnWriteSizeHistogramGeneral = TiKVTxnWriteSizeHistogram.WithLabelValues(LblGeneral)
LockResolverCountWithBatchResolve = TiKVLockResolverCounter.WithLabelValues("batch_resolve")
LockResolverCountWithExpired = TiKVLockResolverCounter.WithLabelValues("expired")
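These shortcuts pre-resolve WithLabelValues into package-level Observers so that hot paths (the 2PC actions changed later in this commit) only choose between two already-bound observers instead of formatting label values on every observation. A sketch of that pattern with hypothetical names; the literal "internal"/"general" stand in for LblInternal and LblGeneral:

// Hypothetical names; the real shortcuts are the package-level variables above.
var (
	commitRegionsInternal prometheus.Observer
	commitRegionsGeneral  prometheus.Observer
)

func initDemoShortcuts(vec *prometheus.HistogramVec) {
	// Resolve the labeled children once at start-up.
	commitRegionsInternal = vec.WithLabelValues("2pc_commit", "internal")
	commitRegionsGeneral = vec.WithLabelValues("2pc_commit", "general")
}

func observeCommitRegions(isInternal bool, regions int) {
	// The per-scope choice on the hot path is a plain branch, with no label
	// formatting or vector lookup per call.
	if isInternal {
		commitRegionsInternal.Observe(float64(regions))
		return
	}
	commitRegionsGeneral.Observe(float64(regions))
}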

View File

@@ -187,6 +187,10 @@ type twoPhaseCommitter struct {
// assertion error happened when initializing mutations, could be false positive if pessimistic lock is lost
stashedAssertionError error
+ // isInternal means it's related to an internal transaction. It's only used by `asyncPessimisticRollback` as the
+ // committer may contain a nil `txn` pointer.
+ isInternal bool
}
type memBufferMutations struct {
@@ -696,8 +700,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
WriteKeys: c.mutations.Len(),
ResolveLock: util.ResolveLockDetail{},
}
- metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
- metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
+ isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource())
+ if isInternalReq {
+ metrics.TxnWriteKVCountHistogramInternal.Observe(float64(commitDetail.WriteKeys))
+ metrics.TxnWriteSizeHistogramInternal.Observe(float64(commitDetail.WriteSize))
+ } else {
+ metrics.TxnWriteKVCountHistogramGeneral.Observe(float64(commitDetail.WriteKeys))
+ metrics.TxnWriteSizeHistogramGeneral.Observe(float64(commitDetail.WriteSize))
+ }
c.hasNoNeedCommitKeys = checkCnt > 0
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()

View File

@@ -46,19 +46,22 @@ import (
"go.uber.org/zap"
)
- type actionCleanup struct{}
+ type actionCleanup struct{ isInternal bool }
var _ twoPhaseCommitAction = actionCleanup{}
- func (actionCleanup) String() string {
+ func (action actionCleanup) String() string {
return "cleanup"
}
- func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ func (action actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ if action.isInternal {
+ return metrics.TxnRegionsNumHistogramCleanupInternal
+ }
return metrics.TxnRegionsNumHistogramCleanup
}
- func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+ func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
@@ -99,5 +102,5 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
}
func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
- return c.doActionOnMutations(bo, actionCleanup{}, mutations)
+ return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
}

View File

@@ -52,19 +52,25 @@ import (
"go.uber.org/zap"
)
- type actionCommit struct{ retry bool }
+ type actionCommit struct {
+ retry      bool
+ isInternal bool
+ }
var _ twoPhaseCommitAction = actionCommit{}
- func (actionCommit) String() string {
+ func (action actionCommit) String() string {
return "commit"
}
- func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ if action.isInternal {
+ return metrics.TxnRegionsNumHistogramCommitInternal
+ }
return metrics.TxnRegionsNumHistogramCommit
}
- func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
+ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
StartVersion: c.startTS,
@@ -132,7 +138,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
if same {
continue
}
- return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
+ return c.doActionOnMutations(bo, actionCommit{true, action.isInternal}, batch.mutations)
}
if resp.Resp == nil {
@@ -220,5 +226,5 @@ func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations Commi
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
- return c.doActionOnMutations(bo, actionCommit{}, mutations)
+ return c.doActionOnMutations(bo, actionCommit{isInternal: c.txn.isInternal()}, mutations)
}

View File

@@ -61,27 +61,36 @@
type actionPessimisticLock struct {
*kv.LockCtx
wakeUpMode kvrpcpb.PessimisticLockWakeUpMode
+ isInternal bool
}
- type actionPessimisticRollback struct{}
+ type actionPessimisticRollback struct {
+ isInternal bool
+ }
var (
_ twoPhaseCommitAction = actionPessimisticLock{}
_ twoPhaseCommitAction = actionPessimisticRollback{}
)
- func (actionPessimisticLock) String() string {
+ func (action actionPessimisticLock) String() string {
return "pessimistic_lock"
}
- func (actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ func (action actionPessimisticLock) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ if action.isInternal {
+ return metrics.TxnRegionsNumHistogramPessimisticLockInternal
+ }
return metrics.TxnRegionsNumHistogramPessimisticLock
}
- func (actionPessimisticRollback) String() string {
+ func (action actionPessimisticRollback) String() string {
return "pessimistic_rollback"
}
- func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ func (action actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ if action.isInternal {
+ return metrics.TxnRegionsNumHistogramPessimisticRollbackInternal
+ }
return metrics.TxnRegionsNumHistogramPessimisticRollback
}
@@ -540,9 +549,15 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
}
}
}
- return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode}, mutations)
+ return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations)
}
func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
- return c.doActionOnMutations(bo, actionPessimisticRollback{}, mutations)
+ isInternal := false
+ if c.txn != nil {
+ isInternal = c.txn.isInternal()
+ } else {
+ isInternal = c.isInternal
+ }
+ return c.doActionOnMutations(bo, actionPessimisticRollback{isInternal: isInternal}, mutations)
}

View File

@@ -59,15 +59,21 @@ import (
"go.uber.org/zap"
)
- type actionPrewrite struct{ retry bool }
+ type actionPrewrite struct {
+ retry      bool
+ isInternal bool
+ }
var _ twoPhaseCommitAction = actionPrewrite{}
- func (actionPrewrite) String() string {
+ func (action actionPrewrite) String() string {
return "prewrite"
}
- func (actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ func (action actionPrewrite) tiKVTxnRegionsNumHistogram() prometheus.Observer {
+ if action.isInternal {
+ return metrics.TxnRegionsNumHistogramPrewriteInternal
+ }
return metrics.TxnRegionsNumHistogramPrewrite
}
@@ -302,7 +308,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
if same {
continue
}
- err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations)
+ err = c.doActionOnMutations(bo, actionPrewrite{true, action.isInternal}, batch.mutations)
return err
}
@@ -436,5 +442,5 @@ func (c *twoPhaseCommitter) prewriteMutations(bo *retry.Backoffer, mutations Com
}
// `doActionOnMutations` will unset `useOnePC` if the mutations are split into multiple batches.
- return c.doActionOnMutations(bo, actionPrewrite{}, mutations)
+ return c.doActionOnMutations(bo, actionPrewrite{isInternal: c.txn.isInternal()}, mutations)
}

View File

@@ -1280,6 +1280,7 @@ func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte, s
startTS: txn.committer.startTS,
forUpdateTS: txn.committer.forUpdateTS,
primaryKey: txn.committer.primaryKey,
+ isInternal: txn.isInternal(),
}
if specifiedForUpdateTS > committer.forUpdateTS {
committer.forUpdateTS = specifiedForUpdateTS

View File

@@ -335,7 +335,11 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
return err
}
- metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups)))
+ if s.IsInternal() {
+ metrics.TxnRegionsNumHistogramWithSnapshotInternal.Observe(float64(len(groups)))
+ } else {
+ metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups)))
+ }
var batches []batchKeys
for id, g := range groups {