diff --git a/config/client.go b/config/client.go index b65d55eb..cfce73c6 100644 --- a/config/client.go +++ b/config/client.go @@ -100,9 +100,6 @@ type TiKVClient struct { // MaxConcurrencyRequestLimit is the max concurrency number of request to be sent the tikv // 0 means auto adjust by feedback. MaxConcurrencyRequestLimit int64 `toml:"max-concurrency-request-limit" json:"max-concurrency-request-limit"` - // EnableReplicaSelectorV2 indicate whether to use the new replica-selector-v2. - // TODO(crazycs520): remove this config after the new replica-selector-v2 is stable. - EnableReplicaSelectorV2 bool `toml:"enable-replica-selector-v2" json:"enable-replica-selector-v2"` } // AsyncCommit is the config for the async commit feature. The switch to enable it is a system variable. @@ -176,7 +173,6 @@ func DefaultTiKVClient() TiKVClient { ResolveLockLiteThreshold: 16, MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit, - EnableReplicaSelectorV2: true, } } diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 0c7d9730..2d9b00f7 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -39,7 +39,6 @@ import ( "context" "encoding/hex" "fmt" - "math" "math/rand" "slices" "sort" @@ -799,8 +798,6 @@ type RPCContext struct { ProxyAddr string // valid when ProxyStore is not nil TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers. BucketVersion uint64 - - contextPatcher contextPatcher // kvrpcpb.Context fields that need to be overridden } func (c *RPCContext) String() string { @@ -816,29 +813,6 @@ func (c *RPCContext) String() string { return res } -type contextPatcher struct { - replicaRead *bool - busyThreshold *time.Duration - staleRead *bool -} - -func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) { - if patcher.replicaRead != nil { - pbCtx.ReplicaRead = *patcher.replicaRead - } - if patcher.staleRead != nil { - pbCtx.StaleRead = *patcher.staleRead - } - if patcher.busyThreshold != nil { - millis := patcher.busyThreshold.Milliseconds() - if millis > 0 && millis <= math.MaxUint32 { - pbCtx.BusyThresholdMs = uint32(millis) - } else { - pbCtx.BusyThresholdMs = 0 - } - } -} - type storeSelectorOp struct { leaderOnly bool preferLeader bool diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 04d0d12c..c756b296 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -38,14 +38,12 @@ import ( "context" "fmt" "maps" - "math" "math/rand" "strconv" "strings" "sync" "sync/atomic" "time" - "unsafe" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -110,7 +108,7 @@ type RegionRequestSender struct { client client.Client storeAddr string rpcError error - replicaSelector ReplicaSelector + replicaSelector *replicaSelector failStoreIDs map[uint64]struct{} failProxyStoreIDs map[uint64]struct{} Stats *RegionRequestRuntimeStats @@ -119,7 +117,7 @@ type RegionRequestSender struct { func (s *RegionRequestSender) String() string { if s.replicaSelector == nil { - return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector) + return fmt.Sprintf("{rpcError:%v, replicaSelector: }", s.rpcError) } return fmt.Sprintf("{rpcError:%v, replicaSelector: %v}", s.rpcError, s.replicaSelector.String()) } @@ -450,11 +448,7 @@ type replica struct { epoch uint32 attempts int attemptedTime time.Duration - // deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error. 
- deadlineErrUsingConfTimeout bool - dataIsNotReady bool - notLeader bool - serverIsBusy bool + flag uint8 } func (r *replica) getEpoch() uint32 { @@ -476,7 +470,26 @@ func (r *replica) onUpdateLeader() { r.attempts = maxReplicaAttempt - 1 r.attemptedTime = 0 } - r.notLeader = false + r.deleteFlag(notLeaderFlag) +} + +const ( + deadlineErrUsingConfTimeoutFlag uint8 = 1 << iota // deadlineErrUsingConfTimeoutFlag indicates the replica is already tried, but the received deadline exceeded error. + dataIsNotReadyFlag // dataIsNotReadyFlag indicates the replica is already tried, but the received data is not ready error. + notLeaderFlag // notLeaderFlag indicates the replica is already tried, but the received not leader error. + serverIsBusyFlag // serverIsBusyFlag indicates the replica is already tried, but the received server is busy error. +) + +func (r *replica) addFlag(flag uint8) { + r.flag |= flag +} + +func (r *replica) deleteFlag(flag uint8) { + r.flag &= ^flag +} + +func (r *replica) hasFlag(flag uint8) bool { + return (r.flag & flag) > 0 } type baseReplicaSelector struct { @@ -505,56 +518,6 @@ type baseReplicaSelector struct { pendingBackoffs map[uint64]*backoffArgs } -// TODO(crazycs520): remove this after replicaSelectorV2 stable. -type replicaSelector struct { - baseReplicaSelector - regionStore *regionStore - labels []*metapb.StoreLabel - state selectorState - // replicas[targetIdx] is the replica handling the request this time - targetIdx AccessIndex - // replicas[proxyIdx] is the store used to redirect requests this time - proxyIdx AccessIndex -} - -func selectorStateToString(state selectorState) string { - replicaSelectorState := "nil" - if state != nil { - switch state.(type) { - case *accessKnownLeader: - replicaSelectorState = "accessKnownLeader" - case *accessFollower: - replicaSelectorState = "accessFollower" - case *accessByKnownProxy: - replicaSelectorState = "accessByKnownProxy" - case *tryFollower: - replicaSelectorState = "tryFollower" - case *tryNewProxy: - replicaSelectorState = "tryNewProxy" - case *tryIdleReplica: - replicaSelectorState = "tryIdleReplica" - case *invalidLeader: - replicaSelectorState = "invalidLeader" - case *invalidStore: - replicaSelectorState = "invalidStore" - case *stateBase: - replicaSelectorState = "stateBase" - case nil: - replicaSelectorState = "nil" - } - } - return replicaSelectorState -} - -func (s *replicaSelector) String() string { - selectorStateStr := "nil" - if s != nil { - selectorStateStr = selectorStateToString(s.state) - } - - return fmt.Sprintf("replicaSelector{state: %v, %v}", selectorStateStr, s.baseReplicaSelector.String()) -} - func (s *baseReplicaSelector) String() string { var replicaStatus []string cacheRegionIsValid := "unknown" @@ -584,699 +547,6 @@ func (s *baseReplicaSelector) String() string { return fmt.Sprintf("cacheRegionIsValid: %v, replicaStatus: %v", cacheRegionIsValid, replicaStatus) } -// selectorState is the interface of states of the replicaSelector. 
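The per-error booleans on `replica` are folded into a single `flag uint8` bitmask, one bit per error kind (`1 << iota` yields 1, 2, 4 and 8 for the four flags above). A minimal sketch of how the old field accesses map onto the new helpers; these call sites are illustrative only, not lines from this patch:

	// old: r.deadlineErrUsingConfTimeout = true
	r.addFlag(deadlineErrUsingConfTimeoutFlag)

	// old: if r.notLeader { ... }
	if r.hasFlag(notLeaderFlag) {
		// skip this replica as a leader candidate
	}

	// old: r.notLeader = false (as onUpdateLeader above now does)
	r.deleteFlag(notLeaderFlag) // clears only that bit: r.flag &= ^notLeaderFlag
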
-// Here is the main state transition diagram: -// -// exceeding maxReplicaAttempt -// +-------------------+ || RPC failure && unreachable && no forwarding -// +-------->+ accessKnownLeader +----------------+ -// | +------+------------+ | -// | | | -// | | RPC failure v -// | | && unreachable +-----+-----+ -// | | && enable forwarding |tryFollower+------+ -// | | +-----------+ | -// | leader becomes v | all followers -// | reachable +----+-------------+ | are tried -// +-----------+accessByKnownProxy| | -// ^ +------+-----------+ | -// | | +-------+ | -// | | RPC failure |backoff+<---+ -// | leader becomes v +---+---+ -// | reachable +-----+-----+ all proxies are tried ^ -// +------------+tryNewProxy+-------------------------+ -// +-----------+ - -type selectorState interface { - next(*retry.Backoffer, *replicaSelector) (*RPCContext, error) - onSendSuccess(*replicaSelector) - onSendFailure(*retry.Backoffer, *replicaSelector, error) - onNoLeader(*replicaSelector) -} - -type stateChanged struct{} - -func (c stateChanged) Error() string { - return "replicaSelector state changed" -} - -type stateBase struct{} - -func (s stateBase) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - return nil, nil -} - -func (s stateBase) onSendSuccess(selector *replicaSelector) { -} - -func (s stateBase) onSendFailure(backoffer *retry.Backoffer, selector *replicaSelector, err error) { -} - -func (s stateBase) onNoLeader(selector *replicaSelector) { -} - -// accessKnownLeader is the state where we are sending requests -// to the leader we suppose to be. -// -// After attempting maxReplicaAttempt times without success -// and without receiving new leader from the responses error, -// we should switch to tryFollower state. -type accessKnownLeader struct { - stateBase - leaderIdx AccessIndex -} - -func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - leader := selector.replicas[state.leaderIdx] - liveness := leader.store.getLivenessState() - if liveness == unreachable && selector.regionCache.enableForwarding { - selector.state = &tryNewProxy{leaderIdx: state.leaderIdx} - return nil, stateChanged{} - } - if !isLeaderCandidate(leader) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} - return nil, stateChanged{} - } - if selector.busyThreshold > 0 { - // If the leader is busy in our estimation, change to tryIdleReplica state to try other replicas. - // If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold. - leaderEstimated := selector.replicas[state.leaderIdx].store.EstimatedWaitTime() - if leaderEstimated > selector.busyThreshold { - selector.state = &tryIdleReplica{leaderIdx: state.leaderIdx} - return nil, stateChanged{} - } - } - selector.targetIdx = state.leaderIdx - return selector.buildRPCContext(bo) -} - -// check leader is candidate or not. -func isLeaderCandidate(leader *replica) bool { - // If hibernate region is enabled and the leader is not reachable, the raft group - // will not be wakened up and re-elect the leader until the follower receives - // a request. So, before the new leader is elected, we should not send requests - // to the unreachable old leader to avoid unnecessary timeout. - // If leader.deadlineErrUsingConfTimeout is true, it means the leader is already tried and received deadline exceeded error, then don't retry it. 
- // If leader.notLeader is true, it means the leader is already tried and received not leader error, then don't retry it. - if leader.store.getLivenessState() != reachable || - leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) || - leader.deadlineErrUsingConfTimeout || - leader.notLeader || - leader.isEpochStale() { // check leader epoch here, if leader.epoch staled, we can try other replicas. instead of buildRPCContext failed and invalidate region then retry. - return false - } - return true -} - -func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - liveness := selector.checkLiveness(bo, selector.targetReplica()) - // Only enable forwarding when unreachable to avoid using proxy to access a TiKV that cannot serve. - if liveness == unreachable && len(selector.replicas) > 1 && selector.regionCache.enableForwarding { - selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx} - return - } - if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} - } - if liveness != reachable { - selector.invalidateReplicaStore(selector.targetReplica(), cause) - } -} - -func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} -} - -// tryFollower is the state where we cannot access the known leader -// but still try other replicas in case they have become the leader. -// -// In this state, a follower that is not tried will be used. If all -// followers are tried, we think we have exhausted the replicas. -// On sending failure in this state, if leader info is returned, -// the leader will be updated to replicas[0] and give it another chance. -type tryFollower struct { - stateBase - leaderIdx AccessIndex - lastIdx AccessIndex - // fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`. - fromAccessKnownLeader bool - labels []*metapb.StoreLabel -} - -func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) { - for i := 0; i < len(selector.replicas); i++ { - idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) - if idx == state.leaderIdx { - continue - } - selectReplica := selector.replicas[idx] - if selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout && - fn(selectReplica) { - return idx, selectReplica - } - } - return -1, nil - } - - if len(state.labels) > 0 { - idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { - return selectReplica.store.IsLabelsMatch(state.labels) && !state.isExhausted(selectReplica) - }) - if selectReplica != nil && idx >= 0 { - state.lastIdx = idx - selector.targetIdx = idx - } - // labels only take effect for first try. - state.labels = nil - } - - if selector.targetIdx < 0 { - // Search replica that is not attempted from the last accessed replica - idx, selectReplica := filterReplicas(func(selectReplica *replica) bool { - return !state.isExhausted(selectReplica) - }) - if selectReplica != nil && idx >= 0 { - state.lastIdx = idx - selector.targetIdx = idx - } - } - - // If all followers are tried and fail, backoff and retry. 
- if selector.targetIdx < 0 { - // when meet deadline exceeded error, do fast retry without invalidate region cache. - if !hasDeadlineExceededError(selector.replicas) { - selector.invalidateRegion() - } - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() - return nil, nil - } - rpcCtx, err := selector.buildRPCContext(bo) - if err != nil || rpcCtx == nil { - return rpcCtx, err - } - if !state.fromAccessKnownLeader { - replicaRead := true - rpcCtx.contextPatcher.replicaRead = &replicaRead - } - staleRead := false - rpcCtx.contextPatcher.staleRead = &staleRead - return rpcCtx, nil -} - -func (state *tryFollower) isExhausted(replica *replica) bool { - if replica.dataIsNotReady { - // we can retry DataIsNotReady replica by replica-read. - return replica.isExhausted(2, 0) - } - return replica.isExhausted(1, 0) -} - -func (state *tryFollower) onSendSuccess(selector *replicaSelector) { - if state.fromAccessKnownLeader { - peer := selector.targetReplica().peer - if !selector.region.switchWorkLeaderToPeer(peer) { - logutil.BgLogger().Warn("the store must exist", - zap.Uint64("store", peer.StoreId), - zap.Uint64("peer", peer.Id)) - } - } -} - -func (state *tryFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - if selector.checkLiveness(bo, selector.targetReplica()) != reachable { - selector.invalidateReplicaStore(selector.targetReplica(), cause) - } -} - -// accessByKnownProxy is the state where we are sending requests through -// regionStore.proxyTiKVIdx as a proxy. -type accessByKnownProxy struct { - stateBase - leaderIdx AccessIndex -} - -func (state *accessByKnownProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - leader := selector.replicas[state.leaderIdx] - if leader.store.getLivenessState() == reachable { - selector.regionStore.unsetProxyStoreIfNeeded(selector.region) - selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx} - return nil, stateChanged{} - } - - if selector.regionStore.proxyTiKVIdx >= 0 { - selector.targetIdx = state.leaderIdx - selector.proxyIdx = selector.regionStore.proxyTiKVIdx - return selector.buildRPCContext(bo) - } - - selector.state = &tryNewProxy{leaderIdx: state.leaderIdx} - return nil, stateChanged{} -} - -func (state *accessByKnownProxy) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - selector.state = &tryNewProxy{leaderIdx: state.leaderIdx} - if selector.checkLiveness(bo, selector.proxyReplica()) != reachable { - selector.invalidateReplicaStore(selector.proxyReplica(), cause) - } -} - -func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} -} - -// tryNewProxy is the state where we try to find a node from followers as proxy. -type tryNewProxy struct { - leaderIdx AccessIndex -} - -func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - leader := selector.replicas[state.leaderIdx] - if leader.store.getLivenessState() == reachable { - selector.regionStore.unsetProxyStoreIfNeeded(selector.region) - selector.state = &accessKnownLeader{leaderIdx: state.leaderIdx} - return nil, stateChanged{} - } - - candidateNum := 0 - for idx, replica := range selector.replicas { - if state.isCandidate(AccessIndex(idx), replica) { - candidateNum++ - } - } - - // If all followers are tried as a proxy and fail, mark the leader store invalid, then backoff and retry. 
- if candidateNum == 0 { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() - selector.invalidateReplicaStore(leader, errors.Errorf("all followers are tried as proxy but fail")) - selector.region.setSyncFlags(needReloadOnAccess) - return nil, nil - } - - // Skip advanceCnt valid candidates to find a proxy peer randomly - advanceCnt := randIntn(candidateNum) - for idx, replica := range selector.replicas { - if !state.isCandidate(AccessIndex(idx), replica) { - continue - } - if advanceCnt == 0 { - selector.targetIdx = state.leaderIdx - selector.proxyIdx = AccessIndex(idx) - break - } - advanceCnt-- - } - return selector.buildRPCContext(bo) -} - -func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool { - // Try each peer only once - return idx != state.leaderIdx && !replica.isExhausted(1, 0) -} - -func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) { - selector.regionStore.setProxyStoreIdx(selector.region, selector.proxyIdx) -} - -func (state *tryNewProxy) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - if selector.checkLiveness(bo, selector.proxyReplica()) != reachable { - selector.invalidateReplicaStore(selector.proxyReplica(), cause) - } -} - -func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true} -} - -// accessFollower is the state where we are sending requests to TiKV followers. -// If there is no suitable follower, requests will be sent to the leader as a fallback. -type accessFollower struct { - stateBase - // If tryLeader is true, the request can also be sent to the leader when !leader.isSlow() - tryLeader bool - isStaleRead bool - option storeSelectorOp - leaderIdx AccessIndex - lastIdx AccessIndex - learnerOnly bool -} - -// Follower read will try followers first, if no follower is available, it will fallback to leader. -// Specially, for stale read, it tries local peer(can be either leader or follower), then use snapshot read in the leader, -// if the leader read receive server-is-busy and connection errors, the region cache is still valid, -// and the state will be changed to tryFollower, which will read by replica read. -func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - replicaSize := len(selector.replicas) - resetStaleRead := false - if state.lastIdx < 0 { - if state.tryLeader { - state.lastIdx = AccessIndex(randIntn(replicaSize)) - } else { - if replicaSize <= 1 { - state.lastIdx = state.leaderIdx - } else { - // Randomly select a non-leader peer - state.lastIdx = AccessIndex(randIntn(replicaSize - 1)) - if state.lastIdx >= state.leaderIdx { - state.lastIdx++ - } - } - } - } else { - // Stale Read request will retry the leader only by using the WithLeaderOnly option. - if state.isStaleRead { - WithLeaderOnly()(&state.option) - // retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read. - resetStaleRead = true - } - state.lastIdx++ - } - - // If selector is under `ReplicaReadPreferLeader` mode, we should choose leader as high priority. 
- if state.option.preferLeader { - state.lastIdx = state.leaderIdx - } - var offset int - if state.lastIdx >= 0 { - offset = randIntn(replicaSize) - } - reloadRegion := false - for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - var idx AccessIndex - if state.option.preferLeader { - if i == 0 { - idx = state.lastIdx - } else { - // randomly select next replica, but skip state.lastIdx - // since i must be greater than or equal to 1, so use i-1 to try from the first replica to make test stable. - if (i-1+offset)%replicaSize == int(state.leaderIdx) { - offset++ - } - idx = AccessIndex((i - 1 + offset) % replicaSize) - } - } else { - idx = AccessIndex((offset + i) % replicaSize) - } - selectReplica := selector.replicas[idx] - if state.isCandidate(idx, selectReplica) { - state.lastIdx = idx - selector.targetIdx = idx - break - } - if selectReplica.isEpochStale() && - selectReplica.store.getResolveState() == resolved && - selectReplica.store.getLivenessState() == reachable { - reloadRegion = true - } - } - if reloadRegion { - selector.region.setSyncFlags(needDelayedReloadPending) - } - // If there is no candidate, fallback to the leader. - if selector.targetIdx < 0 { - leader := selector.replicas[state.leaderIdx] - leaderEpochStale := leader.isEpochStale() - leaderUnreachable := leader.store.getLivenessState() != reachable - leaderExhausted := state.IsLeaderExhausted(leader) - leaderInvalid := leaderEpochStale || leaderUnreachable || leaderExhausted - if len(state.option.labels) > 0 && !state.option.leaderOnly { - logutil.Logger(bo.GetCtx()).Warn("unable to find a store with given labels", - zap.Uint64("region", selector.region.GetID()), - zap.Any("labels", state.option.labels)) - } - if leaderInvalid || leader.deadlineErrUsingConfTimeout { - logutil.Logger(bo.GetCtx()).Warn("unable to find valid leader", - zap.Uint64("region", selector.region.GetID()), - zap.Bool("epoch-stale", leaderEpochStale), - zap.Bool("unreachable", leaderUnreachable), - zap.Bool("exhausted", leaderExhausted), - zap.Bool("kv-timeout", leader.deadlineErrUsingConfTimeout), - zap.Bool("stale-read", state.isStaleRead)) - // In stale-read, the request will fallback to leader after the local follower failure. - // If the leader is also unavailable, we can fallback to the follower and use replica-read flag again, - // The remote follower not tried yet, and the local follower can retry without stale-read flag. - // If leader tried and received deadline exceeded error, try follower. - // If labels are used, some followers would be filtered by the labels and can't be candidates, they still need to be retried. - if state.isStaleRead || leader.deadlineErrUsingConfTimeout || len(state.option.labels) > 0 { - selector.state = &tryFollower{ - leaderIdx: state.leaderIdx, - lastIdx: state.leaderIdx, - labels: state.option.labels, - } - if leaderEpochStale { - selector.region.setSyncFlags(needDelayedReloadPending) - } - return nil, stateChanged{} - } - // when meet deadline exceeded error, do fast retry without invalidate region cache. - if !hasDeadlineExceededError(selector.replicas) { - selector.invalidateRegion() - } - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() - return nil, nil - } - state.lastIdx = state.leaderIdx - selector.targetIdx = state.leaderIdx - } - // Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode. 
- if state.option.preferLeader { - if selector.targetIdx != state.leaderIdx { - selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toFollower) - } else { - selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader) - } - } - rpcCtx, err := selector.buildRPCContext(bo) - if err != nil || rpcCtx == nil { - return nil, err - } - if resetStaleRead { - staleRead := false - rpcCtx.contextPatcher.staleRead = &staleRead - } - return rpcCtx, nil -} - -func (state *accessFollower) IsLeaderExhausted(leader *replica) bool { - return leader.isExhausted(1, 0) -} - -func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - if selector.checkLiveness(bo, selector.targetReplica()) != reachable { - selector.invalidateReplicaStore(selector.targetReplica(), cause) - } -} - -func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { - // the epoch is staled or retry exhausted, or the store is unreachable. - if replica.isEpochStale() || replica.isExhausted(1, 0) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout { - return false - } - if state.option.leaderOnly { - // The request can only be sent to the leader. - return idx == state.leaderIdx - } - if !state.tryLeader && idx == state.leaderIdx { - // The request cannot be sent to leader. - return false - } - if state.learnerOnly { - // The request can only be sent to the learner. - return replica.peer.Role == metapb.PeerRole_Learner - } - // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers - // as candidates to serve the Read request. - if state.option.preferLeader && replica.store.healthStatus.IsSlow() { - return false - } - // Choose a replica with matched labels. - return replica.store.IsStoreMatch(state.option.stores) && replica.store.IsLabelsMatch(state.option.labels) -} - -func hasDeadlineExceededError(replicas []*replica) bool { - for _, replica := range replicas { - if replica.deadlineErrUsingConfTimeout { - // when meet deadline exceeded error, do fast retry without invalidate region cache. - return true - } - } - return false -} - -// tryIdleReplica is the state where we find the leader is busy and retry the request using replica read. -type tryIdleReplica struct { - stateBase - leaderIdx AccessIndex -} - -func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { - // Select a follower replica that has the lowest estimated wait duration - minWait := time.Duration(math.MaxInt64) - targetIdx := state.leaderIdx - startIdx := randIntn(len(selector.replicas)) - for i := 0; i < len(selector.replicas); i++ { - idx := (i + startIdx) % len(selector.replicas) - r := selector.replicas[idx] - // Don't choose leader again by default. - if idx == int(state.leaderIdx) { - continue - } - if !state.isCandidate(r) { - continue - } - estimated := r.store.EstimatedWaitTime() - if estimated > selector.busyThreshold { - continue - } - if estimated < minWait { - minWait = estimated - targetIdx = AccessIndex(idx) - } - if minWait == 0 { - break - } - } - if targetIdx == state.leaderIdx && !isLeaderCandidate(selector.replicas[targetIdx]) { - // when meet deadline exceeded error, do fast retry without invalidate region cache. 
- if !hasDeadlineExceededError(selector.replicas) { - selector.invalidateRegion() - } - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() - return nil, nil - } - selector.targetIdx = targetIdx - rpcCtx, err := selector.buildRPCContext(bo) - if err != nil || rpcCtx == nil { - return nil, err - } - replicaRead := targetIdx != state.leaderIdx - rpcCtx.contextPatcher.replicaRead = &replicaRead - if targetIdx == state.leaderIdx { - // No threshold if all peers are too busy. - selector.busyThreshold = 0 - rpcCtx.contextPatcher.busyThreshold = &selector.busyThreshold - } - return rpcCtx, nil -} - -func (state *tryIdleReplica) isCandidate(replica *replica) bool { - if replica.isEpochStale() || - replica.isExhausted(1, 0) || - replica.store.getLivenessState() != reachable || - replica.deadlineErrUsingConfTimeout { - return false - } - return true -} - -func (state *tryIdleReplica) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { - if selector.checkLiveness(bo, selector.targetReplica()) != reachable { - selector.invalidateReplicaStore(selector.targetReplica(), cause) - } -} - -type invalidStore struct { - stateBase -} - -func (state *invalidStore) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidStore").Inc() - return nil, nil -} - -// TODO(sticnarf): If using request forwarding and the leader is unknown, try other followers -// instead of just switching to this state to backoff and retry. -type invalidLeader struct { - stateBase -} - -func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCContext, error) { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalidLeader").Inc() - return nil, nil -} - -// newReplicaSelector creates a replicaSelector which selects replicas according to reqType and opts. -// opts is currently only effective for follower read. 
-func newReplicaSelector( - regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption, -) (*replicaSelector, error) { - cachedRegion := regionCache.GetCachedRegionWithRLock(regionID) - if cachedRegion == nil || !cachedRegion.isValid() { - return nil, nil - } - replicas := buildTiKVReplicas(cachedRegion) - regionStore := cachedRegion.getStore() - option := storeSelectorOp{} - for _, op := range opts { - op(&option) - } - var state selectorState - if !req.ReplicaReadType.IsFollowerRead() { - if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { - state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx} - } else { - state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} - } - } else { - if req.ReplicaReadType == kv.ReplicaReadPreferLeader { - WithPerferLeader()(&option) - } - tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader - state = &accessFollower{ - tryLeader: tryLeader, - isStaleRead: req.StaleRead, - option: option, - leaderIdx: regionStore.workTiKVIdx, - lastIdx: -1, - learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, - } - } - - return &replicaSelector{ - baseReplicaSelector: baseReplicaSelector{ - regionCache: regionCache, - region: cachedRegion, - replicas: replicas, - busyThreshold: time.Duration(req.BusyThresholdMs) * time.Millisecond, - }, - regionStore: regionStore, - labels: option.labels, - state: state, - targetIdx: -1, - proxyIdx: -1, - }, nil -} - -func (s *replicaSelector) isValid() bool { - return s != nil -} - -func buildTiKVReplicas(region *Region) []*replica { - regionStore := region.getStore() - replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly)) - for _, storeIdx := range regionStore.accessIndex[tiKVOnly] { - replicas = append( - replicas, &replica{ - store: regionStore.stores[storeIdx], - peer: region.meta.Peers[storeIdx], - epoch: regionStore.storeEpochs[storeIdx], - attempts: 0, - }, - ) - } - - if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil { - attemptedTime, err := time.ParseDuration(val.(string)) - if err != nil { - panic(err) - } - for _, r := range replicas { - r.attemptedTime = attemptedTime - } - } - return replicas -} - const ( maxReplicaAttempt = 10 // The maximum time to allow retrying sending requests after RPC failure. In case an RPC request fails after @@ -1285,89 +555,6 @@ const ( maxReplicaAttemptTime = time.Second * 50 ) -// next creates the RPCContext of the current candidate replica. -// It returns a SendError if runs out of all replicas or the cached region is invalidated. 
-func (s *replicaSelector) next(bo *retry.Backoffer, _ *tikvrpc.Request) (rpcCtx *RPCContext, err error) { - if !s.region.isValid() { - metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc() - return nil, nil - } - - s.targetIdx = -1 - s.proxyIdx = -1 - s.refreshRegionStore() - for { - rpcCtx, err = s.state.next(bo, s) - if _, isStateChanged := err.(stateChanged); !isStateChanged { - return - } - } -} - -func (s *replicaSelector) targetReplica() *replica { - if s.targetIdx >= 0 && int(s.targetIdx) < len(s.replicas) { - return s.replicas[s.targetIdx] - } - return nil -} - -func (s *replicaSelector) proxyReplica() *replica { - if s.proxyIdx >= 0 && int(s.proxyIdx) < len(s.replicas) { - return s.replicas[s.proxyIdx] - } - return nil -} - -func (s *replicaSelector) getLabels() []*metapb.StoreLabel { - return s.labels -} - -// sliceIdentical checks whether two slices are referencing the same block of memory. Two `nil`s are also considered -// the same. -func sliceIdentical[T any](a, b []T) bool { - return len(a) == len(b) && unsafe.SliceData(a) == unsafe.SliceData(b) -} - -func (s *replicaSelector) refreshRegionStore() { - oldRegionStore := s.regionStore - newRegionStore := s.region.getStore() - if oldRegionStore == newRegionStore { - return - } - s.regionStore = newRegionStore - - // In the current implementation, if stores change, the address of it must change. - // So we just compare the address here. - // When stores change, we mark this replicaSelector as invalid to let the caller - // recreate a new replicaSelector. - if !sliceIdentical(oldRegionStore.stores, newRegionStore.stores) { - s.state = &invalidStore{} - return - } - - // If leader has changed, it means a recent request succeeds an RPC - // on the new leader. - if oldRegionStore.workTiKVIdx != newRegionStore.workTiKVIdx { - switch state := s.state.(type) { - case *accessFollower: - state.leaderIdx = newRegionStore.workTiKVIdx - default: - // Try the new leader and give it an addition chance if the - // request is sent to the leader. - newLeaderIdx := newRegionStore.workTiKVIdx - s.state = &accessKnownLeader{leaderIdx: newLeaderIdx} - if s.replicas[newLeaderIdx].isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) { - s.replicas[newLeaderIdx].attempts = maxReplicaAttempt - 1 - s.replicas[newLeaderIdx].attemptedTime = 0 - } - } - } -} - -func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) { - return s.baseReplicaSelector.buildRPCContext(bo, s.targetReplica(), s.proxyReplica()) -} - func (s *baseReplicaSelector) buildRPCContext(bo *retry.Backoffer, targetReplica, proxyReplica *replica) (*RPCContext, error) { // Backoff and retry if no replica is selected or the selected replica is stale if targetReplica == nil || targetReplica.isEpochStale() || @@ -1416,48 +603,6 @@ func (s *baseReplicaSelector) buildRPCContext(bo *retry.Backoffer, targetReplica return rpcCtx, nil } -func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { - metrics.RegionCacheCounterWithSendFail.Inc() - s.state.onSendFailure(bo, s, err) -} - -func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { - if isReadReqConfigurableTimeout(req) { - if target := s.targetReplica(); target != nil { - target.deadlineErrUsingConfTimeout = true - } - if accessLeader, ok := s.state.(*accessKnownLeader); ok { - // If leader return deadline exceeded error, we should try to access follower next time. 
- s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx} - } - return true - } - return false -} - -func isReadReqConfigurableTimeout(req *tikvrpc.Request) bool { - if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) { - // Configurable timeout should less than `ReadTimeoutShort`. - return false - } - // Only work for read requests, return false for non-read requests. - return isReadReq(req.Type) -} - -func isReadReq(tp tikvrpc.CmdType) bool { - switch tp { - case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan, - tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream: - return true - default: - return false - } -} - -func (s *baseReplicaSelector) getBaseReplicaSelector() *baseReplicaSelector { - return s -} - func (s *baseReplicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache.bg, s.regionCache.stores) } @@ -1478,42 +623,6 @@ func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause err } } -func (s *replicaSelector) onSendSuccess(_ *tikvrpc.Request) { - s.state.onSendSuccess(s) -} - -func (s *replicaSelector) onNotLeader( - bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader, -) (shouldRetry bool, err error) { - if target := s.targetReplica(); target != nil { - target.notLeader = true - } - leaderIdx, err := s.baseReplicaSelector.onNotLeader(bo, ctx, notLeader) - if err != nil { - return false, err - } - if leaderIdx >= 0 { - if isLeaderCandidate(s.replicas[leaderIdx]) { - s.state = &accessKnownLeader{leaderIdx: AccessIndex(leaderIdx)} - } - } else { - s.state.onNoLeader(s) - } - return true, nil -} - -func (s *baseReplicaSelector) onNotLeader( - bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader, -) (leaderIdx int, err error) { - leader := notLeader.GetLeader() - if leader == nil { - // The region may be during transferring leader. - err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)) - return -1, err - } - return s.updateLeader(leader), nil -} - // updateLeader updates the leader of the cached region. // If the leader peer isn't found in the region, the region will be invalidated. // If switch to new leader successfully, returns the AccessIndex of the new leader in the replicas. @@ -1548,50 +657,6 @@ func (s *baseReplicaSelector) updateLeader(leader *metapb.Peer) int { return -1 } -func (s *replicaSelector) onServerIsBusy( - bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy, -) (shouldRetry bool, err error) { - var store *Store - if ctx != nil && ctx.Store != nil { - store = ctx.Store - if serverIsBusy.EstimatedWaitMs != 0 { - ctx.Store.updateServerLoadStats(serverIsBusy.EstimatedWaitMs) - if s.busyThreshold != 0 && isReadReq(req.Type) { - // do not retry with batched coprocessor requests. - // it'll be region misses if we send the tasks to replica. - if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 { - return false, nil - } - switch state := s.state.(type) { - case *accessKnownLeader: - // Clear attempt history of the leader, so the leader can be accessed again. - s.replicas[state.leaderIdx].attempts = 0 - s.state = &tryIdleReplica{leaderIdx: state.leaderIdx} - } - } - } else { - // Mark the server is busy (the next incoming READs could be redirected to expected followers.) 
- ctx.Store.healthStatus.markAlreadySlow() - } - } - backoffErr := errors.Errorf("server is busy, ctx: %v", ctx) - if s.canFastRetry() { - s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr) - return true, nil - } - err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr) - if err != nil { - return false, err - } - return true, nil -} - -func (s *replicaSelector) onDataIsNotReady() { - if target := s.targetReplica(); target != nil { - target.dataIsNotReady = true - } -} - func (s *baseReplicaSelector) invalidateRegion() { if s.region != nil { s.region.invalidate(Other) @@ -1608,8 +673,8 @@ func (s *RegionRequestSender) getRPCContext( switch et { case tikvrpc.TiKV: if s.replicaSelector == nil { - selector, err := NewReplicaSelector(s.regionCache, regionID, req, opts...) //nolint:staticcheck // ignore SA4023, never returns a nil interface value - if selector == nil || !selector.isValid() || err != nil { //nolint:staticcheck // ignore SA4023, never returns a nil interface value + selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...) + if err != nil || selector == nil { return nil, err } s.replicaSelector = selector @@ -1760,7 +825,7 @@ func (s *RegionRequestSender) SendReqCtx( // TODO: Change the returned error to something like "region missing in cache", // and handle this error like EpochNotMatch, which means to re-split the request and retry. if s.replicaSelector != nil { - if err := s.replicaSelector.getBaseReplicaSelector().backoffOnNoCandidate(bo); err != nil { + if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil { return nil, nil, retryTimes, err } if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout { @@ -1772,11 +837,9 @@ func (s *RegionRequestSender) SendReqCtx( } var isLocalTraffic bool - if staleReadCollector != nil && s.replicaSelector != nil { - if target := s.replicaSelector.targetReplica(); target != nil { - isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.getLabels()) - staleReadCollector.onReq(req, isLocalTraffic) - } + if staleReadCollector != nil && s.replicaSelector != nil && s.replicaSelector.target != nil { + isLocalTraffic = s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) + staleReadCollector.onReq(req, isLocalTraffic) } logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr) @@ -1790,15 +853,14 @@ func (s *RegionRequestSender) SendReqCtx( } req.Context.ClusterId = rpcCtx.ClusterID - rpcCtx.contextPatcher.applyTo(&req.Context) if req.InputRequestSource != "" && s.replicaSelector != nil { - patchRequestSource(req, s.replicaSelector.replicaType(rpcCtx)) + patchRequestSource(req, s.replicaSelector.replicaType()) } if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { return nil, nil, retryTimes, err } if s.replicaSelector != nil { - if err := s.replicaSelector.getBaseReplicaSelector().backoffOnRetry(rpcCtx.Store, bo); err != nil { + if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil { return nil, nil, retryTimes, err } } @@ -2457,10 +1519,8 @@ func (s *RegionRequestSender) onRegionError( zap.Stringer("req", req), zap.Stringer("ctx", ctx), ) - if req != nil { - if s.onFlashbackInProgressRegionError(ctx, req) { - return true, nil - } + if req != nil && s.replicaSelector != nil && s.replicaSelector.onFlashbackInProgress(req) { + return true, nil } return false, errors.Errorf( "region %d is in flashback progress, FlashbackStartTS is %d", @@ -2694,28 +1754,6 @@ func (s 
*RegionRequestSender) onRegionError( return false, nil } -func (s *RegionRequestSender) onFlashbackInProgressRegionError(ctx *RPCContext, req *tikvrpc.Request) bool { - switch selector := s.replicaSelector.(type) { - case *replicaSelector: - // if the failure is caused by replica read, we can retry it with leader safely. - if ctx.contextPatcher.replicaRead != nil && *ctx.contextPatcher.replicaRead { - req.BusyThresholdMs = 0 - selector.busyThreshold = 0 - ctx.contextPatcher.replicaRead = nil - ctx.contextPatcher.busyThreshold = nil - return true - } - if req.ReplicaReadType.IsFollowerRead() { - s.replicaSelector = nil - req.ReplicaReadType = kv.ReplicaReadLeader - return true - } - case *replicaSelectorV2: - return selector.onFlashbackInProgress(ctx, req) - } - return false -} - type staleReadMetricsCollector struct { } @@ -2766,31 +1804,6 @@ func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Res } } -func (s *replicaSelector) replicaType(rpcCtx *RPCContext) string { - leaderIdx := -1 - switch v := s.state.(type) { - case *accessKnownLeader: - return "leader" - case *tryFollower: - return "follower" - case *accessFollower: - leaderIdx = int(v.leaderIdx) - case *tryIdleReplica: - leaderIdx = int(v.leaderIdx) - } - if leaderIdx > -1 && rpcCtx != nil && rpcCtx.Peer != nil { - for idx, replica := range s.replicas { - if replica.peer.Id == rpcCtx.Peer.Id { - if idx == leaderIdx { - return "leader" - } - return "follower" - } - } - } - return "unknown" -} - func patchRequestSource(req *tikvrpc.Request, replicaType string) { var sb strings.Builder defer func() { @@ -2815,26 +1828,15 @@ func patchRequestSource(req *tikvrpc.Request, replicaType string) { sb.WriteString(req.ReadType) } -func recordAttemptedTime(s ReplicaSelector, duration time.Duration) { - if targetReplica := s.targetReplica(); targetReplica != nil { - targetReplica.attemptedTime += duration +func recordAttemptedTime(s *replicaSelector, duration time.Duration) { + if s.target != nil { + s.target.attemptedTime += duration } - if proxyReplica := s.proxyReplica(); proxyReplica != nil { - proxyReplica.attemptedTime += duration + if s.proxy != nil { + s.proxy.attemptedTime += duration } } -// canFastRetry returns true if the request can be sent to next replica. -func (s *replicaSelector) canFastRetry() bool { - accessLeader, ok := s.state.(*accessKnownLeader) - if ok && isLeaderCandidate(s.replicas[accessLeader.leaderIdx]) { - // If leader is still candidate, the request will be sent to leader again, - // so don't skip since the leader is still busy. 
- return false - } - return true -} - type backoffArgs struct { cfg *retry.Config err error diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 8a72dba3..aa051385 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -197,33 +197,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestSwitchPeerWhenNoLeaderErrorWit s.True(r.isValid()) } -func (s *testRegionRequestToThreeStoresSuite) TestSliceIdentical() { - a := make([]int, 0) - b := a - s.True(sliceIdentical(a, b)) - b = make([]int, 0) - s.False(sliceIdentical(a, b)) - - a = append(a, 1, 2, 3) - b = a - s.True(sliceIdentical(a, b)) - b = a[:2] - s.False(sliceIdentical(a, b)) - b = a[1:] - s.False(sliceIdentical(a, b)) - a = a[1:] - s.True(sliceIdentical(a, b)) - - a = nil - b = nil - - s.True(sliceIdentical(a, b)) - a = make([]int, 0) - s.False(sliceIdentical(a, b)) - a = append(a, 1) - s.False(sliceIdentical(a, b)) -} - func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, string) { region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, []byte("a"), false) s.Nil(err) @@ -378,6 +351,14 @@ func refreshLivenessStates(regionStore *regionStore) { } } +func refreshStoreHealthStatus(regionStore *regionStore) { + for _, store := range regionStore.stores { + store.healthStatus.clientSideSlowScore.resetSlowScore() + store.healthStatus.ResetTiKVServerSideSlowScoreForTest(50) + store.healthStatus.updateSlowFlag() + } +} + func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) { s.Equal(rpcCtx.Store, target.store) s.Equal(rpcCtx.Peer, target.peer) @@ -429,7 +410,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { refreshRegionTTL(region) refreshEpochs(regionStore) req.ReplicaReadType = kv.ReplicaReadLearner - replicaSelector, err := NewReplicaSelector(cache, regionLoc.Region, req) + replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) @@ -437,7 +418,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { refreshRegionTTL(region) rpcCtx, err := replicaSelector.next(s.bo, req) s.Nil(err) - target := replicaSelector.targetReplica() + target := replicaSelector.target AssertRPCCtxEqual(s, rpcCtx, target, nil) s.Equal(target.peer.Role, metapb.PeerRole_Learner) s.Equal(target.peer.Id, tikvLearner.Id) @@ -472,7 +453,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { region.meta.Peers = append(region.meta.Peers, tiflash) atomic.StorePointer(®ion.store, unsafe.Pointer(regionStore)) - cache := NewRegionCache(s.cache.pdClient) + // Disable the tick on health status. + cache := NewRegionCache(s.cache.pdClient, RegionCacheNoHealthTick) defer cache.Close() cache.mu.Lock() cache.insertRegionToCache(region, true, true) @@ -486,7 +468,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Should only contain TiKV stores. s.Equal(len(replicaSelector.replicas), regionStore.accessStoreNum(tiKVOnly)) s.Equal(len(replicaSelector.replicas), len(regionStore.stores)-1) - s.IsType(&accessKnownLeader{}, replicaSelector.state) // Verify that the store matches the peer and epoch. 
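The new refreshStoreHealthStatus test helper resets both the client-side slow score and the test-only server-side slow score, then recomputes the slow flag, so stores are no longer filtered out as slow by selection paths such as the `ReplicaReadPreferLeader` candidate check on `healthStatus.IsSlow()`. A hedged usage sketch for a test; the expected outcome in the comment is an assumption, not an assertion taken from this patch:

	refreshStoreHealthStatus(regionStore)
	for _, store := range regionStore.stores {
		// after the reset, IsSlow() is expected to report false for every store
		_ = store.healthStatus.IsSlow()
	}
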
for _, replica := range replicaSelector.replicas { @@ -501,14 +482,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { } } - // Test accessKnownLeader state - s.IsType(&accessKnownLeader{}, replicaSelector.state) // Try the leader for maxReplicaAttempt times for i := 1; i <= maxReplicaAttempt; i++ { rpcCtx, err := replicaSelector.next(s.bo, req) s.Nil(err) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) - s.IsType(&accessKnownLeader{}, replicaSelector.state) s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i) } @@ -516,15 +494,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { for i := 0; i < len(replicaSelector.replicas)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo, req) s.Nil(err) - state, ok := replicaSelector.state.(*tryFollower) - s.True(ok) - s.Equal(regionStore.workTiKVIdx, state.leaderIdx) - s.NotEqual(state.lastIdx, regionStore.workTiKVIdx) - s.Equal(replicaSelector.targetIdx, state.lastIdx) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) - s.Equal(replicaSelector.targetReplica().attempts, 1) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil) + s.Equal(replicaSelector.target.attempts, 1) + s.NotEqual(rpcCtx.Peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id) } - // In tryFollower state, if all replicas are tried, nil RPCContext should be returned + // If all replicas are tried, nil RPCContext should be returned rpcCtx, err := replicaSelector.next(s.bo, req) s.Nil(err) s.Nil(rpcCtx) @@ -537,22 +511,19 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Nil(err) s.NotNil(replicaSelector) unreachable.injectConstantLiveness(cache.stores) - s.IsType(&accessKnownLeader{}, replicaSelector.state) _, err = replicaSelector.next(s.bo, req) s.Nil(err) replicaSelector.onSendFailure(s.bo, nil) rpcCtx, err = replicaSelector.next(s.bo, req) s.NotNil(rpcCtx) s.Nil(err) - s.IsType(&tryFollower{}, replicaSelector.state) - s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), nil) - s.Equal(replicaSelector.targetReplica().attempts, 1) + s.NotEqual(replicaSelector.target.peer.Id, region.GetLeaderPeerID()) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil) + s.Equal(replicaSelector.target.attempts, 1) // If the NotLeader errors provides an unreachable leader, do not switch to it. replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{ RegionId: region.GetID(), Leader: &metapb.Peer{Id: s.peerIDs[regionStore.workTiKVIdx], StoreId: s.storeIDs[regionStore.workTiKVIdx]}, }) - s.IsType(&tryFollower{}, replicaSelector.state) // If the leader is unreachable and forwarding is not enabled, just do not try // the unreachable leader. @@ -560,14 +531,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - s.IsType(&accessKnownLeader{}, replicaSelector.state) // Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader. 
rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) s.NotNil(rpcCtx) - _, ok := replicaSelector.state.(*tryFollower) - s.True(ok) - s.NotEqual(regionStore.workTiKVIdx, replicaSelector.targetIdx) + s.NotEqual(regionStore.workTiKVIdx, replicaSelector.target.peer.Id, replicaSelector.replicas[regionStore.workTiKVIdx].peer.Id) // Do not try to use proxy if livenessState is unknown instead of unreachable. refreshEpochs(regionStore) @@ -579,14 +547,11 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Eventually(func() bool { return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unknown }, 3*time.Second, 200*time.Millisecond) - s.IsType(&accessKnownLeader{}, replicaSelector.state) // Now, livenessState is unknown. Even if forwarding is enabled, it should try followers // instead of using the proxy. rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) s.NotNil(rpcCtx) - _, ok = replicaSelector.state.(*tryFollower) - s.True(ok) // Test switching to tryNewProxy if leader is unreachable and forwarding is enabled refreshEpochs(regionStore) @@ -598,65 +563,39 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Eventually(func() bool { return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable }, 3*time.Second, 200*time.Millisecond) - s.IsType(&accessKnownLeader{}, replicaSelector.state) // Now, livenessState is unreachable, so it will try a new proxy instead of the leader. rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) s.NotNil(rpcCtx) - state, ok := replicaSelector.state.(*tryNewProxy) - s.True(ok) - s.Equal(regionStore.workTiKVIdx, state.leaderIdx) - s.Equal(AccessIndex(2), replicaSelector.targetIdx) - s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) - s.Equal(replicaSelector.targetReplica().attempts, 1) - s.Equal(replicaSelector.proxyReplica().attempts, 1) + s.NotNil(replicaSelector.target) + s.NotNil(replicaSelector.proxy) + s.NotEqual(replicaSelector.target.peer.Id, &replicaSelector.proxy.peer.Id) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy) + s.Equal(replicaSelector.target.attempts, 1) + s.Equal(replicaSelector.proxy.attempts, 1) // When the current proxy node fails, it should try another one. 
- lastProxy := replicaSelector.proxyIdx replicaSelector.onSendFailure(s.bo, nil) rpcCtx, err = replicaSelector.next(s.bo, req) s.NotNil(rpcCtx) s.Nil(err) - state, ok = replicaSelector.state.(*tryNewProxy) - s.True(ok) - s.Equal(regionStore.workTiKVIdx, state.leaderIdx) - s.Equal(AccessIndex(2), replicaSelector.targetIdx) - s.NotEqual(lastProxy, replicaSelector.proxyIdx) - s.Equal(replicaSelector.targetReplica().attempts, 2) - s.Equal(replicaSelector.proxyReplica().attempts, 1) + s.Equal(replicaSelector.target.attempts, 2) + s.Equal(replicaSelector.proxy.attempts, 1) // Test proxy store is saves when proxy is enabled replicaSelector.onSendSuccess(req) regionStore = region.getStore() - s.Equal(replicaSelector.proxyIdx, regionStore.proxyTiKVIdx) + s.Equal(replicaSelector.proxy.peer.Id, replicaSelector.replicas[regionStore.proxyTiKVIdx].peer.Id) - // Test initial state is accessByKnownProxy when proxyTiKVIdx is valid + // Test when proxyTiKVIdx is valid refreshEpochs(regionStore) cache.enableForwarding = true replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - state2, ok := replicaSelector.state.(*accessByKnownProxy) - s.True(ok) - s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) _, err = replicaSelector.next(s.bo, req) s.Nil(err) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) - - // Switch to tryNewProxy if the current proxy is not available - replicaSelector.onSendFailure(s.bo, nil) - s.IsType(&tryNewProxy{}, replicaSelector.state) - rpcCtx, err = replicaSelector.next(s.bo, req) - s.Nil(err) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) - s.Equal(regionStore.workTiKVIdx, state2.leaderIdx) - s.Equal(AccessIndex(2), replicaSelector.targetIdx) - s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx) - s.Equal(replicaSelector.targetReplica().attempts, 2) - s.Equal(replicaSelector.proxyReplica().attempts, 1) - // FIXME: the chosen proxy-replica's store should be reachable. - //s.Equal(replicaSelector.proxyReplica().store.getLivenessState(), reachable) + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, replicaSelector.proxy) // Test accessFollower state with kv.ReplicaReadFollower request type. req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil) @@ -665,28 +604,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - state3, ok := replicaSelector.state.(*accessFollower) - s.True(ok) - s.Equal(regionStore.workTiKVIdx, state3.leaderIdx) - s.Equal(state3.lastIdx, AccessIndex(-1)) - lastIdx := AccessIndex(-1) for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { rpcCtx, err := replicaSelector.next(s.bo, req) s.Nil(err) - // Should switch to the next follower. - s.NotEqual(lastIdx, state3.lastIdx) - // Shouldn't access the leader if followers aren't exhausted. 
- s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) - AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) - lastIdx = state3.lastIdx + AssertRPCCtxEqual(s, rpcCtx, replicaSelector.target, nil) } // Fallback to the leader for 1 time rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) - s.Equal(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // All replicas are exhausted. rpcCtx, err = replicaSelector.next(s.bo, req) @@ -704,17 +630,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) - state3 = replicaSelector.state.(*accessFollower) // Should fallback to the leader immediately. rpcCtx, err = replicaSelector.next(s.bo, req) s.Nil(err) - s.Equal(regionStore.workTiKVIdx, state3.lastIdx) - s.Equal(replicaSelector.targetIdx, state3.lastIdx) AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) // Test accessFollower state filtering label-not-match stores. refreshRegionTTL(region) refreshEpochs(regionStore) + refreshStoreHealthStatus(regionStore) labels := []*metapb.StoreLabel{ { Key: "a", @@ -814,7 +738,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) - s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1]) + s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1]) s.True(bo.GetTotalBackoffTimes() == 1) s.cluster.StartStore(s.storeIDs[0]) atomic.StoreUint32(®ionStore.stores[0].livenessState, uint32(reachable)) @@ -825,7 +749,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) - s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1]) + s.Equal(sender.replicaSelector.target.peer.Id, s.peerIDs[1]) s.True(bo.GetTotalBackoffTimes() == 0) // Switch to the next peer due to leader failure but the new leader is not elected. @@ -856,31 +780,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Nil(err) s.True(hasFakeRegionError(resp)) s.Equal(bo.GetTotalBackoffTimes(), 3) - getReplicaSelectorRegion := func() *Region { - if selector, ok := sender.replicaSelector.(*replicaSelector); ok { - return selector.region - } - if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok { - return selector.region - } - return nil - } - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.cluster.ChangeLeader(s.regionID, s.peerIDs[0]) // The leader store is alive but can't provide service. 
- getReplicaSelectorRegionStores := func() []*Store { - if selector, ok := sender.replicaSelector.(*replicaSelector); ok { - return selector.regionStore.stores - } - if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok { - return selector.region.getStore().stores - } - return nil - } reachable.injectConstantLiveness(s.cache.stores) s.Eventually(func() bool { - stores := getReplicaSelectorRegionStores() + stores := sender.replicaSelector.region.getStore().stores return stores[0].getLivenessState() == reachable && stores[1].getLivenessState() == reachable && stores[2].getLivenessState() == reachable @@ -892,7 +798,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) s.cluster.StartStore(s.storeIDs[0]) @@ -926,7 +832,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) }() } @@ -946,7 +852,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -965,7 +871,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, _, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -998,7 +904,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) } - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() } @@ -1014,7 +920,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Nil(err) s.True(hasFakeRegionError(resp)) s.True(bo.GetTotalBackoffTimes() == 3) - s.False(getReplicaSelectorRegion().isValid()) + s.False(sender.replicaSelector.region.isValid()) for _, store := range s.storeIDs { s.cluster.StartStore(store) } @@ -1029,12 +935,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { BusyThresholdMs: 50, }) - replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req) + replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) - s.Equal(replicaSelector.getBaseReplicaSelector().region, region) + s.Equal(replicaSelector.region, region) // The busyThreshold in replicaSelector should be initialized with the request context. 
- s.Equal(replicaSelector.getBaseReplicaSelector().busyThreshold, 50*time.Millisecond) + s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond) bo := retry.NewBackoffer(context.Background(), -1) rpcCtx, err := replicaSelector.next(bo, req) @@ -1051,7 +957,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { rpcCtx, err = replicaSelector.next(bo, req) s.Nil(err) s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer) - rpcCtx.contextPatcher.applyTo(&req.Context) s.True(req.ReplicaRead) s.Equal(req.BusyThresholdMs, uint32(50)) lastPeerID := rpcCtx.Peer.Id @@ -1065,7 +970,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { // Should choose a peer different from before s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer) s.NotEqual(rpcCtx.Peer.Id, lastPeerID) - rpcCtx.contextPatcher.applyTo(&req.Context) s.True(req.ReplicaRead) s.Equal(req.BusyThresholdMs, uint32(50)) @@ -1079,22 +983,20 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { rpcCtx, err = replicaSelector.next(bo, req) s.Nil(err) s.Equal(rpcCtx.Peer.Id, s.leaderPeer) - rpcCtx.contextPatcher.applyTo(&req.Context) s.False(req.ReplicaRead) s.Equal(req.BusyThresholdMs, uint32(0)) - s.True(replicaSelector.getBaseReplicaSelector().region.isValid()) // don't invalidate region when can't find an idle replica. + s.True(replicaSelector.region.isValid()) // don't invalidate region when can't find an idle replica. time.Sleep(120 * time.Millisecond) // When there comes a new request, it should skip busy leader and choose a less busy store req.BusyThresholdMs = 50 - replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req) + replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) rpcCtx, err = replicaSelector.next(bo, req) s.Nil(err) s.Equal(rpcCtx.Peer.Id, lessBusyPeer) - rpcCtx.contextPatcher.applyTo(&req.Context) s.True(req.ReplicaRead) } @@ -1407,7 +1309,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { } } - setTargetReplica := func(selector ReplicaSelector, readType string) { + setTargetReplica := func(selector *replicaSelector, readType string) { var leader bool switch readType { case "leader", "stale_leader": @@ -1417,23 +1319,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { default: panic("unreachable") } - for idx, replica := range selector.getBaseReplicaSelector().replicas { + for _, replica := range selector.replicas { if replica.store.storeID == leaderStore.storeID && leader { - if v1, ok := selector.(*replicaSelector); ok { - v1.targetIdx = AccessIndex(idx) - } - if v2, ok := selector.(*replicaSelectorV2); ok { - v2.target = replica - } + selector.target = replica return } if replica.store.storeID != leaderStore.storeID && !leader { - if v1, ok := selector.(*replicaSelector); ok { - v1.targetIdx = AccessIndex(idx) - } - if v2, ok := selector.(*replicaSelectorV2); ok { - v2.target = replica - } + selector.target = replica return } } @@ -1447,23 +1339,25 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() { bo := retry.NewBackoffer(context.Background(), -1) req.IsRetryRequest = false setReadType(req, firstReplica) - replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req) + replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req) s.Nil(err) setTargetReplica(replicaSelector, firstReplica) - rpcCtx, err := replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, 
replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + rpcCtx, err := replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy) s.Nil(err) - patchRequestSource(req, replicaSelector.replicaType(rpcCtx)) + s.NotNil(rpcCtx) + patchRequestSource(req, replicaSelector.replicaType()) s.Equal(firstReplica+"_test", req.RequestSource) // retry setReadType(req, retryReplica) - replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req) + replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req) s.Nil(err) setTargetReplica(replicaSelector, retryReplica) - rpcCtx, err = replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, replicaSelector.targetReplica(), replicaSelector.proxyReplica()) + rpcCtx, err = replicaSelector.buildRPCContext(bo, replicaSelector.target, replicaSelector.proxy) s.Nil(err) + s.NotNil(rpcCtx) req.IsRetryRequest = true - patchRequestSource(req, replicaSelector.replicaType(rpcCtx)) + patchRequestSource(req, replicaSelector.replicaType()) s.Equal("retry_"+firstReplica+"_"+retryReplica+"_test", req.RequestSource) } } diff --git a/internal/locate/replica_selector.go b/internal/locate/replica_selector.go index c219d08d..c9064549 100644 --- a/internal/locate/replica_selector.go +++ b/internal/locate/replica_selector.go @@ -21,45 +21,15 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" - "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" + "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util" ) -type ReplicaSelector interface { - next(bo *retry.Backoffer, req *tikvrpc.Request) (*RPCContext, error) - targetReplica() *replica - proxyReplica() *replica - replicaType(rpcCtx *RPCContext) string - String() string - getBaseReplicaSelector() *baseReplicaSelector - getLabels() []*metapb.StoreLabel - onSendSuccess(req *tikvrpc.Request) - onSendFailure(bo *retry.Backoffer, err error) - invalidateRegion() - // Following methods are used to handle region errors. - onNotLeader(bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader) (shouldRetry bool, err error) - onDataIsNotReady() - onServerIsBusy(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy) (shouldRetry bool, err error) - onReadReqConfigurableTimeout(req *tikvrpc.Request) bool - isValid() bool -} - -// NewReplicaSelector returns a new ReplicaSelector. -// -//nolint:staticcheck // ignore SA4023, never returns a nil interface value -func NewReplicaSelector( - regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption, -) (ReplicaSelector, error) { - if config.GetGlobalConfig().TiKVClient.EnableReplicaSelectorV2 { - return newReplicaSelectorV2(regionCache, regionID, req, opts...) - } - return newReplicaSelector(regionCache, regionID, req, opts...) 
-} - -type replicaSelectorV2 struct { +type replicaSelector struct { baseReplicaSelector replicaReadType kv.ReplicaReadType isStaleRead bool @@ -70,9 +40,9 @@ type replicaSelectorV2 struct { attempts int } -func newReplicaSelectorV2( +func newReplicaSelector( regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption, -) (*replicaSelectorV2, error) { +) (*replicaSelector, error) { cachedRegion := regionCache.GetCachedRegionWithRLock(regionID) if cachedRegion == nil || !cachedRegion.isValid() { return nil, nil @@ -85,7 +55,7 @@ func newReplicaSelectorV2( if req.ReplicaReadType == kv.ReplicaReadPreferLeader { WithPerferLeader()(&option) } - return &replicaSelectorV2{ + return &replicaSelector{ baseReplicaSelector: baseReplicaSelector{ regionCache: regionCache, region: cachedRegion, @@ -101,11 +71,33 @@ func newReplicaSelectorV2( }, nil } -func (s *replicaSelectorV2) isValid() bool { - return s != nil +func buildTiKVReplicas(region *Region) []*replica { + regionStore := region.getStore() + replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly)) + for _, storeIdx := range regionStore.accessIndex[tiKVOnly] { + replicas = append( + replicas, &replica{ + store: regionStore.stores[storeIdx], + peer: region.meta.Peers[storeIdx], + epoch: regionStore.storeEpochs[storeIdx], + attempts: 0, + }, + ) + } + + if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil { + attemptedTime, err := time.ParseDuration(val.(string)) + if err != nil { + panic(err) + } + for _, r := range replicas { + r.attemptedTime = attemptedTime + } + } + return replicas } -func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) { +func (s *replicaSelector) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) { if !s.region.isValid() { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc() return nil, nil @@ -126,7 +118,7 @@ func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpc return s.buildRPCContext(bo, s.target, s.proxy) } -func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) { +func (s *replicaSelector) nextForReplicaReadLeader(req *tikvrpc.Request) { if s.regionCache.enableForwarding { strategy := ReplicaSelectLeaderWithProxyStrategy{} s.target, s.proxy = strategy.next(s) @@ -140,7 +132,7 @@ func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) { leaderIdx := s.region.getStore().workTiKVIdx strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx} s.target = strategy.next(s.replicas) - if s.target != nil && s.busyThreshold > 0 && s.isReadOnlyReq && (s.target.store.EstimatedWaitTime() > s.busyThreshold || s.target.serverIsBusy) { + if s.target != nil && s.busyThreshold > 0 && s.isReadOnlyReq && (s.target.store.EstimatedWaitTime() > s.busyThreshold || s.target.hasFlag(serverIsBusyFlag)) { // If the leader is busy in our estimation, try other idle replicas. // If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold. 
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, busyThreshold: s.busyThreshold} @@ -160,13 +152,13 @@ } mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, leaderOnly: s.option.leaderOnly} s.target = mixedStrategy.next(s) - if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].deadlineErrUsingConfTimeout { + if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].hasFlag(deadlineErrUsingConfTimeoutFlag) { req.ReplicaRead = true req.StaleRead = false } } -func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) { +func (s *replicaSelector) nextForReplicaReadMixed(req *tikvrpc.Request) { leaderIdx := s.region.getStore().workTiKVIdx if s.isStaleRead && s.attempts == 2 { // For stale read second retry, try leader by leader read. @@ -230,6 +222,24 @@ func (s ReplicaSelectLeaderStrategy) next(replicas []*replica) *replica { return nil } +// isLeaderCandidate reports whether the leader replica is still a valid candidate to send requests to. +func isLeaderCandidate(leader *replica) bool { + // If hibernate region is enabled and the leader is not reachable, the raft group + // will not be woken up to re-elect a leader until a follower receives + // a request. So, before the new leader is elected, we should not send requests + // to the unreachable old leader to avoid unnecessary timeout. + // If the leader has the deadlineErrUsingConfTimeoutFlag set, it has already been tried and returned a deadline exceeded error, so don't retry it. + // If the leader has the notLeaderFlag set, it has already been tried and returned a not-leader error, so don't retry it. + if leader.store.getLivenessState() != reachable || + leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) || + leader.hasFlag(deadlineErrUsingConfTimeoutFlag) || + leader.hasFlag(notLeaderFlag) || + leader.isEpochStale() { // Check the leader epoch here: if it is stale, try other replicas instead of letting buildRPCContext fail, invalidating the region, and then retrying. + return false + } + return true +} + // ReplicaSelectMixedStrategy is used to select a replica by calculating a score for each replica, and then choose the one with the highest score. // Attention, if you want the leader replica must be chosen in some case, you should use ReplicaSelectLeaderStrategy, instead of use ReplicaSelectMixedStrategy with preferLeader flag. type ReplicaSelectMixedStrategy struct { @@ -243,7 +253,7 @@ type ReplicaSelectMixedStrategy struct { busyThreshold time.Duration } -func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2) *replica { +func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelector) *replica { replicas := selector.replicas maxScoreIdxes := make([]int, 0, len(replicas)) var maxScore storeSelectionScore = -1 @@ -290,13 +300,23 @@ return nil } +func hasDeadlineExceededError(replicas []*replica) bool { + for _, replica := range replicas { + if replica.hasFlag(deadlineErrUsingConfTimeoutFlag) { + // When a deadline exceeded error has been met, do a fast retry without invalidating the region cache. + return true + } + } + return false +} + func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epochStale bool, liveness livenessState) bool { if epochStale || liveness == unreachable { // the replica is not available, skip it.
return false } maxAttempt := 1 - if r.dataIsNotReady && !isLeader { + if r.hasFlag(dataIsNotReadyFlag) && !isLeader { // If the replica is failed by data not ready with stale read, we can retry it with replica-read. // after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error, // then no need to retry leader again. If you try it again, you may get a NotLeader error. @@ -309,7 +329,7 @@ if s.leaderOnly && !isLeader { return false } - if s.busyThreshold > 0 && (r.store.EstimatedWaitTime() > s.busyThreshold || r.serverIsBusy || isLeader) { + if s.busyThreshold > 0 && (r.store.EstimatedWaitTime() > s.busyThreshold || r.hasFlag(serverIsBusyFlag) || isLeader) { return false } if s.preferLeader && r.store.healthStatus.IsSlow() && !isLeader { @@ -406,12 +426,12 @@ type ReplicaSelectLeaderWithProxyStrategy struct{} -func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelectorV2) (leader *replica, proxy *replica) { +func (s ReplicaSelectLeaderWithProxyStrategy) next(selector *replicaSelector) (leader *replica, proxy *replica) { rs := selector.region.getStore() leaderIdx := rs.workTiKVIdx replicas := selector.replicas leader = replicas[leaderIdx] - if leader.store.getLivenessState() == reachable || leader.notLeader { + if leader.store.getLivenessState() == reachable || leader.hasFlag(notLeaderFlag) { // if leader's store is reachable, no need use proxy. rs.unsetProxyStoreIfNeeded(selector.region) return leader, nil @@ -444,16 +464,19 @@ return true } -func (s *replicaSelectorV2) onNotLeader( +func (s *replicaSelector) onNotLeader( bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader, ) (shouldRetry bool, err error) { if s.target != nil { - s.target.notLeader = true + s.target.addFlag(notLeaderFlag) } - leaderIdx, err := s.baseReplicaSelector.onNotLeader(bo, ctx, notLeader) - if err != nil { - return false, err + leader := notLeader.GetLeader() + if leader == nil { + // The region may be in the middle of a leader transfer. + err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)) + return err == nil, err } + leaderIdx := s.updateLeader(leader) if leaderIdx >= 0 { if isLeaderCandidate(s.replicas[leaderIdx]) { s.replicaReadType = kv.ReplicaReadLeader @@ -462,7 +485,7 @@ return true, nil } -func (s *replicaSelectorV2) onFlashbackInProgress(ctx *RPCContext, req *tikvrpc.Request) bool { +func (s *replicaSelector) onFlashbackInProgress(req *tikvrpc.Request) bool { // if the failure is caused by replica read, we can retry it with leader safely. if req.ReplicaRead && s.target != nil && s.target.peer.Id != s.region.GetLeaderPeerID() { req.BusyThresholdMs = 0 @@ -474,13 +497,13 @@
return false } -func (s *replicaSelectorV2) onDataIsNotReady() { +func (s *replicaSelector) onDataIsNotReady() { if s.target != nil { - s.target.dataIsNotReady = true + s.target.addFlag(dataIsNotReadyFlag) } } -func (s *replicaSelectorV2) onServerIsBusy( +func (s *replicaSelector) onServerIsBusy( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy, ) (shouldRetry bool, err error) { var store *Store @@ -495,7 +518,7 @@ func (s *replicaSelectorV2) onServerIsBusy( return false, nil } if s.target != nil { - s.target.serverIsBusy = true + s.target.addFlag(serverIsBusyFlag) } } } else { @@ -515,28 +538,47 @@ return true, nil } -func (s *replicaSelectorV2) canFastRetry() bool { +func (s *replicaSelector) canFastRetry() bool { if s.replicaReadType == kv.ReplicaReadLeader { leaderIdx := s.region.getStore().workTiKVIdx leader := s.replicas[leaderIdx] - if isLeaderCandidate(leader) && !leader.serverIsBusy { + if isLeaderCandidate(leader) && !leader.hasFlag(serverIsBusyFlag) { return false } } return true } -func (s *replicaSelectorV2) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { +func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool { if isReadReqConfigurableTimeout(req) { if s.target != nil { - s.target.deadlineErrUsingConfTimeout = true + s.target.addFlag(deadlineErrUsingConfTimeoutFlag) } return true } return false } -func (s *replicaSelectorV2) onSendFailure(bo *retry.Backoffer, err error) { +func isReadReqConfigurableTimeout(req *tikvrpc.Request) bool { + if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) { + // The configurable timeout should be less than `ReadTimeoutShort`. + return false + } + // Only works for read requests; returns false for non-read requests. + return isReadReq(req.Type) +} + +func isReadReq(tp tikvrpc.CmdType) bool { + switch tp { + case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan, + tikvrpc.CmdCop, tikvrpc.CmdBatchCop, tikvrpc.CmdCopStream: + return true + default: + return false + } +} + +func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { metrics.RegionCacheCounterWithSendFail.Inc() // todo: mark store need check and return to fast retry.
target := s.target @@ -554,7 +596,7 @@ func (s *replicaSelectorV2) onSendFailure(bo *retry.Backoffer, err error) { } } -func (s *replicaSelectorV2) onSendSuccess(req *tikvrpc.Request) { +func (s *replicaSelector) onSendSuccess(req *tikvrpc.Request) { if s.proxy != nil && s.target != nil { for idx, r := range s.replicas { if r.peer.Id == s.proxy.peer.Id { @@ -568,19 +610,7 @@ func (s *replicaSelectorV2) onSendSuccess(req *tikvrpc.Request) { } } -func (s *replicaSelectorV2) targetReplica() *replica { - return s.target -} - -func (s *replicaSelectorV2) proxyReplica() *replica { - return s.proxy -} - -func (s *replicaSelectorV2) getLabels() []*metapb.StoreLabel { - return s.option.labels -} - -func (s *replicaSelectorV2) replicaType(_ *RPCContext) string { +func (s *replicaSelector) replicaType() string { if s.target != nil { if s.target.peer.Id == s.region.GetLeaderPeerID() { return "leader" @@ -590,7 +620,7 @@ func (s *replicaSelectorV2) replicaType(_ *RPCContext) string { return "unknown" } -func (s *replicaSelectorV2) String() string { +func (s *replicaSelector) String() string { if s == nil { return "" } diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index a59cd968..e7af35f3 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -17,22 +17,19 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "github.com/pkg/errors" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/client/mockserver" - "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util/israce" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) type testReplicaSelectorSuite struct { @@ -95,8 +92,8 @@ type replicaSelectorAccessPathCase struct { accessErrInValid bool expect *accessPathResult result accessPathResult - beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called. - afterRun func(ReplicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called. + beforeRun func() // beforeRun will be called before the test case execute, if it is nil, resetStoreState will be called. + afterRun func(*replicaSelector) // afterRun will be called after the test case execute, if it is nil, invalidateRegion will be called. 
} type accessPathResult struct { @@ -118,29 +115,28 @@ func TestReplicaSelectorBasic(t *testing.T) { rc := s.getRegion() s.NotNil(rc) rc.invalidate(Other) - selector, err := newReplicaSelectorV2(s.cache, rc.VerID(), req) + selector, err := newReplicaSelector(s.cache, rc.VerID(), req) s.Nil(err) s.Nil(selector) s.Equal("", selector.String()) - selector2, err := NewReplicaSelector(s.cache, rc.VerID(), req) + selector2, err := newReplicaSelector(s.cache, rc.VerID(), req) s.Nil(err) s.Nil(selector2) - s.False(selector2 == nil) // since never returns a nil interface value - s.False(selector2.isValid()) + s.True(selector2 == nil) s.Equal("", selector2.String()) rc = s.getRegion() - selector, err = newReplicaSelectorV2(s.cache, rc.VerID(), req) + selector, err = newReplicaSelector(s.cache, rc.VerID(), req) s.Nil(err) s.NotNil(selector) for _, reqSource := range []string{"leader", "follower", "follower", "unknown"} { - ctx, err := selector.next(s.bo, req) + _, err := selector.next(s.bo, req) s.Nil(err) - s.Equal(reqSource, selector.replicaType(ctx)) + s.Equal(reqSource, selector.replicaType()) } rc = s.getRegion() - selector, err = newReplicaSelectorV2(s.cache, rc.VerID(), req) + selector, err = newReplicaSelector(s.cache, rc.VerID(), req) s.Nil(err) s.NotNil(selector) ctx, err := selector.next(s.bo, req) @@ -161,9 +157,9 @@ func TestReplicaSelectorCalculateScore(t *testing.T) { region, err := s.cache.LocateKey(s.bo, []byte("a")) s.Nil(err) s.NotNil(region) - selector, err := NewReplicaSelector(s.cache, region.Region, req) + selector, err := newReplicaSelector(s.cache, region.Region, req) s.Nil(err) - for i, r := range selector.getBaseReplicaSelector().replicas { + for i, r := range selector.replicas { rc := s.cache.GetCachedRegionWithRLock(region.Region) s.NotNil(rc) isLeader := r.peer.Id == rc.GetLeaderPeerID() @@ -1413,7 +1409,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) { backoffDetail: []string{"regionScheduling+1"}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1431,7 +1427,7 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1737,7 +1733,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -1781,7 +1777,7 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) { backoffDetail: []string{}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2130,7 +2126,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { backoffDetail: []string{"tikvRPC+2"}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2149,7 +2145,7 @@ func TestReplicaReadAccessPathByStaleReadCase(t *testing.T) { backoffDetail: []string{"tikvServerBusy+1"}, regionIsValid: false, }, - 
afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, } s.True(s.runMultiCaseAndCompare(cas)) @@ -2326,27 +2322,15 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) { expect: &accessPathResult{ accessPath: []string{ "{addr: store1, replica-read: true, stale-read: false}", - "{addr: store1, replica-read: true, stale-read: false}", }, - respErr: "", + respErr: "region 0 is in flashback progress, FlashbackStartTS is 0", respRegionError: nil, backoffCnt: 0, backoffDetail: []string{}, regionIsValid: true, }, } - s.True(s.runCase(ca, false)) - ca.expect = &accessPathResult{ - accessPath: []string{ - "{addr: store1, replica-read: true, stale-read: false}", - }, - respErr: "region 0 is in flashback progress, FlashbackStartTS is 0", - respRegionError: nil, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - } - s.True(s.runCase(ca, true)) + s.True(s.runCaseAndCompare(ca)) ca = replicaSelectorAccessPathCase{ reqType: tikvrpc.CmdGet, @@ -2359,29 +2343,16 @@ func TestReplicaReadAccessPathByFlashbackInProgressCase(t *testing.T) { accessPath: []string{ "{addr: store1, replica-read: true, stale-read: false}", "{addr: store2, replica-read: true, stale-read: false}", - "{addr: store1, replica-read: true, stale-read: false}", + "{addr: store3, replica-read: true, stale-read: false}", }, - respErr: "region 0 is in flashback progress, FlashbackStartTS is 0", - respRegionError: nil, + respErr: "", + respRegionError: fakeEpochNotMatch, backoffCnt: 0, backoffDetail: []string{}, regionIsValid: true, }, } - s.True(s.runCase(ca, false)) - ca.expect = &accessPathResult{ - accessPath: []string{ - "{addr: store1, replica-read: true, stale-read: false}", - "{addr: store2, replica-read: true, stale-read: false}", - "{addr: store3, replica-read: true, stale-read: false}", - }, - respErr: "", - respRegionError: fakeEpochNotMatch, - backoffCnt: 0, - backoffDetail: []string{}, - regionIsValid: true, - } - s.True(s.runCase(ca, true)) + s.True(s.runCaseAndCompare(ca)) } func TestReplicaReadAccessPathByProxyCase(t *testing.T) { @@ -2409,7 +2380,7 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) { backoffDetail: []string{"tikvRPC+1"}, regionIsValid: true, }, - afterRun: func(_ ReplicaSelector) { /* don't invalid region */ }, + afterRun: func(_ *replicaSelector) { /* don't invalid region */ }, }, { reqType: tikvrpc.CmdGet, @@ -2485,8 +2456,8 @@ func TestReplicaReadAccessPathByProxyCase(t *testing.T) { backoffDetail: []string{"tikvRPC+1", "tikvServerBusy+2"}, regionIsValid: false, }, - afterRun: func(selector ReplicaSelector) { - base := selector.getBaseReplicaSelector() + afterRun: func(selector *replicaSelector) { + base := selector.baseReplicaSelector s.NotNil(base) s.True(base.replicas[0].isEpochStale()) s.True(base.replicas[0].epoch < atomic.LoadUint32(&base.replicas[0].store.epoch)) @@ -2651,8 +2622,7 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) }, } - // v1 doesn't support avoiding slow stores. We only test this on v2. 
- s.True(s.runCase(ca, true)) + s.True(s.runCaseAndCompare(ca)) s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel) var expectedSecondPath string @@ -2731,100 +2701,11 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) { store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now()) }, } - s.True(s.runCase(ca, true)) + s.True(s.runCaseAndCompare(ca)) } } } -func TestReplicaReadAccessPathByGenError(t *testing.T) { - t.Skip("skip TestReplicaReadAccessPathByGenError because it's unstable and slow.") - s := new(testReplicaSelectorSuite) - s.SetupTest(t) - defer func(lv zapcore.Level) { - log.SetLevel(lv) - s.TearDownTest() - }(log.GetLevel()) - log.SetLevel(zapcore.ErrorLevel) - - maxAccessErrCnt := 4 - if israce.RaceEnabled { - // When run this test with race, it will take a long time, so we reduce the maxAccessErrCnt to 2 to speed up test to avoid timeout. - maxAccessErrCnt = 2 - } - totalValidCaseCount := 0 - totalCaseCount := 0 - lastLogCnt := 0 - testCase := func(req tikvrpc.CmdType, readType kv.ReplicaReadType, staleRead bool, timeout time.Duration, busyThresholdMs uint32, label *metapb.StoreLabel) { - isRead := isReadReq(req) - accessErrGen := newAccessErrGenerator(isRead, staleRead, maxAccessErrCnt) - for { - accessErr, done := accessErrGen.genAccessErr(staleRead) - if done { - break - } - ca := replicaSelectorAccessPathCase{ - reqType: req, - readType: readType, - staleRead: staleRead, - timeout: timeout, - busyThresholdMs: busyThresholdMs, - label: label, - accessErr: accessErr, - } - valid := s.runCaseAndCompare(ca) - if valid { - totalValidCaseCount++ - } - totalCaseCount++ - if totalCaseCount-lastLogCnt > 100000 { - lastLogCnt = totalCaseCount - logutil.BgLogger().Info("TestReplicaReadAccessPathByGenError is running", - zap.Int("total-case", totalCaseCount), - zap.Int("valid-case", totalValidCaseCount), - zap.Int("invalid-case", totalCaseCount-totalValidCaseCount), - zap.String("req", req.String()), - zap.String("read-type", readType.String()), - zap.Bool("stale-read", staleRead), - zap.Duration("timeout", timeout), - zap.Any("label", label), - ) - } - } - } - - testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 0, nil) - testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 10, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 10, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadFollower, false, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadPreferLeader, false, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, time.Second, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "1"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "2"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, false, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "3"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "1"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "2"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, 0, 0, &metapb.StoreLabel{Key: "id", Value: "3"}) - testCase(tikvrpc.CmdGet, 
kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "1"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "2"}) - testCase(tikvrpc.CmdGet, kv.ReplicaReadMixed, true, time.Second, 0, &metapb.StoreLabel{Key: "id", Value: "3"}) - - // Test for forwarding proxy. - s.cache.enableForwarding = true - testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 0, nil) - testCase(tikvrpc.CmdPrewrite, kv.ReplicaReadLeader, false, 0, 10, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 0, nil) - testCase(tikvrpc.CmdGet, kv.ReplicaReadLeader, false, 0, 10, nil) - - logutil.BgLogger().Info("TestReplicaReadAccessPathByGenError Finished", - zap.Int("total-case", totalCaseCount), - zap.Int("valid-case", totalValidCaseCount), - zap.Int("invalid-case", totalCaseCount-totalValidCaseCount)) -} - func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) { loc, err := s.cache.LocateKey(s.bo, []byte("key")) s.Nil(err) @@ -2838,72 +2719,27 @@ func (s *testReplicaSelectorSuite) changeRegionLeader(storeId uint64) { s.cache.InvalidateCachedRegion(loc.Region) } -func (s *testReplicaSelectorSuite) runCaseAndCompare(ca1 replicaSelectorAccessPathCase) bool { - ca2 := ca1 - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = false - }) - sender := ca1.run(s) - ca1.checkResult(s, "v1", sender) - - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = true - }) - sender = ca2.run(s) - if ca2.expect == nil { - // compare with ca1 result. - ca2.expect = &ca1.result - } - ca2.checkResult(s, "v2", sender) - return !ca1.accessErrInValid -} - -func (s *testReplicaSelectorSuite) runCase(ca replicaSelectorAccessPathCase, v2 bool) bool { - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = v2 - }) +func (s *testReplicaSelectorSuite) runCaseAndCompare(ca replicaSelectorAccessPathCase) bool { sender := ca.run(s) - version := "v1" - if v2 { - version = "v2" - } - ca.checkResult(s, version, sender) + ca.checkResult(s, sender) return !ca.accessErrInValid } func (s *testReplicaSelectorSuite) runMultiCaseAndCompare(cas []replicaSelectorAccessPathCase) bool { - expects := make([]accessPathResult, 0, len(cas)) valid := true - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = false - }) - for _, ca1 := range cas { - sender := ca1.run(s) - ca1.checkResult(s, "v1", sender) - expects = append(expects, ca1.result) - valid = valid && !ca1.accessErrInValid - } - - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = true - }) - for i, ca2 := range cas { - sender := ca2.run(s) - if ca2.expect == nil { - // compare with ca1 result. 
- ca2.expect = &expects[i] - } - ca2.checkResult(s, "v2", sender) - valid = valid && !ca2.accessErrInValid + for _, ca := range cas { + sender := ca.run(s) + ca.checkResult(s, sender) + valid = valid && !ca.accessErrInValid } return valid } -func (ca *replicaSelectorAccessPathCase) checkResult(s *testReplicaSelectorSuite, version string, sender *RegionRequestSender) { +func (ca *replicaSelectorAccessPathCase) checkResult(s *testReplicaSelectorSuite, sender *RegionRequestSender) { if ca.expect == nil { return } - msg := fmt.Sprintf("enable_forwarding: %v\nversion: %v\n%v\nsender: %v\n", s.cache.enableForwarding, version, ca.Format(), sender.String()) + msg := fmt.Sprintf("enable_forwarding: %v\n%v\nsender: %v\n", s.cache.enableForwarding, ca.Format(), sender.String()) expect := ca.expect result := ca.result s.Equal(expect.accessPath, result.accessPath, msg) @@ -2953,7 +2789,7 @@ func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) *Regio s.NotNil(rc) bo := retry.NewBackofferWithVars(context.Background(), 40000, nil) resp, _, _, err := sender.SendReqCtx(bo, req, rc.VerID(), timeout, tikvrpc.TiKV, opts...) - ca.recordResult(s, bo, sender.replicaSelector.getBaseReplicaSelector().region, access, resp, err) + ca.recordResult(s, bo, sender.replicaSelector.region, access, resp, err) afterRun(ca, sender) return sender } @@ -3312,43 +3148,6 @@ func (tp RegionErrorType) String() string { } } -type accessErrGenerator struct { - maxAccessErrCnt int - mode int - idx int - baseIdx int - allErrs []RegionErrorType - retryErrs []RegionErrorType -} - -func newAccessErrGenerator(isRead, staleRead bool, maxAccessErrCnt int) *accessErrGenerator { - filter := func(tp RegionErrorType) bool { - // read request won't meet RaftEntryTooLargeErr. - if isRead && tp == RaftEntryTooLargeErr { - return false - } - if staleRead == false && tp == DataIsNotReadyErr { - return false - } - // TODO: since v2 has come compatibility issue with v1, so skip FlashbackInProgressErr. 
- if tp == FlashbackInProgressErr { - return false - } - return true - } - allErrs := getAllRegionErrors(filter) - retryableErr := getAllRegionErrors(func(tp RegionErrorType) bool { - return filter(tp) && isRegionErrorRetryable(tp) - }) - return &accessErrGenerator{ - maxAccessErrCnt: maxAccessErrCnt, - mode: 0, - idx: 0, - allErrs: allErrs, - retryErrs: retryableErr, - } -} - func getAllRegionErrors(filter func(errorType RegionErrorType) bool) []RegionErrorType { errs := make([]RegionErrorType, 0, int(RegionErrorTypeMax)) for tp := NotLeaderErr; tp < RegionErrorTypeMax; tp++ { @@ -3360,84 +3159,6 @@ func getAllRegionErrors(filter func(errorType RegionErrorType) bool) []RegionErr return errs } -func isRegionErrorRetryable(tp RegionErrorType) bool { - switch tp { - case NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err, ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr, - StaleCommandErr, MaxTimestampNotSyncedErr, ReadIndexNotReadyErr, ProposalInMergingModeErr, DataIsNotReadyErr, - RegionNotInitializedErr, FlashbackInProgressErr, DiskFullErr, DeadLineExceededErr: - return true - } - return false -} - -func (a *accessErrGenerator) genAccessErr(staleRead bool) ([]RegionErrorType, bool) { - if a.mode == 0 { - a.idx = 0 - a.mode = 1 - return nil, false - } - if a.mode == 1 { - idx := a.idx - a.idx++ - if a.idx >= len(a.allErrs) { - a.idx = 0 - a.baseIdx = 0 - a.mode = 2 - } - return []RegionErrorType{a.allErrs[idx]}, false - } - for a.mode <= a.maxAccessErrCnt { - errs := a.genAccessErrs(a.allErrs, a.retryErrs) - if len(errs) > 0 { - return errs, false - } - a.baseIdx = 0 - a.idx = 0 - a.mode++ - // if mode >= 4 , reduce the error type to avoid generating too many combinations. - if a.mode > 4 { - if a.mode > 8 { - a.allErrs = []RegionErrorType{ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr} - } else if a.mode > 7 { - a.allErrs = []RegionErrorType{ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr} - } else if a.mode > 6 { - a.allErrs = []RegionErrorType{NotLeaderWithNewLeader2Err, ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr} - } else if a.mode > 5 { - a.allErrs = []RegionErrorType{NotLeaderErr, NotLeaderWithNewLeader2Err, ServerIsBusyWithEstimatedWaitMsErr, DeadLineExceededErr} - } else { - a.allErrs = []RegionErrorType{NotLeaderErr, NotLeaderWithNewLeader1Err, NotLeaderWithNewLeader2Err, NotLeaderWithNewLeader3Err, ServerIsBusyWithEstimatedWaitMsErr, RegionNotInitializedErr, DeadLineExceededErr} - } - if staleRead { - a.allErrs = append(a.allErrs, DataIsNotReadyErr) - } - a.retryErrs = a.allErrs - } - } - return nil, true -} - -func (a *accessErrGenerator) genAccessErrs(allErrs, retryErrs []RegionErrorType) []RegionErrorType { - defer func() { - a.baseIdx++ - if a.baseIdx >= len(allErrs) { - a.baseIdx = 0 - a.idx++ - } - }() - mode := a.mode - errs := make([]RegionErrorType, mode) - errs[mode-1] = allErrs[a.baseIdx%len(allErrs)] - value := a.idx - for i := mode - 2; i >= 0; i-- { - if i == 0 && value > len(retryErrs) { - return nil - } - errs[i] = retryErrs[value%len(retryErrs)] - value = value / len(retryErrs) - } - return errs -} - func (s *testReplicaSelectorSuite) getRegion() *Region { for i := 0; i < 100; i++ { loc, err := s.cache.LocateKey(s.bo, []byte("key")) @@ -3517,6 +3238,28 @@ func TestTiKVClientReadTimeout(t *testing.T) { }, accessPath) } +func TestReplicaFlag(t *testing.T) { + r := &replica{} + allFlags := []uint8{deadlineErrUsingConfTimeoutFlag, dataIsNotReadyFlag, notLeaderFlag, serverIsBusyFlag} + for i, 
flag := range allFlags { + if i > 0 { + require.True(t, flag > allFlags[i-1]) + } + for j := i; j < len(allFlags); j++ { + require.Equal(t, false, r.hasFlag(allFlags[j])) + } + r.addFlag(flag) + require.Equal(t, true, r.hasFlag(flag)) + } + for i, flag := range allFlags { + for j := i; j < len(allFlags); j++ { + require.Equal(t, true, r.hasFlag(allFlags[j])) + } + r.deleteFlag(flag) + require.Equal(t, false, r.hasFlag(flag)) + } +} + func BenchmarkReplicaSelector(b *testing.B) { mvccStore := mocktikv.MustNewMVCCStore() cluster := mocktikv.NewCluster(mvccStore) @@ -3528,9 +3271,6 @@ func BenchmarkReplicaSelector(b *testing.B) { mvccStore.Close() }() - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.EnableReplicaSelectorV2 = true - }) cnt := 0 allErrs := getAllRegionErrors(nil) fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {