diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 4601ff25..5139d5b2 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -459,6 +459,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	// Default use 15s as the update inerval.
 	go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
+	go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
 	return c
 }
 
@@ -2277,6 +2278,8 @@ type Store struct {
 
 	// A statistic for counting the request latency to this store
 	slowScore SlowScoreStat
+	// A statistic for counting the flows of different replicas on this store
+	replicaFlowsStats [numReplicaFlowsType]uint64
 }
 
 type resolveState uint64
@@ -2718,6 +2721,7 @@ func (s *Store) recordSlowScoreStat(timecost time.Duration) {
 	s.slowScore.recordSlowScoreStat(timecost)
 }
 
+// markAlreadySlow marks the related store already slow.
 func (s *Store) markAlreadySlow() {
 	s.slowScore.markAlreadySlow()
 }
@@ -2737,6 +2741,7 @@ func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
 	}
 }
 
+// checkAndUpdateStoreSlowScores checks and updates slowScore on each store.
 func (c *RegionCache) checkAndUpdateStoreSlowScores() {
 	defer func() {
 		r := recover()
@@ -2758,6 +2763,42 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() {
 	}
 }
 
+// getReplicaFlowsStats returns the statistics on the related replicaFlowsType.
+func (s *Store) getReplicaFlowsStats(destType replicaFlowsType) uint64 {
+	return atomic.LoadUint64(&s.replicaFlowsStats[destType])
+}
+
+// resetReplicaFlowsStats resets the statistics on the related replicaFlowsType.
+func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) {
+	atomic.StoreUint64(&s.replicaFlowsStats[destType], 0)
+}
+
+// recordReplicaFlowsStats records the statistics on the related replicaFlowsType.
+func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
+	atomic.AddUint64(&s.replicaFlowsStats[destType], 1)
+}
+
+// asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
+func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			c.storeMu.RLock()
+			for _, store := range c.storeMu.stores {
+				for destType := toLeader; destType < numReplicaFlowsType; destType++ {
+					metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
+					store.resetReplicaFlowsStats(destType)
+				}
+			}
+			c.storeMu.RUnlock()
+		}
+	}
+}
+
 func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
 	// Temporarily directly load the config from the global config, however it's not a good idea to let RegionCache to
 	// access it.
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 1e364965..5b21f739 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -594,6 +594,14 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 		state.lastIdx = state.leaderIdx
 		selector.targetIdx = state.leaderIdx
 	}
+	// Monitor the flows destination if selector is under `ReplicaReadPreferLeader` mode.
+	if state.option.preferLeader {
+		if selector.targetIdx != state.leaderIdx {
+			selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toFollower)
+		} else {
+			selector.replicas[selector.targetIdx].store.recordReplicaFlowsStats(toLeader)
+		}
+	}
 	return selector.buildRPCContext(bo)
 }
 
diff --git a/internal/locate/slow_score.go b/internal/locate/slow_score.go
index 0da7f145..562c9e9d 100644
--- a/internal/locate/slow_score.go
+++ b/internal/locate/slow_score.go
@@ -15,6 +15,7 @@
 package locate
 
 import (
+	"fmt"
 	"math"
 	"sync/atomic"
 	"time"
@@ -155,3 +156,26 @@ func (ss *SlowScoreStat) isSlow() bool {
 func (ss *SlowScoreStat) isSlow() bool {
 	return ss.getSlowScore() >= slowScoreThreshold
 }
+
+// replicaFlowsType indicates the type of the destination replica of flows.
+type replicaFlowsType int
+
+const (
+	// toLeader indicates that flows are sent to leader replica.
+	toLeader replicaFlowsType = iota
+	// toFollower indicates that flows are sent to followers' replica.
+	toFollower
+	// numReplicaFlowsType is reserved to keep the max replicaFlowsType value.
+	numReplicaFlowsType
+)
+
+func (a replicaFlowsType) String() string {
+	switch a {
+	case toLeader:
+		return "ToLeader"
+	case toFollower:
+		return "ToFollower"
+	default:
+		return fmt.Sprintf("%d", a)
+	}
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index b72cfb5d..ea91ec4c 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -99,6 +99,7 @@ var (
 	TiKVGrpcConnectionState         *prometheus.GaugeVec
 	TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
 	TiKVStoreSlowScoreGauge         *prometheus.GaugeVec
+	TiKVPreferLeaderFlowsGauge      *prometheus.GaugeVec
 )
 
 // Label constants.
@@ -619,6 +620,14 @@ func initMetrics(namespace, subsystem string) {
 			Help:      "Slow scores of each tikv node based on RPC timecosts",
 		}, []string{LblStore})
 
+	TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "prefer_leader_flows_gauge",
+			Help:      "Counter of flows under PreferLeader mode.",
+		}, []string{LblType, LblStore})
+
 	initShortcuts()
 }
 
@@ -692,6 +701,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVGrpcConnectionState)
 	prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
 	prometheus.MustRegister(TiKVStoreSlowScoreGauge)
+	prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
 }
 
 // readCounter reads the value of a prometheus.Counter.