mirror of https://github.com/tikv/client-go.git
txn: use region pessimistic lock rollback to speed up cleanup (#1125)
* Use region pessimistic lock rollback and cleanup for write-write conflict processing
* Format
* Debug CI failure patch: disable region pessimistic rollback

Signed-off-by: cfzjywxk <cfzjywxk@gmail.com>
Parent: 5945fe3920
Commit: d2887d56ab
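The new behavior is opt-in through the PessimisticRegionResolve field added below. As a rough usage sketch (not part of this diff; the resolver, backoffer, caller timestamp and locks are assumed to be supplied by the caller, and the import paths mirror the ones used by the integration test in this commit):

package example

import (
	"github.com/tikv/client-go/v2/config/retry"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// resolveWithRegionRollback resolves expired pessimistic locks while letting each
// PessimisticRollback request also clean up the other pessimistic locks the same
// transaction holds in that region, as introduced by this change.
func resolveWithRegionRollback(lr *txnlock.LockResolver, bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) error {
	_, err := lr.ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
		CallerStartTS:            callerStartTS,
		Locks:                    locks,
		PessimisticRegionResolve: true, // the option added by this commit
	})
	return err
}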
@@ -56,6 +56,7 @@ import (
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/suite"
	"github.com/tikv/client-go/v2/config"
+	"github.com/tikv/client-go/v2/config/retry"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/oracle"
@@ -1028,6 +1029,7 @@ type testLockWithTiKVSuite struct {
func (s *testLockWithTiKVSuite) SetupTest() {
	if *withTiKV {
		s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
+		s.cleanupLocks()
	} else {
		s.store = tikv.StoreProbe{KVStore: NewTestUniStore(s.T())}
	}
@@ -1037,6 +1039,19 @@ func (s *testLockWithTiKVSuite) TearDownTest() {
	s.store.Close()
}

+func (s *testLockWithTiKVSuite) cleanupLocks() {
+	// Cleanup possible left locks.
+	bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
+	ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
+	currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	s.NoError(err)
+	remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
+	s.NoError(err)
+	if len(remainingLocks) > 0 {
+		s.mustResolve(ctx, bo, remainingLocks, currentTS, []byte("k"), []byte("l"))
+	}
+}
+
// TODO: Migrate FairLocking related tests here.

func withRetry[T any](f func() (T, error), limit int, delay time.Duration) (T, error) {
@@ -1559,3 +1574,134 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
	s.NoError(err)
	s.Equal(v3, v)
}
+
+func (s *testLockWithTiKVSuite) makeLock(startTS uint64, forUpdateTS uint64, key []byte, primary []byte) *txnlock.Lock {
+	return &txnlock.Lock{
+		Key:             key,
+		Primary:         primary,
+		TxnID:           startTS,
+		TTL:             10,
+		TxnSize:         1024,
+		LockType:        kvrpcpb.Op_PessimisticLock,
+		UseAsyncCommit:  false,
+		LockForUpdateTS: forUpdateTS,
+		MinCommitTS:     forUpdateTS,
+	}
+}
+
+func (s *testLockWithTiKVSuite) mustLockNum(ctx context.Context, expectedNum int, scanTS uint64, startKey []byte, endKey []byte) {
+	remainingLocks, err := s.store.ScanLocks(ctx, startKey, endKey, scanTS)
+	s.NoError(err)
+	s.Len(remainingLocks, expectedNum)
+}
+
+func (s *testLockWithTiKVSuite) mustResolve(ctx context.Context, bo *retry.Backoffer, remainingLocks []*txnlock.Lock, callerTS uint64, startKey []byte, endKey []byte) {
+	if len(remainingLocks) > 0 {
+		_, err := s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
+			CallerStartTS:            callerTS,
+			Locks:                    remainingLocks,
+			Lite:                     false,
+			ForRead:                  false,
+			Detail:                   nil,
+			PessimisticRegionResolve: true,
+		})
+		s.NoError(err)
+
+		lockAfterResolve, err := s.store.ScanLocks(ctx, startKey, endKey, callerTS)
+		s.NoError(err)
+		s.Len(lockAfterResolve, 0)
+	}
+}
+
+func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() {
+	// The test relies on the pessimistic rollback read phase implementations in tikv
+	// https://github.com/tikv/tikv/pull/16185, which is not implemented in mockstore by now.
+	if !*withTiKV {
+		return
+	}
+
+	s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
+	s.NoError(failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
+		s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL"))
+	}()
+	test := func(inMemoryLock bool) {
+		recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", inMemoryLock)
+		defer recoverFunc()
+
+		// Init, cleanup possible left locks.
+		bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
+		ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
+		s.cleanupLocks()
+
+		// Basic case, three keys could be rolled back within one pessimistic rollback request.
+		k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
+		txn1, err := s.store.Begin()
+		s.NoError(err)
+		startTS := txn1.StartTS()
+		txn1.SetPessimistic(true)
+		lockCtx := kv.NewLockCtx(startTS, 200, time.Now())
+		err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3)
+		s.NoError(err)
+		txn1.GetCommitter().CloseTTLManager()
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l"))
+		locks := []*txnlock.Lock{
+			s.makeLock(startTS, startTS, k3, k1),
+		}
+		s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l"))
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l"))
+
+		// Acquire pessimistic locks for more than 256(RESOLVE_LOCK_BATCH_SIZE) keys.
+		formatKey := func(prefix rune, i int) []byte {
+			return []byte(fmt.Sprintf("%c%04d", prefix, i))
+		}
+		numKeys := 1000
+		prewriteKeys := make([][]byte, 0, numKeys/2)
+		pessimisticLockKeys := make([][]byte, 0, numKeys/2)
+		for i := 0; i < numKeys; i++ {
+			key := formatKey('k', i)
+			if i%2 == 0 {
+				err = txn1.LockKeys(ctx, lockCtx, key)
+				pessimisticLockKeys = append(pessimisticLockKeys, key)
+			} else {
+				err = txn1.Set(key, []byte("val"))
+				s.NoError(err)
+				prewriteKeys = append(prewriteKeys, key)
+			}
+			s.NoError(err)
+		}
+		committer, err := txn1.NewCommitter(1)
+		s.NoError(err)
+		mutations := committer.MutationsOfKeys(prewriteKeys)
+		err = committer.PrewriteMutations(ctx, mutations)
+		s.NoError(err)
+
+		// All the pessimistic locks belonging to the same transaction are pessimistic
+		// rolled back within one request.
+		time.Sleep(time.Millisecond * 100)
+		pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0])
+		_, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
+			CallerStartTS:            startTS + 1,
+			Locks:                    []*txnlock.Lock{pessimisticLock},
+			Lite:                     false,
+			ForRead:                  false,
+			Detail:                   nil,
+			PessimisticRegionResolve: true,
+		})
+		s.NoError(err)
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l"))
+
+		// Cleanup.
+		err = txn1.Rollback()
+		s.NoError(err)
+	}
+	test(false)
+	test(true)
+}
@@ -356,8 +356,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
	}
	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: 0,
-		Locks:         locks,
+		CallerStartTS:            0,
+		Locks:                    locks,
+		PessimisticRegionResolve: false,
	}
	if action.LockCtx.Stats != nil {
		resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
@@ -484,8 +485,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
	}
	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: 0,
-		Locks:         locks,
+		CallerStartTS:            0,
+		Locks:                    locks,
+		PessimisticRegionResolve: false,
	}
	if action.LockCtx.Stats != nil {
		resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
@@ -471,9 +471,10 @@ func (action actionPrewrite) handleSingleBatch(
		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
	}
	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: c.startTS,
-		Locks:         locks,
-		Detail:        &c.getDetail().ResolveLock,
+		CallerStartTS:            c.startTS,
+		Locks:                    locks,
+		Detail:                   &c.getDetail().ResolveLock,
+		PessimisticRegionResolve: false,
	}
	resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
	if err != nil {
@@ -160,6 +160,11 @@ func (s TxnStatus) StatusCacheable() bool {
	return false
}

+func (s TxnStatus) String() string {
+	// TODO: print primary lock after redact is introduced.
+	return fmt.Sprintf("ttl:%v commit_ts:%v action: %v", s.ttl, s.commitTS, s.action)
+}
+
// Lock represents a lock from tikv server.
type Lock struct {
	Key []byte
@@ -261,7 +266,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
			//
			// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
			// See: https://github.com/pingcap/tidb/issues/45134
-			err := lr.resolvePessimisticLock(bo, l)
+			err := lr.resolvePessimisticLock(bo, l, false, nil)
			if err != nil {
				return false, err
			}
@@ -350,11 +355,12 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo

// ResolveLocksOptions is the options struct for calling resolving lock.
type ResolveLocksOptions struct {
-	CallerStartTS uint64
-	Locks         []*Lock
-	Lite          bool
-	ForRead       bool
-	Detail        *util.ResolveLockDetail
+	CallerStartTS            uint64
+	Locks                    []*Lock
+	Lite                     bool
+	ForRead                  bool
+	Detail                   *util.ResolveLockDetail
+	PessimisticRegionResolve bool
}

// ResolveLockResult is the result struct for resolving lock.
@@ -441,7 +447,7 @@ func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
}

func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) {
-	callerStartTS, locks, forRead, lite, detail := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail
+	callerStartTS, locks, forRead, lite, detail, pessimisticRegionResolve := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail, opts.PessimisticRegionResolve
	if lr.testingKnobs.meetLock != nil {
		lr.testingKnobs.meetLock(locks)
	}
@@ -464,6 +470,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
	// TxnID -> []Region, record resolved Regions.
	// TODO: Maybe put it in LockResolver and share by all txns.
	cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
+	pessimisticCleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
	var resolve func(*Lock, bool) (TxnStatus, error)
	resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail)
@@ -482,7 +489,8 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
			return status, nil
		}

-		// If the lock is committed or rollbacked, resolve lock.
+		// If the lock is committed or rolled back, resolve lock.
+		// If the lock is regarded as an expired pessimistic lock, pessimistic rollback it.
		metrics.LockResolverCountWithExpired.Inc()
		cleanRegions, exists := cleanTxns[l.TxnID]
		if !exists {
@@ -504,7 +512,16 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
		}
		if l.LockType == kvrpcpb.Op_PessimisticLock {
			// pessimistic locks don't block read so it needn't be async.
-			err = lr.resolvePessimisticLock(bo, l)
+			if pessimisticRegionResolve {
+				pessimisticCleanRegions, exists := pessimisticCleanTxns[l.TxnID]
+				if !exists {
+					pessimisticCleanRegions = make(map[locate.RegionVerID]struct{})
+					pessimisticCleanTxns[l.TxnID] = pessimisticCleanRegions
+				}
+				err = lr.resolvePessimisticLock(bo, l, true, pessimisticCleanRegions)
+			} else {
+				err = lr.resolvePessimisticLock(bo, l, false, nil)
+			}
		} else {
			if forRead {
				asyncCtx := context.WithValue(lr.asyncResolveCtx, util.RequestSourceKey, bo.GetCtx().Value(util.RequestSourceKey))
@@ -669,7 +686,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
		if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 {
			logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired",
				zap.Uint64("CallerStartTs", callerStartTS),
-				zap.Stringer("lock str", l))
+				zap.Stringer("lock str", l),
+				zap.Stringer("status", status))
			if l.LockType == kvrpcpb.Op_PessimisticLock {
				if _, err := util.EvalFailpoint("txnExpireRetTTL"); err == nil {
					return TxnStatus{action: kvrpcpb.Action_LockNotExistDoNothing},
@@ -1178,7 +1196,10 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
-func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
+// When "pessimisticRegionResolve" is set to false, only pessimistic rollback input lock. Otherwise, the corresponding
+// region will be scanned, and all relevant pessimistic locks that are read will be rolled back at the same time,
+// similar to the `resolveLock` function.
+func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, pessimisticRegionResolve bool, pessimisticCleanRegions map[locate.RegionVerID]struct{}) error {
	metrics.LockResolverCountWithResolveLocks.Inc()
	// The lock has been resolved by getTxnStatusFromLock.
	if bytes.Equal(l.Key, l.Primary) {
@@ -1189,6 +1210,11 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
	if err != nil {
		return err
	}
+	if pessimisticRegionResolve && pessimisticCleanRegions != nil {
+		if _, ok := pessimisticCleanRegions[loc.Region]; ok {
+			return nil
+		}
+	}
	forUpdateTS := l.LockForUpdateTS
	if forUpdateTS == 0 {
		forUpdateTS = math.MaxUint64
@@ -1196,7 +1222,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
	pessimisticRollbackReq := &kvrpcpb.PessimisticRollbackRequest{
		StartVersion: l.TxnID,
		ForUpdateTs:  forUpdateTS,
-		Keys:         [][]byte{l.Key},
	}
+	if !pessimisticRegionResolve {
+		pessimisticRollbackReq.Keys = [][]byte{l.Key}
+	}
	req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{
		ResourceControlContext: &kvrpcpb.ResourceControlContext{
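To make the request-level difference concrete: when PessimisticRegionResolve is enabled, the Keys field is left empty, which asks TiKV to read the region and roll back the matching pessimistic locks of the transaction (the server-side read phase comes from tikv/tikv#16185, referenced by the test above). A small sketch, where the helper name buildPessimisticRollbackReq is hypothetical and the field names come from the kvrpcpb protobuf used in this diff:

package example

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// buildPessimisticRollbackReq mirrors the logic in resolvePessimisticLock above:
// with regionResolve false only the single input key is rolled back; with it true,
// Keys stays empty so the region is scanned and cleaned on the TiKV side.
func buildPessimisticRollbackReq(txnStartTS, forUpdateTS uint64, key []byte, regionResolve bool) *kvrpcpb.PessimisticRollbackRequest {
	req := &kvrpcpb.PessimisticRollbackRequest{
		StartVersion: txnStartTS,
		ForUpdateTs:  forUpdateTS,
	}
	if !regionResolve {
		req.Keys = [][]byte{key}
	}
	return req
}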
@@ -1228,6 +1256,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
			logutil.Logger(bo.GetCtx()).Error("resolveLock error", zap.Error(err))
			return err
		}
+		if pessimisticRegionResolve && pessimisticCleanRegions != nil {
+			pessimisticCleanRegions[loc.Region] = struct{}{}
+		}
		return nil
	}
}
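For readers new to the resolver internals, the bookkeeping added above (pessimisticCleanTxns and pessimisticCleanRegions) amounts to a per-transaction set of regions that have already been pessimistically rolled back, so further locks of the same transaction in such a region can be skipped. A self-contained toy sketch, with hypothetical names that are not part of the library:

package main

import "fmt"

// regionID stands in for locate.RegionVerID in this illustration.
type regionID uint64

// cleanedRegions records, per transaction, the regions that were already
// pessimistically rolled back, so later locks of that transaction in the same
// region can be skipped instead of sending another PessimisticRollback request.
type cleanedRegions map[uint64]map[regionID]struct{}

func (c cleanedRegions) shouldSkip(txnID uint64, r regionID) bool {
	regions, ok := c[txnID]
	if !ok {
		c[txnID] = make(map[regionID]struct{})
		return false
	}
	_, done := regions[r]
	return done
}

func (c cleanedRegions) mark(txnID uint64, r regionID) {
	if c[txnID] == nil {
		c[txnID] = make(map[regionID]struct{})
	}
	c[txnID][r] = struct{}{}
}

func main() {
	c := make(cleanedRegions)
	fmt.Println(c.shouldSkip(1, 100)) // false: first lock of txn 1 in region 100
	c.mark(1, 100)                    // region 100 rolled back for txn 1
	fmt.Println(c.shouldSkip(1, 100)) // true: already rolled back, skip
}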
@@ -58,7 +58,7 @@ func (l LockResolverProbe) ResolveLock(bo *retry.Backoffer, lock *Lock) error {

// ResolvePessimisticLock resolves single pessimistic lock.
func (l LockResolverProbe) ResolvePessimisticLock(bo *retry.Backoffer, lock *Lock) error {
-	return l.resolvePessimisticLock(bo, lock)
+	return l.resolvePessimisticLock(bo, lock, false, nil)
}

// GetTxnStatus sends the CheckTxnStatus request to the TiKV server.