mirror of https://github.com/tikv/client-go.git
Fix bug where the store is not invalidated when a TiKV store is down in replica-selector-v2 with forwarding enabled (#1273)
* fix bug of enable-forwarding
  Signed-off-by: crazycs520 <crazycs520@gmail.com>
* add test
  Signed-off-by: crazycs520 <crazycs520@gmail.com>
---------
Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
642a09bef1
commit
2bd95773ce
|
|
@ -122,10 +122,13 @@ func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpc
|
|||
func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
|
||||
if s.regionCache.enableForwarding {
|
||||
strategy := ReplicaSelectLeaderWithProxyStrategy{}
|
||||
s.target, s.proxy = strategy.next(s.replicas, s.region)
|
||||
s.target, s.proxy = strategy.next(s)
|
||||
if s.target != nil && s.proxy != nil {
|
||||
return
|
||||
}
|
||||
if s.target == nil && s.proxy == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
leaderIdx := s.region.getStore().workTiKVIdx
|
||||
strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx}
|
||||
|
|
@ -134,7 +137,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
|
|||
// If the leader is busy in our estimation, try other idle replicas.
|
||||
// If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold.
|
||||
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, busyThreshold: s.busyThreshold}
|
||||
idleTarget := mixedStrategy.next(s, s.region)
|
||||
idleTarget := mixedStrategy.next(s)
|
||||
if idleTarget != nil {
|
||||
s.target = idleTarget
|
||||
req.ReplicaRead = true
|
||||
|
|
@ -149,7 +152,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
|
|||
return
|
||||
}
|
||||
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, leaderOnly: s.option.leaderOnly}
|
||||
s.target = mixedStrategy.next(s, s.region)
|
||||
s.target = mixedStrategy.next(s)
|
||||
if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].deadlineErrUsingConfTimeout {
|
||||
req.ReplicaRead = true
|
||||
req.StaleRead = false
|
||||
|
|
@ -178,7 +181,7 @@ func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) {
|
|||
labels: s.option.labels,
|
||||
stores: s.option.stores,
|
||||
}
|
||||
s.target = strategy.next(s, s.region)
|
||||
s.target = strategy.next(s)
|
||||
if s.target != nil {
|
||||
if s.isStaleRead && s.attempts == 1 {
|
||||
// stale-read request first access.
|
||||
|
|
@ -233,7 +236,7 @@ type ReplicaSelectMixedStrategy struct {
|
|||
busyThreshold time.Duration
|
||||
}
|
||||
|
||||
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica {
|
||||
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2) *replica {
|
||||
replicas := selector.replicas
|
||||
maxScoreIdxes := make([]int, 0, len(replicas))
|
||||
var maxScore storeSelectionScore = -1
|
||||
|
|
@ -396,14 +399,15 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) s
|
|||
|
||||
type ReplicaSelectLeaderWithProxyStrategy struct{}
|
||||
|
||||
func (s ReplicaSelectLeaderWithProxyStrategy) next(replicas []*replica, region *Region) (leader *replica, proxy *replica) {
|
||||
rs := region.getStore()
|
||||
func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelectorV2) (leader *replica, proxy *replica) {
|
||||
rs := selector.region.getStore()
|
||||
leaderIdx := rs.workTiKVIdx
|
||||
replicas := selector.replicas
|
||||
leader = replicas[leaderIdx]
|
||||
if leader.store.getLivenessState() == reachable || leader.notLeader {
|
||||
// if leader's store is reachable, no need use proxy.
|
||||
rs.unsetProxyStoreIfNeeded(region)
|
||||
return nil, nil
|
||||
rs.unsetProxyStoreIfNeeded(selector.region)
|
||||
return leader, nil
|
||||
}
|
||||
proxyIdx := rs.proxyTiKVIdx
|
||||
if proxyIdx >= 0 && int(proxyIdx) < len(replicas) && s.isCandidate(replicas[proxyIdx], proxyIdx == leaderIdx) {
|
||||
|
|
@ -415,6 +419,10 @@ func (s ReplicaSelectLeaderWithProxyStrategy) next(replicas []*replica, region *
|
|||
return leader, r
|
||||
}
|
||||
}
|
||||
// If all followers are tried as a proxy and fail, mark the leader store invalid, then backoff and retry.
|
||||
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
|
||||
selector.invalidateReplicaStore(leader, errors.Errorf("all followers are tried as proxy but fail"))
|
||||
selector.region.setSyncFlags(needReloadOnAccess)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -95,8 +95,8 @@ type replicaSelectorAccessPathCase struct {
|
|||
accessErrInValid bool
|
||||
expect *accessPathResult
|
||||
result accessPathResult
|
||||
beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called.
|
||||
afterRun func() // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called.
|
||||
beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called.
|
||||
afterRun func(ReplicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called.
|
||||
}
|
||||
|
||||
type accessPathResult struct {
|
||||
|
|
@ -1408,7 +1408,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) {
|
|||
backoffDetail: []string{"regionScheduling+1"},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -1426,7 +1426,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) {
|
|||
backoffDetail: []string{},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -1732,7 +1732,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) {
|
|||
backoffDetail: []string{},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -1776,7 +1776,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) {
|
|||
backoffDetail: []string{},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -2125,7 +2125,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
|||
backoffDetail: []string{"tikvRPC+2"},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -2144,7 +2144,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) {
|
|||
backoffDetail: []string{"tikvServerBusy+1"},
|
||||
regionIsValid: false,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
}
|
||||
s.True(s.runMultiCaseAndCompare(cas))
|
||||
|
|
@ -2404,7 +2404,7 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) {
|
|||
backoffDetail: []string{"tikvRPC+1"},
|
||||
regionIsValid: true,
|
||||
},
|
||||
afterRun: func() { /* don't invalid region */ },
|
||||
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ },
|
||||
},
|
||||
{
|
||||
reqType: tikvrpc.CmdGet,
|
||||
|
|
@ -2480,6 +2480,13 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) {
|
|||
backoffDetail: []string{"tikvRPC+1", "tikvServerBusy+2"},
|
||||
regionIsValid: false,
|
||||
},
|
||||
afterRun: func(selector ReplicaSelector) {
|
||||
base := selector.getBaseReplicaSelector()
|
||||
s.NotNil(base)
|
||||
s.True(base.replicas[0].isEpochStale())
|
||||
s.True(base.replicas[0].epoch < atomic.LoadUint32(&base.replicas[0].store.epoch))
|
||||
s.False(base.region.isValid())
|
||||
},
|
||||
}
|
||||
s.True(s.runCaseAndCompare(ca))
|
||||
|
||||
|
|
@ -2955,7 +2962,7 @@ func beforeRun(s *testReplicaSelectorSuite, ca *replicaSelectorAccessPathCase) {
|
|||
|
||||
func afterRun(ca *replicaSelectorAccessPathCase, sender *RegionRequestSender) {
|
||||
if ca.afterRun != nil {
|
||||
ca.afterRun()
|
||||
ca.afterRun(sender.replicaSelector)
|
||||
} else {
|
||||
sender.replicaSelector.invalidateRegion() // invalidate region to reload for next test case.
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue