For KeyIsLocked error reported for timeout, if a lock is recently updated, don't try to resolve it. (#758)

* update client-go; format

Signed-off-by: ekexium <eke@fastmail.com>

* feat: do not resolve lock if duration_to_last_updated is short

Signed-off-by: ekexium <eke@fastmail.com>

* adjust the threshold to 1200ms to allow small deviation

Signed-off-by: ekexium <eke@fastmail.com>

* fix: don't treat it as WriteConflict, simply retry

Signed-off-by: ekexium <eke@fastmail.com>

* update kvproto

Signed-off-by: ekexium <eke@fastmail.com>

* set the threshold to 300ms

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2023-04-13 16:14:28 +08:00 committed by GitHub
parent f3e87039d8
commit 67e56a956f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 110 additions and 47 deletions

View File

@ -100,7 +100,9 @@ type diagnosticContext struct {
reqDuration time.Duration
}
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionPessimisticLock) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
convertMutationsToPb := func(committerMutations CommitterMutations) []*kvrpcpb.Mutation {
mutations := make([]*kvrpcpb.Mutation, committerMutations.Len())
c.txn.GetMemBuffer().RLock()
@ -120,26 +122,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
m := batch.mutations
mutations := convertMutationsToPb(m)
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
},
)
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
@ -168,8 +172,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
for _, m := range mutations {
keys = append(keys, hex.EncodeToString(m.Key))
}
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
logutil.BgLogger().Info(
"[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys),
)
}
req.PessimisticLock().LockTtl = ttl
if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil {
@ -221,7 +227,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
}
func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error) (finished bool, err error) {
func (action actionPessimisticLock) handleRegionError(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error,
) (finished bool, err error) {
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
@ -242,7 +250,13 @@ func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *
return true, err
}
func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) (locks []*txnlock.Lock, finished bool, err error) {
// When handling wait timeout, if the current lock is updated within the threshold, do not try to resolve lock
// The default timeout in TiKV is 1 second. 300ms should be appropriate for common hot update workloads.
const skipResolveThresholdMs = 300
func (action actionPessimisticLock) handleKeyErrorForResolve(
c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError,
) (locks []*txnlock.Lock, finished bool, err error) {
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
@ -253,6 +267,15 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs
return nil, true, errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock})
}
// Do not resolve the lock if the lock was recently updated which indicates the txn holding the lock is
// much likely alive.
// This should only happen for wait timeout.
if lockInfo := keyErr.GetLocked(); lockInfo != nil &&
lockInfo.DurationToLastUpdateMs > 0 &&
lockInfo.DurationToLastUpdateMs < skipResolveThresholdMs {
continue
}
// Extract lock from key error
lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil {
@ -260,10 +283,16 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs
}
locks = append(locks, lock)
}
if len(locks) == 0 {
return nil, false, nil
}
return locks, false, nil
}
func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
@ -283,7 +312,12 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
if len(keyErrs) == 0 {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}
if batch.isPrimary {
@ -314,10 +348,14 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
}
return true, nil
}
locks, finished, err := action.handleKeyError(c, keyErrs)
locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
if len(locks) == 0 {
return false, nil
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
@ -360,7 +398,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
return false, nil
}
func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
@ -376,7 +417,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
if len(mutationsPb) > 1 || len(lockResp.Results) > 1 {
panic("unreachable")
}
if batch.isPrimary && len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
if batch.isPrimary &&
len(lockResp.Results) > 0 &&
lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
}
@ -422,11 +465,16 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
if len(lockResp.Results) > 0 && !isMutationFailed {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}
}
locks, finished, err := action.handleKeyError(c, keyErrs)
locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
@ -477,9 +525,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return false, nil
}
// If the failedMutations is not empty and the error is not KeyIsLocked, the function should have already
// returned before. So this is an unreachable path.
return true, errors.New("Pessimistic lock response corrupted")
// This can be the situation where KeyIsLocked errors are generated by timeout,
// and we decide not to resolve them. Instead, just retry
return false, nil
}
if len(locks) != 0 {
@ -497,16 +545,20 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return true, nil
}
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (actionPessimisticRollback) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
forUpdateTS := c.forUpdateTS
if c.maxLockedWithConflictTS > forUpdateTS {
forUpdateTS = c.maxLockedWithConflictTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
},
)
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
@ -528,7 +580,10 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
return nil
}
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticLockMutations(
bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode,
mutations CommitterMutations,
) error {
if c.sessionID > 0 {
if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
@ -537,19 +592,27 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
for _, action := range strings.Split(v, ",") {
if action == "delay" {
duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration),
)
time.Sleep(duration)
} else if action == "fail" {
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS),
)
return errors.New("injected failure at pessimistic lock")
}
}
}
}
}
return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations)
return c.doActionOnMutations(
bo,
actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()},
mutations,
)
}
func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {