stale read request shouldn't retry the leader if the leader has already been tried (#1174)

Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs 2024-02-23 18:01:14 +08:00 committed by GitHub
parent 8d28d3cd3a
commit 6f9550fda3
8 changed files with 563 additions and 218 deletions
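
In short: when a stale read has already tried the leader (and the follow-up leader snapshot read failed as well), the replica selector now treats the leader as exhausted after a single attempt and retries on another peer with replica-read, instead of retrying the leader again. A minimal sketch of the request shape the new tests exercise; sender, bo, and loc stand for an already-wired RegionRequestSender, Backoffer, and region location and are not part of this patch:

// Build a stale read with global txn scope. On DataIsNotReady from the leader, the
// selector may now move on to a follower with replica-read rather than retry the leader.
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.EnableStaleWithMixedReplicaRead()
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
resp, _, _, err := sender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)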

View File

@@ -46,6 +46,7 @@ import (
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
+ "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
@@ -191,6 +192,11 @@ func newBackoffFn(base, cap, jitter int) backoffFn {
if maxSleepMs >= 0 && realSleep > maxSleepMs {
realSleep = maxSleepMs
}
+ if _, err := util.EvalFailpoint("fastBackoffBySkipSleep"); err == nil {
+ attempts++
+ lastSleep = sleep
+ return realSleep
+ }
select {
case <-time.After(time.Duration(realSleep) * time.Millisecond):
attempts++
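
The failpoint above is driven from tests; a minimal sketch of the enable/disable pattern used later in this patch, inside a testify suite (the "tikvclient" prefix is how this repo registers its failpoints):

s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
defer func() {
	s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
}()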

View File

@@ -2843,6 +2843,9 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
reResolveInterval = dur
}
}
+ if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
+ return
+ }
go s.checkUntilHealth(c, liveness, reResolveInterval)
}
return
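
This failpoint is toggled the same way; a sketch of the suite-level setup/teardown the new replica selector tests further down rely on (again assuming the "tikvclient" failpoint prefix):

// SetupTest: skip the background health-check goroutine so store liveness stays exactly as the test sets it.
s.NoError(failpoint.Enable("tikvclient/skipStoreCheckUntilHealth", `return`))
// TearDownTest: restore the normal health-check behavior.
s.NoError(failpoint.Disable("tikvclient/skipStoreCheckUntilHealth"))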

View File

@@ -73,6 +73,9 @@ import (
// network error because tidb-server expect tikv client to exit as soon as possible.
var shuttingDown uint32
+ // randIntn is only used for testing.
+ var randIntn = rand.Intn
// StoreShuttingDown atomically stores ShuttingDown into v.
func StoreShuttingDown(v uint32) {
atomic.StoreUint32(&shuttingDown, v)
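
Hoisting rand.Intn into the package-level randIntn hook is what lets the new replica selector suite make peer selection deterministic; a sketch of how it is swapped and restored, mirroring the suite's setup/teardown later in this patch:

// SetupTest: make every random pick deterministic (always index 0).
randIntn = func(n int) int { return 0 }
// TearDownTest: put the real source back so other tests keep their randomness.
randIntn = rand.Intn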
@@ -612,7 +615,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
}
// Skip advanceCnt valid candidates to find a proxy peer randomly
- advanceCnt := rand.Intn(candidateNum)
+ advanceCnt := randIntn(candidateNum)
for idx, replica := range selector.replicas {
if !state.isCandidate(AccessIndex(idx), replica) {
continue
@@ -668,13 +671,13 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
resetStaleRead := false
if state.lastIdx < 0 {
if state.tryLeader {
- state.lastIdx = AccessIndex(rand.Intn(replicaSize))
+ state.lastIdx = AccessIndex(randIntn(replicaSize))
} else {
if replicaSize <= 1 {
state.lastIdx = state.leaderIdx
} else {
// Randomly select a non-leader peer
- state.lastIdx = AccessIndex(rand.Intn(replicaSize - 1))
+ state.lastIdx = AccessIndex(randIntn(replicaSize - 1))
if state.lastIdx >= state.leaderIdx {
state.lastIdx++
}
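
The non-leader branch above samples uniformly over the followers with a single draw: pick from replicaSize-1 slots, then shift any result at or past the leader index by one. A standalone sketch of the same trick; pickNonLeader is an illustrative name, not part of the patch:

package main

import (
	"fmt"
	"math/rand"
)

// pickNonLeader returns a uniformly random index in [0, n) that is never leaderIdx.
func pickNonLeader(n, leaderIdx int) int {
	idx := rand.Intn(n - 1) // draw from the n-1 non-leader slots
	if idx >= leaderIdx {
		idx++ // shift past the leader so the followers stay equally likely
	}
	return idx
}

func main() {
	// With 3 replicas and the leader at index 1, only 0 and 2 can come back.
	fmt.Println(pickNonLeader(3, 1))
}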
@@ -696,7 +699,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
var offset int
if state.lastIdx >= 0 {
- offset = rand.Intn(replicaSize)
+ offset = randIntn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
@@ -791,16 +794,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
- // Allow another extra retry for the following case:
- // 1. The stale read is enabled and leader peer is selected as the target peer at first.
- // 2. Data is not ready is returned from the leader peer.
- // 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
- // 4. The leader peer should be retried again using snapshot read.
- if state.isStaleRead && state.option.leaderOnly {
- return leader.isExhausted(2, 0)
- } else {
- return leader.isExhausted(1, 0)
- }
+ return leader.isExhausted(1, 0)
}
func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
@@ -845,7 +839,7 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector
// Select a follower replica that has the lowest estimated wait duration
minWait := time.Duration(math.MaxInt64)
targetIdx := state.leaderIdx
- startIdx := rand.Intn(len(selector.replicas))
+ startIdx := randIntn(len(selector.replicas))
for i := 0; i < len(selector.replicas); i++ {
idx := (i + startIdx) % len(selector.replicas)
r := selector.replicas[idx]

View File

@@ -759,6 +759,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// TODO(youjiali1995): Remove duplicated tests. This test may be duplicated with other
// tests but it's a dedicated one to test sending requests with the replica selector.
func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
+ s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
+ defer func() {
+ s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
+ }()
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -987,49 +991,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}
- // Verify switch to the leader immediately when stale read requests with global txn scope meet region errors.
- s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
- reachable.injectConstantLiveness(s.cache)
- s.Eventually(func() bool {
- stores := s.regionRequestSender.replicaSelector.regionStore.stores
- return stores[0].getLivenessState() == reachable &&
- stores[1].getLivenessState() == reachable &&
- stores[2].getLivenessState() == reachable
- }, 3*time.Second, 200*time.Millisecond)
- reloadRegion()
- req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
- req.ReadReplicaScope = oracle.GlobalTxnScope
- req.TxnScope = oracle.GlobalTxnScope
- for i := 0; i < 10; i++ {
- req.EnableStaleWithMixedReplicaRead()
- // The request may be sent to the leader directly. We have to distinguish it.
- failureOnFollower := 0
- failureOnLeader := 0
- s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
- if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
- failureOnFollower++
- return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
- } else if failureOnLeader == 0 && i%2 == 0 {
- failureOnLeader++
- return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
- } else {
- return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
- }
- }}
- sender.SendReq(bo, req, region.Region, time.Second)
- state, ok := sender.replicaSelector.state.(*accessFollower)
- s.True(ok)
- s.True(failureOnFollower <= 1) // any retry should go to the leader, hence at most one failure on the follower allowed
- if failureOnFollower == 0 && failureOnLeader == 0 {
- // if the request goes to the leader and succeeds then it is executed as a StaleRead
- s.True(req.StaleRead)
- } else {
- // otherwise #leaderOnly flag should be set and retry request as a normal read
- s.True(state.option.leaderOnly)
- s.False(req.StaleRead)
- }
- }
}
func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
@@ -1223,62 +1184,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
}
}
- func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
- leaderStore, _ := s.loadAndGetLeaderStore()
- leaderLabel := []*metapb.StoreLabel{
- {
- Key: "id",
- Value: strconv.FormatUint(leaderStore.StoreID(), 10),
- },
- }
- regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
- s.Nil(err)
- s.NotNil(regionLoc)
- value := []byte("value")
- isFirstReq := true
- s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
- select {
- case <-ctx.Done():
- return nil, errors.New("timeout")
- default:
- }
- // Return `DataIsNotReady` for the first time on leader.
- if isFirstReq {
- isFirstReq = false
- return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
- DataIsNotReady: &errorpb.DataIsNotReady{},
- }}}, nil
- }
- return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
- }}
- region, _ := s.cache.searchCachedRegionByID(regionLoc.Region.GetID())
- s.True(region.isValid())
- req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
- req.ReadReplicaScope = oracle.GlobalTxnScope
- req.TxnScope = oracle.GlobalTxnScope
- req.EnableStaleWithMixedReplicaRead()
- req.ReplicaReadType = kv.ReplicaReadMixed
- var ops []StoreSelectorOption
- ops = append(ops, WithMatchLabels(leaderLabel))
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- bo := retry.NewBackoffer(ctx, -1)
- s.Nil(err)
- resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
- s.Nil(err)
- regionErr, err := resp.GetRegionError()
- s.Nil(err)
- s.Nil(regionErr)
- getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
- s.True(ok)
- s.Equal(getResp.Value, value)
- }
func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
leaderAddr := ""
reqTargetAddrs := make(map[string]struct{})

View File

@@ -348,17 +348,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z2"},
followerSuccessReadType: SuccessStaleRead,
},
- {
- do: leaderDataIsNotReady,
- leaderRegionValid: true,
- leaderAsyncReload: Some(false),
- leaderSuccessReplica: []string{"z1"},
- leaderSuccessReadType: SuccessLeaderRead,
- followerRegionValid: true,
- followerAsyncReload: Some(false),
- followerSuccessReplica: []string{"z2"},
- followerSuccessReadType: SuccessStaleRead,
- },
{
do: followerDataIsNotReady,
leaderRegionValid: true,
@@ -395,45 +384,6 @@ func TestRegionCacheStaleRead(t *testing.T) {
followerSuccessReplica: []string{"z1"},
followerSuccessReadType: SuccessLeaderRead,
},
- {
- do: leaderDataIsNotReady,
- extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
- recoverable: true,
- leaderRegionValid: true,
- leaderAsyncReload: Some(false),
- leaderSuccessReplica: []string{"z1"},
- leaderSuccessReadType: SuccessLeaderRead,
- followerRegionValid: true,
- followerAsyncReload: Some(false),
- followerSuccessReplica: []string{"z1"},
- followerSuccessReadType: SuccessLeaderRead,
- },
- {
- do: leaderDataIsNotReady,
- extra: []func(suite *testRegionCacheStaleReadSuite){followerDataIsNotReady},
- recoverable: true,
- leaderRegionValid: true,
- leaderAsyncReload: Some(false),
- leaderSuccessReplica: []string{"z1"},
- leaderSuccessReadType: SuccessLeaderRead,
- followerRegionValid: true,
- followerAsyncReload: Some(false),
- followerSuccessReplica: []string{"z1"},
- followerSuccessReadType: SuccessLeaderRead,
- },
- {
- do: leaderDataIsNotReady,
- extra: []func(suite *testRegionCacheStaleReadSuite){followerDown},
- recoverable: true,
- leaderRegionValid: true,
- leaderAsyncReload: Some(false),
- leaderSuccessReplica: []string{"z1"},
- leaderSuccessReadType: SuccessLeaderRead,
- followerRegionValid: true,
- followerAsyncReload: Some(false),
- followerSuccessReplica: []string{"z1"},
- followerSuccessReadType: SuccessLeaderRead,
- },
{
do: leaderServerIsBusy,
extra: []func(suite *testRegionCacheStaleReadSuite){followerServerIsBusy},
@@ -598,10 +548,11 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
return
}
+ msg := fmt.Sprintf("%v %#v", string(resp.Resp.(*kvrpcpb.GetResponse).Value), r)
_, successZone, successReadType := s.extractResp(resp)
find := false
if leaderZone {
- s.Equal(r.leaderSuccessReadType, successReadType)
+ s.Equal(r.leaderSuccessReadType, successReadType, msg)
for _, z := range r.leaderSuccessReplica {
if z == successZone {
find = true
@@ -617,7 +568,7 @@ func testStaleRead(s *testRegionCacheStaleReadSuite, r *RegionCacheTestCase, zon
}
}
}
- s.True(find)
+ s.True(find, msg)
}
type Option[T interface{}] struct {
@@ -767,22 +718,6 @@ func leaderDownAndElect(s *testRegionCacheStaleReadSuite) {
s.setUnavailableStore(leader.Id)
}
- func leaderDataIsNotReady(s *testRegionCacheStaleReadSuite) {
- peerID, _ := s.getLeader()
- s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
- if !req.StaleRead || zone != "z1" {
- return nil
- }
- return &errorpb.Error{
- DataIsNotReady: &errorpb.DataIsNotReady{
- RegionId: s.regionID,
- PeerId: peerID,
- SafeTs: 0,
- },
- }
- }
- }
func leaderServerIsBusy(s *testRegionCacheStaleReadSuite) {
s.injection.leaderRegionError = func(req *tikvrpc.Request, zone string) *errorpb.Error {
if zone != "z1" {

View File

@@ -679,42 +679,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.Equal(target, client.closedAddr)
}
- func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
- req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
- Key: []byte("key"),
- })
- req.EnableStaleWithMixedReplicaRead()
- req.ReadReplicaScope = "z1" // not global stale read.
- region, err := s.cache.LocateRegionByID(s.bo, s.region)
- s.Nil(err)
- s.NotNil(region)
- oc := s.regionRequestSender.client
- defer func() {
- s.regionRequestSender.client = oc
- }()
- s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
- if req.StaleRead {
- // Mock for stale-read request always return DataIsNotReady error when tikv `ResolvedTS` is blocked.
- response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
- RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}},
- }}
- } else {
- response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}
- }
- return response, nil
- }}
- bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
- resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
- s.Nil(err)
- s.NotNil(resp)
- regionErr, _ := resp.GetRegionError()
- s.Nil(regionErr)
- s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
- }
func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() {
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.MaxBatchSize = 0

View File

@@ -0,0 +1,533 @@
package locate
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
)
type testReplicaSelectorSuite struct {
suite.Suite
cluster *mocktikv.Cluster
storeIDs []uint64
peerIDs []uint64
regionID uint64
leaderPeer uint64
cache *RegionCache
bo *retry.Backoffer
mvccStore mocktikv.MVCCStore
}
func (s *testReplicaSelectorSuite) SetupTest(t *testing.T) {
s.mvccStore = mocktikv.MustNewMVCCStore()
s.cluster = mocktikv.NewCluster(s.mvccStore)
s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3)
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
s.bo = retry.NewNoopBackoff(context.Background())
s.SetT(t)
s.SetS(s)
randIntn = func(n int) int { return 0 }
s.NoError(failpoint.Enable("tikvclient/fastBackoffBySkipSleep", `return`))
s.NoError(failpoint.Enable("tikvclient/skipStoreCheckUntilHealth", `return`))
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
r := s.cache.GetCachedRegionWithRLock(loc.Region)
s.NotNil(r)
// The following assumptions are made in the later tests, so they are checked in advance:
s.Equal(r.GetLeaderStoreID(), uint64(1)) // the region's leader is in store1.
s.Equal(len(r.getStore().stores), 3) // the region has 3 peers (stores).
for _, store := range r.getStore().stores {
s.Equal(store.labels[0].Key, "id") // Each store has a label "id", and the value is the store's ID.
s.Equal(store.labels[0].Value, fmt.Sprintf("%v", store.storeID))
}
}
func (s *testReplicaSelectorSuite) TearDownTest() {
s.cache.Close()
s.mvccStore.Close()
randIntn = rand.Intn
s.NoError(failpoint.Disable("tikvclient/fastBackoffBySkipSleep"))
s.NoError(failpoint.Disable("tikvclient/skipStoreCheckUntilHealth"))
}
type replicaSelectorAccessPathCase struct {
reqType tikvrpc.CmdType
readType kv.ReplicaReadType
staleRead bool
timeout time.Duration
label *metapb.StoreLabel
accessErr []RegionErrorType
accessErrInValid bool
accessPathResult // used to record the execution result.
expect *accessPathResult //
}
type accessPathResult struct {
accessPath []string
respErr string
respRegionError *errorpb.Error
backoffCnt int
backoffDetail []string
regionIsValid bool
}
func TestReplicaReadStaleReadAccessPathByCase(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
var ca replicaSelectorAccessPathCase
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: false, stale-read: true}",
"{addr: store2, replica-read: true, stale-read: false}",
"{addr: store3, replica-read: true, stale-read: false}",
},
respErr: "",
respRegionError: nil,
backoffCnt: 1,
backoffDetail: []string{"tikvServerBusy+1"},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
// test stale read with label.
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
label: &metapb.StoreLabel{Key: "id", Value: "2"},
accessErr: []RegionErrorType{DataIsNotReadyErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store2, replica-read: false, stale-read: true}",
"{addr: store1, replica-read: false, stale-read: false}", // try leader with leader read.
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: true,
label: &metapb.StoreLabel{Key: "id", Value: "2"},
accessErr: []RegionErrorType{DataIsNotReadyErr, ServerIsBusyErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store2, replica-read: false, stale-read: true}",
"{addr: store1, replica-read: false, stale-read: false}",
"{addr: store2, replica-read: true, stale-read: false}"},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
}
func (s *testReplicaSelectorSuite) runCaseAndCompare(ca2 replicaSelectorAccessPathCase) bool {
ca2.run(s)
if ca2.accessErrInValid {
// the case has been marked as invalid, just ignore it.
return false
}
if ca2.expect != nil {
msg := fmt.Sprintf("%v\n\n", ca2.Format())
expect := ca2.expect
result := ca2.accessPathResult
s.Equal(expect.accessPath, result.accessPath, msg)
s.Equal(expect.respErr, result.respErr, msg)
s.Equal(expect.respRegionError, result.respRegionError, msg)
s.Equal(expect.regionIsValid, result.regionIsValid, msg)
s.Equal(expect.backoffCnt, result.backoffCnt, msg)
s.Equal(expect.backoffDetail, result.backoffDetail, msg)
}
return true
}
func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) {
reachable.injectConstantLiveness(s.cache) // inject reachable liveness.
msg := ca.Format()
access := []string{}
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
idx := len(access)
access = append(access, fmt.Sprintf("{addr: %v, replica-read: %v, stale-read: %v}", addr, req.ReplicaRead, req.StaleRead))
if idx < len(ca.accessErr) {
if !ca.accessErr[idx].Valid(addr, req) {
// mark this case as invalid and just ignore it.
ca.accessErrInValid = true
} else {
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
rc := s.cache.GetCachedRegionWithRLock(loc.Region)
s.NotNil(rc)
regionErr, err := ca.genAccessErr(s.cache, rc, ca.accessErr[idx])
if regionErr != nil {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: regionErr,
}}, nil
}
if err != nil {
return nil, err
}
}
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
Value: []byte("hello world"),
}}, nil
}}
sender := NewRegionRequestSender(s.cache, fnClient)
var req *tikvrpc.Request
switch ca.reqType {
case tikvrpc.CmdGet:
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
case tikvrpc.CmdPrewrite:
req = tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
default:
s.FailNow("unsupported reqType " + ca.reqType.String())
}
if ca.staleRead {
req.EnableStaleWithMixedReplicaRead()
req.ReadReplicaScope = oracle.GlobalTxnScope
req.TxnScope = oracle.GlobalTxnScope
} else {
req.ReplicaReadType = ca.readType
req.ReplicaRead = ca.readType.IsFollowerRead()
}
opts := []StoreSelectorOption{}
if ca.label != nil {
opts = append(opts, WithMatchLabels([]*metapb.StoreLabel{ca.label}))
}
// reset the slow score, since serverIsBusyErr marks the store as slow and would affect the remaining test cases.
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
rc := s.cache.GetCachedRegionWithRLock(loc.Region)
s.NotNil(rc)
for _, store := range rc.getStore().stores {
store.slowScore.resetSlowScore()
atomic.StoreUint32(&store.livenessState, uint32(reachable))
store.setResolveState(resolved)
}
bo := retry.NewBackofferWithVars(context.Background(), 40000, nil)
timeout := ca.timeout
if timeout == 0 {
timeout = client.ReadTimeoutShort
}
resp, _, _, err := sender.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, opts...)
if err == nil {
s.NotNil(resp, msg)
regionErr, err := resp.GetRegionError()
s.Nil(err, msg)
ca.respRegionError = regionErr
} else {
ca.respErr = err.Error()
}
ca.accessPath = access
ca.backoffCnt = bo.GetTotalBackoffTimes()
detail := make([]string, 0, len(bo.GetBackoffTimes()))
for tp, cnt := range bo.GetBackoffTimes() {
detail = append(detail, fmt.Sprintf("%v+%v", tp, cnt))
}
sort.Strings(detail)
ca.backoffDetail = detail
ca.regionIsValid = sender.replicaSelector.region.isValid()
sender.replicaSelector.invalidateRegion() // invalidate region to reload for next test case.
}
func (ca *replicaSelectorAccessPathCase) genAccessErr(regionCache *RegionCache, r *Region, accessErr RegionErrorType) (regionErr *errorpb.Error, err error) {
genNotLeaderErr := func(storeID uint64) *errorpb.Error {
var peerInStore *metapb.Peer
for _, peer := range r.meta.Peers {
if peer.StoreId == storeID {
peerInStore = peer
break
}
}
return &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: r.meta.Id,
Leader: peerInStore,
},
}
}
switch accessErr {
case NotLeaderWithNewLeader1Err:
regionErr = genNotLeaderErr(1)
case NotLeaderWithNewLeader2Err:
regionErr = genNotLeaderErr(2)
case NotLeaderWithNewLeader3Err:
regionErr = genNotLeaderErr(3)
default:
regionErr, err = accessErr.GenError()
}
if err != nil {
// inject unreachable liveness.
unreachable.injectConstantLiveness(regionCache)
}
return regionErr, err
}
func (c *replicaSelectorAccessPathCase) Format() string {
label := ""
if c.label != nil {
label = fmt.Sprintf("%v->%v", c.label.Key, c.label.Value)
}
respRegionError := ""
if c.respRegionError != nil {
respRegionError = c.respRegionError.String()
}
accessErr := make([]string, len(c.accessErr))
for i := range c.accessErr {
accessErr[i] = c.accessErr[i].String()
}
return fmt.Sprintf("{\n"+
"\treq: %v\n"+
"\tread_type: %v\n"+
"\tstale_read: %v\n"+
"\ttimeout: %v\n"+
"\tlabel: %v\n"+
"\taccess_err: %v\n"+
"\taccess_path: %v\n"+
"\tresp_err: %v\n"+
"\tresp_region_err: %v\n"+
"\tbackoff_cnt: %v\n"+
"\tbackoff_detail: %v\n"+
"\tregion_is_valid: %v\n}",
c.reqType, c.readType, c.staleRead, c.timeout, label, strings.Join(accessErr, ", "), strings.Join(c.accessPath, ", "),
c.respErr, respRegionError, c.backoffCnt, strings.Join(c.backoffDetail, ", "), c.regionIsValid)
}
type RegionErrorType int
const (
NotLeaderErr RegionErrorType = iota + 1
NotLeaderWithNewLeader1Err
NotLeaderWithNewLeader2Err
NotLeaderWithNewLeader3Err
RegionNotFoundErr
KeyNotInRegionErr
EpochNotMatchErr
ServerIsBusyErr
ServerIsBusyWithEstimatedWaitMsErr
StaleCommandErr
StoreNotMatchErr
RaftEntryTooLargeErr
MaxTimestampNotSyncedErr
ReadIndexNotReadyErr
ProposalInMergingModeErr
DataIsNotReadyErr
RegionNotInitializedErr
DiskFullErr
RecoveryInProgressErr
FlashbackInProgressErr
FlashbackNotPreparedErr
IsWitnessErr
MismatchPeerIdErr
BucketVersionNotMatchErr
// the following error type is not a region error.
DeadLineExceededErr
RegionErrorTypeMax
)
func (tp RegionErrorType) GenRegionError() *errorpb.Error {
err := &errorpb.Error{}
switch tp {
case NotLeaderErr:
err.NotLeader = &errorpb.NotLeader{}
case RegionNotFoundErr:
err.RegionNotFound = &errorpb.RegionNotFound{}
case KeyNotInRegionErr:
err.KeyNotInRegion = &errorpb.KeyNotInRegion{}
case EpochNotMatchErr:
err.EpochNotMatch = &errorpb.EpochNotMatch{}
case ServerIsBusyErr:
err.ServerIsBusy = &errorpb.ServerIsBusy{}
case ServerIsBusyWithEstimatedWaitMsErr:
err.ServerIsBusy = &errorpb.ServerIsBusy{EstimatedWaitMs: 10}
case StaleCommandErr:
err.StaleCommand = &errorpb.StaleCommand{}
case StoreNotMatchErr:
err.StoreNotMatch = &errorpb.StoreNotMatch{}
case RaftEntryTooLargeErr:
err.RaftEntryTooLarge = &errorpb.RaftEntryTooLarge{}
case MaxTimestampNotSyncedErr:
err.MaxTimestampNotSynced = &errorpb.MaxTimestampNotSynced{}
case ReadIndexNotReadyErr:
err.ReadIndexNotReady = &errorpb.ReadIndexNotReady{}
case ProposalInMergingModeErr:
err.ProposalInMergingMode = &errorpb.ProposalInMergingMode{}
case DataIsNotReadyErr:
err.DataIsNotReady = &errorpb.DataIsNotReady{}
case RegionNotInitializedErr:
err.RegionNotInitialized = &errorpb.RegionNotInitialized{}
case DiskFullErr:
err.DiskFull = &errorpb.DiskFull{}
case RecoveryInProgressErr:
err.RecoveryInProgress = &errorpb.RecoveryInProgress{}
case FlashbackInProgressErr:
err.FlashbackInProgress = &errorpb.FlashbackInProgress{}
case FlashbackNotPreparedErr:
err.FlashbackNotPrepared = &errorpb.FlashbackNotPrepared{}
case IsWitnessErr:
err.IsWitness = &errorpb.IsWitness{}
case MismatchPeerIdErr:
err.MismatchPeerId = &errorpb.MismatchPeerId{}
case BucketVersionNotMatchErr:
err.BucketVersionNotMatch = &errorpb.BucketVersionNotMatch{}
default:
return nil
}
return err
}
func (tp RegionErrorType) GenError() (*errorpb.Error, error) {
regionErr := tp.GenRegionError()
if regionErr != nil {
return regionErr, nil
}
switch tp {
case DeadLineExceededErr:
return nil, context.DeadlineExceeded
}
return nil, nil
}
func (tp RegionErrorType) Valid(addr string, req *tikvrpc.Request) bool {
// leader-read.
if !req.StaleRead && !req.ReplicaRead {
switch tp {
case DataIsNotReadyErr:
// DataIsNotReadyErr is only returned when the request is a stale read.
return false
}
}
// replica-read.
if !req.StaleRead && req.ReplicaRead {
switch tp {
case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err:
// NotLeaderErr will not be returned for replica reads.
return false
case DataIsNotReadyErr:
// DataIsNotReadyErr is only returned when the request is a stale read.
return false
}
}
// stale-read.
if req.StaleRead && !req.ReplicaRead {
switch tp {
case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err:
// NotLeaderErr will not be returned for stale reads.
return false
}
}
// store1 can't return a NotLeader error whose new leader is store1 itself.
if addr == "store1" && tp == NotLeaderWithNewLeader1Err {
return false
}
// ditto.
if addr == "store2" && tp == NotLeaderWithNewLeader2Err {
return false
}
// ditto.
if addr == "store3" && tp == NotLeaderWithNewLeader3Err {
return false
}
return true
}
func (tp RegionErrorType) String() string {
switch tp {
case NotLeaderErr:
return "NotLeaderErr"
case NotLeaderWithNewLeader1Err:
return "NotLeaderWithNewLeader1Err"
case NotLeaderWithNewLeader2Err:
return "NotLeaderWithNewLeader2Err"
case NotLeaderWithNewLeader3Err:
return "NotLeaderWithNewLeader3Err"
case RegionNotFoundErr:
return "RegionNotFoundErr"
case KeyNotInRegionErr:
return "KeyNotInRegionErr"
case EpochNotMatchErr:
return "EpochNotMatchErr"
case ServerIsBusyErr:
return "ServerIsBusyErr"
case ServerIsBusyWithEstimatedWaitMsErr:
return "ServerIsBusyWithEstimatedWaitMsErr"
case StaleCommandErr:
return "StaleCommandErr"
case StoreNotMatchErr:
return "StoreNotMatchErr"
case RaftEntryTooLargeErr:
return "RaftEntryTooLargeErr"
case MaxTimestampNotSyncedErr:
return "MaxTimestampNotSyncedErr"
case ReadIndexNotReadyErr:
return "ReadIndexNotReadyErr"
case ProposalInMergingModeErr:
return "ProposalInMergingModeErr"
case DataIsNotReadyErr:
return "DataIsNotReadyErr"
case RegionNotInitializedErr:
return "RegionNotInitializedErr"
case DiskFullErr:
return "DiskFullErr"
case RecoveryInProgressErr:
return "RecoveryInProgressErr"
case FlashbackInProgressErr:
return "FlashbackInProgressErr"
case FlashbackNotPreparedErr:
return "FlashbackNotPreparedErr"
case IsWitnessErr:
return "IsWitnessErr"
case MismatchPeerIdErr:
return "MismatchPeerIdErr"
case BucketVersionNotMatchErr:
return "BucketVersionNotMatchErr"
case DeadLineExceededErr:
return "DeadLineExceededErr"
default:
return "unknown_" + strconv.Itoa(int(tp))
}
}

View File

@@ -153,6 +153,11 @@ func (ss *SlowScoreStat) markAlreadySlow() {
atomic.StoreUint64(&ss.avgScore, slowScoreMax)
}
+ // resetSlowScore resets the slow score to 0. It's used for test.
+ func (ss *SlowScoreStat) resetSlowScore() {
+ atomic.StoreUint64(&ss.avgScore, 0)
+ }
func (ss *SlowScoreStat) isSlow() bool {
return ss.getSlowScore() >= slowScoreThreshold
}