mirror of https://github.com/tikv/client-go.git
193 lines
6.5 KiB
Go
193 lines
6.5 KiB
Go
// Copyright 2023 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package locate
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Tunables of the client-side slow score. The constants are intentionally
// untyped so they can be used in both uint64 and float64 expressions.
const (
	// slowScoreInitVal is the score the statistics are initialized with and
	// the lowest value the score decays to.
	slowScoreInitVal = 1
	// slowScoreThreshold is the score at or above which a value is treated
	// as slow.
	slowScoreThreshold = 80
	// slowScoreMax is the highest score.
	slowScoreMax = 100
	// slowScoreInitTimeoutInUs is the value the average timecost is
	// initialized with, unit: us.
	slowScoreInitTimeoutInUs = 500000
	// slowScoreMaxTimeoutInUs is the max timeout of one txn, unit: us.
	slowScoreMaxTimeoutInUs = 30000000
	// slidingWindowSize is the default size of sliding window.
	slidingWindowSize = 10
)
|
|
|
|
// CountSlidingWindow is a sliding window over the most recent samples. It
// keeps the samples themselves together with their running sum and integer
// average, which are refreshed on every Append.
type CountSlidingWindow struct {
	avg     uint64   // integer average of the samples held in history
	sum     uint64   // sum of the samples held in history
	history []uint64 // samples in arrival order, oldest first
}
|
|
|
|
// Avg returns the average value of this sliding window
|
|
func (cnt *CountSlidingWindow) Avg() uint64 {
|
|
return cnt.avg
|
|
}
|
|
|
|
// Sum returns the sum value of this sliding window
|
|
func (cnt *CountSlidingWindow) Sum() uint64 {
|
|
return cnt.sum
|
|
}
|
|
|
|
// Append adds one value into this sliding windown and returns the gradient.
|
|
func (cnt *CountSlidingWindow) Append(value uint64) (gradient float64) {
|
|
prevAvg := cnt.avg
|
|
if len(cnt.history) < slidingWindowSize {
|
|
cnt.sum += value
|
|
} else {
|
|
cnt.sum = cnt.sum - cnt.history[0] + value
|
|
cnt.history = cnt.history[1:]
|
|
}
|
|
cnt.history = append(cnt.history, value)
|
|
cnt.avg = cnt.sum / (uint64(len(cnt.history)))
|
|
gradient = 1e-6
|
|
if prevAvg > 0 && value != prevAvg {
|
|
gradient = (float64(value) - float64(prevAvg)) / float64(prevAvg)
|
|
}
|
|
return gradient
|
|
}
|
|
|
|
// SlowScoreStat represents the statistics on business of Store.
|
|
type SlowScoreStat struct {
|
|
avgScore uint64
|
|
avgTimecost uint64
|
|
intervalTimecost uint64 // sum of the timecost in one counting interval. Unit: us
|
|
intervalUpdCount uint64 // count of update in one counting interval.
|
|
tsCntSlidingWindow CountSlidingWindow // sliding window on timecost
|
|
updCntSlidingWindow CountSlidingWindow // sliding window on update count
|
|
}
|
|
|
|
func (ss *SlowScoreStat) getSlowScore() uint64 {
|
|
return atomic.LoadUint64(&ss.avgScore)
|
|
}
|
|
|
|
// updateSlowScore updates the statistics on SlowScore periodically.
|
|
//
|
|
// updateSlowScore will update the SlowScore of each Store according to the two factors:
|
|
// - Requests in one timing tick. This factor can be regarded as QPS on each store.
|
|
// - Average timecost on each request in one timing tick. This factor is used to detect
|
|
// whether the relative store is busy on processing requests.
|
|
//
|
|
// If one Store is slow, its Requests will keep decreasing gradually, but Average timecost will
|
|
// keep ascending. And the updating algorithm just follows this mechanism and compute the
|
|
// trend of slow, by calculating gradients of slow in each tick.
|
|
func (ss *SlowScoreStat) updateSlowScore() {
|
|
if atomic.LoadUint64(&ss.avgTimecost) == 0 {
|
|
// Init the whole statistics.
|
|
atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
|
|
atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
|
|
return
|
|
}
|
|
|
|
avgTimecost := atomic.LoadUint64(&ss.avgTimecost)
|
|
intervalUpdCount := atomic.LoadUint64(&ss.intervalUpdCount)
|
|
intervalTimecost := atomic.LoadUint64(&ss.intervalTimecost)
|
|
|
|
updGradient := float64(1.0)
|
|
tsGradient := float64(1.0)
|
|
if intervalUpdCount > 0 {
|
|
intervalAvgTimecost := intervalTimecost / intervalUpdCount
|
|
updGradient = ss.updCntSlidingWindow.Append(intervalUpdCount)
|
|
tsGradient = ss.tsCntSlidingWindow.Append(intervalAvgTimecost)
|
|
}
|
|
// Update avgScore & avgTimecost
|
|
avgScore := atomic.LoadUint64(&ss.avgScore)
|
|
if updGradient+0.1 <= float64(1e-9) && tsGradient-0.1 >= float64(1e-9) {
|
|
risenRatio := math.Min(float64(5.43), math.Abs(tsGradient/updGradient))
|
|
curAvgScore := math.Ceil(math.Min(float64(avgScore)*risenRatio+float64(1.0), float64(slowScoreMax)))
|
|
atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, uint64(curAvgScore))
|
|
} else {
|
|
costScore := uint64(math.Ceil(math.Max(float64(slowScoreInitVal), math.Min(float64(2.71), 1.0+math.Abs(updGradient)))))
|
|
if avgScore <= slowScoreInitVal+costScore {
|
|
atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, slowScoreInitVal)
|
|
} else {
|
|
atomic.CompareAndSwapUint64(&ss.avgScore, avgScore, avgScore-costScore)
|
|
}
|
|
}
|
|
atomic.CompareAndSwapUint64(&ss.avgTimecost, avgTimecost, ss.tsCntSlidingWindow.Avg())
|
|
|
|
// Resets the counter of interval timecost
|
|
atomic.StoreUint64(&ss.intervalTimecost, 0)
|
|
atomic.StoreUint64(&ss.intervalUpdCount, 0)
|
|
}
|
|
|
|
// recordSlowScoreStat records the timecost of each request.
|
|
func (ss *SlowScoreStat) recordSlowScoreStat(timecost time.Duration) {
|
|
atomic.AddUint64(&ss.intervalUpdCount, 1)
|
|
avgTimecost := atomic.LoadUint64(&ss.avgTimecost)
|
|
if avgTimecost == 0 {
|
|
// Init the whole statistics with the original one.
|
|
atomic.StoreUint64(&ss.avgScore, slowScoreInitVal)
|
|
atomic.StoreUint64(&ss.avgTimecost, slowScoreInitTimeoutInUs)
|
|
atomic.StoreUint64(&ss.intervalTimecost, uint64(timecost/time.Microsecond))
|
|
return
|
|
}
|
|
curTimecost := uint64(timecost / time.Microsecond)
|
|
if curTimecost >= slowScoreMaxTimeoutInUs {
|
|
// Current query is too slow to serve (>= 30s, max timeout of a request) in this tick.
|
|
atomic.StoreUint64(&ss.avgScore, slowScoreMax)
|
|
return
|
|
}
|
|
atomic.AddUint64(&ss.intervalTimecost, curTimecost)
|
|
}
|
|
|
|
func (ss *SlowScoreStat) markAlreadySlow() {
|
|
atomic.StoreUint64(&ss.avgScore, slowScoreMax)
|
|
}
|
|
|
|
// resetSlowScore resets the slow score to 0. It's used for test.
|
|
func (ss *SlowScoreStat) resetSlowScore() {
|
|
*ss = SlowScoreStat{
|
|
avgScore: 1,
|
|
}
|
|
}
|
|
|
|
func (ss *SlowScoreStat) isSlow() bool {
|
|
return clientSideSlowScoreIsSlow(ss.getSlowScore())
|
|
}
|
|
|
|
func clientSideSlowScoreIsSlow(value uint64) bool {
|
|
return value >= slowScoreThreshold
|
|
}
|
|
|
|
// replicaFlowsType indicates the type of the destination replica of flows.
type replicaFlowsType int

const (
	// toLeader indicates that flows are sent to the leader replica.
	toLeader replicaFlowsType = iota
	// toFollower indicates that flows are sent to follower replicas.
	toFollower
	// numReplicaFlowsType is reserved to keep the number of replicaFlowsType
	// values; it must stay the last entry.
	numReplicaFlowsType
)
|
|
|
|
func (a replicaFlowsType) String() string {
|
|
switch a {
|
|
case toLeader:
|
|
return "ToLeader"
|
|
case toFollower:
|
|
return "ToFollower"
|
|
default:
|
|
return fmt.Sprintf("%d", a)
|
|
}
|
|
}
|