mirror of https://github.com/tikv/client-go.git
lock_test: replace pingcap/check with testify (#160)
Signed-off-by: disksing <i@disksing.com> Co-authored-by: Shirly <AndreMouche@126.com>
This commit is contained in:
parent
f634df18a9
commit
74baa6990e
|
|
@ -39,13 +39,14 @@ import (
|
|||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/stretchr/testify/suite"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/kv"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
|
|
@ -55,127 +56,130 @@ import (
|
|||
|
||||
var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
|
||||
|
||||
func TestLock(t *testing.T) {
|
||||
suite.Run(t, new(testLockSuite))
|
||||
}
|
||||
|
||||
type testLockSuite struct {
|
||||
suite.Suite
|
||||
store tikv.StoreProbe
|
||||
}
|
||||
|
||||
var _ = SerialSuites(&testLockSuite{})
|
||||
|
||||
func (s *testLockSuite) SetUpTest(c *C) {
|
||||
s.store = tikv.StoreProbe{KVStore: NewTestStore(c)}
|
||||
func (s *testLockSuite) SetupTest() {
|
||||
s.store = tikv.StoreProbe{KVStore: NewTestStoreT(s.T())}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TearDownTest(c *C) {
|
||||
func (s *testLockSuite) TearDownTest() {
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
||||
func (s *testLockSuite) lockKey(key, value, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
if len(value) > 0 {
|
||||
err = txn.Set(key, value)
|
||||
} else {
|
||||
err = txn.Delete(key)
|
||||
}
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
if len(primaryValue) > 0 {
|
||||
err = txn.Set(primaryKey, primaryValue)
|
||||
} else {
|
||||
err = txn.Delete(primaryKey)
|
||||
}
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
tpc, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
tpc.SetPrimaryKey(primaryKey)
|
||||
|
||||
ctx := context.Background()
|
||||
err = tpc.PrewriteAllMutations(ctx)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
if commitPrimary {
|
||||
commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
tpc.SetCommitTS(commitTS)
|
||||
err = tpc.CommitMutations(ctx)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
}
|
||||
return txn.StartTS(), tpc.GetCommitTS()
|
||||
}
|
||||
|
||||
func (s *testLockSuite) putAlphabets(c *C) {
|
||||
func (s *testLockSuite) putAlphabets() {
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
s.putKV(c, []byte{ch}, []byte{ch})
|
||||
s.putKV([]byte{ch}, []byte{ch})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) {
|
||||
func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
err = txn.Set(key, value)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
err = txn.Commit(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
return txn.StartTS(), txn.GetCommitTS()
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prepareAlphabetLocks(c *C) {
|
||||
s.putKV(c, []byte("c"), []byte("cc"))
|
||||
s.lockKey(c, []byte("c"), []byte("c"), []byte("z1"), []byte("z1"), true)
|
||||
s.lockKey(c, []byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), false)
|
||||
s.lockKey(c, []byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), false)
|
||||
s.putKV(c, []byte("bar"), []byte("bar"))
|
||||
s.lockKey(c, []byte("bar"), nil, []byte("z4"), []byte("z4"), true)
|
||||
func (s *testLockSuite) prepareAlphabetLocks() {
|
||||
s.putKV([]byte("c"), []byte("cc"))
|
||||
s.lockKey([]byte("c"), []byte("c"), []byte("z1"), []byte("z1"), true)
|
||||
s.lockKey([]byte("d"), []byte("dd"), []byte("z2"), []byte("z2"), false)
|
||||
s.lockKey([]byte("foo"), []byte("foo"), []byte("z3"), []byte("z3"), false)
|
||||
s.putKV([]byte("bar"), []byte("bar"))
|
||||
s.lockKey([]byte("bar"), nil, []byte("z4"), []byte("z4"), true)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestScanLockResolveWithGet(c *C) {
|
||||
s.putAlphabets(c)
|
||||
s.prepareAlphabetLocks(c)
|
||||
func (s *testLockSuite) TestScanLockResolveWithGet() {
|
||||
s.putAlphabets()
|
||||
s.prepareAlphabetLocks()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
v, err := txn.Get(context.TODO(), []byte{ch})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(v, BytesEquals, []byte{ch})
|
||||
s.Nil(err)
|
||||
s.Equal(v, []byte{ch})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestScanLockResolveWithSeek(c *C) {
|
||||
s.putAlphabets(c)
|
||||
s.prepareAlphabetLocks(c)
|
||||
func (s *testLockSuite) TestScanLockResolveWithSeek() {
|
||||
s.putAlphabets()
|
||||
s.prepareAlphabetLocks()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
iter, err := txn.Iter([]byte("a"), nil)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
c.Assert(iter.Valid(), IsTrue)
|
||||
c.Assert(iter.Key(), BytesEquals, []byte{ch})
|
||||
c.Assert(iter.Value(), BytesEquals, []byte{ch})
|
||||
c.Assert(iter.Next(), IsNil)
|
||||
s.True(iter.Valid())
|
||||
s.Equal(iter.Key(), []byte{ch})
|
||||
s.Equal(iter.Value(), []byte{ch})
|
||||
s.Nil(iter.Next())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestScanLockResolveWithSeekKeyOnly(c *C) {
|
||||
s.putAlphabets(c)
|
||||
s.prepareAlphabetLocks(c)
|
||||
func (s *testLockSuite) TestScanLockResolveWithSeekKeyOnly() {
|
||||
s.putAlphabets()
|
||||
s.prepareAlphabetLocks()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.GetSnapshot().SetKeyOnly(true)
|
||||
iter, err := txn.Iter([]byte("a"), nil)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
c.Assert(iter.Valid(), IsTrue)
|
||||
c.Assert(iter.Key(), BytesEquals, []byte{ch})
|
||||
c.Assert(iter.Next(), IsNil)
|
||||
s.True(iter.Valid())
|
||||
s.Equal(iter.Key(), []byte{ch})
|
||||
s.Nil(iter.Next())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) {
|
||||
s.putAlphabets(c)
|
||||
s.prepareAlphabetLocks(c)
|
||||
func (s *testLockSuite) TestScanLockResolveWithBatchGet() {
|
||||
s.putAlphabets()
|
||||
s.prepareAlphabetLocks()
|
||||
|
||||
var keys [][]byte
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
|
|
@ -183,188 +187,188 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) {
|
|||
}
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
m, err := toTiDBTxn(&txn).BatchGet(context.Background(), toTiDBKeys(keys))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(len(m), Equals, int('z'-'a'+1))
|
||||
s.Nil(err)
|
||||
s.Equal(len(m), int('z'-'a'+1))
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
k := []byte{ch}
|
||||
c.Assert(m[string(k)], BytesEquals, k)
|
||||
s.Equal(m[string(k)], k)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCleanLock(c *C) {
|
||||
func (s *testLockSuite) TestCleanLock() {
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
k := []byte{ch}
|
||||
s.lockKey(c, k, k, k, k, false)
|
||||
s.lockKey(k, k, k, k, false)
|
||||
}
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
for ch := byte('a'); ch <= byte('z'); ch++ {
|
||||
err = txn.Set([]byte{ch}, []byte{ch + 1})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
}
|
||||
err = txn.Commit(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestGetTxnStatus(c *C) {
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
func (s *testLockSuite) TestGetTxnStatus() {
|
||||
startTS, commitTS := s.putKV([]byte("a"), []byte("a"))
|
||||
status, err := s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
s.Nil(err)
|
||||
s.True(status.IsCommitted())
|
||||
s.Equal(status.CommitTS(), commitTS)
|
||||
|
||||
startTS, commitTS = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
|
||||
startTS, commitTS = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), true)
|
||||
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsTrue)
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
s.Nil(err)
|
||||
s.True(status.IsCommitted())
|
||||
s.Equal(status.CommitTS(), commitTS)
|
||||
|
||||
startTS, _ = s.lockKey(c, []byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
|
||||
startTS, _ = s.lockKey([]byte("a"), []byte("a"), []byte("a"), []byte("a"), false)
|
||||
status, err = s.store.GetLockResolver().GetTxnStatus(startTS, startTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.TTL(), Greater, uint64(0), Commentf("action:%s", status.Action()))
|
||||
s.Nil(err)
|
||||
s.False(status.IsCommitted())
|
||||
s.Greater(status.TTL(), uint64(0), fmt.Sprintf("action:%s", status.Action()))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckTxnStatusTTL(c *C) {
|
||||
func (s *testLockSuite) TestCheckTxnStatusTTL() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
s.prewriteTxnWithTTL(c, txn, 1000)
|
||||
s.prewriteTxnWithTTL(txn, 1000)
|
||||
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
lr := s.store.NewLockResolver()
|
||||
callerStartTS, err := s.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
// Check the lock TTL of a transaction.
|
||||
status, err := lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
s.Nil(err)
|
||||
s.False(status.IsCommitted())
|
||||
s.Greater(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), uint64(0))
|
||||
|
||||
// Rollback the txn.
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
lock := s.mustGetLock([]byte("key"))
|
||||
err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
// Check its status is rollbacked.
|
||||
status, err = lr.LockResolver.GetTxnStatus(txn.StartTS(), callerStartTS, []byte("key"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
|
||||
s.Nil(err)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), uint64(0))
|
||||
s.Equal(status.Action(), kvrpcpb.Action_NoAction)
|
||||
|
||||
// Check a committed txn.
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
startTS, commitTS := s.putKV([]byte("a"), []byte("a"))
|
||||
status, err = lr.LockResolver.GetTxnStatus(startTS, callerStartTS, []byte("a"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
s.Nil(err)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), commitTS)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestTxnHeartBeat(c *C) {
|
||||
func (s *testLockSuite) TestTxnHeartBeat() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
s.prewriteTxn(c, txn)
|
||||
s.prewriteTxn(txn)
|
||||
|
||||
newTTL, err := s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(newTTL, Equals, uint64(6666))
|
||||
s.Nil(err)
|
||||
s.Equal(newTTL, uint64(6666))
|
||||
|
||||
newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 5555)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(newTTL, Equals, uint64(6666))
|
||||
s.Nil(err)
|
||||
s.Equal(newTTL, uint64(6666))
|
||||
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
lock := s.mustGetLock([]byte("key"))
|
||||
err = s.store.NewLockResolver().ResolveLock(context.Background(), lock)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
newTTL, err = s.store.SendTxnHeartbeat(context.Background(), []byte("key"), txn.StartTS(), 6666)
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(newTTL, Equals, uint64(0))
|
||||
s.NotNil(err)
|
||||
s.Equal(newTTL, uint64(0))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckTxnStatus(c *C) {
|
||||
func (s *testLockSuite) TestCheckTxnStatus() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
txn.Set([]byte("second"), []byte("xxx"))
|
||||
s.prewriteTxnWithTTL(c, txn, 1000)
|
||||
s.prewriteTxnWithTTL(txn, 1000)
|
||||
|
||||
o := s.store.GetOracle()
|
||||
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(currentTS, Greater, txn.StartTS())
|
||||
s.Nil(err)
|
||||
s.Greater(currentTS, txn.StartTS())
|
||||
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
resolver := s.store.NewLockResolver()
|
||||
// Call getTxnStatus to check the lock status.
|
||||
status, err := resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.IsCommitted(), IsFalse)
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_MinCommitTSPushed)
|
||||
s.Nil(err)
|
||||
s.False(status.IsCommitted())
|
||||
s.Greater(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), uint64(0))
|
||||
s.Equal(status.Action(), kvrpcpb.Action_MinCommitTSPushed)
|
||||
|
||||
// Test the ResolveLocks API
|
||||
lock := s.mustGetLock(c, []byte("second"))
|
||||
lock := s.mustGetLock([]byte("second"))
|
||||
timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(timeBeforeExpire > int64(0), IsTrue)
|
||||
s.Nil(err)
|
||||
s.True(timeBeforeExpire > int64(0))
|
||||
|
||||
// Force rollback the lock using lock.TTL = 0.
|
||||
lock.TTL = uint64(0)
|
||||
timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(timeBeforeExpire, Equals, int64(0))
|
||||
s.Nil(err)
|
||||
s.Equal(timeBeforeExpire, int64(0))
|
||||
|
||||
// Then call getTxnStatus again and check the lock status.
|
||||
currentTS, err = o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
status, err = s.store.NewLockResolver().GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, 0, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_NoAction)
|
||||
s.Nil(err)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), uint64(0))
|
||||
s.Equal(status.Action(), kvrpcpb.Action_NoAction)
|
||||
|
||||
// Call getTxnStatus on a committed transaction.
|
||||
startTS, commitTS := s.putKV(c, []byte("a"), []byte("a"))
|
||||
startTS, commitTS := s.putKV([]byte("a"), []byte("a"))
|
||||
status, err = s.store.NewLockResolver().GetTxnStatus(bo, startTS, []byte("a"), currentTS, currentTS, true, false, nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, commitTS)
|
||||
s.Nil(err)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), commitTS)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
|
||||
func (s *testLockSuite) TestCheckTxnStatusNoWait() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
txn.Set([]byte("second"), []byte("xxx"))
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
// Increase lock TTL to make CI more stable.
|
||||
committer.SetLockTTLByTimeAndSize(txn.GetStartTime(), 200*1024*1024)
|
||||
|
||||
// Only prewrite the secondary key to simulate a concurrent prewrite case:
|
||||
// prewrite secondary regions success and prewrite the primary region is pending.
|
||||
err = committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("second")}))
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
o := s.store.GetOracle()
|
||||
currentTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
resolver := s.store.NewLockResolver()
|
||||
|
||||
// Call getTxnStatus for the TxnNotFound case.
|
||||
_, err = resolver.GetTxnStatus(bo, txn.StartTS(), []byte("key"), currentTS, currentTS, false, false, nil)
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(resolver.IsErrorNotFound(err), IsTrue)
|
||||
s.NotNil(err)
|
||||
s.True(resolver.IsErrorNotFound(err))
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
|
|
@ -379,14 +383,14 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
|
|||
}
|
||||
// Call getTxnStatusFromLock to cover the retry logic.
|
||||
status, err := resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Greater, uint64(0))
|
||||
c.Assert(<-errCh, IsNil)
|
||||
c.Assert(committer.CleanupMutations(context.Background()), IsNil)
|
||||
s.Nil(err)
|
||||
s.Greater(status.TTL(), uint64(0))
|
||||
s.Nil(<-errCh)
|
||||
s.Nil(committer.CleanupMutations(context.Background()))
|
||||
|
||||
// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
|
||||
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
lock = &tikv.Lock{
|
||||
Key: []byte("second"),
|
||||
Primary: []byte("key_not_exist"),
|
||||
|
|
@ -394,154 +398,154 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
|
|||
TTL: 1000,
|
||||
}
|
||||
status, err = resolver.GetTxnStatusFromLock(bo, lock, currentTS, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
c.Assert(status.CommitTS(), Equals, uint64(0))
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistRollback)
|
||||
s.Nil(err)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
s.Equal(status.CommitTS(), uint64(0))
|
||||
s.Equal(status.Action(), kvrpcpb.Action_LockNotExistRollback)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prewriteTxn(c *C, txn tikv.TxnProbe) {
|
||||
s.prewriteTxnWithTTL(c, txn, 0)
|
||||
func (s *testLockSuite) prewriteTxn(txn tikv.TxnProbe) {
|
||||
s.prewriteTxnWithTTL(txn, 0)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn tikv.TxnProbe, ttl uint64) {
|
||||
func (s *testLockSuite) prewriteTxnWithTTL(txn tikv.TxnProbe, ttl uint64) {
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
if ttl > 0 {
|
||||
elapsed := time.Since(txn.GetStartTime()) / time.Millisecond
|
||||
committer.SetLockTTL(uint64(elapsed) + ttl)
|
||||
}
|
||||
err = committer.PrewriteAllMutations(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) mustGetLock(c *C, key []byte) *tikv.Lock {
|
||||
func (s *testLockSuite) mustGetLock(key []byte) *tikv.Lock {
|
||||
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
|
||||
Key: key,
|
||||
Version: ver,
|
||||
})
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, key)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(resp.Resp, NotNil)
|
||||
s.Nil(err)
|
||||
s.NotNil(resp.Resp)
|
||||
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
|
||||
c.Assert(keyErr, NotNil)
|
||||
s.NotNil(keyErr)
|
||||
lock, err := tikv.LockProbe{}.ExtractLockFromKeyErr(keyErr)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
return lock
|
||||
}
|
||||
|
||||
func (s *testLockSuite) ttlEquals(c *C, x, y uint64) {
|
||||
func (s *testLockSuite) ttlEquals(x, y uint64) {
|
||||
// NOTE: On ppc64le, all integers are by default unsigned integers,
|
||||
// hence we have to separately cast the value returned by "math.Abs()" function for ppc64le.
|
||||
if runtime.GOARCH == "ppc64le" {
|
||||
c.Assert(int(-math.Abs(float64(x-y))), LessEqual, 2)
|
||||
s.LessOrEqual(int(-math.Abs(float64(x-y))), 2)
|
||||
} else {
|
||||
c.Assert(int(math.Abs(float64(x-y))), LessEqual, 2)
|
||||
s.LessOrEqual(int(math.Abs(float64(x-y))), 2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestLockTTL(c *C) {
|
||||
func (s *testLockSuite) TestLockTTL() {
|
||||
defaultLockTTL := tikv.ConfigProbe{}.GetDefaultLockTTL()
|
||||
ttlFactor := tikv.ConfigProbe{}.GetTTLFactor()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
time.Sleep(time.Millisecond)
|
||||
s.prewriteTxnWithTTL(c, txn, 3100)
|
||||
l := s.mustGetLock(c, []byte("key"))
|
||||
c.Assert(l.TTL >= defaultLockTTL, IsTrue)
|
||||
s.prewriteTxnWithTTL(txn, 3100)
|
||||
l := s.mustGetLock([]byte("key"))
|
||||
s.True(l.TTL >= defaultLockTTL)
|
||||
|
||||
// Huge txn has a greater TTL.
|
||||
txn, err = s.store.Begin()
|
||||
start := time.Now()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
for i := 0; i < 2048; i++ {
|
||||
k, v := randKV(1024, 1024)
|
||||
txn.Set([]byte(k), []byte(v))
|
||||
}
|
||||
s.prewriteTxn(c, txn)
|
||||
l = s.mustGetLock(c, []byte("key"))
|
||||
s.ttlEquals(c, l.TTL, uint64(ttlFactor*2)+uint64(time.Since(start)/time.Millisecond))
|
||||
s.prewriteTxn(txn)
|
||||
l = s.mustGetLock([]byte("key"))
|
||||
s.ttlEquals(l.TTL, uint64(ttlFactor*2)+uint64(time.Since(start)/time.Millisecond))
|
||||
|
||||
// Txn with long read time.
|
||||
start = time.Now()
|
||||
txn, err = s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
s.prewriteTxn(c, txn)
|
||||
l = s.mustGetLock(c, []byte("key"))
|
||||
s.ttlEquals(c, l.TTL, defaultLockTTL+uint64(time.Since(start)/time.Millisecond))
|
||||
s.prewriteTxn(txn)
|
||||
l = s.mustGetLock([]byte("key"))
|
||||
s.ttlEquals(l.TTL, defaultLockTTL+uint64(time.Since(start)/time.Millisecond))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestBatchResolveLocks(c *C) {
|
||||
func (s *testLockSuite) TestBatchResolveLocks() {
|
||||
// The first transaction is a normal transaction with a long TTL
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("k1"), []byte("v1"))
|
||||
txn.Set([]byte("k2"), []byte("v2"))
|
||||
s.prewriteTxnWithTTL(c, txn, 20000)
|
||||
s.prewriteTxnWithTTL(txn, 20000)
|
||||
|
||||
// The second transaction is an async commit transaction
|
||||
txn, err = s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("k3"), []byte("v3"))
|
||||
txn.Set([]byte("k4"), []byte("v4"))
|
||||
committer, err := txn.NewCommitter(0)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
committer.SetUseAsyncCommit()
|
||||
committer.SetLockTTL(20000)
|
||||
committer.PrewriteAllMutations(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
var locks []*tikv.Lock
|
||||
for _, key := range []string{"k1", "k2", "k3", "k4"} {
|
||||
l := s.mustGetLock(c, []byte(key))
|
||||
l := s.mustGetLock([]byte(key))
|
||||
locks = append(locks, l)
|
||||
}
|
||||
|
||||
// Locks may not expired
|
||||
msBeforeLockExpired := s.store.GetOracle().UntilExpired(locks[0].TxnID, locks[1].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(msBeforeLockExpired, Greater, int64(0))
|
||||
s.Greater(msBeforeLockExpired, int64(0))
|
||||
msBeforeLockExpired = s.store.GetOracle().UntilExpired(locks[3].TxnID, locks[3].TTL, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
c.Assert(msBeforeLockExpired, Greater, int64(0))
|
||||
s.Greater(msBeforeLockExpired, int64(0))
|
||||
|
||||
lr := s.store.NewLockResolver()
|
||||
bo := tikv.NewGcResolveLockMaxBackoffer(context.Background())
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
// Check BatchResolveLocks resolve the lock even the ttl is not expired.
|
||||
success, err := lr.BatchResolveLocks(bo, locks, loc.Region)
|
||||
c.Assert(success, IsTrue)
|
||||
c.Assert(err, IsNil)
|
||||
s.True(success)
|
||||
s.Nil(err)
|
||||
|
||||
txn, err = s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
// transaction 1 is rolled back
|
||||
_, err = txn.Get(context.Background(), []byte("k1"))
|
||||
c.Assert(err, Equals, tikverr.ErrNotExist)
|
||||
s.Equal(err, tikverr.ErrNotExist)
|
||||
_, err = txn.Get(context.Background(), []byte("k2"))
|
||||
c.Assert(err, Equals, tikverr.ErrNotExist)
|
||||
s.Equal(err, tikverr.ErrNotExist)
|
||||
// transaction 2 is committed
|
||||
v, err := txn.Get(context.Background(), []byte("k3"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(bytes.Equal(v, []byte("v3")), IsTrue)
|
||||
s.Nil(err)
|
||||
s.True(bytes.Equal(v, []byte("v3")))
|
||||
v, err = txn.Get(context.Background(), []byte("k4"))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(bytes.Equal(v, []byte("v4")), IsTrue)
|
||||
s.Nil(err)
|
||||
s.True(bytes.Equal(v, []byte("v4")))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestNewLockZeroTTL(c *C) {
|
||||
func (s *testLockSuite) TestNewLockZeroTTL() {
|
||||
l := tikv.NewLock(&kvrpcpb.LockInfo{})
|
||||
c.Assert(l.TTL, Equals, uint64(0))
|
||||
s.Equal(l.TTL, uint64(0))
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
@ -549,121 +553,121 @@ func init() {
|
|||
tikv.ConfigProbe{}.SetOracleUpdateInterval(2)
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestZeroMinCommitTS(c *C) {
|
||||
func (s *testLockSuite) TestZeroMinCommitTS() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn.Set([]byte("key"), []byte("value"))
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), tikv.PrewriteMaxBackoff, nil)
|
||||
|
||||
mockValue := fmt.Sprintf(`return(%d)`, txn.StartTS())
|
||||
c.Assert(failpoint.Enable("tikvclient/mockZeroCommitTS", mockValue), IsNil)
|
||||
s.prewriteTxnWithTTL(c, txn, 1000)
|
||||
c.Assert(failpoint.Disable("tikvclient/mockZeroCommitTS"), IsNil)
|
||||
s.Nil(failpoint.Enable("tikvclient/mockZeroCommitTS", mockValue))
|
||||
s.prewriteTxnWithTTL(txn, 1000)
|
||||
s.Nil(failpoint.Disable("tikvclient/mockZeroCommitTS"))
|
||||
|
||||
lock := s.mustGetLock(c, []byte("key"))
|
||||
lock := s.mustGetLock([]byte("key"))
|
||||
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(pushed, HasLen, 0)
|
||||
c.Assert(expire, Greater, int64(0))
|
||||
s.Nil(err)
|
||||
s.Len(pushed, 0)
|
||||
s.Greater(expire, int64(0))
|
||||
|
||||
expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(pushed, HasLen, 1)
|
||||
c.Assert(expire, Greater, int64(0))
|
||||
s.Nil(err)
|
||||
s.Len(pushed, 1)
|
||||
s.Greater(expire, int64(0))
|
||||
|
||||
// Clean up this test.
|
||||
lock.TTL = uint64(0)
|
||||
expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expire, Equals, int64(0))
|
||||
s.Nil(err)
|
||||
s.Equal(expire, int64(0))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit(c *C) {
|
||||
func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit() {
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
err = txn.Set([]byte("fb1"), []byte("1"))
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
err = txn.Set([]byte("fb2"), []byte("2"))
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
committer, err := txn.NewCommitter(1)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.GetMutations().Len(), Equals, 2)
|
||||
s.Nil(err)
|
||||
s.Equal(committer.GetMutations().Len(), 2)
|
||||
committer.SetLockTTL(0)
|
||||
committer.SetUseAsyncCommit()
|
||||
committer.SetCommitTS(committer.GetStartTS() + (100 << 18)) // 100ms
|
||||
|
||||
err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.IsAsyncCommit(), IsTrue)
|
||||
s.Nil(err)
|
||||
s.True(committer.IsAsyncCommit())
|
||||
|
||||
// Set an invalid maxCommitTS to produce MaxCommitTsTooLarge
|
||||
committer.SetMaxCommitTS(committer.GetStartTS() - 1)
|
||||
err = committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(committer.IsAsyncCommit(), IsFalse) // Fallback due to MaxCommitTsTooLarge
|
||||
s.Nil(err)
|
||||
s.False(committer.IsAsyncCommit()) // Fallback due to MaxCommitTsTooLarge
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit(c *C) {
|
||||
s.prepareTxnFallenBackFromAsyncCommit(c)
|
||||
func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit() {
|
||||
s.prepareTxnFallenBackFromAsyncCommit()
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
lock := s.mustGetLock([]byte("fb1"))
|
||||
s.True(lock.UseAsyncCommit)
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
lr := s.store.NewLockResolver()
|
||||
status, err := lr.GetTxnStatusFromLock(bo, lock, 0, false)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(tikv.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), DeepEquals, []byte("fb1"))
|
||||
s.Nil(err)
|
||||
s.Equal(tikv.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), []byte("fb1"))
|
||||
|
||||
err = lr.CheckAllSecondaries(bo, lock, &status)
|
||||
c.Assert(lr.IsNonAsyncCommitLock(err), IsTrue)
|
||||
s.True(lr.IsNonAsyncCommitLock(err))
|
||||
|
||||
status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(status.Action(), Equals, kvrpcpb.Action_TTLExpireRollback)
|
||||
c.Assert(status.TTL(), Equals, uint64(0))
|
||||
s.Nil(err)
|
||||
s.Equal(status.Action(), kvrpcpb.Action_TTLExpireRollback)
|
||||
s.Equal(status.TTL(), uint64(0))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit(c *C) {
|
||||
s.prepareTxnFallenBackFromAsyncCommit(c)
|
||||
func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
|
||||
s.prepareTxnFallenBackFromAsyncCommit()
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
lock := s.mustGetLock([]byte("fb1"))
|
||||
s.True(lock.UseAsyncCommit)
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(expire, Equals, int64(0))
|
||||
c.Assert(len(pushed), Equals, 0)
|
||||
s.Nil(err)
|
||||
s.Equal(expire, int64(0))
|
||||
s.Equal(len(pushed), 0)
|
||||
|
||||
t3, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
_, err = t3.Get(context.Background(), []byte("fb1"))
|
||||
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
|
||||
s.True(tikverr.IsErrNotFound(err))
|
||||
_, err = t3.Get(context.Background(), []byte("fb2"))
|
||||
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
|
||||
s.True(tikverr.IsErrNotFound(err))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
|
||||
s.prepareTxnFallenBackFromAsyncCommit(c)
|
||||
func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit() {
|
||||
s.prepareTxnFallenBackFromAsyncCommit()
|
||||
|
||||
lock := s.mustGetLock(c, []byte("fb1"))
|
||||
c.Assert(lock.UseAsyncCommit, IsTrue)
|
||||
lock := s.mustGetLock([]byte("fb1"))
|
||||
s.True(lock.UseAsyncCommit)
|
||||
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
|
||||
loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("fb1"))
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
ok, err := s.store.NewLockResolver().BatchResolveLocks(bo, []*tikv.Lock{lock}, loc.Region)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(ok, IsTrue)
|
||||
s.Nil(err)
|
||||
s.True(ok)
|
||||
|
||||
t3, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
_, err = t3.Get(context.Background(), []byte("fb1"))
|
||||
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
|
||||
s.True(tikverr.IsErrNotFound(err))
|
||||
_, err = t3.Get(context.Background(), []byte("fb2"))
|
||||
c.Assert(tikverr.IsErrNotFound(err), IsTrue)
|
||||
s.True(tikverr.IsErrNotFound(err))
|
||||
}
|
||||
|
||||
func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
||||
func (s *testLockSuite) TestDeadlockReportWaitChain() {
|
||||
// Utilities to make the test logic clear and simple.
|
||||
type txnWrapper struct {
|
||||
tikv.TxnProbe
|
||||
|
|
@ -684,13 +688,13 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
res := make([]*txnWrapper, 0, num)
|
||||
for i := 0; i < num; i++ {
|
||||
txnProbe, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
txn := &txnWrapper{TxnProbe: txnProbe}
|
||||
txn.SetPessimistic(true)
|
||||
tag := fmt.Sprintf("tag-init%v", i)
|
||||
key := []byte{'k', byte(i)}
|
||||
err = txn.LockKeys(context.Background(), makeLockCtx(txn, tag), key)
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
|
||||
res = append(res, txn)
|
||||
}
|
||||
|
|
@ -699,7 +703,7 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
|
||||
// Let the i-th trnasaction lock the key that has been locked by j-th transaction
|
||||
tryLock := func(txns []*txnWrapper, i int, j int) error {
|
||||
c.Logf("txn %v try locking %v", i, j)
|
||||
s.T().Logf("txn %v try locking %v", i, j)
|
||||
txn := txns[i]
|
||||
tag := fmt.Sprintf("tag-%v-%v", i, j)
|
||||
key := []byte{'k', byte(j)}
|
||||
|
|
@ -714,10 +718,10 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
err := tryLock(txns, i, j)
|
||||
// After the lock being waited for is released, the transaction returns a WriteConflict error
|
||||
// unconditionally, which is by design.
|
||||
c.Assert(err, NotNil)
|
||||
c.Logf("txn %v wait for %v finished, err: %s", i, j, err.Error())
|
||||
s.NotNil(err)
|
||||
s.T().Logf("txn %v wait for %v finished, err: %s", i, j, err.Error())
|
||||
_, ok := errors.Cause(err).(*tikverr.ErrWriteConflict)
|
||||
c.Assert(ok, IsTrue)
|
||||
s.True(ok)
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
@ -725,21 +729,21 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
// It's expected that each transaction should be rolled back after its blocker, so that `Rollback` will not
|
||||
// run when there's concurrent `LockKeys` running.
|
||||
// If it's blocked on the `Wait` forever, it means the transaction's blocker is not rolled back.
|
||||
c.Logf("rollback txn %v", i)
|
||||
s.T().Logf("rollback txn %v", i)
|
||||
txns[i].wg.Wait()
|
||||
err := txns[i].Rollback()
|
||||
c.Assert(err, IsNil)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
// Check the given WaitForEntry is caused by txn[i] waiting for txn[j].
|
||||
checkWaitChainEntry := func(txns []*txnWrapper, entry *deadlockpb.WaitForEntry, i, j int) {
|
||||
c.Assert(entry.Txn, Equals, txns[i].StartTS())
|
||||
c.Assert(entry.WaitForTxn, Equals, txns[j].StartTS())
|
||||
c.Assert(entry.Key, BytesEquals, []byte{'k', byte(j)})
|
||||
c.Assert(string(entry.ResourceGroupTag), Equals, fmt.Sprintf("tag-%v-%v", i, j))
|
||||
s.Equal(entry.Txn, txns[i].StartTS())
|
||||
s.Equal(entry.WaitForTxn, txns[j].StartTS())
|
||||
s.Equal(entry.Key, []byte{'k', byte(j)})
|
||||
s.Equal(string(entry.ResourceGroupTag), fmt.Sprintf("tag-%v-%v", i, j))
|
||||
}
|
||||
|
||||
c.Log("test case 1: 1->0->1")
|
||||
s.T().Log("test case 1: 1->0->1")
|
||||
|
||||
txns := prepareTxns(2)
|
||||
|
||||
|
|
@ -749,12 +753,12 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
|
||||
// txn2 tries locking k1 and encounters deadlock error.
|
||||
err := tryLock(txns, 1, 0)
|
||||
c.Assert(err, NotNil)
|
||||
s.NotNil(err)
|
||||
dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock)
|
||||
c.Assert(ok, IsTrue)
|
||||
s.True(ok)
|
||||
|
||||
waitChain := dl.GetWaitChain()
|
||||
c.Assert(len(waitChain), Equals, 2)
|
||||
s.Equal(len(waitChain), 2)
|
||||
checkWaitChainEntry(txns, waitChain[0], 0, 1)
|
||||
checkWaitChainEntry(txns, waitChain[1], 1, 0)
|
||||
|
||||
|
|
@ -762,7 +766,7 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
waitAndRollback(txns, 1)
|
||||
waitAndRollback(txns, 0)
|
||||
|
||||
c.Log("test case 2: 3->2->0->1->3")
|
||||
s.T().Log("test case 2: 3->2->0->1->3")
|
||||
txns = prepareTxns(4)
|
||||
|
||||
makeWaitFor(txns, 0, 1)
|
||||
|
|
@ -772,13 +776,13 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
|
|||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
err = tryLock(txns, 3, 2)
|
||||
c.Assert(err, NotNil)
|
||||
s.NotNil(err)
|
||||
dl, ok = errors.Cause(err).(*tikverr.ErrDeadlock)
|
||||
c.Assert(ok, IsTrue)
|
||||
s.True(ok)
|
||||
|
||||
waitChain = dl.GetWaitChain()
|
||||
c.Assert(len(waitChain), Equals, 4)
|
||||
c.Logf("wait chain: \n** %v\n**%v\n**%v\n**%v\n", waitChain[0], waitChain[1], waitChain[2], waitChain[3])
|
||||
s.Equal(len(waitChain), 4)
|
||||
s.T().Logf("wait chain: \n** %v\n**%v\n**%v\n**%v\n", waitChain[0], waitChain[1], waitChain[2], waitChain[3])
|
||||
checkWaitChainEntry(txns, waitChain[0], 2, 0)
|
||||
checkWaitChainEntry(txns, waitChain[1], 0, 1)
|
||||
checkWaitChainEntry(txns, waitChain[2], 1, 3)
|
||||
|
|
|
|||
Loading…
Reference in New Issue