diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go
index 89bead1c..81e76cbd 100644
--- a/integration_tests/lock_test.go
+++ b/integration_tests/lock_test.go
@@ -56,6 +56,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config"
+	"github.com/tikv/client-go/v2/config/retry"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/oracle"
@@ -1028,6 +1029,7 @@ type testLockWithTiKVSuite struct {
 func (s *testLockWithTiKVSuite) SetupTest() {
 	if *withTiKV {
 		s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
+		s.cleanupLocks()
 	} else {
 		s.store = tikv.StoreProbe{KVStore: NewTestUniStore(s.T())}
 	}
@@ -1037,6 +1039,19 @@ func (s *testLockWithTiKVSuite) TearDownTest() {
 	s.store.Close()
 }
 
+func (s *testLockWithTiKVSuite) cleanupLocks() {
+	// Clean up possible leftover locks.
+	bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
+	ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
+	currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	s.NoError(err)
+	remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
+	s.NoError(err)
+	if len(remainingLocks) > 0 {
+		s.mustResolve(ctx, bo, remainingLocks, currentTS, []byte("k"), []byte("l"))
+	}
+}
+
 // TODO: Migrate FairLocking related tests here.
 
 func withRetry[T any](f func() (T, error), limit int, delay time.Duration) (T, error) {
@@ -1559,3 +1574,134 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
 	s.NoError(err)
 	s.Equal(v3, v)
 }
+
+func (s *testLockWithTiKVSuite) makeLock(startTS uint64, forUpdateTS uint64, key []byte, primary []byte) *txnlock.Lock {
+	return &txnlock.Lock{
+		Key:             key,
+		Primary:         primary,
+		TxnID:           startTS,
+		TTL:             10,
+		TxnSize:         1024,
+		LockType:        kvrpcpb.Op_PessimisticLock,
+		UseAsyncCommit:  false,
+		LockForUpdateTS: forUpdateTS,
+		MinCommitTS:     forUpdateTS,
+	}
+}
+
+func (s *testLockWithTiKVSuite) mustLockNum(ctx context.Context, expectedNum int, scanTS uint64, startKey []byte, endKey []byte) {
+	remainingLocks, err := s.store.ScanLocks(ctx, startKey, endKey, scanTS)
+	s.NoError(err)
+	s.Len(remainingLocks, expectedNum)
+}
+
+func (s *testLockWithTiKVSuite) mustResolve(ctx context.Context, bo *retry.Backoffer, remainingLocks []*txnlock.Lock, callerTS uint64, startKey []byte, endKey []byte) {
+	if len(remainingLocks) > 0 {
+		_, err := s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
+			CallerStartTS:            callerTS,
+			Locks:                    remainingLocks,
+			Lite:                     false,
+			ForRead:                  false,
+			Detail:                   nil,
+			PessimisticRegionResolve: true,
+		})
+		s.NoError(err)
+
+		lockAfterResolve, err := s.store.ScanLocks(ctx, startKey, endKey, callerTS)
+		s.NoError(err)
+		s.Len(lockAfterResolve, 0)
+	}
+}
+
+func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() {
+	// This test relies on the pessimistic rollback read phase implementation in TiKV
+	// (https://github.com/tikv/tikv/pull/16185), which is not yet implemented in mockstore.
+	if !*withTiKV {
+		return
+	}
+
+	s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
+	s.NoError(failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
+		s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL"))
+	}()
+	test := func(inMemoryLock bool) {
+		recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", inMemoryLock)
+		defer recoverFunc()
+
+		// Init: clean up possible leftover locks.
+		bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
+		ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
+		s.cleanupLocks()
+
+		// Basic case: three keys can be rolled back within one pessimistic rollback request.
+		k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
+		txn1, err := s.store.Begin()
+		s.NoError(err)
+		startTS := txn1.StartTS()
+		txn1.SetPessimistic(true)
+		lockCtx := kv.NewLockCtx(startTS, 200, time.Now())
+		err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3)
+		s.NoError(err)
+		txn1.GetCommitter().CloseTTLManager()
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l"))
+		locks := []*txnlock.Lock{
+			s.makeLock(startTS, startTS, k3, k1),
+		}
+		s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l"))
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l"))
+
+		// Acquire pessimistic locks on more than 256 (RESOLVE_LOCK_BATCH_SIZE) keys.
+		formatKey := func(prefix rune, i int) []byte {
+			return []byte(fmt.Sprintf("%c%04d", prefix, i))
+		}
+		numKeys := 1000
+		prewriteKeys := make([][]byte, 0, numKeys/2)
+		pessimisticLockKeys := make([][]byte, 0, numKeys/2)
+		for i := 0; i < numKeys; i++ {
+			key := formatKey('k', i)
+			if i%2 == 0 {
+				err = txn1.LockKeys(ctx, lockCtx, key)
+				pessimisticLockKeys = append(pessimisticLockKeys, key)
+			} else {
+				err = txn1.Set(key, []byte("val"))
+				s.NoError(err)
+				prewriteKeys = append(prewriteKeys, key)
+			}
+			s.NoError(err)
+		}
+		committer, err := txn1.NewCommitter(1)
+		s.NoError(err)
+		mutations := committer.MutationsOfKeys(prewriteKeys)
+		err = committer.PrewriteMutations(ctx, mutations)
+		s.NoError(err)
+
+		// All the pessimistic locks belonging to the same transaction are pessimistically
+		// rolled back within one request.
+		time.Sleep(time.Millisecond * 100)
+		pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0])
+		_, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
+			CallerStartTS:            startTS + 1,
+			Locks:                    []*txnlock.Lock{pessimisticLock},
+			Lite:                     false,
+			ForRead:                  false,
+			Detail:                   nil,
+			PessimisticRegionResolve: true,
+		})
+		s.NoError(err)
+
+		time.Sleep(time.Millisecond * 100)
+		s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l"))
+
+		// Cleanup.
+		err = txn1.Rollback()
+		s.NoError(err)
+	}
+	test(false)
+	test(true)
+}
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index d04c951c..56384e84 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -356,8 +356,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
 		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
 	}
 	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: 0,
-		Locks:         locks,
+		CallerStartTS:            0,
+		Locks:                    locks,
+		PessimisticRegionResolve: false,
 	}
 	if action.LockCtx.Stats != nil {
 		resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
@@ -484,8 +485,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
 		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
 	}
 	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: 0,
-		Locks:         locks,
+		CallerStartTS:            0,
+		Locks:                    locks,
+		PessimisticRegionResolve: false,
 	}
 	if action.LockCtx.Stats != nil {
 		resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index fac2f4f5..1a8d24aa 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -471,9 +471,10 @@ func (action actionPrewrite) handleSingleBatch(
 		c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
 	}
 	resolveLockOpts := txnlock.ResolveLocksOptions{
-		CallerStartTS: c.startTS,
-		Locks:         locks,
-		Detail:        &c.getDetail().ResolveLock,
+		CallerStartTS:            c.startTS,
+		Locks:                    locks,
+		Detail:                   &c.getDetail().ResolveLock,
+		PessimisticRegionResolve: false,
 	}
 	resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
 	if err != nil {
diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go
index bd668a1c..3d1508e9 100644
--- a/txnkv/txnlock/lock_resolver.go
+++ b/txnkv/txnlock/lock_resolver.go
@@ -160,6 +160,11 @@ func (s TxnStatus) StatusCacheable() bool {
 	return false
 }
 
+func (s TxnStatus) String() string {
+	// TODO: print primary lock after redact is introduced.
+	return fmt.Sprintf("ttl:%v commit_ts:%v action: %v", s.ttl, s.commitTS, s.action)
+}
+
 // Lock represents a lock from tikv server.
 type Lock struct {
 	Key             []byte
@@ -261,7 +266,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
 			//
 			// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
 			// See: https://github.com/pingcap/tidb/issues/45134
-			err := lr.resolvePessimisticLock(bo, l)
+			err := lr.resolvePessimisticLock(bo, l, false, nil)
 			if err != nil {
 				return false, err
 			}
@@ -350,11 +355,12 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
 
 // ResolveLocksOptions is the options struct for calling resolving lock.
 type ResolveLocksOptions struct {
-	CallerStartTS uint64
-	Locks         []*Lock
-	Lite          bool
-	ForRead       bool
-	Detail        *util.ResolveLockDetail
+	CallerStartTS            uint64
+	Locks                    []*Lock
+	Lite                     bool
+	ForRead                  bool
+	Detail                   *util.ResolveLockDetail
+	PessimisticRegionResolve bool
 }
 
 // ResolveLockResult is the result struct for resolving lock.
@@ -441,7 +447,7 @@ func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
 }
 
 func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) {
-	callerStartTS, locks, forRead, lite, detail := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail
+	callerStartTS, locks, forRead, lite, detail, pessimisticRegionResolve := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail, opts.PessimisticRegionResolve
 	if lr.testingKnobs.meetLock != nil {
 		lr.testingKnobs.meetLock(locks)
 	}
@@ -464,6 +470,7 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
 	// TxnID -> []Region, record resolved Regions.
 	// TODO: Maybe put it in LockResolver and share by all txns.
 	cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
+	pessimisticCleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
 	var resolve func(*Lock, bool) (TxnStatus, error)
 	resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
 		status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail)
@@ -482,7 +489,8 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
 			return status, nil
 		}
 
-		// If the lock is committed or rollbacked, resolve lock.
+		// If the lock is committed or rolled back, resolve the lock.
+		// If the lock is regarded as an expired pessimistic lock, pessimistically roll it back.
 		metrics.LockResolverCountWithExpired.Inc()
 		cleanRegions, exists := cleanTxns[l.TxnID]
 		if !exists {
@@ -504,7 +512,16 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
 		}
 		if l.LockType == kvrpcpb.Op_PessimisticLock {
 			// pessimistic locks don't block read so it needn't be async.
-			err = lr.resolvePessimisticLock(bo, l)
+			if pessimisticRegionResolve {
+				pessimisticCleanRegions, exists := pessimisticCleanTxns[l.TxnID]
+				if !exists {
+					pessimisticCleanRegions = make(map[locate.RegionVerID]struct{})
+					pessimisticCleanTxns[l.TxnID] = pessimisticCleanRegions
+				}
+				err = lr.resolvePessimisticLock(bo, l, true, pessimisticCleanRegions)
+			} else {
+				err = lr.resolvePessimisticLock(bo, l, false, nil)
+			}
 		} else {
 			if forRead {
 				asyncCtx := context.WithValue(lr.asyncResolveCtx, util.RequestSourceKey, bo.GetCtx().Value(util.RequestSourceKey))
@@ -669,7 +686,8 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, calle
 		if lr.store.GetOracle().UntilExpired(l.TxnID, l.TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) <= 0 {
 			logutil.Logger(bo.GetCtx()).Warn("lock txn not found, lock has expired",
 				zap.Uint64("CallerStartTs", callerStartTS),
-				zap.Stringer("lock str", l))
+				zap.Stringer("lock str", l),
+				zap.Stringer("status", status))
 			if l.LockType == kvrpcpb.Op_PessimisticLock {
 				if _, err := util.EvalFailpoint("txnExpireRetTTL"); err == nil {
 					return TxnStatus{action: kvrpcpb.Action_LockNotExistDoNothing},
@@ -1178,7 +1196,10 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
 
 // resolvePessimisticLock handles pessimistic locks after checking txn status.
 // Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
-func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
+// When `pessimisticRegionResolve` is false, only the input lock is pessimistically rolled back. Otherwise, the
+// corresponding region is scanned and all the pessimistic locks found there are rolled back at the same time,
+// similar to the `resolveLock` function.
+func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, pessimisticRegionResolve bool, pessimisticCleanRegions map[locate.RegionVerID]struct{}) error {
 	metrics.LockResolverCountWithResolveLocks.Inc()
 	// The lock has been resolved by getTxnStatusFromLock.
 	if bytes.Equal(l.Key, l.Primary) {
@@ -1189,6 +1210,11 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
 	if err != nil {
 		return err
 	}
+	if pessimisticRegionResolve && pessimisticCleanRegions != nil {
+		if _, ok := pessimisticCleanRegions[loc.Region]; ok {
+			return nil
+		}
+	}
 	forUpdateTS := l.LockForUpdateTS
 	if forUpdateTS == 0 {
 		forUpdateTS = math.MaxUint64
@@ -1196,7 +1222,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
 	pessimisticRollbackReq := &kvrpcpb.PessimisticRollbackRequest{
 		StartVersion: l.TxnID,
 		ForUpdateTs:  forUpdateTS,
-		Keys:         [][]byte{l.Key},
+	}
+	if !pessimisticRegionResolve {
+		pessimisticRollbackReq.Keys = [][]byte{l.Key}
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq, kvrpcpb.Context{
 		ResourceControlContext: &kvrpcpb.ResourceControlContext{
@@ -1228,6 +1256,9 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) err
 			logutil.Logger(bo.GetCtx()).Error("resolveLock error", zap.Error(err))
 			return err
 		}
+		if pessimisticRegionResolve && pessimisticCleanRegions != nil {
+			pessimisticCleanRegions[loc.Region] = struct{}{}
+		}
 		return nil
 	}
 }
diff --git a/txnkv/txnlock/test_probe.go b/txnkv/txnlock/test_probe.go
index 67468bd3..6f91f252 100644
--- a/txnkv/txnlock/test_probe.go
+++ b/txnkv/txnlock/test_probe.go
@@ -58,7 +58,7 @@ func (l LockResolverProbe) ResolveLock(bo *retry.Backoffer, lock *Lock) error {
 
 // ResolvePessimisticLock resolves single pessimistic lock.
 func (l LockResolverProbe) ResolvePessimisticLock(bo *retry.Backoffer, lock *Lock) error {
-	return l.resolvePessimisticLock(bo, lock)
+	return l.resolvePessimisticLock(bo, lock, false, nil)
 }
 
 // GetTxnStatus sends the CheckTxnStatus request to the TiKV server.
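A minimal usage sketch of the new PessimisticRegionResolve option, for reference alongside the test above. It is illustrative only and not part of this diff: the wrapper function and its name are assumptions, while ResolveLocksWithOpts, ResolveLocksOptions, and the config/retry Backoffer come from the change itself.

package example

import (
	"github.com/tikv/client-go/v2/config/retry"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// resolveExpiredPessimisticLocks is a hypothetical helper showing how a caller
// opts in to region-level pessimistic rollback. With PessimisticRegionResolve
// set, an expired pessimistic lock triggers a PessimisticRollback request
// without Keys, so TiKV rolls back all matching pessimistic locks in that
// region, and the resolved region is remembered within the call to avoid
// sending duplicate requests for other locks of the same transaction.
func resolveExpiredPessimisticLocks(resolver *txnlock.LockResolver, bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) error {
	_, err := resolver.ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
		CallerStartTS:            callerStartTS,
		Locks:                    locks,
		PessimisticRegionResolve: true,
	})
	return err
}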