fix: lock_if_exist + aggressive locking (#705)

* fix: locking commands skipped by lock_only_if_exists are mistakenly calculated as successfully locked for aggressive locking

Signed-off-by: ekexium <ekexium@gmail.com>

* refactor

Signed-off-by: ekexium <ekexium@gmail.com>

---------

Signed-off-by: ekexium <ekexium@gmail.com>
This commit is contained in:
ekexium 2023-02-20 09:50:30 +08:00 committed by GitHub
parent b3c491995e
commit 2051f572a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 20 deletions

View File

@ -227,7 +227,7 @@ func (txn *KVTxn) Set(k []byte, v []byte) error {
// String implements fmt.Stringer interface. // String implements fmt.Stringer interface.
func (txn *KVTxn) String() string { func (txn *KVTxn) String() string {
res := fmt.Sprintf("%d", txn.StartTS()) res := fmt.Sprintf("%d", txn.StartTS())
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
res += fmt.Sprintf(" (aggressiveLocking: prev %d keys, current %d keys)", len(txn.aggressiveLockingContext.lastRetryUnnecessaryLocks), len(txn.aggressiveLockingContext.currentLockedKeys)) res += fmt.Sprintf(" (aggressiveLocking: prev %d keys, current %d keys)", len(txn.aggressiveLockingContext.lastRetryUnnecessaryLocks), len(txn.aggressiveLockingContext.currentLockedKeys))
} }
return res return res
@ -413,7 +413,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource) ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource)
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 { if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 {
return errors.New("trying to commit transaction when aggressive locking is pending") return errors.New("trying to commit transaction when aggressive locking is pending")
} }
@ -536,7 +536,7 @@ func (txn *KVTxn) Rollback() error {
return tikverr.ErrInvalidTxn return tikverr.ErrInvalidTxn
} }
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 { if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 {
txn.close() txn.close()
return errors.New("trying to rollback transaction when aggressive locking is pending") return errors.New("trying to rollback transaction when aggressive locking is pending")
@ -656,7 +656,7 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64,
// invocations that involves only one key, the pessimistic lock request will be performed in ForceLock mode // invocations that involves only one key, the pessimistic lock request will be performed in ForceLock mode
// (kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock). // (kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock).
func (txn *KVTxn) StartAggressiveLocking() { func (txn *KVTxn) StartAggressiveLocking() {
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
panic("Trying to start aggressive locking while it's already started") panic("Trying to start aggressive locking while it's already started")
} }
txn.aggressiveLockingContext = &aggressiveLockingContext{ txn.aggressiveLockingContext = &aggressiveLockingContext{
@ -751,7 +751,7 @@ func (txn *KVTxn) IsInAggressiveLockingMode() bool {
// IsInAggressiveLockingStage checks if a key is locked during the current aggressive locking stage. // IsInAggressiveLockingStage checks if a key is locked during the current aggressive locking stage.
func (txn *KVTxn) IsInAggressiveLockingStage(key []byte) bool { func (txn *KVTxn) IsInAggressiveLockingStage(key []byte) bool {
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
_, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)] _, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]
return ok return ok
} }
@ -904,7 +904,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
} }
}() }()
if !txn.IsPessimistic() && txn.aggressiveLockingContext != nil { if !txn.IsPessimistic() && txn.IsInAggressiveLockingMode() {
return errors.New("trying to perform aggressive locking in optimistic transaction") return errors.New("trying to perform aggressive locking in optimistic transaction")
} }
@ -921,7 +921,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
} }
// If the key is locked in the current aggressive locking stage, override the information in memBuf. // If the key is locked in the current aggressive locking stage, override the information in memBuf.
isInLastAggressiveLockingStage := false isInLastAggressiveLockingStage := false
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok { if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok {
locked = true locked = true
valueExist = entry.Value.Exists valueExist = entry.Value.Exists
@ -1008,9 +1008,9 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
ResolveLock: util.ResolveLockDetail{}, ResolveLock: util.ResolveLockDetail{},
} }
// If aggressive locking is enabled and we don't need to update the primary for all locks, we can avoid sending // If aggressive locking is enabled, and we don't need to update the primary for all locks, we can avoid sending
// RPC to those already locked keys. // RPC to those already locked keys.
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
keys, err = txn.filterAggressiveLockedKeys(lockCtx, allKeys) keys, err = txn.filterAggressiveLockedKeys(lockCtx, allKeys)
if err != nil { if err != nil {
return err return err
@ -1028,7 +1028,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
} }
} }
if txn.aggressiveLockingContext != nil && len(keys) == 1 && !lockCtx.LockOnlyIfExists { if txn.IsInAggressiveLockingMode() && len(keys) == 1 && !lockCtx.LockOnlyIfExists {
lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
} }
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars) bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
@ -1048,7 +1048,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
// We need to reset the killed flag here. // We need to reset the killed flag here.
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0) atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
} }
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS { if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
} }
@ -1089,7 +1089,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
} }
wg := txn.asyncPessimisticRollback(ctx, allKeys, rollbackForUpdateTS) wg := txn.asyncPessimisticRollback(ctx, allKeys, rollbackForUpdateTS)
txn.lockedCnt -= len(allKeys) - len(keys) txn.lockedCnt -= len(allKeys) - len(keys)
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
for _, k := range allKeys { for _, k := range allKeys {
delete(txn.aggressiveLockingContext.currentLockedKeys, string(k)) delete(txn.aggressiveLockingContext.currentLockedKeys, string(k))
} }
@ -1140,7 +1140,14 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
} }
} }
} }
if txn.aggressiveLockingContext != nil {
// note that lock_only_if_exists guarantees the response tells us whether the value exists
if lockCtx.LockOnlyIfExists && !valExists {
skippedLockKeys++
continue
}
if txn.IsInAggressiveLockingMode() {
txn.aggressiveLockingContext.currentLockedKeys[keyStr] = tempLockBufferEntry{ txn.aggressiveLockingContext.currentLockedKeys[keyStr] = tempLockBufferEntry{
HasReturnValue: lockCtx.ReturnValues, HasReturnValue: lockCtx.ReturnValues,
HasCheckExistence: lockCtx.CheckExistence, HasCheckExistence: lockCtx.CheckExistence,
@ -1152,18 +1159,13 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
if !valExists { if !valExists {
setValExists = tikv.SetKeyLockedValueNotExists setValExists = tikv.SetKeyLockedValueNotExists
} }
// TODO: Fix the calculation when aggressive-locking is active
if lockCtx.LockOnlyIfExists && !valExists {
skippedLockKeys++
continue
}
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, setValExists) memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, setValExists)
} }
} }
// Update statistics information. // Update statistics information.
txn.lockedCnt += len(keys) - skippedLockKeys txn.lockedCnt += len(keys) - skippedLockKeys
if txn.aggressiveLockingContext != nil && lockCtx.Stats != nil { if txn.IsInAggressiveLockingMode() && lockCtx.Stats != nil {
txn.collectAggressiveLockingStats(lockCtx, len(keys), skippedLockKeys, filteredAggressiveLockedKeysCount, lockWakeUpMode) txn.collectAggressiveLockingStats(lockCtx, len(keys), skippedLockKeys, filteredAggressiveLockedKeysCount, lockWakeUpMode)
} }
return nil return nil
@ -1175,7 +1177,7 @@ func (txn *KVTxn) resetPrimary() {
} }
func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) { func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
if txn.aggressiveLockingContext != nil { if txn.IsInAggressiveLockingMode() {
lastPrimaryKey := txn.aggressiveLockingContext.lastPrimaryKey lastPrimaryKey := txn.aggressiveLockingContext.lastPrimaryKey
if lastPrimaryKey != nil { if lastPrimaryKey != nil {
foundIdx := sort.Search(len(sortedKeys), func(i int) bool { foundIdx := sort.Search(len(sortedKeys), func(i int) bool {