Fix stale read metrics (#1649)

close tikv/client-go#1648

Signed-off-by: you06 <you1474600@gmail.com>
you06 2025-06-14 11:02:13 +09:00 committed by GitHub
parent 74c0a81150
commit 924198a868
8 changed files with 238 additions and 87 deletions


@ -638,14 +638,6 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
elapsed := time.Since(start)
connArray.updateRPCMetrics(req, resp, elapsed)
if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
execDetails := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
execNetworkCollector := networkCollector{}
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
if si := buildSpanInfoFromResp(resp); si != nil {
si.addTo(spanRPC, start)


@ -121,15 +121,6 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
// rpc metrics
connArray.updateRPCMetrics(req, resp, elapsed)
// resource control
if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
execDetails := stmtExec.(*util.ExecDetails)
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
execNetworkCollector := networkCollector{}
execNetworkCollector.onReq(req, execDetails)
execNetworkCollector.onResp(req, resp, execDetails)
}
// tracing
if spanRPC != nil {
if util.TraceExecDetailsEnabled(ctx) {
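Note: these two deletions (in the synchronous and asynchronous RPC clients) remove the per-request ExecDetails and network accounting from the transport layer; the same accounting moves into the region request sender (see the region_request.go hunks below), which still knows whether the request started as a stale read. A minimal sketch of the relocated per-attempt accounting, assuming the networkCollector and ExecDetails types shown later in this diff:

package locate

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/util"
)

// recordAttempt is a hypothetical helper; the actual change inlines this logic
// in sendReqState.send and sendReqState.handleAsyncResponse.
func recordAttempt(ctx context.Context, req *tikvrpc.Request, resp *tikvrpc.Response, elapsed time.Duration, staleRead bool) {
	var details *util.ExecDetails
	if v := ctx.Value(util.ExecDetailsKey); v != nil {
		details = v.(*util.ExecDetails)
		atomic.AddInt64(&details.WaitKVRespDuration, int64(elapsed))
	}
	c := networkCollector{staleRead: staleRead}
	c.onReq(req, details)        // request bytes and stale-read request counters
	c.onResp(req, resp, details) // response bytes and stale-read byte counters
}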


@ -40,6 +40,7 @@ import (
"strconv"
"time"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
@ -136,6 +137,9 @@ func (r reqCollapse) collapse(ctx context.Context, key string, sf *singleflight.
addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
// because the request may be used by other goroutines, copy the request to avoid data race.
copyReq := *req
if req.Type == tikvrpc.CmdResolveLock && req.Req != nil {
copyReq.Req = proto.Clone(req.ResolveLock())
}
rsC := sf.DoChan(key, func() (interface{}, error) {
return r.Client.SendRequest(context.Background(), addr, &copyReq, ReadTimeoutShort) // use resolveLock timeout.
})
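Why the extra clone: collapse shares one in-flight resolve-lock request across goroutines via singleflight, and `copyReq := *req` is only a shallow copy, so both copies still point at the same inner protobuf message. A simplified, self-contained illustration of the difference (the `request` wrapper type here is hypothetical):

package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

type request struct {
	inner *kvrpcpb.ResolveLockRequest
}

func main() {
	orig := request{inner: &kvrpcpb.ResolveLockRequest{StartVersion: 1}}

	shallow := orig                      // copies the wrapper, but shallow.inner == orig.inner
	shallow.inner.StartVersion = 2       // mutates the shared message
	fmt.Println(orig.inner.StartVersion) // 2: the original request is affected too

	deep := request{inner: proto.Clone(orig.inner).(*kvrpcpb.ResolveLockRequest)}
	deep.inner.StartVersion = 3
	fmt.Println(orig.inner.StartVersion) // still 2: the cloned message is independent
}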


@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package client
package locate
import (
"sync/atomic"
@ -20,36 +20,15 @@ import (
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)
type staleReadMetricsCollector struct {
}
func (s *staleReadMetricsCollector) onReq(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
} else {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
}
}
func (s *staleReadMetricsCollector) onResp(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteInBytes.Add(float64(size))
} else {
metrics.StaleReadLocalInBytes.Add(float64(size))
}
}
type networkCollector struct {
staleReadMetricsCollector
staleRead bool
}
func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
@ -92,24 +71,25 @@ func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails
// ignore others
return
}
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
total = &details.UnpackedBytesSentMPPTotal
crossZone = &details.UnpackedBytesSentMPPCrossZone
} else {
total = &details.UnpackedBytesSentKVTotal
crossZone = &details.UnpackedBytesSentKVCrossZone
}
atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
if details != nil {
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
total = &details.UnpackedBytesSentMPPTotal
crossZone = &details.UnpackedBytesSentMPPCrossZone
} else {
total = &details.UnpackedBytesSentKVTotal
crossZone = &details.UnpackedBytesSentKVCrossZone
}
atomic.AddInt64(total, int64(size))
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onReq(float64(size), isCrossZoneTraffic)
if s.staleRead {
s.onReqStaleRead(float64(size), isCrossZoneTraffic)
}
}
@ -117,6 +97,9 @@ func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response,
if resp == nil {
return
}
if _, ok := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); ok {
return
}
size := 0
switch req.Type {
case tikvrpc.CmdGet:
@ -160,23 +143,46 @@ func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response,
// ignore others
return
}
var total, crossZone *int64
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
if isTiflashTarget {
total = &details.UnpackedBytesReceivedMPPTotal
crossZone = &details.UnpackedBytesReceivedMPPCrossZone
} else {
total = &details.UnpackedBytesReceivedKVTotal
crossZone = &details.UnpackedBytesReceivedKVCrossZone
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
// exec details
if details != nil {
var total, crossZone *int64
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
if isTiflashTarget {
total = &details.UnpackedBytesReceivedMPPTotal
crossZone = &details.UnpackedBytesReceivedMPPCrossZone
} else {
total = &details.UnpackedBytesReceivedKVTotal
crossZone = &details.UnpackedBytesReceivedKVCrossZone
}
atomic.AddInt64(total, int64(size))
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
}
atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onResp(float64(size), isCrossZoneTraffic)
if s.staleRead {
s.onRespStaleRead(float64(size), isCrossZoneTraffic)
}
}
func (s *networkCollector) onReqStaleRead(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteOutBytes.Add(size)
metrics.StaleReadReqCrossZoneCounter.Add(1)
} else {
metrics.StaleReadLocalOutBytes.Add(size)
metrics.StaleReadReqLocalCounter.Add(1)
}
}
func (s *networkCollector) onRespStaleRead(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteInBytes.Add(size)
} else {
metrics.StaleReadLocalInBytes.Add(size)
}
}
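For reference, the stale-read counters updated by the two helpers above (metric names as defined in this repository's metrics package):

// onReqStaleRead (request path):
//   local zone : StaleReadLocalOutBytes  += size, StaleReadReqLocalCounter     += 1
//   cross zone : StaleReadRemoteOutBytes += size, StaleReadReqCrossZoneCounter += 1
// onRespStaleRead (response path):
//   local zone : StaleReadLocalInBytes   += size
//   cross zone : StaleReadRemoteInBytes  += size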


@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package client
package locate
import (
"sync/atomic"
@ -85,9 +85,6 @@ func TestNetworkCollectorOnReq(t *testing.T) {
}
func TestNetworkCollectorOnResp(t *testing.T) {
// Initialize the collector and dependencies
collector := &networkCollector{}
// Construct requests and responses
reqs := []*tikvrpc.Request{
tikvrpc.NewRequest(
@ -140,6 +137,10 @@ func TestNetworkCollectorOnResp(t *testing.T) {
for _, cas := range testCases {
// Call the method
cas.req.AccessLocation = kv.AccessLocalZone
// Initialize the collector and dependencies
collector := &networkCollector{
staleRead: cas.req.StaleRead,
}
collector.onResp(cas.req, cas.resp, details)
// Verify metrics


@ -499,6 +499,9 @@ func (s *RegionRequestSender) SendReqAsync(
et: tikvrpc.TiKV,
opts: opts,
},
invariants: reqInvariants{
staleRead: req.StaleRead,
},
}
cb.Inject(func(resp *tikvrpc.ResponseExt, err error) (*tikvrpc.ResponseExt, error) {
@ -522,7 +525,7 @@ func (s *RegionRequestSender) SendReqAsync(
if retryTimes > 0 {
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes))
}
if req.StaleRead {
if state.invariants.staleRead {
if state.vars.sendTimes == 1 {
metrics.StaleReadHitCounter.Add(1)
} else {
@ -573,7 +576,11 @@ func (s *RegionRequestSender) SendReqAsync(
cli.SendRequestAsync(ctx, sendToAddr, req, async.NewCallback(cb.Executor(), func(resp *tikvrpc.Response, err error) {
state.vars.sendTimes++
canceled := err != nil && hookCtx.Err() != nil && errors.Cause(hookCtx.Err()) == context.Canceled
if state.handleAsyncResponse(startTime, canceled, resp, err, cancels...) {
var execDetails *util.ExecDetails
if val := ctx.Value(util.ExecDetailsKey); val != nil {
execDetails = val.(*util.ExecDetails)
}
if state.handleAsyncResponse(startTime, canceled, resp, err, execDetails, cancels...) {
cb.Invoke(state.toResponseExt())
return
}
@ -885,6 +892,15 @@ type sendReqState struct {
msg string
sendTimes int
}
invariants reqInvariants
}
// reqInvariants holds the input state of the request.
// If the tikvrpc.Request is changed during retries or other operations,
// reqInvariants still records the initial state.
type reqInvariants struct {
staleRead bool
}
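The point of the snapshot: req.StaleRead can be cleared while the request is retried (for example when a stale read falls back to a normal read), so checking the field after the request finishes would misclassify the attempt. A simplified, self-contained sketch of the problem the invariant avoids (the types here are hypothetical):

package main

import "fmt"

type request struct{ staleRead bool }

// send models a retry path that falls back to a non-stale read
// and clears the flag on the shared request object.
func send(req *request) { req.staleRead = false }

func main() {
	req := &request{staleRead: true}
	wasStaleRead := req.staleRead // snapshot up front, as reqInvariants does
	send(req)
	fmt.Println(req.staleRead) // false: a post-hoc check would count this as a non-stale request
	fmt.Println(wasStaleRead)  // true: the snapshot still reflects the original request
}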
// next encapsulates one iteration of the retry loop. calling `next` will handle send error (s.vars.err) or region error
@ -966,7 +982,6 @@ func (s *sendReqState) next() (done bool) {
if s.replicaSelector != nil &&
s.replicaSelector.target != nil &&
req.AccessLocation == kv.AccessUnknown &&
len(s.replicaSelector.option.labels) != 0 {
// patch the access location if it is not set under region request sender.
if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) {
@ -1114,6 +1129,18 @@ func (s *sendReqState) send() (canceled bool) {
if s.replicaSelector != nil {
recordAttemptedTime(s.replicaSelector, rpcDuration)
}
var execDetails *util.ExecDetails
if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
execDetails = stmtExec.(*util.ExecDetails)
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
}
collector := networkCollector{
staleRead: s.invariants.staleRead,
}
collector.onReq(req, execDetails)
collector.onResp(req, s.vars.resp, execDetails)
// Record timecost of external requests on related Store when `ReplicaReadMode == "PreferLeader"`.
if rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
rpcCtx.Store.healthStatus.recordClientSideSlowScoreStat(rpcDuration)
@ -1218,7 +1245,6 @@ func (s *sendReqState) initForAsyncRequest() (ok bool) {
if s.replicaSelector != nil &&
s.replicaSelector.target != nil &&
req.AccessLocation == kv.AccessUnknown &&
len(s.replicaSelector.option.labels) != 0 {
// patch the access location if it is not set under region request sender.
if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) {
@ -1251,7 +1277,7 @@ func (s *sendReqState) initForAsyncRequest() (ok bool) {
}
// handleAsyncResponse handles the response of an async request.
func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp *tikvrpc.Response, err error, cancels ...context.CancelFunc) (done bool) {
func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp *tikvrpc.Response, err error, execDetails *util.ExecDetails, cancels ...context.CancelFunc) (done bool) {
if len(cancels) > 0 {
defer func() {
for i := len(cancels) - 1; i >= 0; i-- {
@ -1268,6 +1294,15 @@ func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp
if s.Stats != nil {
s.Stats.RecordRPCRuntimeStats(req.Type, rpcDuration)
}
if execDetails != nil {
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(rpcDuration))
}
collector := networkCollector{
staleRead: s.invariants.staleRead,
}
collector.onReq(req, execDetails)
collector.onResp(req, resp, execDetails)
if s.vars.rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
s.vars.rpcCtx.Store.healthStatus.recordClientSideSlowScoreStat(rpcDuration)
}
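Both the synchronous send path and this asynchronous path now perform the same accounting; the difference is that handleAsyncResponse receives the *util.ExecDetails that SendReqAsync resolved from the context, because the accounting runs when the callback fires rather than inline. A simplified, self-contained model of that capture-then-update pattern (all names below are illustrative, not the client-go API):

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type execDetails struct{ waitKVRespDuration int64 }

type ctxKey struct{}

// sendAsync resolves the exec details from the context once, captures them in
// the completion callback, and updates them when the response arrives.
func sendAsync(ctx context.Context, do func(cb func(elapsed time.Duration))) {
	var details *execDetails
	if v := ctx.Value(ctxKey{}); v != nil {
		details = v.(*execDetails)
	}
	do(func(elapsed time.Duration) {
		if details != nil {
			atomic.AddInt64(&details.waitKVRespDuration, int64(elapsed))
		}
	})
}

func main() {
	details := &execDetails{}
	ctx := context.WithValue(context.Background(), ctxKey{}, details)
	sendAsync(ctx, func(cb func(time.Duration)) { cb(5 * time.Millisecond) })
	fmt.Println(time.Duration(atomic.LoadInt64(&details.waitKVRespDuration))) // 5ms
}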
@ -1394,13 +1429,16 @@ func (s *RegionRequestSender) SendReqCtx(
et: et,
opts: opts,
},
invariants: reqInvariants{
staleRead: req.StaleRead,
},
}
defer func() {
if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 {
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes))
}
if req.StaleRead {
if state.invariants.staleRead {
if state.vars.sendTimes == 1 {
metrics.StaleReadHitCounter.Add(1)
} else {


@ -50,6 +50,8 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
@ -57,8 +59,11 @@ import (
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
"go.uber.org/zap"
)
@ -1639,3 +1644,99 @@ func (s *testRegionRequestToThreeStoresSuite) TestTiKVRecoveredFromDown() {
}
s.Require().Fail("should access recovered peer after region reloading within RegionCacheTTL")
}
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadMetrics() {
readMetric := func(col prometheus.Collector) int {
ch := make(chan prometheus.Metric, 1)
col.Collect(ch)
var m dto.Metric
s.Nil((<-ch).Write(&m))
return int(*m.Counter.Value + 0.000001) // round to int and avoid floating point precision issues
}
for _, staleReadHit := range []bool{false, true} {
for _, asyncReq := range []bool{false, true} {
caseName := fmt.Sprintf("async=%t, staleReadHit=%t", asyncReq, staleReadHit)
// Delete all vectors and recreate them before each test case.
metrics.TiKVStaleReadCounter.Reset()
metrics.TiKVStaleReadReqCounter.Reset()
metrics.TiKVStaleReadBytes.Reset()
metrics.StaleReadHitCounter = metrics.TiKVStaleReadCounter.WithLabelValues("hit")
metrics.StaleReadMissCounter = metrics.TiKVStaleReadCounter.WithLabelValues("miss")
metrics.StaleReadReqLocalCounter = metrics.TiKVStaleReadReqCounter.WithLabelValues("local")
metrics.StaleReadReqCrossZoneCounter = metrics.TiKVStaleReadReqCounter.WithLabelValues("cross-zone")
metrics.StaleReadLocalInBytes = metrics.TiKVStaleReadBytes.WithLabelValues("local", "in")
metrics.StaleReadLocalOutBytes = metrics.TiKVStaleReadBytes.WithLabelValues("local", "out")
metrics.StaleReadRemoteInBytes = metrics.TiKVStaleReadBytes.WithLabelValues("cross-zone", "in")
metrics.StaleReadRemoteOutBytes = metrics.TiKVStaleReadBytes.WithLabelValues("cross-zone", "out")
key := []byte("key")
value := []byte("value")
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
if req.StaleRead && !staleReadHit {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
DataIsNotReady: &errorpb.DataIsNotReady{},
}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
}}
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: key}, kv.ReplicaReadLeader, nil)
req.ReadReplicaScope = oracle.GlobalTxnScope
req.EnableStaleWithMixedReplicaRead()
ctx := context.WithValue(context.Background(), util.ExecDetailsKey, &util.ExecDetails{})
bo := retry.NewBackoffer(ctx, -1)
loc, err := s.cache.LocateKey(bo, key)
s.Require().Nil(err)
var resp *tikvrpc.Response
if asyncReq {
complete := false
rl := async.NewRunLoop()
s.regionRequestSender.SendReqAsync(bo, req, loc.Region, time.Second, async.NewCallback(rl, func(innerResp *tikvrpc.ResponseExt, innerErr error) {
resp, err = &innerResp.Response, innerErr
complete = true
}), WithMatchLabels(s.cluster.GetStore(s.storeIDs[0]).Labels))
for !complete {
_, err := rl.Exec(ctx)
s.Nil(err)
}
} else {
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV, WithMatchLabels(s.cluster.GetStore(s.storeIDs[0]).Labels))
}
s.Require().Nil(err)
s.Equal(value, resp.Resp.(*kvrpcpb.GetResponse).Value)
hits, misses := readMetric(metrics.StaleReadHitCounter), readMetric(metrics.StaleReadMissCounter)
localReq, remoteReq := readMetric(metrics.StaleReadReqLocalCounter), readMetric(metrics.StaleReadReqCrossZoneCounter)
localInBytes, localOutBytes := readMetric(metrics.StaleReadLocalInBytes), readMetric(metrics.StaleReadLocalOutBytes)
remoteInBytes, remoteOutBytes := readMetric(metrics.StaleReadRemoteInBytes), readMetric(metrics.StaleReadRemoteOutBytes)
if staleReadHit {
// when the stale read hits,
// local metrics should be counted
s.Equal(1, hits, caseName)
s.Equal(1, localReq, caseName)
s.Greater(localInBytes, 0, caseName)
s.Greater(localOutBytes, 0, caseName)
// remote metrics should be zero
s.Zero(misses, caseName)
s.Zero(remoteReq, caseName)
s.Zero(remoteInBytes, caseName)
s.Zero(remoteOutBytes, caseName)
} else {
// when the stale read misses
s.Zero(hits, caseName)
// local replica is tried first, so local metrics will also be counted
s.Greater(localReq, 0, caseName)
s.Greater(localInBytes, 0, caseName)
s.Greater(localOutBytes, 0, caseName)
// remote metrics should be counted
s.Equal(1, misses, caseName)
s.Equal(1, remoteReq, caseName)
s.Greater(remoteInBytes, 0, caseName)
s.Greater(remoteOutBytes, 0, caseName)
}
}
}
}
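The readMetric helper in the test above reads a raw counter value by draining the collector into a dto.Metric. A standalone sketch of that technique using the same prometheus and client_model packages (the counter name is made up):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "example_total"})
	c.Add(3)

	// Drain the collector and decode the metric's protobuf representation.
	ch := make(chan prometheus.Metric, 1)
	c.Collect(ch)
	var m dto.Metric
	if err := (<-ch).Write(&m); err != nil {
		panic(err)
	}
	fmt.Println(int(m.GetCounter().GetValue())) // 3
}

For simple cases, client_golang's testutil.ToFloat64 wraps the same Collect-and-Write steps in a single call.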


@ -2864,18 +2864,36 @@ func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) *Regio
s.NotNil(rc)
regionErr, err := ca.genAccessErr(s.cache, rc, ca.accessErr[idx])
if regionErr != nil {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: regionErr,
}}, nil
switch req.Type {
case tikvrpc.CmdGet:
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: regionErr,
}}, nil
case tikvrpc.CmdPrewrite:
return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{
RegionError: regionErr,
}}, nil
default:
s.FailNow("unsupported reqType " + req.Type.String())
return nil, fmt.Errorf("unsupported reqType %v", req.Type)
}
}
if err != nil {
return nil, err
}
}
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
Value: []byte("hello world"),
}}, nil
switch req.Type {
case tikvrpc.CmdGet:
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
Value: []byte("hello world"),
}}, nil
case tikvrpc.CmdPrewrite:
return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil
default:
s.FailNow("unsupported reqType " + req.Type.String())
return nil, fmt.Errorf("unsupported reqType %v", req.Type)
}
}}
sender := NewRegionRequestSender(s.cache, fnClient, oracle.NoopReadTSValidator{})
req, opts, timeout := ca.buildRequest(s)