mirror of https://github.com/tikv/client-go.git
txn: support force locking and lock only if exists be used together (#727)
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
parent
df993878bd
commit
f555fdd2c9
|
|
@ -1283,6 +1283,49 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
|
||||||
s.NoError(txn.Rollback())
|
s.NoError(txn.Rollback())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
|
||||||
|
// txn conflicts with txn0, key exists
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
|
||||||
|
txn0 := s.begin()
|
||||||
|
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
|
||||||
|
s.NoError(txn0.Commit(context.Background()))
|
||||||
|
txn0CommitTS := txn0.GetCommitTS()
|
||||||
|
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.True(lockCtx.Values["k1"].Exists)
|
||||||
|
s.False(lockCtx.Values["k1"].AlreadyLocked)
|
||||||
|
s.Equal(lockCtx.Values["k1"].Value, []byte("v1"))
|
||||||
|
s.Equal(lockCtx.Values["k1"].LockedWithConflictTS, txn0CommitTS)
|
||||||
|
s.Equal(txn.GetAggressiveLockingKeys(), []string{"k1"})
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
|
||||||
|
// txn conflicts with txn0, key doesn't exist. Returns WriteConflict error.
|
||||||
|
txn = s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
|
||||||
|
txn0 = s.begin()
|
||||||
|
s.NoError(txn0.Delete([]byte("k1")))
|
||||||
|
s.NoError(txn0.Commit(context.Background()))
|
||||||
|
txn0CommitTS = txn0.GetCommitTS()
|
||||||
|
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
|
||||||
|
err := txn.LockKeys(context.Background(), lockCtx, []byte("k1"))
|
||||||
|
s.NotNil(err)
|
||||||
|
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
|
||||||
|
s.NotContains(lockCtx.Values, "k1")
|
||||||
|
s.Equal(txn.GetAggressiveLockingKeys(), []string{})
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
}
|
||||||
|
|
||||||
func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() {
|
func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() {
|
||||||
txn := s.begin()
|
txn := s.begin()
|
||||||
txn.SetPessimistic(true)
|
txn.SetPessimistic(true)
|
||||||
|
|
|
||||||
|
|
@ -673,7 +673,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
|
||||||
// operation between startTS and forUpdateTS
|
// operation between startTS and forUpdateTS
|
||||||
// It's also possible that the key is already locked by the same transaction. Also do the conflict check to
|
// It's also possible that the key is already locked by the same transaction. Also do the conflict check to
|
||||||
// provide an idempotent result.
|
// provide an idempotent result.
|
||||||
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock)
|
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.LockOnlyIfExists, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock {
|
if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock {
|
||||||
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
|
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
|
||||||
|
|
@ -838,7 +838,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel, allowLockWithConflict bool) ([]byte, error) {
|
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel, lockOnlyIfExists bool, allowLockWithConflict bool) ([]byte, error) {
|
||||||
dec := &valueDecoder{
|
dec := &valueDecoder{
|
||||||
expectKey: m.Key,
|
expectKey: m.Key,
|
||||||
}
|
}
|
||||||
|
|
@ -921,6 +921,11 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if dec.value.valueType == typeDelete {
|
} else if dec.value.valueType == typeDelete {
|
||||||
|
if lockOnlyIfExists && writeConflictErr != nil {
|
||||||
|
// If lockOnlyIfExists is enabled and the key doesn't exist, force locking shouldn't take effect.
|
||||||
|
return nil, writeConflictErr
|
||||||
|
}
|
||||||
|
|
||||||
needCheckShouldNotExistForPessimisticLock = false
|
needCheckShouldNotExistForPessimisticLock = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -999,7 +1004,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
|
||||||
// The minCommitTS has been pushed forward.
|
// The minCommitTS has been pushed forward.
|
||||||
minCommitTS = dec.lock.minCommitTS
|
minCommitTS = dec.lock.minCommitTS
|
||||||
}
|
}
|
||||||
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false)
|
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -1007,7 +1012,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
|
||||||
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
|
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
|
||||||
return ErrAbort("pessimistic lock not found")
|
return ErrAbort("pessimistic lock not found")
|
||||||
}
|
}
|
||||||
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false)
|
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1046,7 +1046,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if txn.IsInAggressiveLockingMode() && len(keys) == 1 && !lockCtx.LockOnlyIfExists {
|
if txn.IsInAggressiveLockingMode() && len(keys) == 1 {
|
||||||
lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
|
lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
|
||||||
}
|
}
|
||||||
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
|
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue