Export commitTS of KVTxn (#1489)

Signed-off-by: Bin Zhang <b6g2021@gmail.com>
This commit is contained in:
Bin Zhang 2024-11-26 23:15:26 -08:00 committed by GitHub
parent 05d115b3e8
commit 89643b0e8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 38 additions and 38 deletions

View File

@ -112,12 +112,12 @@ func (s *testOnePCSuite) Test1PC() {
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
// Check keys are committed with the same version
s.mustGetFromSnapshot(txn.GetCommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.GetCommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.GetCommitTS()-1, k5)
s.mustGetFromSnapshot(txn.CommitTS(), k3, v3)
s.mustGetFromSnapshot(txn.CommitTS(), k4, v4)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k3)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k4)
s.mustGetNoneFromSnapshot(txn.CommitTS()-1, k5)
// Overwriting in MVCC
v5New := []byte("v5new")
@ -129,8 +129,8 @@ func (s *testOnePCSuite) Test1PC() {
s.True(txn.GetCommitter().IsOnePC())
s.Equal(txn.GetCommitter().GetOnePCCommitTS(), txn.GetCommitter().GetCommitTS())
s.Greater(txn.GetCommitter().GetOnePCCommitTS(), txn.StartTS())
s.mustGetFromSnapshot(txn.GetCommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k5, v5)
s.mustGetFromSnapshot(txn.CommitTS(), k5, v5New)
s.mustGetFromSnapshot(txn.CommitTS()-1, k5, v5)
// Check all keys
keys := [][]byte{k1, k2, k3, k4, k5}
@ -175,8 +175,8 @@ func (s *testOnePCSuite) Test1PCIsolation() {
s.mustGetFromTxn(txn2, k, v1)
s.Nil(txn2.Rollback())
s.mustGetFromSnapshot(txn.GetCommitTS(), k, v2)
s.mustGetFromSnapshot(txn.GetCommitTS()-1, k, v1)
s.mustGetFromSnapshot(txn.CommitTS(), k, v2)
s.mustGetFromSnapshot(txn.CommitTS()-1, k, v1)
}
func (s *testOnePCSuite) Test1PCDisallowMultiRegion() {

View File

@ -1076,7 +1076,7 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
txn.StartAggressiveLocking()
s.Nil(txn0.Commit(context.Background()))
s.Greater(txn0.GetCommitTS(), txn.StartTS())
s.Greater(txn0.CommitTS(), txn.StartTS())
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
@ -1087,9 +1087,9 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))
s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn0.CommitTS(), lockCtx.MaxLockedWithConflictTS)
v := lockCtx.Values[string(key)]
s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS)
s.Equal(txn0.CommitTS(), v.LockedWithConflictTS)
s.True(v.Exists)
s.Equal(value, v.Value)
@ -1269,12 +1269,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() {
s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{})
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
s.NoError(insertPessimisticLock(lockCtx, "k8"))
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k8"].LockedWithConflictTS)
// Update forUpdateTS to simulate a pessimistic retry.
newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS())
s.GreaterOrEqual(newForUpdateTS, txn2.CommitTS())
lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()}
mustAlreadyExist(insertPessimisticLock(lockCtx, "k7"))
s.NoError(insertPessimisticLock(lockCtx, "k8"))
@ -1291,7 +1291,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 := s.begin()
s.NoError(txn0.Set([]byte("k1"), []byte("v1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS := txn0.GetCommitTS()
txn0CommitTS := txn0.CommitTS()
txn.StartAggressiveLocking()
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
@ -1312,7 +1312,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLockOnlyIfExists() {
txn0 = s.begin()
s.NoError(txn0.Delete([]byte("k1")))
s.NoError(txn0.Commit(context.Background()))
txn0CommitTS = txn0.GetCommitTS()
txn0CommitTS = txn0.CommitTS()
txn.StartAggressiveLocking()
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), ReturnValues: true, LockOnlyIfExists: true}
@ -1461,13 +1461,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() {
s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2")))
if firstAttemptLockedWithConflict {
s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.MaxLockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k1"].LockedWithConflictTS)
s.Equal(txn2.CommitTS(), lockCtx.Values["k2"].LockedWithConflictTS)
}
if firstAttemptLockedWithConflict {
forUpdateTS = txn2.GetCommitTS() + 1
forUpdateTS = txn2.CommitTS() + 1
} else {
forUpdateTS++
}

View File

@ -81,7 +81,7 @@ func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic boo
err = prepareTxn.Commit(context.Background())
s.Nil(err)
prepareStartTS := prepareTxn.GetCommitter().GetStartTS()
prepareCommitTS := prepareTxn.GetCommitTS()
prepareCommitTS := prepareTxn.CommitTS()
// A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only
// the last key.

View File

@ -112,7 +112,7 @@ func (s *testAsyncCommitCommon) putKV(key, value []byte, enableAsyncCommit bool)
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}
func (s *testAsyncCommitCommon) mustGetFromTxn(txn transaction.TxnProbe, key, expectedValue []byte) {
@ -440,8 +440,8 @@ func (s *testAsyncCommitSuite) TestAsyncCommitLinearizability() {
s.Nil(err)
err = t1.Commit(ctx)
s.Nil(err)
commitTS1 := t1.GetCommitTS()
commitTS2 := t2.GetCommitTS()
commitTS1 := t1.CommitTS()
commitTS2 := t2.CommitTS()
s.Less(commitTS2, commitTS1)
}

View File

@ -96,7 +96,7 @@ func (s *testIsolationSuite) SetWithRetry(k, v []byte) writeRecord {
if err == nil {
return writeRecord{
startTS: txn.StartTS(),
commitTS: txn.GetCommitTS(),
commitTS: txn.CommitTS(),
}
}
}

View File

@ -142,7 +142,7 @@ func (s *testLockSuite) putKV(key, value []byte) (uint64, uint64) {
s.Nil(err)
err = txn.Commit(context.Background())
s.Nil(err)
return txn.StartTS(), txn.GetCommitTS()
return txn.StartTS(), txn.CommitTS()
}
func (s *testLockSuite) prepareAlphabetLocks() {
@ -1311,7 +1311,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
info := simulatedTxn.GetAggressiveLockingKeysInfo()
s.Equal(1, len(info))
s.Equal(k1, info[0].Key())
s.Equal(txn2.GetCommitTS(), info[0].ActualLockForUpdateTS())
s.Equal(txn2.CommitTS(), info[0].ActualLockForUpdateTS())
simulatedTxn.DoneAggressiveLocking(context.Background())
defer func() {
@ -1324,12 +1324,12 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
s.Error(err)
s.Regexp("[pP]essimistic ?[lL]ock ?[nN]ot ?[fF]ound", err.Error())
snapshot := s.store.GetSnapshot(txn2.GetCommitTS())
snapshot := s.store.GetSnapshot(txn2.CommitTS())
v, err := snapshot.Get(context.Background(), k1)
s.NoError(err)
s.Equal(v2, v)
snapshot = s.store.GetSnapshot(txn2.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn2.CommitTS() - 1)
_, err = snapshot.Get(context.Background(), k1)
s.Equal(tikverr.ErrNotExist, err)
@ -1369,7 +1369,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
s.NoError(txn.Commit(ctx))
snapshot = s.store.GetSnapshot(txn.GetCommitTS())
snapshot = s.store.GetSnapshot(txn.CommitTS())
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v1, v)
@ -1377,7 +1377,7 @@ func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
s.NoError(err)
s.Equal(v1, v)
snapshot = s.store.GetSnapshot(txn.GetCommitTS() - 1)
snapshot = s.store.GetSnapshot(txn.CommitTS() - 1)
v, err = snapshot.Get(context.Background(), k2)
s.NoError(err)
s.Equal(v2, v)

View File

@ -38,11 +38,6 @@ func (txn TxnProbe) SetStartTS(ts uint64) {
txn.startTS = ts
}
// GetCommitTS returns the commit ts.
func (txn TxnProbe) GetCommitTS() uint64 {
return txn.commitTS
}
// GetUnionStore returns transaction's embedded unionstore.
func (txn TxnProbe) GetUnionStore() *unionstore.KVUnionStore {
return txn.us

View File

@ -1704,6 +1704,11 @@ func (txn *KVTxn) StartTS() uint64 {
return txn.startTS
}
// CommitTS returns the commit timestamp of the already committed transaction, or zero if it's not committed yet.
func (txn *KVTxn) CommitTS() uint64 {
return txn.commitTS
}
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
func (txn *KVTxn) Valid() bool {