From 6f33dd97aff7cb42976453472686e9be7e9c3e98 Mon Sep 17 00:00:00 2001
From: Yilin Chen
Date: Fri, 6 Aug 2021 11:14:25 +0800
Subject: [PATCH] Start heartbeat immediately after locking the primary key
 (#261)

Signed-off-by: Yilin Chen
---
 integration_tests/lock_test.go   | 58 ++++++++++++++++++++++++++++++++
 txnkv/transaction/2pc.go         | 37 +++++++++++---------
 txnkv/transaction/pessimistic.go |  6 ++++
 txnkv/transaction/txn.go         |  6 ++--
 4 files changed, 87 insertions(+), 20 deletions(-)

diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go
index 11681dbc..25641e2d 100644
--- a/integration_tests/lock_test.go
+++ b/integration_tests/lock_test.go
@@ -796,3 +796,61 @@ func (s *testLockSuite) TestDeadlockReportWaitChain() {
 	waitAndRollback(txns, 0)
 	waitAndRollback(txns, 2)
 }
+
+func (s *testLockSuite) TestStartHeartBeatAfterLockingPrimary() {
+	atomic.StoreUint64(&transaction.ManagedLockTTL, 500)
+	s.Nil(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
+	s.Nil(failpoint.Enable("tikvclient/afterPrimaryBatch", `pause`))
+	defer func() {
+		atomic.StoreUint64(&transaction.ManagedLockTTL, 20000)
+		s.Nil(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
+	}()
+
+	txn, err := s.store.Begin()
+	s.Nil(err)
+	txn.SetPessimistic(true)
+	lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
+	lockResCh := make(chan error)
+	go func() {
+		lockErr := txn.LockKeys(context.Background(), lockCtx, []byte("a"), []byte("b"))
+		lockResCh <- lockErr
+	}()
+
+	time.Sleep(500 * time.Millisecond)
+
+	// Check that the TTL has been updated by the heartbeat
+	lr := s.store.NewLockResolver()
+	status, err := lr.LockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("a"))
+	s.Nil(err)
+	s.False(status.IsCommitted())
+	s.Greater(status.TTL(), uint64(600))
+	s.Equal(status.CommitTS(), uint64(0))
+
+	// Let locking the secondary key fail
+	s.Nil(failpoint.Enable("tikvclient/PessimisticLockErrWriteConflict", "return"))
+	s.Nil(failpoint.Disable("tikvclient/afterPrimaryBatch"))
+	s.Error(<-lockResCh)
+	s.Nil(failpoint.Disable("tikvclient/PessimisticLockErrWriteConflict"))
+
+	err = txn.LockKeys(context.Background(), lockCtx, []byte("c"), []byte("d"))
+	s.Nil(err)
+
+	time.Sleep(500 * time.Millisecond)
+
+	// The original primary key "a" should be rolled back because its TTL is no longer updated
+	lr = s.store.NewLockResolver()
+	status, err = lr.LockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("a"))
+	s.Nil(err)
+	s.False(status.IsCommitted())
+	s.Equal(status.TTL(), uint64(0))
+
+	// The TTL of the new primary lock should be updated.
+	lr = s.store.NewLockResolver()
+	status, err = lr.LockResolver.GetTxnStatus(txn.StartTS(), 0, []byte("c"))
+	s.Nil(err)
+	s.False(status.IsCommitted())
+	s.Greater(status.TTL(), uint64(1200))
+	s.Equal(status.CommitTS(), uint64(0))
+
+	s.Nil(txn.Rollback())
+}
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index 7cb7452f..60839210 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -360,9 +360,6 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
 		startTS:       txn.StartTS(),
 		sessionID:     sessionID,
 		regionTxnSize: map[uint64]int{},
-		ttlManager: ttlManager{
-			ch: make(chan struct{}),
-		},
 		isPessimistic: txn.IsPessimistic(),
 		binlog:        txn.binlog,
 	}, nil
@@ -752,6 +749,8 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
 		}
 		batchBuilder.forgetPrimary()
 	}
+	util.EvalFailpoint("afterPrimaryBatch")
+
 	// Already spawned a goroutine for async commit transaction.
 	if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
 		secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
@@ -849,19 +848,18 @@ type ttlManager struct {
 }
 
 func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
+	if _, err := util.EvalFailpoint("doNotKeepAlive"); err == nil {
+		return
+	}
+
 	// Run only once.
 	if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
 		return
 	}
+	tm.ch = make(chan struct{})
 	tm.lockCtx = lockCtx
-	noKeepAlive := false
-	if _, err := util.EvalFailpoint("doNotKeepAlive"); err == nil {
-		noKeepAlive = true
-	}
-	if !noKeepAlive {
-		go tm.keepAlive(c)
-	}
+	go keepAlive(c, tm.ch, c.primary(), lockCtx)
 }
 
 func (tm *ttlManager) close() {
@@ -871,22 +869,29 @@ func (tm *ttlManager) close() {
 	close(tm.ch)
 }
 
+func (tm *ttlManager) reset() {
+	if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateUninitialized)) {
+		return
+	}
+	close(tm.ch)
+}
+
 const keepAliveMaxBackoff = 20000        // 20 seconds
 const pessimisticLockMaxBackoff = 600000 // 10 minutes
 const maxConsecutiveFailure = 10
 
-func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
+func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, lockCtx *kv.LockCtx) {
 	// Ticker is set to 1/2 of the ManagedLockTTL.
 	ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2)
 	defer ticker.Stop()
 	keepFail := 0
 	for {
 		select {
-		case <-tm.ch:
+		case <-closeCh:
 			return
 		case <-ticker.C:
 			// If kill signal is received, the ttlManager should exit.
-			if tm.lockCtx != nil && tm.lockCtx.Killed != nil && atomic.LoadUint32(tm.lockCtx.Killed) != 0 {
+			if lockCtx != nil && lockCtx.Killed != nil && atomic.LoadUint32(lockCtx.Killed) != 0 {
 				return
 			}
 			bo := retry.NewBackofferWithVars(context.Background(), keepAliveMaxBackoff, c.txn.vars)
@@ -908,8 +913,8 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
 				metrics.TiKVTTLLifeTimeReachCounter.Inc()
 				// the pessimistic locks may expire if the ttl manager has timed out, set `LockExpired` flag
 				// so that this transaction could only commit or rollback with no more statement executions
-				if c.isPessimistic && tm.lockCtx != nil && tm.lockCtx.LockExpired != nil {
-					atomic.StoreUint32(tm.lockCtx.LockExpired, 1)
+				if c.isPessimistic && lockCtx != nil && lockCtx.LockExpired != nil {
+					atomic.StoreUint32(lockCtx.LockExpired, 1)
 				}
 				return
 			}
@@ -918,7 +923,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
 			logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat",
 				zap.Uint64("startTS", c.startTS), zap.Uint64("newTTL", newTTL))
 			startTime := time.Now()
-			_, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
+			_, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL)
 			if err != nil {
 				keepFail++
 				metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index b3298100..6bd0c905 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -170,6 +170,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 	lockResp := resp.Resp.(*kvrpcpb.PessimisticLockResponse)
 	keyErrs := lockResp.GetErrors()
 	if len(keyErrs) == 0 {
+		if batch.isPrimary {
+			// After locking the primary key, we should protect the primary lock from expiring
+			// now, in case locking the remaining keys takes a long time.
+			c.ttlManager.run(c, action.LockCtx)
+		}
+
 		if action.ReturnValues {
 			action.ValuesLock.Lock()
 			for i, mutation := range mutations {
diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index 64076757..ce81288f 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -621,14 +621,12 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 			}
 		}
 		if assignedPrimaryKey {
-			// unset the primary key if we assigned primary key when failed to lock it.
+			// unset the primary key and stop the heartbeat if we assigned the primary key but failed to lock it.
 			txn.committer.primaryKey = nil
+			txn.committer.ttlManager.reset()
 		}
 		return err
 	}
-	if assignedPrimaryKey {
-		txn.committer.ttlManager.run(txn.committer, lockCtx)
-	}
 }
 for _, key := range keys {
 	valExists := tikv.SetKeyLockedValueExists
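
The core of this change is the ttlManager lifecycle: run now starts the keepAlive goroutine as soon as the primary batch is locked (rather than after all keys are locked), creating a fresh channel on each Uninitialized -> Running transition, while the new reset moves Running back to Uninitialized and closes the channel, so a retried LockKeys with a different primary key can start a fresh heartbeat. The following is a minimal, self-contained Go sketch of that state machine; it mirrors the names in the patch, but the heartbeat loop is a logging stub and main is illustrative only, not the client-go implementation:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type ttlManagerState uint32

const (
	stateUninitialized ttlManagerState = iota
	stateRunning
	stateClosed // used by the real close(); not exercised in this sketch
)

type ttlManager struct {
	state ttlManagerState
	ch    chan struct{}
}

// run starts one heartbeat goroutine per Uninitialized -> Running transition.
// The channel is created here (not in the committer's constructor) so each
// run gets a fresh channel that only its own goroutine listens on.
func (tm *ttlManager) run(primaryKey []byte) {
	if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
		return
	}
	tm.ch = make(chan struct{})
	go keepAlive(tm.ch, primaryKey)
}

// reset stops the current heartbeat and returns to Uninitialized, so a later
// run call (e.g. after re-selecting the primary key) can start a new one.
func (tm *ttlManager) reset() {
	if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateUninitialized)) {
		return
	}
	close(tm.ch)
}

// keepAlive stands in for the real TxnHeartBeat loop; it only logs ticks.
func keepAlive(closeCh chan struct{}, primaryKey []byte) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-closeCh:
			fmt.Printf("heartbeat for %q stopped\n", primaryKey)
			return
		case <-ticker.C:
			fmt.Printf("extending TTL of primary %q\n", primaryKey)
		}
	}
}

func main() {
	tm := &ttlManager{}
	tm.run([]byte("a")) // primary "a" locked: heartbeat starts immediately
	time.Sleep(250 * time.Millisecond)
	tm.reset()          // locking a secondary key failed: unset primary, stop heartbeat
	tm.run([]byte("c")) // retry with new primary "c": a fresh heartbeat starts
	time.Sleep(250 * time.Millisecond)
	tm.reset()
	time.Sleep(50 * time.Millisecond) // let the final "stopped" message print
}

Note that reset, unlike close, returns the state to Uninitialized rather than Closed; that is what allows LockKeys to start a second heartbeat after the first primary lock is abandoned, which is exactly the scenario TestStartHeartBeatAfterLockingPrimary exercises.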