diff --git a/kv/kv.go b/kv/kv.go index 12bf56c8..ad3878c7 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/pingcap/kvproto/pkg/kvrpcpb" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/util" ) @@ -66,7 +67,12 @@ type LockCtx struct { LockExpired *uint32 Stats *util.LockKeysDetails ResourceGroupTag []byte - OnDeadlock func(*tikverr.ErrDeadlock) + // ResourceGroupTagger is a special tagger used only for PessimisticLockRequest. + // We did not use tikvrpc.ResourceGroupTagger here because the kv package is a + // more basic component, and we cannot rely on tikvrpc.Request here, so we treat + // LockCtx specially. + ResourceGroupTagger func(*kvrpcpb.PessimisticLockRequest) []byte + OnDeadlock func(*tikverr.ErrDeadlock) } // LockWaitTime returns lockWaitTimeInMs diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 9064aaf3..683a1a45 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -119,6 +119,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * MinCommitTs: c.forUpdateTS + 1, }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) + if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil { + req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest)) + } lockWaitStartTime := action.WaitStartTime for { // if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit