diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index 895add05..c2888612 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -122,10 +122,13 @@ func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpc func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) { if s.regionCache.enableForwarding { strategy := ReplicaSelectLeaderWithProxyStrategy{} - s.target, s.proxy = strategy.next(s.replicas, s.region) + s.target, s.proxy = strategy.next(s) if s.target != nil && s.proxy != nil { return } + if s.target == nil && s.proxy == nil { + return + } } leaderIdx := s.region.getStore().workTiKVIdx strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx} @@ -134,7 +137,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) { // If the leader is busy in our estimation, try other idle replicas. // If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold. mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, busyThreshold: s.busyThreshold} - idleTarget := mixedStrategy.next(s, s.region) + idleTarget := mixedStrategy.next(s) if idleTarget != nil { s.target = idleTarget req.ReplicaRead = true @@ -149,7 +152,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) { return } mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, leaderOnly: s.option.leaderOnly} - s.target = mixedStrategy.next(s, s.region) + s.target = mixedStrategy.next(s) if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].deadlineErrUsingConfTimeout { req.ReplicaRead = true req.StaleRead = false @@ -178,7 +181,7 @@ func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) { labels: s.option.labels, stores: s.option.stores, } - s.target = strategy.next(s, s.region) + s.target = strategy.next(s) if s.target != nil { if s.isStaleRead && s.attempts == 1 { // stale-read request first access. @@ -233,7 +236,7 @@ type ReplicaSelectMixedStrategy struct { busyThreshold time.Duration } -func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica { +func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2) *replica { replicas := selector.replicas maxScoreIdxes := make([]int, 0, len(replicas)) var maxScore storeSelectionScore = -1 @@ -396,14 +399,15 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) s type ReplicaSelectLeaderWithProxyStrategy struct{} -func (s ReplicaSelectLeaderWithProxyStrategy) next(replicas []*replica, region *Region) (leader *replica, proxy *replica) { - rs := region.getStore() +func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelectorV2) (leader *replica, proxy *replica) { + rs := selector.region.getStore() leaderIdx := rs.workTiKVIdx + replicas := selector.replicas leader = replicas[leaderIdx] if leader.store.getLivenessState() == reachable || leader.notLeader { // if leader's store is reachable, no need use proxy. - rs.unsetProxyStoreIfNeeded(region) - return nil, nil + rs.unsetProxyStoreIfNeeded(selector.region) + return leader, nil } proxyIdx := rs.proxyTiKVIdx if proxyIdx >= 0 && int(proxyIdx) < len(replicas) && s.isCandidate(replicas[proxyIdx], proxyIdx == leaderIdx) { @@ -415,6 +419,10 @@ func (s ReplicaSelectLeaderWithProxyStrategy) next(replicas []*replica, region * return leader, r } } + // If all followers are tried as a proxy and fail, mark the leader store invalid, then backoff and retry. + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() + selector.invalidateReplicaStore(leader, errors.Errorf("all followers are tried as proxy but fail")) + selector.region.setSyncFlags(needReloadOnAccess) return nil, nil } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 521700d8..e4d249ba 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -95,8 +95,8 @@ type replicaSelectorAccessPathCase struct { accessErrInValid bool expect *accessPathResult result accessPathResult - beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called. - afterRun func() // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called. + beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called. + afterRun func(ReplicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called. } type accessPathResult struct { @@ -1408,7 +1408,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) { backoffDetail: []string{"regionScheduling+1"}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1426,7 +1426,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1732,7 +1732,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1776,7 +1776,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2125,7 +2125,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { backoffDetail: []string{"tikvRPC+2"}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2144,7 +2144,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { backoffDetail: []string{"tikvServerBusy+1"}, regionIsValid: false, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, } s.True(s.runMultiCaseAndCompare(cas)) @@ -2404,7 +2404,7 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) { backoffDetail: []string{"tikvRPC+1"}, regionIsValid: true, }, - afterRun: func() { /* don't invalid region */ }, + afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2480,6 +2480,13 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) { backoffDetail: []string{"tikvRPC+1", "tikvServerBusy+2"}, regionIsValid: false, }, + afterRun: func(selector ReplicaSelector) { + base := selector.getBaseReplicaSelector() + s.NotNil(base) + s.True(base.replicas[0].isEpochStale()) + s.True(base.replicas[0].epoch < atomic.LoadUint32(&base.replicas[0].store.epoch)) + s.False(base.region.isValid()) + }, } s.True(s.runCaseAndCompare(ca)) @@ -2955,7 +2962,7 @@ func beforeRun(s *testReplicaSelectorSuite, ca *replicaSelectorAccessPathCase) { func afterRun(ca *replicaSelectorAccessPathCase, sender *RegionRequestSender) { if ca.afterRun != nil { - ca.afterRun() + ca.afterRun(sender.replicaSelector) } else { sender.replicaSelector.invalidateRegion() // invalidate region to reload for next test case. }