From 70c148e84e94befab85875f7306fd805281b5197 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 2 Feb 2024 10:26:02 +0800 Subject: [PATCH] ErrQueryInterrupted with parameters (#1124) * feat: ErrQueryInterrupted with parameters Signed-off-by: ekexium * Revert "Revert "fix: check kill signal against 0 (#1102)" (#1129)" This reverts commit 3480b5ed7ce131ed39f2e6ffcdec603f4166e225. Signed-off-by: ekexium --------- Signed-off-by: ekexium Co-authored-by: cfzjywxk --- config/retry/backoff.go | 21 +++++++++++++++++---- error/error.go | 11 ++++++++++- integration_tests/2pc_test.go | 10 ++++++++++ internal/client/retry/backoff.go | 27 +++++++++++++++++++++------ internal/locate/region_request.go | 5 ++--- kv/variables.go | 4 ++++ txnkv/transaction/2pc.go | 2 +- 7 files changed, 65 insertions(+), 15 deletions(-) diff --git a/config/retry/backoff.go b/config/retry/backoff.go index a2723e05..c18577ad 100644 --- a/config/retry/backoff.go +++ b/config/retry/backoff.go @@ -217,10 +217,9 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e atomic.AddInt64(&detail.BackoffCount, 1) } - if b.vars != nil && b.vars.Killed != nil { - if atomic.LoadUint32(b.vars.Killed) == 1 { - return errors.WithStack(tikverr.ErrQueryInterrupted) - } + err2 := b.CheckKilled() + if err2 != nil { + return err2 } var startTs interface{} @@ -382,3 +381,17 @@ func (b *Backoffer) longestSleepCfg() (*Config, int) { } return nil, 0 } + +func (b *Backoffer) CheckKilled() error { + if b.vars != nil && b.vars.Killed != nil { + killed := atomic.LoadUint32(b.vars.Killed) + if killed != 0 { + logutil.BgLogger().Info( + "backoff stops because a killed signal is received", + zap.Uint32("signal", killed), + ) + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed}) + } + } + return nil +} diff --git a/error/error.go b/error/error.go index 3f10c211..14f29a44 100644 --- a/error/error.go +++ b/error/error.go @@ -64,6 +64,7 @@ var ( // ErrTiFlashServerTimeout is the error when tiflash server is timeout. ErrTiFlashServerTimeout = errors.New("tiflash server timeout") // ErrQueryInterrupted is the error when the query is interrupted. + // This is deprecated. Keep it only to pass CI :-(. We can remove this later. ErrQueryInterrupted = errors.New("query interrupted") // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = errors.New("tikv stale command") @@ -96,11 +97,19 @@ var ( // ErrIsWitness is the error when a request is send to a witness. ErrIsWitness = errors.New("peer is witness") // ErrUnknown is the unknow error. - ErrUnknown = errors.New("unknow") + ErrUnknown = errors.New("unknown") // ErrResultUndetermined is the error when execution result is unknown. ErrResultUndetermined = errors.New("execution result undetermined") ) +type ErrQueryInterruptedWithSignal struct { + Signal uint32 +} + +func (e ErrQueryInterruptedWithSignal) Error() string { + return fmt.Sprintf("query interrupted by signal %d", e.Signal) +} + // MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD. const MismatchClusterID = "mismatch cluster id" diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 355dafea..9369bfb6 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -2502,3 +2502,13 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() { s.True(txn.GetMemBuffer().TryLock()) txn.GetMemBuffer().Unlock() } + +func (s *testCommitterSuite) TestKillSignal() { + txn := s.begin() + err := txn.Set([]byte("key"), []byte("value")) + s.Nil(err) + var killed uint32 = 2 + txn.SetVars(kv.NewVariables(&killed)) + err = txn.Commit(context.Background()) + s.ErrorContains(err, "query interrupted") +} diff --git a/internal/client/retry/backoff.go b/internal/client/retry/backoff.go index a2723e05..229d9766 100644 --- a/internal/client/retry/backoff.go +++ b/internal/client/retry/backoff.go @@ -217,23 +217,38 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e atomic.AddInt64(&detail.BackoffCount, 1) } - if b.vars != nil && b.vars.Killed != nil { - if atomic.LoadUint32(b.vars.Killed) == 1 { - return errors.WithStack(tikverr.ErrQueryInterrupted) - } + err2 := b.checkKilled() + if err2 != nil { + return err2 } var startTs interface{} if ts := b.ctx.Value(TxnStartKey); ts != nil { startTs = ts } - logutil.Logger(b.ctx).Debug("retry later", + logutil.Logger(b.ctx).Debug( + "retry later", zap.Error(err), zap.Int("totalSleep", b.totalSleep), zap.Int("excludedSleep", b.excludedSleep), zap.Int("maxSleep", b.maxSleep), zap.Stringer("type", cfg), - zap.Reflect("txnStartTS", startTs)) + zap.Reflect("txnStartTS", startTs), + ) + return nil +} + +func (b *Backoffer) checkKilled() error { + if b.vars != nil && b.vars.Killed != nil { + killed := atomic.LoadUint32(b.vars.Killed) + if killed != 0 { + logutil.BgLogger().Info( + "backoff stops because a killed signal is received", + zap.Uint32("signal", killed), + ) + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: killed}) + } + } return nil } diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d839d534..efcafbf1 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1499,9 +1499,8 @@ func (s *RegionRequestSender) SendReqCtx( } // recheck whether the session/query is killed during the Next() - boVars := bo.GetVars() - if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 { - return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted) + if err2 := bo.CheckKilled(); err2 != nil { + return nil, nil, retryTimes, err2 } if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil { if val.(bool) { diff --git a/kv/variables.go b/kv/variables.go index 581be54d..cae78c9c 100644 --- a/kv/variables.go +++ b/kv/variables.go @@ -44,6 +44,10 @@ type Variables struct { // Pointer to SessionVars.Killed // Killed is a flag to indicate that this query is killed. + // This is an enum value rather than a boolean. See sqlkiller.go + // in TiDB for its definition. + // When its value is 0, it's not killed + // When its value is not 0, it's killed, the value indicates concrete reason. Killed *uint32 } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 866d320b..dc281d65 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1066,7 +1066,7 @@ func (c *twoPhaseCommitter) doActionOnBatches( ) // TODO: There might be various signals besides a query interruption, // but we are unable to differentiate them, because the definition is in TiDB. - return errors.WithStack(tikverr.ErrQueryInterrupted) + return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status}) } } if len(batches) == 0 {