[Metrics] Supply extra metrics to monitor the flows under `prefer-leader` mode. (#716)

Signed-off-by: Lucasliang <nkcs_lykx@hotmail.com>
This commit is contained in:
Lucas 2023-03-13 18:08:39 +08:00 committed by GitHub
parent c21bf9396a
commit a7e3df4ab1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 0 deletions

View File

@ -459,6 +459,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.enableForwarding = config.GetGlobalConfig().EnableForwarding c.enableForwarding = config.GetGlobalConfig().EnableForwarding
// Default use 15s as the update interval. // Default use 15s as the update interval.
go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second) go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
return c return c
} }
@ -2277,6 +2278,8 @@ type Store struct {
// A statistic for counting the request latency to this store // A statistic for counting the request latency to this store
slowScore SlowScoreStat slowScore SlowScoreStat
// A statistic for counting the flows of different replicas on this store
replicaFlowsStats [numReplicaFlowsType]uint64
} }
type resolveState uint64 type resolveState uint64
@ -2718,6 +2721,7 @@ func (s *Store) recordSlowScoreStat(timecost time.Duration) {
s.slowScore.recordSlowScoreStat(timecost) s.slowScore.recordSlowScoreStat(timecost)
} }
// markAlreadySlow flags this store as already slow by delegating to the
// markAlreadySlow method of its slowScore statistic.
func (s *Store) markAlreadySlow() { func (s *Store) markAlreadySlow() {
s.slowScore.markAlreadySlow() s.slowScore.markAlreadySlow()
} }
@ -2737,6 +2741,7 @@ func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
} }
} }
// checkAndUpdateStoreSlowScores checks and updates slowScore on each store.
func (c *RegionCache) checkAndUpdateStoreSlowScores() { func (c *RegionCache) checkAndUpdateStoreSlowScores() {
defer func() { defer func() {
r := recover() r := recover()
@ -2758,6 +2763,42 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() {
} }
} }
// getReplicaFlowsStats atomically loads and returns the flow counter that this
// store keeps for the given replicaFlowsType.
func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 {
	counter := &s.replicaFlowsStats[destType]
	return atomic.LoadUint64(counter)
}
// resetReplicaFlowsStats atomically sets the flow counter that this store
// keeps for the given replicaFlowsType back to zero.
func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) {
	counter := &s.replicaFlowsStats[destType]
	atomic.StoreUint64(counter, 0)
}
// recordReplicaFlowsStats atomically adds one to the flow counter that this
// store keeps for the given replicaFlowsType.
func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
	counter := &s.replicaFlowsStats[destType]
	atomic.AddUint64(counter, 1)
}
// asyncReportStoreReplicaFlows periodically publishes the replica flow
// counters of every store in c.storeMu.stores to
// metrics.TiKVPreferLeaderFlowsGauge, labelled by flow type and store address,
// and restarts each counter from zero for the next reporting window.
//
// The loop fires once per interval and returns when c.ctx is done.
func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			c.storeMu.RLock()
			for _, store := range c.storeMu.stores {
				for destType := toLeader; destType < numReplicaFlowsType; destType++ {
					// Read and zero the counter in one atomic step. A separate
					// load followed by a store of 0 would drop every flow that
					// is recorded between the two operations.
					flows := atomic.SwapUint64(&store.replicaFlowsStats[destType], 0)
					metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(flows))
				}
			}
			c.storeMu.RUnlock()
		}
	}
}
func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) { func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
// Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to // Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to
// access it. // access it.

View File

@ -594,6 +594,14 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx = state.leaderIdx state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx selector.targetIdx = state.leaderIdx
} }
// Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode.
if state.option.preferLeader {
if selector.targetIdx != state.leaderIdx {
selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toFollower)
} else {
selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader)
}
}
return selector.buildRPCContext(bo) return selector.buildRPCContext(bo)
} }

View File

@ -15,6 +15,7 @@
package locate package locate
import ( import (
"fmt"
"math" "math"
"sync/atomic" "sync/atomic"
"time" "time"
@ -155,3 +156,26 @@ func (ss *SlowScoreStat) markAlreadySlow() {
// isSlow reports whether the current slow score, as returned by getSlowScore,
// has reached slowScoreThreshold.
func (ss *SlowScoreStat) isSlow() bool { func (ss *SlowScoreStat) isSlow() bool {
return ss.getSlowScore() >= slowScoreThreshold return ss.getSlowScore() >= slowScoreThreshold
} }
// replicaFlowsType identifies which kind of replica a flow was routed to.
type replicaFlowsType int

const (
	// toLeader marks flows that were sent to the leader replica.
	toLeader replicaFlowsType = iota
	// toFollower marks flows that were sent to a follower replica.
	toFollower
	// numReplicaFlowsType is the number of replicaFlowsType values. It must
	// stay the last entry of this group.
	numReplicaFlowsType
)

// String returns the label of the flow type: "ToLeader" or "ToFollower".
// Any other value is rendered as its decimal number.
func (t replicaFlowsType) String() string {
	if t == toLeader {
		return "ToLeader"
	}
	if t == toFollower {
		return "ToFollower"
	}
	return fmt.Sprintf("%d", int(t))
}

View File

@ -99,6 +99,7 @@ var (
TiKVGrpcConnectionState *prometheus.GaugeVec TiKVGrpcConnectionState *prometheus.GaugeVec
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
) )
// Label constants. // Label constants.
@ -619,6 +620,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Slow scores of each tikv node based on RPC timecosts", Help: "Slow scores of each tikv node based on RPC timecosts",
}, []string{LblStore}) }, []string{LblStore})
TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "prefer_leader_flows_gauge",
Help: "Counter of flows under PreferLeader mode.",
}, []string{LblType, LblStore})
initShortcuts() initShortcuts()
} }
@ -692,6 +701,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVGrpcConnectionState) prometheus.MustRegister(TiKVGrpcConnectionState)
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVStoreSlowScoreGauge)
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
} }
// readCounter reads the value of a prometheus.Counter. // readCounter reads the value of a prometheus.Counter.