mirror of https://github.com/tikv/client-go.git
Preserve is_retry_request flag for prewrite when retrying with region errors (#513)
* add a test to verify the incorrect setting of is_retry_request
* preserve the retry flag for prewrite
* fix test
* add more explanations

Signed-off-by: ekexium <ekexium@gmail.com>
parent 75d3f86f3b
commit e69cd80e58
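For context on the flag itself: it is the is_retry_request field of the kvrpcpb.Context that accompanies every TiKV request, which is how the test below reads it back (req.Context.IsRetryRequest). A minimal standalone sketch of where the flag lives (assuming only the kvproto usage visible in this diff):

	package main

	import (
		"fmt"

		"github.com/pingcap/kvproto/pkg/kvrpcpb"
	)

	func main() {
		// is_retry_request rides in the per-request kvrpcpb.Context; client-go
		// sets it so TiKV can distinguish resends from first attempts.
		reqCtx := kvrpcpb.Context{IsRetryRequest: true}
		fmt.Println(reqCtx.IsRetryRequest) // true
	}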
@@ -590,15 +590,15 @@ func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
	ctx := context.Background()
	k := []byte("k")

	// Lock the key with an async-commit lock.
	s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

	txn, err := s.store.Begin()
	s.Nil(err)
	txn.SetPessimistic(true)
	err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
	s.Nil(err)

	// Lock the key with an async-commit lock.
	s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)

	txn.Set(k, k)
	err = txn.Commit(context.Background())
	s.Nil(err)
@@ -35,13 +35,18 @@
package tikv_test

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/testutils"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/txnkv/transaction"
)
@@ -84,3 +89,64 @@ func TestSetMinCommitTSInAsyncCommit(t *testing.T) {
	assert.Equal(req.MinCommitTs, committer.GetMinCommitTS())
}

// TestIsRetryRequestFlagWithRegionError tests that the is_retry_request flag is true for all retried prewrite requests.
func TestIsRetryRequestFlagWithRegionError(t *testing.T) {
	require := require.New(t)

	client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
	require.Nil(err)
	_, peerID, regionID := testutils.BootstrapWithSingleStore(cluster)
	store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
	require.Nil(err)
	defer store.Close()

	failpoint.Enable("tikvclient/mockRetrySendReqToRegion", "1*return(true)->return(false)")
	defer failpoint.Disable("tikvclient/mockRetrySendReqToRegion")
	failpoint.Enable("tikvclient/invalidCacheAndRetry", "1*off->pause")

	// Records the history of the is_retry_request flag across all prewrite requests.
	isRetryRequest := make([]bool, 0)
	var mu sync.Mutex
	hook := func(req *tikvrpc.Request) {
		if req.Type != tikvrpc.CmdPrewrite {
			return
		}
		if req != nil {
			mu.Lock()
			isRetryRequest = append(isRetryRequest, req.Context.IsRetryRequest)
			mu.Unlock()
		}
	}
	failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
	defer failpoint.Disable("tikvclient/beforeSendReqToRegion")
	ctx := context.WithValue(context.TODO(), "sendReqToRegionHook", hook)

	tx, err := store.Begin()
	require.Nil(err)
	txn := transaction.TxnProbe{KVTxn: tx}
	err = txn.Set([]byte("a"), []byte("v"))
	require.Nil(err)
	err = txn.Set([]byte("z"), []byte("v"))
	require.Nil(err)
	committer, err := txn.NewCommitter(1)
	require.Nil(err)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		committer.PrewriteAllMutations(ctx)
		wg.Done()
	}()
	time.Sleep(time.Second * 3)
	cluster.Split(regionID, cluster.AllocID(), []byte("h"), []uint64{peerID}, peerID)
	failpoint.Disable("tikvclient/invalidCacheAndRetry")
	wg.Wait()

	// The expected event history:
	// 1. The first prewrite succeeds in TiKV, but client-go doesn't receive the response; we inject a retry to simulate that.
	// 2. The second prewrite request returns a region error caused by the region split, and it retries.
	// 3. The third and fourth prewrite requests (one each for 'a' and 'z', which now live in different regions) succeed.
	//
	// The last three requests are retries, so we assert that their is_retry_request flags are all true.
	require.Equal([]bool{false, true, true, true}, isRetryRequest)
}
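A note on the failpoint terms driving the test: "1*return(true)->return(false)" makes the injected resend fire exactly once, and "1*off->pause" lets the first evaluation through and then blocks callers until the failpoint is disabled, which is why the test must call failpoint.Disable("tikvclient/invalidCacheAndRetry") before wg.Wait() can return. A minimal standalone sketch of the sequencing syntax (the failpoint name example/flaky is made up):

	package main

	import (
		"fmt"

		"github.com/pingcap/failpoint"
	)

	func main() {
		// "1*return(true)->return(false)": true for the first evaluation,
		// false for every evaluation after that.
		if err := failpoint.Enable("example/flaky", "1*return(true)->return(false)"); err != nil {
			panic(err)
		}
		defer failpoint.Disable("example/flaky")

		for i := 0; i < 3; i++ {
			val, _ := failpoint.Eval("example/flaky")
			fmt.Printf("evaluation %d -> %v\n", i+1, val) // 1 -> true, 2 -> false, 3 -> false
		}
	}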
@@ -989,6 +989,14 @@ func (s *RegionRequestSender) SendReqCtx(
	logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
	s.storeAddr = rpcCtx.Addr

	// Test-only instrumentation: when the beforeSendReqToRegion failpoint is
	// enabled, pass the outgoing request to a hook carried in the context.
	if _, err := util.EvalFailpoint("beforeSendReqToRegion"); err == nil {
		if hook := bo.GetCtx().Value("sendReqToRegionHook"); hook != nil {
			h := hook.(func(*tikvrpc.Request))
			h(req)
		}
	}

	var retry bool
	resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
	if err != nil {
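The hook plumbing above deserves a word: SendReqCtx consults the context for a hook only while the beforeSendReqToRegion failpoint is active, so normal builds skip it. A minimal standalone sketch of the context-carried hook pattern (names hypothetical; this variant uses a typed context key, whereas the code above uses the string key "sendReqToRegionHook"):

	package main

	import (
		"context"
		"fmt"
	)

	// hookKey is an unexported typed key, the usual way to avoid
	// context-key collisions between packages.
	type hookKey struct{}

	// send invokes an optional observer hook before doing the real work,
	// mirroring the failpoint-guarded block in SendReqCtx above.
	func send(ctx context.Context, req string) {
		if h, ok := ctx.Value(hookKey{}).(func(string)); ok {
			h(req)
		}
		fmt.Println("sent:", req)
	}

	func main() {
		ctx := context.WithValue(context.Background(), hookKey{}, func(r string) {
			fmt.Println("hook observed:", r)
		})
		send(ctx, "Prewrite")
	}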
@@ -239,6 +239,9 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
	}()
	for {
		attempts++
		// attempts > 1 covers retries within this handler; action.retry marks a
		// batch re-dispatched after a region error, whose first attempt here is
		// still a retry from TiKV's point of view.
		if attempts > 1 || action.retry {
			req.IsRetryRequest = true
		}
		if time.Since(tBegin) > slowRequestThreshold {
			logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
			tBegin = time.Now()
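This last hunk is the heart of the fix. Previously the flag depended only on the handler-local attempts counter, so the diff suggests that when a region error caused the batch to be re-split and re-dispatched through a fresh actionPrewrite, the new handler's first attempt went out with is_retry_request unset even though TiKV might already have executed a prewrite for those keys. The added `|| action.retry` condition preserves the flag across that re-dispatch. A minimal standalone sketch of the before/after behavior (all names hypothetical, not client-go's real types):

	package main

	import "fmt"

	// request stands in for the prewrite RPC in this sketch.
	type request struct{ isRetry bool }

	// prewriteAction carries the preserved flag: retry records that this batch
	// was re-dispatched after a region error, so even attempt #1 of the new
	// handler must be marked as a retry.
	type prewriteAction struct{ retry bool }

	func (a prewriteAction) handle(req *request) {
		for attempts := 1; attempts <= 2; attempts++ {
			if attempts > 1 || a.retry {
				req.isRetry = true
			}
			fmt.Printf("attempt %d: is_retry_request=%v\n", attempts, req.isRetry)
		}
	}

	func main() {
		// Fresh dispatch: attempt 1 is a first send, attempt 2 a retry.
		prewriteAction{retry: false}.handle(&request{})
		// Re-dispatch after a region error: every attempt, including the
		// first, is a retry from TiKV's point of view.
		prewriteAction{retry: true}.handle(&request{})
	}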