From e69cd80e58fc89c6fb29f30669f783f5dac7cdac Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 31 May 2022 16:04:28 +0800 Subject: [PATCH] Preserve is_retry_request flag for prewrite when retrying with region errors (#513) * add a test to verify the incorrect setting of is_retry_request Signed-off-by: ekexium * preserve the retry flag for prewrite Signed-off-by: ekexium * fix test Signed-off-by: ekexium * add more explanations Signed-off-by: ekexium --- integration_tests/async_commit_test.go | 6 +-- integration_tests/prewrite_test.go | 66 ++++++++++++++++++++++++++ internal/locate/region_request.go | 8 ++++ txnkv/transaction/prewrite.go | 3 ++ 4 files changed, 80 insertions(+), 3 deletions(-) diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 55212cac..14f19ed9 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -590,15 +590,15 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() { ctx := context.Background() k := []byte("k") + // Lock the key with an async-commit lock. + s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false) + txn, err := s.store.Begin() s.Nil(err) txn.SetPessimistic(true) err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1")) s.Nil(err) - // Lock the key with a async-commit lock. - s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false) - txn.Set(k, k) err = txn.Commit(context.Background()) s.Nil(err) diff --git a/integration_tests/prewrite_test.go b/integration_tests/prewrite_test.go index a2f38c98..84ae03f6 100644 --- a/integration_tests/prewrite_test.go +++ b/integration_tests/prewrite_test.go @@ -35,13 +35,18 @@ package tikv_test import ( + "context" + "sync" "testing" + "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/transaction" ) @@ -84,3 +89,64 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) { assert.Equal(req.MinCommitTs, committer.GetMinCommitTS()) } + +// TestIsRetryRequestFlagWithRegionError tests that the is_retry_request flag is true for all retrying prewrite requests. +func TestIsRetryRequestFlagWithRegionError(t *testing.T) { + require := require.New(t) + + client, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.Nil(err) + _, peerID, regionID := testutils.BootstrapWithSingleStore(cluster) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) + require.Nil(err) + defer store.Close() + + failpoint.Enable("tikvclient/mockRetrySendReqToRegion", "1*return(true)->return(false)") + defer failpoint.Disable("tikvclient/mockRetrySendReqToRegion") + failpoint.Enable("tikvclient/invalidCacheAndRetry", "1*off->pause") + + // the history of this field + isRetryRequest := make([]bool, 0, 0) + var mu sync.Mutex + hook := func(req *tikvrpc.Request) { + if req.Type != tikvrpc.CmdPrewrite { + return + } + if req != nil { + mu.Lock() + isRetryRequest = append(isRetryRequest, req.Context.IsRetryRequest) + mu.Unlock() + } + } + failpoint.Enable("tikvclient/beforeSendReqToRegion", "return") + defer failpoint.Disable("tikvclient/beforeSendReqToRegion") + ctx := context.WithValue(context.TODO(), "sendReqToRegionHook", hook) + + tx, err := store.Begin() + require.Nil(err) + txn := transaction.TxnProbe{KVTxn: tx} + err = txn.Set([]byte("a"), []byte("v")) + require.Nil(err) + err = txn.Set([]byte("z"), []byte("v")) + require.Nil(err) + committer, err := txn.NewCommitter(1) + require.Nil(err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + committer.PrewriteAllMutations(ctx) + wg.Done() + }() + time.Sleep(time.Second * 3) + cluster.Split(regionID, cluster.AllocID(), []byte("h"), []uint64{peerID}, peerID) + failpoint.Disable("tikvclient/invalidCacheAndRetry") + wg.Wait() + + // The event history should be: + // 1. The first prewrite succeeds in TiKV, but due to some reason, client-go doesn't get the response. We inject a retry to simulate it. + // 2. The second prewrite request returns a region error, which is caused by the region split. And it retries. + // 3. The third and fourth prewrite requests (for 'a' and 'z' respectively, so there are 2 of them) succeed. + // + // The last three requests are retry requests, we assert the is_retry_request flags are all true. + require.Equal([]bool{false, true, true, true}, isRetryRequest) +} diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 32c81f03..c9c4c8d4 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -989,6 +989,14 @@ func (s *RegionRequestSender) SendReqCtx( logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) s.storeAddr = rpcCtx.Addr + + if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil { + if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil { + h := hook.(func(*tikvrpc.Request)) + h(req) + } + } + var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) if err != nil { diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 6caeab9a..b8be353d 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -239,6 +239,9 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B }() for { attempts++ + if attempts > 1 || action.retry { + req.IsRetryRequest = true + } if time.Since(tBegin) > slowRequestThreshold { logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts)) tBegin = time.Now()