txn: abort pipelined dml when pk is rolled back (#1317)

* fail p-dml when ttl manager is closed

Signed-off-by: you06 <you1474600@gmail.com>

* close ttl manager when pk is rolled back

Signed-off-by: you06 <you1474600@gmail.com>

* stabilize test

Signed-off-by: you06 <you1474600@gmail.com>

* add test

Signed-off-by: you06 <you1474600@gmail.com>

* fix race

Signed-off-by: you06 <you1474600@gmail.com>

* fix leak in test

Signed-off-by: you06 <you1474600@gmail.com>

* remove test log

Signed-off-by: you06 <you1474600@gmail.com>

* check error message

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: you06 <you1474600@gmail.com>
you06 2024-04-23 18:36:44 +09:00 committed by GitHub
parent 059938f7eb
commit edba2544d5
5 changed files with 81 additions and 6 deletions
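
At a high level, the change makes a pipelined DML transaction abort once its primary key's lock has been rolled back: the keepAlive goroutine closes the ttlManager, and every later flush fails with "ttl manager is closed" instead of writing locks that can never be committed. Below is a minimal, runnable sketch of that mechanism, not the client-go implementation; only the names ttlManager, state, stateClosed, close, and the error string come from the diff, everything else is simplified for illustration.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

const (
	stateRunning uint32 = iota
	stateClosed
)

type ttlManager struct {
	state uint32
	ch    chan struct{}
}

// close is idempotent: only the first caller flips the state and closes ch.
func (tm *ttlManager) close() {
	if !atomic.CompareAndSwapUint32(&tm.state, stateRunning, stateClosed) {
		return
	}
	close(tm.ch)
}

// flush stands in for the pipelined memdb's flush callback: it refuses to
// write any more locks once the ttl manager has been shut down.
func flush(tm *ttlManager) error {
	if atomic.LoadUint32(&tm.state) == stateClosed {
		return errors.New("ttl manager is closed")
	}
	return nil // ...build and send the flush request...
}

func main() {
	tm := &ttlManager{ch: make(chan struct{})}
	fmt.Println(flush(tm)) // <nil>: flushes proceed while keep-alive runs

	// keepAlive discovers the primary lock was rolled back and shuts down:
	tm.close()
	fmt.Println(flush(tm)) // "ttl manager is closed": caller must roll back
}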


@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -27,10 +28,13 @@ import (
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config/retry"
 	tikverr "github.com/tikv/client-go/v2/error"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
+	"github.com/tikv/client-go/v2/txnkv"
 	"github.com/tikv/client-go/v2/txnkv/transaction"
+	"github.com/tikv/client-go/v2/txnkv/txnlock"
 	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
 )
@@ -70,6 +74,26 @@ func (s *testPipelinedMemDBSuite) TearDownTest() {
 	s.store.Close()
 }
 
+func (s *testPipelinedMemDBSuite) mustGetLock(key []byte) *txnkv.Lock {
+	ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
+	s.Nil(err)
+	bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key:     key,
+		Version: ver,
+	})
+	loc, err := s.store.GetRegionCache().LocateKey(bo, key)
+	s.Nil(err)
+	resp, err := s.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutShort)
+	s.Nil(err)
+	s.NotNil(resp.Resp)
+	keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
+	s.NotNil(keyErr)
+	lock, err := txnlock.ExtractLockFromKeyErr(keyErr)
+	s.Nil(err)
+	return lock
+}
+
 func (s *testPipelinedMemDBSuite) TestPipelinedAndFlush() {
 	ctx := context.Background()
 	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
@@ -163,6 +187,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedFlushBlock() {
 	s.Nil(failpoint.Disable("tikvclient/beforePipelinedFlush"))
 	<-flushReturned
 	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Nil(txn.Rollback())
 }
 
 func (s *testPipelinedMemDBSuite) TestPipelinedSkipFlushedLock() {
@@ -401,3 +426,43 @@ func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	s.True(tikverr.IsErrNotFound(err))
 	txn.Rollback()
 }
+
+func (s *testPipelinedMemDBSuite) TestPipelinedDMLFailedByPKRollback() {
+	originManageTTLVal := atomic.LoadUint64(&transaction.ManagedLockTTL)
+	atomic.StoreUint64(&transaction.ManagedLockTTL, 100) // set to 100ms
+	defer atomic.StoreUint64(&transaction.ManagedLockTTL, originManageTTLVal)
+
+	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	s.Nil(err)
+	txn.Set([]byte("key1"), []byte("value1"))
+	txnProbe := transaction.TxnProbe{KVTxn: txn}
+	flushed, err := txnProbe.GetMemBuffer().Flush(true)
+	s.Nil(err)
+	s.True(flushed)
+	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Equal(txnProbe.GetCommitter().GetPrimaryKey(), []byte("key1"))
+	s.True(txnProbe.GetCommitter().IsTTLRunning())
+
+	// resolve the primary lock
+	locks := []*txnlock.Lock{s.mustGetLock([]byte("key1"))}
+	lr := s.store.GetLockResolver()
+	bo := tikv.NewGcResolveLockMaxBackoffer(context.Background())
+	loc, err := s.store.GetRegionCache().LocateKey(bo, locks[0].Primary)
+	s.Nil(err)
+	success, err := lr.BatchResolveLocks(bo, locks, loc.Region)
+	s.True(success)
+	s.Nil(err)
+
+	s.Eventuallyf(func() bool {
+		return !txnProbe.GetCommitter().IsTTLRunning()
+	}, 5*time.Second, 100*time.Millisecond, "ttl manager should stop after primary lock is resolved")
+
+	txn.Set([]byte("key2"), []byte("value2"))
+	flushed, err = txn.GetMemBuffer().Flush(true)
+	s.Nil(err)
+	s.True(flushed)
+	err = txn.GetMemBuffer().FlushWait()
+	s.NotNil(err)
+	s.ErrorContains(err, "ttl manager is closed")
+}


@@ -63,7 +63,7 @@ func TestPipelinedFlushTrigger(t *testing.T) {
 	})
 	for i := 0; i < MinFlushKeys-1; i++ {
 		key := []byte(strconv.Itoa(i))
-		value := make([]byte, avgKeySize-len(key)+1)
+		value := make([]byte, avgKeySize*2-len(key)+1)
 		// (key + value) * (MinFlushKeys - 1) > MinFlushMemSize
 		memdb.Set(key, value)
 		flushed, err := memdb.Flush(false)
@@ -82,7 +82,7 @@ func TestPipelinedFlushTrigger(t *testing.T) {
 	})
 	for i := 0; i < MinFlushKeys; i++ {
 		key := []byte(strconv.Itoa(i))
-		value := make([]byte, avgKeySize-len(key)+1) // (key + value) * MinFlushKeys > MinFlushMemSize
+		value := make([]byte, avgKeySize*2-len(key)+1) // (key + value) * MinFlushKeys > MinFlushMemSize
 		memdb.Set(key, value)
 		flushed, err := memdb.Flush(false)
 		require.Nil(t, err)


@@ -60,8 +60,8 @@ func (s StoreProbe) NewLockResolver() LockResolverProbe {
 }
 
 // Begin starts a transaction.
-func (s StoreProbe) Begin() (transaction.TxnProbe, error) {
-	txn, err := s.KVStore.Begin()
+func (s StoreProbe) Begin(opts ...TxnOption) (transaction.TxnProbe, error) {
+	txn, err := s.KVStore.Begin(opts...)
 	return transaction.TxnProbe{KVTxn: txn}, err
 }
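
The probe change only threads transaction options through, presumably so tests can open pipelined transactions via the probe. A hypothetical call site (TxnOption and WithPipelinedMemDB are real client-go names; the surrounding test code is illustrative):

// store is an existing *tikv.KVStore set up by the test harness.
probe := tikv.StoreProbe{KVStore: store}
// Begin now accepts the same options as KVStore.Begin, so the probe can
// create a pipelined-DML transaction directly.
txn, err := probe.Begin(tikv.WithPipelinedMemDB())
if err != nil {
	t.Fatal(err)
}
_ = txn // txn is a transaction.TxnProbe wrapping the pipelined KVTxn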


@@ -1158,7 +1158,7 @@ func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx, isPipelinedTxn bool) {
 	tm.ch = make(chan struct{})
 	tm.lockCtx = lockCtx
-	go keepAlive(c, tm.ch, c.primary(), lockCtx, isPipelinedTxn)
+	go keepAlive(c, tm.ch, tm, c.primary(), lockCtx, isPipelinedTxn)
 }
 
 func (tm *ttlManager) close() {
@@ -1180,7 +1180,7 @@ const pessimisticLockMaxBackoff = 20000
 const maxConsecutiveFailure = 10
 
 func keepAlive(
-	c *twoPhaseCommitter, closeCh chan struct{}, primaryKey []byte,
+	c *twoPhaseCommitter, closeCh chan struct{}, tm *ttlManager, primaryKey []byte,
 	lockCtx *kv.LockCtx, isPipelinedTxn bool,
 ) {
 	// Ticker is set to 1/2 of the ManagedLockTTL.
@@ -1193,6 +1193,7 @@ func keepAlive(
 	keepFail := 0
 	for {
 		select {
+		// because ttlManager can be reset, closeCh may not be equal to tm.ch.
 		case <-closeCh:
 			return
 		case <-ticker.C:
@ -1254,6 +1255,12 @@ func keepAlive(
zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnStartTS", c.startTS),
zap.Bool("isPipelinedTxn", isPipelinedTxn), zap.Bool("isPipelinedTxn", isPipelinedTxn),
) )
if isPipelinedTxn {
// pipelined DML cannot run without the ttlManager.
// Once the ttl manager fails, the transaction should be rolled back to avoid writing useless locks.
// close the ttlManager and the further flush will stop.
tm.close()
}
return return
} }
continue continue
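
The new comment above case <-closeCh is the subtle bit: run can be called again on a reset ttlManager, replacing tm.ch, so a still-running keepAlive goroutine may hold a stale channel. Passing tm separately lets that goroutine call tm.close() on the current state rather than on its snapshot. A self-contained sketch of the snapshot behaviour (names shortened; illustrative, not the client-go code):

package main

import (
	"fmt"
	"sync"
)

type mgr struct{ ch chan struct{} }

// run snapshots m.ch the way keepAlive snapshots closeCh at spawn time.
func (m *mgr) run(id int, wg *sync.WaitGroup) {
	closeCh := m.ch
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-closeCh
		fmt.Printf("keepAlive %d stopped\n", id)
	}()
}

func main() {
	var wg sync.WaitGroup
	m := &mgr{ch: make(chan struct{})}
	m.run(1, &wg)

	old := m.ch
	m.ch = make(chan struct{}) // reset: tm.ch now differs from goroutine 1's closeCh
	m.run(2, &wg)

	close(old)  // stops goroutine 1 only
	close(m.ch) // stops goroutine 2
	wg.Wait()
}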


@@ -467,6 +467,9 @@ func (txn *KVTxn) InitPipelinedMemDB() error {
 	pipelinedMemDB := unionstore.NewPipelinedMemDB(func(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
 		return txn.snapshot.BatchGetWithTier(ctx, keys, txnsnapshot.BatchGetBufferTier)
 	}, func(generation uint64, memdb *unionstore.MemDB) (err error) {
+		if atomic.LoadUint32((*uint32)(&txn.committer.ttlManager.state)) == uint32(stateClosed) {
+			return errors.New("ttl manager is closed")
+		}
 		startTime := time.Now()
 		defer func() {
 			if err != nil {
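
With this guard in place, a transaction that keeps flushing after its primary lock is gone sees the error surface from Flush or FlushWait, which is exactly what the new integration test asserts. A hedged sketch of the expected caller-side handling (GetMemBuffer, FlushWait, and Rollback are real KVTxn APIs from github.com/tikv/client-go/v2/txnkv/transaction; the helper itself is hypothetical):

// flushOrAbort waits for the in-flight flush and aborts the transaction
// if the pipelined memdb reports that the ttl manager is closed.
func flushOrAbort(txn *transaction.KVTxn) error {
	if err := txn.GetMemBuffer().FlushWait(); err != nil {
		// The primary lock was rolled back; the transaction cannot
		// commit, so roll it back instead of writing more locks.
		_ = txn.Rollback()
		return err
	}
	return nil
}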