txnkv/txnsnapshot: remove errors.Trace usages (#345)

close #344

Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2021-10-27 16:41:49 +08:00 committed by GitHub
parent 5c06d9b19a
commit 70c69a3fac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 49 deletions

View File

@ -241,26 +241,26 @@ func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error {
}
if keyErr.Conflict != nil {
return &ErrWriteConflict{WriteConflict: keyErr.GetConflict()}
return errors.WithStack(&ErrWriteConflict{WriteConflict: keyErr.GetConflict()})
}
if keyErr.Retryable != "" {
return &ErrRetryable{Retryable: keyErr.Retryable}
return errors.WithStack(&ErrRetryable{Retryable: keyErr.Retryable})
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
return err
}
if keyErr.CommitTsTooLarge != nil {
err := errors.Errorf("commit TS %v is too large", keyErr.CommitTsTooLarge.CommitTs)
logutil.BgLogger().Warn("2PC failed", zap.Error(err))
return errors.Trace(err)
return err
}
if keyErr.TxnNotFound != nil {
err := errors.Errorf("txn %d not found", keyErr.TxnNotFound.StartTs)
return errors.Trace(err)
return err
}
return errors.Errorf("unexpected KeyError: %s", keyErr.String())
}

View File

@ -86,7 +86,7 @@ func newScanner(snapshot *KVSnapshot, startKey []byte, endKey []byte, batchSize
if tikverr.IsErrNotFound(err) {
return scanner, nil
}
return scanner, errors.Trace(err)
return scanner, err
}
// Valid return valid.
@ -129,7 +129,7 @@ func (s *Scanner) Next() error {
err = s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
return err
}
if s.idx >= len(s.cache) {
continue
@ -148,7 +148,7 @@ func (s *Scanner) Next() error {
// 'current' would be modified if the lock being resolved
if err := s.resolveCurrentLock(bo, current); err != nil {
s.Close()
return errors.Trace(err)
return err
}
// The check here does not violate the KeyOnly semantic, because current's value
@ -175,7 +175,7 @@ func (s *Scanner) resolveCurrentLock(bo *retry.Backoffer, current *kvrpcpb.KvPai
ctx := context.Background()
val, err := s.snapshot.get(ctx, bo, current.Key)
if err != nil {
return errors.Trace(err)
return err
}
current.Error = nil
current.Value = val
@ -199,7 +199,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
loc, err = s.snapshot.store.GetRegionCache().LocateEndKey(bo, s.nextEndKey)
}
if err != nil {
return errors.Trace(err)
return err
}
if !s.reverse {
@ -243,11 +243,11 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
s.snapshot.mu.RUnlock()
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
return err
}
if regionErr != nil {
logutil.BgLogger().Debug("scanner getData failed",
@ -258,19 +258,19 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
return err
}
}
continue
}
if resp.Resp == nil {
return errors.Trace(tikverr.ErrBodyMissing)
return errors.WithStack(tikverr.ErrBodyMissing)
}
cmdScanResp := resp.Resp.(*kvrpcpb.ScanResponse)
err = s.snapshot.store.CheckVisibility(s.startTS())
if err != nil {
return errors.Trace(err)
return err
}
// When there is a response-level key error, the returned pairs are incomplete.
@ -278,16 +278,16 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
if keyErr := cmdScanResp.GetError(); keyErr != nil {
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
return err
}
msBeforeExpired, _, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
if err != nil {
return errors.Trace(err)
return err
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.Errorf("key is locked during scanning"))
if err != nil {
return errors.Trace(err)
return err
}
}
continue
@ -299,7 +299,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
if keyErr := pair.GetError(); keyErr != nil && len(pair.Key) == 0 {
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
return err
}
pair.Key = lock.Key
}

View File

@ -214,12 +214,12 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]
})
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
err = s.store.CheckVisibility(s.version)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
// Update the cache.
@ -259,7 +259,7 @@ type batchKeys struct {
func (b *batchKeys) relocate(bo *retry.Backoffer, c *locate.RegionCache) (bool, error) {
loc, err := c.LocateKey(bo, b.keys[0])
if err != nil {
return false, errors.Trace(err)
return false, err
}
// keys is not in order, so we have to iterate all keys.
for i := 1; i < len(b.keys); i++ {
@ -294,7 +294,7 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
}(time.Now())
groups, _, err := s.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
return err
}
metrics.TxnRegionsNumHistogramWithSnapshot.Observe(float64(len(groups)))
@ -308,7 +308,7 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
return nil
}
if len(batches) == 1 {
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
return s.batchGetSingleRegion(bo, batches[0], collectF)
}
ch := make(chan error)
for _, batch1 := range batches {
@ -324,10 +324,10 @@ func (s *KVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, c
logutil.BgLogger().Debug("snapshot batchGet failed",
zap.Error(e),
zap.Uint64("txnStartTS", s.version))
err = e
err = errors.WithStack(e)
}
}
return errors.Trace(err)
return err
}
func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
@ -368,11 +368,11 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
}
resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...)
if err != nil {
return errors.Trace(err)
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
return err
}
if regionErr != nil {
// For other region error and the fake region error, backoff because
@ -381,21 +381,20 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
return err
}
}
same, err := batch.relocate(bo, cli.regionCache)
if err != nil {
return errors.Trace(err)
return err
}
if same {
continue
}
err = s.batchGetKeysByRegions(bo, pending, collectF)
return errors.Trace(err)
return s.batchGetKeysByRegions(bo, pending, collectF)
}
if resp.Resp == nil {
return errors.Trace(tikverr.ErrBodyMissing)
return errors.WithStack(tikverr.ErrBodyMissing)
}
batchGetResp := resp.Resp.(*kvrpcpb.BatchGetResponse)
var (
@ -406,7 +405,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
// If a response-level error happens, skip reading pairs.
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
return err
}
lockedKeys = append(lockedKeys, lock.Key)
locks = append(locks, lock)
@ -419,7 +418,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
}
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
return err
}
lockedKeys = append(lockedKeys, lock.Key)
locks = append(locks, lock)
@ -435,12 +434,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
if len(lockedKeys) > 0 {
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
if err != nil {
return errors.Trace(err)
return err
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
if err != nil {
return errors.Trace(err)
return err
}
}
// Only reduce pending keys when there is no response-level error. Otherwise,
@ -467,11 +466,11 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) {
val, err := s.get(ctx, bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
err = s.store.CheckVisibility(s.version)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
if len(val) == 0 {
@ -540,15 +539,15 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
util.EvalFailpoint("beforeSendPointGet")
loc, err := s.store.GetRegionCache().LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
if regionErr != nil {
// For other region error and the fake region error, backoff because
@ -557,13 +556,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
}
continue
}
if resp.Resp == nil {
return nil, errors.Trace(tikverr.ErrBodyMissing)
return nil, errors.WithStack(tikverr.ErrBodyMissing)
}
cmdGetResp := resp.Resp.(*kvrpcpb.GetResponse)
if cmdGetResp.ExecDetailsV2 != nil {
@ -577,7 +576,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
if keyErr := cmdGetResp.GetError(); keyErr != nil {
lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
if firstLock == nil {
firstLock = lock
@ -591,12 +590,12 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*txnlock.Lock{lock})
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(keyErr.String()))
if err != nil {
return nil, errors.Trace(err)
return nil, err
}
}
continue
@ -624,13 +623,13 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
// Iter return a list of key-value pair after `k`.
func (s *KVSnapshot) Iter(k []byte, upperBound []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, s.scanBatchSize, false)
return scanner, errors.Trace(err)
return scanner, err
}
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) {
scanner, err := newScanner(s, nil, k, s.scanBatchSize, true)
return scanner, errors.Trace(err)
return scanner, err
}
// SetNotFillCache indicates whether tikv should skip filling cache when