diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 0109cbf6..e97eb701 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1953,3 +1953,24 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() { s.Equal(assertNotExist, mutations.IsAssertNotExist(i)) }) } + +func (s *testCommitterSuite) TestExtractKeyExistsErr() { + txn := s.begin() + err := txn.Set([]byte("de"), []byte("ef")) + s.Nil(err) + err = txn.Commit(context.Background()) + s.Nil(err) + + txn = s.begin() + err = txn.GetMemBuffer().SetWithFlags([]byte("de"), []byte("fg"), kv.SetPresumeKeyNotExists) + s.Nil(err) + committer, err := txn.NewCommitter(0) + s.Nil(err) + // Forcibly construct a case when Op_Insert is prewritten while not having KeyNotExists flag. + // In real use cases, it should only happen when enabling amending transactions. + txn.GetMemBuffer().UpdateFlags([]byte("de"), kv.DelPresumeKeyNotExists) + err = committer.PrewriteAllMutations(context.Background()) + s.ErrorContains(err, "existErr") + s.True(txn.GetMemBuffer().TryLock()) + txn.GetMemBuffer().Unlock() +} diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 03a3e301..45f8340b 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -459,10 +459,10 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error { c.txn.GetMemBuffer().RLock() + defer c.txn.GetMemBuffer().RUnlock() if !c.txn.us.HasPresumeKeyNotExists(err.GetKey()) { return errors.Errorf("session %d, existErr for key:%s should not be nil", c.sessionID, err.GetKey()) } - c.txn.GetMemBuffer().RUnlock() return errors.WithStack(err) }