replica selector refactor (#1142)

* init

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add ReplicaSelector interface

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add replica_selector_v2, todo: fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix all test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test in another way to be compatible with the old-version behavior of stale-read requests

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* tiny refactor

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refactor to remove duplicate code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine onServerIsBusy region error handling logic

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support forwarding by proxy

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support busyThreshold and tiny code refactor

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add config

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix proxy bug

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test and tiny refactor

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add some test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* tidy up and refine

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test in v1

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add backoff-cnt check and timeout test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add benchmark test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* make test stable

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix golangci

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test and refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix race test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more comprehensive enumeration testing

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix accessFollower with label retry bug

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test and fix some bug

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix test and add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine onNotLeader logic when new leader is not available

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine calculateScore logic, if the replica already tried, decrease the score

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine logic: a replica-read request with the mixed strategy and labels should be able to retry all remaining replicas

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* prefer leader for mixed read with label for non-stale-read requests

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test case

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* resolve conflict

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove some duplicate test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refactor test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add busy_threshold test and fix bug

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix proxy bug and add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine code and add test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add learner test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add more test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove old test and refine test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* move test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* use new score calculation by flag bit

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* make test stable

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine comment, license and fix lint

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* refine comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix race test timeout

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add flagNotAttemp in score

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* address comment

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs 2024-03-11 15:40:44 +08:00 committed by GitHub
parent 9a37a0a77f
commit 8d6a95f73d
8 changed files with 3165 additions and 316 deletions

View File

@ -82,6 +82,8 @@ type Config struct {
RegionsRefreshInterval uint64
// EnablePreload indicates whether to preload region info when initializing the client.
EnablePreload bool
// EnableReplicaSelectorV2 indicates whether to enable ReplicaSelectorV2.
EnableReplicaSelectorV2 bool
}
// DefaultConfig returns the default configuration.
@ -99,6 +101,7 @@ func DefaultConfig() Config {
TxnScope: "",
EnableAsyncCommit: false,
Enable1PC: false,
EnableReplicaSelectorV2: true,
}
}
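
For reference, a minimal sketch (not part of this diff) of switching back to the v1 selector from a caller, assuming the config package's usual UpdateGlobal helper; EnableReplicaSelectorV2 defaults to true as shown above:

	config.UpdateGlobal(func(conf *config.Config) {
		conf.EnableReplicaSelectorV2 = false // fall back to the legacy replicaSelector
	})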

View File

@ -110,7 +110,7 @@ type RegionRequestSender struct {
client client.Client
storeAddr string
rpcError error
replicaSelector *replicaSelector
replicaSelector ReplicaSelector
failStoreIDs map[uint64]struct{}
failProxyStoreIDs map[uint64]struct{}
RegionRequestRuntimeStats
@ -258,6 +258,7 @@ type replica struct {
deadlineErrUsingConfTimeout bool
dataIsNotReady bool
notLeader bool
serverIsBusy bool
}
func (r *replica) getEpoch() uint32 {
@ -282,17 +283,10 @@ func (r *replica) onUpdateLeader() {
r.notLeader = false
}
type replicaSelector struct {
type baseReplicaSelector struct {
regionCache *RegionCache
region *Region
regionStore *regionStore
replicas []*replica
labels []*metapb.StoreLabel
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
// replicas[proxyIdx] is the store used to redirect requests this time
proxyIdx AccessIndex
// TiKV can reject the request when its estimated wait duration exceeds busyThreshold.
// Then, the client will receive a ServerIsBusy error and choose another replica to retry.
busyThreshold time.Duration
@ -315,6 +309,18 @@ type replicaSelector struct {
pendingBackoffs map[uint64]*backoffArgs
}
// TODO(crazycs520): remove this after replicaSelectorV2 stable.
type replicaSelector struct {
baseReplicaSelector
regionStore *regionStore
labels []*metapb.StoreLabel
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
// replicas[proxyIdx] is the store used to redirect requests this time
proxyIdx AccessIndex
}
func selectorStateToString(state selectorState) string {
replicaSelectorState := "nil"
if state != nil {
@ -345,11 +351,18 @@ func selectorStateToString(state selectorState) string {
}
func (s *replicaSelector) String() string {
var replicaStatus []string
cacheRegionIsValid := "unknown"
selectorStateStr := "nil"
if s != nil {
selectorStateStr = selectorStateToString(s.state)
}
return fmt.Sprintf("replicaSelector{state: %v, %v}", selectorStateStr, s.baseReplicaSelector.String())
}
func (s *baseReplicaSelector) String() string {
var replicaStatus []string
cacheRegionIsValid := "unknown"
if s != nil {
if s.region != nil {
if s.region.isValid() {
cacheRegionIsValid = "true"
@ -371,8 +384,7 @@ func (s *replicaSelector) String() string {
))
}
}
return fmt.Sprintf("replicaSelector{selectorStateStr: %v, cacheRegionIsValid: %v, replicaStatus: %v}", selectorStateStr, cacheRegionIsValid, replicaStatus)
return fmt.Sprintf("cacheRegionIsValid: %v, replicaStatus: %v", cacheRegionIsValid, replicaStatus)
}
// selectorState is the interface of states of the replicaSelector.
@ -463,17 +475,17 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
// check leader is candidate or not.
func isLeaderCandidate(leader *replica) bool {
liveness := leader.store.getLivenessState()
// If hibernate region is enabled and the leader is not reachable, the raft group
// will not be wakened up and re-elect the leader until the follower receives
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
// If leader.deadlineErrUsingConfTimeout is true, the leader has already been tried and returned a deadline exceeded error, so don't retry it.
// If leader.notLeader is true, the leader has already been tried and returned a NotLeader error, so don't retry it.
if liveness != reachable ||
if leader.store.getLivenessState() != reachable ||
leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) ||
leader.deadlineErrUsingConfTimeout ||
leader.notLeader {
leader.notLeader ||
leader.isEpochStale() { // Check the leader's epoch here: if it is stale, try other replicas instead of letting buildRPCContext fail, invalidating the region, and retrying.
return false
}
return true
@ -557,9 +569,9 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
if selector.targetIdx < 0 {
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
if !hasDeadlineExceededError(selector.replicas) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
return nil, nil
}
rpcCtx, err := selector.buildRPCContext(bo)
@ -633,7 +645,7 @@ func (state *accessByKnownProxy) onSendFailure(bo *retry.Backoffer, selector *re
}
func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) {
selector.state = &invalidLeader{}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}
// tryNewProxy is the state where we try to find a node from followers as proxy.
@ -696,7 +708,7 @@ func (state *tryNewProxy) onSendFailure(bo *retry.Backoffer, selector *replicaSe
}
func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
selector.state = &invalidLeader{}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}
// accessFollower is the state where we are sending requests to TiKV followers.
@ -759,13 +771,14 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
idx = state.lastIdx
} else {
// randomly select next replica, but skip state.lastIdx
if (i+offset)%replicaSize == int(state.leaderIdx) {
// Since i is always greater than or equal to 1, use i-1 to start from the first replica, which keeps tests stable.
if (i-1+offset)%replicaSize == int(state.leaderIdx) {
offset++
}
idx = AccessIndex((i + offset) % replicaSize)
idx = AccessIndex((i - 1 + offset) % replicaSize)
}
} else {
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
idx = AccessIndex((offset + i) % replicaSize)
}
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
@ -820,9 +833,9 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
if !hasDeadlineExceededError(selector.replicas) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
return nil, nil
}
state.lastIdx = state.leaderIdx
@ -929,9 +942,9 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector
if targetIdx == state.leaderIdx && !isLeaderCandidate(selector.replicas[targetIdx]) {
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
if !hasDeadlineExceededError(selector.replicas) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
return nil, nil
}
selector.targetIdx = targetIdx
@ -999,19 +1012,8 @@ func newReplicaSelector(
return nil, errors.New("cached region ttl expired")
}
replicas := buildTiKVReplicas(cachedRegion)
regionStore := cachedRegion.getStore()
replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly))
for _, storeIdx := range regionStore.accessIndex[tiKVOnly] {
replicas = append(
replicas, &replica{
store: regionStore.stores[storeIdx],
peer: cachedRegion.meta.Peers[storeIdx],
epoch: regionStore.storeEpochs[storeIdx],
attempts: 0,
},
)
}
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
@ -1038,6 +1040,35 @@ func newReplicaSelector(
}
}
return &replicaSelector{
baseReplicaSelector: baseReplicaSelector{
regionCache: regionCache,
region: cachedRegion,
replicas: replicas,
busyThreshold: time.Duration(req.BusyThresholdMs) * time.Millisecond,
},
regionStore: regionStore,
labels: option.labels,
state: state,
targetIdx: -1,
proxyIdx: -1,
}, nil
}
func buildTiKVReplicas(region *Region) []*replica {
regionStore := region.getStore()
replicas := make([]*replica, 0, regionStore.accessStoreNum(tiKVOnly))
for _, storeIdx := range regionStore.accessIndex[tiKVOnly] {
replicas = append(
replicas, &replica{
store: regionStore.stores[storeIdx],
peer: region.meta.Peers[storeIdx],
epoch: regionStore.storeEpochs[storeIdx],
attempts: 0,
},
)
}
if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
attemptedTime, err := time.ParseDuration(val.(string))
if err != nil {
@ -1047,19 +1078,7 @@ func newReplicaSelector(
r.attemptedTime = attemptedTime
}
}
return &replicaSelector{
regionCache,
cachedRegion,
regionStore,
replicas,
option.labels,
state,
-1,
-1,
time.Duration(req.BusyThresholdMs) * time.Millisecond,
nil,
}, nil
return replicas
}
const (
@ -1072,7 +1091,7 @@ const (
// next creates the RPCContext of the current candidate replica.
// It returns a SendError if runs out of all replicas or the cached region is invalidated.
func (s *replicaSelector) next(bo *retry.Backoffer) (rpcCtx *RPCContext, err error) {
func (s *replicaSelector) next(bo *retry.Backoffer, _ *tikvrpc.Request) (rpcCtx *RPCContext, err error) {
if !s.region.isValid() {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
return nil, nil
@ -1103,6 +1122,10 @@ func (s *replicaSelector) proxyReplica() *replica {
return nil
}
func (s *replicaSelector) getLabels() []*metapb.StoreLabel {
return s.labels
}
// sliceIdentical checks whether two slices are referencing the same block of memory. Two `nil`s are also considered
// the same.
func sliceIdentical[T any](a, b []T) bool {
@ -1146,8 +1169,10 @@ func (s *replicaSelector) refreshRegionStore() {
}
func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, error) {
targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica()
return s.baseReplicaSelector.buildRPCContext(bo, s.targetReplica(), s.proxyReplica())
}
func (s *baseReplicaSelector) buildRPCContext(bo *retry.Backoffer, targetReplica, proxyReplica *replica) (*RPCContext, error) {
// Backoff and retry if no replica is selected or the selected replica is stale
if targetReplica == nil || targetReplica.isEpochStale() ||
(proxyReplica != nil && proxyReplica.isEpochStale()) {
@ -1201,11 +1226,7 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
}
func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) {
// Configurable timeout should be less than `ReadTimeoutShort`.
return false
}
if isReadReq(req.Type) {
if isReadReqConfigurableTimeout(req) {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
@ -1215,10 +1236,18 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo
}
return true
}
// Only works for read requests; returns false for non-read requests.
return false
}
func isReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if req.MaxExecutionDurationMs >= uint64(client.ReadTimeoutShort.Milliseconds()) {
// Configurable timeout should be less than `ReadTimeoutShort`.
return false
}
// Only works for read requests; returns false for non-read requests.
return isReadReq(req.Type)
}
func isReadReq(tp tikvrpc.CmdType) bool {
switch tp {
case tikvrpc.CmdGet, tikvrpc.CmdBatchGet, tikvrpc.CmdScan,
@ -1229,11 +1258,15 @@ func isReadReq(tp tikvrpc.CmdType) bool {
}
}
func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
func (s *baseReplicaSelector) getBaseReplicaSelector() *baseReplicaSelector {
return s
}
func (s *baseReplicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache)
}
func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) {
func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause error) {
store := replica.store
if atomic.CompareAndSwapUint32(&store.epoch, replica.epoch, replica.epoch+1) {
logutil.BgLogger().Info(
@ -1249,7 +1282,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error)
}
}
func (s *replicaSelector) onSendSuccess() {
func (s *replicaSelector) onSendSuccess(_ *tikvrpc.Request) {
s.state.onSendSuccess(s)
}
@ -1259,24 +1292,38 @@ func (s *replicaSelector) onNotLeader(
if target := s.targetReplica(); target != nil {
target.notLeader = true
}
leader := notLeader.GetLeader()
if leader == nil {
// The region may be in the middle of a leader transfer.
s.state.onNoLeader(s)
if err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx)); err != nil {
leaderIdx, err := s.baseReplicaSelector.onNotLeader(bo, ctx, notLeader)
if err != nil {
return false, err
}
if leaderIdx >= 0 {
if isLeaderCandidate(s.replicas[leaderIdx]) {
s.state = &accessKnownLeader{leaderIdx: AccessIndex(leaderIdx)}
}
} else {
s.updateLeader(notLeader.GetLeader())
s.state.onNoLeader(s)
}
return true, nil
}
func (s *baseReplicaSelector) onNotLeader(
bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader,
) (leaderIdx int, err error) {
leader := notLeader.GetLeader()
if leader == nil {
// The region may be in the middle of a leader transfer.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("no leader, ctx: %v", ctx))
return -1, err
}
return s.updateLeader(leader), nil
}
// updateLeader updates the leader of the cached region.
// If the leader peer isn't found in the region, the region will be invalidated.
func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
// If switch to new leader successfully, returns the AccessIndex of the new leader in the replicas.
func (s *baseReplicaSelector) updateLeader(leader *metapb.Peer) int {
if leader == nil {
return
return -1
}
for i, replica := range s.replicas {
if isSamePeer(replica.peer, leader) {
@ -1285,14 +1332,9 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
if replica.store.getLivenessState() != reachable {
return
return -1
}
replica.onUpdateLeader()
if isLeaderCandidate(s.replicas[i]) {
// If the new leader is candidate, switch to the new leader.
// the leader may have deadlineErrUsingConfTimeout and isn't candidate, if so, keep the state unchanged and retry the request.
s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
}
// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
if !s.region.switchWorkLeaderToPeer(leader) {
panic("the store must exist")
@ -1302,11 +1344,12 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
zap.Uint64("regionID", s.region.GetID()),
zap.Uint64("leaderStoreID", leader.GetStoreId()),
)
return
return i
}
}
// Invalidate the region since the new leader is not in the cached version.
s.region.invalidate(StoreNotFound)
return -1
}
func (s *replicaSelector) onServerIsBusy(
@ -1331,7 +1374,7 @@ func (s *replicaSelector) onServerIsBusy(
}
}
} else {
// Mark the server is busy (the next incoming READs could be redirect to expected followers.)
// Mark the server is busy (the next incoming READs could be redirected to expected followers.)
ctx.Store.healthStatus.markAlreadySlow()
}
}
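// Illustrative sketch (not from this diff): how a caller opts into the load-based
// read path, using only request fields and calls that appear in these hunks.
// TiKV rejects the read with ServerIsBusy once its estimated wait exceeds
// BusyThresholdMs; onServerIsBusy above then records the error so the selector
// (tryIdleReplica in v1, ReplicaSelectMixedStrategy with busyThreshold in v2)
// can pick a less busy replica, falling back to the leader with the threshold
// cleared when every peer is busy.
//
//	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
//		&kvrpcpb.GetRequest{Key: []byte("k")}, kv.ReplicaReadLeader, nil)
//	req.BusyThresholdMs = 50
//	resp, rpcCtx, _, err := sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)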
@ -1353,7 +1396,7 @@ func (s *replicaSelector) onDataIsNotReady() {
}
}
func (s *replicaSelector) invalidateRegion() {
func (s *baseReplicaSelector) invalidateRegion() {
if s.region != nil {
s.region.invalidate(Other)
}
@ -1369,14 +1412,14 @@ func (s *RegionRequestSender) getRPCContext(
switch et {
case tikvrpc.TiKV:
if s.replicaSelector == nil {
selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...)
if selector == nil || err != nil {
selector, err := NewReplicaSelector(s.regionCache, regionID, req, opts...)
if err != nil {
s.rpcError = err
return nil, nil
}
s.replicaSelector = selector
}
return s.replicaSelector.next(bo)
return s.replicaSelector.next(bo, req)
case tikvrpc.TiFlash:
// Should ignore WN, because in disaggregated tiflash mode, TiDB will build rpcCtx itself.
return s.regionCache.GetTiFlashRPCContext(bo, regionID, true, LabelFilterNoTiFlashWriteNode)
@ -1520,7 +1563,7 @@ func (s *RegionRequestSender) SendReqCtx(
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
if s.replicaSelector != nil {
if err := s.replicaSelector.backoffOnNoCandidate(bo); err != nil {
if err := s.replicaSelector.getBaseReplicaSelector().backoffOnNoCandidate(bo); err != nil {
return nil, nil, retryTimes, err
}
}
@ -1531,7 +1574,7 @@ func (s *RegionRequestSender) SendReqCtx(
var isLocalTraffic bool
if staleReadCollector != nil && s.replicaSelector != nil {
if target := s.replicaSelector.targetReplica(); target != nil {
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels)
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.getLabels())
staleReadCollector.onReq(req, isLocalTraffic)
}
}
@ -1549,13 +1592,13 @@ func (s *RegionRequestSender) SendReqCtx(
req.Context.ClusterId = rpcCtx.ClusterID
rpcCtx.contextPatcher.applyTo(&req.Context)
if req.InputRequestSource != "" && s.replicaSelector != nil {
s.replicaSelector.patchRequestSource(req, rpcCtx)
patchRequestSource(req, s.replicaSelector.replicaType(rpcCtx))
}
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, nil, retryTimes, err
}
if s.replicaSelector != nil {
if err := s.replicaSelector.backoffOnRetry(rpcCtx.Store, bo); err != nil {
if err := s.replicaSelector.getBaseReplicaSelector().backoffOnRetry(rpcCtx.Store, bo); err != nil {
return nil, nil, retryTimes, err
}
}
@ -1611,7 +1654,7 @@ func (s *RegionRequestSender) SendReqCtx(
s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, totalErrors)
} else {
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
s.replicaSelector.onSendSuccess(req)
}
}
if staleReadCollector != nil {
@ -1783,7 +1826,7 @@ func (s *RegionRequestSender) sendReqToRegion(
resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
rpcDuration := time.Since(start)
if s.replicaSelector != nil {
s.replicaSelector.recordAttemptedTime(rpcDuration)
recordAttemptedTime(s.replicaSelector, rpcDuration)
}
// Record timecost of external requests on related Store when `ReplicaReadMode == "PreferLeader"`.
if rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
@ -2150,17 +2193,7 @@ func (s *RegionRequestSender) onRegionError(
zap.Stringer("ctx", ctx),
)
if req != nil {
// if the failure is caused by replica read, we can retry it with leader safely.
if ctx.contextPatcher.replicaRead != nil && *ctx.contextPatcher.replicaRead {
req.BusyThresholdMs = 0
s.replicaSelector.busyThreshold = 0
ctx.contextPatcher.replicaRead = nil
ctx.contextPatcher.busyThreshold = nil
return true, nil
}
if req.ReplicaReadType.IsFollowerRead() {
s.replicaSelector = nil
req.ReplicaReadType = kv.ReplicaReadLeader
if s.onFlashbackInProgressRegionError(ctx, req) {
return true, nil
}
}
@ -2396,6 +2429,28 @@ func (s *RegionRequestSender) onRegionError(
return false, nil
}
func (s *RegionRequestSender) onFlashbackInProgressRegionError(ctx *RPCContext, req *tikvrpc.Request) bool {
switch selector := s.replicaSelector.(type) {
case *replicaSelector:
// if the failure is caused by replica read, we can retry it with leader safely.
if ctx.contextPatcher.replicaRead != nil && *ctx.contextPatcher.replicaRead {
req.BusyThresholdMs = 0
selector.busyThreshold = 0
ctx.contextPatcher.replicaRead = nil
ctx.contextPatcher.busyThreshold = nil
return true
}
if req.ReplicaReadType.IsFollowerRead() {
s.replicaSelector = nil
req.ReplicaReadType = kv.ReplicaReadLeader
return true
}
case *replicaSelectorV2:
return selector.onFlashbackInProgress(ctx, req)
}
return false
}
type staleReadMetricsCollector struct {
}
@ -2471,7 +2526,7 @@ func (s *replicaSelector) replicaType(rpcCtx *RPCContext) string {
return "unknown"
}
func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCContext) {
func patchRequestSource(req *tikvrpc.Request, replicaType string) {
var sb strings.Builder
defer func() {
// TiKV does the limit control by the last part of the request source.
@ -2480,8 +2535,6 @@ func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCCo
req.RequestSource = sb.String()
}()
replicaType := s.replicaType(rpcCtx)
if req.IsRetryRequest {
sb.WriteString("retry_")
sb.WriteString(req.ReadType)
@ -2497,7 +2550,7 @@ func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCCo
sb.WriteString(req.ReadType)
}
func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
func recordAttemptedTime(s ReplicaSelector, duration time.Duration) {
if targetReplica := s.targetReplica(); targetReplica != nil {
targetReplica.attemptedTime += duration
}
@ -2523,7 +2576,7 @@ type backoffArgs struct {
}
// addPendingBackoff adds pending backoff for the store.
func (s *replicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err error) {
func (s *baseReplicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err error) {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
@ -2535,7 +2588,7 @@ func (s *replicaSelector) addPendingBackoff(store *Store, cfg *retry.Config, err
}
// backoffOnRetry apply pending backoff on the store when retry in this store.
func (s *replicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) error {
func (s *baseReplicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) error {
storeId := uint64(0)
if store != nil {
storeId = store.storeID
@ -2549,7 +2602,7 @@ func (s *replicaSelector) backoffOnRetry(store *Store, bo *retry.Backoffer) erro
}
// backoffOnNoCandidate apply the largest base pending backoff when no candidate.
func (s *replicaSelector) backoffOnNoCandidate(bo *retry.Backoffer) error {
func (s *baseReplicaSelector) backoffOnNoCandidate(bo *retry.Backoffer) error {
var args *backoffArgs
for _, pbo := range s.pendingBackoffs {
if args == nil || args.cfg.Base() < pbo.cfg.Base() {

View File

@ -235,7 +235,8 @@ func (s *testRegionRequestToThreeStoresSuite) loadAndGetLeaderStore() (*Store, s
}
func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
s.regionRequestSender.regionCache.enableForwarding = true
sender := NewRegionRequestSender(s.cache, s.regionRequestSender.client)
sender.regionCache.enableForwarding = true
// First get the leader's addr from region cache
leaderStore, leaderAddr := s.loadAndGetLeaderStore()
@ -243,8 +244,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
bo := retry.NewBackoffer(context.Background(), 10000)
// Simulate that the leader is network-partitioned but can be accessed by forwarding via a follower
innerClient := s.regionRequestSender.client
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
innerClient := sender.client
sender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if addr == leaderAddr {
return nil, errors.New("simulated rpc error")
}
@ -255,21 +256,21 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
return innerClient.SendRequest(ctx, addr, req, timeout)
}}
var storeState = uint32(unreachable)
s.regionRequestSender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
sender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
if s.addr == leaderAddr {
return livenessState(atomic.LoadUint32(&storeState))
}
return reachable
})
loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
loc, err := sender.regionCache.LocateKey(bo, []byte("k"))
s.Nil(err)
s.Equal(loc.Region.GetID(), s.regionID)
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("k"),
Value: []byte("v1"),
})
resp, ctx, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err := sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
@ -281,7 +282,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
s.Nil(err)
// Simulate recovering to normal
s.regionRequestSender.client = innerClient
sender.client = innerClient
atomic.StoreUint32(&storeState, uint32(reachable))
start := time.Now()
for {
@ -296,7 +297,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
atomic.StoreUint32(&storeState, uint32(unreachable))
req = tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: []byte("k")})
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
@ -305,7 +306,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
s.Nil(ctx.ProxyStore)
// Simulate server down
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
sender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if addr == leaderAddr || req.ForwardedHost == leaderAddr {
return nil, errors.New("simulated rpc error")
}
@ -329,7 +330,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
Key: []byte("k"),
Value: []byte("v2"),
})
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
@ -337,19 +338,19 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
// Then SendReqCtx will throw a pseudo EpochNotMatch to tell the caller to reload the region.
s.NotNil(regionErr.EpochNotMatch)
s.Nil(ctx)
s.Equal(len(s.regionRequestSender.failStoreIDs), 0)
s.Equal(len(s.regionRequestSender.failProxyStoreIDs), 0)
region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
s.Equal(len(sender.failStoreIDs), 0)
s.Equal(len(sender.failProxyStoreIDs), 0)
region := sender.regionCache.GetCachedRegionWithRLock(loc.Region)
s.NotNil(region)
s.False(region.isValid())
loc, err = s.regionRequestSender.regionCache.LocateKey(bo, []byte("k"))
loc, err = sender.regionCache.LocateKey(bo, []byte("k"))
s.Nil(err)
req = tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("k"),
Value: []byte("v2"),
})
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, ctx, _, err = sender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
@ -428,18 +429,18 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
refreshRegionTTL(region)
refreshEpochs(regionStore)
req.ReplicaReadType = kv.ReplicaReadLearner
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
replicaSelector, err := NewReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
accessLearner, _ := replicaSelector.state.(*accessFollower)
// Invalidate the region if the leader is not in the region.
refreshRegionTTL(region)
rpcCtx, err := replicaSelector.next(s.bo)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
// Should switch to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
target := replicaSelector.targetReplica()
AssertRPCCtxEqual(s, rpcCtx, target, nil)
s.Equal(target.peer.Role, metapb.PeerRole_Learner)
s.Equal(target.peer.Id, tikvLearner.Id)
}
func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
@ -504,7 +505,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Try the leader for maxReplicaAttempt times
for i := 1; i <= maxReplicaAttempt; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
@ -513,7 +514,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// After that it should switch to tryFollower
for i := 0; i < len(replicaSelector.replicas)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
state, ok := replicaSelector.state.(*tryFollower)
s.True(ok)
@ -524,7 +525,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(replicaSelector.targetReplica().attempts, 1)
}
// In tryFollower state, if all replicas are tried, nil RPCContext should be returned
rpcCtx, err := replicaSelector.next(s.bo)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
s.Nil(rpcCtx)
// The region should be invalidated
@ -537,10 +538,10 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotNil(replicaSelector)
unreachable.injectConstantLiveness(cache)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
replicaSelector.onSendFailure(s.bo, nil)
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.NotNil(rpcCtx)
s.Nil(err)
s.IsType(&tryFollower{}, replicaSelector.state)
@ -561,7 +562,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotNil(replicaSelector)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unreachable, so it will try a reachable follower instead of the unreachable leader.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
_, ok := replicaSelector.state.(*tryFollower)
@ -581,7 +582,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unknown. Even if forwarding is enabled, it should try followers
// instead of using the proxy.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
_, ok = replicaSelector.state.(*tryFollower)
@ -599,7 +600,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}, 3*time.Second, 200*time.Millisecond)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Now, livenessState is unreachable, so it will try a new proxy instead of the leader.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.NotNil(rpcCtx)
state, ok := replicaSelector.state.(*tryNewProxy)
@ -614,7 +615,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// When the current proxy node fails, it should try another one.
lastProxy := replicaSelector.proxyIdx
replicaSelector.onSendFailure(s.bo, nil)
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.NotNil(rpcCtx)
s.Nil(err)
state, ok = replicaSelector.state.(*tryNewProxy)
@ -626,7 +627,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(replicaSelector.proxyReplica().attempts, 1)
// Test that the proxy store is saved when proxy is enabled
replicaSelector.onSendSuccess()
replicaSelector.onSendSuccess(req)
regionStore = region.getStore()
s.Equal(replicaSelector.proxyIdx, regionStore.proxyTiKVIdx)
@ -639,14 +640,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
state2, ok := replicaSelector.state.(*accessByKnownProxy)
s.True(ok)
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
_, err = replicaSelector.next(s.bo)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
// Switch to tryNewProxy if the current proxy is not available
replicaSelector.onSendFailure(s.bo, nil)
s.IsType(&tryNewProxy{}, replicaSelector.state)
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
@ -654,6 +655,8 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx)
s.Equal(replicaSelector.targetReplica().attempts, 2)
s.Equal(replicaSelector.proxyReplica().attempts, 1)
// FIXME: the chosen proxy-replica's store should be reachable.
//s.Equal(replicaSelector.proxyReplica().store.getLivenessState(), reachable)
// Test accessFollower state with kv.ReplicaReadFollower request type.
req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
@ -669,7 +672,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
lastIdx := AccessIndex(-1)
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
rpcCtx, err := replicaSelector.next(s.bo, req)
s.Nil(err)
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
@ -680,13 +683,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
lastIdx = state3.lastIdx
}
// Fallback to the leader for 1 time
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(rpcCtx)
s.Nil(err)
@ -703,7 +706,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
state3 = replicaSelector.state.(*accessFollower)
// Should fallback to the leader immediately.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
@ -726,7 +729,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithMatchLabels(labels))
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[accessIdx], nil)
}
@ -738,7 +741,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly())
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(err)
// Should always access the leader.
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
@ -757,7 +760,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()})
s.False(region.isValid())
// Don't try next replica if the region is invalidated.
rpcCtx, err = replicaSelector.next(s.bo)
rpcCtx, err = replicaSelector.next(s.bo, req)
s.Nil(rpcCtx)
s.Nil(err)
}
@ -780,7 +783,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.NotNil(regionStore)
reloadRegion := func() {
s.regionRequestSender.replicaSelector.region.invalidate(Other)
s.regionRequestSender.replicaSelector.invalidateRegion()
region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
regionStore = s.cache.GetCachedRegionWithRLock(region.Region).getStore()
}
@ -811,7 +814,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1])
s.True(bo.GetTotalBackoffTimes() == 1)
s.cluster.StartStore(s.storeIDs[0])
atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))
@ -822,7 +825,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
s.Equal(sender.replicaSelector.targetReplica().peer.Id, s.peerIDs[1])
s.True(bo.GetTotalBackoffTimes() == 0)
// Switch to the next peer due to leader failure but the new leader is not elected.
@ -853,13 +856,31 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 3)
s.False(sender.replicaSelector.region.isValid())
getReplicaSelectorRegion := func() *Region {
if selector, ok := sender.replicaSelector.(*replicaSelector); ok {
return selector.region
}
if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok {
return selector.region
}
return nil
}
s.False(getReplicaSelectorRegion().isValid())
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
// The leader store is alive but can't provide service.
getReplicaSelectorRegionStores := func() []*Store {
if selector, ok := sender.replicaSelector.(*replicaSelector); ok {
return selector.regionStore.stores
}
if selector, ok := sender.replicaSelector.(*replicaSelectorV2); ok {
return selector.region.getStore().stores
}
return nil
}
reachable.injectConstantLiveness(s.cache)
s.Eventually(func() bool {
stores := s.regionRequestSender.replicaSelector.regionStore.stores
stores := getReplicaSelectorRegionStores()
return stores[0].getLivenessState() == reachable &&
stores[1].getLivenessState() == reachable &&
stores[2].getLivenessState() == reachable
@ -871,7 +892,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
s.cluster.StartStore(s.storeIDs[0])
@ -905,7 +926,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
}()
}
@ -925,7 +946,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@ -944,7 +965,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@ -977,7 +998,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
}
@ -993,7 +1014,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.True(bo.GetTotalBackoffTimes() == 3)
s.False(sender.replicaSelector.region.isValid())
s.False(getReplicaSelectorRegion().isValid())
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}
@ -1008,42 +1029,45 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
BusyThresholdMs: 50,
})
replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
s.Equal(replicaSelector.region, region)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
s.Equal(replicaSelector.getBaseReplicaSelector().region, region)
// The busyThreshold in replicaSelector should be initialized with the request context.
s.Equal(replicaSelector.busyThreshold, 50*time.Millisecond)
s.Equal(replicaSelector.getBaseReplicaSelector().busyThreshold, 50*time.Millisecond)
bo := retry.NewBackoffer(context.Background(), -1)
rpcCtx, err := replicaSelector.next(bo)
rpcCtx, err := replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
s.False(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
// Receive a ServerIsBusy error
replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
EstimatedWaitMs: 500,
})
rpcCtx, err = replicaSelector.next(bo)
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
lastPeerID := rpcCtx.Peer.Id
replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
EstimatedWaitMs: 800,
})
rpcCtx, err = replicaSelector.next(bo)
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
// Should choose a peer different from before
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
s.NotEqual(rpcCtx.Peer.Id, lastPeerID)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(50))
// All peers are too busy
replicaSelector.onServerIsBusy(bo, rpcCtx, req, &errorpb.ServerIsBusy{
@ -1052,24 +1076,26 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
lessBusyPeer := rpcCtx.Peer.Id
// Then, send to the leader again with no threshold.
rpcCtx, err = replicaSelector.next(bo)
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.False(*rpcCtx.contextPatcher.replicaRead)
s.Equal(*rpcCtx.contextPatcher.busyThreshold, time.Duration(0))
rpcCtx.contextPatcher.applyTo(&req.Context)
s.False(req.ReplicaRead)
s.Equal(req.BusyThresholdMs, uint32(0))
s.True(replicaSelector.getBaseReplicaSelector().region.isValid()) // don't invalidate region when can't find an idle replica.
time.Sleep(120 * time.Millisecond)
// When there comes a new request, it should skip busy leader and choose a less busy store
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
req.BusyThresholdMs = 50
replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(bo)
rpcCtx, err = replicaSelector.next(bo, req)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, lessBusyPeer)
s.IsType(&tryIdleReplica{}, replicaSelector.state)
s.True(*rpcCtx.contextPatcher.replicaRead)
rpcCtx.contextPatcher.applyTo(&req.Context)
s.True(req.ReplicaRead)
}
func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadWithFlashbackInProgress() {
@ -1381,7 +1407,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
}
}
setTargetReplica := func(selector *replicaSelector, readType string) {
setTargetReplica := func(selector ReplicaSelector, readType string) {
var leader bool
switch readType {
case "leader", "stale_leader":
@ -1391,13 +1417,23 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
default:
panic("unreachable")
}
for idx, replica := range selector.replicas {
for idx, replica := range selector.getBaseReplicaSelector().replicas {
if replica.store.storeID == leaderStore.storeID && leader {
selector.targetIdx = AccessIndex(idx)
if v1, ok := selector.(*replicaSelector); ok {
v1.targetIdx = AccessIndex(idx)
}
if v2, ok := selector.(*replicaSelectorV2); ok {
v2.target = replica
}
return
}
if replica.store.storeID != leaderStore.storeID && !leader {
selector.targetIdx = AccessIndex(idx)
if v1, ok := selector.(*replicaSelector); ok {
v1.targetIdx = AccessIndex(idx)
}
if v2, ok := selector.(*replicaSelectorV2); ok {
v2.target = replica
}
return
}
}
@ -1411,23 +1447,23 @@ func (s *testRegionRequestToThreeStoresSuite) TestRetryRequestSource() {
bo := retry.NewBackoffer(context.Background(), -1)
req.IsRetryRequest = false
setReadType(req, firstReplica)
replicaSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err := NewReplicaSelector(s.cache, regionLoc.Region, req)
s.Nil(err)
setTargetReplica(replicaSelector, firstReplica)
rpcCtx, err := replicaSelector.buildRPCContext(bo)
rpcCtx, err := replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Nil(err)
replicaSelector.patchRequestSource(req, rpcCtx)
patchRequestSource(req, replicaSelector.replicaType(rpcCtx))
s.Equal(firstReplica+"_test", req.RequestSource)
// retry
setReadType(req, retryReplica)
replicaSelector, err = newReplicaSelector(s.cache, regionLoc.Region, req)
replicaSelector, err = NewReplicaSelector(s.cache, regionLoc.Region, req)
s.Nil(err)
setTargetReplica(replicaSelector, retryReplica)
rpcCtx, err = replicaSelector.buildRPCContext(bo)
rpcCtx, err = replicaSelector.getBaseReplicaSelector().buildRPCContext(bo, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Nil(err)
req.IsRetryRequest = true
replicaSelector.patchRequestSource(req, rpcCtx)
patchRequestSource(req, replicaSelector.replicaType(rpcCtx))
s.Equal("retry_"+firstReplica+"_"+retryReplica+"_test", req.RequestSource)
}
}

View File

@ -0,0 +1,536 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package locate
import (
"fmt"
"time"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
)
type ReplicaSelector interface {
next(bo *retry.Backoffer, req *tikvrpc.Request) (*RPCContext, error)
targetReplica() *replica
proxyReplica() *replica
replicaType(rpcCtx *RPCContext) string
String() string
getBaseReplicaSelector() *baseReplicaSelector
getLabels() []*metapb.StoreLabel
onSendSuccess(req *tikvrpc.Request)
onSendFailure(bo *retry.Backoffer, err error)
invalidateRegion()
// Following methods are used to handle region errors.
onNotLeader(bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader) (shouldRetry bool, err error)
onDataIsNotReady()
onServerIsBusy(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy) (shouldRetry bool, err error)
onReadReqConfigurableTimeout(req *tikvrpc.Request) bool
}
// NewReplicaSelector returns a new ReplicaSelector.
func NewReplicaSelector(
regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption,
) (ReplicaSelector, error) {
if config.GetGlobalConfig().EnableReplicaSelectorV2 {
return newReplicaSelectorV2(regionCache, regionID, req, opts...)
}
return newReplicaSelector(regionCache, regionID, req, opts...)
}
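// Illustrative sketch (not part of this change): RegionRequestSender drives a
// selector roughly like this, mirroring the getRPCContext/SendReqCtx hunks above.
//
//	selector, err := NewReplicaSelector(regionCache, regionID, req)
//	if err != nil {
//		return nil, err
//	}
//	for {
//		rpcCtx, err := selector.next(bo, req)
//		if err != nil || rpcCtx == nil {
//			break // out of candidates, or the cached region was invalidated
//		}
//		// Send req to rpcCtx.Addr; on a region error, call the matching
//		// handler (onNotLeader, onServerIsBusy, ...) and loop to retry.
//	}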
type replicaSelectorV2 struct {
baseReplicaSelector
replicaReadType kv.ReplicaReadType
isStaleRead bool
isReadOnlyReq bool
option storeSelectorOp
target *replica
proxy *replica
attempts int
}
func newReplicaSelectorV2(
regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption,
) (*replicaSelectorV2, error) {
cachedRegion := regionCache.GetCachedRegionWithRLock(regionID)
if cachedRegion == nil || !cachedRegion.isValid() {
return nil, errors.New("cached region invalid")
}
replicas := buildTiKVReplicas(cachedRegion)
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
return &replicaSelectorV2{
baseReplicaSelector: baseReplicaSelector{
regionCache: regionCache,
region: cachedRegion,
replicas: replicas,
busyThreshold: time.Duration(req.BusyThresholdMs) * time.Millisecond,
},
replicaReadType: req.ReplicaReadType,
isStaleRead: req.StaleRead,
isReadOnlyReq: isReadReq(req.Type),
option: option,
target: nil,
attempts: 0,
}, nil
}
func (s *replicaSelectorV2) next(bo *retry.Backoffer, req *tikvrpc.Request) (rpcCtx *RPCContext, err error) {
if !s.region.isValid() {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("invalid").Inc()
return nil, nil
}
s.attempts++
s.target = nil
s.proxy = nil
switch s.replicaReadType {
case kv.ReplicaReadLeader:
s.nextForReplicaReadLeader(req)
default:
s.nextForReplicaReadMixed(req)
}
if s.target == nil {
return nil, nil
}
return s.buildRPCContext(bo, s.target, s.proxy)
}
func (s *replicaSelectorV2) nextForReplicaReadLeader(req *tikvrpc.Request) {
if s.regionCache.enableForwarding {
strategy := ReplicaSelectLeaderWithProxyStrategy{}
s.target, s.proxy = strategy.next(s.replicas, s.region)
if s.target != nil && s.proxy != nil {
return
}
}
leaderIdx := s.region.getStore().workTiKVIdx
strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx}
s.target = strategy.next(s.replicas)
if s.target != nil && s.busyThreshold > 0 && s.isReadOnlyReq && (s.target.store.EstimatedWaitTime() > s.busyThreshold || s.target.serverIsBusy) {
// If the leader is busy in our estimation, try other idle replicas.
// If other replicas are all busy, tryIdleReplica will try the leader again without busy threshold.
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, busyThreshold: s.busyThreshold}
idleTarget := mixedStrategy.next(s, s.region)
if idleTarget != nil {
s.target = idleTarget
req.ReplicaRead = true
} else {
// All peers are too busy: drop the busy threshold and keep using the leader.
s.busyThreshold = 0
req.BusyThresholdMs = 0
req.ReplicaRead = false
}
}
if s.target != nil {
return
}
mixedStrategy := ReplicaSelectMixedStrategy{leaderIdx: leaderIdx, leaderOnly: s.option.leaderOnly}
s.target = mixedStrategy.next(s, s.region)
if s.target != nil && s.isReadOnlyReq && s.replicas[leaderIdx].deadlineErrUsingConfTimeout {
req.ReplicaRead = true
req.StaleRead = false
}
}
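// Illustrative walk-through (not part of this change): with req.BusyThresholdMs = 10
// on a read-only request, if the leader's EstimatedWaitTime is, say, 50ms while some
// follower is idle, the logic above retargets the request to that follower and sets
// req.ReplicaRead = true; if every replica exceeds the threshold, the threshold is
// dropped (req.BusyThresholdMs = 0) and the request keeps going to the leader.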
func (s *replicaSelectorV2) nextForReplicaReadMixed(req *tikvrpc.Request) {
leaderIdx := s.region.getStore().workTiKVIdx
if s.isStaleRead && s.attempts == 2 {
// For the second attempt of a stale read, retry the leader with a leader read.
strategy := ReplicaSelectLeaderStrategy{leaderIdx: leaderIdx}
s.target = strategy.next(s.replicas)
if s.target != nil && !s.target.isExhausted(1, 0) {
// For stale read, don't retry the leader again if it was already accessed on the first attempt.
req.StaleRead = false
req.ReplicaRead = false
return
}
}
preferLeader := req.ReplicaReadType == kv.ReplicaReadPreferLeader
if s.attempts > 1 {
if req.ReplicaReadType == kv.ReplicaReadMixed {
// For a mixed-read retry, prefer retrying the leader first.
preferLeader = true
}
}
strategy := ReplicaSelectMixedStrategy{
leaderIdx: leaderIdx,
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader,
preferLeader: preferLeader,
leaderOnly: s.option.leaderOnly,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
labels: s.option.labels,
stores: s.option.stores,
}
s.target = strategy.next(s, s.region)
if s.target != nil {
if s.isStaleRead && s.attempts == 1 {
// First access of a stale-read request.
if !s.target.store.IsLabelsMatch(s.option.labels) && s.target.peer.Id != s.region.GetLeaderPeerID() {
// If the target replica's labels do not match and it is not the leader, use replica read.
// This keeps compatibility with older versions.
req.StaleRead = false
req.ReplicaRead = true
} else {
// use stale read.
req.StaleRead = true
req.ReplicaRead = false
}
} else {
// Otherwise, use replica read for read-only requests.
req.StaleRead = false
req.ReplicaRead = s.isReadOnlyReq
}
}
}
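// Illustrative walk-through (not part of this change) of how a stale-read request's
// flags evolve across attempts under the logic above:
//   attempt 1: the mixed strategy picks a replica; if it matches the request labels or
//              is the leader, the request keeps req.StaleRead = true, otherwise it is
//              downgraded to req.ReplicaRead = true for compatibility.
//   attempt 2: the leader is retried with a plain leader read (StaleRead = false,
//              ReplicaRead = false), unless the leader was already tried on attempt 1.
//   later attempts: the mixed strategy picks any remaining candidate, and read-only
//              requests are sent with req.ReplicaRead = true.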
type ReplicaSelectLeaderStrategy struct {
leaderIdx AccessIndex
}
func (s ReplicaSelectLeaderStrategy) next(replicas []*replica) *replica {
leader := replicas[s.leaderIdx]
if isLeaderCandidate(leader) {
return leader
}
return nil
}
// ReplicaSelectMixedStrategy selects a replica by calculating a score for each replica and then choosing the one with the highest score.
// Attention: if the leader replica must be chosen in some cases, use ReplicaSelectLeaderStrategy instead of ReplicaSelectMixedStrategy with the preferLeader flag.
type ReplicaSelectMixedStrategy struct {
leaderIdx AccessIndex
tryLeader bool
preferLeader bool
leaderOnly bool
learnerOnly bool
labels []*metapb.StoreLabel
stores []uint64
busyThreshold time.Duration
}
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica {
replicas := selector.replicas
maxScoreIdxes := make([]int, 0, len(replicas))
maxScore := -1
reloadRegion := false
for i, r := range replicas {
epochStale := r.isEpochStale()
liveness := r.store.getLivenessState()
isLeader := AccessIndex(i) == s.leaderIdx
if epochStale && ((liveness == reachable && r.store.getResolveState() == resolved) || isLeader) {
reloadRegion = true
}
if !s.isCandidate(r, isLeader, epochStale, liveness) {
continue
}
score := s.calculateScore(r, isLeader)
if score > maxScore {
maxScore = score
maxScoreIdxes = append(maxScoreIdxes[:0], i)
} else if score == maxScore && score > -1 {
maxScoreIdxes = append(maxScoreIdxes, i)
}
}
if reloadRegion {
selector.region.setSyncFlags(needDelayedReloadPending)
}
if len(maxScoreIdxes) == 1 {
idx := maxScoreIdxes[0]
return replicas[idx]
} else if len(maxScoreIdxes) > 1 {
// If more than one replica has the same max score, randomly select one of them.
// TODO: consider using store statistics to select a faster one.
idx := maxScoreIdxes[randIntn(len(maxScoreIdxes))]
return replicas[idx]
}
if s.busyThreshold > 0 {
// When no idle replica can be found, there is no need to invalidate the region.
return nil
}
// When a deadline-exceeded error was met, do a fast retry without invalidating the region cache.
if !hasDeadlineExceededError(selector.replicas) {
selector.invalidateRegion()
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
return nil
}
func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epochStale bool, liveness livenessState) bool {
if epochStale || liveness == unreachable {
// the replica is not available, skip it.
return false
}
maxAttempt := 1
if r.dataIsNotReady && !isLeader {
// If the replica failed with a data-is-not-ready error on a stale read, we can retry it with replica read.
// After https://github.com/tikv/tikv/pull/15726, the leader no longer returns DataIsNotReady errors,
// so there is no need to retry the leader again; retrying it may yield a NotLeader error.
maxAttempt = 2
}
if r.isExhausted(maxAttempt, 0) {
// The attempts are exhausted, skip it.
return false
}
if s.leaderOnly && !isLeader {
return false
}
if s.busyThreshold > 0 && (r.store.EstimatedWaitTime() > s.busyThreshold || r.serverIsBusy || isLeader) {
return false
}
return true
}
const (
// The definition of the score is:
// MSB LSB
// [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotSlow][1 bit: NotAttempt]
flagLabelMatches = 1 << 4
flagPreferLeader = 1 << 3
flagNormalPeer = 1 << 2
flagNotSlow = 1 << 1
flagNotAttempt = 1
)
// calculateScore calculates the score of the replica.
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int {
score := 0
if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) {
score |= flagLabelMatches
}
if isLeader {
if s.preferLeader {
score |= flagPreferLeader
} else if s.tryLeader {
if len(s.labels) > 0 {
// When labels are specified, prefer the leader over peers whose labels do not match (a label-matching peer still scores higher).
score |= flagPreferLeader
} else {
score |= flagNormalPeer
}
}
} else {
if s.learnerOnly {
if r.peer.Role == metapb.PeerRole_Learner {
score |= flagNormalPeer
}
} else {
score |= flagNormalPeer
}
}
if !r.store.healthStatus.IsSlow() {
score |= flagNotSlow
}
if r.attempts == 0 {
score |= flagNotAttempt
}
return score
}
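// Worked example (illustrative, not part of this change): for a mixed read with labels
// specified (tryLeader = true, preferLeader = false), a healthy, never-tried follower
// whose store matches the labels scores
//   flagLabelMatches|flagNormalPeer|flagNotSlow|flagNotAttempt = 16+4+2+1 = 23,
// while a non-matching leader scores
//   flagPreferLeader|flagNotSlow|flagNotAttempt = 8+2+1 = 11,
// so the label-matching follower wins; once it has been attempted it loses
// flagNotAttempt (23 -> 22), and ties are broken randomly in next().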
type ReplicaSelectLeaderWithProxyStrategy struct{}
func (s ReplicaSelectLeaderWithProxyStrategy) next(replicas []*replica, region *Region) (leader *replica, proxy *replica) {
rs := region.getStore()
leaderIdx := rs.workTiKVIdx
leader = replicas[leaderIdx]
if leader.store.getLivenessState() == reachable || leader.notLeader {
// If the leader's store is reachable, or the peer is no longer the leader, there is no need to use a proxy.
rs.unsetProxyStoreIfNeeded(region)
return nil, nil
}
proxyIdx := rs.proxyTiKVIdx
if proxyIdx >= 0 && int(proxyIdx) < len(replicas) && s.isCandidate(replicas[proxyIdx], proxyIdx == leaderIdx) {
return leader, replicas[proxyIdx]
}
for i, r := range replicas {
if s.isCandidate(r, AccessIndex(i) == leaderIdx) {
return leader, r
}
}
return nil, nil
}
func (s ReplicaSelectLeaderWithProxyStrategy) isCandidate(r *replica, isLeader bool) bool {
if isLeader ||
r.isExhausted(1, 0) ||
r.store.getLivenessState() != reachable ||
r.isEpochStale() {
// Check the epoch here: if it is stale, we can try other replicas instead of letting buildRPCContext fail, which would invalidate the region and then retry.
return false
}
return true
}
func (s *replicaSelectorV2) onNotLeader(
bo *retry.Backoffer, ctx *RPCContext, notLeader *errorpb.NotLeader,
) (shouldRetry bool, err error) {
if s.target != nil {
s.target.notLeader = true
}
leaderIdx, err := s.baseReplicaSelector.onNotLeader(bo, ctx, notLeader)
if err != nil {
return false, err
}
if leaderIdx >= 0 {
if isLeaderCandidate(s.replicas[leaderIdx]) {
s.replicaReadType = kv.ReplicaReadLeader
}
}
return true, nil
}
func (s *replicaSelectorV2) onFlashbackInProgress(ctx *RPCContext, req *tikvrpc.Request) bool {
// If the failure is caused by replica read, we can safely retry it with the leader.
if req.ReplicaRead && s.target != nil && s.target.peer.Id != s.region.GetLeaderPeerID() {
req.BusyThresholdMs = 0
s.busyThreshold = 0
s.replicaReadType = kv.ReplicaReadLeader
req.ReplicaReadType = kv.ReplicaReadLeader
return true
}
return false
}
func (s *replicaSelectorV2) onDataIsNotReady() {
if s.target != nil {
s.target.dataIsNotReady = true
}
}
func (s *replicaSelectorV2) onServerIsBusy(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, serverIsBusy *errorpb.ServerIsBusy,
) (shouldRetry bool, err error) {
var store *Store
if ctx != nil && ctx.Store != nil {
store = ctx.Store
if serverIsBusy.EstimatedWaitMs != 0 {
ctx.Store.updateServerLoadStats(serverIsBusy.EstimatedWaitMs)
if s.busyThreshold != 0 && isReadReq(req.Type) {
// Do not retry with batched coprocessor requests.
// It would cause region misses if we sent the tasks to a replica.
if req.Type == tikvrpc.CmdCop && len(req.Cop().Tasks) > 0 {
return false, nil
}
if s.target != nil {
s.target.serverIsBusy = true
}
}
} else {
// Mark the server as busy (the next incoming reads could be redirected to the expected followers).
ctx.Store.healthStatus.markAlreadySlow()
}
}
backoffErr := errors.Errorf("server is busy, ctx: %v", ctx)
if s.canFastRetry() {
s.addPendingBackoff(store, retry.BoTiKVServerBusy, backoffErr)
return true, nil
}
err = bo.Backoff(retry.BoTiKVServerBusy, backoffErr)
if err != nil {
return false, err
}
return true, nil
}
func (s *replicaSelectorV2) canFastRetry() bool {
if s.replicaReadType == kv.ReplicaReadLeader {
leaderIdx := s.region.getStore().workTiKVIdx
leader := s.replicas[leaderIdx]
if isLeaderCandidate(leader) && !leader.serverIsBusy {
return false
}
}
return true
}
func (s *replicaSelectorV2) onReadReqConfigurableTimeout(req *tikvrpc.Request) bool {
if isReadReqConfigurableTimeout(req) {
if s.target != nil {
s.target.deadlineErrUsingConfTimeout = true
}
return true
}
return false
}
func (s *replicaSelectorV2) onSendFailure(bo *retry.Backoffer, err error) {
metrics.RegionCacheCounterWithSendFail.Inc()
// TODO: mark the store as needing a check and return for a fast retry.
target := s.target
if s.proxy != nil {
target = s.proxy
}
liveness := s.checkLiveness(bo, target)
if s.replicaReadType == kv.ReplicaReadLeader && s.proxy == nil && s.target != nil && s.target.peer.Id == s.region.GetLeaderPeerID() &&
liveness == unreachable && len(s.replicas) > 1 && s.regionCache.enableForwarding {
// Just return so the next attempt can go through a proxy.
return
}
if liveness != reachable {
s.invalidateReplicaStore(target, err)
}
}
func (s *replicaSelectorV2) onSendSuccess(req *tikvrpc.Request) {
if s.proxy != nil && s.target != nil {
for idx, r := range s.replicas {
if r.peer.Id == s.proxy.peer.Id {
s.region.getStore().setProxyStoreIdx(s.region, AccessIndex(idx))
break
}
}
}
if s.target != nil && s.target.peer.Id != s.region.GetLeaderPeerID() && req != nil && !req.StaleRead && !req.ReplicaRead {
s.region.switchWorkLeaderToPeer(s.target.peer)
}
}
func (s *replicaSelectorV2) targetReplica() *replica {
return s.target
}
func (s *replicaSelectorV2) proxyReplica() *replica {
return s.proxy
}
func (s *replicaSelectorV2) getLabels() []*metapb.StoreLabel {
return s.option.labels
}
func (s *replicaSelectorV2) replicaType(_ *RPCContext) string {
if s.target != nil {
if s.target.peer.Id == s.region.GetLeaderPeerID() {
return "leader"
}
return "follower"
}
return "unknown"
}
func (s *replicaSelectorV2) String() string {
if s == nil {
return ""
}
return fmt.Sprintf("replicaSelectorV2{replicaReadType: %v, attempts: %v, %v}", s.replicaReadType.String(), s.attempts, s.baseReplicaSelector.String())
}



@@ -427,7 +427,15 @@ func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
c.Lock()
defer c.Unlock()
-c.regions[regionID].addPeer(peerID, storeID)
+c.regions[regionID].addPeer(peerID, storeID, metapb.PeerRole_Voter)
}
// AddLearner adds a new learner for the Region on the Store.
func (c *Cluster) AddLearner(regionID, storeID, peerID uint64) {
c.Lock()
defer c.Unlock()
c.regions[regionID].addPeer(peerID, storeID, metapb.PeerRole_Learner)
}
// RemovePeer removes the Peer from the Region. Note that if the Peer is leader,
@@ -666,8 +674,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64,
}
}
-func (r *Region) addPeer(peerID, storeID uint64) {
-r.Meta.Peers = append(r.Meta.Peers, newPeerMeta(peerID, storeID))
+func (r *Region) addPeer(peerID, storeID uint64, role metapb.PeerRole) {
+peer := newPeerMeta(peerID, storeID)
+peer.Role = role
+r.Meta.Peers = append(r.Meta.Peers, peer)
r.incConfVer()
}

util/israce/israce.go

@@ -0,0 +1,20 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build race
package israce
// RaceEnabled checks if race is enabled.
const RaceEnabled = true

util/israce/norace.go

@@ -0,0 +1,20 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !race
package israce
// RaceEnabled checks if race is enabled.
const RaceEnabled = false
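// Illustrative usage sketch (not part of this change): a heavy enumeration test could
// consult RaceEnabled to shrink its search space when the race detector is on. The
// iteration counts below are hypothetical.
//
//	iterations := 100000
//	if israce.RaceEnabled {
//		// every memory access is instrumented under -race, so keep the test small
//		iterations = 1000
//	}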