mirror of https://github.com/tikv/client-go.git
fix the vulnerability of atomicity in async commit (#492)
* fix the vulnerability of atomicity in async commit Signed-off-by: ekexium <ekexium@gmail.com> Revert "fix the vulnerability of atomicity in async commit" This reverts commit b0b34c56168ba288e1619e66e626321365cb0921. tidy up Signed-off-by: ekexium <ekexium@gmail.com> * add test Signed-off-by: ekexium <ekexium@gmail.com> * shorten the time of the test Signed-off-by: ekexium <ekexium@gmail.com> * fix test Signed-off-by: ekexium <ekexium@gmail.com> Co-authored-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
2be4a58dbc
commit
190a4d190c
|
|
@ -603,3 +603,34 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
|
|||
err = txn.Commit(context.Background())
|
||||
s.Nil(err)
|
||||
}
|
||||
|
||||
func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
|
||||
// This test doesn't support tikv mode.
|
||||
|
||||
t1 := s.beginAsyncCommit()
|
||||
t1.SetPessimistic(true)
|
||||
t1.Set([]byte("a"), []byte("a"))
|
||||
t1.Set([]byte("z"), []byte("z"))
|
||||
committer, err := t1.NewCommitter(1)
|
||||
s.Nil(err)
|
||||
committer.SetUseAsyncCommit()
|
||||
committer.SetLockTTL(1000)
|
||||
committer.SetMaxCommitTS(oracle.ComposeTS(oracle.ExtractPhysical(committer.GetStartTS())+1500, 0))
|
||||
committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1))
|
||||
s.True(committer.IsAsyncCommit())
|
||||
lock := s.mustGetLock([]byte("a"))
|
||||
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
|
||||
for {
|
||||
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
|
||||
s.Nil(err)
|
||||
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
|
||||
s.Nil(err)
|
||||
if status.IsRolledBack() {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond * 30)
|
||||
}
|
||||
s.True(committer.IsAsyncCommit())
|
||||
committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2))
|
||||
s.False(committer.IsAsyncCommit())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ import (
|
|||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/txnkv/txnlock"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
|
|
@ -108,6 +109,20 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
|
|||
|
||||
ttl := c.lockTTL
|
||||
|
||||
// ref: https://github.com/pingcap/tidb/issues/33641
|
||||
// we make the TTL satisfy the following condition:
|
||||
// max_commit_ts.physical < start_ts.physical + TTL < (current_ts of some check_txn_status that wants to resolve this lock).physical
|
||||
// and we have:
|
||||
// current_ts <= max_ts <= min_commit_ts of this lock
|
||||
// such that
|
||||
// max_commit_ts < min_commit_ts, so if this lock is resolved, it will be forced to fall back to normal 2PC, thus resolving the issue.
|
||||
if c.isAsyncCommit() && c.isPessimistic {
|
||||
safeTTL := uint64(oracle.ExtractPhysical(c.maxCommitTS)-oracle.ExtractPhysical(c.startTS)) + 1
|
||||
if safeTTL > ttl {
|
||||
ttl = safeTTL
|
||||
}
|
||||
}
|
||||
|
||||
if c.sessionID > 0 {
|
||||
if _, err := util.EvalFailpoint("twoPCShortLockTTL"); err == nil {
|
||||
ttl = 1
|
||||
|
|
|
|||
Loading…
Reference in New Issue