From ccb3cdb2f14b5bf1bb9aa191cbf653f63c55d520 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 24 Jun 2021 15:55:19 +0800 Subject: [PATCH] 1pc_tesst, async_commit_test: replace pingcap/check with testify (#162) Signed-off-by: disksing --- integration_tests/1pc_test.go | 236 ++++++++++--------- integration_tests/async_commit_fail_test.go | 167 +++++++------- integration_tests/async_commit_test.go | 243 ++++++++++---------- 3 files changed, 328 insertions(+), 318 deletions(-) diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index ff7a79a2..67616a49 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -34,8 +34,9 @@ package tikv_test import ( "context" + "testing" - . "github.com/pingcap/check" + "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/mockstore" "github.com/tikv/client-go/v2/oracle" @@ -43,11 +44,8 @@ import ( "github.com/tikv/client-go/v2/util" ) -func (s *testAsyncCommitCommon) begin1PC(c *C) tikv.TxnProbe { - txn, err := s.store.Begin() - c.Assert(err, IsNil) - txn.SetEnable1PC(true) - return tikv.TxnProbe{KVTxn: txn} +func TestOnePC(t *testing.T) { + suite.Run(t, new(testOnePCSuite)) } type testOnePCSuite struct { @@ -55,56 +53,54 @@ type testOnePCSuite struct { bo *tikv.Backoffer } -var _ = SerialSuites(&testOnePCSuite{}) - -func (s *testOnePCSuite) SetUpTest(c *C) { - s.testAsyncCommitCommon.setUpTest(c) +func (s *testOnePCSuite) SetupTest() { + s.testAsyncCommitCommon.setUpTest() s.bo = tikv.NewBackofferWithVars(context.Background(), 5000, nil) } -func (s *testOnePCSuite) Test1PC(c *C) { +func (s *testOnePCSuite) Test1PC() { ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) k1 := []byte("k1") v1 := []byte("v1") - txn := s.begin1PC(c) + txn := s.begin1PC() err := txn.Set(k1, v1) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsTrue) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, txn.GetCommitter().GetCommitTS()) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Greater, txn.StartTS()) + s.Nil(err) + s.True(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) + s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) // ttlManager is not used for 1PC. - c.Assert(txn.GetCommitter().IsTTLUninitialized(), IsTrue) + s.True(txn.GetCommitter().IsTTLUninitialized()) // 1PC doesn't work if sessionID == 0 k2 := []byte("k2") v2 := []byte("v2") - txn = s.begin1PC(c) + txn = s.begin1PC() err = txn.Set(k2, v2) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(context.Background()) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsFalse) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, uint64(0)) - c.Assert(txn.GetCommitter().GetCommitTS(), Greater, txn.StartTS()) + s.Nil(err) + s.False(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) + s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) // 1PC doesn't work if system variable not set k3 := []byte("k3") v3 := []byte("v3") - txn = s.begin(c) + txn = s.begin() err = txn.Set(k3, v3) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsFalse) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, uint64(0)) - c.Assert(txn.GetCommitter().GetCommitTS(), Greater, txn.StartTS()) + s.Nil(err) + s.False(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) + s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) // Test multiple keys k4 := []byte("k4") @@ -114,89 +110,89 @@ func (s *testOnePCSuite) Test1PC(c *C) { k6 := []byte("k6") v6 := []byte("v6") - txn = s.begin1PC(c) + txn = s.begin1PC() err = txn.Set(k4, v4) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Set(k5, v5) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Set(k6, v6) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsTrue) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, txn.GetCommitter().GetCommitTS()) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Greater, txn.StartTS()) + s.Nil(err) + s.True(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) + s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) // Check keys are committed with the same version - s.mustGetFromSnapshot(c, txn.GetCommitTS(), k4, v4) - s.mustGetFromSnapshot(c, txn.GetCommitTS(), k5, v5) - s.mustGetFromSnapshot(c, txn.GetCommitTS(), k6, v6) - s.mustGetNoneFromSnapshot(c, txn.GetCommitTS()-1, k4) - s.mustGetNoneFromSnapshot(c, txn.GetCommitTS()-1, k5) - s.mustGetNoneFromSnapshot(c, txn.GetCommitTS()-1, k6) + s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4) + s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5) + s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6) + s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4) + s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5) + s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k6) // Overwriting in MVCC v6New := []byte("v6new") - txn = s.begin1PC(c) + txn = s.begin1PC() err = txn.Set(k6, v6New) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsTrue) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, txn.GetCommitter().GetCommitTS()) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Greater, txn.StartTS()) - s.mustGetFromSnapshot(c, txn.GetCommitTS(), k6, v6New) - s.mustGetFromSnapshot(c, txn.GetCommitTS()-1, k6, v6) + s.Nil(err) + s.True(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) + s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) + s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6New) + s.mustGetFromSnapshot(txn.GetCommitTS()-1, k6, v6) // Check all keys keys := [][]byte{k1, k2, k3, k4, k5, k6} values := [][]byte{v1, v2, v3, v4, v5, v6New} ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) - c.Assert(err, IsNil) + s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { v, err := snap.Get(ctx, k) - c.Assert(err, IsNil) - c.Assert(v, BytesEquals, values[i]) + s.Nil(err) + s.Equal(v, values[i]) } } -func (s *testOnePCSuite) Test1PCIsolation(c *C) { +func (s *testOnePCSuite) Test1PCIsolation() { ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) k := []byte("k") v1 := []byte("v1") - txn := s.begin1PC(c) + txn := s.begin1PC() txn.Set(k, v1) err := txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) v2 := []byte("v2") - txn = s.begin1PC(c) + txn = s.begin1PC() txn.Set(k, v2) // Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs // calculation. for i := 0; i < 10; i++ { _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) } - txn2 := s.begin1PC(c) - s.mustGetFromTxn(c, txn2, k, v1) + txn2 := s.begin1PC() + s.mustGetFromTxn(txn2, k, v1) err = txn.Commit(ctx) - c.Assert(txn.GetCommitter().IsOnePC(), IsTrue) - c.Assert(err, IsNil) + s.True(txn.GetCommitter().IsOnePC()) + s.Nil(err) - s.mustGetFromTxn(c, txn2, k, v1) - c.Assert(txn2.Rollback(), IsNil) + s.mustGetFromTxn(txn2, k, v1) + s.Nil(txn2.Rollback()) - s.mustGetFromSnapshot(c, txn.GetCommitTS(), k, v2) - s.mustGetFromSnapshot(c, txn.GetCommitTS()-1, k, v1) + s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2) + s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1) } -func (s *testOnePCSuite) Test1PCDisallowMultiRegion(c *C) { +func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { // This test doesn't support tikv mode. if *mockstore.WithTiKV { return @@ -204,127 +200,127 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion(c *C) { ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - txn := s.begin1PC(c) + txn := s.begin1PC() keys := []string{"k0", "k1", "k2", "k3"} values := []string{"v0", "v1", "v2", "v3"} err := txn.Set([]byte(keys[0]), []byte(values[0])) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Set([]byte(keys[3]), []byte(values[3])) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) // 1PC doesn't work if it affects multiple regions. loc, err := s.store.GetRegionCache().LocateKey(s.bo, []byte(keys[2])) - c.Assert(err, IsNil) + s.Nil(err) newRegionID := s.cluster.AllocID() newPeerID := s.cluster.AllocID() s.cluster.Split(loc.Region.GetID(), newRegionID, []byte(keys[2]), []uint64{newPeerID}, newPeerID) - txn = s.begin1PC(c) + txn = s.begin1PC() err = txn.Set([]byte(keys[1]), []byte(values[1])) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Set([]byte(keys[2]), []byte(values[2])) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsOnePC(), IsFalse) - c.Assert(txn.GetCommitter().GetOnePCCommitTS(), Equals, uint64(0)) - c.Assert(txn.GetCommitter().GetCommitTS(), Greater, txn.StartTS()) + s.Nil(err) + s.False(txn.GetCommitter().IsOnePC()) + s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) + s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) - c.Assert(err, IsNil) + s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { v, err := snap.Get(ctx, []byte(k)) - c.Assert(err, IsNil) - c.Assert(v, BytesEquals, []byte(values[i])) + s.Nil(err) + s.Equal(v, []byte(values[i])) } } // It's just a simple validation of linearizability. // Extra tests are needed to test this feature with the control of the TiKV cluster. -func (s *testOnePCSuite) Test1PCLinearizability(c *C) { - t1 := s.begin(c) - t2 := s.begin(c) +func (s *testOnePCSuite) Test1PCLinearizability() { + t1 := s.begin() + t2 := s.begin() err := t1.Set([]byte("a"), []byte("a1")) - c.Assert(err, IsNil) + s.Nil(err) err = t2.Set([]byte("b"), []byte("b1")) - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) // t2 commits earlier than t1 err = t2.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) err = t1.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) commitTS1 := t1.GetCommitter().GetCommitTS() commitTS2 := t2.GetCommitter().GetCommitTS() - c.Assert(commitTS2, Less, commitTS1) + s.Less(commitTS2, commitTS1) } -func (s *testOnePCSuite) Test1PCWithMultiDC(c *C) { +func (s *testOnePCSuite) Test1PCWithMultiDC() { // It requires setting placement rules to run with TiKV if *mockstore.WithTiKV { return } - localTxn := s.begin1PC(c) + localTxn := s.begin1PC() err := localTxn.Set([]byte("a"), []byte("a1")) localTxn.SetScope("bj") - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = localTxn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(localTxn.GetCommitter().IsOnePC(), IsFalse) + s.Nil(err) + s.False(localTxn.GetCommitter().IsOnePC()) - globalTxn := s.begin1PC(c) + globalTxn := s.begin1PC() err = globalTxn.Set([]byte("b"), []byte("b1")) globalTxn.SetScope(oracle.GlobalTxnScope) - c.Assert(err, IsNil) + s.Nil(err) err = globalTxn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(globalTxn.GetCommitter().IsOnePC(), IsTrue) + s.Nil(err) + s.True(globalTxn.GetCommitter().IsOnePC()) } -func (s *testOnePCSuite) TestTxnCommitCounter(c *C) { +func (s *testOnePCSuite) TestTxnCommitCounter() { initial := metrics.GetTxnCommitCounter() // 2PC - txn := s.begin(c) + txn := s.begin() err := txn.Set([]byte("k"), []byte("v")) - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) curr := metrics.GetTxnCommitCounter() diff := curr.Sub(initial) - c.Assert(diff.TwoPC, Equals, int64(1)) - c.Assert(diff.AsyncCommit, Equals, int64(0)) - c.Assert(diff.OnePC, Equals, int64(0)) + s.Equal(diff.TwoPC, int64(1)) + s.Equal(diff.AsyncCommit, int64(0)) + s.Equal(diff.OnePC, int64(0)) // AsyncCommit - txn = s.beginAsyncCommit(c) + txn = s.beginAsyncCommit() err = txn.Set([]byte("k1"), []byte("v1")) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) curr = metrics.GetTxnCommitCounter() diff = curr.Sub(initial) - c.Assert(diff.TwoPC, Equals, int64(1)) - c.Assert(diff.AsyncCommit, Equals, int64(1)) - c.Assert(diff.OnePC, Equals, int64(0)) + s.Equal(diff.TwoPC, int64(1)) + s.Equal(diff.AsyncCommit, int64(1)) + s.Equal(diff.OnePC, int64(0)) // 1PC - txn = s.begin1PC(c) + txn = s.begin1PC() err = txn.Set([]byte("k2"), []byte("v2")) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) curr = metrics.GetTxnCommitCounter() diff = curr.Sub(initial) - c.Assert(diff.TwoPC, Equals, int64(1)) - c.Assert(diff.AsyncCommit, Equals, int64(1)) - c.Assert(diff.OnePC, Equals, int64(1)) + s.Equal(diff.TwoPC, int64(1)) + s.Equal(diff.AsyncCommit, int64(1)) + s.Equal(diff.OnePC, int64(1)) } diff --git a/integration_tests/async_commit_fail_test.go b/integration_tests/async_commit_fail_test.go index 7f4a008b..ea2711d1 100644 --- a/integration_tests/async_commit_fail_test.go +++ b/integration_tests/async_commit_fail_test.go @@ -36,63 +36,66 @@ import ( "bytes" "context" "sort" + "testing" - . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/terror" + "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/mockstore" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/util" ) +func TestAsyncCommitFail(t *testing.T) { + suite.Run(t, new(testAsyncCommitFailSuite)) +} + type testAsyncCommitFailSuite struct { testAsyncCommitCommon } -var _ = SerialSuites(&testAsyncCommitFailSuite{}) - -func (s *testAsyncCommitFailSuite) SetUpTest(c *C) { - s.testAsyncCommitCommon.setUpTest(c) +func (s *testAsyncCommitFailSuite) SetupTest() { + s.testAsyncCommitCommon.setUpTest() } // TestFailCommitPrimaryRpcErrors tests rpc errors are handled properly when // committing primary region task. -func (s *testAsyncCommitFailSuite) TestFailAsyncCommitPrewriteRpcErrors(c *C) { +func (s *testAsyncCommitFailSuite) TestFailAsyncCommitPrewriteRpcErrors() { // This test doesn't support tikv mode because it needs setting failpoint in unistore. if *mockstore.WithTiKV { return } - c.Assert(failpoint.Enable("tikvclient/noRetryOnRpcError", "return(true)"), IsNil) - c.Assert(failpoint.Enable("tikvclient/rpcPrewriteTimeout", `return(true)`), IsNil) + s.Nil(failpoint.Enable("tikvclient/noRetryOnRpcError", "return(true)")) + s.Nil(failpoint.Enable("tikvclient/rpcPrewriteTimeout", `return(true)`)) defer func() { - c.Assert(failpoint.Disable("tikvclient/rpcPrewriteTimeout"), IsNil) - c.Assert(failpoint.Disable("tikvclient/noRetryOnRpcError"), IsNil) + s.Nil(failpoint.Disable("tikvclient/rpcPrewriteTimeout")) + s.Nil(failpoint.Disable("tikvclient/noRetryOnRpcError")) }() // The rpc error will be wrapped to ErrResultUndetermined. - t1 := s.beginAsyncCommit(c) + t1 := s.beginAsyncCommit() err := t1.Set([]byte("a"), []byte("a1")) - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = t1.Commit(ctx) - c.Assert(err, NotNil) - c.Assert(terror.ErrorEqual(err, terror.ErrResultUndetermined), IsTrue, Commentf("%s", errors.ErrorStack(err))) + s.NotNil(err) + s.True(terror.ErrorEqual(err, terror.ErrResultUndetermined), errors.ErrorStack(err)) // We don't need to call "Rollback" after "Commit" fails. err = t1.Rollback() - c.Assert(err, Equals, tikverr.ErrInvalidTxn) + s.Equal(err, tikverr.ErrInvalidTxn) // Create a new transaction to check. The previous transaction should actually commit. - t2 := s.beginAsyncCommit(c) + t2 := s.beginAsyncCommit() res, err := t2.Get(context.Background(), []byte("a")) - c.Assert(err, IsNil) - c.Assert(bytes.Equal(res, []byte("a1")), IsTrue) + s.Nil(err) + s.True(bytes.Equal(res, []byte("a1"))) } -func (s *testAsyncCommitFailSuite) TestAsyncCommitPrewriteCancelled(c *C) { +func (s *testAsyncCommitFailSuite) TestAsyncCommitPrewriteCancelled() { // This test doesn't support tikv mode because it needs setting failpoint in unistore. if *mockstore.WithTiKV { return @@ -102,69 +105,69 @@ func (s *testAsyncCommitFailSuite) TestAsyncCommitPrewriteCancelled(c *C) { splitKey := "s" bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.GetRegionCache().LocateKey(bo, []byte(splitKey)) - c.Assert(err, IsNil) + s.Nil(err) newRegionID := s.cluster.AllocID() newPeerID := s.cluster.AllocID() s.cluster.Split(loc.Region.GetID(), newRegionID, []byte(splitKey), []uint64{newPeerID}, newPeerID) s.store.GetRegionCache().InvalidateCachedRegion(loc.Region) - c.Assert(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("writeConflict")->sleep(50)`), IsNil) + s.Nil(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("writeConflict")->sleep(50)`)) defer func() { - c.Assert(failpoint.Disable("tikvclient/rpcPrewriteResult"), IsNil) + s.Nil(failpoint.Disable("tikvclient/rpcPrewriteResult")) }() - t1 := s.beginAsyncCommit(c) + t1 := s.beginAsyncCommit() err = t1.Set([]byte("a"), []byte("a")) - c.Assert(err, IsNil) + s.Nil(err) err = t1.Set([]byte("z"), []byte("z")) - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = t1.Commit(ctx) - c.Assert(err, NotNil) + s.NotNil(err) _, ok := errors.Cause(err).(*tikverr.ErrWriteConflict) - c.Assert(ok, IsTrue, Commentf("%s", errors.ErrorStack(err))) + s.True(ok, errors.ErrorStack(err)) } -func (s *testAsyncCommitFailSuite) TestPointGetWithAsyncCommit(c *C) { - s.putAlphabets(c, true) +func (s *testAsyncCommitFailSuite) TestPointGetWithAsyncCommit() { + s.putAlphabets(true) - txn := s.beginAsyncCommit(c) + txn := s.beginAsyncCommit() txn.Set([]byte("a"), []byte("v1")) txn.Set([]byte("b"), []byte("v2")) - s.mustPointGet(c, []byte("a"), []byte("a")) - s.mustPointGet(c, []byte("b"), []byte("b")) + s.mustPointGet([]byte("a"), []byte("a")) + s.mustPointGet([]byte("b"), []byte("b")) // PointGet cannot ignore async commit transactions' locks. - c.Assert(failpoint.Enable("tikvclient/asyncCommitDoNothing", "return"), IsNil) + s.Nil(failpoint.Enable("tikvclient/asyncCommitDoNothing", "return")) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err := txn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(txn.GetCommitter().IsAsyncCommit(), IsTrue) - s.mustPointGet(c, []byte("a"), []byte("v1")) - s.mustPointGet(c, []byte("b"), []byte("v2")) - c.Assert(failpoint.Disable("tikvclient/asyncCommitDoNothing"), IsNil) + s.Nil(err) + s.True(txn.GetCommitter().IsAsyncCommit()) + s.mustPointGet([]byte("a"), []byte("v1")) + s.mustPointGet([]byte("b"), []byte("v2")) + s.Nil(failpoint.Disable("tikvclient/asyncCommitDoNothing")) // PointGet will not push the `max_ts` to its ts which is MaxUint64. - txn2 := s.beginAsyncCommit(c) - s.mustGetFromTxn(c, txn2, []byte("a"), []byte("v1")) - s.mustGetFromTxn(c, txn2, []byte("b"), []byte("v2")) + txn2 := s.beginAsyncCommit() + s.mustGetFromTxn(txn2, []byte("a"), []byte("v1")) + s.mustGetFromTxn(txn2, []byte("b"), []byte("v2")) err = txn2.Rollback() - c.Assert(err, IsNil) + s.Nil(err) } -func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { +func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock() { // This test doesn't support tikv mode. if *mockstore.WithTiKV { return } - s.putAlphabets(c, true) + s.putAlphabets(true) // Split into several regions. for _, splitKey := range []string{"h", "o", "u"} { bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.GetRegionCache().LocateKey(bo, []byte(splitKey)) - c.Assert(err, IsNil) + s.Nil(err) newRegionID := s.cluster.AllocID() newPeerID := s.cluster.AllocID() s.cluster.Split(loc.Region.GetID(), newRegionID, []byte(splitKey), []uint64{newPeerID}, newPeerID) @@ -174,37 +177,37 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { // Ensure the region has been split bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("i")) - c.Assert(err, IsNil) - c.Assert(loc.StartKey, BytesEquals, []byte("h")) - c.Assert(loc.EndKey, BytesEquals, []byte("o")) + s.Nil(err) + s.Equal(loc.StartKey, []byte("h")) + s.Equal(loc.EndKey, []byte("o")) loc, err = s.store.GetRegionCache().LocateKey(bo, []byte("p")) - c.Assert(err, IsNil) - c.Assert(loc.StartKey, BytesEquals, []byte("o")) - c.Assert(loc.EndKey, BytesEquals, []byte("u")) + s.Nil(err) + s.Equal(loc.StartKey, []byte("o")) + s.Equal(loc.EndKey, []byte("u")) var sessionID uint64 = 0 test := func(keys []string, values []string) { sessionID++ ctx := context.WithValue(context.Background(), util.SessionID, sessionID) - txn := s.beginAsyncCommit(c) + txn := s.beginAsyncCommit() for i := range keys { txn.Set([]byte(keys[i]), []byte(values[i])) } - c.Assert(failpoint.Enable("tikvclient/asyncCommitDoNothing", "return"), IsNil) + s.Nil(failpoint.Enable("tikvclient/asyncCommitDoNothing", "return")) err = txn.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) primary := txn.GetCommitter().GetPrimaryKey() bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) lockResolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()} txnStatus, err := lockResolver.GetTxnStatus(bo, txn.StartTS(), primary, 0, 0, false, false, nil) - c.Assert(err, IsNil) - c.Assert(txnStatus.IsCommitted(), IsFalse) - c.Assert(txnStatus.Action(), Equals, kvrpcpb.Action_NoAction) + s.Nil(err) + s.False(txnStatus.IsCommitted()) + s.Equal(txnStatus.Action(), kvrpcpb.Action_NoAction) // Currently when the transaction has no secondary, the `secondaries` field of the txnStatus // will be set nil. So here initialize the `expectedSecondaries` to nil too. var expectedSecondaries [][]byte @@ -222,9 +225,9 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { return bytes.Compare(gotSecondaries[i], gotSecondaries[j]) < 0 }) - c.Assert(gotSecondaries, DeepEquals, expectedSecondaries) + s.Equal(gotSecondaries, expectedSecondaries) - c.Assert(failpoint.Disable("tikvclient/asyncCommitDoNothing"), IsNil) + s.Nil(failpoint.Disable("tikvclient/asyncCommitDoNothing")) txn.GetCommitter().Cleanup(context.Background()) } @@ -235,68 +238,68 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock(c *C) { test([]string{"i", "a", "z", "u", "b"}, []string{"i5", "a5", "z5", "u5", "b5"}) } -func (s *testAsyncCommitFailSuite) TestAsyncCommitContextCancelCausingUndetermined(c *C) { +func (s *testAsyncCommitFailSuite) TestAsyncCommitContextCancelCausingUndetermined() { // For an async commit transaction, if RPC returns context.Canceled error when prewriting, the // transaction should go to undetermined state. - txn := s.beginAsyncCommit(c) + txn := s.beginAsyncCommit() err := txn.Set([]byte("a"), []byte("va")) - c.Assert(err, IsNil) + s.Nil(err) - c.Assert(failpoint.Enable("tikvclient/rpcContextCancelErr", `return(true)`), IsNil) + s.Nil(failpoint.Enable("tikvclient/rpcContextCancelErr", `return(true)`)) defer func() { - c.Assert(failpoint.Disable("tikvclient/rpcContextCancelErr"), IsNil) + s.Nil(failpoint.Disable("tikvclient/rpcContextCancelErr")) }() ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = txn.Commit(ctx) - c.Assert(err, NotNil) - c.Assert(txn.GetCommitter().GetUndeterminedErr(), NotNil) + s.NotNil(err) + s.NotNil(txn.GetCommitter().GetUndeterminedErr()) } // TestAsyncCommitRPCErrorThenWriteConflict verifies that the determined failure error overwrites undetermined error. -func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflict(c *C) { +func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflict() { // This test doesn't support tikv mode because it needs setting failpoint in unistore. if *mockstore.WithTiKV { return } - txn := s.beginAsyncCommit(c) + txn := s.beginAsyncCommit() err := txn.Set([]byte("a"), []byte("va")) - c.Assert(err, IsNil) + s.Nil(err) - c.Assert(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`), IsNil) + s.Nil(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`)) defer func() { - c.Assert(failpoint.Disable("tikvclient/rpcPrewriteResult"), IsNil) + s.Nil(failpoint.Disable("tikvclient/rpcPrewriteResult")) }() ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = txn.Commit(ctx) - c.Assert(err, NotNil) - c.Assert(txn.GetCommitter().GetUndeterminedErr(), IsNil) + s.NotNil(err) + s.Nil(txn.GetCommitter().GetUndeterminedErr()) } // TestAsyncCommitRPCErrorThenWriteConflictInChild verifies that the determined failure error in a child recursion // overwrites the undetermined error in the parent. -func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflictInChild(c *C) { +func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflictInChild() { // This test doesn't support tikv mode because it needs setting failpoint in unistore. if *mockstore.WithTiKV { return } - txn := s.beginAsyncCommit(c) + txn := s.beginAsyncCommit() err := txn.Set([]byte("a"), []byte("va")) - c.Assert(err, IsNil) + s.Nil(err) - c.Assert(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`), IsNil) - c.Assert(failpoint.Enable("tikvclient/forceRecursion", `return`), IsNil) + s.Nil(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`)) + s.Nil(failpoint.Enable("tikvclient/forceRecursion", `return`)) defer func() { - c.Assert(failpoint.Disable("tikvclient/rpcPrewriteResult"), IsNil) - c.Assert(failpoint.Disable("tikvclient/forceRecursion"), IsNil) + s.Nil(failpoint.Disable("tikvclient/rpcPrewriteResult")) + s.Nil(failpoint.Disable("tikvclient/forceRecursion")) }() ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = txn.Commit(ctx) - c.Assert(err, NotNil) - c.Assert(txn.GetCommitter().GetUndeterminedErr(), IsNil) + s.NotNil(err) + s.Nil(txn.GetCommitter().GetUndeterminedErr()) } diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 62d4fa01..b2652bf5 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/mockstore" "github.com/tikv/client-go/v2/mockstore/cluster" @@ -59,107 +60,119 @@ func TestT(t *testing.T) { TestingT(t) } +func TestAsyncCommit(t *testing.T) { + suite.Run(t, new(testAsyncCommitSuite)) +} + // testAsyncCommitCommon is used to put common parts that will be both used by // testAsyncCommitSuite and testAsyncCommitFailSuite. type testAsyncCommitCommon struct { + suite.Suite cluster cluster.Cluster store *tikv.KVStore } -func (s *testAsyncCommitCommon) setUpTest(c *C) { +func (s *testAsyncCommitCommon) setUpTest() { if *mockstore.WithTiKV { - s.store = NewTestStore(c) + s.store = NewTestStoreT(s.T()) return } client, pdClient, cluster, err := unistore.New("") - c.Assert(err, IsNil) + s.Require().Nil(err) unistore.BootstrapWithSingleStore(cluster) s.cluster = cluster store, err := tikv.NewTestTiKVStore(fpClient{Client: client}, pdClient, nil, nil, 0) - c.Assert(err, IsNil) + s.Require().Nil(err) s.store = store } -func (s *testAsyncCommitCommon) putAlphabets(c *C, enableAsyncCommit bool) { +func (s *testAsyncCommitCommon) putAlphabets(enableAsyncCommit bool) { for ch := byte('a'); ch <= byte('z'); ch++ { - s.putKV(c, []byte{ch}, []byte{ch}, enableAsyncCommit) + s.putKV([]byte{ch}, []byte{ch}, enableAsyncCommit) } } -func (s *testAsyncCommitCommon) putKV(c *C, key, value []byte, enableAsyncCommit bool) (uint64, uint64) { - txn := s.beginAsyncCommit(c) +func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool) (uint64, uint64) { + txn := s.beginAsyncCommit() err := txn.Set(key, value) - c.Assert(err, IsNil) + s.Nil(err) err = txn.Commit(context.Background()) - c.Assert(err, IsNil) + s.Nil(err) return txn.StartTS(), txn.GetCommitTS() } -func (s *testAsyncCommitCommon) mustGetFromTxn(c *C, txn tikv.TxnProbe, key, expectedValue []byte) { +func (s *testAsyncCommitCommon) mustGetFromTxn(txn tikv.TxnProbe, key, expectedValue []byte) { v, err := txn.Get(context.Background(), key) - c.Assert(err, IsNil) - c.Assert(v, BytesEquals, expectedValue) + s.Nil(err) + s.Equal(v, expectedValue) } -func (s *testAsyncCommitCommon) mustGetLock(c *C, key []byte) *tikv.Lock { +func (s *testAsyncCommitCommon) mustGetLock(key []byte) *tikv.Lock { ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) - c.Assert(err, IsNil) + s.Nil(err) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: key, Version: ver, }) bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.GetRegionCache().LocateKey(bo, key) - c.Assert(err, IsNil) + s.Nil(err) resp, err := s.store.SendReq(bo, req, loc.Region, time.Second*10) - c.Assert(err, IsNil) - c.Assert(resp.Resp, NotNil) + s.Nil(err) + s.NotNil(resp.Resp) keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError() - c.Assert(keyErr, NotNil) + s.NotNil(keyErr) var lockutil tikv.LockProbe lock, err := lockutil.ExtractLockFromKeyErr(keyErr) - c.Assert(err, IsNil) + s.Nil(err) return lock } -func (s *testAsyncCommitCommon) mustPointGet(c *C, key, expectedValue []byte) { +func (s *testAsyncCommitCommon) mustPointGet(key, expectedValue []byte) { snap := s.store.GetSnapshot(math.MaxUint64) value, err := snap.Get(context.Background(), key) - c.Assert(err, IsNil) - c.Assert(value, BytesEquals, expectedValue) + s.Nil(err) + s.Equal(value, expectedValue) } -func (s *testAsyncCommitCommon) mustGetFromSnapshot(c *C, version uint64, key, expectedValue []byte) { +func (s *testAsyncCommitCommon) mustGetFromSnapshot(version uint64, key, expectedValue []byte) { snap := s.store.GetSnapshot(version) value, err := snap.Get(context.Background(), key) - c.Assert(err, IsNil) - c.Assert(value, BytesEquals, expectedValue) + s.Nil(err) + s.Equal(value, expectedValue) } -func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(c *C, version uint64, key []byte) { +func (s *testAsyncCommitCommon) mustGetNoneFromSnapshot(version uint64, key []byte) { snap := s.store.GetSnapshot(version) _, err := snap.Get(context.Background(), key) - c.Assert(errors.Cause(err), Equals, tikverr.ErrNotExist) + s.Equal(errors.Cause(err), tikverr.ErrNotExist) } -func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.TxnProbe { - txn := s.beginAsyncCommit(c) +func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability() tikv.TxnProbe { + txn := s.beginAsyncCommit() txn.SetCausalConsistency(false) return txn } -func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe { +func (s *testAsyncCommitCommon) beginAsyncCommit() tikv.TxnProbe { txn, err := s.store.Begin() - c.Assert(err, IsNil) + s.Nil(err) txn.SetEnableAsyncCommit(true) return tikv.TxnProbe{KVTxn: txn} } -func (s *testAsyncCommitCommon) begin(c *C) tikv.TxnProbe { +func (s *testAsyncCommitCommon) begin() tikv.TxnProbe { txn, err := s.store.Begin() - c.Assert(err, IsNil) + s.Nil(err) + return tikv.TxnProbe{KVTxn: txn} +} + +func (s *testAsyncCommitCommon) begin1PC() tikv.TxnProbe { + txn, err := s.store.Begin() + s.Nil(err) + txn.SetEnable1PC(true) return tikv.TxnProbe{KVTxn: txn} } @@ -168,16 +181,14 @@ type testAsyncCommitSuite struct { bo *tikv.Backoffer } -var _ = SerialSuites(&testAsyncCommitSuite{}) - -func (s *testAsyncCommitSuite) SetUpTest(c *C) { - s.testAsyncCommitCommon.setUpTest(c) +func (s *testAsyncCommitSuite) SetupTest() { + s.testAsyncCommitCommon.setUpTest() s.bo = tikv.NewBackofferWithVars(context.Background(), 5000, nil) } -func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { +func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { txn, err := s.store.Begin() - c.Assert(err, IsNil) + s.Nil(err) txn.SetEnableAsyncCommit(true) for i, k := range keys { if len(values[i]) > 0 { @@ -185,69 +196,69 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]by } else { err = txn.Delete(k) } - c.Assert(err, IsNil) + s.Nil(err) } if len(primaryValue) > 0 { err = txn.Set(primaryKey, primaryValue) } else { err = txn.Delete(primaryKey) } - c.Assert(err, IsNil) + s.Nil(err) txnProbe := tikv.TxnProbe{KVTxn: txn} tpc, err := txnProbe.NewCommitter(0) - c.Assert(err, IsNil) + s.Nil(err) tpc.SetPrimaryKey(primaryKey) ctx := context.Background() err = tpc.PrewriteAllMutations(ctx) - c.Assert(err, IsNil) + s.Nil(err) if commitPrimary { commitTS, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) tpc.SetCommitTS(commitTS) err = tpc.CommitMutations(ctx) - c.Assert(err, IsNil) + s.Nil(err) } return txn.StartTS(), tpc.GetCommitTS() } -func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) { +func (s *testAsyncCommitSuite) TestCheckSecondaries() { // This test doesn't support tikv mode. if *mockstore.WithTiKV { return } - s.putAlphabets(c, true) + s.putAlphabets(true) loc, err := s.store.GetRegionCache().LocateKey(s.bo, []byte("a")) - c.Assert(err, IsNil) + s.Nil(err) newRegionID, peerID := s.cluster.AllocID(), s.cluster.AllocID() s.cluster.Split(loc.Region.GetID(), newRegionID, []byte("e"), []uint64{peerID}, peerID) s.store.GetRegionCache().InvalidateCachedRegion(loc.Region) // No locks to check, only primary key is locked, should be successful. - s.lockKeysWithAsyncCommit(c, [][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false) - lock := s.mustGetLock(c, []byte("z")) + s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, []byte("z"), []byte("z"), false) + lock := s.mustGetLock([]byte("z")) lock.UseAsyncCommit = true ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) var lockutil tikv.LockProbe status := lockutil.NewLockStatus(nil, true, ts) resolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()} err = resolver.ResolveLockAsync(s.bo, lock, status) - c.Assert(err, IsNil) + s.Nil(err) currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) status, err = resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("z"), currentTS, currentTS, true, false, nil) - c.Assert(err, IsNil) - c.Assert(status.IsCommitted(), IsTrue) - c.Assert(status.CommitTS(), Equals, ts) + s.Nil(err) + s.True(status.IsCommitted()) + s.Equal(status.CommitTS(), ts) // One key is committed (i), one key is locked (a). Should get committed. ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) commitTs := ts + 10 gotCheckA := int64(0) @@ -313,18 +324,18 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) { MinCommitTS: ts + 5, } - _ = s.beginAsyncCommit(c) + _ = s.beginAsyncCommit() err = resolver.ResolveLockAsync(s.bo, lock, status) - c.Assert(err, IsNil) - c.Assert(gotCheckA, Equals, int64(1)) - c.Assert(gotCheckB, Equals, int64(1)) - c.Assert(gotOther, Equals, int64(0)) - c.Assert(gotResolve, Equals, int64(1)) + s.Nil(err) + s.Equal(gotCheckA, int64(1)) + s.Equal(gotCheckB, int64(1)) + s.Equal(gotOther, int64(0)) + s.Equal(gotResolve, int64(1)) // One key has been rolled back (b), one is locked (a). Should be rolled back. ts, err = s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) commitTs = ts + 10 gotCheckA = int64(0) @@ -353,45 +364,45 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries(c *C) { lock.MinCommitTS = ts + 5 err = resolver.ResolveLockAsync(s.bo, lock, status) - c.Assert(err, IsNil) - c.Assert(gotCheckA, Equals, int64(1)) - c.Assert(gotCheckB, Equals, int64(1)) - c.Assert(gotResolve, Equals, int64(1)) - c.Assert(gotOther, Equals, int64(0)) + s.Nil(err) + s.Equal(gotCheckA, int64(1)) + s.Equal(gotCheckB, int64(1)) + s.Equal(gotResolve, int64(1)) + s.Equal(gotOther, int64(0)) } -func (s *testAsyncCommitSuite) TestRepeatableRead(c *C) { +func (s *testAsyncCommitSuite) TestRepeatableRead() { var sessionID uint64 = 0 test := func(isPessimistic bool) { - s.putKV(c, []byte("k1"), []byte("v1"), true) + s.putKV([]byte("k1"), []byte("v1"), true) sessionID++ ctx := context.WithValue(context.Background(), util.SessionID, sessionID) - txn1 := s.beginAsyncCommit(c) + txn1 := s.beginAsyncCommit() txn1.SetPessimistic(isPessimistic) - s.mustGetFromTxn(c, txn1, []byte("k1"), []byte("v1")) + s.mustGetFromTxn(txn1, []byte("k1"), []byte("v1")) txn1.Set([]byte("k1"), []byte("v2")) for i := 0; i < 20; i++ { _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil) + s.Nil(err) } - txn2 := s.beginAsyncCommit(c) - s.mustGetFromTxn(c, txn2, []byte("k1"), []byte("v1")) + txn2 := s.beginAsyncCommit() + s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1")) err := txn1.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) // Check txn1 is committed in async commit. - c.Assert(txn1.IsAsyncCommit(), IsTrue) - s.mustGetFromTxn(c, txn2, []byte("k1"), []byte("v1")) + s.True(txn1.IsAsyncCommit()) + s.mustGetFromTxn(txn2, []byte("k1"), []byte("v1")) err = txn2.Rollback() - c.Assert(err, IsNil) + s.Nil(err) - txn3 := s.beginAsyncCommit(c) - s.mustGetFromTxn(c, txn3, []byte("k1"), []byte("v2")) + txn3 := s.beginAsyncCommit() + s.mustGetFromTxn(txn3, []byte("k1"), []byte("v2")) err = txn3.Rollback() - c.Assert(err, IsNil) + s.Nil(err) } test(false) @@ -400,69 +411,69 @@ func (s *testAsyncCommitSuite) TestRepeatableRead(c *C) { // It's just a simple validation of linearizability. // Extra tests are needed to test this feature with the control of the TiKV cluster. -func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability(c *C) { - t1 := s.beginAsyncCommitWithLinearizability(c) - t2 := s.beginAsyncCommitWithLinearizability(c) +func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() { + t1 := s.beginAsyncCommitWithLinearizability() + t2 := s.beginAsyncCommitWithLinearizability() err := t1.Set([]byte("a"), []byte("a1")) - c.Assert(err, IsNil) + s.Nil(err) err = t2.Set([]byte("b"), []byte("b1")) - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) // t2 commits earlier than t1 err = t2.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) err = t1.Commit(ctx) - c.Assert(err, IsNil) + s.Nil(err) commitTS1 := t1.GetCommitTS() commitTS2 := t2.GetCommitTS() - c.Assert(commitTS2, Less, commitTS1) + s.Less(commitTS2, commitTS1) } // TestAsyncCommitWithMultiDC tests that async commit can only be enabled in global transactions -func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC(c *C) { +func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC() { // It requires setting placement rules to run with TiKV if *mockstore.WithTiKV { return } - localTxn := s.beginAsyncCommit(c) + localTxn := s.beginAsyncCommit() err := localTxn.Set([]byte("a"), []byte("a1")) localTxn.SetScope("bj") - c.Assert(err, IsNil) + s.Nil(err) ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = localTxn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(localTxn.IsAsyncCommit(), IsFalse) + s.Nil(err) + s.False(localTxn.IsAsyncCommit()) - globalTxn := s.beginAsyncCommit(c) + globalTxn := s.beginAsyncCommit() err = globalTxn.Set([]byte("b"), []byte("b1")) globalTxn.SetScope(oracle.GlobalTxnScope) - c.Assert(err, IsNil) + s.Nil(err) err = globalTxn.Commit(ctx) - c.Assert(err, IsNil) - c.Assert(globalTxn.IsAsyncCommit(), IsTrue) + s.Nil(err) + s.True(globalTxn.IsAsyncCommit()) } -func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) { +func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit() { keys := [][]byte{[]byte("k0"), []byte("k1")} values := [][]byte{[]byte("v00"), []byte("v10")} initTest := func() tikv.CommitterProbe { - t0 := s.begin(c) + t0 := s.begin() err := t0.Set(keys[0], values[0]) - c.Assert(err, IsNil) + s.Nil(err) err = t0.Set(keys[1], values[1]) - c.Assert(err, IsNil) + s.Nil(err) err = t0.Commit(context.Background()) - c.Assert(err, IsNil) + s.Nil(err) - t1 := s.beginAsyncCommit(c) + t1 := s.beginAsyncCommit() err = t1.Set(keys[0], []byte("v01")) - c.Assert(err, IsNil) + s.Nil(err) err = t1.Set(keys[1], []byte("v11")) - c.Assert(err, IsNil) + s.Nil(err) committer, err := t1.NewCommitter(1) - c.Assert(err, IsNil) + s.Nil(err) committer.SetLockTTL(1) committer.SetUseAsyncCommit() return committer @@ -470,23 +481,23 @@ func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) { prewriteKey := func(committer tikv.CommitterProbe, idx int, fallback bool) { bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil) loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx]) - c.Assert(err, IsNil) + s.Nil(err) req := committer.BuildPrewriteRequest(loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer(), committer.GetMutations().Slice(idx, idx+1), 1) if fallback { req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1 } resp, err := s.store.SendReq(bo, req, loc.Region, 5000) - c.Assert(err, IsNil) - c.Assert(resp.Resp, NotNil) + s.Nil(err) + s.NotNil(resp.Resp) } readKey := func(idx int) { - t2 := s.begin(c) + t2 := s.begin() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() val, err := t2.Get(ctx, keys[idx]) - c.Assert(err, IsNil) - c.Assert(val, DeepEquals, values[idx]) + s.Nil(err) + s.Equal(val, values[idx]) } // Case 1: Fallback primary, read primary