mirror of https://github.com/tikv/client-go.git
Update lock ttl when retry to pessimistic lock keys (#417)
Signed-off-by: longfangsong <longfangsong@icloud.com>
This commit is contained in:
parent
7e34d88af3
commit
cd7e7681c2
|
|
@ -1345,6 +1345,55 @@ func (s *testCommitterSuite) TestAsyncCommit() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestRetryPushTTL() {
|
||||||
|
ctx := context.Background()
|
||||||
|
k := []byte("a")
|
||||||
|
|
||||||
|
txn1 := s.begin()
|
||||||
|
txn1.SetPessimistic(true)
|
||||||
|
// txn1 lock k
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
err := txn1.LockKeys(ctx, lockCtx, k)
|
||||||
|
s.Nil(err)
|
||||||
|
txn2 := s.begin()
|
||||||
|
txn2.SetPessimistic(true)
|
||||||
|
txn2GotLock := make(chan struct{})
|
||||||
|
txn3GotLock := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// txn2 tries to lock k, will blocked by txn1
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
// after txn1 rolled back, txn2 should acquire its lock successfully
|
||||||
|
// with the **latest** ttl
|
||||||
|
err := txn2.LockKeys(ctx, lockCtx, k)
|
||||||
|
s.Nil(err)
|
||||||
|
txn2GotLock <- struct{}{}
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
txn1.Rollback()
|
||||||
|
<-txn2GotLock
|
||||||
|
txn3 := s.begin()
|
||||||
|
txn3.SetPessimistic(true)
|
||||||
|
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// if txn2 use the old ttl calculation method, here txn3 can resolve its lock and
|
||||||
|
// get lock successfully here, which is not expected behavior
|
||||||
|
txn3.LockKeys(ctx, lockCtx, k)
|
||||||
|
txn3GotLock <- struct{}{}
|
||||||
|
txn3.Rollback()
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-txn3GotLock:
|
||||||
|
s.Fail("txn3 should not get lock at this time")
|
||||||
|
case <-time.After(time.Second * 2):
|
||||||
|
break
|
||||||
|
}
|
||||||
|
txn2.Rollback()
|
||||||
|
<-txn3GotLock
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
func updateGlobalConfig(f func(conf *config.Config)) {
|
func updateGlobalConfig(f func(conf *config.Config)) {
|
||||||
g := config.GetGlobalConfig()
|
g := config.GetGlobalConfig()
|
||||||
newConf := *g
|
newConf := *g
|
||||||
|
|
|
||||||
|
|
@ -96,23 +96,11 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
||||||
}
|
}
|
||||||
mutations[i] = mut
|
mutations[i] = mut
|
||||||
}
|
}
|
||||||
elapsed := uint64(time.Since(c.txn.startTime) / time.Millisecond)
|
|
||||||
ttl := elapsed + atomic.LoadUint64(&ManagedLockTTL)
|
|
||||||
if _, err := util.EvalFailpoint("shortPessimisticLockTTL"); err == nil {
|
|
||||||
ttl = 1
|
|
||||||
keys := make([]string, 0, len(mutations))
|
|
||||||
for _, m := range mutations {
|
|
||||||
keys = append(keys, hex.EncodeToString(m.Key))
|
|
||||||
}
|
|
||||||
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock",
|
|
||||||
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
|
|
||||||
}
|
|
||||||
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
|
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
|
||||||
Mutations: mutations,
|
Mutations: mutations,
|
||||||
PrimaryLock: c.primary(),
|
PrimaryLock: c.primary(),
|
||||||
StartVersion: c.startTS,
|
StartVersion: c.startTS,
|
||||||
ForUpdateTs: c.forUpdateTS,
|
ForUpdateTs: c.forUpdateTS,
|
||||||
LockTtl: ttl,
|
|
||||||
IsFirstLock: c.isFirstLock,
|
IsFirstLock: c.isFirstLock,
|
||||||
WaitTimeout: action.LockWaitTime(),
|
WaitTimeout: action.LockWaitTime(),
|
||||||
ReturnValues: action.ReturnValues,
|
ReturnValues: action.ReturnValues,
|
||||||
|
|
@ -134,6 +122,18 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
|
||||||
req.PessimisticLock().WaitTimeout = timeLeft
|
req.PessimisticLock().WaitTimeout = timeLeft
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
elapsed := uint64(time.Since(c.txn.startTime) / time.Millisecond)
|
||||||
|
ttl := elapsed + atomic.LoadUint64(&ManagedLockTTL)
|
||||||
|
if _, err := util.EvalFailpoint("shortPessimisticLockTTL"); err == nil {
|
||||||
|
ttl = 1
|
||||||
|
keys := make([]string, 0, len(mutations))
|
||||||
|
for _, m := range mutations {
|
||||||
|
keys = append(keys, hex.EncodeToString(m.Key))
|
||||||
|
}
|
||||||
|
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock",
|
||||||
|
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
|
||||||
|
}
|
||||||
|
req.PessimisticLock().LockTtl = ttl
|
||||||
if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil {
|
if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil {
|
||||||
time.Sleep(300 * time.Millisecond)
|
time.Sleep(300 * time.Millisecond)
|
||||||
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
|
return errors.WithStack(&tikverr.ErrWriteConflict{WriteConflict: nil})
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue