mirror of https://github.com/tikv/client-go.git
* fix unexpected slow query during GC running after stop 1 tikv-server

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
commit d880eca9e1
parent 1b093b4ec3
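The change itself is small: replica selection now treats a store whose cached liveness state is unreachable as a non-candidate, both in the tryFollower state and in accessFollower.isCandidate. Follower and mixed reads therefore fall through to a reachable peer (typically the leader) instead of repeatedly timing out against the stopped tikv-server, which is what caused the slow queries while GC reloaded regions. Below is a minimal, self-contained Go sketch of that selection rule; the types and the pickFollower helper are hypothetical stand-ins for illustration, not client-go's actual structs or API.

package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical liveness values mirroring the reachable/unreachable states the
// selector checks; simplified stand-ins, not client-go's actual definitions.
const (
	reachable uint32 = iota
	unreachable
)

// store and replica are simplified stand-ins for the client-go structs.
type store struct {
	addr     string
	liveness uint32 // written and read atomically, like store.livenessState
}

func (s *store) getLivenessState() uint32 {
	return atomic.LoadUint32(&s.liveness)
}

type replica struct {
	store    *store
	attempts int
}

// isExhausted reports whether the replica was already tried maxAttempts times.
func (r *replica) isExhausted(maxAttempts int) bool { return r.attempts >= maxAttempts }

// pickFollower mimics the rule this commit adds to tryFollower.next and
// accessFollower.isCandidate: a follower is a candidate only if it is not
// exhausted AND its store is not currently marked unreachable.
func pickFollower(replicas []*replica, leaderIdx int) *replica {
	for i, r := range replicas {
		if i == leaderIdx {
			continue
		}
		if !r.isExhausted(1) && r.store.getLivenessState() != unreachable {
			return r
		}
	}
	return nil // no usable follower; the real selector handles this case via its other states
}

func main() {
	down := &store{addr: "tikv-2"}
	atomic.StoreUint32(&down.liveness, unreachable) // this store was stopped
	up := &store{addr: "tikv-3"}

	replicas := []*replica{
		{store: &store{addr: "tikv-1"}}, // leader at index 0
		{store: down},
		{store: up},
	}
	if r := pickFollower(replicas, 0); r != nil {
		fmt.Println("selected follower on", r.store.addr) // prints tikv-3: the downed store is skipped
	}
}

The liveness flag is read with an atomic load, so the extra check stays cheap on the send path; the diff below shows the two real call sites plus the test changes that come with them.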
@@ -402,7 +402,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		}
 		targetReplica = selector.replicas[idx]
 		// Each follower is only tried once
-		if !targetReplica.isExhausted(1) {
+		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
 			state.lastIdx = idx
 			selector.targetIdx = idx
 			break
@@ -649,8 +649,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
-	// the epoch is staled or retry exhausted.
-	if replica.isEpochStale() || replica.isExhausted(1) {
+	// The epoch is stale or retries are exhausted, or the store is unreachable.
+	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
 		return false
 	}
 	// The request can only be sent to the leader.
@@ -278,6 +278,12 @@ func refreshEpochs(regionStore *regionStore) {
 	}
 }
 
+func refreshLivenessStates(regionStore *regionStore) {
+	for _, store := range regionStore.stores {
+		atomic.StoreUint32(&store.livenessState, uint32(reachable))
+	}
+}
+
 func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
 	s.Equal(rpcCtx.Store, target.store)
 	s.Equal(rpcCtx.Peer, target.peer)
@@ -567,6 +573,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	// Test accessFollower state with kv.ReplicaReadFollower request type.
 	req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
 	refreshEpochs(regionStore)
+	refreshLivenessStates(regionStore)
 	replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
 	s.Nil(err)
 	s.NotNil(replicaSelector)
@@ -680,10 +687,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
 	s.NotNil(region)
+	regionStore := s.cache.GetCachedRegionWithRLock(region.Region).getStore()
+	s.NotNil(regionStore)
 
 	reloadRegion := func() {
 		s.regionRequestSender.replicaSelector.region.invalidate(Other)
 		region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
+		regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore()
 	}
 
 	hasFakeRegionError := func(resp *tikvrpc.Response) bool {
@@ -715,6 +725,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
 	s.True(bo.GetTotalBackoffTimes() == 1)
 	s.cluster.StartStore(s.storeIDs[0])
+	atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
 
 	// Leader is updated because of send success, so no backoff.
 	bo = retry.NewBackoffer(context.Background(), -1)
@@ -734,6 +745,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	s.True(hasFakeRegionError(resp))
 	s.Equal(bo.GetTotalBackoffTimes(), 1)
 	s.cluster.StartStore(s.storeIDs[1])
+	atomic.StoreUint32(&regionStore.stores[1].livenessState, uint32(reachable))
 
 	// Leader is changed. No backoff.
 	reloadRegion()
@@ -750,7 +762,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
 	resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
 	s.Nil(err)
 	s.True(hasFakeRegionError(resp))
-	s.Equal(bo.GetTotalBackoffTimes(), 2) // The unreachable leader is skipped
+	s.Equal(bo.GetTotalBackoffTimes(), 3)
 	s.False(sender.replicaSelector.region.isValid())
 	s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
@@ -1052,3 +1064,81 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadWithFlashbackInProg
 		s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value"))
 	}
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() {
+	var leaderAddr string
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		// Return an error when a non-leader store is accessed.
+		if leaderAddr != addr {
+			return nil, context.DeadlineExceeded
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
+			Value: []byte("value"),
+		}}, nil
+	}}
+
+	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
+		Key: []byte("key"),
+	})
+	req.ReplicaReadType = kv.ReplicaReadMixed
+
+	loc, err := s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region := s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore := region.getStore()
+	leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
+	s.NotEqual(leaderAddr, "")
+	for i := 0; i < 10; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
+		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+
+		// Since requests sent to followers receive errors, all followers will eventually be marked as unreachable and their store epochs become stale.
+		allFollowerStoreEpochStale := true
+		for i, store := range regionStore.stores {
+			if i == int(regionStore.workTiKVIdx) {
+				continue
+			}
+			if store.epoch == regionStore.storeEpochs[i] {
+				allFollowerStoreEpochStale = false
+				break
+			} else {
+				s.Equal(store.getLivenessState(), unreachable)
+			}
+		}
+		if allFollowerStoreEpochStale {
+			break
+		}
+	}
+
+	// Mock the GC leader reloading all regions.
+	bo := retry.NewBackofferWithVars(context.Background(), 10, nil)
+	_, err = s.cache.BatchLoadRegionsWithKeyRange(bo, []byte(""), nil, 1)
+	s.Nil(err)
+
+	loc, err = s.cache.LocateKey(s.bo, []byte("key"))
+	s.Nil(err)
+	region = s.cache.GetCachedRegionWithRLock(loc.Region)
+	s.NotNil(region)
+	regionStore = region.getStore()
+	for i, store := range regionStore.stores {
+		if i == int(regionStore.workTiKVIdx) {
+			continue
+		}
+		// After the region is reloaded, its recorded store epochs are up to date again, but the store liveness state is still unreachable.
+		s.Equal(store.epoch, regionStore.storeEpochs[i])
+		s.Equal(store.getLivenessState(), unreachable)
+	}
+
+	for i := 0; i < 100; i++ {
+		bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
+		resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
+		s.Nil(err)
+		s.NotNil(resp)
+		// Since all follower stores are unreachable, the request goes straight to the leader and the backoff count should be 0.
+		s.Equal(0, bo.GetTotalBackoffTimes())
+		s.Equal(0, retryTimes)
+	}
+}
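A closing note on the liveness flag the new test relies on: each store's liveness state is kept in a uint32 that is written and read atomically (the test resets it through refreshLivenessStates and atomic.StoreUint32, and the selector consults it through getLivenessState). A rough, self-contained illustration of that flag pattern, using a hypothetical storeLiveness type rather than client-go's store struct:

package main

import (
	"fmt"
	"sync/atomic"
)

// Illustrative encoding of the per-store liveness flag. In client-go the
// values and the field live on the store struct; these are stand-ins.
const (
	reachable   uint32 = 0
	unreachable uint32 = 1
)

// storeLiveness is a hypothetical stand-in for the liveness field on a store.
type storeLiveness struct {
	state uint32
}

func (s *storeLiveness) markUnreachable() { atomic.StoreUint32(&s.state, unreachable) }
func (s *storeLiveness) markReachable()   { atomic.StoreUint32(&s.state, reachable) }
func (s *storeLiveness) get() uint32      { return atomic.LoadUint32(&s.state) }

func main() {
	var s storeLiveness
	s.markUnreachable()                 // e.g. recorded after a failed RPC or health probe
	fmt.Println(s.get() == unreachable) // true: the selector would now skip this store
	s.markReachable()                   // what refreshLivenessStates does for every store in the test
	fmt.Println(s.get() == reachable)   // true: the store is a candidate again
}

Because the flag is a plain atomic, marking a store unreachable (or reachable again) never blocks the request path that reads it.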