diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 0726e2a1..28f0eeb6 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1283,6 +1283,49 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.NoError(txn.Rollback()) } +func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() { + // txn conflicts with txn0, key exists + txn := s.begin() + txn.SetPessimistic(true) + + txn0 := s.begin() + s.NoError(txn0.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn0.Commit(context.Background())) + txn0CommitTS := txn0.GetCommitTS() + + txn.StartAggressiveLocking() + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.True(lockCtx.Values["k1"].Exists) + s.False(lockCtx.Values["k1"].AlreadyLocked) + s.Equal(lockCtx.Values["k1"].Value, []byte("v1")) + s.Equal(lockCtx.Values["k1"].LockedWithConflictTS, txn0CommitTS) + s.Equal(txn.GetAggressiveLockingKeys(), []string{"k1"}) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) + + // txn conflicts with txn0, key doesn't exist. Returns WriteConflict error. + txn = s.begin() + txn.SetPessimistic(true) + + txn0 = s.begin() + s.NoError(txn0.Delete([]byte("k1"))) + s.NoError(txn0.Commit(context.Background())) + txn0CommitTS = txn0.GetCommitTS() + + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true} + err := txn.LockKeys(context.Background(), lockCtx, []byte("k1")) + s.NotNil(err) + s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{}) + s.NotContains(lockCtx.Values, "k1") + s.Equal(txn.GetAggressiveLockingKeys(), []string{}) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) +} + func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() { txn := s.begin() txn.SetPessimistic(true) diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index 6891809b..3d3641e0 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -673,7 +673,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation // operation between startTS and forUpdateTS // It's also possible that the key is already locked by the same transaction. Also do the conflict check to // provide an idempotent result. - val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock) + val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.LockOnlyIfExists, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock) if err != nil { if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock { lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{ @@ -838,7 +838,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { return errs } -func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel, allowLockWithConflict bool) ([]byte, error) { +func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel, lockOnlyIfExists bool, allowLockWithConflict bool) ([]byte, error) { dec := &valueDecoder{ expectKey: m.Key, } @@ -921,6 +921,11 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, } } } else if dec.value.valueType == typeDelete { + if lockOnlyIfExists && writeConflictErr != nil { + // If lockOnlyIfExists is enabled and the key doesn't exist, force locking shouldn't take effect. + return nil, writeConflictErr + } + needCheckShouldNotExistForPessimisticLock = false } @@ -999,7 +1004,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, // The minCommitTS has been pushed forward. minCommitTS = dec.lock.minCommitTS } - _, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false) + _, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false, false) if err != nil { return err } @@ -1007,7 +1012,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK { return ErrAbort("pessimistic lock not found") } - _, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false) + _, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false, false) if err != nil { return err } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index de48d6e5..2bc42d62 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1046,7 +1046,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } } - if txn.IsInAggressiveLockingMode() && len(keys) == 1 && !lockCtx.LockOnlyIfExists { + if txn.IsInAggressiveLockingMode() && len(keys) == 1 { lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock } bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)