diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index 02a51730..03a3e301 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -458,9 +458,11 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
 }
 
 func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error {
+	c.txn.GetMemBuffer().RLock()
+	defer c.txn.GetMemBuffer().RUnlock()
 	if !c.txn.us.HasPresumeKeyNotExists(err.GetKey()) {
 		return errors.Errorf("session %d, existErr for key:%s should not be nil", c.sessionID, err.GetKey())
 	}
 	return errors.WithStack(err)
 }
 
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index 7aae990a..9a3f993e 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -86,6 +86,7 @@ func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observe
 func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
 	m := batch.mutations
 	mutations := make([]*kvrpcpb.Mutation, m.Len())
+	c.txn.GetMemBuffer().RLock()
 	for i := 0; i < m.Len(); i++ {
 		mut := &kvrpcpb.Mutation{
 			Op: kvrpcpb.Op_PessimisticLock,
@@ -96,6 +97,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		}
 		mutations[i] = mut
 	}
+	c.txn.GetMemBuffer().RUnlock()
 	req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
 		Mutations:   mutations,
 		PrimaryLock: c.primary(),
diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index f60b0ed3..36fd1541 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -627,7 +627,10 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 			}
 		}
 	}()
+	memBuf := txn.us.GetMemBuffer()
+	// Avoid data race with concurrent updates to the memBuf
+	memBuf.RLock()
 	for _, key := range keysInput {
 		// The value of lockedMap is only used by pessimistic transactions.
 		var valueExist, locked, checkKeyExists bool
@@ -642,6 +645,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 			if checkKeyExists && valueExist {
 				alreadyExist := kvrpcpb.AlreadyExist{Key: key}
 				e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}
+				memBuf.RUnlock()
 				return txn.committer.extractKeyExistsErr(e)
 			}
 		}
@@ -651,6 +655,8 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 			lockCtx.Values[string(key)] = tikv.ReturnedValue{AlreadyLocked: true}
 		}
 	}
+	memBuf.RUnlock()
+
 	if len(keys) == 0 {
 		return nil
 	}
@@ -717,11 +723,18 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 		atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
 	}
 	if err != nil {
+		var unmarkKeys [][]byte
+		// Avoid data race with concurrent updates to the memBuf
+		memBuf.RLock()
 		for _, key := range keys {
 			if txn.us.HasPresumeKeyNotExists(key) {
-				txn.us.UnmarkPresumeKeyNotExists(key)
+				unmarkKeys = append(unmarkKeys, key)
 			}
 		}
+		memBuf.RUnlock()
+		for _, key := range unmarkKeys {
+			txn.us.UnmarkPresumeKeyNotExists(key)
+		}
 		keyMayBeLocked := !(tikverr.IsErrWriteConflict(err) || tikverr.IsErrKeyExist(err))
 		// If there is only 1 key and lock fails, no need to do pessimistic rollback.
 		if len(keys) > 1 || keyMayBeLocked {