mirror of https://github.com/tikv/client-go.git
add metrics for stale-read traffic (#776)
Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
f64d5dd79b
commit
c619a50474
|
|
@ -1155,6 +1155,14 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes))
|
||||
}
|
||||
}()
|
||||
|
||||
var staleReadCollector *staleReadMetricsCollector
|
||||
if req.StaleRead {
|
||||
staleReadCollector = &staleReadMetricsCollector{hit: true}
|
||||
staleReadCollector.onReq(req)
|
||||
defer staleReadCollector.collect()
|
||||
}
|
||||
|
||||
for {
|
||||
if retryTimes > 0 {
|
||||
req.IsRetryRequest = true
|
||||
|
|
@ -1165,6 +1173,9 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
zap.Int("times", retryTimes),
|
||||
)
|
||||
}
|
||||
if req.StaleRead && staleReadCollector != nil {
|
||||
staleReadCollector.hit = false
|
||||
}
|
||||
}
|
||||
|
||||
rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
|
||||
|
|
@ -1252,6 +1263,9 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
s.replicaSelector.onSendSuccess()
|
||||
}
|
||||
}
|
||||
if staleReadCollector != nil {
|
||||
staleReadCollector.onResp(resp)
|
||||
}
|
||||
return resp, rpcCtx, retryTimes, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -1917,3 +1931,59 @@ func (s *RegionRequestSender) onRegionError(
|
|||
// Because caller may need to re-split the request.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
type staleReadMetricsCollector struct {
|
||||
tp tikvrpc.CmdType
|
||||
hit bool
|
||||
out int
|
||||
in int
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
|
||||
size := 0
|
||||
switch req.Type {
|
||||
case tikvrpc.CmdGet:
|
||||
size += req.Get().Size()
|
||||
case tikvrpc.CmdBatchGet:
|
||||
size += req.BatchGet().Size()
|
||||
case tikvrpc.CmdScan:
|
||||
size += req.Scan().Size()
|
||||
case tikvrpc.CmdCop:
|
||||
size += req.Cop().Size()
|
||||
default:
|
||||
// ignore non-read requests
|
||||
return
|
||||
}
|
||||
s.tp = req.Type
|
||||
size += req.Context.Size()
|
||||
s.out = size
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
|
||||
size := 0
|
||||
switch s.tp {
|
||||
case tikvrpc.CmdGet:
|
||||
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
|
||||
case tikvrpc.CmdBatchGet:
|
||||
size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
|
||||
case tikvrpc.CmdScan:
|
||||
size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
|
||||
case tikvrpc.CmdCop:
|
||||
size += resp.Resp.(*coprocessor.Response).Size()
|
||||
default:
|
||||
// unreachable
|
||||
return
|
||||
}
|
||||
s.in = size
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) collect() {
|
||||
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
|
||||
if !s.hit {
|
||||
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
|
||||
}
|
||||
if s.in > 0 && s.out > 0 {
|
||||
in.Observe(float64(s.in))
|
||||
out.Observe(float64(s.out))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,6 +101,7 @@ var (
|
|||
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
|
||||
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
|
||||
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
|
||||
TiKVStaleReadSizeSummary *prometheus.SummaryVec
|
||||
)
|
||||
|
||||
// Label constants.
|
||||
|
|
@ -124,6 +125,7 @@ const (
|
|||
LblScope = "scope"
|
||||
LblInternal = "internal"
|
||||
LblGeneral = "general"
|
||||
LblDirection = "direction"
|
||||
)
|
||||
|
||||
func initMetrics(namespace, subsystem string) {
|
||||
|
|
@ -638,6 +640,14 @@ func initMetrics(namespace, subsystem string) {
|
|||
Help: "Counter of flows under PreferLeader mode.",
|
||||
}, []string{LblType, LblStore})
|
||||
|
||||
TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "stale_read_bytes",
|
||||
Help: "Size of stale read.",
|
||||
}, []string{LblResult, LblDirection})
|
||||
|
||||
initShortcuts()
|
||||
}
|
||||
|
||||
|
|
@ -713,6 +723,7 @@ func RegisterMetrics() {
|
|||
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
|
||||
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
|
||||
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
|
||||
prometheus.MustRegister(TiKVStaleReadSizeSummary)
|
||||
}
|
||||
|
||||
// readCounter reads the value of a prometheus.Counter.
|
||||
|
|
|
|||
|
|
@ -159,6 +159,11 @@ var (
|
|||
AggressiveLockedKeysDerived prometheus.Counter
|
||||
AggressiveLockedKeysLockedWithConflict prometheus.Counter
|
||||
AggressiveLockedKeysNonForceLock prometheus.Counter
|
||||
|
||||
StaleReadHitInTraffic prometheus.Observer
|
||||
StaleReadHitOutTraffic prometheus.Observer
|
||||
StaleReadMissInTraffic prometheus.Observer
|
||||
StaleReadMissOutTraffic prometheus.Observer
|
||||
)
|
||||
|
||||
func initShortcuts() {
|
||||
|
|
@ -290,4 +295,9 @@ func initShortcuts() {
|
|||
// `WakeUpMode = PessimisticLockWakeUpMode_WakeUpModeNormal`, which will disable `allow_lock_with_conflict` in
|
||||
// TiKV).
|
||||
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock")
|
||||
|
||||
StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
|
||||
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
|
||||
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
|
||||
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue