diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index b3e3159c..55212cac 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -603,3 +603,34 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() { err = txn.Commit(context.Background()) s.Nil(err) } + +func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() { + // This test doesn't support tikv mode. + + t1 := s.beginAsyncCommit() + t1.SetPessimistic(true) + t1.Set([]byte("a"), []byte("a")) + t1.Set([]byte("z"), []byte("z")) + committer, err := t1.NewCommitter(1) + s.Nil(err) + committer.SetUseAsyncCommit() + committer.SetLockTTL(1000) + committer.SetMaxCommitTS(oracle.ComposeTS(oracle.ExtractPhysical(committer.GetStartTS())+1500, 0)) + committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(0, 1)) + s.True(committer.IsAsyncCommit()) + lock := s.mustGetLock([]byte("a")) + resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) + for { + currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + s.Nil(err) + status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil) + s.Nil(err) + if status.IsRolledBack() { + break + } + time.Sleep(time.Millisecond * 30) + } + s.True(committer.IsAsyncCommit()) + committer.PrewriteMutations(context.Background(), committer.GetMutations().Slice(1, 2)) + s.False(committer.IsAsyncCommit()) +} diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 8405f1c9..4760dfcb 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -52,6 +52,7 @@ import ( "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/metrics" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" "github.com/tikv/client-go/v2/util" @@ -108,6 +109,20 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u ttl := c.lockTTL + // ref: https://github.com/pingcap/tidb/issues/33641 + // we make the TTL satisfy the following condition: + // max_commit_ts.physical < start_ts.physical + TTL < (current_ts of some check_txn_status that wants to resolve this lock).physical + // and we have: + // current_ts <= max_ts <= min_commit_ts of this lock + // such that + // max_commit_ts < min_commit_ts, so if this lock is resolved, it will be forced to fall back to normal 2PC, thus resolving the issue. + if c.isAsyncCommit() && c.isPessimistic { + safeTTL := uint64(oracle.ExtractPhysical(c.maxCommitTS)-oracle.ExtractPhysical(c.startTS)) + 1 + if safeTTL > ttl { + ttl = safeTTL + } + } + if c.sessionID > 0 { if _, err := util.EvalFailpoint("twoPCShortLockTTL"); err == nil { ttl = 1