Add aggressive-locking mechanism and support locking with conflict (#528)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
MyonKeminta 2022-12-01 11:03:39 +08:00 committed by GitHub
parent 857772dd09
commit 5dc09b15e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 976 additions and 200 deletions

View File

@ -992,6 +992,126 @@ func (s *testCommitterSuite) TestPessimisticLockCheckExistence() {
s.Nil(txn.Rollback())
}
// TestPessimisticLockAllowLockWithConflict exercises LockKeys while the
// transaction is in aggressive-locking mode, for every combination of the
// ReturnValues and CheckExistence options.
//
// Two scenarios are covered:
//   - no conflict: the key was committed before the locking transaction
//     started, so no LockedWithConflictTS is reported;
//   - conflict: another transaction commits the key after the locking
//     transaction started, so the lock is expected to succeed and report the
//     conflicting commit ts together with the latest value.
func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
	ctx := context.Background()
	key := []byte("key")
	options := []bool{false, true}

	// buildLockCtx creates a LockCtx at the given for-update ts and enables
	// the requested result options.
	buildLockCtx := func(forUpdateTS uint64, returnValues, checkExistence bool) *kv.LockCtx {
		lc := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()}
		if checkExistence {
			lc.InitCheckExistence(1)
		}
		if returnValues {
			lc.InitReturnValues(1)
		}
		return lc
	}

	// Seed the key so that it exists before any locking transaction starts.
	seed := s.begin()
	seed.SetPessimistic(true)
	s.Nil(seed.Set(key, key))
	s.Nil(seed.Commit(ctx))

	// No conflict cases
	for _, returnValues := range options {
		for _, checkExistence := range options {
			txn := s.begin()
			txn.SetPessimistic(true)
			txn.StartAggressiveLocking()

			lockCtx := buildLockCtx(txn.StartTS(), returnValues, checkExistence)
			s.Nil(txn.LockKeys(ctx, lockCtx, key))

			// Indexing a nil or empty map yields the zero ReturnedValue, which
			// is what the "nothing requested" combination expects.
			got := lockCtx.Values[string(key)]
			if checkExistence || returnValues {
				s.Len(lockCtx.Values, 1)
				s.True(got.Exists)
			} else {
				s.Len(lockCtx.Values, 0)
			}
			if returnValues {
				s.Equal(key, got.Value)
			} else {
				s.Len(got.Value, 0)
			}
			s.Equal(uint64(0), got.LockedWithConflictTS)
			s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS)

			txn.DoneAggressiveLocking(ctx)
			s.Nil(txn.Rollback())
		}
	}

	// Conflicting cases
	for _, returnValues := range options {
		for _, checkExistence := range options {
			// Use a distinct value per combination so a stale read is detectable.
			value := []byte(fmt.Sprintf("value-%v-%v", returnValues, checkExistence))

			writer := s.begin()
			writer.SetPessimistic(true)
			s.Nil(writer.Set(key, value))

			// The locking transaction starts before the writer commits, so the
			// writer's commit ts is newer than the locker's start ts.
			txn := s.begin()
			txn.SetPessimistic(true)
			txn.StartAggressiveLocking()

			s.Nil(writer.Commit(ctx))
			s.Greater(writer.GetCommitTS(), txn.StartTS())

			lockCtx := buildLockCtx(txn.StartTS(), returnValues, checkExistence)
			s.Nil(txn.LockKeys(ctx, lockCtx, key))

			s.Equal(writer.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
			got := lockCtx.Values[string(key)]
			s.Equal(writer.GetCommitTS(), got.LockedWithConflictTS)
			s.True(got.Exists)
			s.Equal(value, got.Value)

			txn.CancelAggressiveLocking(ctx)
			s.Nil(txn.Rollback())
		}
	}
}
// TestPessimisticLockAllowLockWithConflictError checks the error paths of
// LockKeys in aggressive-locking mode, for every combination of the
// ReturnValues and CheckExistence options:
//   - while another transaction holds the lock, LockKeys is expected to fail
//     with ErrLockWaitTimeout and record no aggressively-locked key;
//   - after the blocker is rolled back, a single injected NotLeader region
//     error is expected to be retried transparently and the key recorded.
func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflictError() {
	const sendReqFailpoint = "tikvclient/tikvStoreSendReqResult"

	ctx := context.Background()
	key := []byte("key")
	options := []bool{false, true}

	for _, returnValues := range options {
		for _, checkExistence := range options {
			// Another transaction locked the key.
			blocker := s.begin()
			blocker.SetPessimistic(true)
			blockerCtx := &kv.LockCtx{ForUpdateTS: blocker.StartTS(), WaitStartTime: time.Now()}
			s.Nil(blocker.LockKeys(ctx, blockerCtx, key))

			// Test key is locked
			txn := s.begin()
			txn.SetPessimistic(true)
			txn.StartAggressiveLocking()
			lockCtx := kv.NewLockCtx(txn.StartTS(), 10, time.Now())
			if checkExistence {
				lockCtx.InitCheckExistence(1)
			}
			if returnValues {
				lockCtx.InitReturnValues(1)
			}
			err := txn.LockKeys(ctx, lockCtx, key)
			s.NotNil(err)
			s.Equal(tikverr.ErrLockWaitTimeout.Error(), err.Error())
			s.Equal([]string{}, txn.GetAggressiveLockingKeys())

			// Abort the blocking transaction.
			s.Nil(blocker.Rollback())

			// Test region error
			s.Nil(failpoint.Enable(sendReqFailpoint, `1*return("PessimisticLockNotLeader")`))
			err = txn.LockKeys(ctx, lockCtx, key)
			s.Nil(err)
			s.Nil(failpoint.Disable(sendReqFailpoint))
			s.Equal([]string{"key"}, txn.GetAggressiveLockingKeys())

			txn.CancelAggressiveLocking(ctx)
			s.Nil(txn.Rollback())
		}
	}
}
// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")

View File

@ -462,15 +462,15 @@ func (s *apiTestSuite) TestEmptyValue() {
s.Empty(v)
// batch_get
vs := s.mustBatchGetBytes(prefix, []string{"key", "key1"})
s.Equal([][]byte{[]byte{}, nil}, vs)
s.Equal([][]byte{{}, nil}, vs)
// scan
keys, values := s.mustScanBytes(prefix, "key", "keyz", 10)
s.Equal([][]byte{[]byte("key")}, keys)
s.Equal([][]byte{[]byte{}}, values)
s.Equal([][]byte{{}}, values)
// reverse scan
keys, values = s.mustReverseScanBytes(prefix, "keyz", "key", 10)
s.Equal([][]byte{[]byte("key")}, keys)
s.Equal([][]byte{[]byte{}}, values)
s.Equal([][]byte{{}}, values)
}
verifyNotExist := func() {

View File

@ -935,6 +935,12 @@ func (s *RegionRequestSender) SendReqCtx(
Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
}, nil, nil
}
case "PessimisticLockNotLeader":
if req.Type == tikvrpc.CmdPessimisticLock {
return &tikvrpc.Response{
Resp: &kvrpcpb.PessimisticLockResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
}, nil, nil
}
case "GCServerIsBusy":
if req.Type == tikvrpc.CmdGC {
return &tikvrpc.Response{

View File

@ -532,11 +532,14 @@ type lockCtx struct {
ttl uint64
minCommitTs uint64
returnValues bool
checkExistence bool
values [][]byte
keyNotFound []bool
returnValues bool
checkExistence bool
results []*kvrpcpb.PessimisticLockKeyResult
LockOnlyIfExists bool
// Lock waiting is not supported in mocktikv. This only controls whether locking with conflict is allowed.
WakeUpMode kvrpcpb.PessimisticLockWakeUpMode
}
// PessimisticLock writes the pessimistic lock.
@ -554,6 +557,7 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
returnValues: req.ReturnValues,
checkExistence: req.CheckExistence,
LockOnlyIfExists: req.LockOnlyIfExists,
WakeUpMode: req.WakeUpMode,
}
lockWaitTime := req.WaitTimeout
@ -565,6 +569,11 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
errs = append(errs, err)
if err != nil {
anyError = true
if lCtx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock {
lCtx.results = append(lCtx.results, &kvrpcpb.PessimisticLockKeyResult{
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultFailed,
})
}
}
if lockWaitTime == LockNoWait {
if _, ok := err.(*ErrLocked); ok {
@ -572,6 +581,14 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
}
}
}
if lCtx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock {
resp.Results = lCtx.results
}
if !anyError || lCtx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock {
if len(lCtx.results) != len(mutations) {
panic("pessimistic lock result count not match")
}
}
if anyError {
if lockWaitTime != LockNoWait {
// TODO: remove this once server-side waiting is implemented.
@ -584,11 +601,34 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
resp.Errors = convertToKeyErrors([]error{err})
return resp
}
if req.ReturnValues {
resp.Values = lCtx.values
resp.NotFounds = lCtx.keyNotFound
} else if req.CheckExistence {
resp.NotFounds = lCtx.keyNotFound
if lCtx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal {
if req.ReturnValues {
resp.Values = make([][]byte, 0, len(lCtx.results))
resp.NotFounds = make([]bool, 0, len(lCtx.results))
for _, res := range lCtx.results {
if res.Type == kvrpcpb.PessimisticLockKeyResultType_LockResultNormal {
resp.Values = append(resp.Values, res.Value)
resp.NotFounds = append(resp.NotFounds, !res.Existence)
} else {
panic("unreachable")
}
}
} else if req.CheckExistence {
resp.NotFounds = make([]bool, 0, len(lCtx.results))
for _, res := range lCtx.results {
if res.Type == kvrpcpb.PessimisticLockKeyResultType_LockResultNormal {
resp.NotFounds = append(resp.NotFounds, !res.Existence)
} else {
panic("unreachable")
}
}
} else {
for _, res := range lCtx.results {
if res.Type != kvrpcpb.PessimisticLockKeyResultType_LockResultNormal {
panic("unreachable")
}
}
}
}
return resp
}
@ -631,15 +671,38 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
// For pessimisticLockMutation, check the correspond rollback record, there may be rollbackLock
// operation between startTS and forUpdateTS
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off)
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock)
if err != nil {
return err
}
if lctx.returnValues {
lctx.values = append(lctx.values, val)
lctx.keyNotFound = append(lctx.keyNotFound, len(val) == 0)
} else if lctx.checkExistence {
lctx.keyNotFound = append(lctx.keyNotFound, len(val) == 0)
if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok {
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict,
Value: val,
Existence: len(val) != 0,
LockedWithConflictTs: conflict.ConflictCommitTS,
})
} else {
return err
}
} else {
if lctx.returnValues {
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultNormal,
Value: val,
Existence: len(val) != 0,
LockedWithConflictTs: 0,
})
} else if lctx.checkExistence {
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultNormal,
Value: nil,
Existence: len(val) != 0,
LockedWithConflictTs: 0,
})
} else {
lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{
Type: kvrpcpb.PessimisticLockKeyResultType_LockResultNormal,
})
}
}
if lctx.LockOnlyIfExists && len(val) == 0 {
@ -771,7 +834,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
return errs
}
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel) ([]byte, error) {
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel, allowLockWithConflict bool) ([]byte, error) {
dec := &valueDecoder{
expectKey: m.Key,
}
@ -794,13 +857,18 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
}
// Note that it's a write conflict here, even if the value is a rollback one, or a op_lock record
var writeConflictErr error = nil
if dec.value.commitTS > forUpdateTS {
return nil, &ErrConflict{
writeConflictErr = &ErrConflict{
StartTS: forUpdateTS,
ConflictTS: dec.value.startTS,
ConflictCommitTS: dec.value.commitTS,
Key: m.Key,
}
if !allowLockWithConflict {
return nil, writeConflictErr
}
assertionLevel = kvrpcpb.AssertionLevel_Off
}
needGetVal := getVal
@ -877,10 +945,12 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
}
}
}
// writeConflictErr is non-nil only when a write conflict is found and `allowLockWithConflict` is set to true.
if getVal {
return retVal, nil
return retVal, writeConflictErr
}
return nil, nil
return nil, writeConflictErr
}
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
@ -923,7 +993,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
// The minCommitTS has been pushed forward.
minCommitTS = dec.lock.minCommitTS
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false)
if err != nil {
return err
}
@ -931,7 +1001,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
if pessimisticAction == kvrpcpb.PrewriteRequest_DO_PESSIMISTIC_CHECK {
return ErrAbort("pessimistic lock not found")
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel, false)
if err != nil {
return err
}

View File

@ -38,9 +38,10 @@ package kv
// Notice that the highest bit is used by red black tree, do not set flags on it.
type KeyFlags uint16
// FlagBytes is the byte size of type KeyFlags
const FlagBytes = 2
const (
// FlagBytes is the byte size of type KeyFlags
FlagBytes = 2
flagPresumeKNE KeyFlags = 1 << iota
flagKeyLocked
flagNeedLocked

View File

@ -32,9 +32,10 @@ import (
// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
type ReturnedValue struct {
Value []byte
Exists bool
AlreadyLocked bool
Value []byte
Exists bool
LockedWithConflictTS uint64
AlreadyLocked bool
}
// Used for pessimistic lock wait time
@ -56,21 +57,22 @@ func defaultLockWaitTime() *lockWaitTimeInMs {
// LockCtx contains information for LockKeys method.
type LockCtx struct {
Killed *uint32
ForUpdateTS uint64
lockWaitTime *lockWaitTimeInMs
WaitStartTime time.Time
PessimisticLockWaited *int32
LockKeysDuration *int64
LockKeysCount *int32
ReturnValues bool
CheckExistence bool
LockOnlyIfExists bool
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
Stats *util.LockKeysDetails
ResourceGroupTag []byte
Killed *uint32
ForUpdateTS uint64
lockWaitTime *lockWaitTimeInMs
WaitStartTime time.Time
PessimisticLockWaited *int32
LockKeysDuration *int64
LockKeysCount *int32
ReturnValues bool
CheckExistence bool
LockOnlyIfExists bool
Values map[string]ReturnedValue
MaxLockedWithConflictTS uint64
ValuesLock sync.Mutex
LockExpired *uint32
Stats *util.LockKeysDetails
ResourceGroupTag []byte
// ResourceGroupTagger is a special tagger used only for PessimisticLockRequest.
// We did not use tikvrpc.ResourceGroupTagger here because the kv package is a
// more basic component, and we cannot rely on tikvrpc.Request here, so we treat

View File

@ -135,6 +135,8 @@ type twoPhaseCommitter struct {
primaryKey []byte
forUpdateTS uint64
maxLockedWithConflictTS uint64
mu struct {
sync.RWMutex
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
@ -708,7 +710,10 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
// primary returns the primary key of the transaction.
//
// An explicitly assigned primaryKey wins whenever it is non-empty. Otherwise
// the first key of c.mutations is used. When no mutations have been attached
// to the committer yet, nil is returned instead of dereferencing a nil
// c.mutations.
func (c *twoPhaseCommitter) primary() []byte {
	if len(c.primaryKey) > 0 {
		return c.primaryKey
	}
	// Guard before touching c.mutations: it may not be initialized yet.
	if c.mutations == nil {
		return nil
	}
	return c.mutations.GetKey(0)
}
@ -1766,7 +1771,7 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio
var err error
for tryTimes < retryLimit {
pessimisticLockBo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, c.txn.vars)
err = c.pessimisticLockMutations(pessimisticLockBo, lCtx, &keysNeedToLock)
err = c.pessimisticLockMutations(pessimisticLockBo, lCtx, kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal, &keysNeedToLock)
if err != nil {
// KeysNeedToLock won't change, so don't async rollback pessimistic locks here for write conflict.
if _, ok := errors.Cause(err).(*tikverr.ErrWriteConflict); ok {
@ -1972,6 +1977,7 @@ func (b *batched) appendBatchMutationsBySize(region locate.RegionVerID, mutation
var start, end int
for start = 0; start < mutations.Len(); start = end {
isPrimary := false
var size int
for end = start; end < mutations.Len() && size < limit; end++ {
var k, v []byte
@ -1980,11 +1986,13 @@ func (b *batched) appendBatchMutationsBySize(region locate.RegionVerID, mutation
size += sizeFn(k, v)
if b.primaryIdx < 0 && bytes.Equal(k, b.primaryKey) {
b.primaryIdx = len(b.batches)
isPrimary = true
}
}
b.batches = append(b.batches, batchMutations{
region: region,
mutations: mutations.Slice(start, end),
isPrimary: isPrimary,
})
}
}

View File

@ -41,6 +41,7 @@ import (
"sync/atomic"
"time"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -59,6 +60,7 @@ import (
// actionPessimisticLock is the two-phase-commit action that acquires pessimistic locks for a batch of mutations.
type actionPessimisticLock struct {
// LockCtx carries the caller-supplied locking options and receives the per-key results.
*kv.LockCtx
// wakeUpMode is forwarded as the WakeUpMode of the pessimistic lock request and selects which response handler
// (normal mode or force-lock mode) processes the reply.
wakeUpMode kvrpcpb.PessimisticLockWakeUpMode
}
type actionPessimisticRollback struct{}
@ -83,21 +85,32 @@ func (actionPessimisticRollback) tiKVTxnRegionsNumHistogram() prometheus.Observe
return metrics.TxnRegionsNumHistogramPessimisticRollback
}
// diagnosticContext holds per-batch state that handleSingleBatch shares with the response handlers across retries
// of the same pessimistic lock request.
type diagnosticContext struct {
// resolvingRecordToken is nil until locks are first recorded via the lock resolver's RecordResolvingLocks; later
// retries reuse the token for UpdateResolvingLocks, and handleSingleBatch passes it to ResolveLocksDone on exit.
resolvingRecordToken *int
// sender is the request sender used for the most recent attempt; its store address is reported in the stats.
sender *locate.RegionRequestSender
// reqDuration is the time taken by the most recent SendReq call.
reqDuration time.Duration
}
func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
m := batch.mutations
mutations := make([]*kvrpcpb.Mutation, m.Len())
c.txn.GetMemBuffer().RLock()
for i := 0; i < m.Len(); i++ {
mut := &kvrpcpb.Mutation{
Op: kvrpcpb.Op_PessimisticLock,
Key: m.GetKey(i),
convertMutationsToPb := func(committerMutations CommitterMutations) []*kvrpcpb.Mutation {
mutations := make([]*kvrpcpb.Mutation, committerMutations.Len())
c.txn.GetMemBuffer().RLock()
for i := 0; i < committerMutations.Len(); i++ {
mut := &kvrpcpb.Mutation{
Op: kvrpcpb.Op_PessimisticLock,
Key: committerMutations.GetKey(i),
}
if c.txn.us.HasPresumeKeyNotExists(committerMutations.GetKey(i)) || (c.doingAmend && committerMutations.GetOp(i) == kvrpcpb.Op_Insert) {
mut.Assertion = kvrpcpb.Assertion_NotExist
}
mutations[i] = mut
}
if c.txn.us.HasPresumeKeyNotExists(m.GetKey(i)) || (c.doingAmend && m.GetOp(i) == kvrpcpb.Op_Insert) {
mut.Assertion = kvrpcpb.Assertion_NotExist
}
mutations[i] = mut
c.txn.GetMemBuffer().RUnlock()
return mutations
}
c.txn.GetMemBuffer().RUnlock()
m := batch.mutations
mutations := convertMutationsToPb(m)
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
@ -108,6 +121,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
@ -120,7 +134,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
lockWaitStartTime := action.WaitStartTime
var resolvingRecordToken *int
diagCtx := diagnosticContext{}
defer func() {
if diagCtx.resolvingRecordToken != nil {
c.store.GetLockResolver().ResolveLocksDone(c.startTS, *diagCtx.resolvingRecordToken)
}
}()
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
if action.LockWaitTime() > 0 && action.LockWaitTime() != kv.LockAlwaysWait {
@ -150,130 +169,31 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
startTime := time.Now()
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
reqDuration := time.Since(startTime)
diagCtx.reqDuration = time.Since(startTime)
diagCtx.sender = sender
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(reqDuration))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(diagCtx.reqDuration))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
}
same, err := batch.relocate(bo, c.store.GetRegionCache())
if action.wakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal {
finished, err := action.handlePessimisticLockResponseNormalMode(c, bo, &batch, mutations, resp, &diagCtx)
if err != nil {
return err
}
if same {
continue
if finished {
return nil
}
err = c.pessimisticLockMutations(bo, action.LockCtx, batch.mutations)
return err
}
if resp.Resp == nil {
return errors.WithStack(tikverr.ErrBodyMissing)
}
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(reqDuration, batch.region.GetID(), sender.GetStoreAddr(), lockResp.ExecDetailsV2)
} else if action.wakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock {
finished, err := action.handlePessimisticLockResponseForceLockMode(c, bo, &batch, mutations, resp, &diagCtx)
if err != nil {
return err
}
if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
c.run(c, action.LockCtx)
}
// Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`.
// If `CheckExistence` is set, `ReturnValues` is not set and `CheckExistence` is not supported, skip
// retrieving value totally (indicated by `skipRetrievingValue`) to avoid panicking.
skipRetrievingValue := !action.ReturnValues && action.CheckExistence && len(lockResp.NotFounds) == 0
if (action.ReturnValues || action.CheckExistence) && !skipRetrievingValue {
action.ValuesLock.Lock()
for i, mutation := range mutations {
var value []byte
if action.ReturnValues {
value = lockResp.Values[i]
}
var exists = !lockResp.NotFounds[i]
action.Values[string(mutation.Key)] = kv.ReturnedValue{
Value: value,
Exists: exists,
}
}
action.ValuesLock.Unlock()
}
return nil
}
var locks []*txnlock.Lock
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
return c.extractKeyExistsErr(e)
}
if deadlock := keyErr.Deadlock; deadlock != nil {
return errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock})
}
// Extract lock from key error
lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil {
return err1
}
locks = append(locks, lock)
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
if resolvingRecordToken == nil {
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
resolvingRecordToken = &token
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: 0,
Locks: locks,
}
if action.LockCtx.Stats != nil {
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
}
resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
if err != nil {
return err
}
// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
if resolveLockRes.TTL > 0 {
if action.LockWaitTime() == kv.LockNoWait {
return errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet)
} else if action.LockWaitTime() == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
if time.Since(lockWaitStartTime).Milliseconds() >= action.LockWaitTime() {
return errors.WithStack(tikverr.ErrLockWaitTimeout)
}
}
if action.LockCtx.PessimisticLockWaited != nil {
atomic.StoreInt32(action.LockCtx.PessimisticLockWaited, 1)
if finished {
return nil
}
}
@ -291,15 +211,296 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
}
// handleRegionError reacts to a region error returned for a pessimistic lock
// request.
//
// It backs off unless the error is a genuine EpochNotMatch, then relocates the
// batch. When the batch still maps to the same region it returns
// (false, nil) so the caller retries the request. Otherwise the batch's
// mutations are re-dispatched through pessimisticLockMutations and the outcome
// of that call is returned with finished == true.
func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error) (finished bool, err error) {
	// For other region error and the fake region error, backoff because
	// there's something wrong.
	// For the real EpochNotMatch error, don't backoff.
	needBackoff := regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr)
	if needBackoff {
		if err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())); err != nil {
			return true, err
		}
	}

	sameRegion, err := batch.relocate(bo, c.store.GetRegionCache())
	if err != nil {
		return true, err
	}
	if !sameRegion {
		// The keys now span a different set of regions: lock them again from scratch.
		return true, c.pessimisticLockMutations(bo, action.LockCtx, action.wakeUpMode, batch.mutations)
	}
	return false, nil
}
// handleKeyError inspects the key errors of a pessimistic lock response.
//
// An AlreadyExist or Deadlock error, or a key error from which no lock can be
// extracted, ends processing immediately: the result is (nil, true, err).
// Otherwise every key error is converted to the lock that caused it and the
// collected locks are returned with finished == false so the caller can try to
// resolve them.
func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) (locks []*txnlock.Lock, finished bool, err error) {
	for _, ke := range keyErrs {
		// Check already exists error
		if existing := ke.GetAlreadyExist(); existing != nil {
			return nil, true, c.extractKeyExistsErr(&tikverr.ErrKeyExist{AlreadyExist: existing})
		}
		if ke.Deadlock != nil {
			return nil, true, errors.WithStack(&tikverr.ErrDeadlock{Deadlock: ke.Deadlock})
		}

		// Extract lock from key error
		blocking, extractErr := txnlock.ExtractLockFromKeyErr(ke)
		if extractErr != nil {
			return nil, true, extractErr
		}
		locks = append(locks, blocking)
	}
	return locks, false, nil
}
// handlePessimisticLockResponseNormalMode processes the response of a pessimistic lock request; handleSingleBatch
// dispatches here when action.wakeUpMode is WakeUpModeNormal.
//
// finished == true tells the caller to stop working on this batch (successfully when err is nil, otherwise with the
// returned error). finished == false with a nil error tells the caller to send the request again.
//
// mutationsPb must be the mutations that were sent in the request: returned values and existence flags are matched
// to keys by index. diagCtx carries the request duration, the sender and the lock-resolving token across retries.
func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
}
if regionErr != nil {
// Region errors are delegated; handleRegionError decides between retrying and re-dispatching the batch.
return action.handleRegionError(c, bo, batch, regionErr)
}
if resp.Resp == nil {
return true, errors.WithStack(tikverr.ErrBodyMissing)
}
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
if len(lockResp.Results) != 0 {
// We use old protocol in this mode. The `Result` field should not be used.
return true, errors.New("Pessimistic lock response corrupted")
}
keyErrs := lockResp.GetErrors()
if len(keyErrs) == 0 {
// Success path: record statistics and results, then finish.
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
}
if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
c.run(c, action.LockCtx)
}
// Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`.
// If `CheckExistence` is set, `ReturnValues` is not set and `CheckExistence` is not supported, skip
// retrieving value totally (indicated by `skipRetrievingValue`) to avoid panicking.
skipRetrievingValue := !action.ReturnValues && action.CheckExistence && len(lockResp.NotFounds) == 0
if (action.ReturnValues || action.CheckExistence) && !skipRetrievingValue {
action.ValuesLock.Lock()
// Values and NotFounds are indexed in the same order as the mutations that were sent.
for i, mutation := range mutationsPb {
var value []byte
if action.ReturnValues {
value = lockResp.Values[i]
}
var exists = !lockResp.NotFounds[i]
action.Values[string(mutation.Key)] = kv.ReturnedValue{
Value: value,
Exists: exists,
}
}
action.ValuesLock.Unlock()
}
return true, nil
}
// Key errors that cannot be retried (already-exist, deadlock, unextractable lock) end processing here; otherwise
// the locks that blocked the request are collected for resolving.
locks, finished, err := action.handleKeyError(c, keyErrs)
if err != nil {
return finished, err
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
if diagCtx.resolvingRecordToken == nil {
// First time locks are seen for this batch: start a record and remember its token for later retries.
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
diagCtx.resolvingRecordToken = &token
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: 0,
Locks: locks,
}
if action.LockCtx.Stats != nil {
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
}
resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
if err != nil {
return true, err
}
// If the TTL returned by the resolver is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
if resolveLockRes.TTL > 0 {
if action.LockWaitTime() == kv.LockNoWait {
return true, errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet)
} else if action.LockWaitTime() == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
if time.Since(action.WaitStartTime).Milliseconds() >= action.LockWaitTime() {
return true, errors.WithStack(tikverr.ErrLockWaitTimeout)
}
}
if action.LockCtx.PessimisticLockWaited != nil {
atomic.StoreInt32(action.LockCtx.PessimisticLockWaited, 1)
}
}
// Not finished: the caller sends the request again.
return false, nil
}
// handlePessimisticLockResponseForceLockMode processes the response of a pessimistic lock request; handleSingleBatch
// dispatches here when action.wakeUpMode is WakeUpModeForceLock.
//
// Only a single key per request is supported: the function panics when more than one mutation was sent or more than
// one result was returned. The per-key result (if any) is recorded first, then key errors are handled, and only
// afterwards is the region error examined.
//
// finished == true tells the caller to stop working on this batch (successfully when err is nil, otherwise with the
// returned error). finished == false with a nil error tells the caller to send the request again.
func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
// The region error is fetched up front but deliberately handled after the result and the key errors below.
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
}
if resp.Resp == nil {
return true, errors.WithStack(tikverr.ErrBodyMissing)
}
lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
isMutationFailed := false
keyErrs := lockResp.GetErrors()
// We only allow single key in ForceLock mode now.
if len(mutationsPb) > 1 || len(lockResp.Results) > 1 {
panic("unreachable")
}
if batch.isPrimary && len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
}
if len(lockResp.Results) > 0 {
res := lockResp.Results[0]
switch res.Type {
case kvrpcpb.PessimisticLockKeyResultType_LockResultNormal:
// Locked without conflict: record only what the caller asked for.
if action.ReturnValues {
action.ValuesLock.Lock()
action.Values[string(mutationsPb[0].Key)] = kv.ReturnedValue{
Value: res.Value,
Exists: res.Existence,
}
action.ValuesLock.Unlock()
} else if action.CheckExistence {
action.ValuesLock.Lock()
action.Values[string(mutationsPb[0].Key)] = kv.ReturnedValue{
Exists: res.Existence,
}
action.ValuesLock.Unlock()
}
case kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict:
// Locked despite a newer committed version: the value, existence and conflict ts are always recorded,
// regardless of ReturnValues / CheckExistence.
action.ValuesLock.Lock()
// NOTE(review): Values is presumably nil here when the caller requested neither ReturnValues nor
// CheckExistence — confirm against the LockCtx initializers.
if action.Values == nil {
action.Values = make(map[string]kv.ReturnedValue, 1)
}
action.Values[string(mutationsPb[0].Key)] = kv.ReturnedValue{
Value: res.Value,
Exists: res.Existence,
LockedWithConflictTS: res.LockedWithConflictTs,
}
// Keep the largest conflict ts seen through this LockCtx.
if res.LockedWithConflictTs > action.MaxLockedWithConflictTS {
action.MaxLockedWithConflictTS = res.LockedWithConflictTs
}
action.ValuesLock.Unlock()
case kvrpcpb.PessimisticLockKeyResultType_LockResultFailed:
isMutationFailed = true
default:
panic("unreachable")
}
}
// Statistics are merged only when a result was returned and it did not fail.
if len(lockResp.Results) > 0 && !isMutationFailed {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
}
}
// Key errors that cannot be retried (already-exist, deadlock, unextractable lock) end processing here; otherwise
// the locks that blocked the request are collected for resolving.
locks, finished, err := action.handleKeyError(c, keyErrs)
if err != nil {
return finished, err
}
if regionErr != nil {
return action.handleRegionError(c, bo, batch, regionErr)
}
if isMutationFailed {
if len(locks) > 0 {
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
if diagCtx.resolvingRecordToken == nil {
// First time locks are seen for this batch: start a record and remember its token for later retries.
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
diagCtx.resolvingRecordToken = &token
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *diagCtx.resolvingRecordToken)
}
resolveLockOpts := txnlock.ResolveLocksOptions{
CallerStartTS: 0,
Locks: locks,
}
if action.LockCtx.Stats != nil {
resolveLockOpts.Detail = &action.LockCtx.Stats.ResolveLock
}
resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
if err != nil {
return true, err
}
// If the TTL returned by the resolver is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
if resolveLockRes.TTL > 0 {
if action.LockWaitTime() == kv.LockNoWait {
return true, errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet)
} else if action.LockWaitTime() == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
if time.Since(action.WaitStartTime).Milliseconds() >= action.LockWaitTime() {
return true, errors.WithStack(tikverr.ErrLockWaitTimeout)
}
}
if action.LockCtx.PessimisticLockWaited != nil {
atomic.StoreInt32(action.LockCtx.PessimisticLockWaited, 1)
}
}
// Not finished: the caller sends the request again.
return false, nil
}
// If the mutation failed and the error is not KeyIsLocked, the function should have already
// returned before. So this is an unreachable path.
return true, errors.New("Pessimistic lock response corrupted")
}
if len(locks) != 0 {
// If the key error is KeyIsLocked, we assume the server must have set resp.Results.
return true, errors.New("Pessimistic lock response corrupted")
}
if len(lockResp.Results) == 0 {
// If the `Results` field is missing in response, there must be either some unretryable error in keyErrs or some
// region error, therefore the function must have returned in above logic. This is supposed to be an unreachable
// path if TiKV is implemented correctly.
return true, errors.New("Pessimistic lock response corrupted")
}
return true, nil
}
func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
forUpdateTS := c.forUpdateTS
if c.maxLockedWithConflictTS > forUpdateTS {
forUpdateTS = c.maxLockedWithConflictTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
})
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
if err != nil {
return err
}
@ -317,7 +518,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
return nil
}
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, mutations CommitterMutations) error {
if c.sessionID > 0 {
if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
@ -338,7 +539,7 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
}
}
}
return c.doActionOnMutations(bo, actionPessimisticLock{lockCtx}, mutations)
return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode}, mutations)
}
func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {

View File

@ -97,6 +97,24 @@ func (txn TxnProbe) GetLockedCount() int {
return txn.lockedCnt
}
// GetAggressiveLockingKeys returns, in unspecified order, the keys recorded as locked during the current
// aggressive locking attempt.
func (txn TxnProbe) GetAggressiveLockingKeys() []string {
	locked := txn.aggressiveLockingContext.currentLockedKeys
	result := make([]string, len(locked))
	idx := 0
	for k := range locked {
		result[idx] = k
		idx++
	}
	return result
}
// GetAggressiveLockingPreviousKeys returns, in unspecified order, the keys that were locked during the previous
// aggressive locking attempt and have not been requested again in the current one.
func (txn TxnProbe) GetAggressiveLockingPreviousKeys() []string {
	previous := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks
	result := make([]string, len(previous))
	idx := 0
	for k := range previous {
		result[idx] = k
		idx++
	}
	return result
}
func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
c, err := newTwoPhaseCommitter(txn, sessionID)
if err != nil {

View File

@ -39,6 +39,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"runtime/trace"
"sort"
@ -78,6 +79,38 @@ type SchemaAmender interface {
AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error)
}
// tempLockBufferEntry records the result of locking a single key while the transaction is in aggressive locking
// mode, so that a later retry can decide whether the lock request for that key may be skipped.
type tempLockBufferEntry struct {
	// HasReturnValue is copied from LockCtx.ReturnValues of the LockKeys call that produced this entry.
	HasReturnValue bool
	// HasCheckExistence is copied from LockCtx.CheckExistence of the LockKeys call that produced this entry.
	HasCheckExistence bool
	// Value is the entry found in LockCtx.Values for the key after the lock request (zero value if none was present).
	Value tikv.ReturnedValue
}
// trySkipLockingOnRetry decides whether a key that was locked in the previous aggressive locking attempt can be
// reused without sending another lock request, given what the current attempt asks for (returnValue and
// checkExistence). It returns true when the lock can be reused; in that case the entry is adjusted in place so that
// it only carries the information requested by the current attempt.
func (e *tempLockBufferEntry) trySkipLockingOnRetry(returnValue bool, checkExistence bool) bool {
	switch {
	case e.Value.LockedWithConflictTS != 0:
		// The key was locked with conflict last time. From now on treat it as if it had been locked the
		// normal way.
		e.Value.LockedWithConflictTS = 0
	case returnValue && !e.HasReturnValue:
		// The value is wanted now but was not fetched last time: the key must be locked again.
		return false
	case !returnValue && checkExistence && !e.HasCheckExistence:
		// Only existence is wanted now, but it was not checked last time: the key must be locked again.
		return false
	}

	// Drop whatever information the current attempt did not ask for.
	if !returnValue {
		e.HasReturnValue = false
		e.Value.Value = nil
	}
	if !checkExistence {
		e.HasCheckExistence = false
		// Without an existence check the entry reports the key as existing.
		e.Value.Exists = true
	}
	return true
}
// TxnOptions indicates the option when beginning a transaction.
// TxnOptions are set by the TxnOption values passed to Begin
type TxnOptions struct {
@ -127,6 +160,9 @@ type KVTxn struct {
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
*util.RequestSource
aggressiveLockingContext *aggressiveLockingContext
aggressiveLockingDirty bool
}
// NewTiKVTxn creates a new KVTxn.
@ -197,7 +233,11 @@ func (txn *KVTxn) Set(k []byte, v []byte) error {
// String implements fmt.Stringer interface.
func (txn *KVTxn) String() string {
return fmt.Sprintf("%d", txn.StartTS())
res := fmt.Sprintf("%d", txn.StartTS())
if txn.aggressiveLockingContext != nil {
res += fmt.Sprintf(" (aggressiveLocking: prev %d keys, current %d keys)", len(txn.aggressiveLockingContext.lastRetryUnnecessaryLocks), len(txn.aggressiveLockingContext.currentLockedKeys))
}
return res
}
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
@ -379,6 +419,13 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource)
if txn.aggressiveLockingContext != nil {
if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 {
return errors.New("trying to commit transaction when aggressive locking is pending")
}
txn.CancelAggressiveLocking(ctx)
}
if val, err := util.EvalFailpoint("mockCommitError"); err == nil && val.(bool) {
if _, err := util.EvalFailpoint("mockCommitErrorOpt"); err == nil {
failpoint.Disable("tikvclient/mockCommitErrorOpt")
@ -424,7 +471,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
initRegion.End()
if err != nil {
if txn.IsPessimistic() {
txn.asyncPessimisticRollback(ctx, committer.mutations.GetKeys())
txn.asyncPessimisticRollback(ctx, committer.mutations.GetKeys(), txn.committer.forUpdateTS)
}
return err
}
@ -494,6 +541,15 @@ func (txn *KVTxn) Rollback() error {
if !txn.valid {
return tikverr.ErrInvalidTxn
}
if txn.aggressiveLockingContext != nil {
if len(txn.aggressiveLockingContext.currentLockedKeys) != 0 {
txn.close()
return errors.New("trying to rollback transaction when aggressive locking is pending")
}
txn.CancelAggressiveLocking(context.Background())
}
start := time.Now()
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
@ -594,6 +650,185 @@ func (txn *KVTxn) LockKeysWithWaitTime(ctx context.Context, lockWaitTime int64,
return txn.LockKeys(ctx, lockCtx, keysInput...)
}
// StartAggressiveLocking switches the transaction into the aggressive locking state. It panics if the transaction
// is already in that state.
//
// With aggressive locking, when a DML statement in a pessimistic transaction hits a write conflict, the locks it
// has already acquired are not pessimistic-rolled-back right away. They are kept while the statement is retried, so
// that the retry can skip the lock RPC for any key it locked before. Once the statement finishes, locks that were
// acquired by an earlier attempt but turned out to be unnecessary are released.
//
// While in this state, keys locked through `LockKeys` are recorded in a separate buffer. A `LockKeys` call that
// ends up locking exactly one key (and does not set LockOnlyIfExists) sends its pessimistic lock request in
// ForceLock mode (kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock).
func (txn *KVTxn) StartAggressiveLocking() {
	if txn.aggressiveLockingContext != nil {
		panic("Trying to start aggressive locking while it's already started")
	}
	// lastRetryUnnecessaryLocks stays nil: there is no previous attempt yet.
	fresh := new(aggressiveLockingContext)
	fresh.currentLockedKeys = make(map[string]tempLockBufferEntry)
	fresh.startTime = time.Now()
	txn.aggressiveLockingContext = fresh
}
// RetryAggressiveLocking ends the current attempt of the statement and begins the next one. It panics if aggressive
// locking has not been started.
//
// Locks acquired in the attempt that just ended become "locks of the previous attempt". Locks that were already in
// that category before this call (acquired one attempt earlier and not requested again since) are released.
func (txn *KVTxn) RetryAggressiveLocking(ctx context.Context) {
	alc := txn.aggressiveLockingContext
	if alc == nil {
		panic("Trying to retry aggressive locking while it's not started")
	}

	// Release what is still left over from the attempt before the one that just ended.
	txn.cleanupAggressiveLockingRedundantLocks(ctx)

	// A primary key chosen during the ended attempt is unset; it is remembered in lastPrimaryKey below.
	if alc.assignedPrimaryKey {
		txn.resetPrimary()
		alc.assignedPrimaryKey = false
	}

	// Rotate the "current" state into the "last" slots and start from a clean current state.
	alc.lastPrimaryKey, alc.primaryKey = alc.primaryKey, nil
	alc.lastAttemptStartTime, alc.startTime = alc.startTime, time.Now()
	alc.lastRetryUnnecessaryLocks, alc.currentLockedKeys = alc.currentLockedKeys, make(map[string]tempLockBufferEntry)
}
// CancelAggressiveLocking leaves the aggressive locking state and discards its results: every pessimistic lock
// acquired while in that state is rolled back via PessimisticRollback. It panics if aggressive locking has not been
// started.
//
// NOTE(review): the ctx argument is not used; the rollbacks are issued with context.Background().
func (txn *KVTxn) CancelAggressiveLocking(ctx context.Context) {
	alc := txn.aggressiveLockingContext
	if alc == nil {
		panic("Trying to cancel aggressive locking while it's not started")
	}

	// Locks left over from the previous attempt go first.
	txn.cleanupAggressiveLockingRedundantLocks(context.Background())

	if alc.assignedPrimaryKey {
		txn.resetPrimary()
		alc.assignedPrimaryKey = false
	}

	if n := len(alc.currentLockedKeys); n != 0 {
		keys := make([][]byte, 0, n)
		for key := range alc.currentLockedKeys {
			keys = append(keys, []byte(key))
		}
		// The committer must have been initialized if some keys are locked.
		forUpdateTS := txn.committer.forUpdateTS
		if ts := alc.maxLockedWithConflictTS; ts > forUpdateTS {
			// Use the larger timestamp so that locks acquired with conflict are covered as well.
			forUpdateTS = ts
		}
		txn.asyncPessimisticRollback(context.Background(), keys, forUpdateTS)
		txn.lockedCnt -= n
	}

	txn.aggressiveLockingContext = nil
}
// DoneAggressiveLocking leaves the aggressive locking state and keeps its results: the keys locked in the current
// attempt are flagged in the membuffer as if they had been locked in the normal way, and locks that are no longer
// needed are released. It panics if aggressive locking has not been started.
//
// NOTE(review): the ctx argument is not used; the cleanup is issued with context.Background().
func (txn *KVTxn) DoneAggressiveLocking(ctx context.Context) {
	alc := txn.aggressiveLockingContext
	if alc == nil {
		panic("Trying to finish aggressive locking while it's not started")
	}

	txn.cleanupAggressiveLockingRedundantLocks(context.Background())

	memBuffer := txn.GetMemBuffer()
	for key, entry := range alc.currentLockedKeys {
		// The key is flagged as non-existent only when existence information was actually obtained for it.
		knownMissing := !entry.Value.Exists && (entry.HasCheckExistence || entry.HasReturnValue)
		if knownMissing {
			memBuffer.UpdateFlags([]byte(key), tikv.SetKeyLocked, tikv.DelNeedCheckExists, tikv.SetKeyLockedValueNotExists)
		} else {
			memBuffer.UpdateFlags([]byte(key), tikv.SetKeyLocked, tikv.DelNeedCheckExists, tikv.SetKeyLockedValueExists)
		}
	}

	// A non-zero maxLockedWithConflictTS means some keys were locked, so the committer must have been created.
	// Keep the short-circuit order: the committer is only touched in that case.
	if ts := alc.maxLockedWithConflictTS; ts > 0 && ts > txn.committer.maxLockedWithConflictTS {
		txn.committer.maxLockedWithConflictTS = ts
	}

	txn.aggressiveLockingContext = nil
}
// IsInAggressiveLockingMode reports whether the transaction is currently in aggressive locking mode, i.e. whether
// StartAggressiveLocking has been called without a subsequent CancelAggressiveLocking or DoneAggressiveLocking.
func (txn *KVTxn) IsInAggressiveLockingMode() bool {
	return txn.aggressiveLockingContext != nil
}
// IsInAggressiveLockingStage reports whether key has been recorded as locked during the current aggressive locking
// attempt. It always returns false when the transaction is not in aggressive locking mode.
func (txn *KVTxn) IsInAggressiveLockingStage(key []byte) bool {
	if txn.aggressiveLockingContext == nil {
		return false
	}
	_, locked := txn.aggressiveLockingContext.currentLockedKeys[string(key)]
	return locked
}
// mayAggressiveLockingLastLockedKeysExpire reports whether at least ManagedLockTTL milliseconds have passed since
// the previous aggressive locking attempt started. A TTL that does not fit into an int64 is treated as "never
// expires".
func (txn *KVTxn) mayAggressiveLockingLastLockedKeysExpire() bool {
	ttl := atomic.LoadUint64(&ManagedLockTTL)
	if ttl > math.MaxInt64 {
		return false
	}
	elapsedMs := time.Since(txn.aggressiveLockingContext.lastAttemptStartTime).Milliseconds()
	return elapsedMs >= int64(ttl)
}
// cleanupAggressiveLockingRedundantLocks asynchronously rolls back the pessimistic locks that are still listed in
// lastRetryUnnecessaryLocks, i.e. locks from the previous attempt that were not requested again, and subtracts them
// from the transaction's locked-key counter. It does nothing when there are no such locks.
func (txn *KVTxn) cleanupAggressiveLockingRedundantLocks(ctx context.Context) {
	stale := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks
	if len(stale) == 0 {
		return
	}

	keys := make([][]byte, 0, len(stale))
	for k := range stale {
		keys = append(keys, []byte(k))
	}

	// The committer must have been initialized if some keys are locked.
	forUpdateTS := txn.committer.forUpdateTS
	if ts := txn.aggressiveLockingContext.maxLockedWithConflictTS; ts > forUpdateTS {
		// Use the larger timestamp so that locks acquired with conflict are covered as well.
		forUpdateTS = ts
	}
	txn.asyncPessimisticRollback(ctx, keys, forUpdateTS)
	txn.lockedCnt -= len(keys)
}
// filterAggressiveLockedKeys returns the subset of allKeys that still needs a pessimistic lock request in the
// current aggressive locking attempt.
//
// A key that was locked in the previous attempt is removed from lastRetryUnnecessaryLocks (it is needed again) and,
// if its lock can be reused, it is moved to currentLockedKeys and its cached result is published to lockCtx.Values
// instead of being returned for locking.
//
// An error is returned if lockCtx.ForUpdateTS is smaller than the LockedWithConflictTS recorded for a key in the
// previous attempt, which is not expected to happen.
func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][]byte) ([][]byte, error) {
	// In aggressive locking mode, we can skip locking if all of these conditions are met:
	// * The primary is unchanged during the current aggressive locking (which means primary is already set
	//   before the current aggressive locking or the selected primary is the same as that selected during the
	//   previous attempt).
	// * The key is already locked in the previous attempt.
	// * The time since last attempt is short enough so that the locks we acquired during last attempt is
	//   unlikely to be resolved by other transactions.

	// In case primary is not assigned in this phase, or primary is already set but unchanged, we don't need
	// to update the locks.
	canTrySkip := !txn.aggressiveLockingContext.assignedPrimaryKey || bytes.Equal(txn.aggressiveLockingContext.lastPrimaryKey, txn.aggressiveLockingContext.primaryKey)

	// Do not preallocate since in most cases the keys need to lock doesn't change during pessimistic-retry.
	keys := make([][]byte, 0)
	for _, k := range allKeys {
		keyStr := string(k)
		if lastResult, ok := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks[keyStr]; ok {
			if lockCtx.ForUpdateTS < lastResult.Value.LockedWithConflictTS {
				// This should be an unreachable path.
				return nil, errors.Errorf("Txn %v Retrying aggressive locking with ForUpdateTS (%v) less than previous LockedWithConflictTS (%v)", txn.StartTS(), lockCtx.ForUpdateTS, lastResult.Value.LockedWithConflictTS)
			}
			// The key is requested again, so it is no longer a candidate for being released.
			delete(txn.aggressiveLockingContext.lastRetryUnnecessaryLocks, keyStr)
			if canTrySkip &&
				lastResult.trySkipLockingOnRetry(lockCtx.ReturnValues, lockCtx.CheckExistence) &&
				!txn.mayAggressiveLockingLastLockedKeysExpire() {
				// We can skip locking it since it's already locked during last attempt to aggressive locking, and
				// we already have the information that we need.
				// lockCtx.Values is nil when the caller requested neither values nor existence; writing to a nil
				// map would panic, and such a caller has nothing to read from it anyway.
				if lockCtx.Values != nil {
					lockCtx.Values[keyStr] = lastResult.Value
				}
				txn.aggressiveLockingContext.currentLockedKeys[keyStr] = lastResult
				continue
			}
		}
		keys = append(keys, k)
	}
	return keys, nil
}
// LockKeys tries to lock the entries with the keys in KV store.
// lockCtx is the context for lock, lockCtx.lockWaitTime in ms
func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error {
@ -635,6 +870,10 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}()
if !txn.IsPessimistic() && txn.aggressiveLockingContext != nil {
return errors.New("trying to perform aggressive locking in optimistic transaction")
}
memBuf := txn.us.GetMemBuffer()
// Avoid data race with concurrent updates to the memBuf
memBuf.RLock()
@ -646,6 +885,14 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
valueExist = flags.HasLockedValueExists()
checkKeyExists = flags.HasNeedCheckExists()
}
// If the key is locked in the current aggressive locking stage, override the information in memBuf.
if txn.aggressiveLockingContext != nil {
if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok {
locked = true
valueExist = entry.Value.Exists
}
}
if !locked {
keys = append(keys, key)
} else if txn.IsPessimistic() {
@ -657,9 +904,10 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}
if lockCtx.ReturnValues && locked {
keyStr := string(key)
// An already locked key can not return values, we add an entry to let the caller get the value
// in other ways.
lockCtx.Values[string(key)] = tikv.ReturnedValue{AlreadyLocked: true}
lockCtx.Values[keyStr] = tikv.ReturnedValue{AlreadyLocked: true}
}
}
memBuf.RUnlock()
@ -704,19 +952,40 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}
if txn.committer.primaryKey == nil {
txn.committer.primaryKey = keys[0]
assignedPrimaryKey = true
txn.selectPrimaryForPessimisticLock(keys)
}
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
allKeys := keys
// If aggressive locking is enabled and we don't need to update the primary for all locks, we can avoid sending
// RPC to those already locked keys.
if txn.aggressiveLockingContext != nil {
keys, err = txn.filterAggressiveLockedKeys(lockCtx, allKeys)
if err != nil {
return err
}
if len(keys) == 0 {
return nil
}
}
lockWaitMode := kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal
if txn.aggressiveLockingContext != nil && len(keys) == 1 && !lockCtx.LockOnlyIfExists {
lockWaitMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock
}
lockCtx.Stats = &util.LockKeysDetails{
LockKeys: int32(len(keys)),
ResolveLock: util.ResolveLockDetail{},
}
bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1
err = txn.committer.pessimisticLockMutations(bo, lockCtx, &PlainMutations{keys: keys})
err = txn.committer.pessimisticLockMutations(bo, lockCtx, lockWaitMode, &PlainMutations{keys: keys})
if lockCtx.Stats != nil && bo.GetTotalSleep() > 0 {
atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond))
lockCtx.Stats.Mu.Lock()
@ -729,6 +998,11 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
// We need to reset the killed flag here.
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
}
if txn.aggressiveLockingContext != nil {
if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
}
}
if err != nil {
var unmarkKeys [][]byte
// Avoid data race with concurrent updates to the memBuf
@ -756,7 +1030,20 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
}
wg := txn.asyncPessimisticRollback(ctx, keys)
// TODO: It's possible that there are some locks successfully locked with conflict but the client didn't
// receive the response due to RPC error. In case the lost LockedWithConflictTS > any received
// LockedWithConflictTS, it might not be successfully rolled back here.
rollbackForUpdateTS := lockCtx.ForUpdateTS
if lockCtx.MaxLockedWithConflictTS > rollbackForUpdateTS {
rollbackForUpdateTS = lockCtx.MaxLockedWithConflictTS
}
wg := txn.asyncPessimisticRollback(ctx, allKeys, rollbackForUpdateTS)
txn.lockedCnt -= len(allKeys) - len(keys)
if txn.aggressiveLockingContext != nil {
for _, k := range allKeys {
delete(txn.aggressiveLockingContext.currentLockedKeys, string(k))
}
}
if isDeadlock {
logutil.Logger(ctx).Debug("deadlock error received", zap.Uint64("startTS", txn.startTS), zap.Stringer("deadlockInfo", dl))
@ -773,8 +1060,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
if assignedPrimaryKey {
// unset the primary key and stop heartbeat if we assigned primary key when failed to lock it.
txn.committer.primaryKey = nil
txn.committer.ttlManager.reset()
txn.resetPrimary()
}
return err
}
@ -791,30 +1077,88 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
skipedLockKeys := 0
for _, key := range keys {
valExists := tikv.SetKeyLockedValueExists
valExists := true
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
// For other lock modes, the locked key values always exist.
if lockCtx.ReturnValues || checkedExistence {
// If ReturnValue is disabled and CheckExistence is requested, it's still possible that the TiKV's version
// is too old and CheckExistence is not supported.
if val, ok := lockCtx.Values[string(key)]; ok {
// TODO: Check if it's safe to use `val.Exists` instead of assuming empty value.
keyStr := string(key)
var val tikv.ReturnedValue
var ok bool
if val, ok = lockCtx.Values[keyStr]; ok {
if lockCtx.ReturnValues || checkedExistence || val.LockedWithConflictTS != 0 {
if !val.Exists {
valExists = tikv.SetKeyLockedValueNotExists
valExists = false
}
}
}
if lockCtx.LockOnlyIfExists && valExists == tikv.SetKeyLockedValueNotExists {
skipedLockKeys++
continue
if txn.aggressiveLockingContext != nil {
txn.aggressiveLockingContext.currentLockedKeys[keyStr] = tempLockBufferEntry{
HasReturnValue: lockCtx.ReturnValues,
HasCheckExistence: lockCtx.CheckExistence,
Value: val,
}
txn.aggressiveLockingDirty = true
} else {
setValExists := tikv.SetKeyLockedValueExists
if !valExists {
setValExists = tikv.SetKeyLockedValueNotExists
}
// TODO: Fix the calculation when aggressive-locking is active
if lockCtx.LockOnlyIfExists && !valExists {
skipedLockKeys++
continue
}
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, setValExists)
}
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, valExists)
}
txn.lockedCnt += len(keys) - skipedLockKeys
return nil
}
// resetPrimary unsets the committer's primary key and resets its ttlManager. It is used when a primary key that was
// assigned for a lock attempt must be given up (failed lock, retry or cancellation of aggressive locking).
// The committer must already exist when this is called.
func (txn *KVTxn) resetPrimary() {
	txn.committer.primaryKey = nil
	txn.committer.ttlManager.reset()
}
// selectPrimaryForPessimisticLock picks the committer's primary key from sortedKeys, which must be non-empty and
// sorted in ascending byte order (the lookup below is a binary search).
//
// Outside aggressive locking the first key is chosen. In aggressive locking mode the primary chosen by the previous
// attempt is preferred when it is among sortedKeys, otherwise the first key is chosen; either way the choice is
// also recorded in the aggressive locking context.
func (txn *KVTxn) selectPrimaryForPessimisticLock(sortedKeys [][]byte) {
	chosen := sortedKeys[0]

	alc := txn.aggressiveLockingContext
	if alc == nil {
		txn.committer.primaryKey = chosen
		return
	}

	if last := alc.lastPrimaryKey; last != nil {
		idx := sort.Search(len(sortedKeys), func(i int) bool {
			return bytes.Compare(sortedKeys[i], last) >= 0
		})
		if idx < len(sortedKeys) && bytes.Equal(sortedKeys[idx], last) {
			// The last selected primary is included in the current list of keys we are going to lock.
			// Select it as the primary.
			chosen = sortedKeys[idx]
		}
	}

	txn.committer.primaryKey = chosen
	alc.assignedPrimaryKey = true
	alc.primaryKey = chosen
}
// aggressiveLockingContext holds the state of a transaction while it is in aggressive locking mode. It tracks two
// generations of locks: those acquired in the current attempt of the statement and those left from the previous one.
type aggressiveLockingContext struct {
	// lastRetryUnnecessaryLocks holds the keys locked in the previous attempt that have not been requested again
	// in the current attempt. Entries are removed as keys are requested again; what remains is rolled back on
	// retry, cancel or done.
	lastRetryUnnecessaryLocks map[string]tempLockBufferEntry
	// lastPrimaryKey is the primary key that was selected during the previous attempt, if any.
	lastPrimaryKey []byte
	// lastAttemptStartTime is when the previous attempt started; it is compared against ManagedLockTTL to judge
	// whether the previous attempt's locks may have expired.
	lastAttemptStartTime time.Time
	// currentLockedKeys holds the keys locked (or reused from the previous attempt) in the current attempt.
	currentLockedKeys map[string]tempLockBufferEntry
	// maxLockedWithConflictTS is the largest LockCtx.MaxLockedWithConflictTS observed by LockKeys while in
	// aggressive locking mode.
	maxLockedWithConflictTS uint64
	// assignedPrimaryKey is true when the transaction's primary key was selected during the current attempt.
	assignedPrimaryKey bool
	// primaryKey is the primary key selected during the current attempt, if any.
	primaryKey []byte
	// startTime is when the current attempt started.
	startTime time.Time
}
// unsetPrimaryKeyIfNeed is used to unset primary key of the transaction after performing LockOnlyIfExists.
// When locking only one key with LockOnlyIfExists flag, the key will be selected as primary if
// it's the first lock of the transaction. If the key doesn't exist on TiKV, the key won't be
@ -850,7 +1194,10 @@ func deduplicateKeys(keys [][]byte) [][]byte {
const pessimisticRollbackMaxBackoff = 20000
func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup {
// asyncPessimisticRollback asynchronously rolls back the pessimistic locks of the current transaction on the
// specified keys. The rollback is performed with the larger of specifiedForUpdateTS and the transaction's current
// forUpdateTS; pessimistic locks on the specified keys whose forUpdateTS is not greater than that value will be
// unlocked. Passing 0 as specifiedForUpdateTS therefore means the transaction's current forUpdateTS is used.
func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte, specifiedForUpdateTS uint64) *sync.WaitGroup {
// Clone a new committer for execute in background.
committer := &twoPhaseCommitter{
store: txn.committer.store,
@ -859,6 +1206,9 @@ func (txn *KVTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *
forUpdateTS: txn.committer.forUpdateTS,
primaryKey: txn.committer.primaryKey,
}
if specifiedForUpdateTS > committer.forUpdateTS {
committer.forUpdateTS = specifiedForUpdateTS
}
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
@ -898,7 +1248,7 @@ func hashInKeys(deadlockKeyHash uint64, keys [][]byte) bool {
// IsReadOnly checks if the transaction has only performed read operations.
func (txn *KVTxn) IsReadOnly() bool {
return !txn.us.GetMemBuffer().Dirty()
return !(txn.us.GetMemBuffer().Dirty() || txn.aggressiveLockingDirty)
}
// StartTS returns the transaction start timestamp.