mirror of https://github.com/tikv/client-go.git
Remove old replica selector (#1265)
* remove EnableReplicaSelectorV2 config
* remove replicaSelectorV1
* refine code
* refine code
* refine code
* add replica flag to reduce struct size
* remove contextPatcher
* use option.preferLeader
* refine code
* fix test
* refine code
* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

Parent: 31a8ddab19
Commit: 0cc1c5239d
@@ -100,9 +100,6 @@ type TiKVClient struct {
// MaxConcurrencyRequestLimit is the max concurrency number of request to be sent the tikv
// 0 means auto adjust by feedback.
MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"`
// EnableReplicaSelectorV2 indicate whether to use the new replica-selector-v2.
// TODO(crazycs520): remove this config after the new replica-selector-v2 is stable.
EnableReplicaSelectorV2 bool `toml:"enable-replica-selector-v2" json:"enable-replica-selector-v2"`
}

// AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable.

@@ -176,7 +173,6 @@ func DefaultTiKVClient() TiKVClient {
ResolveLockLiteThreshold: 16,
MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit,
EnableReplicaSelectorV2: true,
}
}
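For orientation, a minimal sketch of how the deleted switch above used to be consumed. The wrapper name buildSelector is hypothetical; the branch itself mirrors the NewReplicaSelector factory removed later in this diff.

// Sketch only: the config-gated dispatch that this commit deletes.
func buildSelector(cache *RegionCache, id RegionVerID, req *tikvrpc.Request) (ReplicaSelector, error) {
	if config.GetGlobalConfig().TiKVClient.EnableReplicaSelectorV2 {
		return newReplicaSelectorV2(cache, id, req) // the v2 constructor, renamed to newReplicaSelector by this commit
	}
	return newReplicaSelector(cache, id, req) // the old v1 selector, removed by this commit
}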
@@ -39,7 +39,6 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"math/rand"
"slices"
"sort"

@@ -799,8 +798,6 @@ type RPCContext struct {
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
BucketVersion uint64

contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden
}

func (c *RPCContext) String() string {

@@ -816,29 +813,6 @@ func (c *RPCContext) String() string {
return res
}

type contextPatcher struct {
replicaRead *bool
busyThreshold *time.Duration
staleRead *bool
}

func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
if patcher.replicaRead != nil {
pbCtx.ReplicaRead = *patcher.replicaRead
}
if patcher.staleRead != nil {
pbCtx.StaleRead = *patcher.staleRead
}
if patcher.busyThreshold != nil {
millis := patcher.busyThreshold.Milliseconds()
if millis > 0 && millis <= math.MaxUint32 {
pbCtx.BusyThresholdMs = uint32(millis)
} else {
pbCtx.BusyThresholdMs = 0
}
}
}

type storeSelectorOp struct {
leaderOnly bool
preferLeader bool
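With contextPatcher removed, the overrides it carried are written straight into the request's embedded kvrpcpb.Context before sending. A hedged, self-contained sketch of the two styles, using stand-in types rather than the client-go ones:

package main

import "fmt"

// ctx stands in for kvrpcpb.Context; only the fields relevant here are kept.
type ctx struct {
	ReplicaRead     bool
	StaleRead       bool
	BusyThresholdMs uint32
}

// patcher mirrors the removed contextPatcher: a nil pointer means "leave the field alone".
type patcher struct{ replicaRead, staleRead *bool }

func (p *patcher) applyTo(c *ctx) {
	if p.replicaRead != nil {
		c.ReplicaRead = *p.replicaRead
	}
	if p.staleRead != nil {
		c.StaleRead = *p.staleRead
	}
}

func main() {
	var c ctx
	rr := true
	(&patcher{replicaRead: &rr}).applyTo(&c) // old style: patch late, just before sending
	c.ReplicaRead, c.StaleRead = true, false // new style: the selector sets the final values directly
	fmt.Printf("%+v\n", c)
}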
File diff suppressed because it is too large.
@@ -197,33 +197,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeaderErrorWit
s.True(r.isValid())
}

func (s *testRegionRequestToThreeStoresSuite) TestSliceIdentical() {
a := make([]int, 0)
b := a
s.True(sliceIdentical(a, b))
b = make([]int, 0)
s.False(sliceIdentical(a, b))

a = append(a, 1, 2, 3)
b = a
s.True(sliceIdentical(a, b))
b = a[:2]
s.False(sliceIdentical(a, b))
b = a[1:]
s.False(sliceIdentical(a, b))
a = a[1:]
s.True(sliceIdentical(a, b))

a = nil
b = nil

s.True(sliceIdentical(a, b))
a = make([]int, 0)
s.False(sliceIdentical(a, b))
a = append(a, 1)
s.False(sliceIdentical(a, b))
}

func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, string) {
region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false)
s.Nil(err)
@@ -378,6 +351,14 @@ func refreshLivenessStates(regionStore *regionStore) {
}
}

func refreshStoreHealthStatus(regionStore *regionStore) {
for _, store := range regionStore.stores {
store.healthStatus.clientSideSlowScore.resetSlowScore()
store.healthStatus.ResetTiKVServerSideSlowScoreForTest(50)
store.healthStatus.updateSlowFlag()
}
}

func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
s.Equal(rpcCtx.Store, target.store)
s.Equal(rpcCtx.Peer, target.peer)
@@ -429,7 +410,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
refreshRegionTTL(region)
refreshEpochs(regionStore)
req.ReplicaReadType = kv.ReplicaReadLearner
replicaSelector, err := NewReplicaSelector(cache, regionLoc.Region, req)
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
@@ -437,7 +418,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
refreshRegionTTL(region)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
target := replicaSelector.targetReplica()
target := replicaSelector.target
AssertRPCCtxEqual(s, rpcCtx, target, nil)
s.Equal(target.peer.Role, metapb.PeerRole_Learner)
s.Equal(target.peer.Id, tikvLearner.Id)
@@ -472,7 +453,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
region.meta.Peers = append(region.meta.Peers, tiflash)
atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

cache := NewRegionCache(s.cache.pdClient)
// Disable the tick on health status.
cache := NewRegionCache(s.cache.pdClient, RegionCacheNoHealthTick)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region, true, true)
@@ -486,7 +468,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Should only contain TiKV stores.
s.Equal(len(replicaSelector.replicas), regionStore.accessStoreNum(tiKVOnly))
s.Equal(len(replicaSelector.replicas), len(regionStore.stores)-1)
s.IsType(&accessKnownLeader{}, replicaSelector.state)

// Verify that the store matches the peer and epoch.
for _, replica := range replicaSelector.replicas {
@@ -501,14 +482,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}
}

// Test accessKnownLeader state
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Try the leader for maxReplicaAttempt times
for i := 1; i <= maxReplicaAttempt; i++ {
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i)
}
@@ -516,15 +494,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
for i := 0; i < len(replicaSelector.replicas)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
state, ok := replicaSelector.state.(*tryFollower)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.NotEqual(state.lastIdx, regionStore.workTiKVIdx)
s.Equal(replicaSelector.targetIdx, state.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
s.Equal(replicaSelector.target.attempts, 1)
s.NotEqual(rpcCtx.Peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id)
}
// In tryFollower state, if all replicas are tried, nil RPCContext should be returned
// If all replicas are tried, nil RPCContext should be returned
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
s.Nil(rpcCtx)
@@ -537,22 +511,19 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
s.NotNil(replicaSelector)
unreachable.injectConstantLiveness(cache.stores)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
replicaSelector.onSendFailure(s.bo, nil)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.NotNil(rpcCtx)
s.Nil(err)
s.IsType(&tryFollower{}, replicaSelector.state)
s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
s.NotEqual(replicaSelector.target.peer.Id, region.GetLeaderPeerID())
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
s.Equal(replicaSelector.target.attempts, 1)
// If the NotLeader errors provides an unreachable leader, do not switch to it.
replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
RegionId: region.GetID(), Leader: &metapb.Peer{Id: s.peerIDs[regionStore.workTiKVIdx], StoreId: s.storeIDs[regionStore.workTiKVIdx]},
})
s.IsType(&tryFollower{}, replicaSelector.state)

// If the leader is unreachable and forwarding is not enabled, just do not try
// the unreachable leader.
@@ -560,14 +531,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader.
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
_, ok := replicaSelector.state.(*tryFollower)
s.True(ok)
s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx)
s.NotEqual(regionStore.workTiKVIdx, replicaSelector.target.peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id)

// Do not try to use proxy if livenessState is unknown instead of unreachable.
refreshEpochs(regionStore)
@@ -579,14 +547,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unknown
}, 3*time.Second, 200*time.Millisecond)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unknown. Even if forwarding is enabled, it should try followers
// instead of using the proxy.
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
_, ok = replicaSelector.state.(*tryFollower)
s.True(ok)

// Test switching to tryNewProxy if leader is unreachable and forwarding is enabled
refreshEpochs(regionStore)
@@ -598,65 +563,39 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
}, 3*time.Second, 200*time.Millisecond)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unreachable, so it will try a new proxy instead of the leader.
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
state, ok := replicaSelector.state.(*tryNewProxy)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(replicaSelector.targetReplica().attempts, 1)
s.Equal(replicaSelector.proxyReplica().attempts, 1)
s.NotNil(replicaSelector.target)
s.NotNil(replicaSelector.proxy)
s.NotEqual(replicaSelector.target.peer.Id, &replicaSelector.proxy.peer.Id)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy)
s.Equal(replicaSelector.target.attempts, 1)
s.Equal(replicaSelector.proxy.attempts, 1)

// When the current proxy node fails, it should try another one.
lastProxy := replicaSelector.proxyIdx
replicaSelector.onSendFailure(s.bo, nil)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.NotNil(rpcCtx)
s.Nil(err)
state, ok = replicaSelector.state.(*tryNewProxy)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(lastProxy, replicaSelector.proxyIdx)
s.Equal(replicaSelector.targetReplica().attempts, 2)
s.Equal(replicaSelector.proxyReplica().attempts, 1)
s.Equal(replicaSelector.target.attempts, 2)
s.Equal(replicaSelector.proxy.attempts, 1)

// Test proxy store is saves when proxy is enabled
replicaSelector.onSendSuccess(req)
regionStore = region.getStore()
s.Equal(replicaSelector.proxyIdx, regionStore.proxyTiKVIdx)
s.Equal(replicaSelector.proxy.peer.Id, replicaSelector.replicas[regionStore.proxyTiKVIdx].peer.Id)

// Test initial state is accessByKnownProxy when proxyTiKVIdx is valid
// Test when proxyTiKVIdx is valid
refreshEpochs(regionStore)
cache.enableForwarding = true
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
state2, ok := replicaSelector.state.(*accessByKnownProxy)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())

// Switch to tryNewProxy if the current proxy is not available
replicaSelector.onSendFailure(s.bo, nil)
s.IsType(&tryNewProxy{}, replicaSelector.state)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx)
s.Equal(replicaSelector.targetReplica().attempts, 2)
s.Equal(replicaSelector.proxyReplica().attempts, 1)
// FIXME: the chosen proxy-replica's store should be reachable.
//s.Equal(replicaSelector.proxyReplica().store.getLivenessState(), reachable)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy)

// Test accessFollower state with kv.ReplicaReadFollower request type.
req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
@@ -665,28 +604,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
state3, ok := replicaSelector.state.(*accessFollower)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state3.leaderIdx)
s.Equal(state3.lastIdx, AccessIndex(-1))

lastIdx := AccessIndex(-1)
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil)
}
// Fallback to the leader for 1 time
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo, req)
@@ -704,17 +630,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
state3 = replicaSelector.state.(*accessFollower)
// Should fallback to the leader immediately.
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

// Test accessFollower state filtering label-not-match stores.
refreshRegionTTL(region)
refreshEpochs(regionStore)
refreshStoreHealthStatus(regionStore)
labels := []*metapb.StoreLabel{
{
Key: "a",
@@ -814,7 +738,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1])
s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1])
s.True(bo.GetTotalBackoffTimes() == 1)
s.cluster.StartStore(s.storeIDs[0])
atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
@@ -825,7 +749,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1])
s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1])
s.True(bo.GetTotalBackoffTimes() == 0)

// Switch to the next peer due to leader failure but the new leader is not elected.
@@ -856,31 +780,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 3)
getReplicaSelectorRegion := func() *Region {
if selector, ok := sender.replicaSelector.(*replicaSelector); ok {
return selector.region
}
if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok {
return selector.region
}
return nil
}
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])

// The leader store is alive but can't provide service.
getReplicaSelectorRegionStores := func() []*Store {
if selector, ok := sender.replicaSelector.(*replicaSelector); ok {
return selector.regionStore.stores
}
if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok {
return selector.region.getStore().stores
}
return nil
}
reachable.injectConstantLiveness(s.cache.stores)
s.Eventually(func() bool {
stores := getReplicaSelectorRegionStores()
stores := sender.replicaSelector.region.getStore().stores
return stores[0].getLivenessState() == reachable &&
stores[1].getLivenessState() == reachable &&
stores[2].getLivenessState() == reachable
@@ -892,7 +798,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
s.cluster.StartStore(s.storeIDs[0])
@@ -926,7 +832,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
}()
}
@@ -946,7 +852,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@@ -965,7 +871,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@@ -998,7 +904,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
}
@@ -1014,7 +920,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.True(bo.GetTotalBackoffTimes() == 3)
s.False(getReplicaSelectorRegion().isValid())
s.False(sender.replicaSelector.region.isValid())
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}
@@ -1029,12 +935,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
BusyThresholdMs: 50,
})

replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
s.Equal(replicaSelector.getBaseReplicaSelector().region, region)
s.Equal(replicaSelector.region, region)
// The busyThreshold in replicaSelector should be initialized with the request context.
s.Equal(replicaSelector.getBaseReplicaSelector().busyThreshold, 50*time.Millisecond)
s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond)

bo := retry.NewBackoffer(context.Background(), -1)
rpcCtx, err := replicaSelector.next(bo, req)
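The test above drives the load-based path: the request carries BusyThresholdMs in its context, and the selector copies it into busyThreshold (50ms here). A hedged sketch of how such a request is typically built; the trailing context argument is an assumption inferred from the test, and exact plumbing may differ:

// Sketch: a leader read that opts into load-based replica selection.
req := tikvrpc.NewReplicaReadRequest(
	tikvrpc.CmdGet,
	&kvrpcpb.GetRequest{Key: []byte("key")},
	kv.ReplicaReadLeader,
	nil,
	kvrpcpb.Context{BusyThresholdMs: 50}, // the field the selector reads; 50 matches the test
)
_ = req // the selector built from this request gets busyThreshold = 50ms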
@@ -1051,7 +957,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
lastPeerID := rpcCtx.Peer.Id
@@ -1065,7 +970,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
// Should choose a peer different from before
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
@@ -1079,22 +983,20 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.False(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(0))
s.True(replicaSelector.getBaseReplicaSelector().region.isValid()) // don't invalidate region when can't find an idle replica.
s.True(replicaSelector.region.isValid()) // don't invalidate region when can't find an idle replica.

time.Sleep(120 * time.Millisecond)

// When there comes a new request, it should skip busy leader and choose a less busy store
req.BusyThresholdMs = 50
replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
}
@@ -1407,7 +1309,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
}
}

setTargetReplica := func(selector ReplicaSelector, readType string) {
setTargetReplica := func(selector *replicaSelector, readType string) {
var leader bool
switch readType {
case "leader", "stale_leader":
@@ -1417,23 +1319,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
default:
panic("unreachable")
}
for idx, replica := range selector.getBaseReplicaSelector().replicas {
for _, replica := range selector.replicas {
if replica.store.storeID == leaderStore.storeID && leader {
if v1, ok := selector.(*replicaSelector); ok {
v1.targetIdx = AccessIndex(idx)
}
if v2, ok := selector.(*replicaSelectorV2); ok {
v2.target = replica
}
selector.target = replica
return
}
if replica.store.storeID != leaderStore.storeID && !leader {
if v1, ok := selector.(*replicaSelector); ok {
v1.targetIdx = AccessIndex(idx)
}
if v2, ok := selector.(*replicaSelectorV2); ok {
v2.target = replica
}
selector.target = replica
return
}
}
@@ -1447,23 +1339,25 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
bo := retry.NewBackoffer(context.Background(), -1)
req.IsRetryRequest = false
setReadType(req, firstReplica)
replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
s.Nil(err)
setTargetReplica(replicaSelector, firstReplica)
rpcCtx, err := replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
rpcCtx, err := replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy)
s.Nil(err)
patchRequestSource(req, replicaSelector.replicaType(rpcCtx))
s.NotNil(rpcCtx)
patchRequestSource(req, replicaSelector.replicaType())
s.Equal(firstReplica+"_test", req.RequestSource)

// retry
setReadType(req, retryReplica)
replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
s.Nil(err)
setTargetReplica(replicaSelector, retryReplica)
rpcCtx, err = replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
rpcCtx, err = replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy)
s.Nil(err)
s.NotNil(rpcCtx)
req.IsRetryRequest = true
patchRequestSource(req, replicaSelector.replicaType(rpcCtx))
patchRequestSource(req, replicaSelector.replicaType())
s.Equal("retry_"+firstReplica+"_"+retryReplica+"_test", req.RequestSource)
}
}
@@ -21,45 +21,15 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)

type ReplicaSelector interface {
next(bo *retry.Backoffer, req *tikvrpc.Request) (*RPCContext, error)
targetReplica() *replica
proxyReplica() *replica
replicaType(rpcCtx *RPCContext) string
String() string
getBaseReplicaSelector() *baseReplicaSelector
getLabels() []*metapb.StoreLabel
onSendSuccess(req *tikvrpc.Request)
onSendFailure(bo *retry.Backoffer, err error)
invalidateRegion()
// Following methods are used to handle region errors.
onNotLeader(bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader) (shouldRetry bool, err error)
onDataIsNotReady()
onServerIsBusy(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy) (shouldRetry bool, err error)
onReadReqConfigurableTimeout(req *tikvrpc.Request) bool
isValid() bool
}

// NewReplicaSelector returns a new ReplicaSelector.
//
//nolint:staticcheck // ignore SA4023, never returns a nil interface value
func NewReplicaSelector(
regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption,
) (ReplicaSelector, error) {
if config.GetGlobalConfig().TiKVClient.EnableReplicaSelectorV2 {
return newReplicaSelectorV2(regionCache, regionID, req, opts...)
}
return newReplicaSelector(regionCache, regionID, req, opts...)
}

type replicaSelectorV2 struct {
type replicaSelector struct {
baseReplicaSelector
replicaReadType kv.ReplicaReadType
isStaleRead bool
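Since only one implementation remains, the accessor methods the interface required disappear as well, and the call sites in the tests above move from methods to fields. A hedged caller-side sketch of the change (illustrative, not an exact call site; error handling elided):

// Before: the exported factory returned the ReplicaSelector interface and callers used accessors.
oldSel, _ := NewReplicaSelector(cache, regionID, req)
target := oldSel.targetReplica()
region := oldSel.getBaseReplicaSelector().region

// After: callers hold the concrete *replicaSelector and read the same data from fields.
newSel, _ := newReplicaSelector(cache, regionID, req)
target = newSel.target
region = newSel.region
_, _ = target, region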
@@ -70,9 +40,9 @@ type replicaSelectorV2 struct {
attempts int
}

func newReplicaSelectorV2(
func newReplicaSelector(
regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption,
) (*replicaSelectorV2, error) {
) (*replicaSelector, error) {
cachedRegion := regionCache.GetCachedRegionWithRLock(regionID)
if cachedRegion == nil || !cachedRegion.isValid() {
return nil, nil
@@ -85,7 +55,7 @@ func newReplicaSelectorV2(
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
WithPerferLeader()(&option)
}
return &replicaSelectorV2{
return &replicaSelector{
baseReplicaSelector: baseReplicaSelector{
regionCache: regionCache,
region: cachedRegion,
@@ -101,11 +71,33 @@ func newReplicaSelectorV2(
}, nil
}

func (s *replicaSelectorV2) isValid() bool {
return s != nil
func buildTiKVReplicas(region *Region) []*replica {
regionStore := region.getStore()
replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly))
for _, storeIdx := range regionStore.accessIndex[tiKVOnly] {
replicas = append(
replicas, &replica{
store: regionStore.stores[storeIdx],
peer: region.meta.Peers[storeIdx],
epoch: regionStore.storeEpochs[storeIdx],
attempts: 0,
},
)
}

if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
attemptedTime, err := time.ParseDuration(val.(string))
if err != nil {
panic(err)
}
for _, r := range replicas {
r.attemptedTime = attemptedTime
}
}
return replicas
}

func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) {
func (s *replicaSelector) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) {
if !s.region.isValid() {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
return nil, nil
@@ -126,7 +118,7 @@ func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpc
return s.buildRPCContext(bo, s.target, s.proxy)
}

func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
func (s *replicaSelector) nextForReplicaReadLeader(req *tikvrpc.Request) {
if s.regionCache.enableForwarding {
strategy := ReplicaSelectLeaderWithProxyStrategy{}
s.target, s.proxy = strategy.next(s)
@@ -140,7 +132,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
leaderIdx := s.region.getStore().workTiKVIdx
strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx}
s.target = strategy.next(s.replicas)
if s.target != nil && s.busyThreshold > 0 && s.isReadOnlyReq && (s.target.store.EstimatedWaitTime() > s.busyThreshold || s.target.serverIsBusy) {
if s.target != nil && s.busyThreshold > 0 && s.isReadOnlyReq && (s.target.store.EstimatedWaitTime() > s.busyThreshold || s.target.hasFlag(serverIsBusyFlag)) {
// If the leader is busy in our estimation, try other idle replicas.
// If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold.
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, busyThreshold: s.busyThreshold}
@@ -160,13 +152,13 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
}
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, leaderOnly: s.option.leaderOnly}
s.target = mixedStrategy.next(s)
if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].deadlineErrUsingConfTimeout {
if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].hasFlag(deadlineErrUsingConfTimeoutFlag) {
req.ReplicaRead = true
req.StaleRead = false
}
}

func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) {
func (s *replicaSelector) nextForReplicaReadMixed(req *tikvrpc.Request) {
leaderIdx := s.region.getStore().workTiKVIdx
if s.isStaleRead && s.attempts == 2 {
// For stale read second retry, try leader by leader read.
@@ -230,6 +222,24 @@ func (s ReplicaSelectLeaderStrategy) next(replicas []*replica) *replica {
return nil
}

// check leader is candidate or not.
func isLeaderCandidate(leader *replica) bool {
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be wakened up and re-elect the leader until the follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
// If leader.deadlineErrUsingConfTimeout is true, it means the leader is already tried and received deadline exceeded error, then don't retry it.
// If leader.notLeader is true, it means the leader is already tried and received not leader error, then don't retry it.
if leader.store.getLivenessState() != reachable ||
leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) ||
leader.hasFlag(deadlineErrUsingConfTimeoutFlag) ||
leader.hasFlag(notLeaderFlag) ||
leader.isEpochStale() { // check leader epoch here, if leader.epoch staled, we can try other replicas. instead of buildRPCContext failed and invalidate region then retry.
return false
}
return true
}

// ReplicaSelectMixedStrategy is used to select a replica by calculating a score for each replica, and then choose the one with the highest score.
// Attention, if you want the leader replica must be chosen in some case, you should use ReplicaSelectLeaderStrategy, instead of use ReplicaSelectMixedStrategy with preferLeader flag.
type ReplicaSelectMixedStrategy struct {
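The hasFlag/addFlag calls above replace the per-replica bool fields (notLeader, dataIsNotReady, serverIsBusy, deadlineErrUsingConfTimeout), which is what the commit message means by "add replica flag to reduce struct size". The flag type itself lives in the suppressed file; a plausible minimal sketch of the pattern, with assumed names that may differ from the real code:

// Hypothetical reconstruction of the bit-flag pattern used by the replica struct.
type replicaFlag uint8

const (
	notLeaderFlag replicaFlag = 1 << iota
	dataIsNotReadyFlag
	serverIsBusyFlag
	deadlineErrUsingConfTimeoutFlag
)

type replicaSketch struct {
	flags replicaFlag // one byte instead of four separate bools
}

func (r *replicaSketch) addFlag(f replicaFlag) { r.flags |= f }

func (r *replicaSketch) hasFlag(f replicaFlag) bool { return r.flags&f != 0 }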
@@ -243,7 +253,7 @@ type ReplicaSelectMixedStrategy struct {
busyThreshold time.Duration
}

func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2) *replica {
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica {
replicas := selector.replicas
maxScoreIdxes := make([]int, 0, len(replicas))
var maxScore storeSelectionScore = -1
@@ -290,13 +300,23 @@ func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2) *replica
return nil
}

func hasDeadlineExceededError(replicas []*replica) bool {
for _, replica := range replicas {
if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) {
// when meet deadline exceeded error, do fast retry without invalidate region cache.
return true
}
}
return false
}

func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epochStale bool, liveness livenessState) bool {
if epochStale || liveness == unreachable {
// the replica is not available, skip it.
return false
}
maxAttempt := 1
if r.dataIsNotReady && !isLeader {
if r.hasFlag(dataIsNotReadyFlag) && !isLeader {
// If the replica is failed by data not ready with stale read, we can retry it with replica-read.
// after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error,
// then no need to retry leader again. If you try it again, you may get a NotLeader error.
@@ -309,7 +329,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
if s.leaderOnly && !isLeader {
return false
}
if s.busyThreshold > 0 && (r.store.EstimatedWaitTime() > s.busyThreshold || r.serverIsBusy || isLeader) {
if s.busyThreshold > 0 && (r.store.EstimatedWaitTime() > s.busyThreshold || r.hasFlag(serverIsBusyFlag) || isLeader) {
return false
}
if s.preferLeader && r.store.healthStatus.IsSlow() && !isLeader {
@@ -406,12 +426,12 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) s

type ReplicaSelectLeaderWithProxyStrategy struct{}

func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelectorV2) (leader *replica, proxy *replica) {
func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelector) (leader *replica, proxy *replica) {
rs := selector.region.getStore()
leaderIdx := rs.workTiKVIdx
replicas := selector.replicas
leader = replicas[leaderIdx]
if leader.store.getLivenessState() == reachable || leader.notLeader {
if leader.store.getLivenessState() == reachable || leader.hasFlag(notLeaderFlag) {
// if leader's store is reachable, no need use proxy.
rs.unsetProxyStoreIfNeeded(selector.region)
return leader, nil
@@ -444,16 +464,19 @@ func (s ReplicaSelectLeaderWithProxyStrategy) isCandidate(r *replica, isLeader b
return true
}

func (s *replicaSelectorV2) onNotLeader(
func (s *replicaSelector) onNotLeader(
bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader,
) (shouldRetry bool, err error) {
if s.target != nil {
s.target.notLeader = true
s.target.addFlag(notLeaderFlag)
}
leaderIdx, err := s.baseReplicaSelector.onNotLeader(bo, ctx, notLeader)
if err != nil {
return false, err
leader := notLeader.GetLeader()
if leader == nil {
// The region may be during transferring leader.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx))
return err == nil, err
}
leaderIdx := s.updateLeader(leader)
if leaderIdx >= 0 {
if isLeaderCandidate(s.replicas[leaderIdx]) {
s.replicaReadType = kv.ReplicaReadLeader
@@ -462,7 +485,7 @@ func (s *replicaSelectorV2) onNotLeader(
return true, nil
}

func (s *replicaSelectorV2) onFlashbackInProgress(ctx *RPCContext, req *tikvrpc.Request) bool {
func (s *replicaSelector) onFlashbackInProgress(req *tikvrpc.Request) bool {
// if the failure is caused by replica read, we can retry it with leader safely.
if req.ReplicaRead && s.target != nil && s.target.peer.Id != s.region.GetLeaderPeerID() {
req.BusyThresholdMs = 0
@@ -474,13 +497,13 @@ func (s *replicaSelectorV2) onFlashbackInProgress(ctx *RPCContext, req *tikvrpc.
return false
}

func (s *replicaSelectorV2) onDataIsNotReady() {
func (s *replicaSelector) onDataIsNotReady() {
if s.target != nil {
s.target.dataIsNotReady = true
s.target.addFlag(dataIsNotReadyFlag)
}
}

func (s *replicaSelectorV2) onServerIsBusy(
func (s *replicaSelector) onServerIsBusy(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy,
) (shouldRetry bool, err error) {
var store *Store
@@ -495,7 +518,7 @@ func (s *replicaSelectorV2) onServerIsBusy(
return false, nil
}
if s.target != nil {
s.target.serverIsBusy = true
s.target.addFlag(serverIsBusyFlag)
}
}
} else {
@@ -515,28 +538,47 @@ func (s *replicaSelectorV2) onServerIsBusy(
return true, nil
}

func (s *replicaSelectorV2) canFastRetry() bool {
func (s *replicaSelector) canFastRetry() bool {
if s.replicaReadType == kv.ReplicaReadLeader {
leaderIdx := s.region.getStore().workTiKVIdx
leader := s.replicas[leaderIdx]
if isLeaderCandidate(leader) && !leader.serverIsBusy {
if isLeaderCandidate(leader) && !leader.hasFlag(serverIsBusyFlag) {
return false
}
}
return true
}

func (s *replicaSelectorV2) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if isReadReqConfigurableTimeout(req) {
if s.target != nil {
s.target.deadlineErrUsingConfTimeout = true
s.target.addFlag(deadlineErrUsingConfTimeoutFlag)
}
return true
}
return false
}

func (s *replicaSelectorV2) onSendFailure(bo *retry.Backoffer, err error) {
func isReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) {
// Configurable timeout should less than `ReadTimeoutShort`.
return false
}
// Only work for read requests, return false for non-read requests.
return isReadReq(req.Type)
}

func isReadReq(tp tikvrpc.CmdType) bool {
switch tp {
case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream:
return true
default:
return false
}
}

func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
metrics.RegionCacheCounterWithSendFail.Inc()
// todo: mark store need check and return to fast retry.
target := s.target
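isReadReqConfigurableTimeout above gates the fast-retry path: only read commands whose per-request deadline is shorter than client.ReadTimeoutShort mark the replica with deadlineErrUsingConfTimeoutFlag instead of failing the region. A hedged usage sketch, based only on the code shown in this hunk:

// Sketch: a Get with a 1s deadline (well under client.ReadTimeoutShort) qualifies,
// while a Prewrite never does because it is not a read command.
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")})
req.MaxExecutionDurationMs = uint64(time.Second.Milliseconds())
fmt.Println(isReadReqConfigurableTimeout(req)) // true under these assumptions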
@@ -554,7 +596,7 @@ func (s *replicaSelectorV2) onSendFailure(bo *retry.Backoffer, err error) {
}
}

func (s *replicaSelectorV2) onSendSuccess(req *tikvrpc.Request) {
func (s *replicaSelector) onSendSuccess(req *tikvrpc.Request) {
if s.proxy != nil && s.target != nil {
for idx, r := range s.replicas {
if r.peer.Id == s.proxy.peer.Id {
@@ -568,19 +610,7 @@ func (s *replicaSelectorV2) onSendSuccess(req *tikvrpc.Request) {
}
}

func (s *replicaSelectorV2) targetReplica() *replica {
return s.target
}

func (s *replicaSelectorV2) proxyReplica() *replica {
return s.proxy
}

func (s *replicaSelectorV2) getLabels() []*metapb.StoreLabel {
return s.option.labels
}

func (s *replicaSelectorV2) replicaType(_ *RPCContext) string {
func (s *replicaSelector) replicaType() string {
if s.target != nil {
if s.target.peer.Id == s.region.GetLeaderPeerID() {
return "leader"
@@ -590,7 +620,7 @@ func (s *replicaSelectorV2) replicaType(_ *RPCContext) string {
return "unknown"
}

func (s *replicaSelectorV2) String() string {
func (s *replicaSelector) String() string {
if s == nil {
return ""
}
@@ -17,22 +17,19 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/israce"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type testReplicaSelectorSuite struct {
@@ -96,7 +93,7 @@ type replicaSelectorAccessPathCase struct {
expect *accessPathResult
result accessPathResult
beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called.
afterRun func(ReplicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called.
afterRun func(*replicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called.
}

type accessPathResult struct {
@@ -118,29 +115,28 @@ func TestReplicaSelectorBasic(t *testing.T) {
rc := s.getRegion()
s.NotNil(rc)
rc.invalidate(Other)
selector, err := newReplicaSelectorV2(s.cache, rc.VerID(), req)
selector, err := newReplicaSelector(s.cache, rc.VerID(), req)
s.Nil(err)
s.Nil(selector)
s.Equal("", selector.String())
selector2, err := NewReplicaSelector(s.cache, rc.VerID(), req)
selector2, err := newReplicaSelector(s.cache, rc.VerID(), req)
s.Nil(err)
s.Nil(selector2)
s.False(selector2 == nil) // since never returns a nil interface value
s.False(selector2.isValid())
s.True(selector2 == nil)
s.Equal("", selector2.String())

rc = s.getRegion()
selector, err = newReplicaSelectorV2(s.cache, rc.VerID(), req)
selector, err = newReplicaSelector(s.cache, rc.VerID(), req)
s.Nil(err)
s.NotNil(selector)
for _, reqSource := range []string{"leader", "follower", "follower", "unknown"} {
ctx, err := selector.next(s.bo, req)
_, err := selector.next(s.bo, req)
s.Nil(err)
s.Equal(reqSource, selector.replicaType(ctx))
s.Equal(reqSource, selector.replicaType())
}

rc = s.getRegion()
selector, err = newReplicaSelectorV2(s.cache, rc.VerID(), req)
selector, err = newReplicaSelector(s.cache, rc.VerID(), req)
s.Nil(err)
s.NotNil(selector)
ctx, err := selector.next(s.bo, req)
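The dropped assertions above existed because the old factory returned the ReplicaSelector interface wrapping a typed nil pointer, which compares unequal to nil (the SA4023 situation noted in the removed //nolint comment); with a concrete *replicaSelector return value, the plain nil check works again. A standalone illustration of that Go behavior, with generic names unrelated to client-go:

package main

import "fmt"

type selector interface{ String() string }

type impl struct{}

func (*impl) String() string { return "" }

// newImpl returns a typed nil pointer; callers receiving it through the
// interface type see a non-nil interface value.
func newImpl() *impl { return nil }

func main() {
	var viaInterface selector = newImpl()
	fmt.Println(viaInterface == nil) // false: the interface still carries the *impl type
	fmt.Println(newImpl() == nil)    // true: concrete pointer comparison
}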
@@ -161,9 +157,9 @@ func TestReplicaSelectorCalculateScore(t *testing.T) {
region, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Nil(err)
s.NotNil(region)
selector, err := NewReplicaSelector(s.cache, region.Region, req)
selector, err := newReplicaSelector(s.cache, region.Region, req)
s.Nil(err)
for i, r := range selector.getBaseReplicaSelector().replicas {
for i, r := range selector.replicas {
rc := s.cache.GetCachedRegionWithRLock(region.Region)
s.NotNil(rc)
isLeader := r.peer.Id == rc.GetLeaderPeerID()
@@ -1413,7 +1409,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) {
backoffDetail: []string{"regionScheduling+1"},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -1431,7 +1427,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) {
backoffDetail: []string{},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -1737,7 +1733,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) {
backoffDetail: []string{},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -1781,7 +1777,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) {
backoffDetail: []string{},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -2130,7 +2126,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
backoffDetail: []string{"tikvRPC+2"},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -2149,7 +2145,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
backoffDetail: []string{"tikvServerBusy+1"},
regionIsValid: false,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
}
s.True(s.runMultiCaseAndCompare(cas))
@@ -2324,19 +2320,6 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) {
label: nil,
accessErr: []RegionErrorType{FlashbackInProgressErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: true, stale-read: false}",
"{addr: store1, replica-read: true, stale-read: false}",
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCase(ca, false))
ca.expect = &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: true, stale-read: false}",
},
@@ -2345,8 +2328,9 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) {
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCase(ca, true))
s.True(s.runCaseAndCompare(ca))

ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
@@ -2356,20 +2340,6 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) {
label: nil,
accessErr: []RegionErrorType{DeadLineExceededErr, FlashbackInProgressErr, FlashbackInProgressErr},
expect: &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: true, stale-read: false}",
"{addr: store2, replica-read: true, stale-read: false}",
"{addr: store1, replica-read: true, stale-read: false}",
},
respErr: "region 0 is in flashback progress, FlashbackStartTS is 0",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCase(ca, false))
ca.expect = &accessPathResult{
accessPath: []string{
"{addr: store1, replica-read: true, stale-read: false}",
"{addr: store2, replica-read: true, stale-read: false}",
@@ -2380,8 +2350,9 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) {
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCase(ca, true))
s.True(s.runCaseAndCompare(ca))
}

func TestReplicaReadAccessPathByProxyCase(t *testing.T) {
@@ -2409,7 +2380,7 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) {
backoffDetail: []string{"tikvRPC+1"},
regionIsValid: true,
},
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
afterRun: func(_ *replicaSelector) { /* don't invalid region */ },
},
{
reqType: tikvrpc.CmdGet,
@@ -2485,8 +2456,8 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) {
backoffDetail: []string{"tikvRPC+1", "tikvServerBusy+2"},
regionIsValid: false,
},
afterRun: func(selector ReplicaSelector) {
base := selector.getBaseReplicaSelector()
afterRun: func(selector *replicaSelector) {
base := selector.baseReplicaSelector
s.NotNil(base)
s.True(base.replicas[0].isEpochStale())
s.True(base.replicas[0].epoch < atomic.LoadUint32(&base.replicas[0].store.epoch))
@@ -2651,8 +2622,7 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) {
store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now())
},
}
// v1 doesn't support avoiding slow stores. We only test this on v2.
s.True(s.runCase(ca, true))
s.True(s.runCaseAndCompare(ca))

s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel)
var expectedSecondPath string
@ -2731,100 +2701,11 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) {
|
|||
store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now())
|
||||
},
|
||||
}
|
||||
s.True(s.runCase(ca, true))
|
||||
s.True(s.runCaseAndCompare(ca))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplicaReadAccessPathByGenError(t *testing.T) {
|
||||
t.Skip("skip TestReplicaReadAccessPathByGenError because it's unstable and slow.")
|
||||
s := new(testReplicaSelectorSuite)
|
||||
s.SetupTest(t)
|
||||
defer func(lv zapcore.Level) {
|
||||
log.SetLevel(lv)
|
||||
s.TearDownTest()
|
||||
}(log.GetLevel())
|
||||
log.SetLevel(zapcore.ErrorLevel)
|
||||
|
||||
maxAccessErrCnt := 4
|
||||
if israce.RaceEnabled {
|
||||
// When running this test with the race detector enabled, it takes a long time, so we reduce maxAccessErrCnt to 2 to speed up the test and avoid a timeout.
maxAccessErrCnt = 2
}
totalValidCaseCount := 0
totalCaseCount := 0
lastLogCnt := 0
testCase := func(req tikvrpc.CmdType, readType kv.ReplicaReadType, staleRead bool, timeout time.Duration, busyThresholdMs uint32, label *metapb.StoreLabel) {
isRead := isReadReq(req)
accessErrGen := newAccessErrGenerator(isRead, staleRead, maxAccessErrCnt)
for {
accessErr, done := accessErrGen.genAccessErr(staleRead)
if done {
break
}
ca := replicaSelectorAccessPathCase{
reqType: req,
readType: readType,
staleRead: staleRead,
timeout: timeout,
busyThresholdMs: busyThresholdMs,
label: label,
accessErr: accessErr,
}
valid := s.runCaseAndCompare(ca)
if valid {
totalValidCaseCount++
}
totalCaseCount++
if totalCaseCount-lastLogCnt > 100000 {
lastLogCnt = totalCaseCount
logutil.BgLogger().Info("TestReplicaReadAccessPathByGenError is running",
zap.Int("total-case", totalCaseCount),
zap.Int("valid-case", totalValidCaseCount),
zap.Int("invalid-case", totalCaseCount-totalValidCaseCount),
zap.String("req", req.String()),
zap.String("read-type", readType.String()),
zap.Bool("stale-read", staleRead),
zap.Duration("timeout", timeout),
zap.Any("label", label),
)
}
}
}

testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 0, nil)
testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 10, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 10, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadFollower, false, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadPreferLeader, false, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, time.Second, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "1"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "2"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "3"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "1"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "2"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "3"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "1"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "2"})
testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "3"})

// Test for forwarding proxy.
s.cache.enableForwarding = true
testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 0, nil)
testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 10, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 0, nil)
testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 10, nil)

logutil.BgLogger().Info("TestReplicaReadAccessPathByGenError Finished",
zap.Int("total-case", totalCaseCount),
zap.Int("valid-case", totalValidCaseCount),
zap.Int("invalid-case", totalCaseCount-totalValidCaseCount))
}

func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) {
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
s.Nil(err)
@ -2838,72 +2719,27 @@ func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) {
s.cache.InvalidateCachedRegion(loc.Region)
}

func (s *testReplicaSelectorSuite) runCaseAndCompare(ca1 replicaSelectorAccessPathCase) bool {
ca2 := ca1
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = false
})
sender := ca1.run(s)
ca1.checkResult(s, "v1", sender)

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = true
})
sender = ca2.run(s)
if ca2.expect == nil {
// compare with ca1 result.
ca2.expect = &ca1.result
}
ca2.checkResult(s, "v2", sender)
return !ca1.accessErrInValid
}

func (s *testReplicaSelectorSuite) runCase(ca replicaSelectorAccessPathCase, v2 bool) bool {
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = v2
})
func (s *testReplicaSelectorSuite) runCaseAndCompare(ca replicaSelectorAccessPathCase) bool {
sender := ca.run(s)
version := "v1"
if v2 {
version = "v2"
}
ca.checkResult(s, version, sender)
ca.checkResult(s, sender)
return !ca.accessErrInValid
}

func (s *testReplicaSelectorSuite) runMultiCaseAndCompare(cas []replicaSelectorAccessPathCase) bool {
expects := make([]accessPathResult, 0, len(cas))
valid := true
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = false
})
for _, ca1 := range cas {
sender := ca1.run(s)
ca1.checkResult(s, "v1", sender)
expects = append(expects, ca1.result)
valid = valid && !ca1.accessErrInValid
}

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = true
})
for i, ca2 := range cas {
sender := ca2.run(s)
if ca2.expect == nil {
// compare with ca1 result.
ca2.expect = &expects[i]
}
ca2.checkResult(s, "v2", sender)
valid = valid && !ca2.accessErrInValid
for _, ca := range cas {
sender := ca.run(s)
ca.checkResult(s, sender)
valid = valid && !ca.accessErrInValid
}
return valid
}

func (ca *replicaSelectorAccessPathCase) checkResult(s *testReplicaSelectorSuite, version string, sender *RegionRequestSender) {
func (ca *replicaSelectorAccessPathCase) checkResult(s *testReplicaSelectorSuite, sender *RegionRequestSender) {
if ca.expect == nil {
return
}
msg := fmt.Sprintf("enable_forwarding: %v\nversion: %v\n%v\nsender: %v\n", s.cache.enableForwarding, version, ca.Format(), sender.String())
msg := fmt.Sprintf("enable_forwarding: %v\n%v\nsender: %v\n", s.cache.enableForwarding, ca.Format(), sender.String())
expect := ca.expect
result := ca.result
s.Equal(expect.accessPath, result.accessPath, msg)
@ -2953,7 +2789,7 @@ func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) *Regio
s.NotNil(rc)
bo := retry.NewBackofferWithVars(context.Background(), 40000, nil)
resp, _, _, err := sender.SendReqCtx(bo, req, rc.VerID(), timeout, tikvrpc.TiKV, opts...)
ca.recordResult(s, bo, sender.replicaSelector.getBaseReplicaSelector().region, access, resp, err)
ca.recordResult(s, bo, sender.replicaSelector.region, access, resp, err)
afterRun(ca, sender)
return sender
}
@ -3312,43 +3148,6 @@ func (tp RegionErrorType) String() string {
}
}

type accessErrGenerator struct {
maxAccessErrCnt int
mode int
idx int
baseIdx int
allErrs []RegionErrorType
retryErrs []RegionErrorType
}

func newAccessErrGenerator(isRead, staleRead bool, maxAccessErrCnt int) *accessErrGenerator {
filter := func(tp RegionErrorType) bool {
// read request won't meet RaftEntryTooLargeErr.
if isRead && tp == RaftEntryTooLargeErr {
return false
}
if staleRead == false && tp == DataIsNotReadyErr {
return false
}
// TODO: since v2 has some compatibility issues with v1, skip FlashbackInProgressErr.
if tp == FlashbackInProgressErr {
return false
}
return true
}
allErrs := getAllRegionErrors(filter)
retryableErr := getAllRegionErrors(func(tp RegionErrorType) bool {
return filter(tp) && isRegionErrorRetryable(tp)
})
return &accessErrGenerator{
maxAccessErrCnt: maxAccessErrCnt,
mode: 0,
idx: 0,
allErrs: allErrs,
retryErrs: retryableErr,
}
}

func getAllRegionErrors(filter func(errorType RegionErrorType) bool) []RegionErrorType {
errs := make([]RegionErrorType, 0, int(RegionErrorTypeMax))
for tp := NotLeaderErr; tp < RegionErrorTypeMax; tp++ {
@ -3360,84 +3159,6 @@ func getAllRegionErrors(filter func(errorType RegionErrorType) bool) []RegionErr
return errs
}

func isRegionErrorRetryable(tp RegionErrorType) bool {
switch tp {
case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err, ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr,
StaleCommandErr, MaxTimestampNotSyncedErr, ReadIndexNotReadyErr, ProposalInMergingModeErr, DataIsNotReadyErr,
RegionNotInitializedErr, FlashbackInProgressErr, DiskFullErr, DeadLineExceededErr:
return true
}
return false
}

func (a *accessErrGenerator) genAccessErr(staleRead bool) ([]RegionErrorType, bool) {
if a.mode == 0 {
a.idx = 0
a.mode = 1
return nil, false
}
if a.mode == 1 {
idx := a.idx
a.idx++
if a.idx >= len(a.allErrs) {
a.idx = 0
a.baseIdx = 0
a.mode = 2
}
return []RegionErrorType{a.allErrs[idx]}, false
}
for a.mode <= a.maxAccessErrCnt {
errs := a.genAccessErrs(a.allErrs, a.retryErrs)
if len(errs) > 0 {
return errs, false
}
a.baseIdx = 0
a.idx = 0
a.mode++
// if mode >= 4, reduce the error type to avoid generating too many combinations.
if a.mode > 4 {
if a.mode > 8 {
a.allErrs = []RegionErrorType{ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr}
} else if a.mode > 7 {
a.allErrs = []RegionErrorType{ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr}
} else if a.mode > 6 {
a.allErrs = []RegionErrorType{NotLeaderWithNewLeader2Err, ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr}
} else if a.mode > 5 {
a.allErrs = []RegionErrorType{NotLeaderErr, NotLeaderWithNewLeader2Err, ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr}
} else {
a.allErrs = []RegionErrorType{NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err, ServerIsBusyWithEstimatedWaitMsErr, RegionNotInitializedErr, DeadLineExceededErr}
}
if staleRead {
a.allErrs = append(a.allErrs, DataIsNotReadyErr)
}
a.retryErrs = a.allErrs
}
}
return nil, true
}

func (a *accessErrGenerator) genAccessErrs(allErrs, retryErrs []RegionErrorType) []RegionErrorType {
defer func() {
a.baseIdx++
if a.baseIdx >= len(allErrs) {
a.baseIdx = 0
a.idx++
}
}()
mode := a.mode
errs := make([]RegionErrorType, mode)
errs[mode-1] = allErrs[a.baseIdx%len(allErrs)]
value := a.idx
for i := mode - 2; i >= 0; i-- {
if i == 0 && value > len(retryErrs) {
return nil
}
errs[i] = retryErrs[value%len(retryErrs)]
value = value / len(retryErrs)
}
return errs
}
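
Note on the generator above: genAccessErrs enumerates error sequences of length "mode" with a mixed-radix counter, decoding the first mode-1 positions from idx in base len(retryErrs) while the final position cycles through allErrs via baseIdx. The following standalone sketch is illustrative only, with made-up names and plain strings standing in for RegionErrorType; it is not part of the change, just the same enumeration idea in isolation.

package main

import "fmt"

// enumerate returns every sequence of length n whose first n-1 elements are
// drawn from retryable and whose last element is drawn from all.
func enumerate(all, retryable []string, n int) [][]string {
	var out [][]string
	total := 1
	for i := 0; i < n-1; i++ {
		total *= len(retryable) // number of distinct prefixes, counted in base len(retryable)
	}
	for prefix := 0; prefix < total; prefix++ {
		for _, last := range all {
			seq := make([]string, n)
			value := prefix
			for i := n - 2; i >= 0; i-- {
				seq[i] = retryable[value%len(retryable)] // decode one "digit" of the counter
				value /= len(retryable)
			}
			seq[n-1] = last
			out = append(out, seq)
		}
	}
	return out
}

func main() {
	all := []string{"NotLeader", "ServerIsBusy", "RegionNotInitialized"}
	retryable := []string{"NotLeader", "ServerIsBusy"}
	for _, seq := range enumerate(all, retryable, 2) {
		fmt.Println(seq) // prints 2*3 = 6 sequences of length 2
	}
}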

func (s *testReplicaSelectorSuite) getRegion() *Region {
for i := 0; i < 100; i++ {
loc, err := s.cache.LocateKey(s.bo, []byte("key"))
@ -3517,6 +3238,28 @@ func TestTiKVClientReadTimeout(t *testing.T) {
}, accessPath)
}

func TestReplicaFlag(t *testing.T) {
r := &replica{}
allFlags := []uint8{deadlineErrUsingConfTimeoutFlag, dataIsNotReadyFlag, notLeaderFlag, serverIsBusyFlag}
for i, flag := range allFlags {
if i > 0 {
require.True(t, flag > allFlags[i-1])
}
for j := i; j < len(allFlags); j++ {
require.Equal(t, false, r.hasFlag(allFlags[j]))
}
r.addFlag(flag)
require.Equal(t, true, r.hasFlag(flag))
}
for i, flag := range allFlags {
for j := i; j < len(allFlags); j++ {
require.Equal(t, true, r.hasFlag(allFlags[j]))
}
r.deleteFlag(flag)
require.Equal(t, false, r.hasFlag(flag))
}
}
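
TestReplicaFlag above exercises the addFlag/hasFlag/deleteFlag helpers introduced by the "add replica flag to reduce struct size" part of this commit; their implementation is not shown in this hunk. The sketch below is an assumed, minimal illustration of how such uint8 bitmask helpers typically look, with the flag constants and replica type redefined locally for the example rather than taken from the actual client-go source.

package main

import "fmt"

// Assumed flag values: each flag occupies one bit of a uint8, so several booleans
// fit into a single byte on the replica struct instead of separate bool fields.
const (
	deadlineErrUsingConfTimeoutFlag uint8 = 1 << iota
	dataIsNotReadyFlag
	notLeaderFlag
	serverIsBusyFlag
)

type replica struct {
	flags uint8
}

func (r *replica) addFlag(flag uint8)       { r.flags |= flag }  // set the bit
func (r *replica) deleteFlag(flag uint8)    { r.flags &^= flag } // clear the bit
func (r *replica) hasFlag(flag uint8) bool  { return r.flags&flag > 0 }

func main() {
	r := &replica{}
	r.addFlag(notLeaderFlag)
	fmt.Println(r.hasFlag(notLeaderFlag), r.hasFlag(serverIsBusyFlag)) // true false
	r.deleteFlag(notLeaderFlag)
	fmt.Println(r.hasFlag(notLeaderFlag)) // false
}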

func BenchmarkReplicaSelector(b *testing.B) {
mvccStore := mocktikv.MustNewMVCCStore()
cluster := mocktikv.NewCluster(mvccStore)
@ -3528,9 +3271,6 @@ func BenchmarkReplicaSelector(b *testing.B) {
mvccStore.Close()
}()

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.EnableReplicaSelectorV2 = true
})
cnt := 0
allErrs := getAllRegionErrors(nil)
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {