mirror of https://github.com/tikv/client-go.git
Acquire read lock in LockKeys to avoid data race (#585)
Signed-off-by: Yilin Chen <sticnarf@gmail.com> Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
51f3bd3944
commit
8dfd76bf46
|
|
@ -458,9 +458,11 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
|
|||
}
|
||||
|
||||
func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error {
|
||||
c.txn.GetMemBuffer().RLock()
|
||||
if !c.txn.us.HasPresumeKeyNotExists(err.GetKey()) {
|
||||
return errors.Errorf("session %d, existErr for key:%s should not be nil", c.sessionID, err.GetKey())
|
||||
}
|
||||
c.txn.GetMemBuffer().RUnlock()
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observe
|
|||
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
|
||||
m := batch.mutations
|
||||
mutations := make([]*kvrpcpb.Mutation, m.Len())
|
||||
c.txn.GetMemBuffer().RLock()
|
||||
for i := 0; i < m.Len(); i++ {
|
||||
mut := &kvrpcpb.Mutation{
|
||||
Op: kvrpcpb.Op_PessimisticLock,
|
||||
|
|
@ -96,6 +97,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
|||
}
|
||||
mutations[i] = mut
|
||||
}
|
||||
c.txn.GetMemBuffer().RUnlock()
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
|
||||
Mutations: mutations,
|
||||
PrimaryLock: c.primary(),
|
||||
|
|
|
|||
|
|
@ -627,7 +627,10 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
memBuf := txn.us.GetMemBuffer()
|
||||
// Avoid data race with concurrent updates to the memBuf
|
||||
memBuf.RLock()
|
||||
for _, key := range keysInput {
|
||||
// The value of lockedMap is only used by pessimistic transactions.
|
||||
var valueExist, locked, checkKeyExists bool
|
||||
|
|
@ -642,6 +645,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
if checkKeyExists && valueExist {
|
||||
alreadyExist := kvrpcpb.AlreadyExist{Key: key}
|
||||
e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}
|
||||
memBuf.RUnlock()
|
||||
return txn.committer.extractKeyExistsErr(e)
|
||||
}
|
||||
}
|
||||
|
|
@ -651,6 +655,8 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
lockCtx.Values[string(key)] = tikv.ReturnedValue{AlreadyLocked: true}
|
||||
}
|
||||
}
|
||||
memBuf.RUnlock()
|
||||
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -717,11 +723,18 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
|
|||
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
|
||||
}
|
||||
if err != nil {
|
||||
var unmarkKeys [][]byte
|
||||
// Avoid data race with concurrent updates to the memBuf
|
||||
memBuf.RLock()
|
||||
for _, key := range keys {
|
||||
if txn.us.HasPresumeKeyNotExists(key) {
|
||||
txn.us.UnmarkPresumeKeyNotExists(key)
|
||||
unmarkKeys = append(unmarkKeys, key)
|
||||
}
|
||||
}
|
||||
memBuf.RUnlock()
|
||||
for _, key := range unmarkKeys {
|
||||
txn.us.UnmarkPresumeKeyNotExists(key)
|
||||
}
|
||||
keyMayBeLocked := !(tikverr.IsErrWriteConflict(err) || tikverr.IsErrKeyExist(err))
|
||||
// If there is only 1 key and lock fails, no need to do pessimistic rollback.
|
||||
if len(keys) > 1 || keyMayBeLocked {
|
||||
|
|
|
|||
Loading…
Reference in New Issue