From 97cad411eb08dc2b377f1e75005f395712c33c5b Mon Sep 17 00:00:00 2001
From: you06
Date: Mon, 25 Aug 2025 11:37:58 +0800
Subject: [PATCH] metrics: add replica read traffic metrics (#1717)

ref pingcap/tidb#62740

Signed-off-by: you06
---
 internal/locate/metrics_collector.go | 28 +++++++++++++++++++++++++++-
 internal/locate/region_request.go    | 20 +++++++++++++-------
 internal/locate/store_cache.go       |  5 +++++
 metrics/metrics.go                   | 10 ++++++++++
 metrics/shortcuts.go                 | 10 ++++++++++
 tikv/kv.go                           |  2 +-
 6 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/internal/locate/metrics_collector.go b/internal/locate/metrics_collector.go
index 516b6047..f86ea2df 100644
--- a/internal/locate/metrics_collector.go
+++ b/internal/locate/metrics_collector.go
@@ -17,6 +17,7 @@ package locate
 import (
 	"sync/atomic"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/client-go/v2/kv"
 	"github.com/tikv/client-go/v2/metrics"
 	"github.com/tikv/client-go/v2/tikvrpc"
@@ -24,7 +25,8 @@ import (
 )
 
 type networkCollector struct {
-	staleRead bool
+	staleRead       bool
+	replicaReadType kv.ReplicaReadType
 }
 
 func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
@@ -51,6 +53,7 @@
 			atomic.AddInt64(crossZone, int64(size))
 		}
 	}
+	// stale read metrics
 	if s.staleRead {
 		s.onReqStaleRead(float64(size), isCrossZoneTraffic)
 	}
@@ -89,6 +92,29 @@ func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response,
 	if s.staleRead {
 		s.onRespStaleRead(float64(size), isCrossZoneTraffic)
 	}
+
+	// replica read metrics
+	if isReadReq(req.Type) {
+		var observer prometheus.Observer
+		switch req.AccessLocation {
+		case kv.AccessLocalZone:
+			if req.ReplicaRead {
+				observer = metrics.ReadRequestFollowerLocalBytes
+			} else {
+				observer = metrics.ReadRequestLeaderLocalBytes
+			}
+		case kv.AccessCrossZone:
+			if req.ReplicaRead {
+				observer = metrics.ReadRequestFollowerRemoteBytes
+			} else {
+				observer = metrics.ReadRequestLeaderRemoteBytes
+			}
+		case kv.AccessUnknown:
+		}
+		if observer != nil {
+			observer.Observe(float64(size + req.GetSize()))
+		}
+	}
 }
 
 func (s *networkCollector) onReqStaleRead(size float64, isCrossZoneTraffic bool) {
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 231b453d..313fcb54 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -495,7 +495,8 @@ func (s *RegionRequestSender) SendReqAsync(
 			opts: opts,
 		},
 		invariants: reqInvariants{
-			staleRead: req.StaleRead,
+			staleRead:       req.StaleRead,
+			replicaReadType: req.ReplicaReadType,
 		},
 	}
 
@@ -895,7 +896,8 @@ type sendReqState struct {
 // If the tikvrpc.Request is changed during the retries or other operations.
 // the reqInvariants can tell the initial state.
 type reqInvariants struct {
-	staleRead bool
+	staleRead       bool
+	replicaReadType kv.ReplicaReadType
 }
 
 // next encapsulates one iteration of the retry loop. calling `next` will handle send error (s.vars.err) or region error
@@ -1125,7 +1127,8 @@ func (s *sendReqState) send() (canceled bool) {
 			atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
 		}
 		collector := networkCollector{
-			staleRead: s.invariants.staleRead,
+			staleRead:       s.invariants.staleRead,
+			replicaReadType: s.invariants.replicaReadType,
 		}
 		collector.onReq(req, execDetails)
 		collector.onResp(req, s.vars.resp, execDetails)
@@ -1234,6 +1237,7 @@ func (s *sendReqState) initForAsyncRequest() (ok bool) {
 
 	// set access location based on source and target "zone" label.
 	s.setReqAccessLocation(req)
+	req.Context.ClusterId = s.vars.rpcCtx.ClusterID
 
 	if req.InputRequestSource != "" && s.replicaSelector != nil {
 		patchRequestSource(req, s.replicaSelector.replicaType())
@@ -1263,8 +1267,8 @@ func (s *sendReqState) setReqAccessLocation(req *tikvrpc.Request) {
 	// set access location based on source and target "zone" label.
 	if s.replicaSelector != nil && s.replicaSelector.target != nil {
 		selfZoneLabel := config.GetGlobalConfig().ZoneLabel
-		targetZoneLabel, _ := s.replicaSelector.target.store.GetLabelValue("zone")
-		// if either "zone" label is "", we actually don't known if it involves cross AZ traffic.
+		targetZoneLabel, _ := s.replicaSelector.target.store.GetLabelValue(DCLabelKey)
+		// if either "zone" label is "", we actually don't know if it involves cross AZ traffic.
 		if selfZoneLabel == "" || targetZoneLabel == "" {
 			req.AccessLocation = kv.AccessUnknown
 		} else if selfZoneLabel == targetZoneLabel {
@@ -1297,7 +1301,8 @@ func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp
 			atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
 		}
 		collector := networkCollector{
-			staleRead: s.invariants.staleRead,
+			staleRead:       s.invariants.staleRead,
+			replicaReadType: s.invariants.replicaReadType,
 		}
 		collector.onReq(req, execDetails)
 		collector.onResp(req, resp, execDetails)
@@ -1434,7 +1439,8 @@ func (s *RegionRequestSender) SendReqCtx(
 			opts: opts,
 		},
 		invariants: reqInvariants{
-			staleRead: req.StaleRead,
+			staleRead:       req.StaleRead,
+			replicaReadType: req.ReplicaReadType,
 		},
 	}
 
diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go
index 9ac20ec9..28a864cf 100644
--- a/internal/locate/store_cache.go
+++ b/internal/locate/store_cache.go
@@ -44,6 +44,11 @@ import (
 	"google.golang.org/grpc/keepalive"
 )
 
+const (
+	// DCLabelKey indicates the key of label which represents the dc for Store.
+	DCLabelKey = "zone"
+)
+
 type testingKnobs interface {
 	getMockRequestLiveness() livenessFunc
 	setMockRequestLiveness(f livenessFunc)
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 6b5ae116..e34d59ce 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -121,6 +121,7 @@ var (
 	TiKVTxnWriteConflictCounter prometheus.Counter
 	TiKVAsyncSendReqCounter     *prometheus.CounterVec
 	TiKVAsyncBatchGetCounter    *prometheus.CounterVec
+	TiKVReadRequestBytes        *prometheus.SummaryVec
 )
 
 // Label constants.
@@ -889,6 +890,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
 			ConstLabels: constLabels,
 		}, []string{LblResult})
 
+	TiKVReadRequestBytes = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace: namespace,
+			Subsystem: subsystem,
+			Name:      "read_request_bytes",
+			Help:      "Summary of read requests bytes",
+		}, []string{LblType, LblResult})
+
 	initShortcuts()
 }
 
@@ -988,6 +997,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(TiKVTxnWriteConflictCounter)
 	prometheus.MustRegister(TiKVAsyncSendReqCounter)
 	prometheus.MustRegister(TiKVAsyncBatchGetCounter)
+	prometheus.MustRegister(TiKVReadRequestBytes)
 }
 
 // readCounter reads the value of a prometheus.Counter.
diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go
index 90d7f25a..e7ff91f4 100644
--- a/metrics/shortcuts.go
+++ b/metrics/shortcuts.go
@@ -187,6 +187,11 @@ var (
 	AsyncBatchGetCounterWithRegionError prometheus.Counter
 	AsyncBatchGetCounterWithLockError   prometheus.Counter
 	AsyncBatchGetCounterWithOtherError  prometheus.Counter
+
+	ReadRequestLeaderLocalBytes    prometheus.Observer
+	ReadRequestLeaderRemoteBytes   prometheus.Observer
+	ReadRequestFollowerLocalBytes  prometheus.Observer
+	ReadRequestFollowerRemoteBytes prometheus.Observer
 )
 
 func initShortcuts() {
@@ -343,4 +348,9 @@ func initShortcuts() {
 	AsyncBatchGetCounterWithRegionError = TiKVAsyncBatchGetCounter.WithLabelValues("region_error")
 	AsyncBatchGetCounterWithLockError = TiKVAsyncBatchGetCounter.WithLabelValues("lock_error")
 	AsyncBatchGetCounterWithOtherError = TiKVAsyncBatchGetCounter.WithLabelValues("other_error")
+
+	ReadRequestLeaderLocalBytes = TiKVReadRequestBytes.WithLabelValues("leader", "local")
+	ReadRequestLeaderRemoteBytes = TiKVReadRequestBytes.WithLabelValues("leader", "cross-zone")
+	ReadRequestFollowerLocalBytes = TiKVReadRequestBytes.WithLabelValues("follower", "local")
+	ReadRequestFollowerRemoteBytes = TiKVReadRequestBytes.WithLabelValues("follower", "cross-zone")
 }
diff --git a/tikv/kv.go b/tikv/kv.go
index fda07b62..a76a705d 100644
--- a/tikv/kv.go
+++ b/tikv/kv.go
@@ -83,7 +83,7 @@ import (
 
 const (
 	// DCLabelKey indicates the key of label which represents the dc for Store.
-	DCLabelKey = "zone"
+	DCLabelKey = locate.DCLabelKey
 
 	safeTSUpdateInterval             = time.Second * 2
 	defaultPipelinedFlushConcurrency = 128
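
For readers skimming the patch, the sketch below restates the core of the new collection logic in isolation: the bytes of each read exchange are attributed to one of four summary series depending on whether the request was served by the leader or a follower replica, and whether it stayed inside the caller's zone. This is an illustrative, self-contained Go program, not part of the patch: the metric and the access-location enum are simplified stand-ins for metrics.TiKVReadRequestBytes and the kv package's access-location constants, while the label values ("leader"/"follower", "local"/"cross-zone") mirror metrics/shortcuts.go above.

// Illustrative only: a stand-alone restatement of the selection logic that
// the patch adds to networkCollector.onResp.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// accessLocation is a simplified stand-in for kv.AccessLocalZone,
// kv.AccessCrossZone and kv.AccessUnknown.
type accessLocation int

const (
	accessUnknown accessLocation = iota
	accessLocalZone
	accessCrossZone
)

// readRequestBytes is a local stand-in for metrics.TiKVReadRequestBytes.
var readRequestBytes = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Name: "read_request_bytes",
		Help: "Summary of read request bytes",
	}, []string{"type", "result"})

// pickObserver mirrors the switch added in onResp: the replica-read flag
// decides leader vs follower, the access location decides local vs
// cross-zone, and unknown locations record nothing.
func pickObserver(replicaRead bool, loc accessLocation) prometheus.Observer {
	switch loc {
	case accessLocalZone:
		if replicaRead {
			return readRequestBytes.WithLabelValues("follower", "local")
		}
		return readRequestBytes.WithLabelValues("leader", "local")
	case accessCrossZone:
		if replicaRead {
			return readRequestBytes.WithLabelValues("follower", "cross-zone")
		}
		return readRequestBytes.WithLabelValues("leader", "cross-zone")
	}
	return nil
}

func main() {
	prometheus.MustRegister(readRequestBytes)
	if o := pickObserver(true, accessCrossZone); o != nil {
		// The patch observes request size + response size for one exchange.
		o.Observe(4096)
	}
	fmt.Println("recorded follower cross-zone read bytes")
}

As in the patch, a request whose access location is unknown (either the client's or the target store's "zone" label is empty) is simply not recorded.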