diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go
index 02660186..b2536320 100644
--- a/integration_tests/lock_test.go
+++ b/integration_tests/lock_test.go
@@ -1591,3 +1591,172 @@ func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() {
 	err = txn1.Rollback()
 	s.NoError(err)
 }
+
+func (s *testLockWithTiKVSuite) TestPessimisticLockMaxExecutionTime() {
+	if !*withTiKV {
+		s.T().Skip()
+	}
+	// This test covers the path where the max_execution_time deadline is checked
+	// during pessimistic lock operations.
+	// Note: TiKV caps lock wait time at 1 second.
+
+	ctx := context.Background()
+	k1 := []byte("lock_max_execution_time_k1")
+	k2 := []byte("lock_max_execution_time_k2")
+
+	// Setup: create a lock that will block our test transactions.
+	txn1, err := s.store.Begin()
+	s.NoError(err)
+	txn1.SetPessimistic(true)
+
+	// Lock k1 with txn1 to create a blocking condition.
+	lockCtx1 := kv.NewLockCtx(txn1.StartTS(), kv.LockAlwaysWait, time.Now())
+	err = txn1.LockKeys(ctx, lockCtx1, k1)
+	s.NoError(err)
+
+	// Test case 1: max_execution_time deadline already exceeded.
+	txn2, err := s.store.Begin()
+	s.NoError(err)
+	txn2.SetPessimistic(true)
+
+	// Set MaxExecutionDeadline to a time in the past.
+	baseTime := time.Now()
+	lockCtx2 := kv.NewLockCtx(txn2.StartTS(), 800, baseTime) // 800ms lock wait
+	lockCtx2.MaxExecutionDeadline = baseTime.Add(-100 * time.Millisecond) // Already expired
+
+	err = txn2.LockKeys(ctx, lockCtx2, k1)
+	s.Error(err)
+
+	// Verify it's the correct max_execution_time error.
+	var queryInterruptedErr tikverr.ErrQueryInterruptedWithSignal
+	s.ErrorAs(err, &queryInterruptedErr)
+	s.Equal(uint32(transaction.MaxExecTimeExceededSignal), queryInterruptedErr.Signal)
+
+	// Test case 2: max_execution_time deadline limits the lock wait time.
+	// max_execution_time (200ms) < lock wait time (800ms) < TiKV limit (1000ms)
+	txn3, err := s.store.Begin()
+	s.NoError(err)
+	txn3.SetPessimistic(true)
+
+	startTime := time.Now()
+	lockCtx3 := kv.NewLockCtx(txn3.StartTS(), 800, startTime) // 800ms lock wait
+	lockCtx3.MaxExecutionDeadline = startTime.Add(200 * time.Millisecond) // But max exec time is only 200ms
+
+	err = txn3.LockKeys(ctx, lockCtx3, k1)
+	elapsed := time.Since(startTime)
+
+	s.Error(err)
+	s.ErrorAs(err, &queryInterruptedErr)
+	s.Equal(uint32(transaction.MaxExecTimeExceededSignal), queryInterruptedErr.Signal)
+
+	// Should time out around 200ms, not 800ms.
+	s.Greater(elapsed, 190*time.Millisecond)
+	s.Less(elapsed, 400*time.Millisecond)
+
+	// Test case 3: lock wait timeout shorter than max_execution_time.
+	// lock wait time (150ms) < max_execution_time (600ms) < TiKV limit (1000ms)
+	txn4, err := s.store.Begin()
+	s.NoError(err)
+	txn4.SetPessimistic(true)
+
+	startTime = time.Now()
+	lockCtx4 := kv.NewLockCtx(txn4.StartTS(), 150, startTime) // 150ms lock wait
+	lockCtx4.MaxExecutionDeadline = startTime.Add(600 * time.Millisecond) // 600ms max exec time
+
+	err = txn4.LockKeys(ctx, lockCtx4, k1)
+	elapsed = time.Since(startTime)
+
+	s.Error(err)
+	// Should be a lock wait timeout, not a max execution time error.
+	s.Equal(tikverr.ErrLockWaitTimeout.Error(), err.Error())
+
+	// Should time out around 150ms.
+	s.Greater(elapsed, 120*time.Millisecond)
+	s.Less(elapsed, 300*time.Millisecond)
+
+	// Test case 4: the lock wait time is the tightest constraint.
+	// lock wait time (900ms) < TiKV limit (1000ms) < max_execution_time (1200ms)
+	txn5, err := s.store.Begin()
+	s.NoError(err)
+	txn5.SetPessimistic(true)
+
+	startTime = time.Now()
+	lockCtx5 := kv.NewLockCtx(txn5.StartTS(), 900, startTime) // 900ms lock wait
+	lockCtx5.MaxExecutionDeadline = startTime.Add(1200 * time.Millisecond) // 1200ms max exec time
+
+	err = txn5.LockKeys(ctx, lockCtx5, k1)
+	elapsed = time.Since(startTime)
+
+	s.Error(err)
+	// Should be a lock wait timeout, not a max execution time error: the 900ms lock wait expires before the 1200ms deadline.
+	s.Equal(tikverr.ErrLockWaitTimeout.Error(), err.Error())
+
+	// Should time out around 900ms (limited by the requested lock wait time).
+	s.Greater(elapsed, 800*time.Millisecond)
+	s.Less(elapsed, 1200*time.Millisecond)
+
+	// Test case 5: no max_execution_time deadline (normal behavior).
+	txn6, err := s.store.Begin()
+	s.NoError(err)
+	txn6.SetPessimistic(true)
+
+	lockCtx6 := kv.NewLockCtx(txn6.StartTS(), kv.LockNoWait, time.Now())
+	// MaxExecutionDeadline defaults to the zero time (no deadline).
+
+	err = txn6.LockKeys(ctx, lockCtx6, k1)
+	s.Error(err)
+	// Should be an immediate lock acquisition failure, not a max execution time error.
+	s.Equal(tikverr.ErrLockAcquireFailAndNoWaitSet.Error(), err.Error())
+
+	// Cleanup: unlock k1 and test a successful lock with max_execution_time set.
+	s.NoError(txn1.Rollback())
+
+	// Test case 6: successful lock acquisition with max_execution_time set.
+	txn7, err := s.store.Begin()
+	s.NoError(err)
+	txn7.SetPessimistic(true)
+
+	lockCtx7 := kv.NewLockCtx(txn7.StartTS(), kv.LockAlwaysWait, time.Now())
+	lockCtx7.MaxExecutionDeadline = time.Now().Add(100 * time.Millisecond)
+
+	err = txn7.LockKeys(ctx, lockCtx7, k2) // k2 is not locked
+	s.NoError(err)                         // Should succeed
+
+	// Additional coverage: a max_execution_time timeout should skip lock resolution.
+	blockerTxn, err := s.store.Begin()
+	s.NoError(err)
+	blockerTxn.SetPessimistic(true)
+	lockCtxBlocker := kv.NewLockCtx(blockerTxn.StartTS(), kv.LockAlwaysWait, time.Now())
+
+	s.NoError(blockerTxn.LockKeys(ctx, lockCtxBlocker, k1))
+	s.NoError(failpoint.Enable("tikvclient/tryResolveLock", "panic"))
+
+	txn8, err := s.store.Begin()
+	s.NoError(err)
+	txn8.SetPessimistic(true)
+
+	startTime = time.Now()
+	lockCtx8 := kv.NewLockCtx(txn8.StartTS(), 800, startTime)
+	lockCtx8.MaxExecutionDeadline = startTime.Add(200 * time.Millisecond)
+
+	err = txn8.LockKeys(ctx, lockCtx8, k1)
+	elapsed = time.Since(startTime)
+
+	s.Error(err)
+	s.ErrorAs(err, &queryInterruptedErr)
+	s.Equal(uint32(transaction.MaxExecTimeExceededSignal), queryInterruptedErr.Signal)
+	s.Greater(elapsed, 190*time.Millisecond)
+	s.Less(elapsed, 400*time.Millisecond)
+
+	s.NoError(txn8.Rollback())
+	s.NoError(failpoint.Disable("tikvclient/tryResolveLock"))
+	s.NoError(blockerTxn.Rollback())
+
+	// Cleanup all remaining transactions.
+	s.NoError(txn2.Rollback())
+	s.NoError(txn3.Rollback())
+	s.NoError(txn4.Rollback())
+	s.NoError(txn5.Rollback())
+	s.NoError(txn6.Rollback())
+	s.NoError(txn7.Rollback())
+}
diff --git a/kv/kv.go b/kv/kv.go
index 8bb5696c..59c1ebec 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -76,6 +76,8 @@ type LockCtx struct {
 	// LockCtx specially.
 	ResourceGroupTagger func(*kvrpcpb.PessimisticLockRequest) []byte
 	OnDeadlock          func(*tikverr.ErrDeadlock)
+	// MaxExecutionDeadline supports max_execution_time: if zero, deadline checking is disabled.
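+	// For illustration only (stmtStartTime and maxExecTime are the caller's own
+	// bookkeeping, not part of this API), a caller would typically set
+	//	lockCtx.MaxExecutionDeadline = stmtStartTime.Add(maxExecTime)
+	// once per statement before calling LockKeys.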
+	MaxExecutionDeadline time.Time
 }
 
 // LockWaitTime returns lockWaitTimeInMs
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index 5820199d..7f446f73 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -567,7 +567,6 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
 		flags := it.Flags()
 		var value []byte
 		var op kvrpcpb.Op
-
 		if !it.HasValue() {
 			if !flags.HasLocked() {
 				continue
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index 6d64a2c4..532a7d82 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -58,6 +58,51 @@ import (
 	"go.uber.org/zap"
 )
 
+// MaxExecTimeExceededSignal represents the signal value for max_execution_time exceeded errors.
+// It must match the value of MaxExecTimeExceeded in TiDB's sqlkiller package.
+const MaxExecTimeExceededSignal uint32 = 2
+
+// checkMaxExecutionTimeExceeded checks whether the max_execution_time deadline has been exceeded.
+// It returns an error if the deadline has passed, and nil otherwise.
+func checkMaxExecutionTimeExceeded(lockCtx *kv.LockCtx, now time.Time) error {
+	if !lockCtx.MaxExecutionDeadline.IsZero() && now.After(lockCtx.MaxExecutionDeadline) {
+		return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: MaxExecTimeExceededSignal})
+	}
+	return nil
+}
+
+// calculateEffectiveWaitTime calculates the effective timeout considering both lockWaitTime and
+// the max_execution_time deadline. It returns the smallest remaining budget in milliseconds,
+// kv.LockNoWait if a budget is already exhausted, or kv.LockAlwaysWait if no constraint applies.
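+//
+// For example (values are illustrative): with lockWaitTime = 800ms and 300ms already elapsed
+// since lockWaitStartTime, 500ms of lock wait budget remains; with 200ms left until
+// MaxExecutionDeadline, the effective wait time is min(500, 200) = 200ms.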
+func calculateEffectiveWaitTime(lockCtx *kv.LockCtx, lockWaitTime int64, lockWaitStartTime time.Time, now time.Time) int64 {
+	if lockWaitTime == kv.LockNoWait {
+		return kv.LockNoWait
+	}
+
+	effectiveTimeout := lockWaitTime
+
+	// Refine the remaining budget when a finite lockWaitTime is set. kv.LockAlwaysWait (0)
+	// must pass through unchanged here; it is a sentinel, not an expired timeout.
+	if lockWaitTime > 0 {
+		lockTimeLeft := lockWaitTime - now.Sub(lockWaitStartTime).Milliseconds()
+		if lockTimeLeft <= 0 {
+			return kv.LockNoWait
+		}
+		effectiveTimeout = lockTimeLeft
+	}
+
+	// Tighten the budget further when a max_execution_time deadline is set.
+	if !lockCtx.MaxExecutionDeadline.IsZero() {
+		maxExecTimeLeft := lockCtx.MaxExecutionDeadline.Sub(now).Milliseconds()
+		if maxExecTimeLeft <= 0 {
+			return kv.LockNoWait
+		}
+		if effectiveTimeout == kv.LockAlwaysWait || maxExecTimeLeft < effectiveTimeout {
+			effectiveTimeout = maxExecTimeLeft
+		}
+	}
+
+	return effectiveTimeout
+}
+
 type actionPessimisticLock struct {
 	*kv.LockCtx
 	wakeUpMode kvrpcpb.PessimisticLockWakeUpMode
@@ -157,15 +202,11 @@ func (action actionPessimisticLock) handleSingleBatch(
 		}
 	}()
 	for {
-		// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
-		if action.LockWaitTime() > 0 && action.LockWaitTime() != kv.LockAlwaysWait {
-			timeLeft := action.LockWaitTime() - (time.Since(lockWaitStartTime)).Milliseconds()
-			if timeLeft <= 0 {
-				req.PessimisticLock().WaitTimeout = kv.LockNoWait
-			} else {
-				req.PessimisticLock().WaitTimeout = timeLeft
-			}
+		now := time.Now()
+		if err := checkMaxExecutionTimeExceeded(action.LockCtx, now); err != nil {
+			return err
 		}
+		req.PessimisticLock().WaitTimeout = calculateEffectiveWaitTime(action.LockCtx, action.LockWaitTime(), lockWaitStartTime, now)
 		elapsed := uint64(time.Since(c.txn.startTime) / time.Millisecond)
 		ttl := elapsed + atomic.LoadUint64(&ManagedLockTTL)
 		if _, err := util.EvalFailpoint("shortPessimisticLockTTL"); err == nil {
@@ -197,7 +238,8 @@ func (action actionPessimisticLock) handleSingleBatch(
 			return err
 		}
 
-		if action.wakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal {
+		switch action.wakeUpMode {
+		case kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal:
 			finished, err := action.handlePessimisticLockResponseNormalMode(c, bo, &batch, mutations, resp, &diagCtx)
 			if err != nil {
 				return err
@@ -205,7 +247,7 @@ func (action actionPessimisticLock) handleSingleBatch(
 			if finished {
 				return nil
 			}
-		} else if action.wakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock {
+		case kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock:
 			finished, err := action.handlePessimisticLockResponseForceLockMode(c, bo, &batch, mutations, resp, &diagCtx)
 			if err != nil {
 				return err
@@ -340,6 +382,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
 	if len(locks) == 0 {
 		return false, nil
 	}
+	if err := checkMaxExecutionTimeExceeded(action.LockCtx, time.Now()); err != nil {
+		return true, err
+	}
 
 	// Because we already waited on tikv, no need to Backoff here.
 	// tikv default will wait 3s(also the maximum wait value) when lock error occurs
@@ -365,6 +410,11 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
 	// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
 	// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
 	if resolveLockRes.TTL > 0 {
+		// Check max_execution_time deadline first
+		if err := checkMaxExecutionTimeExceeded(action.LockCtx, time.Now()); err != nil {
+			return true, err
+		}
+
 		if action.LockWaitTime() == kv.LockNoWait {
 			return true, errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet)
 		} else if action.LockWaitTime() == kv.LockAlwaysWait {
@@ -467,6 +517,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
 
 	if isMutationFailed {
 		if len(locks) > 0 {
+			if err := checkMaxExecutionTimeExceeded(action.LockCtx, time.Now()); err != nil {
+				return true, err
+			}
+
 			// Because we already waited on tikv, no need to Backoff here.
 			// tikv default will wait 3s(also the maximum wait value) when lock error occurs
 			if diagCtx.resolvingRecordToken == nil {
@@ -491,6 +545,11 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
 	// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
 	// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
 	if resolveLockRes.TTL > 0 {
+		// Check max_execution_time deadline first
+		if err := checkMaxExecutionTimeExceeded(action.LockCtx, time.Now()); err != nil {
+			return true, err
+		}
+
 		if action.LockWaitTime() == kv.LockNoWait {
 			return true, errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet)
 		} else if action.LockWaitTime() == kv.LockAlwaysWait {
diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go
index d1fc2389..e414b438 100644
--- a/txnkv/txnlock/lock_resolver.go
+++ b/txnkv/txnlock/lock_resolver.go
@@ -487,6 +487,7 @@ func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
 
 func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) {
 	callerStartTS, locks, forRead, lite, detail, pessimisticRegionResolve := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail, opts.PessimisticRegionResolve
+	util.EvalFailpoint("tryResolveLock")
 	if lr.testingKnobs.meetLock != nil {
 		lr.testingKnobs.meetLock(locks)
 	}