diff --git a/integration_tests/1pc_test.go b/integration_tests/1pc_test.go index 43905d16..e920fa26 100644 --- a/integration_tests/1pc_test.go +++ b/integration_tests/1pc_test.go @@ -40,7 +40,6 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/util" ) func TestOnePC(t *testing.T) { @@ -62,15 +61,13 @@ func (s *testOnePCSuite) TearDownTest() { } func (s *testOnePCSuite) Test1PC() { - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - k1 := []byte("k1") v1 := []byte("v1") txn := s.begin1PC() err := txn.Set(k1, v1) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) s.True(txn.GetCommitter().IsOnePC()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) @@ -78,11 +75,12 @@ func (s *testOnePCSuite) Test1PC() { // ttlManager is not used for 1PC. s.True(txn.GetCommitter().IsTTLUninitialized()) - // 1PC doesn't work if sessionID == 0 + // 1PC doesn't work if 1PC option is not set + k2 := []byte("k2") v2 := []byte("v2") - txn = s.begin1PC() + txn = s.begin() err = txn.Set(k2, v2) s.Nil(err) err = txn.Commit(context.Background()) @@ -91,83 +89,67 @@ func (s *testOnePCSuite) Test1PC() { s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) - // 1PC doesn't work if system variable not set - + // Test multiple keys k3 := []byte("k3") v3 := []byte("v3") - - txn = s.begin() - err = txn.Set(k3, v3) - s.Nil(err) - err = txn.Commit(ctx) - s.Nil(err) - s.False(txn.GetCommitter().IsOnePC()) - s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) - s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) - - // Test multiple keys k4 := []byte("k4") v4 := []byte("v4") k5 := []byte("k5") v5 := []byte("v5") - k6 := []byte("k6") - v6 := []byte("v6") txn = s.begin1PC() + err = txn.Set(k3, v3) + s.Nil(err) err = txn.Set(k4, v4) s.Nil(err) err = txn.Set(k5, v5) s.Nil(err) - err = txn.Set(k6, v6) - s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) s.True(txn.GetCommitter().IsOnePC()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) // Check keys are committed with the same version + s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3) s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4) s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5) - s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6) + s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3) s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4) s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5) - s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k6) // Overwriting in MVCC - v6New := []byte("v6new") + v5New := []byte("v5new") txn = s.begin1PC() - err = txn.Set(k6, v6New) + err = txn.Set(k5, v5New) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) s.True(txn.GetCommitter().IsOnePC()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) - s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6New) - s.mustGetFromSnapshot(txn.GetCommitTS()-1, k6, v6) + s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New) + s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5) // Check all keys - keys := [][]byte{k1, k2, k3, k4, k5, k6} - values := [][]byte{v1, v2, v3, v4, v5, v6New} + keys := [][]byte{k1, k2, k3, k4, k5} + values := [][]byte{v1, v2, v3, v4, v5New} ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { - v, err := snap.Get(ctx, k) + v, err := snap.Get(context.Background(), k) s.Nil(err) s.Equal(v, values[i]) } } func (s *testOnePCSuite) Test1PCIsolation() { - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - k := []byte("k") v1 := []byte("v1") txn := s.begin1PC() txn.Set(k, v1) - err := txn.Commit(ctx) + err := txn.Commit(context.Background()) s.Nil(err) v2 := []byte("v2") @@ -177,14 +159,14 @@ func (s *testOnePCSuite) Test1PCIsolation() { // Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs // calculation. for i := 0; i < 10; i++ { - _, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) s.Nil(err) } txn2 := s.begin1PC() s.mustGetFromTxn(txn2, k, v1) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.True(txn.GetCommitter().IsOnePC()) s.Nil(err) @@ -201,8 +183,6 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { return } - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - txn := s.begin1PC() keys := []string{"k0", "k1", "k2", "k3"} @@ -212,7 +192,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { s.Nil(err) err = txn.Set([]byte(keys[3]), []byte(values[3])) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) // 1PC doesn't work if it affects multiple regions. @@ -227,7 +207,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { s.Nil(err) err = txn.Set([]byte(keys[2]), []byte(values[2])) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) s.False(txn.GetCommitter().IsOnePC()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) @@ -237,7 +217,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() { s.Nil(err) snap := s.store.GetSnapshot(ver) for i, k := range keys { - v, err := snap.Get(ctx, []byte(k)) + v, err := snap.Get(context.Background(), []byte(k)) s.Nil(err) s.Equal(v, []byte(values[i])) } @@ -252,11 +232,10 @@ func (s *testOnePCSuite) Test1PCLinearizability() { s.Nil(err) err = t2.Set([]byte("b"), []byte("b1")) s.Nil(err) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) // t2 commits earlier than t1 - err = t2.Commit(ctx) + err = t2.Commit(context.Background()) s.Nil(err) - err = t1.Commit(ctx) + err = t1.Commit(context.Background()) s.Nil(err) commitTS1 := t1.GetCommitter().GetCommitTS() commitTS2 := t2.GetCommitter().GetCommitTS() @@ -273,8 +252,7 @@ func (s *testOnePCSuite) Test1PCWithMultiDC() { err := localTxn.Set([]byte("a"), []byte("a1")) localTxn.SetScope("bj") s.Nil(err) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = localTxn.Commit(ctx) + err = localTxn.Commit(context.Background()) s.Nil(err) s.False(localTxn.GetCommitter().IsOnePC()) @@ -282,7 +260,7 @@ func (s *testOnePCSuite) Test1PCWithMultiDC() { err = globalTxn.Set([]byte("b"), []byte("b1")) globalTxn.SetScope(oracle.GlobalTxnScope) s.Nil(err) - err = globalTxn.Commit(ctx) + err = globalTxn.Commit(context.Background()) s.Nil(err) s.True(globalTxn.GetCommitter().IsOnePC()) } @@ -294,8 +272,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() { txn := s.begin() err := txn.Set([]byte("k"), []byte("v")) s.Nil(err) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) curr := metrics.GetTxnCommitCounter() diff := curr.Sub(initial) @@ -307,7 +284,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() { txn = s.beginAsyncCommit() err = txn.Set([]byte("k1"), []byte("v1")) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) curr = metrics.GetTxnCommitCounter() diff = curr.Sub(initial) @@ -319,7 +296,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() { txn = s.begin1PC() err = txn.Set([]byte("k2"), []byte("v2")) s.Nil(err) - err = txn.Commit(ctx) + err = txn.Commit(context.Background()) s.Nil(err) curr = metrics.GetTxnCommitCounter() diff = curr.Sub(initial) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index f2d8e3e1..676ea3b3 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1269,7 +1269,6 @@ func (s *testCommitterSuite) TestAsyncCommit() { committer, err := txn1.NewCommitter(0) s.Nil(err) - committer.SetSessionID(1) committer.SetMinCommitTS(txn1.StartTS() + 10) err = committer.Execute(ctx) s.Nil(err) diff --git a/integration_tests/scan_test.go b/integration_tests/scan_test.go index 06781952..155562b8 100644 --- a/integration_tests/scan_test.go +++ b/integration_tests/scan_test.go @@ -40,7 +40,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/util" ) var scanBatchSize = tikv.ConfigProbe{}.GetScanBatchSize() @@ -54,15 +53,12 @@ type testScanSuite struct { store *tikv.KVStore recordPrefix []byte rowNums []int - ctx context.Context } func (s *testScanSuite) SetupSuite() { s.store = NewTestStore(s.T()) s.recordPrefix = []byte("prefix") s.rowNums = append(s.rowNums, 1, scanBatchSize, scanBatchSize+1, scanBatchSize*3) - // Avoid using async commit logic. - s.ctx = context.WithValue(context.Background(), util.SessionID, uint64(0)) } func (s *testScanSuite) TearDownSuite() { @@ -76,7 +72,7 @@ func (s *testScanSuite) TearDownSuite() { s.Require().Nil(err) scanner.Next() } - err = txn.Commit(s.ctx) + err = txn.Commit(context.Background()) s.Require().Nil(err) err = s.store.Close() s.Require().Nil(err) @@ -125,16 +121,16 @@ func (s *testScanSuite) TestScan() { err := txn.Set(s.makeKey(i), s.makeValue(i)) s.Nil(err) } - err := txn.Commit(s.ctx) + err := txn.Commit(context.Background()) s.Nil(err) mockTableID := int64(999) if rowNum > 123 { - _, err = s.store.SplitRegions(s.ctx, [][]byte{s.makeKey(123)}, false, &mockTableID) + _, err = s.store.SplitRegions(context.Background(), [][]byte{s.makeKey(123)}, false, &mockTableID) s.Nil(err) } if rowNum > 456 { - _, err = s.store.SplitRegions(s.ctx, [][]byte{s.makeKey(456)}, false, &mockTableID) + _, err = s.store.SplitRegions(context.Background(), [][]byte{s.makeKey(456)}, false, &mockTableID) s.Nil(err) } diff --git a/tikv/2pc.go b/tikv/2pc.go index c4884b88..3a8551cc 100644 --- a/tikv/2pc.go +++ b/tikv/2pc.go @@ -913,7 +913,7 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit // TODO the keys limit need more tests, this value makes the unit test pass by now. // Async commit is not compatible with Binlog because of the non unique timestamp issue. - if c.sessionID > 0 && c.txn.enableAsyncCommit && + if c.txn.enableAsyncCommit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && !c.shouldWriteBinlog() { totalKeySize := uint64(0) @@ -935,7 +935,7 @@ func (c *twoPhaseCommitter) checkOnePC() bool { return false } - return c.sessionID > 0 && !c.shouldWriteBinlog() && c.txn.enable1PC + return !c.shouldWriteBinlog() && c.txn.enable1PC } func (c *twoPhaseCommitter) needLinearizability() bool {