mirror of https://github.com/tikv/client-go.git
Signed-off-by: crazycs520 <crazycs520@gmail.com> Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
parent
2f119351bd
commit
4ec212d5f2
|
|
@ -259,6 +259,7 @@ type replicaSelector struct {
|
|||
region *Region
|
||||
regionStore *regionStore
|
||||
replicas []*replica
|
||||
labels []*metapb.StoreLabel
|
||||
state selectorState
|
||||
// replicas[targetIdx] is the replica handling the request this time
|
||||
targetIdx AccessIndex
|
||||
|
|
@ -747,6 +748,10 @@ func newReplicaSelector(
|
|||
)
|
||||
}
|
||||
|
||||
option := storeSelectorOp{}
|
||||
for _, op := range opts {
|
||||
op(&option)
|
||||
}
|
||||
var state selectorState
|
||||
if !req.ReplicaReadType.IsFollowerRead() {
|
||||
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
|
||||
|
|
@ -755,10 +760,6 @@ func newReplicaSelector(
|
|||
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
|
||||
}
|
||||
} else {
|
||||
option := storeSelectorOp{}
|
||||
for _, op := range opts {
|
||||
op(&option)
|
||||
}
|
||||
if req.ReplicaReadType == kv.ReplicaReadPreferLeader {
|
||||
WithPerferLeader()(&option)
|
||||
}
|
||||
|
|
@ -778,6 +779,7 @@ func newReplicaSelector(
|
|||
cachedRegion,
|
||||
regionStore,
|
||||
replicas,
|
||||
option.labels,
|
||||
state,
|
||||
-1,
|
||||
-1,
|
||||
|
|
@ -1156,9 +1158,14 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
|
||||
var staleReadCollector *staleReadMetricsCollector
|
||||
if req.StaleRead {
|
||||
staleReadCollector = &staleReadMetricsCollector{hit: true}
|
||||
staleReadCollector.onReq(req)
|
||||
defer staleReadCollector.collect()
|
||||
staleReadCollector = &staleReadMetricsCollector{}
|
||||
defer func() {
|
||||
if retryTimes == 0 {
|
||||
metrics.StaleReadHitCounter.Add(1)
|
||||
} else {
|
||||
metrics.StaleReadMissCounter.Add(1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for {
|
||||
|
|
@ -1171,9 +1178,6 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
zap.Int("times", retryTimes),
|
||||
)
|
||||
}
|
||||
if req.StaleRead && staleReadCollector != nil {
|
||||
staleReadCollector.hit = false
|
||||
}
|
||||
}
|
||||
|
||||
rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
|
||||
|
|
@ -1204,6 +1208,14 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
return resp, nil, retryTimes, err
|
||||
}
|
||||
|
||||
var isLocalTraffic bool
|
||||
if staleReadCollector != nil && s.replicaSelector != nil {
|
||||
if target := s.replicaSelector.targetReplica(); target != nil {
|
||||
isLocalTraffic = target.store.IsLabelsMatch(s.replicaSelector.labels)
|
||||
staleReadCollector.onReq(req, isLocalTraffic)
|
||||
}
|
||||
}
|
||||
|
||||
logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
|
||||
s.storeAddr = rpcCtx.Addr
|
||||
|
||||
|
|
@ -1262,7 +1274,7 @@ func (s *RegionRequestSender) SendReqCtx(
|
|||
}
|
||||
}
|
||||
if staleReadCollector != nil {
|
||||
staleReadCollector.onResp(resp)
|
||||
staleReadCollector.onResp(req.Type, resp, isLocalTraffic)
|
||||
}
|
||||
return resp, rpcCtx, retryTimes, nil
|
||||
}
|
||||
|
|
@ -1946,35 +1958,36 @@ func (s *RegionRequestSender) onRegionError(
|
|||
}
|
||||
|
||||
type staleReadMetricsCollector struct {
|
||||
tp tikvrpc.CmdType
|
||||
hit bool
|
||||
out int
|
||||
in int
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request) {
|
||||
func (s *staleReadMetricsCollector) onReq(req *tikvrpc.Request, isLocalTraffic bool) {
|
||||
size := 0
|
||||
switch req.Type {
|
||||
case tikvrpc.CmdGet:
|
||||
size += req.Get().Size()
|
||||
size = req.Get().Size()
|
||||
case tikvrpc.CmdBatchGet:
|
||||
size += req.BatchGet().Size()
|
||||
size = req.BatchGet().Size()
|
||||
case tikvrpc.CmdScan:
|
||||
size += req.Scan().Size()
|
||||
size = req.Scan().Size()
|
||||
case tikvrpc.CmdCop:
|
||||
size += req.Cop().Size()
|
||||
size = req.Cop().Size()
|
||||
default:
|
||||
// ignore non-read requests
|
||||
return
|
||||
}
|
||||
s.tp = req.Type
|
||||
size += req.Context.Size()
|
||||
s.out = size
|
||||
if isLocalTraffic {
|
||||
metrics.StaleReadLocalOutBytes.Add(float64(size))
|
||||
metrics.StaleReadReqLocalCounter.Add(1)
|
||||
} else {
|
||||
metrics.StaleReadRemoteOutBytes.Add(float64(size))
|
||||
metrics.StaleReadReqCrossZoneCounter.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
|
||||
func (s *staleReadMetricsCollector) onResp(tp tikvrpc.CmdType, resp *tikvrpc.Response, isLocalTraffic bool) {
|
||||
size := 0
|
||||
switch s.tp {
|
||||
switch tp {
|
||||
case tikvrpc.CmdGet:
|
||||
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
|
||||
case tikvrpc.CmdBatchGet:
|
||||
|
|
@ -1984,19 +1997,12 @@ func (s *staleReadMetricsCollector) onResp(resp *tikvrpc.Response) {
|
|||
case tikvrpc.CmdCop:
|
||||
size += resp.Resp.(*coprocessor.Response).Size()
|
||||
default:
|
||||
// unreachable
|
||||
// ignore non-read requests
|
||||
return
|
||||
}
|
||||
s.in = size
|
||||
}
|
||||
|
||||
func (s *staleReadMetricsCollector) collect() {
|
||||
in, out := metrics.StaleReadHitInTraffic, metrics.StaleReadHitOutTraffic
|
||||
if !s.hit {
|
||||
in, out = metrics.StaleReadMissInTraffic, metrics.StaleReadMissOutTraffic
|
||||
}
|
||||
if s.in > 0 && s.out > 0 {
|
||||
in.Observe(float64(s.in))
|
||||
out.Observe(float64(s.out))
|
||||
if isLocalTraffic {
|
||||
metrics.StaleReadLocalInBytes.Add(float64(size))
|
||||
} else {
|
||||
metrics.StaleReadRemoteInBytes.Add(float64(size))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,7 +101,9 @@ var (
|
|||
TiKVAggressiveLockedKeysCounter *prometheus.CounterVec
|
||||
TiKVStoreSlowScoreGauge *prometheus.GaugeVec
|
||||
TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec
|
||||
TiKVStaleReadSizeSummary *prometheus.SummaryVec
|
||||
TiKVStaleReadCounter *prometheus.CounterVec
|
||||
TiKVStaleReadReqCounter *prometheus.CounterVec
|
||||
TiKVStaleReadBytes *prometheus.CounterVec
|
||||
)
|
||||
|
||||
// Label constants.
|
||||
|
|
@ -700,13 +702,28 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
|
|||
ConstLabels: constLabels,
|
||||
}, []string{LblType, LblStore})
|
||||
|
||||
TiKVStaleReadSizeSummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "stale_read_bytes",
|
||||
Help: "Size of stale read.",
|
||||
ConstLabels: constLabels,
|
||||
TiKVStaleReadCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "stale_read_counter",
|
||||
Help: "Counter of stale read hit/miss",
|
||||
}, []string{LblResult})
|
||||
|
||||
TiKVStaleReadReqCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "stale_read_req_counter",
|
||||
Help: "Counter of stale read requests",
|
||||
}, []string{LblType})
|
||||
|
||||
TiKVStaleReadBytes = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "stale_read_bytes",
|
||||
Help: "Counter of stale read requests bytes",
|
||||
}, []string{LblResult, LblDirection})
|
||||
|
||||
initShortcuts()
|
||||
|
|
@ -789,7 +806,9 @@ func RegisterMetrics() {
|
|||
prometheus.MustRegister(TiKVAggressiveLockedKeysCounter)
|
||||
prometheus.MustRegister(TiKVStoreSlowScoreGauge)
|
||||
prometheus.MustRegister(TiKVPreferLeaderFlowsGauge)
|
||||
prometheus.MustRegister(TiKVStaleReadSizeSummary)
|
||||
prometheus.MustRegister(TiKVStaleReadCounter)
|
||||
prometheus.MustRegister(TiKVStaleReadReqCounter)
|
||||
prometheus.MustRegister(TiKVStaleReadBytes)
|
||||
}
|
||||
|
||||
// readCounter reads the value of a prometheus.Counter.
|
||||
|
|
|
|||
|
|
@ -160,10 +160,16 @@ var (
|
|||
AggressiveLockedKeysLockedWithConflict prometheus.Counter
|
||||
AggressiveLockedKeysNonForceLock prometheus.Counter
|
||||
|
||||
StaleReadHitInTraffic prometheus.Observer
|
||||
StaleReadHitOutTraffic prometheus.Observer
|
||||
StaleReadMissInTraffic prometheus.Observer
|
||||
StaleReadMissOutTraffic prometheus.Observer
|
||||
StaleReadHitCounter prometheus.Counter
|
||||
StaleReadMissCounter prometheus.Counter
|
||||
|
||||
StaleReadReqLocalCounter prometheus.Counter
|
||||
StaleReadReqCrossZoneCounter prometheus.Counter
|
||||
|
||||
StaleReadLocalInBytes prometheus.Counter
|
||||
StaleReadLocalOutBytes prometheus.Counter
|
||||
StaleReadRemoteInBytes prometheus.Counter
|
||||
StaleReadRemoteOutBytes prometheus.Counter
|
||||
)
|
||||
|
||||
func initShortcuts() {
|
||||
|
|
@ -296,8 +302,14 @@ func initShortcuts() {
|
|||
// TiKV).
|
||||
AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock")
|
||||
|
||||
StaleReadHitInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "in")
|
||||
StaleReadHitOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("hit", "out")
|
||||
StaleReadMissInTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "in")
|
||||
StaleReadMissOutTraffic = TiKVStaleReadSizeSummary.WithLabelValues("miss", "out")
|
||||
StaleReadHitCounter = TiKVStaleReadCounter.WithLabelValues("hit")
|
||||
StaleReadMissCounter = TiKVStaleReadCounter.WithLabelValues("miss")
|
||||
|
||||
StaleReadReqLocalCounter = TiKVStaleReadReqCounter.WithLabelValues("local")
|
||||
StaleReadReqCrossZoneCounter = TiKVStaleReadReqCounter.WithLabelValues("cross-zone")
|
||||
|
||||
StaleReadLocalInBytes = TiKVStaleReadBytes.WithLabelValues("local", "in")
|
||||
StaleReadLocalOutBytes = TiKVStaleReadBytes.WithLabelValues("local", "out")
|
||||
StaleReadRemoteInBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
|
||||
StaleReadRemoteOutBytes = TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue