mirror of https://github.com/tikv/client-go.git
fallback to follower when leader is busy

Signed-off-by: you06 <you1474600@gmail.com>
Co-authored-by: cfzjywxk <cfzjywxk@gmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
parent 45894d9d36
commit 44f5025f5a
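The gist of the change, before the hunks: a stale read first goes to the preferred (usually local) peer, retries on the leader when the data is not ready, and, with this commit, falls back to a follower replica read instead of only backing off when the leader reports it is busy. Below is a minimal, self-contained Go sketch of that attempt order; the types and helpers (replicaKind, tryOnce, attemptResult) are illustrative placeholders, not client-go APIs.

// A minimal sketch of the retry flow this commit targets, using placeholder
// types. It only illustrates the order of attempts: stale read on the
// preferred peer, leader read, then follower replica read when the leader is busy.
package main

import "fmt"

type replicaKind int

const (
	preferredPeer replicaKind = iota
	leader
	follower
)

// attemptResult is the outcome of one simulated attempt against a replica.
type attemptResult struct {
	dataIsNotReady bool
	serverIsBusy   bool
	ok             bool
}

// tryOnce simulates sending a read to a replica; outcomes are hard-coded to
// reproduce the scenario the commit addresses.
func tryOnce(kind replicaKind, staleRead bool) attemptResult {
	switch kind {
	case preferredPeer:
		return attemptResult{dataIsNotReady: true} // local peer lags behind
	case leader:
		return attemptResult{serverIsBusy: true} // leader overloaded
	default:
		return attemptResult{ok: true} // follower serves the replica read
	}
}

func main() {
	// 1. Stale read on the preferred (e.g. local) peer.
	if r := tryOnce(preferredPeer, true); r.ok {
		fmt.Println("served by preferred peer (stale read)")
		return
	}
	// 2. DataIsNotReady: retry on the leader without the stale-read flag.
	if r := tryOnce(leader, false); r.ok {
		fmt.Println("served by leader")
		return
	}
	// 3. Leader is busy: before this commit the request kept backing off on
	// the leader; with the change it falls back to a follower replica read.
	if r := tryOnce(follower, false); r.ok {
		fmt.Println("served by follower (replica read fallback)")
	}
}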
@@ -1289,9 +1289,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
         // ignore error and use old region info.
         logutil.Logger(bo.GetCtx()).Error("load region failure",
             zap.Uint64("regionID", regionID), zap.Error(err))
+        c.mu.RLock()
         if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
             oldRegion.asyncReload.Store(false)
         }
+        c.mu.RUnlock()
         return
     }
     c.mu.Lock()
@@ -395,26 +395,51 @@ type tryFollower struct {
     lastIdx AccessIndex
     // fromOnNotLeader indicates whether the state is changed from onNotLeader.
     fromOnNotLeader bool
+    labels          []*metapb.StoreLabel
 }

 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
-    var targetReplica *replica
     hasDeadlineExceededErr := false
-    // Search replica that is not attempted from the last accessed replica
-    for i := 1; i < len(selector.replicas); i++ {
-        idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
-        targetReplica = selector.replicas[idx]
-        hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
-        if idx == state.leaderIdx {
-            continue
-        }
-        // Each follower is only tried once
-        if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout {
-            state.lastIdx = idx
-            selector.targetIdx = idx
-            break
-        }
-    }
+    filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
+        for i := 0; i < len(selector.replicas); i++ {
+            idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
+            if idx == state.leaderIdx {
+                continue
+            }
+            selectReplica := selector.replicas[idx]
+            hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
+            if selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout &&
+                fn(selectReplica) {
+                return idx, selectReplica
+            }
+        }
+        return -1, nil
+    }
+
+    if len(state.labels) > 0 {
+        idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
+            return selectReplica.store.IsLabelsMatch(state.labels)
+        })
+        if selectReplica != nil && idx >= 0 {
+            state.lastIdx = idx
+            selector.targetIdx = idx
+        }
+        // labels only take effect for first try.
+        state.labels = nil
+    }
+
+    if selector.targetIdx < 0 {
+        // Search replica that is not attempted from the last accessed replica
+        idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
+            return !selectReplica.isExhausted(1)
+        })
+        if selectReplica != nil && idx >= 0 {
+            state.lastIdx = idx
+            selector.targetIdx = idx
+        }
+    }
+
     // If all followers are tried and fail, backoff and retry.
     if selector.targetIdx < 0 {
         if hasDeadlineExceededErr {
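For readers skimming the hunk above: the new filterReplicas closure makes two passes over the non-leader replicas — first restricted to label-matched stores, then any follower that has not been exhausted. A standalone sketch of that two-pass selection follows, using a simplified replica type and pick helper that are not the client-go ones.

// A sketch of the two-pass follower selection: prefer label-matched stores,
// then fall back to any untried healthy follower; the leader is always skipped.
package main

import "fmt"

type replica struct {
	id      int
	labels  map[string]string
	tried   bool
	healthy bool
}

func pick(replicas []replica, leaderIdx int, want map[string]string) int {
	filter := func(fn func(replica) bool) int {
		for i, r := range replicas {
			if i == leaderIdx || !r.healthy || r.tried {
				continue
			}
			if fn(r) {
				return i
			}
		}
		return -1
	}
	// Pass 1: labels only take effect for the first try.
	if len(want) > 0 {
		if idx := filter(func(r replica) bool {
			for k, v := range want {
				if r.labels[k] != v {
					return false
				}
			}
			return true
		}); idx >= 0 {
			return idx
		}
	}
	// Pass 2: any follower that is still untried.
	return filter(func(replica) bool { return true })
}

func main() {
	replicas := []replica{
		{id: 1, labels: map[string]string{"zone": "z1"}, healthy: true}, // leader
		{id: 2, labels: map[string]string{"zone": "z2"}, healthy: true},
		{id: 3, labels: map[string]string{"zone": "z3"}, healthy: true},
	}
	fmt.Println(pick(replicas, 0, map[string]string{"zone": "z3"})) // 2 (label match)
	fmt.Println(pick(replicas, 0, nil))                             // 1 (first untried follower)
}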
@@ -427,22 +452,24 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     }
     rpcCtx, err := selector.buildRPCContext(bo)
     if err != nil || rpcCtx == nil {
-        return nil, err
+        return rpcCtx, err
     }
     // If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used.
     if !state.fromOnNotLeader {
-        replicaRead := selector.targetIdx != state.leaderIdx
+        replicaRead := true
         rpcCtx.contextPatcher.replicaRead = &replicaRead
     }
-    disableStaleRead := false
-    rpcCtx.contextPatcher.staleRead = &disableStaleRead
+    staleRead := false
+    rpcCtx.contextPatcher.staleRead = &staleRead
     return rpcCtx, nil
 }

 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
     if state.fromOnNotLeader {
-        if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
-            panic("the store must exist")
+        peer := selector.targetReplica().peer
+        if !selector.region.switchWorkLeaderToPeer(peer) {
+            logutil.BgLogger().Warn("the store must exist",
+                zap.Uint64("store", peer.StoreId),
+                zap.Uint64("peer", peer.Id))
         }
     }
 }
@@ -565,6 +592,10 @@ type accessFollower struct {
     learnerOnly bool
 }

+// Follower read will try followers first, if no follower is available, it will fallback to leader.
+// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader,
+// if the leader read receive server-is-busy and connection errors, the region cache is still valid,
+// and the state will be changed to tryFollower, which will read by replica read.
 func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     replicaSize := len(selector.replicas)
     resetStaleRead := false
@@ -633,7 +664,8 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     // If there is no candidate, fallback to the leader.
     if selector.targetIdx < 0 {
         leader := selector.replicas[state.leaderIdx]
-        leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
+        leaderEpochStale := leader.isEpochStale()
+        leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
         if len(state.option.labels) > 0 {
             logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
                 zap.Uint64("region", selector.region.GetID()),
@@ -645,6 +677,20 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
             return nil, nil
         }
         if leaderInvalid {
+            // In stale-read, the request will fallback to leader after the local follower failure.
+            // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again,
+            // The remote follower not tried yet, and the local follower can retry without stale-read flag.
+            if state.isStaleRead {
+                selector.state = &tryFollower{
+                    leaderIdx: state.leaderIdx,
+                    lastIdx:   state.leaderIdx,
+                    labels:    state.option.labels,
+                }
+                if leaderEpochStale {
+                    selector.regionCache.scheduleReloadRegion(selector.region)
+                }
+                return nil, stateChanged{}
+            }
             metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
             selector.invalidateRegion()
             return nil, nil
@@ -695,23 +741,25 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
     if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
         return false
     }
-    // The request can only be sent to the leader.
-    if state.option.leaderOnly && idx == state.leaderIdx {
-        return true
+    if state.option.leaderOnly {
+        // The request can only be sent to the leader.
+        return idx == state.leaderIdx
     }
-    // Choose a replica with matched labels.
-    followerCandidate := !state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) &&
-        replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)
-    if !followerCandidate {
+    if !state.tryLeader && idx == state.leaderIdx {
+        // The request cannot be sent to leader.
         return false
     }
+    if state.learnerOnly {
+        // The request can only be sent to the learner.
+        return replica.peer.Role == metapb.PeerRole_Learner
+    }
     // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers
     // as candidates to serve the Read request.
     if state.option.preferLeader && replica.store.isSlow() {
         return false
     }
-    // If the stores are limited, check if the store is in the list.
-    return replica.store.IsStoreMatch(state.option.stores)
+    // Choose a replica with matched labels.
+    return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels)
 }

 // tryIdleReplica is the state where we find the leader is busy and retry the request using replica read.
@@ -1101,6 +1149,9 @@ func (s *replicaSelector) onServerIsBusy(
         // Mark the server is busy (the next incoming READs could be redirect
         // to expected followers. )
         ctx.Store.markAlreadySlow()
+        if s.canFallback2Follower() {
+            return true, nil
+        }
     }
     err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
     if err != nil {
@@ -1109,6 +1160,23 @@ func (s *replicaSelector) onServerIsBusy(
     return true, nil
 }

+// For some reasons, the leader is unreachable by now, try followers instead.
+// the state is changed in accessFollower.next when leader is unavailable.
+func (s *replicaSelector) canFallback2Follower() bool {
+    if s == nil || s.state == nil {
+        return false
+    }
+    state, ok := s.state.(*accessFollower)
+    if !ok {
+        return false
+    }
+    if !state.isStaleRead {
+        return false
+    }
+    // can fallback to follower only when the leader is exhausted.
+    return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
+}
+
 func (s *replicaSelector) invalidateRegion() {
     if s.region != nil {
         s.region.invalidate(Other)
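The guard above is intentionally narrow: a busy leader only short-circuits the backoff when the request is a stale read and the leader attempt is already exhausted; the send loop then retries immediately and accessFollower.next moves the state to tryFollower. A simplified sketch of that decision, with placeholder types (selectorState, onServerIsBusy) rather than the real selector:

// Sketch: when a follower fallback is possible, return "retry" without
// backing off; otherwise back off on the busy leader (modelled as an error).
package main

import (
	"errors"
	"fmt"
)

type selectorState struct {
	isStaleRead     bool
	lastWasLeader   bool
	leaderExhausted bool
}

// canFallbackToFollower mirrors the shape of the new guard: only a stale read
// whose leader attempt is exhausted may switch to follower replica read.
func canFallbackToFollower(s *selectorState) bool {
	if s == nil || !s.isStaleRead {
		return false
	}
	return s.lastWasLeader && s.leaderExhausted
}

func onServerIsBusy(s *selectorState) (bool, error) {
	if canFallbackToFollower(s) {
		return true, nil // retry immediately on a follower
	}
	return false, errors.New("backoff: server is busy")
}

func main() {
	fmt.Println(onServerIsBusy(&selectorState{isStaleRead: true, lastWasLeader: true, leaderExhausted: true}))
	fmt.Println(onServerIsBusy(&selectorState{isStaleRead: false}))
}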
@@ -348,7 +348,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
     atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
     rpcCtx, err := replicaSelector.next(s.bo)
     s.Nil(err)
-    // Should swith to the next follower.
+    // Should switch to the next follower.
     s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
     AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
 }
@@ -590,7 +590,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
     for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
         rpcCtx, err := replicaSelector.next(s.bo)
         s.Nil(err)
-        // Should swith to the next follower.
+        // Should switch to the next follower.
         s.NotEqual(lastIdx, state3.lastIdx)
         // Shouldn't access the leader if followers aren't exhausted.
         s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
@@ -1284,3 +1284,95 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
         }
     }
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
+    leaderStore, _ := s.loadAndGetLeaderStore()
+    leaderLabel := []*metapb.StoreLabel{
+        {
+            Key:   "id",
+            Value: strconv.FormatUint(leaderStore.StoreID(), 10),
+        },
+    }
+    var followerID *uint64
+    for _, storeID := range s.storeIDs {
+        if storeID != leaderStore.storeID {
+            id := storeID
+            followerID = &id
+            break
+        }
+    }
+    s.NotNil(followerID)
+    followerLabel := []*metapb.StoreLabel{
+        {
+            Key:   "id",
+            Value: strconv.FormatUint(*followerID, 10),
+        },
+    }
+
+    regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+    s.Nil(err)
+    s.NotNil(regionLoc)
+
+    dataIsNotReady := false
+    s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+        select {
+        case <-ctx.Done():
+            return nil, errors.New("timeout")
+        default:
+        }
+        if dataIsNotReady && req.StaleRead {
+            dataIsNotReady = false
+            return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+                DataIsNotReady: &errorpb.DataIsNotReady{},
+            }}}, nil
+        }
+        if addr == leaderStore.addr {
+            return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+                ServerIsBusy: &errorpb.ServerIsBusy{},
+            }}}, nil
+        }
+        if !req.ReplicaRead {
+            return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+                NotLeader: &errorpb.NotLeader{},
+            }}}, nil
+        }
+        return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
+    }}
+
+    for _, localLeader := range []bool{true, false} {
+        dataIsNotReady = true
+        // data is not ready, then server is busy in the first round,
+        // directly server is busy in the second round.
+        for i := 0; i < 2; i++ {
+            req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+            req.ReadReplicaScope = oracle.GlobalTxnScope
+            req.TxnScope = oracle.GlobalTxnScope
+            req.EnableStaleRead()
+            req.ReplicaReadType = kv.ReplicaReadMixed
+            var ops []StoreSelectorOption
+            if localLeader {
+                ops = append(ops, WithMatchLabels(leaderLabel))
+            } else {
+                ops = append(ops, WithMatchLabels(followerLabel))
+            }
+
+            ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
+            bo := retry.NewBackoffer(ctx, -1)
+            s.Nil(err)
+            resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+            s.Nil(err)
+
+            regionErr, err := resp.GetRegionError()
+            s.Nil(err)
+            s.Nil(regionErr)
+            getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+            s.True(ok)
+            if localLeader {
+                s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
+            } else {
+                s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
+            }
+            cancel()
+        }
+    }
+}