do not check sessionID for async commit and 1PC (#228)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2021-07-13 14:53:58 +08:00 committed by GitHub
parent 0fdc8e3d6f
commit f5a3974312
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 65 deletions

View File

@ -40,7 +40,6 @@ import (
"github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/util"
) )
func TestOnePC(t *testing.T) { func TestOnePC(t *testing.T) {
@ -62,15 +61,13 @@ func (s *testOnePCSuite) TearDownTest() {
} }
func (s *testOnePCSuite) Test1PC() { func (s *testOnePCSuite) Test1PC() {
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
k1 := []byte("k1") k1 := []byte("k1")
v1 := []byte("v1") v1 := []byte("v1")
txn := s.begin1PC() txn := s.begin1PC()
err := txn.Set(k1, v1) err := txn.Set(k1, v1)
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
s.True(txn.GetCommitter().IsOnePC()) s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
@ -78,11 +75,12 @@ func (s *testOnePCSuite) Test1PC() {
// ttlManager is not used for 1PC. // ttlManager is not used for 1PC.
s.True(txn.GetCommitter().IsTTLUninitialized()) s.True(txn.GetCommitter().IsTTLUninitialized())
// 1PC doesn't work if sessionID == 0 // 1PC doesn't work if 1PC option is not set
k2 := []byte("k2") k2 := []byte("k2")
v2 := []byte("v2") v2 := []byte("v2")
txn = s.begin1PC() txn = s.begin()
err = txn.Set(k2, v2) err = txn.Set(k2, v2)
s.Nil(err) s.Nil(err)
err = txn.Commit(context.Background()) err = txn.Commit(context.Background())
@ -91,83 +89,67 @@ func (s *testOnePCSuite) Test1PC() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0))
s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS()) s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS())
// 1PC doesn't work if system variable not set // Test multiple keys
k3 := []byte("k3") k3 := []byte("k3")
v3 := []byte("v3") v3 := []byte("v3")
txn = s.begin()
err = txn.Set(k3, v3)
s.Nil(err)
err = txn.Commit(ctx)
s.Nil(err)
s.False(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0))
s.Greater(txn.GetCommitter().GetCommitTS(), txn.StartTS())
// Test multiple keys
k4 := []byte("k4") k4 := []byte("k4")
v4 := []byte("v4") v4 := []byte("v4")
k5 := []byte("k5") k5 := []byte("k5")
v5 := []byte("v5") v5 := []byte("v5")
k6 := []byte("k6")
v6 := []byte("v6")
txn = s.begin1PC() txn = s.begin1PC()
err = txn.Set(k3, v3)
s.Nil(err)
err = txn.Set(k4, v4) err = txn.Set(k4, v4)
s.Nil(err) s.Nil(err)
err = txn.Set(k5, v5) err = txn.Set(k5, v5)
s.Nil(err) s.Nil(err)
err = txn.Set(k6, v6) err = txn.Commit(context.Background())
s.Nil(err)
err = txn.Commit(ctx)
s.Nil(err) s.Nil(err)
s.True(txn.GetCommitter().IsOnePC()) s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
// Check keys are committed with the same version // Check keys are committed with the same version
s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4) s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5) s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5)
s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6) s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4) s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5) s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k6)
// Overwriting in MVCC // Overwriting in MVCC
v6New := []byte("v6new") v5New := []byte("v5new")
txn = s.begin1PC() txn = s.begin1PC()
err = txn.Set(k6, v6New) err = txn.Set(k5, v5New)
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
s.True(txn.GetCommitter().IsOnePC()) s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS()) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS()) s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
s.mustGetFromSnapshot(txn.GetCommitTS(), k6, v6New) s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k6, v6) s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5)
// Check all keys // Check all keys
keys := [][]byte{k1, k2, k3, k4, k5, k6} keys := [][]byte{k1, k2, k3, k4, k5}
values := [][]byte{v1, v2, v3, v4, v5, v6New} values := [][]byte{v1, v2, v3, v4, v5New}
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err) s.Nil(err)
snap := s.store.GetSnapshot(ver) snap := s.store.GetSnapshot(ver)
for i, k := range keys { for i, k := range keys {
v, err := snap.Get(ctx, k) v, err := snap.Get(context.Background(), k)
s.Nil(err) s.Nil(err)
s.Equal(v, values[i]) s.Equal(v, values[i])
} }
} }
func (s *testOnePCSuite) Test1PCIsolation() { func (s *testOnePCSuite) Test1PCIsolation() {
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
k := []byte("k") k := []byte("k")
v1 := []byte("v1") v1 := []byte("v1")
txn := s.begin1PC() txn := s.begin1PC()
txn.Set(k, v1) txn.Set(k, v1)
err := txn.Commit(ctx) err := txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
v2 := []byte("v2") v2 := []byte("v2")
@ -177,14 +159,14 @@ func (s *testOnePCSuite) Test1PCIsolation() {
// Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs // Make `txn`'s commitTs more likely to be less than `txn2`'s startTs if there's bug in commitTs
// calculation. // calculation.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, err := s.store.GetOracle().GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) _, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err) s.Nil(err)
} }
txn2 := s.begin1PC() txn2 := s.begin1PC()
s.mustGetFromTxn(txn2, k, v1) s.mustGetFromTxn(txn2, k, v1)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.True(txn.GetCommitter().IsOnePC()) s.True(txn.GetCommitter().IsOnePC())
s.Nil(err) s.Nil(err)
@ -201,8 +183,6 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
return return
} }
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
txn := s.begin1PC() txn := s.begin1PC()
keys := []string{"k0", "k1", "k2", "k3"} keys := []string{"k0", "k1", "k2", "k3"}
@ -212,7 +192,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
s.Nil(err) s.Nil(err)
err = txn.Set([]byte(keys[3]), []byte(values[3])) err = txn.Set([]byte(keys[3]), []byte(values[3]))
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
// 1PC doesn't work if it affects multiple regions. // 1PC doesn't work if it affects multiple regions.
@ -227,7 +207,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
s.Nil(err) s.Nil(err)
err = txn.Set([]byte(keys[2]), []byte(values[2])) err = txn.Set([]byte(keys[2]), []byte(values[2]))
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
s.False(txn.GetCommitter().IsOnePC()) s.False(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0)) s.Equal(txn.GetCommitter().GetOnePCCommitTS(), uint64(0))
@ -237,7 +217,7 @@ func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {
s.Nil(err) s.Nil(err)
snap := s.store.GetSnapshot(ver) snap := s.store.GetSnapshot(ver)
for i, k := range keys { for i, k := range keys {
v, err := snap.Get(ctx, []byte(k)) v, err := snap.Get(context.Background(), []byte(k))
s.Nil(err) s.Nil(err)
s.Equal(v, []byte(values[i])) s.Equal(v, []byte(values[i]))
} }
@ -252,11 +232,10 @@ func (s *testOnePCSuite) Test1PCLinearizability() {
s.Nil(err) s.Nil(err)
err = t2.Set([]byte("b"), []byte("b1")) err = t2.Set([]byte("b"), []byte("b1"))
s.Nil(err) s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
// t2 commits earlier than t1 // t2 commits earlier than t1
err = t2.Commit(ctx) err = t2.Commit(context.Background())
s.Nil(err) s.Nil(err)
err = t1.Commit(ctx) err = t1.Commit(context.Background())
s.Nil(err) s.Nil(err)
commitTS1 := t1.GetCommitter().GetCommitTS() commitTS1 := t1.GetCommitter().GetCommitTS()
commitTS2 := t2.GetCommitter().GetCommitTS() commitTS2 := t2.GetCommitter().GetCommitTS()
@ -273,8 +252,7 @@ func (s *testOnePCSuite) Test1PCWithMultiDC() {
err := localTxn.Set([]byte("a"), []byte("a1")) err := localTxn.Set([]byte("a"), []byte("a1"))
localTxn.SetScope("bj") localTxn.SetScope("bj")
s.Nil(err) s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = localTxn.Commit(context.Background())
err = localTxn.Commit(ctx)
s.Nil(err) s.Nil(err)
s.False(localTxn.GetCommitter().IsOnePC()) s.False(localTxn.GetCommitter().IsOnePC())
@ -282,7 +260,7 @@ func (s *testOnePCSuite) Test1PCWithMultiDC() {
err = globalTxn.Set([]byte("b"), []byte("b1")) err = globalTxn.Set([]byte("b"), []byte("b1"))
globalTxn.SetScope(oracle.GlobalTxnScope) globalTxn.SetScope(oracle.GlobalTxnScope)
s.Nil(err) s.Nil(err)
err = globalTxn.Commit(ctx) err = globalTxn.Commit(context.Background())
s.Nil(err) s.Nil(err)
s.True(globalTxn.GetCommitter().IsOnePC()) s.True(globalTxn.GetCommitter().IsOnePC())
} }
@ -294,8 +272,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() {
txn := s.begin() txn := s.begin()
err := txn.Set([]byte("k"), []byte("v")) err := txn.Set([]byte("k"), []byte("v"))
s.Nil(err) s.Nil(err)
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) err = txn.Commit(context.Background())
err = txn.Commit(ctx)
s.Nil(err) s.Nil(err)
curr := metrics.GetTxnCommitCounter() curr := metrics.GetTxnCommitCounter()
diff := curr.Sub(initial) diff := curr.Sub(initial)
@ -307,7 +284,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() {
txn = s.beginAsyncCommit() txn = s.beginAsyncCommit()
err = txn.Set([]byte("k1"), []byte("v1")) err = txn.Set([]byte("k1"), []byte("v1"))
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
curr = metrics.GetTxnCommitCounter() curr = metrics.GetTxnCommitCounter()
diff = curr.Sub(initial) diff = curr.Sub(initial)
@ -319,7 +296,7 @@ func (s *testOnePCSuite) TestTxnCommitCounter() {
txn = s.begin1PC() txn = s.begin1PC()
err = txn.Set([]byte("k2"), []byte("v2")) err = txn.Set([]byte("k2"), []byte("v2"))
s.Nil(err) s.Nil(err)
err = txn.Commit(ctx) err = txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
curr = metrics.GetTxnCommitCounter() curr = metrics.GetTxnCommitCounter()
diff = curr.Sub(initial) diff = curr.Sub(initial)

View File

@ -1269,7 +1269,6 @@ func (s *testCommitterSuite) TestAsyncCommit() {
committer, err := txn1.NewCommitter(0) committer, err := txn1.NewCommitter(0)
s.Nil(err) s.Nil(err)
committer.SetSessionID(1)
committer.SetMinCommitTS(txn1.StartTS() + 10) committer.SetMinCommitTS(txn1.StartTS() + 10)
err = committer.Execute(ctx) err = committer.Execute(ctx)
s.Nil(err) s.Nil(err)

View File

@ -40,7 +40,6 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/util"
) )
var scanBatchSize = tikv.ConfigProbe{}.GetScanBatchSize() var scanBatchSize = tikv.ConfigProbe{}.GetScanBatchSize()
@ -54,15 +53,12 @@ type testScanSuite struct {
store *tikv.KVStore store *tikv.KVStore
recordPrefix []byte recordPrefix []byte
rowNums []int rowNums []int
ctx context.Context
} }
func (s *testScanSuite) SetupSuite() { func (s *testScanSuite) SetupSuite() {
s.store = NewTestStore(s.T()) s.store = NewTestStore(s.T())
s.recordPrefix = []byte("prefix") s.recordPrefix = []byte("prefix")
s.rowNums = append(s.rowNums, 1, scanBatchSize, scanBatchSize+1, scanBatchSize*3) s.rowNums = append(s.rowNums, 1, scanBatchSize, scanBatchSize+1, scanBatchSize*3)
// Avoid using async commit logic.
s.ctx = context.WithValue(context.Background(), util.SessionID, uint64(0))
} }
func (s *testScanSuite) TearDownSuite() { func (s *testScanSuite) TearDownSuite() {
@ -76,7 +72,7 @@ func (s *testScanSuite) TearDownSuite() {
s.Require().Nil(err) s.Require().Nil(err)
scanner.Next() scanner.Next()
} }
err = txn.Commit(s.ctx) err = txn.Commit(context.Background())
s.Require().Nil(err) s.Require().Nil(err)
err = s.store.Close() err = s.store.Close()
s.Require().Nil(err) s.Require().Nil(err)
@ -125,16 +121,16 @@ func (s *testScanSuite) TestScan() {
err := txn.Set(s.makeKey(i), s.makeValue(i)) err := txn.Set(s.makeKey(i), s.makeValue(i))
s.Nil(err) s.Nil(err)
} }
err := txn.Commit(s.ctx) err := txn.Commit(context.Background())
s.Nil(err) s.Nil(err)
mockTableID := int64(999) mockTableID := int64(999)
if rowNum > 123 { if rowNum > 123 {
_, err = s.store.SplitRegions(s.ctx, [][]byte{s.makeKey(123)}, false, &mockTableID) _, err = s.store.SplitRegions(context.Background(), [][]byte{s.makeKey(123)}, false, &mockTableID)
s.Nil(err) s.Nil(err)
} }
if rowNum > 456 { if rowNum > 456 {
_, err = s.store.SplitRegions(s.ctx, [][]byte{s.makeKey(456)}, false, &mockTableID) _, err = s.store.SplitRegions(context.Background(), [][]byte{s.makeKey(456)}, false, &mockTableID)
s.Nil(err) s.Nil(err)
} }

View File

@ -913,7 +913,7 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool {
asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
// TODO the keys limit need more tests, this value makes the unit test pass by now. // TODO the keys limit need more tests, this value makes the unit test pass by now.
// Async commit is not compatible with Binlog because of the non unique timestamp issue. // Async commit is not compatible with Binlog because of the non unique timestamp issue.
if c.sessionID > 0 && c.txn.enableAsyncCommit && if c.txn.enableAsyncCommit &&
uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit &&
!c.shouldWriteBinlog() { !c.shouldWriteBinlog() {
totalKeySize := uint64(0) totalKeySize := uint64(0)
@ -935,7 +935,7 @@ func (c *twoPhaseCommitter) checkOnePC() bool {
return false return false
} }
return c.sessionID > 0 && !c.shouldWriteBinlog() && c.txn.enable1PC return !c.shouldWriteBinlog() && c.txn.enable1PC
} }
func (c *twoPhaseCommitter) needLinearizability() bool { func (c *twoPhaseCommitter) needLinearizability() bool {