fail pipelined dml when max ttl exceeded (#1329)

Signed-off-by: you06 <you1474600@gmail.com>
Co-authored-by: ekexium <eke@fastmail.com>
This commit is contained in:
you06 2024-04-30 23:52:41 +09:00 committed by GitHub
parent 52c232be3d
commit 6cb0704fce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 0 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
@ -466,3 +467,42 @@ func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
s.NotNil(err)
s.ErrorContains(err, "ttl manager is closed")
}
func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKMaxTTLExceeded() {
originManagedTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL)
originMaxPipelinedTxnTTL := atomic.LoadUint64(&transaction.MaxPipelinedTxnTTL)
atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, 200) // set to 200ms
updateGlobalConfig(func(conf *config.Config) {
conf.MaxTxnTTL = 200 // set to 200ms
})
defer func() {
atomic.StoreUint64(&transaction.ManagedLockTTL, originManagedTTLVal)
atomic.StoreUint64(&transaction.MaxPipelinedTxnTTL, originMaxPipelinedTxnTTL)
restoreGlobalConfFunc()
}()
txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
s.Nil(err)
txn.Set([]byte("key1"), []byte("value1"))
txnProbe := transaction.TxnProbe{KVTxn: txn}
flushed, err := txnProbe.GetMemBuffer().Flush(true)
s.Nil(err)
s.True(flushed)
s.Nil(txn.GetMemBuffer().FlushWait())
s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1"))
s.True(txnProbe.GetCommitter().IsTTLRunning())
s.Eventuallyf(func() bool {
return !txnProbe.GetCommitter().IsTTLRunning()
}, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after max ttl")
txn.Set([]byte("key2"), []byte("value2"))
flushed, err = txn.GetMemBuffer().Flush(true)
s.Nil(err)
s.True(flushed)
err = txn.GetMemBuffer().FlushWait()
s.NotNil(err)
s.ErrorContains(err, "ttl manager is closed")
}

View File

@ -1229,6 +1229,11 @@ func keepAlive(
if c.isPessimistic && lockCtx != nil && lockCtx.LockExpired != nil {
atomic.StoreUint32(lockCtx.LockExpired, 1)
}
if isPipelinedTxn {
// the pipelined txn can last a long time after max ttl exceeded.
// if we don't stop it, it may fail when committing the primary key with high probability.
tm.close()
}
return
}