From 44b0cf7aba2be652cb17e36f70227c8fdb906d20 Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 25 Sep 2023 11:25:02 +0800 Subject: [PATCH] do not try leader if it's unreachable (#971) * do not try leader if it's unreachable Signed-off-by: zyguan * fix the flaky test Signed-off-by: zyguan --------- Signed-off-by: zyguan --- internal/locate/region_request.go | 8 +++-- internal/locate/region_request3_test.go | 39 +++++++++++++++++++++++++ internal/locate/region_request_test.go | 2 ++ 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9d823e4f..f4718cee 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -730,11 +730,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if selector.targetIdx < 0 { leader := selector.replicas[state.leaderIdx] leaderEpochStale := leader.isEpochStale() - leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader) + leaderUnreachable := leader.store.getLivenessState() != reachable + leaderExhausted := state.IsLeaderExhausted(leader) + leaderInvalid := leaderEpochStale || leaderUnreachable || leaderExhausted if len(state.option.labels) > 0 { logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels", zap.Uint64("region", selector.region.GetID()), - zap.Bool("leader-invalid", leaderInvalid), + zap.Bool("leader-epoch-stale", leaderEpochStale), + zap.Bool("leader-unreachable", leaderUnreachable), + zap.Bool("leader-exhausted", leaderExhausted), zap.Bool("stale-read", state.isStaleRead), zap.Any("labels", state.option.labels)) } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 41dd69b6..991bc04d 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -922,6 +922,16 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { // Verify switch to 
the leader immediately when stale read requests with global txn scope meet region errors. s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0]) + tf = func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + s.Eventually(func() bool { + stores := s.regionRequestSender.replicaSelector.regionStore.stores + return stores[0].getLivenessState() == reachable && + stores[1].getLivenessState() == reachable && + stores[2].getLivenessState() == reachable + }, 3*time.Second, 200*time.Millisecond) reloadRegion() req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}) req.ReadReplicaScope = oracle.GlobalTxnScope @@ -1581,3 +1591,32 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadTryFollowerAfterTimeo s.Equal(int64(2), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 2 rpc s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. 
} + +func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() { + key := []byte("key") + region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false) + s.Nil(err) + regionStore := region.getStore() + leader, _, _, _ := region.WorkStorePeer(regionStore) + follower, _, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{}) + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if req.StaleRead && addr == follower.addr { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + Value: []byte(addr), + }}, nil + }} + atomic.StoreUint32(&leader.livenessState, uint32(unreachable)) + + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil) + req.ReadReplicaScope = oracle.GlobalTxnScope + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + bo := retry.NewBackoffer(context.Background(), -1) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second, tikvrpc.TiKV, WithMatchLabels(follower.labels)) + s.Nil(err) + // `tryFollower` always tries the local peer first + s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value)) +} diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 2923648f..d14e7449 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -724,6 +724,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC s.Nil(err) s.NotNil(region) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) + // send a probe request to make sure the mock server is ready.
+ s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10) s.Nil(err) s.NotNil(resp)