mirror of https://github.com/tikv/client-go.git
do not try leader if it's unreachable (#971)
* do not try leader if it's unreachable Signed-off-by: zyguan <zhongyangguan@gmail.com> * fix the flaky test Signed-off-by: zyguan <zhongyangguan@gmail.com> --------- Signed-off-by: zyguan <zhongyangguan@gmail.com>
This commit is contained in:
parent
c8832b8466
commit
44b0cf7aba
|
|
@ -730,11 +730,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
|
|||
if selector.targetIdx < 0 {
|
||||
leader := selector.replicas[state.leaderIdx]
|
||||
leaderEpochStale := leader.isEpochStale()
|
||||
leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
|
||||
leaderUnreachable := leader.store.getLivenessState() != reachable
|
||||
leaderExhausted := state.IsLeaderExhausted(leader)
|
||||
leaderInvalid := leaderEpochStale || leaderUnreachable || leaderExhausted
|
||||
if len(state.option.labels) > 0 {
|
||||
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
|
||||
zap.Uint64("region", selector.region.GetID()),
|
||||
zap.Bool("leader-invalid", leaderInvalid),
|
||||
zap.Bool("leader-epoch-stale", leaderEpochStale),
|
||||
zap.Bool("leader-unreachable", leaderUnreachable),
|
||||
zap.Bool("leader-exhausted", leaderExhausted),
|
||||
zap.Bool("stale-read", state.isStaleRead),
|
||||
zap.Any("labels", state.option.labels))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -922,6 +922,16 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
|
|||
|
||||
// Verify switch to the leader immediately when stale read requests with global txn scope meet region errors.
|
||||
s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
|
||||
tf = func(s *Store, bo *retry.Backoffer) livenessState {
|
||||
return reachable
|
||||
}
|
||||
s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
|
||||
s.Eventually(func() bool {
|
||||
stores := s.regionRequestSender.replicaSelector.regionStore.stores
|
||||
return stores[0].getLivenessState() == reachable &&
|
||||
stores[1].getLivenessState() == reachable &&
|
||||
stores[2].getLivenessState() == reachable
|
||||
}, 3*time.Second, 200*time.Millisecond)
|
||||
reloadRegion()
|
||||
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
|
||||
req.ReadReplicaScope = oracle.GlobalTxnScope
|
||||
|
|
@ -1581,3 +1591,32 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo
|
|||
s.Equal(int64(2), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc
|
||||
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
|
||||
key := []byte("key")
|
||||
region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false)
|
||||
s.Nil(err)
|
||||
regionStore := region.getStore()
|
||||
leader, _, _, _ := region.WorkStorePeer(regionStore)
|
||||
follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
|
||||
|
||||
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||
if req.StaleRead && addr == follower.addr {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil
|
||||
}
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
|
||||
Value: []byte(addr),
|
||||
}}, nil
|
||||
}}
|
||||
atomic.StoreUint32(&leader.livenessState, uint32(unreachable))
|
||||
|
||||
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil)
|
||||
req.ReadReplicaScope = oracle.GlobalTxnScope
|
||||
req.TxnScope = oracle.GlobalTxnScope
|
||||
req.EnableStaleRead()
|
||||
bo := retry.NewBackoffer(context.Background(), -1)
|
||||
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels))
|
||||
s.Nil(err)
|
||||
// `tryFollower` always try the local peer firstly
|
||||
s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -724,6 +724,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC
|
|||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1})
|
||||
// send a probe request to make sure the mock server is ready.
|
||||
s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second)
|
||||
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10)
|
||||
s.Nil(err)
|
||||
s.NotNil(resp)
|
||||
|
|
|
|||
Loading…
Reference in New Issue