internal/mockstore: remove use of errors.Trace (#334)

Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2021-10-13 15:09:54 +08:00 committed by GitHub
parent 61664cf55c
commit ed3af2fc7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 78 deletions

View File

@ -35,7 +35,6 @@
package mocktikv package mocktikv
import ( import (
"github.com/pingcap/errors"
pd "github.com/tikv/pd/client" pd "github.com/tikv/pd/client"
) )
@ -43,7 +42,7 @@ import (
func NewTiKVAndPDClient(path string, coprHandler CoprRPCHandler) (*RPCClient, *Cluster, pd.Client, error) { func NewTiKVAndPDClient(path string, coprHandler CoprRPCHandler) (*RPCClient, *Cluster, pd.Client, error) {
mvccStore, err := NewMVCCLevelDB(path) mvccStore, err := NewMVCCLevelDB(path)
if err != nil { if err != nil {
return nil, nil, nil, errors.Trace(err) return nil, nil, nil, err
} }
cluster := NewCluster(mvccStore) cluster := NewCluster(mvccStore)

View File

@ -93,7 +93,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
mh.WriteNumber(&buf, l.forUpdateTS) mh.WriteNumber(&buf, l.forUpdateTS)
mh.WriteNumber(&buf, l.txnSize) mh.WriteNumber(&buf, l.txnSize)
mh.WriteNumber(&buf, l.minCommitTS) mh.WriteNumber(&buf, l.minCommitTS)
return buf.Bytes(), errors.Trace(mh.err) return buf.Bytes(), mh.err
} }
// UnmarshalBinary implements encoding.BinaryUnmarshaler interface. // UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
@ -108,7 +108,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
mh.ReadNumber(buf, &l.forUpdateTS) mh.ReadNumber(buf, &l.forUpdateTS)
mh.ReadNumber(buf, &l.txnSize) mh.ReadNumber(buf, &l.txnSize)
mh.ReadNumber(buf, &l.minCommitTS) mh.ReadNumber(buf, &l.minCommitTS)
return errors.Trace(mh.err) return mh.err
} }
// MarshalBinary implements encoding.BinaryMarshaler interface. // MarshalBinary implements encoding.BinaryMarshaler interface.
@ -121,7 +121,7 @@ func (v mvccValue) MarshalBinary() ([]byte, error) {
mh.WriteNumber(&buf, v.startTS) mh.WriteNumber(&buf, v.startTS)
mh.WriteNumber(&buf, v.commitTS) mh.WriteNumber(&buf, v.commitTS)
mh.WriteSlice(&buf, v.value) mh.WriteSlice(&buf, v.value)
return buf.Bytes(), errors.Trace(mh.err) return buf.Bytes(), mh.err
} }
// UnmarshalBinary implements encoding.BinaryUnmarshaler interface. // UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
@ -134,7 +134,7 @@ func (v *mvccValue) UnmarshalBinary(data []byte) error {
mh.ReadNumber(buf, &v.startTS) mh.ReadNumber(buf, &v.startTS)
mh.ReadNumber(buf, &v.commitTS) mh.ReadNumber(buf, &v.commitTS)
mh.ReadSlice(buf, &v.value) mh.ReadSlice(buf, &v.value)
return errors.Trace(mh.err) return mh.err
} }
type marshalHelper struct { type marshalHelper struct {
@ -148,11 +148,10 @@ func (mh *marshalHelper) WriteSlice(buf io.Writer, slice []byte) {
var tmp [binary.MaxVarintLen64]byte var tmp [binary.MaxVarintLen64]byte
off := binary.PutUvarint(tmp[:], uint64(len(slice))) off := binary.PutUvarint(tmp[:], uint64(len(slice)))
if err := writeFull(buf, tmp[:off]); err != nil { if err := writeFull(buf, tmp[:off]); err != nil {
mh.err = errors.Trace(err) mh.err = err
return
} }
if err := writeFull(buf, slice); err != nil { if err := writeFull(buf, slice); err != nil {
mh.err = errors.Trace(err) mh.err = err
} }
} }
@ -162,7 +161,7 @@ func (mh *marshalHelper) WriteNumber(buf io.Writer, n interface{}) {
} }
err := binary.Write(buf, binary.LittleEndian, n) err := binary.Write(buf, binary.LittleEndian, n)
if err != nil { if err != nil {
mh.err = errors.Trace(err) mh.err = errors.WithStack(err)
} }
} }
@ -171,7 +170,7 @@ func writeFull(w io.Writer, slice []byte) error {
for written < len(slice) { for written < len(slice) {
n, err := w.Write(slice[written:]) n, err := w.Write(slice[written:])
if err != nil { if err != nil {
return errors.Trace(err) return errors.WithStack(err)
} }
written += n written += n
} }
@ -184,7 +183,7 @@ func (mh *marshalHelper) ReadNumber(r io.Reader, n interface{}) {
} }
err := binary.Read(r, binary.LittleEndian, n) err := binary.Read(r, binary.LittleEndian, n)
if err != nil { if err != nil {
mh.err = errors.Trace(err) mh.err = errors.WithStack(err)
} }
} }
@ -194,7 +193,7 @@ func (mh *marshalHelper) ReadSlice(r *bytes.Buffer, slice *[]byte) {
} }
sz, err := binary.ReadUvarint(r) sz, err := binary.ReadUvarint(r)
if err != nil { if err != nil {
mh.err = errors.Trace(err) mh.err = errors.WithStack(err)
return return
} }
const c10M = 10 * 1024 * 1024 const c10M = 10 * 1024 * 1024
@ -204,7 +203,7 @@ func (mh *marshalHelper) ReadSlice(r *bytes.Buffer, slice *[]byte) {
} }
data := make([]byte, sz) data := make([]byte, sz)
if _, err := io.ReadFull(r, data); err != nil { if _, err := io.ReadFull(r, data); err != nil {
mh.err = errors.Trace(err) mh.err = errors.WithStack(err)
return return
} }
*slice = data *slice = data

View File

@ -114,7 +114,7 @@ func mvccDecode(encodedKey []byte) ([]byte, uint64, error) {
remainBytes, key, err := codec.DecodeBytes(encodedKey, nil) remainBytes, key, err := codec.DecodeBytes(encodedKey, nil)
if err != nil { if err != nil {
// should never happen // should never happen
return nil, 0, errors.Trace(err) return nil, 0, err
} }
// if it's meta key // if it's meta key
if len(remainBytes) == 0 { if len(remainBytes) == 0 {
@ -124,10 +124,10 @@ func mvccDecode(encodedKey []byte) ([]byte, uint64, error) {
remainBytes, ver, err = codec.DecodeUintDesc(remainBytes) remainBytes, ver, err = codec.DecodeUintDesc(remainBytes)
if err != nil { if err != nil {
// should never happen // should never happen
return nil, 0, errors.Trace(err) return nil, 0, err
} }
if len(remainBytes) != 0 { if len(remainBytes) != 0 {
return nil, 0, ErrInvalidEncodedKey return nil, 0, errors.WithStack(ErrInvalidEncodedKey)
} }
return key, ver, nil return key, ver, nil
} }
@ -153,7 +153,7 @@ func NewMVCCLevelDB(path string) (*MVCCLevelDB, error) {
d, err = leveldb.OpenFile(path, &opt.Options{BlockCacheCapacity: 600 * 1024 * 1024}) d, err = leveldb.OpenFile(path, &opt.Options{BlockCacheCapacity: 600 * 1024 * 1024})
} }
return &MVCCLevelDB{db: d, deadlockDetector: deadlock.NewDetector()}, errors.Trace(err) return &MVCCLevelDB{db: d, deadlockDetector: deadlock.NewDetector()}, errors.WithStack(err)
} }
// Iterator wraps iterator.Iterator to provide Valid() method. // Iterator wraps iterator.Iterator to provide Valid() method.
@ -195,7 +195,7 @@ func newScanIterator(db *leveldb.DB, startKey, endKey []byte) (*Iterator, []byte
if len(startKey) == 0 && iter.Valid() { if len(startKey) == 0 && iter.Valid() {
key, _, err := mvccDecode(iter.Key()) key, _, err := mvccDecode(iter.Key())
if err != nil { if err != nil {
return nil, nil, errors.Trace(err) return nil, nil, err
} }
startKey = key startKey = key
} }
@ -216,7 +216,7 @@ func (dec *lockDecoder) Decode(iter *Iterator) (bool, error) {
iterKey := iter.Key() iterKey := iter.Key()
key, ver, err := mvccDecode(iterKey) key, ver, err := mvccDecode(iterKey)
if err != nil { if err != nil {
return false, errors.Trace(err) return false, err
} }
if !bytes.Equal(key, dec.expectKey) { if !bytes.Equal(key, dec.expectKey) {
return false, nil return false, nil
@ -228,7 +228,7 @@ func (dec *lockDecoder) Decode(iter *Iterator) (bool, error) {
var lock mvccLock var lock mvccLock
err = lock.UnmarshalBinary(iter.Value()) err = lock.UnmarshalBinary(iter.Value())
if err != nil { if err != nil {
return false, errors.Trace(err) return false, err
} }
dec.lock = lock dec.lock = lock
iter.Next() iter.Next()
@ -248,7 +248,7 @@ func (dec *valueDecoder) Decode(iter *Iterator) (bool, error) {
key, ver, err := mvccDecode(iter.Key()) key, ver, err := mvccDecode(iter.Key())
if err != nil { if err != nil {
return false, errors.Trace(err) return false, err
} }
if !bytes.Equal(key, dec.expectKey) { if !bytes.Equal(key, dec.expectKey) {
return false, nil return false, nil
@ -260,7 +260,7 @@ func (dec *valueDecoder) Decode(iter *Iterator) (bool, error) {
var value mvccValue var value mvccValue
err = value.UnmarshalBinary(iter.Value()) err = value.UnmarshalBinary(iter.Value())
if err != nil { if err != nil {
return false, errors.Trace(err) return false, err
} }
dec.value = value dec.value = value
iter.Next() iter.Next()
@ -279,7 +279,7 @@ func (dec *skipDecoder) Decode(iter *Iterator) (bool, error) {
for iter.Valid() { for iter.Valid() {
key, _, err := mvccDecode(iter.Key()) key, _, err := mvccDecode(iter.Key())
if err != nil { if err != nil {
return false, errors.Trace(err) return false, err
} }
if !bytes.Equal(key, dec.currKey) { if !bytes.Equal(key, dec.currKey) {
dec.currKey = key dec.currKey = key
@ -316,13 +316,13 @@ func getValue(iter *Iterator, key []byte, startTS uint64, isoLevel kvrpcpb.Isola
startTS, err = dec1.lock.check(startTS, key, resolvedLocks) startTS, err = dec1.lock.check(startTS, key, resolvedLocks)
} }
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
dec2 := valueDecoder{expectKey: key} dec2 := valueDecoder{expectKey: key}
for iter.Valid() { for iter.Valid() {
ok, err := dec2.Decode(iter) ok, err := dec2.Decode(iter)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
if !ok { if !ok {
break break
@ -357,7 +357,7 @@ func (mvcc *MVCCLevelDB) BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.
pairs = append(pairs, Pair{ pairs = append(pairs, Pair{
Key: k, Key: k,
Value: v, Value: v,
Err: errors.Trace(err), Err: err,
}) })
} }
return pairs return pairs
@ -382,7 +382,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64
if err != nil { if err != nil {
pairs = append(pairs, Pair{ pairs = append(pairs, Pair{
Key: currKey, Key: currKey,
Err: errors.Trace(err), Err: err,
}) })
} }
if value != nil { if value != nil {
@ -568,7 +568,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
if dec.lock.startTS != startTS { if dec.lock.startTS != startTS {
@ -606,7 +606,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
writeKey := mvccEncode(mutation.Key, lockVer) writeKey := mvccEncode(mutation.Key, lockVer)
writeValue, err := lock.MarshalBinary() writeValue, err := lock.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
@ -649,7 +649,7 @@ func pessimisticRollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, st
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
lock := dec.lock lock := dec.lock
@ -722,7 +722,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
if !ok { if !ok {
return nil, nil return nil, nil
@ -784,7 +784,7 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
} }
ok, err = dec.Decode(iter) ok, err = dec.Decode(iter)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
} }
if getVal { if getVal {
@ -808,7 +808,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
if dec.lock.startTS != startTS { if dec.lock.startTS != startTS {
@ -862,7 +862,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
writeKey := mvccEncode(mutation.Key, lockVer) writeKey := mvccEncode(mutation.Key, lockVer)
writeValue, err := lock.MarshalBinary() writeValue, err := lock.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
@ -881,7 +881,7 @@ func (mvcc *MVCCLevelDB) Commit(keys [][]byte, startTS, commitTS uint64) error {
for _, k := range keys { for _, k := range keys {
err := commitKey(mvcc.db, batch, k, startTS, commitTS) err := commitKey(mvcc.db, batch, k, startTS, commitTS)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
} }
return mvcc.db.Write(batch, nil) return mvcc.db.Write(batch, nil)
@ -899,14 +899,14 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if !ok || dec.lock.startTS != startTS { if !ok || dec.lock.startTS != startTS {
// If the lock of this transaction is not found, or the lock is replaced by // If the lock of this transaction is not found, or the lock is replaced by
// another transaction, check commit information of this transaction. // another transaction, check commit information of this transaction.
c, ok, err1 := getTxnCommitInfo(iter, key, startTS) c, ok, err1 := getTxnCommitInfo(iter, key, startTS)
if err1 != nil { if err1 != nil {
return errors.Trace(err1) return err
} }
if ok && c.valueType != typeRollback { if ok && c.valueType != typeRollback {
// c.valueType != typeRollback means the transaction is already committed, do nothing. // c.valueType != typeRollback means the transaction is already committed, do nothing.
@ -926,7 +926,7 @@ func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commit
} }
if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil { if err = commitLock(batch, dec.lock, key, startTS, commitTS); err != nil {
return errors.Trace(err) return err
} }
return nil return nil
} }
@ -949,7 +949,7 @@ func commitLock(batch *leveldb.Batch, lock mvccLock, key []byte, startTS, commit
writeKey := mvccEncode(key, commitTS) writeKey := mvccEncode(key, commitTS)
writeValue, err := value.MarshalBinary() writeValue, err := value.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
batch.Delete(mvccEncode(key, lockVer)) batch.Delete(mvccEncode(key, lockVer))
@ -968,7 +968,7 @@ func (mvcc *MVCCLevelDB) Rollback(keys [][]byte, startTS uint64) error {
for _, k := range keys { for _, k := range keys {
err := rollbackKey(mvcc.db, batch, k, startTS) err := rollbackKey(mvcc.db, batch, k, startTS)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
} }
return mvcc.db.Write(batch, nil) return mvcc.db.Write(batch, nil)
@ -987,12 +987,12 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
// If current transaction's lock exist. // If current transaction's lock exist.
if ok && dec.lock.startTS == startTS { if ok && dec.lock.startTS == startTS {
if err = rollbackLock(batch, key, startTS); err != nil { if err = rollbackLock(batch, key, startTS); err != nil {
return errors.Trace(err) return err
} }
return nil return nil
} }
@ -1001,7 +1001,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
// If commit info of current transaction exist. // If commit info of current transaction exist.
c, ok, err := getTxnCommitInfo(iter, key, startTS) c, ok, err := getTxnCommitInfo(iter, key, startTS)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
// If current transaction is already committed. // If current transaction is already committed.
@ -1022,7 +1022,7 @@ func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint6
writeKey := mvccEncode(key, startTS) writeKey := mvccEncode(key, startTS)
writeValue, err := value.MarshalBinary() writeValue, err := value.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
return nil return nil
@ -1037,7 +1037,7 @@ func writeRollback(batch *leveldb.Batch, key []byte, startTS uint64) error {
writeKey := mvccEncode(key, startTS) writeKey := mvccEncode(key, startTS)
writeValue, err := tomb.MarshalBinary() writeValue, err := tomb.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
return nil return nil
@ -1059,7 +1059,7 @@ func getTxnCommitInfo(iter *Iterator, expectKey []byte, startTS uint64) (mvccVal
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil || !ok { if err != nil || !ok {
return mvccValue{}, ok, errors.Trace(err) return mvccValue{}, ok, err
} }
if dec.value.startTS == startTS { if dec.value.startTS == startTS {
@ -1111,7 +1111,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
// If the commit information of the current transaction exist. // If the commit information of the current transaction exist.
c, ok, err := getTxnCommitInfo(iter, key, startTS) c, ok, err := getTxnCommitInfo(iter, key, startTS)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
// If the current transaction has already committed. // If the current transaction has already committed.
@ -1132,7 +1132,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
writeKey := mvccEncode(key, startTS) writeKey := mvccEncode(key, startTS)
writeValue, err := value.MarshalBinary() writeValue, err := value.MarshalBinary()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
return nil return nil
@ -1169,7 +1169,6 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
var ok bool var ok bool
ok, err = dec.Decode(iter) ok, err = dec.Decode(iter)
if err != nil { if err != nil {
err = errors.Trace(err)
return return
} }
// If current transaction's lock exists. // If current transaction's lock exists.
@ -1182,18 +1181,16 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
if resolvingPessimisticLock && lock.op == kvrpcpb.Op_PessimisticLock { if resolvingPessimisticLock && lock.op == kvrpcpb.Op_PessimisticLock {
action = kvrpcpb.Action_TTLExpirePessimisticRollback action = kvrpcpb.Action_TTLExpirePessimisticRollback
if err = pessimisticRollbackKey(mvcc.db, batch, primaryKey, lock.startTS, lock.forUpdateTS); err != nil { if err = pessimisticRollbackKey(mvcc.db, batch, primaryKey, lock.startTS, lock.forUpdateTS); err != nil {
err = errors.Trace(err)
return return
} }
} else { } else {
action = kvrpcpb.Action_TTLExpireRollback action = kvrpcpb.Action_TTLExpireRollback
if err = rollbackLock(batch, primaryKey, lockTS); err != nil { if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
err = errors.Trace(err)
return return
} }
} }
if err = mvcc.db.Write(batch, nil); err != nil { if err = mvcc.db.Write(batch, nil); err != nil {
err = errors.Trace(err) err = errors.WithStack(err)
return return
} }
return 0, 0, action, nil return 0, 0, action, nil
@ -1223,12 +1220,12 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
writeKey := mvccEncode(primaryKey, lockVer) writeKey := mvccEncode(primaryKey, lockVer)
writeValue, err1 := lock.MarshalBinary() writeValue, err1 := lock.MarshalBinary()
if err1 != nil { if err1 != nil {
err = errors.Trace(err1) err = err1
return return
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
if err1 = mvcc.db.Write(batch, nil); err1 != nil { if err1 = mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1) err = errors.WithStack(err1)
return return
} }
} }
@ -1241,7 +1238,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// If the commit info of the current transaction exists. // If the commit info of the current transaction exists.
c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS) c, ok, err1 := getTxnCommitInfo(iter, primaryKey, lockTS)
if err1 != nil { if err1 != nil {
err = errors.Trace(err1) err = err1
return return
} }
if ok { if ok {
@ -1271,11 +1268,11 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS
// primary key, see case TestSingleStatementRollback in session_test suite for example // primary key, see case TestSingleStatementRollback in session_test suite for example
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
if err1 := writeRollback(batch, primaryKey, lockTS); err1 != nil { if err1 := writeRollback(batch, primaryKey, lockTS); err1 != nil {
err = errors.Trace(err1) err = err1
return return
} }
if err1 := mvcc.db.Write(batch, nil); err1 != nil { if err1 := mvcc.db.Write(batch, nil); err1 != nil {
err = errors.Trace(err1) err = errors.WithStack(err1)
return return
} }
return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil return 0, 0, kvrpcpb.Action_LockNotExistRollback, nil
@ -1304,7 +1301,7 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint
} }
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return 0, errors.Trace(err) return 0, err
} }
if ok && dec.lock.startTS == startTS { if ok && dec.lock.startTS == startTS {
if !bytes.Equal(dec.lock.primary, key) { if !bytes.Equal(dec.lock.primary, key) {
@ -1319,11 +1316,11 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint
writeKey := mvccEncode(key, lockVer) writeKey := mvccEncode(key, lockVer)
writeValue, err := lock.MarshalBinary() writeValue, err := lock.MarshalBinary()
if err != nil { if err != nil {
return 0, errors.Trace(err) return 0, err
} }
batch.Put(writeKey, writeValue) batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil { if err = mvcc.db.Write(batch, nil); err != nil {
return 0, errors.Trace(err) return 0, errors.WithStack(err)
} }
} }
return lock.ttl, nil return lock.ttl, nil
@ -1340,7 +1337,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release() defer iter.Release()
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
var locks []*kvrpcpb.LockInfo var locks []*kvrpcpb.LockInfo
@ -1348,7 +1345,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
dec := lockDecoder{expectKey: currKey} dec := lockDecoder{expectKey: currKey}
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
if ok && dec.lock.startTS <= maxTS { if ok && dec.lock.startTS <= maxTS {
locks = append(locks, &kvrpcpb.LockInfo{ locks = append(locks, &kvrpcpb.LockInfo{
@ -1361,7 +1358,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr
skip := skipDecoder{currKey: currKey} skip := skipDecoder{currKey: currKey}
_, err = skip.Decode(iter) _, err = skip.Decode(iter)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, err
} }
currKey = skip.currKey currKey = skip.currKey
} }
@ -1376,7 +1373,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release() defer iter.Release()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
@ -1384,7 +1381,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
dec := lockDecoder{expectKey: currKey} dec := lockDecoder{expectKey: currKey}
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok && dec.lock.startTS == startTS { if ok && dec.lock.startTS == startTS {
if commitTS > 0 { if commitTS > 0 {
@ -1393,14 +1390,14 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
err = rollbackLock(batch, currKey, startTS) err = rollbackLock(batch, currKey, startTS)
} }
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
} }
skip := skipDecoder{currKey: currKey} skip := skipDecoder{currKey: currKey}
_, err = skip.Decode(iter) _, err = skip.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
currKey = skip.currKey currKey = skip.currKey
} }
@ -1415,7 +1412,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release() defer iter.Release()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
batch := &leveldb.Batch{} batch := &leveldb.Batch{}
@ -1423,7 +1420,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
dec := lockDecoder{expectKey: currKey} dec := lockDecoder{expectKey: currKey}
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok { if ok {
if commitTS, ok := txnInfos[dec.lock.startTS]; ok { if commitTS, ok := txnInfos[dec.lock.startTS]; ok {
@ -1433,7 +1430,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
err = rollbackLock(batch, currKey, dec.lock.startTS) err = rollbackLock(batch, currKey, dec.lock.startTS)
} }
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
} }
} }
@ -1441,7 +1438,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[
skip := skipDecoder{currKey: currKey} skip := skipDecoder{currKey: currKey}
_, err = skip.Decode(iter) _, err = skip.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
currKey = skip.currKey currKey = skip.currKey
} }
@ -1456,7 +1453,7 @@ func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error {
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release() defer iter.Release()
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
// Mock TiKV usually doesn't need to process large amount of data. So write it in a single batch. // Mock TiKV usually doesn't need to process large amount of data. So write it in a single batch.
@ -1466,7 +1463,7 @@ func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error {
lockDec := lockDecoder{expectKey: currKey} lockDec := lockDecoder{expectKey: currKey}
ok, err := lockDec.Decode(iter) ok, err := lockDec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if ok && lockDec.lock.startTS <= safePoint { if ok && lockDec.lock.startTS <= safePoint {
return errors.Errorf( return errors.Errorf(
@ -1482,14 +1479,14 @@ func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error {
for iter.Valid() { for iter.Valid() {
ok, err := dec.Decode(iter) ok, err := dec.Decode(iter)
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
if !ok { if !ok {
// Go to the next key // Go to the next key
currKey, _, err = mvccDecode(iter.Key()) currKey, _, err = mvccDecode(iter.Key())
if err != nil { if err != nil {
return errors.Trace(err) return err
} }
break break
} }

View File

@ -911,7 +911,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
} }
batchResp, err := c.coprHandler.HandleBatchCop(ctx, reqCtx, session, req.BatchCop(), timeout) batchResp, err := c.coprHandler.HandleBatchCop(ctx, reqCtx, session, req.BatchCop(), timeout)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, errors.WithStack(err)
} }
resp.Resp = batchResp resp.Resp = batchResp
case tikvrpc.CmdCopStream: case tikvrpc.CmdCopStream:
@ -922,7 +922,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
session.rawEndKey = MvccKey(session.endKey).Raw() session.rawEndKey = MvccKey(session.endKey).Raw()
streamResp, err := c.coprHandler.HandleCopStream(ctx, reqCtx, session, req.Cop(), timeout) streamResp, err := c.coprHandler.HandleCopStream(ctx, reqCtx, session, req.Cop(), timeout)
if err != nil { if err != nil {
return nil, errors.Trace(err) return nil, errors.WithStack(err)
} }
resp.Resp = streamResp resp.Resp = streamResp
case tikvrpc.CmdMvccGetByKey: case tikvrpc.CmdMvccGetByKey: