metrics: add replica read traffic metrics (#1717)

ref pingcap/tidb#62740

Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
you06 2025-08-25 11:37:58 +08:00 committed by GitHub
parent c0b6188ffd
commit 97cad411eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 66 additions and 9 deletions

View File

@ -17,6 +17,7 @@ package locate
import ( import (
"sync/atomic" "sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc"
@ -24,7 +25,8 @@ import (
) )
type networkCollector struct { type networkCollector struct {
staleRead bool staleRead bool
replicaReadType kv.ReplicaReadType
} }
func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) { func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
@ -51,6 +53,7 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
atomic.AddInt64(crossZone, int64(size)) atomic.AddInt64(crossZone, int64(size))
} }
} }
// stale read metrics // stale read metrics
if s.staleRead { if s.staleRead {
s.onReqStaleRead(float64(size), isCrossZoneTraffic) s.onReqStaleRead(float64(size), isCrossZoneTraffic)
@ -89,6 +92,29 @@ func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response,
if s.staleRead { if s.staleRead {
s.onRespStaleRead(float64(size), isCrossZoneTraffic) s.onRespStaleRead(float64(size), isCrossZoneTraffic)
} }
// replica read metrics
if isReadReq(req.Type) {
var observer prometheus.Observer
switch req.AccessLocation {
case kv.AccessLocalZone:
if req.ReplicaRead {
observer = metrics.ReadRequestFollowerLocalBytes
} else {
observer = metrics.ReadRequestLeaderLocalBytes
}
case kv.AccessCrossZone:
if req.ReplicaRead {
observer = metrics.ReadRequestFollowerRemoteBytes
} else {
observer = metrics.ReadRequestLeaderRemoteBytes
}
case kv.AccessUnknown:
}
if observer != nil {
observer.Observe(float64(size + req.GetSize()))
}
}
} }
func (s *networkCollector) onReqStaleRead(size float64, isCrossZoneTraffic bool) { func (s *networkCollector) onReqStaleRead(size float64, isCrossZoneTraffic bool) {

View File

@ -495,7 +495,8 @@ func (s *RegionRequestSender) SendReqAsync(
opts: opts, opts: opts,
}, },
invariants: reqInvariants{ invariants: reqInvariants{
staleRead: req.StaleRead, staleRead: req.StaleRead,
replicaReadType: req.ReplicaReadType,
}, },
} }
@ -895,7 +896,8 @@ type sendReqState struct {
// If the tikvrpc.Request is changed during the retries or other operations. // If the tikvrpc.Request is changed during the retries or other operations.
// the reqInvariants can tell the initial state. // the reqInvariants can tell the initial state.
type reqInvariants struct { type reqInvariants struct {
staleRead bool staleRead bool
replicaReadType kv.ReplicaReadType
} }
// next encapsulates one iteration of the retry loop. calling `next` will handle send error (s.vars.err) or region error // next encapsulates one iteration of the retry loop. calling `next` will handle send error (s.vars.err) or region error
@ -1125,7 +1127,8 @@ func (s *sendReqState) send() (canceled bool) {
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration)) atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
} }
collector := networkCollector{ collector := networkCollector{
staleRead: s.invariants.staleRead, staleRead: s.invariants.staleRead,
replicaReadType: s.invariants.replicaReadType,
} }
collector.onReq(req, execDetails) collector.onReq(req, execDetails)
collector.onResp(req, s.vars.resp, execDetails) collector.onResp(req, s.vars.resp, execDetails)
@ -1234,6 +1237,7 @@ func (s *sendReqState) initForAsyncRequest() (ok bool) {
// set access location based on source and target "zone" label. // set access location based on source and target "zone" label.
s.setReqAccessLocation(req) s.setReqAccessLocation(req)
req.Context.ClusterId = s.vars.rpcCtx.ClusterID req.Context.ClusterId = s.vars.rpcCtx.ClusterID
if req.InputRequestSource != "" && s.replicaSelector != nil { if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType()) patchRequestSource(req, s.replicaSelector.replicaType())
@ -1263,8 +1267,8 @@ func (s *sendReqState) setReqAccessLocation(req *tikvrpc.Request) {
// set access location based on source and target "zone" label. // set access location based on source and target "zone" label.
if s.replicaSelector != nil && s.replicaSelector.target != nil { if s.replicaSelector != nil && s.replicaSelector.target != nil {
selfZoneLabel := config.GetGlobalConfig().ZoneLabel selfZoneLabel := config.GetGlobalConfig().ZoneLabel
targetZoneLabel, _ := s.replicaSelector.target.store.GetLabelValue("zone") targetZoneLabel, _ := s.replicaSelector.target.store.GetLabelValue(DCLabelKey)
// if either "zone" label is "", we actually don't known if it involves cross AZ traffic. // if either "zone" label is "", we actually don't know if it involves cross AZ traffic.
if selfZoneLabel == "" || targetZoneLabel == "" { if selfZoneLabel == "" || targetZoneLabel == "" {
req.AccessLocation = kv.AccessUnknown req.AccessLocation = kv.AccessUnknown
} else if selfZoneLabel == targetZoneLabel { } else if selfZoneLabel == targetZoneLabel {
@ -1297,7 +1301,8 @@ func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration)) atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
} }
collector := networkCollector{ collector := networkCollector{
staleRead: s.invariants.staleRead, staleRead: s.invariants.staleRead,
replicaReadType: s.invariants.replicaReadType,
} }
collector.onReq(req, execDetails) collector.onReq(req, execDetails)
collector.onResp(req, resp, execDetails) collector.onResp(req, resp, execDetails)
@ -1434,7 +1439,8 @@ func (s *RegionRequestSender) SendReqCtx(
opts: opts, opts: opts,
}, },
invariants: reqInvariants{ invariants: reqInvariants{
staleRead: req.StaleRead, staleRead: req.StaleRead,
replicaReadType: req.ReplicaReadType,
}, },
} }

View File

@ -44,6 +44,11 @@ import (
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
) )
const (
// DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone"
)
type testingKnobs interface { type testingKnobs interface {
getMockRequestLiveness() livenessFunc getMockRequestLiveness() livenessFunc
setMockRequestLiveness(f livenessFunc) setMockRequestLiveness(f livenessFunc)

View File

@ -121,6 +121,7 @@ var (
TiKVTxnWriteConflictCounter prometheus.Counter TiKVTxnWriteConflictCounter prometheus.Counter
TiKVAsyncSendReqCounter *prometheus.CounterVec TiKVAsyncSendReqCounter *prometheus.CounterVec
TiKVAsyncBatchGetCounter *prometheus.CounterVec TiKVAsyncBatchGetCounter *prometheus.CounterVec
TiKVReadRequestBytes *prometheus.SummaryVec
) )
// Label constants. // Label constants.
@ -889,6 +890,14 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
ConstLabels: constLabels, ConstLabels: constLabels,
}, []string{LblResult}) }, []string{LblResult})
TiKVReadRequestBytes = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "read_request_bytes",
Help: "Summary of read requests bytes",
}, []string{LblType, LblResult})
initShortcuts() initShortcuts()
} }
@ -988,6 +997,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVTxnWriteConflictCounter) prometheus.MustRegister(TiKVTxnWriteConflictCounter)
prometheus.MustRegister(TiKVAsyncSendReqCounter) prometheus.MustRegister(TiKVAsyncSendReqCounter)
prometheus.MustRegister(TiKVAsyncBatchGetCounter) prometheus.MustRegister(TiKVAsyncBatchGetCounter)
prometheus.MustRegister(TiKVReadRequestBytes)
} }
// readCounter reads the value of a prometheus.Counter. // readCounter reads the value of a prometheus.Counter.

View File

@ -187,6 +187,11 @@ var (
AsyncBatchGetCounterWithRegionError prometheus.Counter AsyncBatchGetCounterWithRegionError prometheus.Counter
AsyncBatchGetCounterWithLockError prometheus.Counter AsyncBatchGetCounterWithLockError prometheus.Counter
AsyncBatchGetCounterWithOtherError prometheus.Counter AsyncBatchGetCounterWithOtherError prometheus.Counter
ReadRequestLeaderLocalBytes prometheus.Observer
ReadRequestLeaderRemoteBytes prometheus.Observer
ReadRequestFollowerLocalBytes prometheus.Observer
ReadRequestFollowerRemoteBytes prometheus.Observer
) )
func initShortcuts() { func initShortcuts() {
@ -343,4 +348,9 @@ func initShortcuts() {
AsyncBatchGetCounterWithRegionError = TiKVAsyncBatchGetCounter.WithLabelValues("region_error") AsyncBatchGetCounterWithRegionError = TiKVAsyncBatchGetCounter.WithLabelValues("region_error")
AsyncBatchGetCounterWithLockError = TiKVAsyncBatchGetCounter.WithLabelValues("lock_error") AsyncBatchGetCounterWithLockError = TiKVAsyncBatchGetCounter.WithLabelValues("lock_error")
AsyncBatchGetCounterWithOtherError = TiKVAsyncBatchGetCounter.WithLabelValues("other_error") AsyncBatchGetCounterWithOtherError = TiKVAsyncBatchGetCounter.WithLabelValues("other_error")
ReadRequestLeaderLocalBytes = TiKVReadRequestBytes.WithLabelValues("leader", "local")
ReadRequestLeaderRemoteBytes = TiKVReadRequestBytes.WithLabelValues("leader", "cross-zone")
ReadRequestFollowerLocalBytes = TiKVReadRequestBytes.WithLabelValues("follower", "local")
ReadRequestFollowerRemoteBytes = TiKVReadRequestBytes.WithLabelValues("follower", "cross-zone")
} }

View File

@ -83,7 +83,7 @@ import (
const ( const (
// DCLabelKey indicates the key of label which represents the dc for Store. // DCLabelKey indicates the key of label which represents the dc for Store.
DCLabelKey = "zone" DCLabelKey = locate.DCLabelKey
safeTSUpdateInterval = time.Second * 2 safeTSUpdateInterval = time.Second * 2
defaultPipelinedFlushConcurrency = 128 defaultPipelinedFlushConcurrency = 128