Make slow store filtering the highest priority in replica selector v2 (#1267)

* Add some logs

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Make slow store filtering the highest priority in replica selector v2

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add non stale read case to TestMultiReplicaInOneAZ

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Enrich the multi replica in one AZ case but it failed...

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Update test to adapt to the fix on the master branch

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Remove TestMultiReplicaInOneAZ

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

---------

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
MyonKeminta 2024-04-08 14:23:38 +08:00 committed by GitHub
parent 8fc819c1ca
commit 642a09bef1
7 changed files with 272 additions and 43 deletions
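For context on the change itself: replica selector v2 scores every candidate replica with a set of bit flags, and this patch reorders those flags so that "the store is not slow" becomes the most significant bit (see the replica selector diff below). As a result, a replica on a non-slow store always outranks a replica on a slow store, no matter how the remaining factors compare. A minimal standalone sketch of that ordering property (illustrative only, not part of the patch):

package main

import "fmt"

// Same flag layout as the patch: NotAttempted is the lowest bit, NotSlow the highest.
type storeSelectionScore int64

const (
	flagNotAttempted storeSelectionScore = 1 << iota
	flagNormalPeer
	flagPreferLeader
	flagLabelMatches
	flagNotSlow
)

func main() {
	slowButOtherwiseIdeal := flagLabelMatches | flagPreferLeader | flagNormalPeer | flagNotAttempted // 0b01111 = 15
	notSlowOnly := flagNotSlow                                                                       // 0b10000 = 16
	fmt.Println(notSlowOnly > slowButOtherwiseIdeal) // true: a slow store loses even with every other factor in its favor
}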

go.sum

@@ -111,8 +111,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b h1:LUeYme5++BRU4DSEi2BmdIki0dRki4dFt2/8IhmIXy4=
github.com/tikv/pd/client v0.0.0-20240319071242-d3b94c97c12b/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31 h1:qiIt9AyEUW5yabTbCIgwxSMKi3p8ZE/YAk1Z6+fJq8M=
github.com/tikv/pd/client v0.0.0-20240320081713-c00c42e77b31/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=


@@ -655,8 +655,23 @@ type RegionCache struct {
clusterID uint64
}
type regionCacheOptions struct {
noHealthTick bool
}
type RegionCacheOpt func(*regionCacheOptions)
func RegionCacheNoHealthTick(o *regionCacheOptions) {
o.noHealthTick = true
}
// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client) *RegionCache {
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
var options regionCacheOptions
for _, o := range opt {
o(&options)
}
c := &RegionCache{
pdClient: pdClient,
}
@@ -705,7 +720,9 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
}
c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
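The new RegionCacheOpt follows Go's functional-options pattern; the only option added here, RegionCacheNoHealthTick, sets regionCacheOptions.noHealthTick so the periodic checkAndUpdateStoreHealthStatus tick is not scheduled. A hedged usage sketch (the helper name is ours, and pdCli stands for an already-constructed pd.Client, as in the test setup further down):

// newRegionCacheWithoutHealthTick is a hypothetical helper showing how the
// option is passed; it mirrors what testReplicaSelectorSuite.SetupTest does.
func newRegionCacheWithoutHealthTick(pdCli pd.Client) *RegionCache {
	return NewRegionCache(pdCli, RegionCacheNoHealthTick)
}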


@@ -2088,7 +2088,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}
func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))
now := time.Now()
stats.tick(now)
@@ -2124,7 +2124,7 @@ func (s *testRegionCacheSuite) TestTiKVSideSlowScore() {
}
func (s *testRegionCacheSuite) TestStoreHealthStatus() {
stats := newStoreHealthStatus()
stats := newStoreHealthStatus(1)
now := time.Now()
s.False(stats.IsSlow())


@@ -236,7 +236,7 @@ type ReplicaSelectMixedStrategy struct {
func (s *ReplicaSelectMixedStrategy) next(selector *replicaSelectorV2, region *Region) *replica {
replicas := selector.replicas
maxScoreIdxes := make([]int, 0, len(replicas))
maxScore := -1
var maxScore storeSelectionScore = -1
reloadRegion := false
for i, r := range replicas {
epochStale := r.isEpochStale()
@@ -289,7 +289,7 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
if r.dataIsNotReady && !isLeader {
// If the replica is failed by data not ready with stale read, we can retry it with replica-read.
// after https://github.com/tikv/tikv/pull/15726, the leader will not return DataIsNotReady error,
// then no need to retry leader again, if you try it again, you may got a NotLeader error.
// then no need to retry leader again. If you try it again, you may get a NotLeader error.
maxAttempt = 2
}
if r.isExhausted(maxAttempt, 0) {
@@ -310,20 +310,51 @@ func (s *ReplicaSelectMixedStrategy) isCandidate(r *replica, isLeader bool, epoc
return true
}
type storeSelectionScore int64
const (
// The definition of the score is:
// MSB LSB
// [unused bits][1 bit: LabelMatches][1 bit: PreferLeader][2 bits: NormalPeer + NotSlow]
flagLabelMatches = 1 << 4
flagPreferLeader = 1 << 3
flagNormalPeer = 1 << 2
flagNotSlow = 1 << 1
flagNotAttempt = 1
// MSB LSB
// [unused bits][1 bit: NotSlow][1 bit: LabelMatches][1 bit: PreferLeader][1 bit: NormalPeer][1 bit: NotAttempted]
flagNotAttempted storeSelectionScore = 1 << iota
flagNormalPeer
flagPreferLeader
flagLabelMatches
flagNotSlow
)
func (s storeSelectionScore) String() string {
if s == 0 {
return "0"
}
res := ""
appendFactor := func(name string) {
if len(res) != 0 {
res += "|"
}
res += name
}
if (s & flagNotSlow) != 0 {
appendFactor("NotSlow")
}
if (s & flagLabelMatches) != 0 {
appendFactor("LableMatches")
}
if (s & flagPreferLeader) != 0 {
appendFactor("PreferLeader")
}
if (s & flagNormalPeer) != 0 {
appendFactor("NormalPeer")
}
if (s & flagNotAttempted) != 0 {
appendFactor("NotAttempted")
}
return res
}
// calculateScore calculates the score of the replica.
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) int {
score := 0
func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) storeSelectionScore {
var score storeSelectionScore = 0
if r.store.IsStoreMatch(s.stores) && r.store.IsLabelsMatch(s.labels) {
score |= flagLabelMatches
}
@@ -338,7 +369,8 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
}
} else if s.tryLeader {
if len(s.labels) > 0 {
// When the leader has matching labels, prefer leader than other mismatching peers.
// When label matching is enabled, prefer selecting the leader among replicas that have the same label-matching
// results.
score |= flagPreferLeader
} else {
score |= flagNormalPeer
@@ -357,7 +389,7 @@ func (s *ReplicaSelectMixedStrategy) calculateScore(r *replica, isLeader bool) i
score |= flagNotSlow
}
if r.attempts == 0 {
score |= flagNotAttempt
score |= flagNotAttempted
}
return score
}
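The String method above prints whichever factors are set, ordered from the most significant (NotSlow) down to the least significant (NotAttempted), which makes the selection scores readable in the new logs. A hedged illustration, assuming it is called from within the same package:

score := flagNotSlow | flagLabelMatches | flagNotAttempted
fmt.Println(score) // NotSlow|LabelMatches|NotAttempted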


@@ -52,7 +52,8 @@ func (s *testReplicaSelectorSuite) SetupTest(t *testing.T) {
s.cluster = mocktikv.NewCluster(s.mvccStore)
s.storeIDs, s.peerIDs, s.regionID, s.leaderPeer = mocktikv.BootstrapWithMultiStores(s.cluster, 3)
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
// Disable the tick on health status.
s.cache = NewRegionCache(pdCli, RegionCacheNoHealthTick)
s.bo = retry.NewNoopBackoff(context.Background())
s.SetT(t)
s.SetS(s)
@@ -166,29 +167,29 @@ func TestReplicaSelectorCalculateScore(t *testing.T) {
score := strategy.calculateScore(r, isLeader)
s.Equal(r.store.healthStatus.IsSlow(), false)
if isLeader {
s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNotSlow+flagNotAttempted)
} else {
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotSlow+flagNotAttempted)
}
r.store.healthStatus.markAlreadySlow()
s.Equal(r.store.healthStatus.IsSlow(), true)
score = strategy.calculateScore(r, isLeader)
if isLeader {
s.Equal(score, flagLabelMatches+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNotAttempted)
} else {
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted)
}
strategy.tryLeader = true
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted)
strategy.preferLeader = true
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted)
strategy.learnerOnly = true
strategy.tryLeader = false
strategy.preferLeader = false
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagLabelMatches+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNotAttempted)
labels := []*metapb.StoreLabel{
{
Key: "zone",
@@ -197,7 +198,7 @@ func TestReplicaSelectorCalculateScore(t *testing.T) {
}
strategy.labels = labels
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagNotAttempt)
s.Equal(score, flagNotAttempted)
strategy = ReplicaSelectMixedStrategy{
leaderIdx: rc.getStore().workTiKVIdx,
@@ -206,9 +207,9 @@ func TestReplicaSelectorCalculateScore(t *testing.T) {
}
score = strategy.calculateScore(r, isLeader)
if isLeader {
s.Equal(score, flagPreferLeader+flagNotAttempt)
s.Equal(score, flagPreferLeader+flagNotAttempted)
} else {
s.Equal(score, flagNormalPeer+flagNotAttempt)
s.Equal(score, flagNormalPeer+flagNotAttempted)
}
strategy = ReplicaSelectMixedStrategy{
@@ -217,10 +218,10 @@ func TestReplicaSelectorCalculateScore(t *testing.T) {
labels: labels,
}
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagNormalPeer+flagNotAttempt)
s.Equal(score, flagNormalPeer+flagNotAttempted)
r.store.labels = labels
score = strategy.calculateScore(r, isLeader)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempt)
s.Equal(score, flagLabelMatches+flagNormalPeer+flagNotAttempted)
r.store.labels = nil
}
}
@@ -2570,6 +2571,159 @@ func TestReplicaReadAccessPathByLearnerCase(t *testing.T) {
s.True(s.runCaseAndCompare(ca))
}
func TestReplicaReadAvoidSlowStore(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
s.changeRegionLeader(3)
store, exists := s.cache.getStore(1)
s.True(exists)
for _, staleRead := range []bool{false, true} {
for _, withLabel := range []bool{false, true} {
var label *metapb.StoreLabel
if withLabel {
label = &metapb.StoreLabel{Key: "id", Value: "1"}
}
s.T().Logf("test case: stale read: %v, with label: %v, slow: false", staleRead, withLabel)
ca := replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: staleRead,
timeout: 0,
busyThresholdMs: 0,
label: label,
accessErr: []RegionErrorType{},
expect: &accessPathResult{
accessPath: []string{
fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead),
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
s.T().Logf("test case: stale read: %v, with label: %v, slow: true", staleRead, withLabel)
expectedFirstStore := 2
if withLabel {
// Leader is preferred in this case
expectedFirstStore = 3
}
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: staleRead,
timeout: 0,
busyThresholdMs: 0,
label: label,
accessErr: []RegionErrorType{},
expect: &accessPathResult{
accessPath: []string{
fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead),
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
beforeRun: func() {
s.resetStoreState()
store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now())
},
}
// v1 doesn't support avoiding slow stores. We only test this on v2.
s.True(s.runCase(ca, true))
s.T().Logf("test case: stale read: %v, with label: %v, slow: false, encoutner err: true", staleRead, withLabel)
var expectedSecondPath string
if staleRead {
// Retry leader, and fall back to leader-read mode.
expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}"
} else {
if withLabel {
// Prefer retrying leader.
expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}"
} else {
// Retry any other replica.
expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}"
}
}
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: staleRead,
timeout: 0,
busyThresholdMs: 0,
label: label,
accessErr: []RegionErrorType{ServerIsBusyErr},
expect: &accessPathResult{
accessPath: []string{
fmt.Sprintf("{addr: store1, replica-read: %v, stale-read: %v}", !staleRead, staleRead),
// Retry leader.
// For stale read, it falls back to leader read. However, replica-read doesn't do so.
expectedSecondPath,
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
}
s.True(s.runCaseAndCompare(ca))
s.T().Logf("test case: stale read: %v, with label: %v, slow: true, encoutner err: true", staleRead, withLabel)
if expectedFirstStore == 3 {
// Retry on store 2 which is a follower.
// Stale-read mode falls back to replica-read mode.
expectedSecondPath = "{addr: store2, replica-read: true, stale-read: false}"
} else {
if staleRead {
// Retry in leader read mode
expectedSecondPath = "{addr: store3, replica-read: false, stale-read: false}"
} else {
// Retry with the same mode, which is replica-read mode.
expectedSecondPath = "{addr: store3, replica-read: true, stale-read: false}"
}
}
ca = replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
readType: kv.ReplicaReadMixed,
staleRead: staleRead,
timeout: 0,
busyThresholdMs: 0,
label: label,
accessErr: []RegionErrorType{ServerIsBusyErr},
expect: &accessPathResult{
accessPath: []string{
fmt.Sprintf("{addr: store%v, replica-read: %v, stale-read: %v}", expectedFirstStore, !staleRead, staleRead),
expectedSecondPath,
},
respErr: "",
respRegionError: nil,
backoffCnt: 0,
backoffDetail: []string{},
regionIsValid: true,
},
beforeRun: func() {
s.resetStoreState()
store.healthStatus.updateTiKVServerSideSlowScore(100, time.Now())
},
}
s.True(s.runCase(ca, true))
}
}
}
func TestReplicaReadAccessPathByGenError(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
@@ -2934,7 +3088,7 @@ func (s *testReplicaSelectorSuite) resetStoreState() {
for _, store := range rc.getStore().stores {
store.loadStats.Store(nil)
store.healthStatus.clientSideSlowScore.resetSlowScore()
store.healthStatus.updateTiKVServerSideSlowScore(0, time.Now())
store.healthStatus.resetTiKVServerSideSlowScoreForTest()
store.healthStatus.updateSlowFlag()
atomic.StoreUint32(&store.livenessState, uint32(reachable))
store.setResolveState(resolved)


@@ -124,7 +124,7 @@ func (ss *SlowScoreStat) updateSlowScore() {
}
atomic.CompareAndSwapUint64(&ss.avgTimecost, avgTimecost, ss.tsCntSlidingWindow.Avg())
// Resets the counter of inteval timecost
// Resets the counter of interval timecost
atomic.StoreUint64(&ss.intervalTimecost, 0)
atomic.StoreUint64(&ss.intervalUpdCount, 0)
}
@@ -155,11 +155,17 @@ func (ss *SlowScoreStat) markAlreadySlow() {
// resetSlowScore resets the slow score to 0. It's used for test.
func (ss *SlowScoreStat) resetSlowScore() {
atomic.StoreUint64(&ss.avgScore, 0)
*ss = SlowScoreStat{
avgScore: 1,
}
}
func (ss *SlowScoreStat) isSlow() bool {
return ss.getSlowScore() >= slowScoreThreshold
return clientSideSlowScoreIsSlow(ss.getSlowScore())
}
func clientSideSlowScoreIsSlow(value uint64) bool {
return value >= slowScoreThreshold
}
// replicaFlowsType indicates the type of the destination replica of flows.
@@ -170,7 +176,7 @@ const (
toLeader replicaFlowsType = iota
// toFollower indicates that flows are sent to followers' replica
toFollower
// numflowsDestType reserved to keep max replicaFlowsType value.
// numReplicaFlowsType is reserved to keep max replicaFlowsType value.
numReplicaFlowsType
)
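The extracted clientSideSlowScoreIsSlow helper gives SlowScoreStat.isSlow and the new HealthStatusDetail.IsSlow (in the store diff below) a single definition of "slow". A hedged sketch within the same package, assuming markAlreadySlow pushes the score past slowScoreThreshold, as the tests above rely on:

stat := &SlowScoreStat{}
stat.markAlreadySlow()
fmt.Println(stat.isSlow())                                  // true
fmt.Println(clientSideSlowScoreIsSlow(stat.getSlowScore())) // true: the same threshold check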


@@ -214,7 +214,7 @@ func newStore(
peerAddr: peerAddr,
saddr: statusAddr,
// Make sure healthStatus field is never null.
healthStatus: newStoreHealthStatus(),
healthStatus: newStoreHealthStatus(id),
}
}
@@ -223,7 +223,7 @@ func newUninitializedStore(id uint64) *Store {
return &Store{
storeID: id,
// Make sure healthStatus field is never null.
healthStatus: newStoreHealthStatus(),
healthStatus: newStoreHealthStatus(id),
}
}
@@ -794,6 +794,9 @@ const (
)
type StoreHealthStatus struct {
// Used for logging.
storeID uint64
isSlow atomic.Bool
// A statistic for counting the request latency to this store
@@ -816,8 +819,18 @@ type HealthStatusDetail struct {
TiKVSideSlowScore int64
}
func newStoreHealthStatus() *StoreHealthStatus {
return &StoreHealthStatus{}
func (d HealthStatusDetail) IsSlow() bool {
return clientSideSlowScoreIsSlow(uint64(d.ClientSideSlowScore)) || d.TiKVSideSlowScore >= tikvSlowScoreSlowThreshold
}
func (d HealthStatusDetail) String() string {
return fmt.Sprintf("{ ClientSideSlowScore: %d, TiKVSideSlowScore: %d }", d.ClientSideSlowScore, d.TiKVSideSlowScore)
}
func newStoreHealthStatus(storeID uint64) *StoreHealthStatus {
return &StoreHealthStatus{
storeID: storeID,
}
}
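A hedged illustration of the new HealthStatusDetail helpers, within the same package and assuming, as the tests above do, that a TiKV-side slow score of 100 is at or above tikvSlowScoreSlowThreshold:

d := HealthStatusDetail{ClientSideSlowScore: 1, TiKVSideSlowScore: 100}
fmt.Println(d.IsSlow()) // true
fmt.Println(d)          // { ClientSideSlowScore: 1, TiKVSideSlowScore: 100 }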
// IsSlow returns whether current Store is slow.
@@ -935,9 +948,18 @@ func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime
s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime)
}
func (s *StoreHealthStatus) resetTiKVServerSideSlowScoreForTest() {
s.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Hour * 2))
s.updateTiKVServerSideSlowScore(1, time.Now().Add(-time.Hour))
}
func (s *StoreHealthStatus) updateSlowFlag() {
isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold
s.isSlow.Store(isSlow)
healthDetail := s.GetHealthStatusDetail()
isSlow := healthDetail.IsSlow()
old := s.isSlow.Swap(isSlow)
if old != isSlow {
logutil.BgLogger().Info("store health status changed", zap.Uint64("storeID", s.storeID), zap.Bool("isSlow", isSlow), zap.Stringer("healthDetail", healthDetail))
}
}
// setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value.