feat: pipelined dml has its own max-txn-ttl of 24 hours (#1224)

* feat: pipelined dml has its own max-txn-ttl of 24 hours

Signed-off-by: ekexium <eke@fastmail.com>

* fix gh actions

Signed-off-by: ekexium <eke@fastmail.com>

* style: fix lint

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2024-03-14 10:54:19 +08:00 committed by GitHub
parent d59fea5757
commit d7000ea557
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 18 additions and 9 deletions

View File

@@ -48,3 +48,4 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.55.2
skip-pkg-cache: true

View File

@@ -81,6 +81,7 @@ type twoPhaseCommitAction interface {
// Global variable set by config file.
var (
ManagedLockTTL uint64 = 20000 // 20s
MaxPipelinedTxnTTL uint64 = 24 * 60 * 60 * 1000 // 24h
)
var (
@@ -1145,7 +1146,7 @@ type ttlManager struct {
lockCtx *kv.LockCtx
}
func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx, isPipelinedTxn bool) {
if _, err := util.EvalFailpoint("doNotKeepAlive"); err == nil {
return
}
@@ -1157,7 +1158,7 @@ func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
tm.ch = make(chan struct{})
tm.lockCtx = lockCtx
go keepAlive(c, tm.ch, c.primary(), lockCtx)
go keepAlive(c, tm.ch, c.primary(), lockCtx, isPipelinedTxn)
}
func (tm *ttlManager) close() {
@@ -1178,7 +1179,10 @@ const keepAliveMaxBackoff = 20000
const pessimisticLockMaxBackoff = 20000
const maxConsecutiveFailure = 10
func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, lockCtx *kv.LockCtx) {
func keepAlive(
c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte,
lockCtx *kv.LockCtx, isPipelinedTxn bool,
) {
// Ticker is set to 1/2 of the ManagedLockTTL.
ticker := time.NewTicker(time.Duration(atomic.LoadUint64(&ManagedLockTTL)) * time.Millisecond / 2)
defer ticker.Stop()
@@ -1205,7 +1209,11 @@ func keepAlive(c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte, l
}
uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
if uptime > config.GetGlobalConfig().MaxTxnTTL {
maxTtl := config.GetGlobalConfig().MaxTxnTTL
if isPipelinedTxn {
maxTtl = max(maxTtl, MaxPipelinedTxnTTL)
}
if uptime > maxTtl {
// Checks maximum lifetime for the ttlManager, so when something goes wrong
// the key will not be locked forever.
logutil.Logger(bo.GetCtx()).Info("ttlManager live up to its lifetime",

View File

@@ -313,7 +313,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
if batch.isPrimary {
// After locking the primary key, we should protect the primary lock from expiring
// now in case locking the remaining keys take a long time.
c.run(c, action.LockCtx)
c.run(c, action.LockCtx, false)
}
// Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`.
@@ -412,7 +412,7 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
len(lockResp.Results) > 0 &&
lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
c.run(c, action.LockCtx, false)
}
if len(lockResp.Results) > 0 {

View File

@@ -198,7 +198,7 @@ func (action actionPipelinedFlush) handleSingleBatch(
if batch.isPrimary {
// start keepalive after primary key is written.
c.run(c, nil)
c.run(c, nil, true)
}
return nil
}

View File

@@ -370,7 +370,7 @@ func (action actionPrewrite) handleSingleBatch(
// In this case 1PC is not expected to be used, but still check it for safety.
if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
prewriteResp.OnePcCommitTs == 0 {
c.run(c, nil)
c.run(c, nil, false)
}
}