From edba2544d5dcbe0cf6f8679bddabd321752debdc Mon Sep 17 00:00:00 2001
From: you06
Date: Tue, 23 Apr 2024 18:36:44 +0900
Subject: [PATCH] txn: abort pipelined dml when pk rollback (#1317)

* fail p-dml when ttl manager is closed

Signed-off-by: you06

* close ttl manager when pk is rolled back

Signed-off-by: you06

* stabilize test

Signed-off-by: you06

* add test

Signed-off-by: you06

* fix race

Signed-off-by: you06

* fix leak in test

Signed-off-by: you06

* remove test log

Signed-off-by: you06

* check error message

Signed-off-by: you06

---------

Signed-off-by: you06
---
 integration_tests/pipelined_memdb_test.go   | 65 +++++++++++++++++++++
 internal/unionstore/pipelined_memdb_test.go |  4 +-
 tikv/test_probe.go                          |  4 +-
 txnkv/transaction/2pc.go                    | 11 +++-
 txnkv/transaction/txn.go                    |  3 +
 5 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go
index ac38d59d..3d6c2b47 100644
--- a/integration_tests/pipelined_memdb_test.go
+++ b/integration_tests/pipelined_memdb_test.go
@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -27,10 +28,13 @@
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config/retry"
 	tikverr "github.com/tikv/client-go/v2/error"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv"
 	"github.com/tikv/client-go/v2/txnkv/transaction"
+	"github.com/tikv/client-go/v2/txnkv/txnlock"
 	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
 )
 
@@ -70,6 +74,26 @@ func (s *testPipelinedMemDBSuite) TearDownTest() {
 	s.store.Close()
 }
 
+func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock {
+	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	s.Nil(err)
+	bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key:     key,
+		Version: ver,
+	})
+	loc, err := s.store.GetRegionCache().LocateKey(bo, key)
+	s.Nil(err)
+	resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
+	s.Nil(err)
+	s.NotNil(resp.Resp)
+	keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
+	s.NotNil(keyErr)
+	lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
+	s.Nil(err)
+	return lock
+}
+
 func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
 	ctx := context.Background()
 	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
@@ -163,6 +187,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
 	s.Nil(failpoint.Disable("tikvclient/beforePipelinedFlush"))
 	<-flushReturned
 	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Nil(txn.Rollback())
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
@@ -401,3 +426,43 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	s.True(tikverr.IsErrNotFound(err))
 	txn.Rollback()
 }
+
+func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
+	originManageTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL)
+	atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
+	defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal)
+
+	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	s.Nil(err)
+	txn.Set([]byte("key1"), []byte("value1"))
+	txnProbe := transaction.TxnProbe{KVTxn: txn}
+	flushed, err := txnProbe.GetMemBuffer().Flush(true)
+	s.Nil(err)
+	s.True(flushed)
+	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1"))
+
+	s.True(txnProbe.GetCommitter().IsTTLRunning())
+
+	// resolve the primary lock
+	locks := []*txnlock.Lock{s.mustGetLock([]byte("key1"))}
+	lr := s.store.GetLockResolver()
+	bo := tikv.NewGcResolveLockMaxBackoffer(context.Background())
+	loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
+	s.Nil(err)
+	success, err := lr.BatchResolveLocks(bo, locks, loc.Region)
+	s.True(success)
+	s.Nil(err)
+
+	s.Eventuallyf(func() bool {
+		return !txnProbe.GetCommitter().IsTTLRunning()
+	}, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after primary lock is resolved")
+
+	txn.Set([]byte("key2"), []byte("value2"))
+	flushed, err = txn.GetMemBuffer().Flush(true)
+	s.Nil(err)
+	s.True(flushed)
+	err = txn.GetMemBuffer().FlushWait()
+	s.NotNil(err)
+	s.ErrorContains(err, "ttl manager is closed")
+}
diff --git a/internal/unionstore/pipelined_memdb_test.go b/internal/unionstore/pipelined_memdb_test.go
index 745489f5..4bbf6846 100644
--- a/internal/unionstore/pipelined_memdb_test.go
+++ b/internal/unionstore/pipelined_memdb_test.go
@@ -63,7 +63,7 @@ func TestPipelinedFlushTrigger(t *testing.T) {
 	})
 	for i := 0; i < MinFlushKeys-1; i++ {
 		key := []byte(strconv.Itoa(i))
-		value := make([]byte, avgKeySize-len(key)+1)
+		value := make([]byte, avgKeySize*2-len(key)+1) // (key + value) * (MinFLushKeys - 1) > MinFlushMemSize
 		memdb.Set(key, value)
 		flushed, err := memdb.Flush(false)
@@ -82,7 +82,7 @@
 	})
 	for i := 0; i < MinFlushKeys; i++ {
 		key := []byte(strconv.Itoa(i))
-		value := make([]byte, avgKeySize-len(key)+1) // (key + value) * MinFLushKeys > MinFlushKeys
+		value := make([]byte, avgKeySize*2-len(key)+1) // (key + value) * MinFLushKeys > MinFlushKeys
 		memdb.Set(key, value)
 		flushed, err := memdb.Flush(false)
 		require.Nil(t, err)
diff --git a/tikv/test_probe.go b/tikv/test_probe.go
index 34fab5c3..3686f67f 100644
--- a/tikv/test_probe.go
+++ b/tikv/test_probe.go
@@ -60,8 +60,8 @@ func (s StoreProbe) NewLockResolver() LockResolverProbe {
 }
 
 // Begin starts a transaction.
-func (s StoreProbe) Begin() (transaction.TxnProbe, error) {
-	txn, err := s.KVStore.Begin()
+func (s StoreProbe) Begin(opts ...TxnOption) (transaction.TxnProbe, error) {
+	txn, err := s.KVStore.Begin(opts...)
 	return transaction.TxnProbe{KVTxn: txn}, err
 }
 
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index ddfe17ad..75741bfd 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -1158,7 +1158,7 @@ func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx, isPipelined
 	tm.ch = make(chan struct{})
 	tm.lockCtx = lockCtx
-	go keepAlive(c, tm.ch, c.primary(), lockCtx, isPipelinedTxn)
+	go keepAlive(c, tm.ch, tm, c.primary(), lockCtx, isPipelinedTxn)
 }
 
 func (tm *ttlManager) close() {
@@ -1180,7 +1180,7 @@ const pessimisticLockMaxBackoff = 20000
 const maxConsecutiveFailure = 10
 
 func keepAlive(
-	c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte,
+	c *twoPhaseCommitter, closeCh chan struct{}, tm *ttlManager, primaryKey []byte,
 	lockCtx *kv.LockCtx, isPipelinedTxn bool,
 ) {
 	// Ticker is set to 1/2 of the ManagedLockTTL.
@@ -1193,6 +1193,7 @@ func keepAlive(
 	keepFail := 0
 	for {
 		select {
+		// because ttlManager can be reset, closeCh may not be equal to tm.ch.
 		case <-closeCh:
 			return
 		case <-ticker.C:
@@ -1254,6 +1255,12 @@
 					zap.Uint64("txnStartTS", c.startTS),
 					zap.Bool("isPipelinedTxn", isPipelinedTxn),
 				)
+				if isPipelinedTxn {
+					// Pipelined DML cannot run without the ttlManager.
+					// Once the ttl manager fails, the transaction should be rolled back
+					// to avoid writing useless locks.
+					// Close the ttlManager so that further flushes stop.
+					tm.close()
+				}
 				return
 			}
 			continue
diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go
index 28c712bc..aea701f2 100644
--- a/txnkv/transaction/txn.go
+++ b/txnkv/transaction/txn.go
@@ -467,6 +467,9 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
 	pipelinedMemDB := unionstore.NewPipelinedMemDB(func(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
 		return txn.snapshot.BatchGetWithTier(ctx, keys, txnsnapshot.BatchGetBufferTier)
 	}, func(generation uint64, memdb *unionstore.MemDB) (err error) {
+		if atomic.LoadUint32((*uint32)(&txn.committer.ttlManager.state)) == uint32(stateClosed) {
+			return errors.New("ttl manager is closed")
+		}
 		startTime := time.Now()
 		defer func() {
 			if err != nil {
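
Note (illustrative, not part of the patch): the sketch below shows how a caller of client-go might surface the new failure mode, where a pipelined-DML flush returns an error once the primary lock has been rolled back and the ttl manager is closed. The function name writePipelined and the error-message check are hypothetical; it assumes an already-initialized *tikv.KVStore and uses only calls that appear in the tests above (Begin with WithPipelinedMemDB, Set, GetMemBuffer().FlushWait, Rollback, Commit).

package example

import (
	"context"
	"fmt"
	"strings"

	"github.com/tikv/client-go/v2/tikv"
)

// writePipelined writes a batch of key-values through a pipelined-memdb
// transaction and aborts it when a background flush fails.
func writePipelined(store *tikv.KVStore, kvs map[string][]byte) error {
	txn, err := store.Begin(tikv.WithPipelinedMemDB())
	if err != nil {
		return err
	}
	for k, v := range kvs {
		if err := txn.Set([]byte(k), v); err != nil {
			_ = txn.Rollback()
			return err
		}
	}
	// FlushWait surfaces errors from the background flushes. After this patch
	// that includes the case where the primary lock was rolled back and the
	// ttl manager was closed, so the transaction cannot make further progress.
	if err := txn.GetMemBuffer().FlushWait(); err != nil {
		_ = txn.Rollback()
		if strings.Contains(err.Error(), "ttl manager is closed") {
			return fmt.Errorf("pipelined txn aborted, primary lock was rolled back: %w", err)
		}
		return err
	}
	return txn.Commit(context.Background())
}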