From 2ea84a6c5a8163d6d0a883d7b8f4f728bfcb6508 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 26 Dec 2023 14:54:34 +0800 Subject: [PATCH] feat: check kill signal in 2pc committer (#1084) Signed-off-by: ekexium Co-authored-by: disksing --- error/error.go | 4 ++-- txnkv/transaction/2pc.go | 22 +++++++++++++++++++++- txnkv/transaction/pessimistic.go | 12 ------------ txnkv/transaction/txn.go | 6 ------ 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/error/error.go b/error/error.go index 62a75ff6..3f10c211 100644 --- a/error/error.go +++ b/error/error.go @@ -64,13 +64,13 @@ var ( // ErrTiFlashServerTimeout is the error when tiflash server is timeout. ErrTiFlashServerTimeout = errors.New("tiflash server timeout") // ErrQueryInterrupted is the error when the query is interrupted. - ErrQueryInterrupted = errors.New("query interruppted") + ErrQueryInterrupted = errors.New("query interrupted") // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = errors.New("tikv stale command") // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced") // ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted. - ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted") + ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set") // ErrResolveLockTimeout is the error that resolve lock timeout. ErrResolveLockTimeout = errors.New("resolve lock timeout") // ErrLockWaitTimeout is the error that wait for the lock is timeout. diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index f145e55e..866d320b 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action } // doActionOnBatches does action to batches in parallel. -func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error { +func (c *twoPhaseCommitter) doActionOnBatches( + bo *retry.Backoffer, action twoPhaseCommitAction, + batches []batchMutations, +) error { + // killSignal should never be nil for TiDB + if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil { + // Do not reset the killed flag here. Let the upper layer reset the flag. + // Before it resets, any request is considered valid to be killed. + status := atomic.LoadUint32(c.txn.vars.Killed) + if status != 0 { + logutil.BgLogger().Info( + "query is killed", zap.Uint32( + "signal", + status, + ), + ) + // TODO: There might be various signals besides a query interruption, + // but we are unable to differentiate them, because the definition is in TiDB. + return errors.WithStack(tikverr.ErrQueryInterrupted) + } + } if len(batches) == 0 { return nil } diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 2f18bd11..d04c951c 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch( return nil } } - - // Handle the killed flag when waiting for the pessimistic lock. - // When a txn runs into LockKeys() and backoff here, it has no chance to call - // executor.Next() and check the killed flag. - if action.Killed != nil { - // Do not reset the killed flag here! - // actionPessimisticLock runs on each region parallelly, we have to consider that - // the error may be dropped. - if atomic.LoadUint32(action.Killed) == 1 { - return errors.WithStack(tikverr.ErrQueryInterrupted) - } - } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 63f5eafd..27e747e7 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...) lockCtx.Stats.Mu.Unlock() } - if lockCtx.Killed != nil { - // If the kill signal is received during waiting for pessimisticLock, - // pessimisticLockKeys would handle the error but it doesn't reset the flag. - // We need to reset the killed flag here. - atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0) - } if txn.IsInAggressiveLockingMode() { if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS { txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS