mirror of https://github.com/tikv/client-go.git
PessimisticLock: Lock the specified key only if the key exists (#561)
* optimize for lock if exists Signed-off-by: TonsnakeLin <lpbgytong@163.com> * fix bugs for lock if exists Signed-off-by: TonsnakeLin <lpbgytong@163.com> * optimize lock info Signed-off-by: TonsnakeLin <lpbgytong@163.com> * fix bugs Signed-off-by: TonsnakeLin <lpbgytong@163.com> * remove the contrl flag for lock stats Signed-off-by: TonsnakeLin <lpbgytong@163.com> * update kvproto Signed-off-by: TonsnakeLin <lpbgytong@163.com> * change to LockOnlyIfExists Signed-off-by: TonsnakeLin <lpbgytong@163.com> * update kvproto Signed-off-by: TonsnakeLin <lpbgytong@163.com> * change to LockOnlyIfExists Signed-off-by: TonsnakeLin <lpbgytong@163.com> * make test pass Signed-off-by: TonsnakeLin <lpbgytong@163.com> * Update txnkv/transaction/txn.go Co-authored-by: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Signed-off-by: TonsnakeLin <lpbgytong@163.com> * *: add getPDClient to rawKV public api (#570) Signed-off-by: dongxu <i@huangdx.net> Signed-off-by: dongxu <i@huangdx.net> Signed-off-by: TonsnakeLin <lpbgytong@163.com> * Update README.md (#571) Signed-off-by: dongxu <i@huangdx.net> Signed-off-by: TonsnakeLin <lpbgytong@163.com> * update dependency of integration test (#572) Signed-off-by: cfzjywxk <lsswxrxr@163.com> Signed-off-by: cfzjywxk <lsswxrxr@163.com> Signed-off-by: TonsnakeLin <lpbgytong@163.com> * Support postponed conflict check (#556) * replace kvproto Signed-off-by: ekexium <eke@fastmail.com> * support NeedConflictCheck Signed-off-by: ekexium <eke@fastmail.com> * fix mutation encoding Signed-off-by: ekexium <eke@fastmail.com> * support temporary flag Signed-off-by: ekexium <eke@fastmail.com> * update kvproto Signed-off-by: ekexium <eke@fastmail.com> * fix style Signed-off-by: ekexium <eke@fastmail.com> * add an option to enable the behavior Signed-off-by: ekexium <eke@fastmail.com> * replace AfterCheckPoint with existing canModity Signed-off-by: ekexium <eke@fastmail.com> * UpdateFlag do not unset temporary flag Signed-off-by: ekexium <eke@fastmail.com> * remove unused function Signed-off-by: ekexium <eke@fastmail.com> * update tidb dependency Signed-off-by: ekexium <eke@fastmail.com> update tidb dependency Signed-off-by: ekexium <eke@fastmail.com> * fix test Signed-off-by: ekexium <eke@fastmail.com> * do no unset flag on read Signed-off-by: ekexium <eke@fastmail.com> * update tidb dependency Signed-off-by: ekexium <eke@fastmail.com> * update comment Signed-off-by: ekexium <eke@fastmail.com> Signed-off-by: ekexium <eke@fastmail.com> * add testcase Signed-off-by: TonsnakeLin <lpbgytong@163.com> * add test case Signed-off-by: TonsnakeLin <lpbgytong@163.com> * unset pk if lockifexits failed Signed-off-by: TonsnakeLin <lpbgytong@163.com> * fix format Signed-off-by: TonsnakeLin <lpbgytong@163.com> * LockOnlyIfExists only when pk selected Signed-off-by: TonsnakeLin <lpbgytong@163.com> * remove test function to txn_probe Signed-off-by: TonsnakeLin <lpbgytong@163.com> * add more info to error Signed-off-by: TonsnakeLin <lpbgytong@163.com> * add more info to error Signed-off-by: TonsnakeLin <lpbgytong@163.com> * add protection for input key Signed-off-by: TonsnakeLin <lpbgytong@163.com> Signed-off-by: TonsnakeLin <lpbgytong@163.com> Signed-off-by: dongxu <i@huangdx.net> Signed-off-by: cfzjywxk <lsswxrxr@163.com> Signed-off-by: ekexium <eke@fastmail.com> Co-authored-by: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Co-authored-by: dongxu <i@huangdx.net> Co-authored-by: cfzjywxk <lsswxrxr@163.com> Co-authored-by: ekexium <eke@fastmail.com> Co-authored-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
87f5f61897
commit
fed87c9939
|
@ -35,6 +35,7 @@
|
|||
package error
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -238,10 +239,36 @@ type ErrAssertionFailed struct {
|
|||
*kvrpcpb.AssertionFailed
|
||||
}
|
||||
|
||||
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues`` is not.
|
||||
type ErrLockOnlyIfExistsNoReturnValue struct {
|
||||
StartTS uint64
|
||||
ForUpdateTs uint64
|
||||
LockKey []byte
|
||||
}
|
||||
|
||||
// ErrLockOnlyIfExistsNoPrimaryKey is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but primary key of current transaction is not.
|
||||
type ErrLockOnlyIfExistsNoPrimaryKey struct {
|
||||
StartTS uint64
|
||||
ForUpdateTs uint64
|
||||
LockKey []byte
|
||||
}
|
||||
|
||||
func (e *ErrAssertionFailed) Error() string {
|
||||
return fmt.Sprintf("assertion failed { %s }", e.AssertionFailed.String())
|
||||
}
|
||||
|
||||
func (e *ErrLockOnlyIfExistsNoReturnValue) Error() string {
|
||||
return fmt.Sprintf("LockOnlyIfExists is set for Lock Context, but ReturnValues is not set, "+
|
||||
"StartTs is {%d}, ForUpdateTs is {%d}, one of lock keys is {%v}.",
|
||||
e.StartTS, e.ForUpdateTs, hex.EncodeToString(e.LockKey))
|
||||
}
|
||||
|
||||
func (e *ErrLockOnlyIfExistsNoPrimaryKey) Error() string {
|
||||
return fmt.Sprintf("LockOnlyIfExists is set for Lock Context, but primary key of current transaction is not set, "+
|
||||
"StartTs is {%d}, ForUpdateTs is {%d}, one of lock keys is {%s}",
|
||||
e.StartTS, e.ForUpdateTs, hex.EncodeToString(e.LockKey))
|
||||
}
|
||||
|
||||
// ExtractKeyErr extracts a KeyError.
|
||||
func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error {
|
||||
if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil {
|
||||
|
|
|
@ -748,6 +748,191 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues() {
|
|||
s.Equal(lockCtx.Values[string(key2)].Value, key2)
|
||||
}
|
||||
|
||||
func lockOneKey(s *testCommitterSuite, txn transaction.TxnProbe, key []byte) {
|
||||
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))
|
||||
}
|
||||
|
||||
func getLockOnlyIfExistsCtx(txn transaction.TxnProbe, keyCount int) *kv.LockCtx {
|
||||
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||
lockCtx.InitReturnValues(keyCount)
|
||||
lockCtx.LockOnlyIfExists = true
|
||||
return lockCtx
|
||||
}
|
||||
|
||||
func checkLockKeyResult(s *testCommitterSuite, txn transaction.TxnProbe, lockCtx *kv.LockCtx,
|
||||
key []byte, value []byte, lockCtxValLen int, primaryKey []byte) {
|
||||
s.Len(lockCtx.Values, lockCtxValLen)
|
||||
if value != nil {
|
||||
s.Equal(lockCtx.Values[string(key)].Value, key)
|
||||
} else {
|
||||
s.Equal(lockCtx.Values[string(key)].Exists, false)
|
||||
}
|
||||
s.Equal(txn.GetCommitter().GetPrimaryKey(), primaryKey)
|
||||
}
|
||||
|
||||
func getMembufferFlags(s *testCommitterSuite, txn transaction.TxnProbe, key []byte, errStr string) kv.KeyFlags {
|
||||
memBuf := txn.GetMemBuffer()
|
||||
flags, err := memBuf.GetFlags(key)
|
||||
if len(errStr) != 0 {
|
||||
s.Equal(err.Error(), errStr)
|
||||
} else {
|
||||
s.Nil(err)
|
||||
}
|
||||
return flags
|
||||
}
|
||||
|
||||
func (s *testCommitterSuite) TestPessimisticLockIfExists() {
|
||||
key0 := []byte("jkey")
|
||||
key := []byte("key")
|
||||
key2 := []byte("key2")
|
||||
key3 := []byte("key3")
|
||||
txn := s.begin()
|
||||
s.Nil(txn.Set(key, key))
|
||||
s.Nil(txn.Set(key3, key3))
|
||||
s.Nil(txn.Commit(context.Background()))
|
||||
|
||||
// Lcoked "key" successfully.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx := getLockOnlyIfExistsCtx(txn, 1)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))
|
||||
checkLockKeyResult(s, txn, lockCtx, key, key, 1, key0)
|
||||
flags := getMembufferFlags(s, txn, key, "")
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
s.Equal(txn.GetLcokedCount(), 2)
|
||||
s.Nil(txn.Rollback())
|
||||
|
||||
// Locked "key2" unsuccessfully.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 1)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key2))
|
||||
checkLockKeyResult(s, txn, lockCtx, key2, nil, 1, key0)
|
||||
flags = getMembufferFlags(s, txn, key, "not exist")
|
||||
s.Equal(txn.GetLcokedCount(), 1)
|
||||
s.Nil(txn.Rollback())
|
||||
|
||||
// Lock order is key, key2, key3.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 3)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key, key2, key3))
|
||||
s.Len(lockCtx.Values, 3)
|
||||
s.Equal(lockCtx.Values[string(key)].Value, key)
|
||||
s.Equal(lockCtx.Values[string(key2)].Exists, false)
|
||||
s.Equal(lockCtx.Values[string(key3)].Value, key3)
|
||||
s.Equal(txn.GetCommitter().GetPrimaryKey(), key0)
|
||||
memBuf := txn.GetMemBuffer()
|
||||
flags, err := memBuf.GetFlags(key)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
flags, err = memBuf.GetFlags(key2)
|
||||
s.Equal(err.Error(), "not exist")
|
||||
flags, err = memBuf.GetFlags(key3)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
s.Equal(txn.GetLcokedCount(), 3)
|
||||
s.Nil(txn.Rollback())
|
||||
|
||||
// Lock order is key2, key, key3.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 3)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key2, key, key3))
|
||||
s.Len(lockCtx.Values, 3)
|
||||
s.Equal(lockCtx.Values[string(key2)].Exists, false)
|
||||
s.Equal(lockCtx.Values[string(key)].Value, key)
|
||||
s.Equal(lockCtx.Values[string(key3)].Value, key3)
|
||||
s.Equal(txn.GetCommitter().GetPrimaryKey(), key0) // key is sorted in LockKeys()
|
||||
memBuf = txn.GetMemBuffer()
|
||||
flags, err = memBuf.GetFlags(key)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
flags, err = memBuf.GetFlags(key2)
|
||||
s.Equal(err.Error(), "not exist")
|
||||
flags, err = memBuf.GetFlags(key3)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
s.Equal(txn.GetLcokedCount(), 3)
|
||||
s.Nil(txn.Commit(context.Background()))
|
||||
|
||||
// LockKeys(key2), LockKeys(key3, key).
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 1)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key2))
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 2)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key3, key))
|
||||
s.Equal(lockCtx.Values[string(key3)].Value, key3)
|
||||
s.Equal(txn.GetCommitter().GetPrimaryKey(), key0)
|
||||
memBuf = txn.GetMemBuffer()
|
||||
flags, err = memBuf.GetFlags(key)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
flags, err = memBuf.GetFlags(key2)
|
||||
s.Equal(err.Error(), "not exist")
|
||||
flags, err = memBuf.GetFlags(key3)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
s.Equal(txn.GetLcokedCount(), 3)
|
||||
s.Nil(txn.Commit(context.Background()))
|
||||
|
||||
// Lock order is key0, key, key3.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key0)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 3)
|
||||
s.Nil(txn.LockKeys(context.Background(), lockCtx, key0, key, key3))
|
||||
s.Len(lockCtx.Values, 3)
|
||||
key0Val, ok := lockCtx.Values[string(key0)]
|
||||
s.Equal(ok, true)
|
||||
s.Equal(key0Val.AlreadyLocked, true)
|
||||
s.Equal(key0Val.Exists, false)
|
||||
s.Equal(lockCtx.Values[string(key)].Value, key)
|
||||
s.Equal(lockCtx.Values[string(key3)].Value, key3)
|
||||
s.Equal(txn.GetCommitter().GetPrimaryKey(), key0)
|
||||
memBuf = txn.GetMemBuffer()
|
||||
flags, err = memBuf.GetFlags(key)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
flags, err = memBuf.GetFlags(key0)
|
||||
s.Equal(true, flags.HasLockedValueExists()) // in fact, there is no value
|
||||
s.Equal(flags.HasLocked(), true)
|
||||
flags, err = memBuf.GetFlags(key3)
|
||||
s.Equal(flags.HasLockedValueExists(), true)
|
||||
s.Equal(txn.GetLcokedCount(), 3)
|
||||
s.Nil(txn.Commit(context.Background()))
|
||||
|
||||
// When the primary key is not selected, it can't send a lock request with LockOnlyIfExists mode
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockCtx = getLockOnlyIfExistsCtx(txn, 1)
|
||||
err = txn.LockKeys(context.Background(), lockCtx, key)
|
||||
err, ok = err.(*tikverr.ErrLockOnlyIfExistsNoPrimaryKey)
|
||||
s.Equal(ok, true)
|
||||
s.Nil(txn.Rollback())
|
||||
|
||||
// When LockOnlyIfExists is true, ReturnValue must be true too.
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key)
|
||||
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||
lockCtx.LockOnlyIfExists = true
|
||||
err = txn.LockKeys(context.Background(), lockCtx, key2)
|
||||
err, ok = err.(*tikverr.ErrLockOnlyIfExistsNoReturnValue)
|
||||
s.Equal(ok, true)
|
||||
s.Nil(txn.Rollback())
|
||||
|
||||
txn = s.begin()
|
||||
txn.SetPessimistic(true)
|
||||
lockOneKey(s, txn, key)
|
||||
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||
lockCtx.LockOnlyIfExists = true
|
||||
err = txn.LockKeys(context.Background(), lockCtx)
|
||||
err, ok = err.(*tikverr.ErrLockOnlyIfExistsNoReturnValue)
|
||||
s.Equal(ok, false)
|
||||
s.Nil(txn.Rollback())
|
||||
}
|
||||
|
||||
func (s *testCommitterSuite) TestPessimisticLockCheckExistence() {
|
||||
key := []byte("key")
|
||||
key2 := []byte("key2")
|
||||
|
|
|
@ -532,10 +532,11 @@ type lockCtx struct {
|
|||
ttl uint64
|
||||
minCommitTs uint64
|
||||
|
||||
returnValues bool
|
||||
checkExistence bool
|
||||
values [][]byte
|
||||
keyNotFound []bool
|
||||
returnValues bool
|
||||
checkExistence bool
|
||||
values [][]byte
|
||||
keyNotFound []bool
|
||||
LockOnlyIfExists bool
|
||||
}
|
||||
|
||||
// PessimisticLock writes the pessimistic lock.
|
||||
|
@ -545,13 +546,14 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
|
|||
defer mvcc.mu.Unlock()
|
||||
mutations := req.Mutations
|
||||
lCtx := &lockCtx{
|
||||
startTS: req.StartVersion,
|
||||
forUpdateTS: req.ForUpdateTs,
|
||||
primary: req.PrimaryLock,
|
||||
ttl: req.LockTtl,
|
||||
minCommitTs: req.MinCommitTs,
|
||||
returnValues: req.ReturnValues,
|
||||
checkExistence: req.CheckExistence,
|
||||
startTS: req.StartVersion,
|
||||
forUpdateTS: req.ForUpdateTs,
|
||||
primary: req.PrimaryLock,
|
||||
ttl: req.LockTtl,
|
||||
minCommitTs: req.MinCommitTs,
|
||||
returnValues: req.ReturnValues,
|
||||
checkExistence: req.CheckExistence,
|
||||
LockOnlyIfExists: req.LockOnlyIfExists,
|
||||
}
|
||||
lockWaitTime := req.WaitTimeout
|
||||
|
||||
|
@ -594,6 +596,11 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
|
|||
func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation *kvrpcpb.Mutation, lctx *lockCtx) error {
|
||||
startTS := lctx.startTS
|
||||
forUpdateTS := lctx.forUpdateTS
|
||||
|
||||
if lctx.LockOnlyIfExists && !lctx.returnValues {
|
||||
return errors.New("LockOnlyIfExists is set for LockKeys but ReturnValues is not set")
|
||||
}
|
||||
|
||||
startKey := mvccEncode(mutation.Key, lockVer)
|
||||
iter := newIterator(mvcc.getDB(""), &util.Range{
|
||||
Start: startKey,
|
||||
|
@ -635,6 +642,10 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
|
|||
lctx.keyNotFound = append(lctx.keyNotFound, len(val) == 0)
|
||||
}
|
||||
|
||||
if lctx.LockOnlyIfExists && len(val) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
lock := mvccLock{
|
||||
startTS: startTS,
|
||||
primary: lctx.primary,
|
||||
|
|
1
kv/kv.go
1
kv/kv.go
|
@ -65,6 +65,7 @@ type LockCtx struct {
|
|||
LockKeysCount *int32
|
||||
ReturnValues bool
|
||||
CheckExistence bool
|
||||
LockOnlyIfExists bool
|
||||
Values map[string]ReturnedValue
|
||||
ValuesLock sync.Mutex
|
||||
LockExpired *uint32
|
||||
|
|
|
@ -97,15 +97,16 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
|||
mutations[i] = mut
|
||||
}
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
|
||||
Mutations: mutations,
|
||||
PrimaryLock: c.primary(),
|
||||
StartVersion: c.startTS,
|
||||
ForUpdateTs: c.forUpdateTS,
|
||||
IsFirstLock: c.isFirstLock,
|
||||
WaitTimeout: action.LockWaitTime(),
|
||||
ReturnValues: action.ReturnValues,
|
||||
CheckExistence: action.CheckExistence,
|
||||
MinCommitTs: c.forUpdateTS + 1,
|
||||
Mutations: mutations,
|
||||
PrimaryLock: c.primary(),
|
||||
StartVersion: c.startTS,
|
||||
ForUpdateTs: c.forUpdateTS,
|
||||
IsFirstLock: c.isFirstLock,
|
||||
WaitTimeout: action.LockWaitTime(),
|
||||
ReturnValues: action.ReturnValues,
|
||||
CheckExistence: action.CheckExistence,
|
||||
MinCommitTs: c.forUpdateTS + 1,
|
||||
LockOnlyIfExists: action.LockOnlyIfExists,
|
||||
}, kvrpcpb.Context{
|
||||
Priority: c.priority,
|
||||
SyncLog: c.syncLog,
|
||||
|
@ -185,7 +186,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
|||
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
|
||||
keyErrs := lockResp.GetErrors()
|
||||
if len(keyErrs) == 0 {
|
||||
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)
|
||||
if action.LockCtx.Stats != nil {
|
||||
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)
|
||||
}
|
||||
|
||||
if batch.isPrimary {
|
||||
// After locking the primary key, we should protect the primary lock from expiring
|
||||
|
@ -245,7 +248,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
|||
resolveLockOpts := txnlock.ResolveLocksOptions{
|
||||
CallerStartTS: 0,
|
||||
Locks: locks,
|
||||
Detail: &action.LockCtx.Stats.ResolveLock,
|
||||
}
|
||||
if action.LockCtx.Stats != nil {
|
||||
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
|
||||
}
|
||||
resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
|
||||
if err != nil {
|
||||
|
|
|
@ -92,6 +92,11 @@ func (txn TxnProbe) GetStartTime() time.Time {
|
|||
return txn.startTime
|
||||
}
|
||||
|
||||
// GetLcokedCount is used for testcase
|
||||
func (txn TxnProbe) GetLcokedCount() int {
|
||||
return txn.lockedCnt
|
||||
}
|
||||
|
||||
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
|
||||
c, err := newTwoPhaseCommitter(txn, sessionID)
|
||||
if err != nil {
|
||||
|
|
|
@ -596,6 +596,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
// it before initiating an RPC request.
|
||||
ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor)
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource)
|
||||
// Exclude keys that are already locked.
|
||||
var err error
|
||||
|
@ -653,6 +654,25 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
if lockCtx.LockOnlyIfExists {
|
||||
if !lockCtx.ReturnValues {
|
||||
return &tikverr.ErrLockOnlyIfExistsNoReturnValue{
|
||||
StartTS: txn.startTS,
|
||||
ForUpdateTs: lockCtx.ForUpdateTS,
|
||||
LockKey: keys[0],
|
||||
}
|
||||
}
|
||||
// It can't transform LockOnlyIfExists mode to normal mode. If so, it can add a lock to a key
|
||||
// which doesn't exist in tikv. TiDB should ensure that primary key must be set when it sends
|
||||
// a LockOnlyIfExists pessmistic lock request.
|
||||
if txn.committer == nil || txn.committer.primaryKey == nil {
|
||||
return &tikverr.ErrLockOnlyIfExistsNoPrimaryKey{
|
||||
StartTS: txn.startTS,
|
||||
ForUpdateTs: lockCtx.ForUpdateTS,
|
||||
LockKey: keys[0],
|
||||
}
|
||||
}
|
||||
}
|
||||
keys = deduplicateKeys(keys)
|
||||
checkedExistence := false
|
||||
if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 {
|
||||
|
@ -674,7 +694,6 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
txn.committer.primaryKey = keys[0]
|
||||
assignedPrimaryKey = true
|
||||
}
|
||||
|
||||
lockCtx.Stats = &util.LockKeysDetails{
|
||||
LockKeys: int32(len(keys)),
|
||||
ResolveLock: util.ResolveLockDetail{},
|
||||
|
@ -685,7 +704,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
// concurrently execute on multiple regions may lead to deadlock.
|
||||
txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1
|
||||
err = txn.committer.pessimisticLockMutations(bo, lockCtx, &PlainMutations{keys: keys})
|
||||
if bo.GetTotalSleep() > 0 {
|
||||
if lockCtx.Stats != nil && bo.GetTotalSleep() > 0 {
|
||||
atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond))
|
||||
lockCtx.Stats.Mu.Lock()
|
||||
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
|
||||
|
@ -744,6 +763,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
checkedExistence = true
|
||||
}
|
||||
}
|
||||
skipedLockKeys := 0
|
||||
for _, key := range keys {
|
||||
valExists := tikv.SetKeyLockedValueExists
|
||||
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
|
||||
|
@ -758,14 +778,23 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if lockCtx.LockOnlyIfExists && valExists == tikv.SetKeyLockedValueNotExists {
|
||||
skipedLockKeys++
|
||||
continue
|
||||
}
|
||||
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, valExists)
|
||||
}
|
||||
txn.lockedCnt += len(keys)
|
||||
txn.lockedCnt += len(keys) - skipedLockKeys
|
||||
return nil
|
||||
}
|
||||
|
||||
// deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation.
|
||||
func deduplicateKeys(keys [][]byte) [][]byte {
|
||||
if len(keys) == 1 {
|
||||
return keys
|
||||
}
|
||||
|
||||
sort.Slice(keys, func(i, j int) bool {
|
||||
return bytes.Compare(keys[i], keys[j]) < 0
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue