mirror of https://github.com/tikv/client-go.git
Replace pingap/errors with pkg/errors (#8)
* replace pingcap/errors with pkg/errors Signed-off-by: disksing <i@disksing.com> * get compiled Signed-off-by: disksing <i@disksing.com> * update meta.go Signed-off-by: disksing <i@disksing.com> * update txnkv.go Signed-off-by: disksing <i@disksing.com> * update codec.go Signed-off-by: disksing <i@disksing.com> * update region_cache.go Signed-off-by: disksing <i@disksing.com> * update mock.go Signed-off-by: disksing <i@disksing.com> * update txn_committer.go Signed-off-by: disksing <i@disksing.com> * update mvcc_leveldb.go Signed-off-by: disksing <i@disksing.com> * update rawkv.go Signed-off-by: disksing <i@disksing.com> * update snapshot.go Signed-off-by: disksing <i@disksing.com> * update lock_resolver.go Signed-off-by: disksing <i@disksing.com> * update scan.go Signed-off-by: disksing <i@disksing.com> * update region_request.go Signed-off-by: disksing <i@disksing.com> * update mvcc.go Signed-off-by: disksing <i@disksing.com> * update rpc.go, backoff.go client.go Signed-off-by: disksing <i@disksing.com> * update kv/*.go Signed-off-by: disksing <i@disksing.com> * update txn.go, pd.go Signed-off-by: disksing <i@disksing.com> * update txn/store/* Signed-off-by: disksing <i@disksing.com> * go mod tidy Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
parent
50aef7f485
commit
970883c423
|
|
@ -16,7 +16,7 @@ package codec
|
|||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package codec
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
)
|
||||
|
||||
|
|
@ -23,14 +22,14 @@ func DecodeRegionMetaKey(r *metapb.Region) error {
|
|||
if len(r.StartKey) != 0 {
|
||||
_, decoded, err := DecodeBytes(r.StartKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
r.StartKey = decoded
|
||||
}
|
||||
if len(r.EndKey) != 0 {
|
||||
_, decoded, err := DecodeBytes(r.EndKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
r.EndKey = decoded
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ package codec
|
|||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const signMask uint64 = 0x8000000000000000
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/key"
|
||||
"github.com/tikv/client-go/txnkv"
|
||||
|
|
@ -51,32 +50,27 @@ func initStore() {
|
|||
func puts(args ...[]byte) error {
|
||||
tx, err := client.Begin()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(args); i += 2 {
|
||||
key, val := args[i], args[i+1]
|
||||
err := tx.Set(key, val)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = tx.Commit(context.Background())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return tx.Commit(context.Background())
|
||||
}
|
||||
|
||||
func get(k []byte) (KV, error) {
|
||||
tx, err := client.Begin()
|
||||
if err != nil {
|
||||
return KV{}, errors.Trace(err)
|
||||
return KV{}, err
|
||||
}
|
||||
v, err := tx.Get(k)
|
||||
if err != nil {
|
||||
return KV{}, errors.Trace(err)
|
||||
return KV{}, err
|
||||
}
|
||||
return KV{K: k, V: v}, nil
|
||||
}
|
||||
|
|
@ -84,29 +78,25 @@ func get(k []byte) (KV, error) {
|
|||
func dels(keys ...[]byte) error {
|
||||
tx, err := client.Begin()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
for _, key := range keys {
|
||||
err := tx.Delete(key)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = tx.Commit(context.Background())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
return tx.Commit(context.Background())
|
||||
}
|
||||
|
||||
func scan(keyPrefix []byte, limit int) ([]KV, error) {
|
||||
tx, err := client.Begin()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
it, err := tx.Iter(key.Key(keyPrefix), nil)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
defer it.Close()
|
||||
var ret []KV
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -27,7 +27,6 @@ require (
|
|||
github.com/onsi/gomega v1.4.3 // indirect
|
||||
github.com/opentracing/opentracing-go v1.0.2 // indirect
|
||||
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
|
||||
github.com/pingcap/errors v0.11.0
|
||||
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 // indirect
|
||||
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
|
||||
github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -80,8 +80,6 @@ github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg
|
|||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
|
||||
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
|
||||
github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
|
||||
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo=
|
||||
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
|
||||
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ package locate
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
pd "github.com/pingcap/pd/client"
|
||||
"github.com/tikv/client-go/codec"
|
||||
|
|
@ -51,14 +50,14 @@ func (c *CodecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*me
|
|||
|
||||
func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) {
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
if region == nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
err = codec.DecodeRegionMetaKey(region)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return region, peer, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,10 +21,10 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/google/btree"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/pd/client"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/codec"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -108,7 +108,7 @@ func (c *RegionCache) GetRPCContext(bo *retry.Backoffer, id RegionVerID) (*RPCCo
|
|||
|
||||
addr, err := c.GetStoreAddr(bo, peer.GetStoreId())
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if addr == "" {
|
||||
// Store not found, region must be out of date.
|
||||
|
|
@ -153,7 +153,7 @@ func (c *RegionCache) LocateKey(bo *retry.Backoffer, key []byte) (*KeyLocation,
|
|||
|
||||
r, err := c.loadRegion(bo, key)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
|
@ -184,7 +184,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
|
||||
r, err := c.loadRegionByID(bo, regionID)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
|
@ -209,7 +209,7 @@ func (c *RegionCache) GroupKeysByRegion(bo *retry.Backoffer, keys [][]byte) (map
|
|||
var err error
|
||||
lastLoc, err = c.LocateKey(bo, k)
|
||||
if err != nil {
|
||||
return nil, first, errors.Trace(err)
|
||||
return nil, first, err
|
||||
}
|
||||
}
|
||||
id := lastLoc.Region
|
||||
|
|
@ -226,7 +226,7 @@ func (c *RegionCache) ListRegionIDsInKeyRange(bo *retry.Backoffer, startKey, end
|
|||
for {
|
||||
curRegion, err := c.LocateKey(bo, startKey)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
regionIDs = append(regionIDs, curRegion.Region.id)
|
||||
if curRegion.Contains(endKey) {
|
||||
|
|
@ -334,7 +334,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte) (*Region, erro
|
|||
if backoffErr != nil {
|
||||
err := bo.Backoff(retry.BoPDRPC, backoffErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
meta, leader, err := c.pdClient.GetRegion(bo.GetContext(), key)
|
||||
|
|
@ -368,7 +368,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
|
|||
if backoffErr != nil {
|
||||
err := bo.Backoff(retry.BoPDRPC, backoffErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
meta, leader, err := c.pdClient.GetRegionByID(bo.GetContext(), regionID)
|
||||
|
|
@ -411,7 +411,7 @@ func (c *RegionCache) GetStoreAddr(bo *retry.Backoffer, id uint64) (string, erro
|
|||
func (c *RegionCache) ReloadStoreAddr(bo *retry.Backoffer, id uint64) (string, error) {
|
||||
addr, err := c.loadStoreAddr(bo, id)
|
||||
if err != nil || addr == "" {
|
||||
return "", errors.Trace(err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
c.storeMu.Lock()
|
||||
|
|
@ -436,11 +436,11 @@ func (c *RegionCache) loadStoreAddr(bo *retry.Backoffer, id uint64) (string, err
|
|||
metrics.RegionCacheCounter.WithLabelValues("get_store", metrics.RetLabel(err)).Inc()
|
||||
if err != nil {
|
||||
if errors.Cause(err) == context.Canceled {
|
||||
return "", errors.Trace(err)
|
||||
return "", err
|
||||
}
|
||||
err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", id, err)
|
||||
if err = bo.Backoff(retry.BoPDRPC, err); err != nil {
|
||||
return "", errors.Trace(err)
|
||||
return "", err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package mocktikv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/pd/client"
|
||||
)
|
||||
|
||||
|
|
@ -29,7 +28,7 @@ func NewTiKVAndPDClient(cluster *Cluster, mvccStore MVCCStore, path string) (*RP
|
|||
var err error
|
||||
mvccStore, err = NewMVCCLevelDB(path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ import (
|
|||
"sort"
|
||||
|
||||
"github.com/google/btree"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/codec"
|
||||
)
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
|
|||
mh.WriteSlice(&buf, l.value)
|
||||
mh.WriteNumber(&buf, l.op)
|
||||
mh.WriteNumber(&buf, l.ttl)
|
||||
return buf.Bytes(), errors.Trace(mh.err)
|
||||
return buf.Bytes(), mh.err
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
|
||||
|
|
@ -78,7 +78,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
|
|||
mh.ReadSlice(buf, &l.value)
|
||||
mh.ReadNumber(buf, &l.op)
|
||||
mh.ReadNumber(buf, &l.ttl)
|
||||
return errors.Trace(mh.err)
|
||||
return mh.err
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler interface.
|
||||
|
|
@ -91,7 +91,7 @@ func (v mvccValue) MarshalBinary() ([]byte, error) {
|
|||
mh.WriteNumber(&buf, v.startTS)
|
||||
mh.WriteNumber(&buf, v.commitTS)
|
||||
mh.WriteSlice(&buf, v.value)
|
||||
return buf.Bytes(), errors.Trace(mh.err)
|
||||
return buf.Bytes(), mh.err
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
|
||||
|
|
@ -104,7 +104,7 @@ func (v *mvccValue) UnmarshalBinary(data []byte) error {
|
|||
mh.ReadNumber(buf, &v.startTS)
|
||||
mh.ReadNumber(buf, &v.commitTS)
|
||||
mh.ReadSlice(buf, &v.value)
|
||||
return errors.Trace(mh.err)
|
||||
return mh.err
|
||||
}
|
||||
|
||||
type marshalHelper struct {
|
||||
|
|
@ -118,11 +118,11 @@ func (mh *marshalHelper) WriteSlice(buf io.Writer, slice []byte) {
|
|||
var tmp [binary.MaxVarintLen64]byte
|
||||
off := binary.PutUvarint(tmp[:], uint64(len(slice)))
|
||||
if err := writeFull(buf, tmp[:off]); err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = err
|
||||
return
|
||||
}
|
||||
if err := writeFull(buf, slice); err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -132,7 +132,7 @@ func (mh *marshalHelper) WriteNumber(buf io.Writer, n interface{}) {
|
|||
}
|
||||
err := binary.Write(buf, binary.LittleEndian, n)
|
||||
if err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -141,7 +141,7 @@ func writeFull(w io.Writer, slice []byte) error {
|
|||
for written < len(slice) {
|
||||
n, err := w.Write(slice[written:])
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
written += n
|
||||
}
|
||||
|
|
@ -154,7 +154,7 @@ func (mh *marshalHelper) ReadNumber(r io.Reader, n interface{}) {
|
|||
}
|
||||
err := binary.Read(r, binary.LittleEndian, n)
|
||||
if err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -164,7 +164,7 @@ func (mh *marshalHelper) ReadSlice(r *bytes.Buffer, slice *[]byte) {
|
|||
}
|
||||
sz, err := binary.ReadUvarint(r)
|
||||
if err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = errors.WithStack(err)
|
||||
return
|
||||
}
|
||||
const c10M = 10 * 1024 * 1024
|
||||
|
|
@ -174,7 +174,7 @@ func (mh *marshalHelper) ReadSlice(r *bytes.Buffer, slice *[]byte) {
|
|||
}
|
||||
data := make([]byte, sz)
|
||||
if _, err := io.ReadFull(r, data); err != nil {
|
||||
mh.err = errors.Trace(err)
|
||||
mh.err = errors.WithStack(err)
|
||||
return
|
||||
}
|
||||
*slice = data
|
||||
|
|
|
|||
|
|
@ -18,13 +18,13 @@ import (
|
|||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/goleveldb/leveldb"
|
||||
"github.com/pingcap/goleveldb/leveldb/iterator"
|
||||
"github.com/pingcap/goleveldb/leveldb/opt"
|
||||
"github.com/pingcap/goleveldb/leveldb/storage"
|
||||
"github.com/pingcap/goleveldb/leveldb/util"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/codec"
|
||||
)
|
||||
|
|
@ -76,7 +76,7 @@ func mvccDecode(encodedKey []byte) ([]byte, uint64, error) {
|
|||
remainBytes, key, err := codec.DecodeBytes(encodedKey)
|
||||
if err != nil {
|
||||
// should never happen
|
||||
return nil, 0, errors.Trace(err)
|
||||
return nil, 0, err
|
||||
}
|
||||
// if it's meta key
|
||||
if len(remainBytes) == 0 {
|
||||
|
|
@ -86,7 +86,7 @@ func mvccDecode(encodedKey []byte) ([]byte, uint64, error) {
|
|||
remainBytes, ver, err = codec.DecodeUintDesc(remainBytes)
|
||||
if err != nil {
|
||||
// should never happen
|
||||
return nil, 0, errors.Trace(err)
|
||||
return nil, 0, err
|
||||
}
|
||||
if len(remainBytes) != 0 {
|
||||
return nil, 0, ErrInvalidEncodedKey
|
||||
|
|
@ -115,7 +115,7 @@ func NewMVCCLevelDB(path string) (*MVCCLevelDB, error) {
|
|||
d, err = leveldb.OpenFile(path, &opt.Options{BlockCacheCapacity: 600 * 1024 * 1024})
|
||||
}
|
||||
|
||||
return &MVCCLevelDB{db: d}, errors.Trace(err)
|
||||
return &MVCCLevelDB{db: d}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Iterator wraps iterator.Iterator to provide Valid() method.
|
||||
|
|
@ -157,7 +157,7 @@ func newScanIterator(db *leveldb.DB, startKey, endKey []byte) (*Iterator, []byte
|
|||
if len(startKey) == 0 && iter.Valid() {
|
||||
key, _, err := mvccDecode(iter.Key())
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
startKey = key
|
||||
}
|
||||
|
|
@ -185,7 +185,7 @@ func (dec *lockDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
iterKey := iter.Key()
|
||||
key, ver, err := mvccDecode(iterKey)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
if !bytes.Equal(key, dec.expectKey) {
|
||||
return false, nil
|
||||
|
|
@ -197,7 +197,7 @@ func (dec *lockDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
var lock mvccLock
|
||||
err = lock.UnmarshalBinary(iter.Value())
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
dec.lock = lock
|
||||
iter.Next()
|
||||
|
|
@ -217,7 +217,7 @@ func (dec *valueDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
|
||||
key, ver, err := mvccDecode(iter.Key())
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
if !bytes.Equal(key, dec.expectKey) {
|
||||
return false, nil
|
||||
|
|
@ -229,7 +229,7 @@ func (dec *valueDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
var value mvccValue
|
||||
err = value.UnmarshalBinary(iter.Value())
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
dec.value = value
|
||||
iter.Next()
|
||||
|
|
@ -248,7 +248,7 @@ func (dec *skipDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
for iter.Valid() {
|
||||
key, _, err := mvccDecode(iter.Key())
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
if !bytes.Equal(key, dec.currKey) {
|
||||
dec.currKey = key
|
||||
|
|
@ -270,7 +270,7 @@ func (dec *mvccEntryDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
ldec := lockDecoder{expectKey: dec.expectKey}
|
||||
ok, err := ldec.Decode(iter)
|
||||
if err != nil {
|
||||
return ok, errors.Trace(err)
|
||||
return ok, err
|
||||
}
|
||||
if ok {
|
||||
dec.mvccEntry.lock = &ldec.lock
|
||||
|
|
@ -279,7 +279,7 @@ func (dec *mvccEntryDecoder) Decode(iter *Iterator) (bool, error) {
|
|||
vdec := valueDecoder{expectKey: dec.expectKey}
|
||||
ok, err = vdec.Decode(iter)
|
||||
if err != nil {
|
||||
return ok, errors.Trace(err)
|
||||
return ok, err
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
|
|
@ -316,13 +316,13 @@ func getValue(iter *Iterator, key []byte, startTS uint64, isoLevel kvrpcpb.Isola
|
|||
startTS, err = dec1.lock.check(startTS, key)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
dec2 := valueDecoder{expectKey: key}
|
||||
for iter.Valid() {
|
||||
ok, err := dec2.Decode(iter)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
break
|
||||
|
|
@ -357,7 +357,7 @@ func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.
|
|||
pairs = append(pairs, Pair{
|
||||
Key: k,
|
||||
Value: v,
|
||||
Err: errors.Trace(err),
|
||||
Err: err,
|
||||
})
|
||||
}
|
||||
return pairs
|
||||
|
|
@ -371,7 +371,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
|
|||
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
|
||||
defer iter.Release()
|
||||
if err != nil {
|
||||
log.Error("scan new iterator fail:", errors.ErrorStack(err))
|
||||
log.Error("scan new iterator fail:", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -382,7 +382,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
|
|||
if err != nil {
|
||||
pairs = append(pairs, Pair{
|
||||
Key: currKey,
|
||||
Err: errors.Trace(err),
|
||||
Err: err,
|
||||
})
|
||||
}
|
||||
if value != nil {
|
||||
|
|
@ -395,7 +395,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
|
|||
skip := skipDecoder{currKey}
|
||||
ok, err = skip.Decode(iter)
|
||||
if err != nil {
|
||||
log.Error("seek to next key error:", errors.ErrorStack(err))
|
||||
log.Error("seek to next key error:", err)
|
||||
break
|
||||
}
|
||||
currKey = skip.currKey
|
||||
|
|
@ -450,7 +450,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
|
|||
helper.entry.values = append(helper.entry.values, value)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Unmarshal fail:", errors.Trace(err))
|
||||
log.Error("Unmarshal fail:", err)
|
||||
break
|
||||
}
|
||||
succ = iter.Prev()
|
||||
|
|
@ -529,7 +529,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
|
|||
}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
if dec.lock.startTS != startTS {
|
||||
|
|
@ -543,7 +543,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
|
|||
}
|
||||
ok, err = dec1.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
// Note that it's a write conflict here, even if the value is a rollback one.
|
||||
if ok && dec1.value.commitTS >= startTS {
|
||||
|
|
@ -560,7 +560,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
|
|||
writeKey := mvccEncode(mutation.Key, lockVer)
|
||||
writeValue, err := lock.MarshalBinary()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
batch.Put(writeKey, writeValue)
|
||||
return nil
|
||||
|
|
@ -575,7 +575,7 @@ func (mvcc *MVCCLevelDB) Commit(keys [][]byte, startTS, commitTS uint64) error {
|
|||
for _, k := range keys {
|
||||
err := commitKey(mvcc.db, batch, k, startTS, commitTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return mvcc.db.Write(batch, nil)
|
||||
|
|
@ -593,14 +593,14 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
|
|||
}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if !ok || dec.lock.startTS != startTS {
|
||||
// If the lock of this transaction is not found, or the lock is replaced by
|
||||
// another transaction, check commit information of this transaction.
|
||||
c, ok, err1 := getTxnCommitInfo(iter, key, startTS)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
return err1
|
||||
}
|
||||
if ok && c.valueType != typeRollback {
|
||||
// c.valueType != typeRollback means the transaction is already committed, do nothing.
|
||||
|
|
@ -609,10 +609,7 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
|
|||
return ErrRetryable("txn not found")
|
||||
}
|
||||
|
||||
if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
return commitLock(batch, dec.lock, key, startTS, commitTS)
|
||||
}
|
||||
|
||||
func commitLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS, commitTS uint64) error {
|
||||
|
|
@ -632,7 +629,7 @@ func commitLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS, commit
|
|||
writeKey := mvccEncode(key, commitTS)
|
||||
writeValue, err := value.MarshalBinary()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
batch.Put(writeKey, writeValue)
|
||||
}
|
||||
|
|
@ -649,7 +646,7 @@ func (mvcc *MVCCLevelDB) Rollback(keys [][]byte, startTS uint64) error {
|
|||
for _, k := range keys {
|
||||
err := rollbackKey(mvcc.db, batch, k, startTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return mvcc.db.Write(batch, nil)
|
||||
|
|
@ -668,21 +665,18 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
|
|||
}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
// If current transaction's lock exist.
|
||||
if ok && dec.lock.startTS == startTS {
|
||||
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
return rollbackLock(batch, dec.lock, key, startTS)
|
||||
}
|
||||
|
||||
// If current transaction's lock not exist.
|
||||
// If commit info of current transaction exist.
|
||||
c, ok, err := getTxnCommitInfo(iter, key, startTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
// If current transaction is already committed.
|
||||
|
|
@ -703,7 +697,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
|
|||
writeKey := mvccEncode(key, startTS)
|
||||
writeValue, err := value.MarshalBinary()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
batch.Put(writeKey, writeValue)
|
||||
return nil
|
||||
|
|
@ -718,7 +712,7 @@ func rollbackLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS uint6
|
|||
writeKey := mvccEncode(key, startTS)
|
||||
writeValue, err := tomb.MarshalBinary()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
batch.Put(writeKey, writeValue)
|
||||
batch.Delete(mvccEncode(key, lockVer))
|
||||
|
|
@ -732,7 +726,7 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal
|
|||
}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil || !ok {
|
||||
return mvccValue{}, ok, errors.Trace(err)
|
||||
return mvccValue{}, ok, err
|
||||
}
|
||||
|
||||
if dec.value.startTS == startTS {
|
||||
|
|
@ -750,7 +744,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
|
|||
batch := &leveldb.Batch{}
|
||||
err := rollbackKey(mvcc.db, batch, key, startTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
return mvcc.db.Write(batch, nil)
|
||||
}
|
||||
|
|
@ -763,7 +757,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
|
|||
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
|
||||
defer iter.Release()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var locks []*kvrpcpb.LockInfo
|
||||
|
|
@ -771,7 +765,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
|
|||
dec := lockDecoder{expectKey: currKey}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if ok && dec.lock.startTS <= maxTS {
|
||||
locks = append(locks, &kvrpcpb.LockInfo{
|
||||
|
|
@ -784,7 +778,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
|
|||
skip := skipDecoder{currKey: currKey}
|
||||
_, err = skip.Decode(iter)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
currKey = skip.currKey
|
||||
}
|
||||
|
|
@ -799,7 +793,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
|
|||
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
|
||||
defer iter.Release()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
batch := &leveldb.Batch{}
|
||||
|
|
@ -807,7 +801,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
|
|||
dec := lockDecoder{expectKey: currKey}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if ok && dec.lock.startTS == startTS {
|
||||
if commitTS > 0 {
|
||||
|
|
@ -816,14 +810,14 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
|
|||
err = rollbackLock(batch, dec.lock, currKey, startTS)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
skip := skipDecoder{currKey: currKey}
|
||||
_, err = skip.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
currKey = skip.currKey
|
||||
}
|
||||
|
|
@ -838,7 +832,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
|
|||
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
|
||||
defer iter.Release()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
batch := &leveldb.Batch{}
|
||||
|
|
@ -846,7 +840,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
|
|||
dec := lockDecoder{expectKey: currKey}
|
||||
ok, err := dec.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
if commitTS, ok := txnInfos[dec.lock.startTS]; ok {
|
||||
|
|
@ -856,7 +850,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
|
|||
err = rollbackLock(batch, dec.lock, currKey, dec.lock.startTS)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -864,7 +858,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
|
|||
skip := skipDecoder{currKey: currKey}
|
||||
_, err = skip.Decode(iter)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
currKey = skip.currKey
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pingcap/pd/client"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -20,10 +20,10 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/errorpb"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/rpc"
|
||||
)
|
||||
|
||||
|
|
@ -754,7 +754,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *rpc.Reque
|
|||
// copStream, err := handler.handleCopStream(ctx1, r)
|
||||
// if err != nil {
|
||||
// cancel()
|
||||
// return nil, errors.Trace(err)
|
||||
// return nil, err
|
||||
// }
|
||||
|
||||
// streamResp := &rpc.CopStreamResponse{
|
||||
|
|
@ -766,7 +766,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *rpc.Reque
|
|||
|
||||
// first, err := streamResp.Recv()
|
||||
// if err != nil {
|
||||
// return nil, errors.Trace(err)
|
||||
// return nil, err
|
||||
// }
|
||||
// streamResp.Response = first
|
||||
// resp.CopStream = streamResp
|
||||
|
|
|
|||
|
|
@ -13,6 +13,6 @@
|
|||
|
||||
package rawkv
|
||||
|
||||
import "github.com/pingcap/errors"
|
||||
import "github.com/pkg/errors"
|
||||
|
||||
var ErrBodyMissing = errors.New("response body is missing")
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
pd "github.com/pingcap/pd/client"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/locate"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -59,7 +59,7 @@ func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, e
|
|||
KeyPath: security.SSLKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return &RawKVClient{
|
||||
clusterID: pdCli.GetClusterID(context.TODO()),
|
||||
|
|
@ -93,11 +93,11 @@ func (c *RawKVClient) Get(key []byte) ([]byte, error) {
|
|||
}
|
||||
resp, _, err := c.sendReq(key, req)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
cmdResp := resp.RawGet
|
||||
if cmdResp == nil {
|
||||
return nil, errors.Trace(ErrBodyMissing)
|
||||
return nil, errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return nil, errors.New(cmdResp.GetError())
|
||||
|
|
@ -118,12 +118,12 @@ func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
|
|||
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
||||
resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchGet)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cmdResp := resp.RawBatchGet
|
||||
if cmdResp == nil {
|
||||
return nil, errors.Trace(ErrBodyMissing)
|
||||
return nil, errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
|
||||
keyToValue := make(map[string][]byte, len(keys))
|
||||
|
|
@ -158,11 +158,11 @@ func (c *RawKVClient) Put(key, value []byte) error {
|
|||
}
|
||||
resp, _, err := c.sendReq(key, req)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
cmdResp := resp.RawPut
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
return errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
|
|
@ -186,8 +186,7 @@ func (c *RawKVClient) BatchPut(keys, values [][]byte) error {
|
|||
}
|
||||
}
|
||||
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
||||
err := c.sendBatchPut(bo, keys, values)
|
||||
return errors.Trace(err)
|
||||
return c.sendBatchPut(bo, keys, values)
|
||||
}
|
||||
|
||||
// Delete deletes a key-value pair from TiKV.
|
||||
|
|
@ -203,11 +202,11 @@ func (c *RawKVClient) Delete(key []byte) error {
|
|||
}
|
||||
resp, _, err := c.sendReq(key, req)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
cmdResp := resp.RawDelete
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
return errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
|
|
@ -225,11 +224,11 @@ func (c *RawKVClient) BatchDelete(keys [][]byte) error {
|
|||
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
|
||||
resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchDelete)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
cmdResp := resp.RawBatchDelete
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
return errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
|
|
@ -255,11 +254,11 @@ func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
|
|||
var actualEndKey []byte
|
||||
resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
cmdResp := resp.RawDeleteRange
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
return errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
|
|
@ -280,7 +279,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
|
|||
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
|
||||
|
||||
if limit > MaxRawKVScanLimit {
|
||||
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
|
||||
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
|
||||
}
|
||||
|
||||
for len(keys) < limit {
|
||||
|
|
@ -294,11 +293,11 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
|
|||
}
|
||||
resp, loc, err := c.sendReq(startKey, req)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
cmdResp := resp.RawScan
|
||||
if cmdResp == nil {
|
||||
return nil, nil, errors.Trace(ErrBodyMissing)
|
||||
return nil, nil, errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
for _, pair := range cmdResp.Kvs {
|
||||
keys = append(keys, pair.Key)
|
||||
|
|
@ -318,20 +317,20 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc
|
|||
for {
|
||||
loc, err := c.regionCache.LocateKey(bo, key)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
@ -342,7 +341,7 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc
|
|||
func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.CmdType) (*rpc.Response, error) { // split the keys
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var batches []batch
|
||||
|
|
@ -410,18 +409,18 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
|
|||
|
||||
batchResp := singleBatchResp{}
|
||||
if err != nil {
|
||||
batchResp.err = errors.Trace(err)
|
||||
batchResp.err = err
|
||||
return batchResp
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
batchResp.err = errors.Trace(err)
|
||||
batchResp.err = err
|
||||
return batchResp
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
batchResp.err = errors.Trace(err)
|
||||
batchResp.err = err
|
||||
return batchResp
|
||||
}
|
||||
resp, err = c.sendBatchReq(bo, batch.keys, cmdType)
|
||||
|
|
@ -436,7 +435,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
|
|||
case rpc.CmdRawBatchDelete:
|
||||
cmdResp := resp.RawBatchDelete
|
||||
if cmdResp == nil {
|
||||
batchResp.err = errors.Trace(ErrBodyMissing)
|
||||
batchResp.err = errors.WithStack(ErrBodyMissing)
|
||||
return batchResp
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
|
|
@ -458,7 +457,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R
|
|||
for {
|
||||
loc, err := c.regionCache.LocateKey(bo, startKey)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
actualEndKey := endKey
|
||||
|
|
@ -476,16 +475,16 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R
|
|||
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
@ -500,7 +499,7 @@ func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) e
|
|||
}
|
||||
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
var batches []batch
|
||||
// split the keys by size and RegionVerID
|
||||
|
|
@ -527,7 +526,7 @@ func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) e
|
|||
}
|
||||
}
|
||||
}
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func appendKeyBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]byte, limit int) []batch {
|
||||
|
|
@ -586,16 +585,16 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
|
|||
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
|
||||
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
// recursive call
|
||||
return c.sendBatchPut(bo, batch.keys, batch.values)
|
||||
|
|
@ -603,7 +602,7 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
|
|||
|
||||
cmdResp := resp.RawBatchPut
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(ErrBodyMissing)
|
||||
return errors.WithStack(ErrBodyMissing)
|
||||
}
|
||||
if cmdResp.GetError() != "" {
|
||||
return errors.New(cmdResp.GetError())
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import (
|
|||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
)
|
||||
|
|
@ -178,7 +178,7 @@ func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
|
|||
func (b *Backoffer) Backoff(typ BackoffType, err error) error {
|
||||
select {
|
||||
case <-b.ctx.Done():
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
default:
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pingcap/kvproto/pkg/coprocessor"
|
||||
"github.com/pingcap/kvproto/pkg/errorpb"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
|
|
@ -587,7 +587,7 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
|
|||
return nil, errors.Errorf("invalid request type: %v", req.Type)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
|
@ -606,7 +606,7 @@ func (resp *CopStreamResponse) Recv() (*coprocessor.Response, error) {
|
|||
ret, err := resp.Tikv_CoprocessorStreamClient.Recv()
|
||||
|
||||
atomic.StoreInt64(&resp.Lease.deadline, 0) // Stop the lease check.
|
||||
return ret, errors.Trace(err)
|
||||
return ret, errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Close closes the CopStreamResponse object.
|
||||
|
|
|
|||
|
|
@ -24,9 +24,9 @@ import (
|
|||
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
|
||||
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/coprocessor"
|
||||
"github.com/pingcap/kvproto/pkg/tikvpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -203,7 +203,7 @@ func (a *connArray) Init(addr string, security config.Security) error {
|
|||
if len(security.SSLCA) != 0 {
|
||||
tlsConfig, err := security.ToTLSConfig()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
|
||||
}
|
||||
|
|
@ -245,7 +245,7 @@ func (a *connArray) Init(addr string, security config.Security) error {
|
|||
if err != nil {
|
||||
// Cleanup if the initialization fails.
|
||||
a.Close()
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
a.v[i] = conn
|
||||
|
||||
|
|
@ -255,7 +255,7 @@ func (a *connArray) Init(addr string, security config.Security) error {
|
|||
streamClient, err := tikvClient.BatchCommands(context.TODO())
|
||||
if err != nil {
|
||||
a.Close()
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
batchClient := &batchCommandsClient{
|
||||
conn: conn,
|
||||
|
|
@ -530,19 +530,19 @@ func sendBatchRequest(
|
|||
case connArray.batchCommandsCh <- entry:
|
||||
case <-ctx1.Done():
|
||||
log.Warnf("SendRequest to %s is timeout", addr)
|
||||
return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
|
||||
return nil, errors.WithStack(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
|
||||
}
|
||||
|
||||
select {
|
||||
case res, ok := <-entry.res:
|
||||
if !ok {
|
||||
return nil, errors.Trace(entry.err)
|
||||
return nil, errors.WithStack(entry.err)
|
||||
}
|
||||
return FromBatchCommandsResponse(res), nil
|
||||
case <-ctx1.Done():
|
||||
atomic.StoreInt32(&entry.canceled, 1)
|
||||
log.Warnf("SendRequest to %s is canceled", addr)
|
||||
return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
|
||||
return nil, errors.WithStack(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -557,7 +557,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request,
|
|||
|
||||
connArray, err := c.getConnArray(addr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if config.MaxBatchSize > 0 {
|
||||
|
|
@ -580,7 +580,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request,
|
|||
defer cancel()
|
||||
resp, err := CallRPC(ctx1, client, req)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Put the lease object to the timeout channel, so it would be checked periodically.
|
||||
|
|
@ -596,7 +596,7 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request,
|
|||
first, err = copStream.Recv()
|
||||
if err != nil {
|
||||
if errors.Cause(err) != io.EOF {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
log.Debug("copstream returns nothing for the request.")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/errorpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/locate"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -91,7 +91,7 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionI
|
|||
for {
|
||||
ctx, err := s.regionCache.GetRPCContext(bo, regionID)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if ctx == nil {
|
||||
// If the region is not found in cache, it must be out
|
||||
|
|
@ -106,7 +106,7 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionI
|
|||
s.storeAddr = ctx.Addr
|
||||
resp, retry, err := s.sendReqToRegion(bo, ctx, req, timeout)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if retry {
|
||||
continue
|
||||
|
|
@ -114,12 +114,12 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionI
|
|||
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if regionErr != nil {
|
||||
retry, err := s.onRegionError(bo, ctx, regionErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if retry {
|
||||
continue
|
||||
|
|
@ -131,13 +131,13 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionI
|
|||
|
||||
func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, ctx *locate.RPCContext, req *Request, timeout time.Duration) (resp *Response, retry bool, err error) {
|
||||
if e := SetContext(req, ctx.Meta, ctx.Peer); e != nil {
|
||||
return nil, false, errors.Trace(e)
|
||||
return nil, false, err
|
||||
}
|
||||
resp, err = s.client.SendRequest(bo.GetContext(), ctx.Addr, req, timeout)
|
||||
if err != nil {
|
||||
s.rpcError = err
|
||||
if e := s.onSendFail(bo, ctx, err); e != nil {
|
||||
return nil, false, errors.Trace(e)
|
||||
return nil, false, err
|
||||
}
|
||||
return nil, true, nil
|
||||
}
|
||||
|
|
@ -147,7 +147,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, ctx *locate.R
|
|||
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *locate.RPCContext, err error) error {
|
||||
// If it failed because the context is cancelled by ourself, don't retry.
|
||||
if errors.Cause(err) == context.Canceled {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
code := codes.Unknown
|
||||
if s, ok := status.FromError(errors.Cause(err)); ok {
|
||||
|
|
@ -156,12 +156,12 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *locate.RPCCon
|
|||
if code == codes.Canceled {
|
||||
select {
|
||||
case <-bo.GetContext().Done():
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
default:
|
||||
// If we don't cancel, but the error code is Canceled, it must be from grpc remote.
|
||||
// This may happen when tikv is killed and exiting.
|
||||
// Backoff and retry in this case.
|
||||
log.Warn("receive a grpc cancel signal from remote:", errors.ErrorStack(err))
|
||||
log.Warn("receive a grpc cancel signal from remote:", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -171,8 +171,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *locate.RPCCon
|
|||
// When a store is not available, the leader of related region should be elected quickly.
|
||||
// TODO: the number of retry time should be limited:since region may be unavailable
|
||||
// when some unrecoverable disaster happened.
|
||||
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
|
||||
return errors.Trace(err)
|
||||
return bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
|
||||
}
|
||||
|
||||
func regionErrorToLabel(e *errorpb.Error) string {
|
||||
|
|
@ -209,7 +208,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *locate.RPC
|
|||
}
|
||||
|
||||
if err = bo.Backoff(boType, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
|
@ -225,13 +224,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *locate.RPC
|
|||
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
|
||||
log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx)
|
||||
err = s.regionCache.OnRegionStale(ctx, epochNotMatch.CurrentRegions)
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
if regionErr.GetServerIsBusy() != nil {
|
||||
log.Warnf("tikv reports `ServerIsBusy`, reason: %s, ctx: %v, retry later", regionErr.GetServerIsBusy().GetReason(), ctx)
|
||||
err = bo.Backoff(retry.BoServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ package txnkv
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/retry"
|
||||
"github.com/tikv/client-go/txnkv/store"
|
||||
|
|
@ -29,7 +28,7 @@ type TxnClient struct {
|
|||
func NewTxnClient(pdAddrs []string, security config.Security) (*TxnClient, error) {
|
||||
tikvStore, err := store.NewStore(pdAddrs, security)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return &TxnClient{
|
||||
tikvStore: tikvStore,
|
||||
|
|
@ -37,14 +36,13 @@ func NewTxnClient(pdAddrs []string, security config.Security) (*TxnClient, error
|
|||
}
|
||||
|
||||
func (c *TxnClient) Close() error {
|
||||
err := c.tikvStore.Close()
|
||||
return errors.Trace(err)
|
||||
return c.tikvStore.Close()
|
||||
}
|
||||
|
||||
func (c *TxnClient) Begin() (*Transaction, error) {
|
||||
ts, err := c.GetTS()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return c.BeginWithTS(ts), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/key"
|
||||
)
|
||||
|
||||
|
|
@ -67,7 +66,7 @@ func (s *BufferStore) Get(k key.Key) ([]byte, error) {
|
|||
val, err = s.r.Get(k)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if len(val) == 0 {
|
||||
return nil, ErrNotExist
|
||||
|
|
@ -79,11 +78,11 @@ func (s *BufferStore) Get(k key.Key) ([]byte, error) {
|
|||
func (s *BufferStore) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
|
||||
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
retrieverIt, err := s.r.Iter(k, upperBound)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return NewUnionIter(bufferIt, retrieverIt, false)
|
||||
}
|
||||
|
|
@ -92,27 +91,27 @@ func (s *BufferStore) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
|
|||
func (s *BufferStore) IterReverse(k key.Key) (Iterator, error) {
|
||||
bufferIt, err := s.MemBuffer.IterReverse(k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
retrieverIt, err := s.r.IterReverse(k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return NewUnionIter(bufferIt, retrieverIt, true)
|
||||
}
|
||||
|
||||
// WalkBuffer iterates all buffered kv pairs.
|
||||
func (s *BufferStore) WalkBuffer(f func(k key.Key, v []byte) error) error {
|
||||
return errors.Trace(WalkMemBuffer(s.MemBuffer, f))
|
||||
return WalkMemBuffer(s.MemBuffer, f)
|
||||
}
|
||||
|
||||
// SaveTo saves all buffered kv pairs into a Mutator.
|
||||
func (s *BufferStore) SaveTo(m Mutator) error {
|
||||
err := s.WalkBuffer(func(k key.Key, v []byte) error {
|
||||
if len(v) == 0 {
|
||||
return errors.Trace(m.Delete(k))
|
||||
return m.Delete(k)
|
||||
}
|
||||
return errors.Trace(m.Set(k, v))
|
||||
return m.Set(k, v)
|
||||
})
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import (
|
|||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/pingcap/goleveldb/leveldb"
|
||||
"github.com/pingcap/goleveldb/leveldb/comparer"
|
||||
"github.com/pingcap/goleveldb/leveldb/iterator"
|
||||
|
|
@ -58,7 +58,7 @@ func (m *memDbBuffer) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
|
|||
|
||||
err := i.Next()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
|
@ -90,7 +90,7 @@ func (m *memDbBuffer) Get(k key.Key) ([]byte, error) {
|
|||
// Set associates key with value.
|
||||
func (m *memDbBuffer) Set(k key.Key, v []byte) error {
|
||||
if len(v) == 0 {
|
||||
return errors.Trace(ErrCannotSetNilValue)
|
||||
return errors.WithStack(ErrCannotSetNilValue)
|
||||
}
|
||||
if len(k)+len(v) > m.entrySizeLimit {
|
||||
return errors.WithMessage(ErrEntryTooLarge, fmt.Sprintf("entry too large, size: %d", len(k)+len(v)))
|
||||
|
|
@ -103,13 +103,13 @@ func (m *memDbBuffer) Set(k key.Key, v []byte) error {
|
|||
if m.Len() > int(m.bufferLenLimit) {
|
||||
return errors.WithMessage(ErrTxnTooLarge, fmt.Sprintf("transaction too large, size:%d", m.Size()))
|
||||
}
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Delete removes the entry from buffer with provided key.
|
||||
func (m *memDbBuffer) Delete(k key.Key) error {
|
||||
err := m.db.Put(k, nil)
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Size returns sum of keys and values length.
|
||||
|
|
@ -161,17 +161,17 @@ func (i *memDbIter) Close() {
|
|||
func WalkMemBuffer(memBuf MemBuffer, f func(k key.Key, v []byte) error) error {
|
||||
iter, err := memBuf.Iter(nil, nil)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
defer iter.Close()
|
||||
for iter.Valid() {
|
||||
if err = f(iter.Key(), iter.Value()); err != nil {
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
err = iter.Next()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/key"
|
||||
)
|
||||
|
||||
|
|
@ -38,7 +37,7 @@ func (s *mockSnapshot) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
|||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
m[string(k)] = v
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/key"
|
||||
)
|
||||
|
|
@ -43,7 +42,7 @@ func NewUnionIter(dirtyIt Iterator, snapshotIt Iterator, reverse bool) (*UnionIt
|
|||
}
|
||||
err := it.updateCur()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return it, nil
|
||||
}
|
||||
|
|
@ -52,14 +51,14 @@ func NewUnionIter(dirtyIt Iterator, snapshotIt Iterator, reverse bool) (*UnionIt
|
|||
func (iter *UnionIter) dirtyNext() error {
|
||||
err := iter.dirtyIt.Next()
|
||||
iter.dirtyValid = iter.dirtyIt.Valid()
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// snapshotNext makes iter.snapshotIt go and update valid status.
|
||||
func (iter *UnionIter) snapshotNext() error {
|
||||
err := iter.snapshotIt.Next()
|
||||
iter.snapshotValid = iter.snapshotIt.Valid()
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (iter *UnionIter) updateCur() error {
|
||||
|
|
@ -80,7 +79,7 @@ func (iter *UnionIter) updateCur() error {
|
|||
// if delete it
|
||||
if len(iter.dirtyIt.Value()) == 0 {
|
||||
if err := iter.dirtyNext(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
@ -101,15 +100,15 @@ func (iter *UnionIter) updateCur() error {
|
|||
// snapshot has a record, but txn says we have deleted it
|
||||
// just go next
|
||||
if err := iter.dirtyNext(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if err := iter.snapshotNext(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := iter.snapshotNext(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
iter.curIsDirty = true
|
||||
break
|
||||
|
|
@ -123,7 +122,7 @@ func (iter *UnionIter) updateCur() error {
|
|||
log.Warnf("[kv] delete a record not exists? k = %q", iter.dirtyIt.Key())
|
||||
// jump over this deletion
|
||||
if err := iter.dirtyNext(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
@ -144,10 +143,9 @@ func (iter *UnionIter) Next() error {
|
|||
err = iter.dirtyNext()
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
err = iter.updateCur()
|
||||
return errors.Trace(err)
|
||||
return iter.updateCur()
|
||||
}
|
||||
|
||||
// Value implements the Iterator Value interface.
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/key"
|
||||
)
|
||||
|
||||
|
|
@ -187,7 +186,7 @@ func (us *unionStore) Get(k key.Key) ([]byte, error) {
|
|||
v, err = us.BufferStore.r.Get(k)
|
||||
}
|
||||
if err != nil {
|
||||
return v, errors.Trace(err)
|
||||
return v, err
|
||||
}
|
||||
if len(v) == 0 {
|
||||
return nil, ErrNotExist
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ package kv
|
|||
|
||||
import (
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var _ = Suite(&testUnionStoreSuite{})
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/pd/client"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -52,7 +51,7 @@ func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracl
|
|||
_, err := o.GetTimestamp(ctx)
|
||||
if err != nil {
|
||||
o.Close()
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
|
@ -68,7 +67,7 @@ func (o *pdOracle) IsExpired(lockTS, TTL uint64) bool {
|
|||
func (o *pdOracle) GetTimestamp(ctx context.Context) (uint64, error) {
|
||||
ts, err := o.getTimestamp(ctx)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, err
|
||||
}
|
||||
o.setLastTS(ts)
|
||||
return ts, nil
|
||||
|
|
@ -85,7 +84,7 @@ func (f *tsFuture) Wait() (uint64, error) {
|
|||
physical, logical, err := f.TSFuture.Wait()
|
||||
metrics.TSFutureWaitDuration.Observe(time.Since(now).Seconds())
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, err
|
||||
}
|
||||
ts := oracle.ComposeTS(physical, logical)
|
||||
f.o.setLastTS(ts)
|
||||
|
|
@ -101,7 +100,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) {
|
|||
now := time.Now()
|
||||
physical, logical, err := o.c.GetTS(ctx)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, err
|
||||
}
|
||||
dist := time.Since(now)
|
||||
if dist > slowDist {
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/retry"
|
||||
"github.com/tikv/client-go/rpc"
|
||||
)
|
||||
|
|
@ -61,7 +61,7 @@ func (t *DeleteRangeTask) Execute() error {
|
|||
bo := retry.NewBackoffer(t.ctx, retry.DeleteRangeOneRegionMaxBackoff)
|
||||
loc, err := t.store.GetRegionCache().LocateKey(bo, startKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete to the end of the region, except if it's the last region overlapping the range
|
||||
|
|
@ -81,22 +81,22 @@ func (t *DeleteRangeTask) Execute() error {
|
|||
|
||||
resp, err := t.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
deleteRangeResp := resp.DeleteRange
|
||||
if deleteRangeResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
if err := deleteRangeResp.GetError(); err != "" {
|
||||
return errors.Errorf("unexpected delete range err: %v", err)
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ package store
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/key"
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -19,8 +19,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/locate"
|
||||
|
|
@ -61,7 +61,7 @@ var _ = NewLockResolver
|
|||
func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolver, error) {
|
||||
s, err := NewStore(etcdAddrs, security)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.GetLockResolver(), nil
|
||||
|
|
@ -164,7 +164,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
|
|||
|
||||
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
txnInfos[l.TxnID] = uint64(status)
|
||||
}
|
||||
|
|
@ -187,25 +187,25 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
|
|||
startTime = time.Now()
|
||||
resp, err := lr.store.SendReq(bo, req, loc, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
cmdResp := resp.ResolveLock
|
||||
if cmdResp == nil {
|
||||
return false, errors.Trace(rpc.ErrBodyMissing)
|
||||
return false, errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
return false, errors.Errorf("unexpected resolve err: %s", keyErr)
|
||||
|
|
@ -250,7 +250,7 @@ func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, locks []*Lock) (ok boo
|
|||
for _, l := range expiredLocks {
|
||||
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
cleanRegions := cleanTxns[l.TxnID]
|
||||
|
|
@ -261,7 +261,7 @@ func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, locks []*Lock) (ok boo
|
|||
|
||||
err = lr.resolveLock(bo, l, status, cleanRegions)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return len(expiredLocks) == len(locks), nil
|
||||
|
|
@ -273,8 +273,7 @@ func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, locks []*Lock) (ok boo
|
|||
// seconds before calling it after Prewrite.
|
||||
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
|
||||
bo := retry.NewBackoffer(context.Background(), retry.CleanupMaxBackoff)
|
||||
status, err := lr.getTxnStatus(bo, txnID, primary)
|
||||
return status, errors.Trace(err)
|
||||
return lr.getTxnStatus(bo, txnID, primary)
|
||||
}
|
||||
|
||||
func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary []byte) (TxnStatus, error) {
|
||||
|
|
@ -295,26 +294,26 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
|
|||
for {
|
||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
|
||||
if err != nil {
|
||||
return status, errors.Trace(err)
|
||||
return status, err
|
||||
}
|
||||
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return status, errors.Trace(err)
|
||||
return status, err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return status, errors.Trace(err)
|
||||
return status, err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return status, errors.Trace(err)
|
||||
return status, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdResp := resp.Cleanup
|
||||
if cmdResp == nil {
|
||||
return status, errors.Trace(rpc.ErrBodyMissing)
|
||||
return status, errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
|
||||
|
|
@ -337,7 +336,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
|
|||
for {
|
||||
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if _, ok := cleanRegions[loc.Region]; ok {
|
||||
return nil
|
||||
|
|
@ -353,22 +352,22 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
|
|||
}
|
||||
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdResp := resp.ResolveLock
|
||||
if cmdResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
|
@ -84,7 +84,7 @@ type EtcdSafePointKV struct {
|
|||
func NewEtcdSafePointKV(addrs []string, tlsConfig *tls.Config) (*EtcdSafePointKV, error) {
|
||||
etcdCli, err := createEtcdKV(addrs, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return &EtcdSafePointKV{cli: etcdCli}, nil
|
||||
}
|
||||
|
|
@ -100,7 +100,7 @@ func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, erro
|
|||
TLS: tlsConfig,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return cli, nil
|
||||
}
|
||||
|
|
@ -111,7 +111,7 @@ func (w *EtcdSafePointKV) Put(k string, v string) error {
|
|||
_, err := w.cli.Put(ctx, k, v)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -122,7 +122,7 @@ func (w *EtcdSafePointKV) Get(k string) (string, error) {
|
|||
resp, err := w.cli.Get(ctx, k)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return "", errors.Trace(err)
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
if len(resp.Kvs) > 0 {
|
||||
return string(resp.Kvs[0].Value), nil
|
||||
|
|
@ -135,7 +135,7 @@ func saveSafePoint(kv SafePointKV, key string, t uint64) error {
|
|||
err := kv.Put(GcSavedSafePoint, s)
|
||||
if err != nil {
|
||||
log.Error("save safepoint failed:", err)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -144,7 +144,7 @@ func loadSafePoint(kv SafePointKV, key string) (uint64, error) {
|
|||
str, err := kv.Get(GcSavedSafePoint)
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if str == "" {
|
||||
|
|
@ -153,7 +153,7 @@ func loadSafePoint(kv SafePointKV, key string) (uint64, error) {
|
|||
|
||||
t, err := strconv.ParseUint(str, 10, 64)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, errors.WithStack(err)
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/key"
|
||||
"github.com/tikv/client-go/retry"
|
||||
|
|
@ -54,7 +54,7 @@ func newScanner(snapshot *TiKVSnapshot, startKey []byte, endKey []byte, batchSiz
|
|||
if kv.IsErrNotFound(err) {
|
||||
return scanner, nil
|
||||
}
|
||||
return scanner, errors.Trace(err)
|
||||
return scanner, err
|
||||
}
|
||||
|
||||
// Valid return valid.
|
||||
|
|
@ -94,7 +94,7 @@ func (s *Scanner) Next() error {
|
|||
err := s.getData(bo)
|
||||
if err != nil {
|
||||
s.Close()
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if s.idx >= len(s.cache) {
|
||||
continue
|
||||
|
|
@ -112,7 +112,7 @@ func (s *Scanner) Next() error {
|
|||
// 'current' would be modified if the lock being resolved
|
||||
if err := s.resolveCurrentLock(bo, current); err != nil {
|
||||
s.Close()
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// The check here does not violate the KeyOnly semantic, because current's value
|
||||
|
|
@ -138,7 +138,7 @@ func (s *Scanner) startTS() uint64 {
|
|||
func (s *Scanner) resolveCurrentLock(bo *retry.Backoffer, current *pb.KvPair) error {
|
||||
val, err := s.snapshot.get(bo, key.Key(current.Key))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
current.Error = nil
|
||||
current.Value = val
|
||||
|
|
@ -152,7 +152,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
|
|||
for {
|
||||
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
reqEndKey := s.endKey
|
||||
|
|
@ -176,28 +176,28 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
|
|||
}
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
log.Debugf("scanner getData failed: %s", regionErr)
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdScanResp := resp.Scan
|
||||
if cmdScanResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
|
||||
err = s.snapshot.store.CheckVisibility(s.startTS())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
kvPairs := cmdScanResp.Pairs
|
||||
|
|
@ -206,7 +206,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
|
|||
if keyErr := pair.GetError(); keyErr != nil {
|
||||
lock, err := extractLockFromKeyErr(keyErr)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
pair.Key = lock.Key
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ import (
|
|||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/key"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -81,12 +81,12 @@ func (s *TiKVSnapshot) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
|||
mu.Unlock()
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.store.CheckVisibility(s.ts)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
|
|
@ -95,7 +95,7 @@ func (s *TiKVSnapshot) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
|||
func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
|
||||
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
metrics.TxnRegionsNumHistogram.WithLabelValues("snapshot").Observe(float64(len(groups)))
|
||||
|
|
@ -109,7 +109,7 @@ func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte,
|
|||
return nil
|
||||
}
|
||||
if len(batches) == 1 {
|
||||
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
|
||||
return s.batchGetSingleRegion(bo, batches[0], collectF)
|
||||
}
|
||||
ch := make(chan error)
|
||||
for _, batch1 := range batches {
|
||||
|
|
@ -126,7 +126,7 @@ func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte,
|
|||
err = e
|
||||
}
|
||||
}
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
|
||||
|
|
@ -147,23 +147,22 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys
|
|||
}
|
||||
resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutMedium)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
err = s.batchGetKeysByRegions(bo, pending, collectF)
|
||||
return errors.Trace(err)
|
||||
return s.batchGetKeysByRegions(bo, pending, collectF)
|
||||
}
|
||||
batchGetResp := resp.BatchGet
|
||||
if batchGetResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
var (
|
||||
lockedKeys [][]byte
|
||||
|
|
@ -177,7 +176,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys
|
|||
}
|
||||
lock, err := extractLockFromKeyErr(keyErr)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
lockedKeys = append(lockedKeys, lock.Key)
|
||||
locks = append(locks, lock)
|
||||
|
|
@ -185,12 +184,12 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys
|
|||
if len(lockedKeys) > 0 {
|
||||
ok, err := s.store.lockResolver.ResolveLocks(bo, locks)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
err = bo.Backoff(retry.BoTxnLockFast, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
pending = lockedKeys
|
||||
|
|
@ -204,7 +203,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys
|
|||
func (s *TiKVSnapshot) Get(k key.Key) ([]byte, error) {
|
||||
val, err := s.get(retry.NewBackoffer(context.Background(), retry.GetMaxBackoff), k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if len(val) == 0 {
|
||||
return nil, kv.ErrNotExist
|
||||
|
|
@ -229,41 +228,41 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
|
|||
for {
|
||||
loc, err := s.store.regionCache.LocateKey(bo, k)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
cmdGetResp := resp.Get
|
||||
if cmdGetResp == nil {
|
||||
return nil, errors.Trace(rpc.ErrBodyMissing)
|
||||
return nil, errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
val := cmdGetResp.GetValue()
|
||||
if keyErr := cmdGetResp.GetError(); keyErr != nil {
|
||||
lock, err := extractLockFromKeyErr(keyErr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
err = bo.Backoff(retry.BoTxnLockFast, errors.New(keyErr.String()))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
continue
|
||||
|
|
@ -275,7 +274,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
|
|||
// Iter returns a list of key-value pair after `k`.
|
||||
func (s *TiKVSnapshot) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
|
||||
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
|
||||
return scanner, errors.Trace(err)
|
||||
return scanner, err
|
||||
}
|
||||
|
||||
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
|
||||
|
|
@ -294,17 +293,17 @@ func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
|
|||
}
|
||||
if keyErr.Conflict != nil {
|
||||
err := errors.New(conflictToString(keyErr.Conflict))
|
||||
return nil, errors.Annotate(err, TxnRetryableMark)
|
||||
return nil, errors.WithMessage(err, TxnRetryableMark)
|
||||
}
|
||||
if keyErr.Retryable != "" {
|
||||
err := errors.Errorf("tikv restarts txn: %s", keyErr.GetRetryable())
|
||||
log.Debug(err)
|
||||
return nil, errors.Annotate(err, TxnRetryableMark)
|
||||
return nil, errors.WithMessage(err, TxnRetryableMark)
|
||||
}
|
||||
if keyErr.Abort != "" {
|
||||
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
|
||||
log.Warn(err)
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
return nil, errors.Errorf("unexpected KeyError: %s", keyErr.String())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/key"
|
||||
"github.com/tikv/client-go/retry"
|
||||
|
|
@ -41,7 +41,7 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error {
|
|||
for {
|
||||
loc, err := store.GetRegionCache().LocateKey(bo, splitKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if bytes.Equal(splitKey, loc.StartKey) {
|
||||
log.Infof("skip split_region region at %q", splitKey)
|
||||
|
|
@ -49,16 +49,16 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error {
|
|||
}
|
||||
res, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := res.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
pd "github.com/pingcap/pd/client"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/locate"
|
||||
|
|
@ -64,22 +64,22 @@ func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) {
|
|||
KeyPath: security.SSLKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oracle, err := oracles.NewPdOracle(pdCli, time.Duration(oracleUpdateInterval)*time.Millisecond)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tlsConfig, err := security.ToTLSConfig()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spkv, err := NewEtcdSafePointKV(pdAddrs, tlsConfig)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clusterID := pdCli.GetClusterID(context.TODO())
|
||||
|
|
@ -156,7 +156,7 @@ func (s *TiKVStore) Close() error {
|
|||
|
||||
close(s.closed)
|
||||
if err := s.client.Close(); err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if s.txnLatches != nil {
|
||||
|
|
@ -174,7 +174,7 @@ func (s *TiKVStore) GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) {
|
|||
}
|
||||
err = bo.Backoff(retry.BoPDRPC, errors.Errorf("get timestamp failed: %v", err))
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -212,11 +212,11 @@ func (s *TiKVStore) CheckVisibility(startTS uint64) error {
|
|||
diff := time.Since(cachedTime)
|
||||
|
||||
if diff > (GcSafePointCacheInterval - gcCPUTimeInaccuracyBound) {
|
||||
return errors.Trace(ErrPDServerTimeout)
|
||||
return errors.WithStack(ErrPDServerTimeout)
|
||||
}
|
||||
|
||||
if startTS < cachedSafePoint {
|
||||
return errors.Trace(ErrStartTSFallBehind)
|
||||
return errors.WithStack(ErrStartTSFallBehind)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -21,8 +21,8 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tikv/client-go/config"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -171,7 +171,7 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction,
|
|||
}
|
||||
groups, firstRegion, err := c.store.GetRegionCache().GroupKeysByRegion(bo, keys)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
metrics.TxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups)))
|
||||
|
|
@ -194,7 +194,7 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction,
|
|||
// primary should be committed/cleanup first
|
||||
err = c.doActionOnBatches(bo, action, batches[:1])
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
batches = batches[1:]
|
||||
}
|
||||
|
|
@ -214,7 +214,7 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction,
|
|||
} else {
|
||||
err = c.doActionOnBatches(bo, action, batches)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// doActionOnBatches does action to batches in parallel.
|
||||
|
|
@ -236,7 +236,7 @@ func (c *TxnCommitter) doActionOnBatches(bo *retry.Backoffer, action commitActio
|
|||
if e != nil {
|
||||
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
|
||||
}
|
||||
return errors.Trace(e)
|
||||
return e
|
||||
}
|
||||
|
||||
// For prewrite, stop sending other requests after receiving first error.
|
||||
|
|
@ -283,7 +283,7 @@ func (c *TxnCommitter) doActionOnBatches(bo *retry.Backoffer, action commitActio
|
|||
}
|
||||
}
|
||||
}
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TxnCommitter) keyValueSize(key []byte) int {
|
||||
|
|
@ -320,23 +320,22 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys)
|
|||
for {
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
err = c.prewriteKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
return c.prewriteKeys(bo, batch.keys)
|
||||
}
|
||||
prewriteResp := resp.Prewrite
|
||||
if prewriteResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
keyErrs := prewriteResp.GetErrors()
|
||||
if len(keyErrs) == 0 {
|
||||
|
|
@ -346,13 +345,13 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys)
|
|||
for _, keyErr := range keyErrs {
|
||||
// Check already exists error
|
||||
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
|
||||
return errors.Trace(ErrKeyAlreadyExist(alreadyExist.GetKey()))
|
||||
return errors.WithStack(ErrKeyAlreadyExist(alreadyExist.GetKey()))
|
||||
}
|
||||
|
||||
// Extract lock from key error
|
||||
lock, err1 := extractLockFromKeyErr(keyErr)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
return err1
|
||||
}
|
||||
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.ConnID, lock)
|
||||
locks = append(locks, lock)
|
||||
|
|
@ -360,13 +359,13 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys)
|
|||
start := time.Now()
|
||||
ok, err := c.store.GetLockResolver().ResolveLocks(bo, locks)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
|
||||
if !ok {
|
||||
err = bo.Backoff(retry.BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -409,28 +408,27 @@ func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) e
|
|||
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
|
||||
isPrimary := bytes.Equal(batch.keys[0], c.primary())
|
||||
if isPrimary && sender.RPCError() != nil {
|
||||
c.setUndeterminedErr(errors.Trace(sender.RPCError()))
|
||||
c.setUndeterminedErr(sender.RPCError())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
// re-split keys and commit again.
|
||||
err = c.commitKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
return c.commitKeys(bo, batch.keys)
|
||||
}
|
||||
commitResp := resp.Commit
|
||||
if commitResp == nil {
|
||||
return errors.Trace(rpc.ErrBodyMissing)
|
||||
return errors.WithStack(rpc.ErrBodyMissing)
|
||||
}
|
||||
// Here we can make sure tikv has processed the commit primary key request. So
|
||||
// we can clean undetermined error.
|
||||
|
|
@ -445,11 +443,11 @@ func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) e
|
|||
// No secondary key could be rolled back after it's primary key is committed.
|
||||
// There must be a serious bug somewhere.
|
||||
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
// The transaction maybe rolled back by concurrent transactions.
|
||||
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
|
||||
return errors.Annotate(err, TxnRetryableMark)
|
||||
return errors.WithMessage(err, TxnRetryableMark)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
|
@ -474,24 +472,23 @@ func (c *TxnCommitter) cleanupSingleBatch(bo *retry.Backoffer, batch batchKeys)
|
|||
}
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
err = c.cleanupKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
return c.cleanupKeys(bo, batch.keys)
|
||||
}
|
||||
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.ConnID, keyErr)
|
||||
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -539,14 +536,14 @@ func (c *TxnCommitter) Execute(ctx context.Context) error {
|
|||
|
||||
if err != nil {
|
||||
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.ConnID, err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
commitTS, err := c.store.GetTimestampWithRetry(retry.NewBackoffer(ctx, retry.TsoMaxBackoff))
|
||||
if err != nil {
|
||||
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.ConnID, err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
c.detail.GetCommitTsTime = time.Since(start)
|
||||
|
||||
|
|
@ -555,13 +552,13 @@ func (c *TxnCommitter) Execute(ctx context.Context) error {
|
|||
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
|
||||
c.ConnID, c.startTS, commitTS)
|
||||
log.Error(err)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
c.commitTS = commitTS
|
||||
|
||||
if c.store.GetOracle().IsExpired(c.startTS, c.maxTxnTimeUse) {
|
||||
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.ConnID, c.startTS, c.commitTS)
|
||||
return errors.Annotate(err, TxnRetryableMark)
|
||||
return errors.WithMessage(err, TxnRetryableMark)
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
|
|
@ -573,11 +570,11 @@ func (c *TxnCommitter) Execute(ctx context.Context) error {
|
|||
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
|
||||
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.ConnID, err, undeterminedErr, c.startTS)
|
||||
log.Error(err)
|
||||
err = errors.Trace(ErrResultUndetermined)
|
||||
err = errors.WithStack(ErrResultUndetermined)
|
||||
}
|
||||
if !c.mu.committed {
|
||||
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.ConnID, err, c.startTS)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.ConnID, err, c.startTS)
|
||||
}
|
||||
|
|
|
|||
20
txnkv/txn.go
20
txnkv/txn.go
|
|
@ -18,8 +18,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/tikv/client-go/key"
|
||||
"github.com/tikv/client-go/metrics"
|
||||
|
|
@ -66,12 +66,12 @@ func (txn *Transaction) Get(k key.Key) ([]byte, error) {
|
|||
return nil, err
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = txn.tikvStore.CheckVisibility(txn.startTS)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
|
|
@ -90,7 +90,7 @@ func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
|||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
if len(val) != 0 {
|
||||
bufferValues[i] = val
|
||||
|
|
@ -98,7 +98,7 @@ func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
|
|||
}
|
||||
storageValues, err := txn.snapshot.BatchGet(shrinkKeys)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
return nil, err
|
||||
}
|
||||
for i, key := range keys {
|
||||
if bufferValues[i] == nil {
|
||||
|
|
@ -199,7 +199,7 @@ func (txn *Transaction) Commit(ctx context.Context) error {
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
for _, lockKey := range txn.lockKeys {
|
||||
if _, ok := mutations[string(lockKey)]; !ok {
|
||||
|
|
@ -215,14 +215,14 @@ func (txn *Transaction) Commit(ctx context.Context) error {
|
|||
|
||||
committer, err := store.NewTxnCommitter(txn.tikvStore, txn.startTS, txn.startTime, mutations)
|
||||
if err != nil || committer == nil {
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// latches disabled
|
||||
if txn.tikvStore.GetTxnLatches() == nil {
|
||||
err = committer.Execute(ctx)
|
||||
log.Debug("[kv]", txn.startTS, " txnLatches disabled, 2pc directly:", err)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// latches enabled
|
||||
|
|
@ -236,14 +236,14 @@ func (txn *Transaction) Commit(ctx context.Context) error {
|
|||
defer txn.tikvStore.GetTxnLatches().UnLock(lock)
|
||||
if lock.IsStale() {
|
||||
err = errors.Errorf("startTS %d is stale", txn.startTS)
|
||||
return errors.Annotate(err, store.TxnRetryableMark)
|
||||
return errors.WithMessage(err, store.TxnRetryableMark)
|
||||
}
|
||||
err = committer.Execute(ctx)
|
||||
if err == nil {
|
||||
lock.SetCommitTS(committer.GetCommitTS())
|
||||
}
|
||||
log.Debug("[kv]", txn.startTS, " txnLatches enabled while txn retryable:", err)
|
||||
return errors.Trace(err)
|
||||
return err
|
||||
}
|
||||
|
||||
func (txn *Transaction) Rollback() error {
|
||||
|
|
|
|||
Loading…
Reference in New Issue