diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index 3d6c2b47..cfc24013 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/store/mockstore/unistore" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" @@ -466,3 +467,42 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() { s.NotNil(err) s.ErrorContains(err, "ttl manager is closed") } + +func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() { + originManagedTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL) + originMaxPipelinedTxnTTL := atomic.LoadUint64(&transaction.MaxPipelinedTxnTTL) + atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms + atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, 200) // set to 200ms + updateGlobalConfig(func(conf *config.Config) { + conf.MaxTxnTTL = 200 // set to 200ms + }) + defer func() { + atomic.StoreUint64(&transaction.ManagedLockTTL, originManagedTTLVal) + atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, originMaxPipelinedTxnTTL) + restoreGlobalConfFunc() + }() + + txn, err := s.store.Begin(tikv.WithPipelinedMemDB()) + s.Nil(err) + txn.Set([]byte("key1"), []byte("value1")) + txnProbe := transaction.TxnProbe{KVTxn: txn} + flushed, err := txnProbe.GetMemBuffer().Flush(true) + s.Nil(err) + s.True(flushed) + s.Nil(txn.GetMemBuffer().FlushWait()) + s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1")) + + s.True(txnProbe.GetCommitter().IsTTLRunning()) + + s.Eventuallyf(func() bool { + return !txnProbe.GetCommitter().IsTTLRunning() + }, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after max ttl") + + txn.Set([]byte("key2"), []byte("value2")) + flushed, err = txn.GetMemBuffer().Flush(true) + s.Nil(err) + s.True(flushed) + err = txn.GetMemBuffer().FlushWait() + s.NotNil(err) + s.ErrorContains(err, "ttl manager is closed") +} diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 75741bfd..a772a0e8 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1229,6 +1229,11 @@ func keepAlive( if c.isPessimistic && lockCtx != nil && lockCtx.LockExpired != nil { atomic.StoreUint32(lockCtx.LockExpired, 1) } + if isPipelinedTxn { + // the pipelined txn can last a long time after max ttl exceeded. + // if we don't stop it, it may fail when committing the primary key with high probability. + tm.close() + } return }