client-go/internal/locate/slow_score.go

193 lines
6.5 KiB
Go

// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package locate
import (
"fmt"
"math"
"sync/atomic"
"time"
)
// Tunables of the client-side slow score.
const (
	// slowScoreInitVal is the score installed on initialization; it is also
	// the floor the score decays back to.
	slowScoreInitVal = 1
	// slowScoreThreshold is the score at or above which a store is reported as slow.
	slowScoreThreshold = 80
	// slowScoreMax is the upper bound of the score.
	slowScoreMax = 100
	// slowScoreInitTimeoutInUs is the average timecost installed on
	// initialization (500ms), unit: us.
	slowScoreInitTimeoutInUs = 500 * 1000
	// slowScoreMaxTimeoutInUs is the max timeout of one txn (30s), unit: us.
	slowScoreMaxTimeoutInUs = 30 * 1000 * 1000
	// slidingWindowSize is the default size of sliding window.
	slidingWindowSize = 10
)
// CountSlidingWindow keeps the statistics of one sliding window: the values
// most recently passed to Append (at most slidingWindowSize of them) together
// with their sum and integer average.
type CountSlidingWindow struct {
// avg is sum divided by the number of retained values, refreshed by every Append.
avg uint64
// sum is the total of the values currently retained in history.
sum uint64
// history holds the retained values, oldest first.
history []uint64
}
// Avg returns the average value of this sliding window, as computed by the
// latest Append (integer division of the sum by the number of retained values).
func (cnt *CountSlidingWindow) Avg() uint64 {
return cnt.avg
}
// Sum returns the sum of the values currently retained in this sliding window.
func (cnt *CountSlidingWindow) Sum() uint64 {
return cnt.sum
}
// Append adds one value into this sliding window and returns the gradient.
//
// The window retains at most slidingWindowSize values; once it is full the
// oldest value is evicted to make room for the new one. Sum and Avg are
// refreshed so that they describe the values retained after the call.
//
// The returned gradient is the relative change of value against the average
// held before this call, i.e. (value - prevAvg) / prevAvg. When there is no
// previous average (prevAvg == 0) or value equals it, a tiny positive constant
// (1e-6) is returned instead.
func (cnt *CountSlidingWindow) Append(value uint64) (gradient float64) {
	prevAvg := cnt.avg
	if len(cnt.history) < slidingWindowSize {
		cnt.sum += value
		cnt.history = append(cnt.history, value)
	} else {
		// The window is full: evict the oldest value by shifting the others one
		// slot to the left in place. Re-slicing with history[1:] and appending
		// would walk down the backing array and force a reallocation every few
		// calls; copy handles the overlapping ranges and keeps a single buffer.
		cnt.sum = cnt.sum - cnt.history[0] + value
		copy(cnt.history, cnt.history[1:])
		cnt.history[len(cnt.history)-1] = value
	}
	cnt.avg = cnt.sum / uint64(len(cnt.history))

	if prevAvg == 0 || value == prevAvg {
		return 1e-6
	}
	return (float64(value) - float64(prevAvg)) / float64(prevAvg)
}
// SlowScoreStat represents the statistics on how busy (slow) a Store is.
//
// The uint64 fields are accessed with sync/atomic operations by the record
// and update paths, so they must stay plain uint64 values.
type SlowScoreStat struct {
avgScore uint64 // current slow score; isSlow compares it against slowScoreThreshold
avgTimecost uint64 // average timecost taken from tsCntSlidingWindow; 0 is treated as "not initialized". Unit: us
intervalTimecost uint64 // sum of the timecost in one counting interval. Unit: us
intervalUpdCount uint64 // count of update in one counting interval.
tsCntSlidingWindow CountSlidingWindow // sliding window on timecost
updCntSlidingWindow CountSlidingWindow // sliding window on update count
}
// getSlowScore atomically loads and returns the current slow score.
func (ss *SlowScoreStat) getSlowScore() uint64 {
return atomic.LoadUint64(&ss.avgScore)
}
// updateSlowScore refreshes the SlowScore from the statistics gathered during
// the latest counting interval. It is meant to be called periodically.
//
// Two factors of the finished interval drive the update:
//   - the number of requests, which can be regarded as the QPS on the store;
//   - the average timecost per request, which tells whether the store is busy
//     on processing requests.
//
// A slow store serves fewer and fewer requests while each of them takes
// longer. So when the request count has dropped and the average timecost has
// risen (each by roughly 10% or more against its window average), the score is
// multiplied up, capped at slowScoreMax. In every other case the score decays
// by a small step, never going below slowScoreInitVal.
func (ss *SlowScoreStat) updateSlowScore() {
	if atomic.LoadUint64(&ss.avgTimecost) == 0 {
		// Nothing is initialized yet: install the defaults and wait for the next call.
		atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
		atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
		return
	}

	prevTimecost := atomic.LoadUint64(&ss.avgTimecost)
	reqCount := atomic.LoadUint64(&ss.intervalUpdCount)
	totalTimecost := atomic.LoadUint64(&ss.intervalTimecost)

	// Without any request in this interval both gradients stay at 1.0 and the
	// sliding windows are left untouched.
	updGradient, tsGradient := 1.0, 1.0
	if reqCount > 0 {
		updGradient = ss.updCntSlidingWindow.Append(reqCount)
		tsGradient = ss.tsCntSlidingWindow.Append(totalTimecost / reqCount)
	}

	// Work out the next score first, then publish it with a single CAS so that
	// a concurrent store to avgScore is not overwritten.
	prevScore := atomic.LoadUint64(&ss.avgScore)
	nextScore := uint64(slowScoreInitVal)
	fewerRequests := updGradient+0.1 <= 1e-9
	slowerRequests := tsGradient-0.1 >= 1e-9
	if fewerRequests && slowerRequests {
		risenRatio := math.Min(5.43, math.Abs(tsGradient/updGradient))
		nextScore = uint64(math.Ceil(math.Min(float64(prevScore)*risenRatio+1.0, float64(slowScoreMax))))
	} else {
		decay := uint64(math.Ceil(math.Max(float64(slowScoreInitVal), math.Min(2.71, 1.0+math.Abs(updGradient)))))
		if prevScore > slowScoreInitVal+decay {
			nextScore = prevScore - decay
		}
	}
	atomic.CompareAndSwapUint64(&ss.avgScore, prevScore, nextScore)

	// NOTE(review): while the timecost window is still empty Avg() is 0, so
	// avgTimecost is swapped to 0 and the next call takes the initialization
	// path again — confirm this is intended.
	atomic.CompareAndSwapUint64(&ss.avgTimecost, prevTimecost, ss.tsCntSlidingWindow.Avg())

	// Start a new counting interval.
	atomic.StoreUint64(&ss.intervalTimecost, 0)
	atomic.StoreUint64(&ss.intervalUpdCount, 0)
}
// recordSlowScoreStat accounts the timecost of one request in the current
// counting interval.
//
// Every call bumps the request counter of the interval. What happens to the
// timecost (truncated to microseconds) depends on the state:
//   - statistics not initialized yet (avgTimecost == 0): the default score and
//     average timecost are installed and the interval sum starts from this timecost;
//   - timecost of slowScoreMaxTimeoutInUs or more: the score is raised to
//     slowScoreMax right away and the timecost is not added to the interval sum;
//   - otherwise: the timecost is added to the interval sum.
func (ss *SlowScoreStat) recordSlowScoreStat(timecost time.Duration) {
	atomic.AddUint64(&ss.intervalUpdCount, 1)

	costInUs := uint64(timecost / time.Microsecond)
	switch {
	case atomic.LoadUint64(&ss.avgTimecost) == 0:
		// Init the whole statistics with the original one.
		atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
		atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
		atomic.StoreUint64(&ss.intervalTimecost, costInUs)
	case costInUs >= slowScoreMaxTimeoutInUs:
		// Current query is too slow to serve (>= 30s, max timeout of a request) in this tick.
		atomic.StoreUint64(&ss.avgScore, slowScoreMax)
	default:
		atomic.AddUint64(&ss.intervalTimecost, costInUs)
	}
}
// markAlreadySlow atomically raises the slow score to slowScoreMax, which is
// above slowScoreThreshold, so isSlow reports true until the score is changed again.
func (ss *SlowScoreStat) markAlreadySlow() {
atomic.StoreUint64(&ss.avgScore, slowScoreMax)
}
// resetSlowScore discards every collected statistic (counters and both
// sliding windows) and puts the slow score back to 1. It's used for test.
//
// The whole struct is overwritten with a plain assignment, not with atomic
// operations.
func (ss *SlowScoreStat) resetSlowScore() {
	var fresh SlowScoreStat
	fresh.avgScore = 1
	*ss = fresh
}
// isSlow reports whether the current slow score counts as slow, as decided by
// clientSideSlowScoreIsSlow.
func (ss *SlowScoreStat) isSlow() bool {
return clientSideSlowScoreIsSlow(ss.getSlowScore())
}
// clientSideSlowScoreIsSlow reports whether value is at or above slowScoreThreshold.
func clientSideSlowScoreIsSlow(value uint64) bool {
return value >= slowScoreThreshold
}
// replicaFlowsType indicates the type of the destination replica of flows.
type replicaFlowsType int
const (
// toLeader indicates that flows are sent to leader replica.
toLeader replicaFlowsType = iota
// toFollower indicates that flows are sent to followers' replica
toFollower
// numReplicaFlowsType is the number of replicaFlowsType values, i.e. one past
// the largest valid one. It must stay the last entry of this block.
numReplicaFlowsType
)
// String returns the name of the replica flows type: "ToLeader" or
// "ToFollower". Any other value is rendered as its decimal number.
func (a replicaFlowsType) String() string {
	if a == toLeader {
		return "ToLeader"
	}
	if a == toFollower {
		return "ToFollower"
	}
	return fmt.Sprintf("%d", a)
}