diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 8ce41838..17878d4b 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1051,7 +1051,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary() { s.Nil(failpoint.Disable("tikvclient/txnNotFoundRetTTL")) s.Nil(err) waitErr := <-doneCh - s.Equal(tikverr.ErrLockWaitTimeout, waitErr) + s.Equal(tikverr.ErrLockWaitTimeout, errors.Unwrap(waitErr)) } func (s *testCommitterSuite) TestResolvePessimisticLock() { diff --git a/integration_tests/main_test.go b/integration_tests/main_test.go index 5ae9aa71..fcd7a050 100644 --- a/integration_tests/main_test.go +++ b/integration_tests/main_test.go @@ -26,7 +26,7 @@ func TestMain(m *testing.M) { opts := []goleak.Option{ goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"), - goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/tikv.(*ttlManager).keepAlive"), // TODO: fix ttlManager goroutine leak + goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"), // TODO: fix ttlManager goroutine leak } goleak.VerifyTestMain(m, opts...) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 8a332b29..53d6e820 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -374,7 +374,7 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error if !c.txn.us.HasPresumeKeyNotExists(err.GetKey()) { return errors.Errorf("session %d, existErr for key:%s should not be nil", c.sessionID, err.GetKey()) } - return errors.Trace(err) + return errors.WithStack(err) } // KVFilter is a filter that filters out unnecessary KV pairs. @@ -484,7 +484,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { logutil.BgLogger().Error("commit failed", zap.Uint64("session", c.sessionID), zap.Error(err)) - return errors.Trace(err) + return err } commitDetail := &util.CommitDetails{WriteSize: size, WriteKeys: c.mutations.Len()} @@ -565,7 +565,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *retry.Backoffer, action twoP } groups, err := c.groupMutations(bo, mutations) if err != nil { - return errors.Trace(err) + return err } // This is redundant since `doActionOnGroupMutations` will still split groups into batches and @@ -599,7 +599,7 @@ func groupSortedMutationsByRegion(c *locate.RegionCache, bo *retry.Backoffer, m var err error lastLoc, err = c.LocateKey(bo, m.GetKey(i)) if err != nil { - return nil, errors.Trace(err) + return nil, err } } } @@ -616,7 +616,7 @@ func groupSortedMutationsByRegion(c *locate.RegionCache, bo *retry.Backoffer, m func (c *twoPhaseCommitter) groupMutations(bo *retry.Backoffer, mutations CommitterMutations) ([]groupedMutations, error) { groups, err := groupSortedMutationsByRegion(c.store.GetRegionCache(), bo, mutations) if err != nil { - return nil, errors.Trace(err) + return nil, err } // Pre-split regions to avoid too much write workload into a single region. @@ -637,7 +637,7 @@ func (c *twoPhaseCommitter) groupMutations(bo *retry.Backoffer, mutations Commit if didPreSplit { groups, err = groupSortedMutationsByRegion(c.store.GetRegionCache(), bo, mutations) if err != nil { - return nil, errors.Trace(err) + return nil, err } } @@ -749,7 +749,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action // primary should be committed(not async commit)/cleanup/pessimistically locked first err = c.doActionOnBatches(bo, action, batchBuilder.primaryBatch()) if err != nil { - return errors.Trace(err) + return err } if actionIsCommit && c.testingKnobs.bkAfterCommitPrimary != nil && c.testingKnobs.acAfterCommitPrimary != nil { c.testingKnobs.acAfterCommitPrimary <- struct{}{} @@ -791,7 +791,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action } else { err = c.doActionOnBatches(bo, action, batchBuilder.allBatches()) } - return errors.Trace(err) + return err } // doActionOnBatches does action to batches in parallel. @@ -815,7 +815,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPha zap.Stringer("action type", action), zap.Error(e), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(e) + return e } } return nil @@ -829,8 +829,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPha rateLim = config.GetGlobalConfig().CommitterConcurrency } batchExecutor := newBatchExecutor(rateLim, c, action, bo) - err := batchExecutor.process(batches) - return errors.Trace(err) + return batchExecutor.process(batches) } func (c *twoPhaseCommitter) keyValueSize(key, value []byte) int { @@ -966,16 +965,16 @@ func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startT for { loc, err := store.GetRegionCache().LocateKey(bo, primary) if err != nil { - return 0, false, errors.Trace(err) + return 0, false, err } req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) resp, err := store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) if err != nil { - return 0, false, errors.Trace(err) + return 0, false, err } regionErr, err := resp.GetRegionError() if err != nil { - return 0, false, errors.Trace(err) + return 0, false, err } if regionErr != nil { // For other region error and the fake region error, backoff because @@ -984,13 +983,13 @@ func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startT if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return 0, false, errors.Trace(err) + return 0, false, err } } continue } if resp.Resp == nil { - return 0, false, errors.Trace(tikverr.ErrBodyMissing) + return 0, false, errors.WithStack(tikverr.ErrBodyMissing) } cmdResp := resp.Resp.(*kvrpcpb.TxnHeartBeatResponse) if keyErr := cmdResp.GetError(); keyErr != nil { @@ -1195,7 +1194,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { // instead of falling back to the normal 2PC because a normal 2PC will // also be likely to fail due to the same timestamp issue. if err != nil { - return errors.Trace(err) + return err } // Plus 1 to avoid producing the same commit TS with previously committed transactions c.minCommitTS = latestTS + 1 @@ -1203,7 +1202,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { // Calculate maxCommitTS if necessary if commitTSMayBeCalculated { if err = c.calculateMaxCommitTS(ctx); err != nil { - return errors.Trace(err) + return err } } @@ -1229,7 +1228,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { zap.Error(err), zap.NamedError("rpcErr", undeterminedErr), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(tikverr.ErrResultUndetermined) + return errors.WithStack(tikverr.ErrResultUndetermined) } } @@ -1261,7 +1260,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { logutil.Logger(ctx).Debug("2PC failed on prewrite", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + return err } // strip check_not_exists keys that no need to commit. @@ -1271,8 +1270,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.isOnePC() { if c.onePCCommitTS == 0 { - err = errors.Errorf("session %d invalid onePCCommitTS for 1PC protocol after prewrite, startTS=%v", c.sessionID, c.startTS) - return errors.Trace(err) + return errors.Errorf("session %d invalid onePCCommitTS for 1PC protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } c.commitTS = c.onePCCommitTS c.txn.commitTS = c.commitTS @@ -1289,8 +1287,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.isAsyncCommit() { if c.minCommitTS == 0 { - err = errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) - return errors.Trace(err) + return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } commitTS = c.minCommitTS } else { @@ -1301,7 +1298,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + return err } commitDetail.GetCommitTsTime = time.Since(start) logutil.Event(ctx, "finish get commit ts") @@ -1313,18 +1310,18 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if !tryAmend { _, _, err = c.checkSchemaValid(ctx, commitTS, c.txn.schemaVer, false) if err != nil { - return errors.Trace(err) + return err } } else { relatedSchemaChange, memAmended, err := c.checkSchemaValid(ctx, commitTS, c.txn.schemaVer, true) if err != nil { - return errors.Trace(err) + return err } if memAmended { // Get new commitTS and check schema valid again. newCommitTS, err := c.getCommitTS(ctx, commitDetail) if err != nil { - return errors.Trace(err) + return err } // If schema check failed between commitTS and newCommitTs, report schema change error. _, _, err = c.checkSchemaValid(ctx, newCommitTS, relatedSchemaChange.LatestInfoSchema, false) @@ -1334,7 +1331,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { zap.Uint64("amendTS", commitTS), zap.Int64("amendedSchemaVersion", relatedSchemaChange.LatestInfoSchema.SchemaMetaVersion()), zap.Uint64("newCommitTS", newCommitTS)) - return errors.Trace(err) + return err } commitTS = newCommitTS } @@ -1413,13 +1410,13 @@ func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *util.Co zap.Error(err), zap.NamedError("rpcErr", undeterminedErr), zap.Uint64("txnStartTS", c.startTS)) - err = errors.Trace(tikverr.ErrResultUndetermined) + err = errors.WithStack(tikverr.ErrResultUndetermined) } if !c.mu.committed { logutil.Logger(ctx).Debug("2PC failed on commit", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + return err } logutil.Logger(ctx).Debug("got some exceptions, but 2PC was still successful", zap.Error(err), @@ -1492,7 +1489,7 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio if _, ok := errors.Cause(err).(*tikverr.ErrWriteConflict); ok { newForUpdateTSVer, err := c.store.CurrentTimestamp(oracle.GlobalTxnScope) if err != nil { - return errors.Trace(err) + return err } lCtx.ForUpdateTS = newForUpdateTSVer c.forUpdateTS = newForUpdateTSVer @@ -1569,7 +1566,7 @@ func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *util. logutil.Logger(ctx).Warn("2PC get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return 0, errors.Trace(err) + return 0, err } commitDetail.GetCommitTsTime = time.Since(start) logutil.Event(ctx, "finish get commit ts") @@ -1580,7 +1577,7 @@ func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *util. err = errors.Errorf("session %d invalid transaction tso with txnStartTS=%v while txnCommitTS=%v", c.sessionID, c.startTS, commitTS) logutil.BgLogger().Error("invalid transaction", zap.Error(err)) - return 0, errors.Trace(err) + return 0, err } return commitTS, nil } @@ -1592,8 +1589,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64 if _, err := util.EvalFailpoint("failCheckSchemaValid"); err == nil { logutil.Logger(ctx).Info("[failpoint] injected fail schema check", zap.Uint64("txnStartTS", c.startTS)) - err := errors.Errorf("mock check schema valid failure") - return nil, false, err + return nil, false, errors.Errorf("mock check schema valid failure") } if c.txn.schemaLeaseChecker == nil { if c.sessionID > 0 { @@ -1611,7 +1607,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64 if amendErr != nil { logutil.BgLogger().Info("txn amend has failed", zap.Uint64("sessionID", c.sessionID), zap.Uint64("startTS", c.startTS), zap.Error(amendErr)) - return nil, false, err + return nil, false, errors.WithStack(err) } logutil.Logger(ctx).Info("amend txn successfully", zap.Uint64("sessionID", c.sessionID), zap.Uint64("txn startTS", c.startTS), zap.Bool("memAmended", memAmended), @@ -1619,7 +1615,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64 zap.Int64s("table ids", relatedChanges.PhyTblIDS), zap.Uint64s("action types", relatedChanges.ActionTypes)) return relatedChanges, memAmended, nil } - return nil, false, errors.Trace(err) + return nil, false, errors.WithStack(err) } return nil, false, nil } @@ -1632,7 +1628,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error { logutil.Logger(ctx).Info("Schema changed for async commit txn", zap.Error(err), zap.Uint64("startTS", c.startTS)) - return errors.Trace(err) + return err } safeWindow := config.GetGlobalConfig().TiKVClient.AsyncCommit.SafeWindow @@ -1665,7 +1661,7 @@ func (b *batchMutations) relocate(bo *retry.Backoffer, c *locate.RegionCache) (b begin, end := b.mutations.GetKey(0), b.mutations.GetKey(b.mutations.Len()-1) loc, err := c.LocateKey(bo, begin) if err != nil { - return false, errors.Trace(err) + return false, err } if !loc.Contains(end) { return false, nil diff --git a/txnkv/transaction/batch_getter.go b/txnkv/transaction/batch_getter.go index ff62a8ac..0946360b 100644 --- a/txnkv/transaction/batch_getter.go +++ b/txnkv/transaction/batch_getter.go @@ -37,7 +37,6 @@ package transaction import ( "context" - "github.com/pingcap/errors" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/unionstore" ) @@ -79,13 +78,13 @@ func (b *BufferBatchGetter) BatchGet(ctx context.Context, keys [][]byte) (map[st continue } if !tikverr.IsErrNotFound(err) { - return nil, errors.Trace(err) + return nil, err } shrinkKeys = append(shrinkKeys, key) } storageValues, err := b.snapshot.BatchGet(ctx, shrinkKeys) if err != nil { - return nil, errors.Trace(err) + return nil, err } for i, key := range keys { if len(bufferValues[i]) == 0 { diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index bf371c0e..469bcfbd 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -66,26 +66,25 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) if err != nil { - return errors.Trace(err) + return err } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return err } if regionErr != nil { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return err } - err = c.cleanupMutations(bo, batch.mutations) - return errors.Trace(err) + return c.cleanupMutations(bo, batch.mutations) } if keyErr := resp.Resp.(*kvrpcpb.BatchRollbackResponse).GetError(); keyErr != nil { err = errors.Errorf("session %d 2PC cleanup failed: %s", c.sessionID, keyErr) logutil.BgLogger().Debug("2PC failed cleanup key", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + return err } return nil } diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index 7ea09825..c89b34d8 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -92,17 +92,17 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, // an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best // solution is to populate this error and let upper layer drop the connection to the corresponding mysql client. if batch.isPrimary && sender.GetRPCError() != nil && !c.isAsyncCommit() { - c.setUndeterminedErr(errors.Trace(sender.GetRPCError())) + c.setUndeterminedErr(errors.WithStack(sender.GetRPCError())) } // Unexpected error occurs, return it. if err != nil { - return errors.Trace(err) + return err } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return err } if regionErr != nil { // For other region error and the fake region error, backoff because @@ -111,22 +111,21 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return err } } same, err := batch.relocate(bo, c.store.GetRegionCache()) if err != nil { - return errors.Trace(err) + return err } if same { continue } - err = c.doActionOnMutations(bo, actionCommit{true}, batch.mutations) - return errors.Trace(err) + return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations) } if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) + return errors.WithStack(tikverr.ErrBodyMissing) } commitResp := resp.Resp.(*kvrpcpb.CommitResponse) // Here we can make sure tikv has processed the commit primary key request. So @@ -143,9 +142,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, // Do not retry for a txn which has a too large MinCommitTs // 3600000 << 18 = 943718400000 if rejected.MinCommitTs-rejected.AttemptedCommitTs > 943718400000 { - err := errors.Errorf("2PC MinCommitTS is too large, we got MinCommitTS: %d, and AttemptedCommitTS: %d", + return errors.Errorf("2PC MinCommitTS is too large, we got MinCommitTS: %d, and AttemptedCommitTS: %d", rejected.MinCommitTs, rejected.AttemptedCommitTs) - return errors.Trace(err) } // Update commit ts and retry. @@ -154,7 +152,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS)) - return errors.Trace(err) + return err } c.mu.Lock() @@ -183,7 +181,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, zap.Uint64("txnStartTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Strings("keys", hexBatchKeys(keys))) - return errors.Trace(err) + return err } // The transaction maybe rolled back by concurrent transactions. logutil.Logger(bo.GetCtx()).Debug("2PC failed commit primary key", diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index de00e070..35927ecb 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -132,7 +132,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * } if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil { time.Sleep(300 * time.Millisecond) - return &tikverr.ErrWriteConflict{WriteConflict: nil} + return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil}) } startTime := time.Now() resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) @@ -141,11 +141,11 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1) } if err != nil { - return errors.Trace(err) + return err } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return err } if regionErr != nil { // For other region error and the fake region error, backoff because @@ -154,21 +154,21 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return err } } same, err := batch.relocate(bo, c.store.GetRegionCache()) if err != nil { - return errors.Trace(err) + return err } if same { continue } err = c.pessimisticLockMutations(bo, action.LockCtx, batch.mutations) - return errors.Trace(err) + return err } if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) + return errors.WithStack(tikverr.ErrBodyMissing) } lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse) keyErrs := lockResp.GetErrors() @@ -196,13 +196,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * return c.extractKeyExistsErr(e) } if deadlock := keyErr.Deadlock; deadlock != nil { - return &tikverr.ErrDeadlock{Deadlock: deadlock} + return errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock}) } // Extract lock from key error lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) if err1 != nil { - return errors.Trace(err1) + return err1 } locks = append(locks, lock) } @@ -211,7 +211,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * startTime = time.Now() msBeforeTxnExpired, _, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks) if err != nil { - return errors.Trace(err) + return err } if action.LockCtx.Stats != nil { atomic.AddInt64(&action.LockCtx.Stats.ResolveLockTime, int64(time.Since(startTime))) @@ -221,13 +221,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary. if msBeforeTxnExpired > 0 { if action.LockWaitTime() == kv.LockNoWait { - return tikverr.ErrLockAcquireFailAndNoWaitSet + return errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet) } else if action.LockWaitTime() == kv.LockAlwaysWait { // do nothing but keep wait } else { // the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock if time.Since(lockWaitStartTime).Milliseconds() >= action.LockWaitTime() { - return errors.Trace(tikverr.ErrLockWaitTimeout) + return errors.WithStack(tikverr.ErrLockWaitTimeout) } } if action.LockCtx.PessimisticLockWaited != nil { @@ -243,7 +243,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // actionPessimisticLock runs on each region parallelly, we have to consider that // the error may be dropped. if atomic.LoadUint32(action.Killed) == 1 { - return errors.Trace(tikverr.ErrQueryInterrupted) + return errors.WithStack(tikverr.ErrQueryInterrupted) } } } @@ -258,19 +258,18 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) if err != nil { - return errors.Trace(err) + return err } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return err } if regionErr != nil { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return err } - err = c.pessimisticRollbackMutations(bo, batch.mutations) - return errors.Trace(err) + return c.pessimisticRollbackMutations(bo, batch.mutations) } return nil } diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 2bc69f68..4d28e29c 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -194,7 +194,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B // If prewrite has been cancelled, all ongoing prewrite RPCs will become errors, we needn't set undetermined // errors. if (c.isAsyncCommit() || c.isOnePC()) && sender.GetRPCError() != nil && atomic.LoadUint32(&c.prewriteCancelled) == 0 { - c.setUndeterminedErr(errors.Trace(sender.GetRPCError())) + c.setUndeterminedErr(sender.GetRPCError()) } } }() @@ -208,12 +208,12 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort) // Unexpected error occurs, return it if err != nil { - return errors.Trace(err) + return err } regionErr, err := resp.GetRegionError() if err != nil { - return errors.Trace(err) + return err } if regionErr != nil { // For other region error and the fake region error, backoff because @@ -222,7 +222,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) { err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return errors.Trace(err) + return err } } if regionErr.GetDiskFull() != nil { @@ -236,21 +236,21 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B zap.String("store_id", desc), zap.String("reason", regionErr.GetDiskFull().GetReason())) - return errors.Trace(errors.New(regionErr.String())) + return errors.New(regionErr.String()) } same, err := batch.relocate(bo, c.store.GetRegionCache()) if err != nil { - return errors.Trace(err) + return err } if same { continue } err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations) - return errors.Trace(err) + return err } if resp.Resp == nil { - return errors.Trace(tikverr.ErrBodyMissing) + return errors.WithStack(tikverr.ErrBodyMissing) } prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse) keyErrs := prewriteResp.GetErrors() @@ -271,7 +271,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B if c.isOnePC() { if prewriteResp.OnePcCommitTs == 0 { if prewriteResp.MinCommitTs != 0 { - return errors.Trace(errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc")) + return errors.New("MinCommitTs must be 0 when 1pc falls back to 2pc") } logutil.Logger(bo.GetCtx()).Warn("1pc failed and fallbacks to normal commit procedure", zap.Uint64("startTS", c.startTS)) @@ -324,7 +324,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B // Extract lock from key error lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) if err1 != nil { - return errors.Trace(err1) + return err1 } logutil.BgLogger().Info("prewrite encounters lock", zap.Uint64("session", c.sessionID), @@ -334,13 +334,13 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B start := time.Now() msBeforeExpired, err := c.store.GetLockResolver().ResolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks) if err != nil { - return errors.Trace(err) + return err } atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start))) if msBeforeExpired > 0 { err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { - return errors.Trace(err) + return err } } } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index b5537aaa..7a37250c 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/errors" "github.com/tikv/client-go/v2/internal/locate" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/internal/unionstore" @@ -95,10 +94,10 @@ func (txn TxnProbe) GetStartTime() time.Time { func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) { c, err := newTwoPhaseCommitter(txn, sessionID) if err != nil { - return nil, errors.Trace(err) + return nil, err } if err = c.initKeysAndMutations(); err != nil { - return nil, errors.Trace(err) + return nil, err } return c, nil } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index b67da4de..661b939a 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -157,7 +157,7 @@ func (txn *KVTxn) Get(ctx context.Context, k []byte) ([]byte, error) { return nil, err } if err != nil { - return nil, errors.Trace(err) + return nil, err } return ret, nil @@ -339,7 +339,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { if committer == nil { committer, err = newTwoPhaseCommitter(txn, sessionID) if err != nil { - return errors.Trace(err) + return err } txn.committer = committer } @@ -352,7 +352,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { err = committer.initKeysAndMutations() initRegion.End() if err != nil { - return errors.Trace(err) + return err } if committer.mutations.Len() == 0 { return nil @@ -383,7 +383,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { txn.onCommitted(err) } logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err)) - return errors.Trace(err) + return err } // latches enabled @@ -407,7 +407,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { lock.SetCommitTS(committer.commitTS) } logutil.Logger(ctx).Debug("[kv] txnLatches enabled while txn retryable", zap.Error(err)) - return errors.Trace(err) + return err } func (txn *KVTxn) close() {