From d7000ea557732c92463c4d9c2ecf96b15fdab7b3 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 14 Mar 2024 10:54:19 +0800 Subject: [PATCH] feat: pipelined dml has its own max-txn-ttl of 24 hours (#1224) * feat: pipelined dml has its own max-txn-ttl of 24 hours Signed-off-by: ekexium * fix gh actions Signed-off-by: ekexium * style: fix lint Signed-off-by: ekexium --------- Signed-off-by: ekexium --- .github/workflows/test.yml | 1 + txnkv/transaction/2pc.go | 18 +++++++++++++----- txnkv/transaction/pessimistic.go | 4 ++-- txnkv/transaction/pipelined_flush.go | 2 +- txnkv/transaction/prewrite.go | 2 +- 5 files changed, 18 insertions(+), 9 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6bd35b3..1c2428c9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -48,3 +48,4 @@ jobs: uses: golangci/golangci-lint-action@v3 with: version: v1.55.2 + skip-pkg-cache: true diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 7c1a13e5..824a45de 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -80,7 +80,8 @@ type twoPhaseCommitAction interface { // Global variable set by config file. var ( - ManagedLockTTL uint64 = 20000 // 20s + ManagedLockTTL uint64 = 20000 // 20s + MaxPipelinedTxnTTL uint64 = 24 * 60 * 60 * 1000 // 24h ) var ( @@ -1145,7 +1146,7 @@ type ttlManager struct { lockCtx *kv.LockCtx } -func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) { +func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx, isPipelinedTxn bool) { if _, err := util.EvalFailpoint("doNotKeepAlive"); err == nil { return } @@ -1157,7 +1158,7 @@ func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) { tm.ch = make(chan struct{}) tm.lockCtx = lockCtx - go keepAlive(c, tm.ch, c.primary(), lockCtx) + go keepAlive(c, tm.ch, c.primary(), lockCtx, isPipelinedTxn) } func (tm *ttlManager) close() { @@ -1178,7 +1179,10 @@ const keepAliveMaxBackoff = 20000 const pessimisticLockMaxBackoff = 20000 const maxConsecutiveFailure = 10 -func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, lockCtx *kv.LockCtx) { +func keepAlive( + c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, + lockCtx *kv.LockCtx, isPipelinedTxn bool, +) { // Ticker is set to 1/2 of the ManagedLockTTL. ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2) defer ticker.Stop() @@ -1205,7 +1209,11 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l } uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) - if uptime > config.GetGlobalConfig().MaxTxnTTL { + maxTtl := config.GetGlobalConfig().MaxTxnTTL + if isPipelinedTxn { + maxTtl = max(maxTtl, MaxPipelinedTxnTTL) + } + if uptime > maxTtl { // Checks maximum lifetime for the ttlManager, so when something goes wrong // the key will not be locked forever. logutil.Logger(bo.GetCtx()).Info("ttlManager live up to its lifetime", diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index c1d9b95a..ac6050dd 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -313,7 +313,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode( if batch.isPrimary { // After locking the primary key, we should protect the primary lock from expiring // now in case locking the remaining keys take a long time. - c.run(c, action.LockCtx) + c.run(c, action.LockCtx, false) } // Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`. @@ -412,7 +412,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode( len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed { // After locking the primary key, we should protect the primary lock from expiring. - c.run(c, action.LockCtx) + c.run(c, action.LockCtx, false) } if len(lockResp.Results) > 0 { diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 76464dab..fade8a79 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -198,7 +198,7 @@ func (action actionPipelinedFlush) handleSingleBatch( if batch.isPrimary { // start keepalive after primary key is written. - c.run(c, nil) + c.run(c, nil, true) } return nil } diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 7e563065..ba6e4157 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -370,7 +370,7 @@ func (action actionPrewrite) handleSingleBatch( // In this case 1PC is not expected to be used, but still check it for safety. if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize && prewriteResp.OnePcCommitTs == 0 { - c.run(c, nil) + c.run(c, nil, false) } }