mirror of https://github.com/tikv/client-go.git
Fix incorrect insertion behavior in aggressive locking mode and add tests (#651)
* Add more tests and fix the bug of insert in aggressive locking Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> * Fix tests Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> * Fix insert test Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> * Avoid primary re-selecting in TestAggressiveLockingLoadValueOptionChanges Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> * Address comments Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com> Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
parent
43601f3d49
commit
0f633e4163
|
|
@ -39,6 +39,7 @@ package tikv_test
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
stderrs "errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
|
@ -310,7 +311,7 @@ func (s *testCommitterSuite) TestContextCancel2() {
|
||||||
cancel()
|
cancel()
|
||||||
// Secondary keys should not be canceled.
|
// Secondary keys should not be canceled.
|
||||||
s.Eventually(func() bool {
|
s.Eventually(func() bool {
|
||||||
return !s.isKeyLocked([]byte("b"))
|
return !s.isKeyOptimisticLocked([]byte("b"))
|
||||||
}, 2*time.Second, 20*time.Millisecond, "Secondary locks are not committed after 2 seconds")
|
}, 2*time.Second, 20*time.Millisecond, "Secondary locks are not committed after 2 seconds")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -370,7 +371,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 {
|
||||||
return loc.Region.GetID()
|
return loc.Region.GetID()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testCommitterSuite) isKeyLocked(key []byte) bool {
|
func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool {
|
||||||
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||||
s.Nil(err)
|
s.Nil(err)
|
||||||
bo := tikv.NewBackofferWithVars(context.Background(), 500, nil)
|
bo := tikv.NewBackofferWithVars(context.Background(), 500, nil)
|
||||||
|
|
@ -387,6 +388,34 @@ func (s *testCommitterSuite) isKeyLocked(key []byte) bool {
|
||||||
return keyErr.GetLocked() != nil
|
return keyErr.GetLocked() != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) checkIsKeyLocked(key []byte, expectedLocked bool) {
|
||||||
|
// To be aware of the result of async operations (e.g. async pessimistic rollback), retry if the check fails.
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
|
||||||
|
lockCtx := kv.NewLockCtx(txn.StartTS(), kv.LockNoWait, time.Now())
|
||||||
|
err := txn.LockKeys(context.Background(), lockCtx, key)
|
||||||
|
|
||||||
|
var isCheckSuccess bool
|
||||||
|
if err != nil && stderrs.Is(err, tikverr.ErrLockAcquireFailAndNoWaitSet) {
|
||||||
|
isCheckSuccess = expectedLocked
|
||||||
|
} else {
|
||||||
|
s.Nil(err)
|
||||||
|
isCheckSuccess = !expectedLocked
|
||||||
|
}
|
||||||
|
|
||||||
|
if isCheckSuccess {
|
||||||
|
s.Nil(txn.Rollback())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Nil(txn.Rollback())
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
}
|
||||||
|
s.Fail(fmt.Sprintf("expected key %q locked = %v, but the actual result not match", string(key), expectedLocked))
|
||||||
|
}
|
||||||
|
|
||||||
func (s *testCommitterSuite) TestPrewriteCancel() {
|
func (s *testCommitterSuite) TestPrewriteCancel() {
|
||||||
// Setup region delays for key "b" and "c".
|
// Setup region delays for key "b" and "c".
|
||||||
delays := map[uint64]time.Duration{
|
delays := map[uint64]time.Duration{
|
||||||
|
|
@ -416,7 +445,7 @@ func (s *testCommitterSuite) TestPrewriteCancel() {
|
||||||
s.NotNil(err)
|
s.NotNil(err)
|
||||||
// "c" should be cleaned up in reasonable time.
|
// "c" should be cleaned up in reasonable time.
|
||||||
s.Eventually(func() bool {
|
s.Eventually(func() bool {
|
||||||
return !s.isKeyLocked([]byte("c"))
|
return !s.isKeyOptimisticLocked([]byte("c"))
|
||||||
}, 500*time.Millisecond, 10*time.Millisecond)
|
}, 500*time.Millisecond, 10*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1112,6 +1141,318 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflictError() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestAggressiveLocking() {
|
||||||
|
for _, finalIsDone := range []bool{false, true} {
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
s.False(txn.IsInAggressiveLockingMode())
|
||||||
|
|
||||||
|
// Lock some keys in normal way.
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2")))
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
|
||||||
|
// Enter aggressive locking mode and lock some keys.
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
for _, key := range []string{"k2", "k3", "k4"} {
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte(key)))
|
||||||
|
s.checkIsKeyLocked([]byte(key), true)
|
||||||
|
}
|
||||||
|
s.True(!txn.IsInAggressiveLockingStage([]byte("k2")))
|
||||||
|
s.True(txn.IsInAggressiveLockingStage([]byte("k3")))
|
||||||
|
s.True(txn.IsInAggressiveLockingStage([]byte("k4")))
|
||||||
|
|
||||||
|
// Retry and change some of the keys to be locked.
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), true)
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5")))
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k5"), true)
|
||||||
|
|
||||||
|
// Retry again, then the unnecessary locks acquired in the previous stage should be released.
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k5"), true)
|
||||||
|
|
||||||
|
// Lock some different keys again and then done or cancel.
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6")))
|
||||||
|
|
||||||
|
if finalIsDone {
|
||||||
|
txn.DoneAggressiveLocking(context.Background())
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k5"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k6"), true)
|
||||||
|
} else {
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k5"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k6"), false)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestAggressiveLockingInsert() {
|
||||||
|
txn0 := s.begin()
|
||||||
|
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
|
||||||
|
s.NoError(txn0.Set([]byte("k3"), []byte("v3")))
|
||||||
|
s.NoError(txn0.Set([]byte("k6"), []byte("v6")))
|
||||||
|
s.NoError(txn0.Set([]byte("k8"), []byte("v8")))
|
||||||
|
s.NoError(txn0.Commit(context.Background()))
|
||||||
|
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
lockCtx.InitReturnValues(2)
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2")))
|
||||||
|
s.NoError(txn.Set([]byte("k5"), []byte("v5")))
|
||||||
|
s.NoError(txn.Delete([]byte("k6")))
|
||||||
|
|
||||||
|
insertPessimisticLock := func(lockCtx *kv.LockCtx, key string) error {
|
||||||
|
txn.GetMemBuffer().UpdateFlags([]byte(key), kv.SetPresumeKeyNotExists)
|
||||||
|
if lockCtx == nil {
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
}
|
||||||
|
return txn.LockKeys(context.Background(), lockCtx, []byte(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
mustAlreadyExist := func(err error) {
|
||||||
|
if _, ok := errors.Cause(err).(*tikverr.ErrKeyExist); !ok {
|
||||||
|
s.Fail(fmt.Sprintf("expected KeyExist error, but got: %+q", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
// Already-locked before aggressive locking.
|
||||||
|
mustAlreadyExist(insertPessimisticLock(nil, "k1"))
|
||||||
|
s.NoError(insertPessimisticLock(nil, "k2"))
|
||||||
|
// Acquiring new locks normally.
|
||||||
|
mustAlreadyExist(insertPessimisticLock(nil, "k3"))
|
||||||
|
s.NoError(insertPessimisticLock(nil, "k4"))
|
||||||
|
// The key added or deleted in the same transaction before entering aggressive locking.
|
||||||
|
// Since TiDB can detect it before invoking LockKeys, client-go actually didn't handle this case for now (no matter
|
||||||
|
// if in aggressive locking or not). So skip this test case here, and it can be uncommented if someday client-go
|
||||||
|
// supports such check.
|
||||||
|
// mustAlreadyExist(insertPessimisticLock(nil, "k5"))
|
||||||
|
// s.NoError(insertPessimisticLock(nil, "k6"))
|
||||||
|
|
||||||
|
// Locked with conflict and then do pessimistic retry.
|
||||||
|
txn2 := s.begin()
|
||||||
|
s.NoError(txn2.Set([]byte("k7"), []byte("v7")))
|
||||||
|
s.NoError(txn2.Delete([]byte("k8")))
|
||||||
|
s.NoError(txn2.Commit(context.Background()))
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
err := insertPessimisticLock(lockCtx, "k7")
|
||||||
|
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
s.NoError(insertPessimisticLock(lockCtx, "k8"))
|
||||||
|
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
|
||||||
|
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
|
||||||
|
// Update forUpdateTS to simulate a pessimistic retry.
|
||||||
|
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||||
|
s.Nil(err)
|
||||||
|
s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
mustAlreadyExist(insertPessimisticLock(lockCtx, "k7"))
|
||||||
|
s.NoError(insertPessimisticLock(lockCtx, "k8"))
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() {
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
checkPrimary := func(key string, expectedPrimary string) {
|
||||||
|
lockInfo := s.getLockInfo([]byte(key))
|
||||||
|
s.Equal(kvrpcpb.Op_PessimisticLock, lockInfo.LockType)
|
||||||
|
s.Equal(expectedPrimary, string(lockInfo.PrimaryLock))
|
||||||
|
}
|
||||||
|
|
||||||
|
forUpdateTS := txn.StartTS()
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
checkPrimary("k1", "k1")
|
||||||
|
checkPrimary("k2", "k1")
|
||||||
|
|
||||||
|
// Primary not changed.
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3")))
|
||||||
|
checkPrimary("k1", "k1")
|
||||||
|
checkPrimary("k3", "k1")
|
||||||
|
|
||||||
|
// Primary changed and is not in the set of previously locked keys.
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5")))
|
||||||
|
checkPrimary("k4", "k4")
|
||||||
|
checkPrimary("k5", "k4")
|
||||||
|
// Previously locked keys that are not in the most recent aggressive locking stage will be released.
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), false)
|
||||||
|
|
||||||
|
// Primary changed and is in the set of previously locked keys.
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6")))
|
||||||
|
checkPrimary("k5", "k5")
|
||||||
|
checkPrimary("k6", "k5")
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), false)
|
||||||
|
|
||||||
|
// Primary changed and is locked *before* the previous aggressive locking stage (suppose it's the n-th retry,
|
||||||
|
// the expected primary is locked during the (n-2)-th retry).
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k7")))
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5")))
|
||||||
|
checkPrimary("k5", "k6")
|
||||||
|
checkPrimary("k6", "k6")
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
// Check all released.
|
||||||
|
for i := 0; i < 6; i++ {
|
||||||
|
key := []byte{byte('k'), byte('1') + byte(i)}
|
||||||
|
s.checkIsKeyLocked(key, false)
|
||||||
|
}
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
|
||||||
|
// Also test the primary-switching logic won't misbehave when the primary is already selected before entering
|
||||||
|
// aggressive locking.
|
||||||
|
txn = s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
forUpdateTS = txn.StartTS()
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2")))
|
||||||
|
checkPrimary("k1", "k1")
|
||||||
|
checkPrimary("k2", "k1")
|
||||||
|
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3")))
|
||||||
|
checkPrimary("k2", "k1")
|
||||||
|
checkPrimary("k3", "k1")
|
||||||
|
|
||||||
|
forUpdateTS++
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4")))
|
||||||
|
checkPrimary("k3", "k1")
|
||||||
|
checkPrimary("k4", "k1")
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), true)
|
||||||
|
s.checkIsKeyLocked([]byte("k3"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k4"), false)
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
s.checkIsKeyLocked([]byte("k1"), false)
|
||||||
|
s.checkIsKeyLocked([]byte("k2"), false)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() {
|
||||||
|
txn0 := s.begin()
|
||||||
|
s.NoError(txn0.Set([]byte("k2"), []byte("v2")))
|
||||||
|
s.NoError(txn0.Commit(context.Background()))
|
||||||
|
|
||||||
|
for _, firstAttemptLockedWithConflict := range []bool{false, true} {
|
||||||
|
txn := s.begin()
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
|
||||||
|
// Make the primary deterministic to avoid the following test code involves primary re-selecting logic.
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k0")))
|
||||||
|
|
||||||
|
forUpdateTS := txn.StartTS()
|
||||||
|
txn.StartAggressiveLocking()
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
|
||||||
|
var txn2 transaction.TxnProbe
|
||||||
|
if firstAttemptLockedWithConflict {
|
||||||
|
txn2 = s.begin()
|
||||||
|
s.NoError(txn2.Delete([]byte("k1")))
|
||||||
|
s.NoError(txn2.Set([]byte("k2"), []byte("v2")))
|
||||||
|
s.NoError(txn2.Commit(context.Background()))
|
||||||
|
}
|
||||||
|
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
|
||||||
|
if firstAttemptLockedWithConflict {
|
||||||
|
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
|
||||||
|
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
|
||||||
|
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
|
||||||
|
}
|
||||||
|
|
||||||
|
if firstAttemptLockedWithConflict {
|
||||||
|
forUpdateTS = txn2.GetCommitTS() + 1
|
||||||
|
} else {
|
||||||
|
forUpdateTS++
|
||||||
|
}
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
lockCtx.InitCheckExistence(2)
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS)
|
||||||
|
s.Equal(false, lockCtx.Values["k1"].Exists)
|
||||||
|
s.Equal(true, lockCtx.Values["k2"].Exists)
|
||||||
|
|
||||||
|
forUpdateTS++
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
|
||||||
|
lockCtx.InitReturnValues(2)
|
||||||
|
txn.RetryAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1")))
|
||||||
|
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
|
||||||
|
s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS)
|
||||||
|
s.Equal(false, lockCtx.Values["k1"].Exists)
|
||||||
|
s.Equal(true, lockCtx.Values["k2"].Exists)
|
||||||
|
s.Equal([]byte("v2"), lockCtx.Values["k2"].Value)
|
||||||
|
|
||||||
|
txn.CancelAggressiveLocking(context.Background())
|
||||||
|
s.NoError(txn.Rollback())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
|
// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
|
||||||
func (s *testCommitterSuite) TestElapsedTTL() {
|
func (s *testCommitterSuite) TestElapsedTTL() {
|
||||||
key := []byte("key")
|
key := []byte("key")
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,7 @@ type ErrConflict struct {
|
||||||
ConflictTS uint64
|
ConflictTS uint64
|
||||||
ConflictCommitTS uint64
|
ConflictCommitTS uint64
|
||||||
Key []byte
|
Key []byte
|
||||||
|
CanForceLock bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ErrConflict) Error() string {
|
func (e *ErrConflict) Error() string {
|
||||||
|
|
|
||||||
|
|
@ -650,12 +650,13 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
|
||||||
dec := lockDecoder{
|
dec := lockDecoder{
|
||||||
expectKey: mutation.Key,
|
expectKey: mutation.Key,
|
||||||
}
|
}
|
||||||
ok, err := dec.Decode(iter)
|
alreadyLocked, err := dec.Decode(iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if ok {
|
if alreadyLocked {
|
||||||
if dec.lock.startTS != startTS {
|
if dec.lock.startTS != startTS {
|
||||||
|
// Locked by another transaction.
|
||||||
errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key))
|
errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key))
|
||||||
if errDeadlock != nil {
|
if errDeadlock != nil {
|
||||||
return &ErrDeadlock{
|
return &ErrDeadlock{
|
||||||
|
|
@ -666,14 +667,15 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
|
||||||
}
|
}
|
||||||
return dec.lock.lockErr(mutation.Key)
|
return dec.lock.lockErr(mutation.Key)
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// For pessimisticLockMutation, check the correspond rollback record, there may be rollbackLock
|
// For pessimisticLockMutation, check the corresponding rollback record, there may be rollbackLock
|
||||||
// operation between startTS and forUpdateTS
|
// operation between startTS and forUpdateTS
|
||||||
|
// It's also possible that the key is already locked by the same transaction. Also do the conflict check to
|
||||||
|
// provide an idempotent result.
|
||||||
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock)
|
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok {
|
if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock {
|
||||||
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
|
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
|
||||||
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict,
|
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict,
|
||||||
Value: val,
|
Value: val,
|
||||||
|
|
@ -709,21 +711,23 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lock := mvccLock{
|
if !alreadyLocked || dec.lock.forUpdateTS < forUpdateTS {
|
||||||
startTS: startTS,
|
lock := mvccLock{
|
||||||
primary: lctx.primary,
|
startTS: startTS,
|
||||||
op: kvrpcpb.Op_PessimisticLock,
|
primary: lctx.primary,
|
||||||
ttl: lctx.ttl,
|
op: kvrpcpb.Op_PessimisticLock,
|
||||||
forUpdateTS: forUpdateTS,
|
ttl: lctx.ttl,
|
||||||
minCommitTS: lctx.minCommitTs,
|
forUpdateTS: forUpdateTS,
|
||||||
}
|
minCommitTS: lctx.minCommitTs,
|
||||||
writeKey := mvccEncode(mutation.Key, lockVer)
|
}
|
||||||
writeValue, err := lock.MarshalBinary()
|
writeKey := mvccEncode(mutation.Key, lockVer)
|
||||||
if err != nil {
|
writeValue, err := lock.MarshalBinary()
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
batch.Put(writeKey, writeValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
batch.Put(writeKey, writeValue)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -899,12 +903,11 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
|
||||||
|
|
||||||
if dec.value.valueType == typePut || dec.value.valueType == typeLock {
|
if dec.value.valueType == typePut || dec.value.valueType == typeLock {
|
||||||
if needCheckShouldNotExistForPessimisticLock {
|
if needCheckShouldNotExistForPessimisticLock {
|
||||||
return nil, &ErrAssertionFailed{
|
if writeConflictErr != nil {
|
||||||
StartTS: startTS,
|
return nil, writeConflictErr
|
||||||
Key: m.Key,
|
}
|
||||||
Assertion: m.Assertion,
|
return nil, &ErrKeyAlreadyExist{
|
||||||
ExistingStartTS: dec.value.startTS,
|
Key: m.Key,
|
||||||
ExistingCommitTS: dec.value.commitTS,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if needCheckAssertionForPrewerite && m.Assertion == kvrpcpb.Assertion_NotExist {
|
if needCheckAssertionForPrewerite && m.Assertion == kvrpcpb.Assertion_NotExist {
|
||||||
|
|
@ -947,6 +950,9 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeConflictErr is not nil only when write conflict is found and `allowLockWithConflict is set to true.
|
// writeConflictErr is not nil only when write conflict is found and `allowLockWithConflict is set to true.
|
||||||
|
if writeConflictErr != nil {
|
||||||
|
writeConflictErr.(*ErrConflict).CanForceLock = true
|
||||||
|
}
|
||||||
if getVal {
|
if getVal {
|
||||||
return retVal, writeConflictErr
|
return retVal, writeConflictErr
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -818,7 +818,9 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][]
|
||||||
!txn.mayAggressiveLockingLastLockedKeysExpire() {
|
!txn.mayAggressiveLockingLastLockedKeysExpire() {
|
||||||
// We can skip locking it since it's already locked during last attempt to aggressive locking, and
|
// We can skip locking it since it's already locked during last attempt to aggressive locking, and
|
||||||
// we already have the information that we need.
|
// we already have the information that we need.
|
||||||
lockCtx.Values[keyStr] = lastResult.Value
|
if lockCtx.Values != nil {
|
||||||
|
lockCtx.Values[keyStr] = lastResult.Value
|
||||||
|
}
|
||||||
txn.aggressiveLockingContext.currentLockedKeys[keyStr] = lastResult
|
txn.aggressiveLockingContext.currentLockedKeys[keyStr] = lastResult
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -886,16 +888,24 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
||||||
checkKeyExists = flags.HasNeedCheckExists()
|
checkKeyExists = flags.HasNeedCheckExists()
|
||||||
}
|
}
|
||||||
// If the key is locked in the current aggressive locking stage, override the information in memBuf.
|
// If the key is locked in the current aggressive locking stage, override the information in memBuf.
|
||||||
|
isInLastAggressiveLockingStage := false
|
||||||
if txn.aggressiveLockingContext != nil {
|
if txn.aggressiveLockingContext != nil {
|
||||||
if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok {
|
if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok {
|
||||||
locked = true
|
locked = true
|
||||||
valueExist = entry.Value.Exists
|
valueExist = entry.Value.Exists
|
||||||
|
} else if entry, ok := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks[string(key)]; ok {
|
||||||
|
locked = true
|
||||||
|
valueExist = entry.Value.Exists
|
||||||
|
isInLastAggressiveLockingStage = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !locked {
|
if !locked || isInLastAggressiveLockingStage {
|
||||||
|
// Locks acquired in the previous aggressive locking stage might need to be updated later in
|
||||||
|
// `filterAggressiveLockedKeys`.
|
||||||
keys = append(keys, key)
|
keys = append(keys, key)
|
||||||
} else if txn.IsPessimistic() {
|
}
|
||||||
|
if locked && txn.IsPessimistic() {
|
||||||
if checkKeyExists && valueExist {
|
if checkKeyExists && valueExist {
|
||||||
alreadyExist := kvrpcpb.AlreadyExist{Key: key}
|
alreadyExist := kvrpcpb.AlreadyExist{Key: key}
|
||||||
e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}
|
e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue