client-go/internal/client/metrics_collector.go

183 lines
5.5 KiB
Go

// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"sync/atomic"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)
type staleReadMetricsCollector struct {
}
func (s *staleReadMetricsCollector) onReq(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteOutBytes.Add(float64(size))
metrics.StaleReadReqCrossZoneCounter.Add(1)
} else {
metrics.StaleReadLocalOutBytes.Add(float64(size))
metrics.StaleReadReqLocalCounter.Add(1)
}
}
func (s *staleReadMetricsCollector) onResp(size float64, isCrossZoneTraffic bool) {
if isCrossZoneTraffic {
metrics.StaleReadRemoteInBytes.Add(float64(size))
} else {
metrics.StaleReadLocalInBytes.Add(float64(size))
}
}
type networkCollector struct {
staleReadMetricsCollector
}
func (s *networkCollector) onReq(req *tikvrpc.Request, details *util.ExecDetails) {
if req == nil {
return
}
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size = req.Get().Size()
case tikvrpc.CmdBatchGet:
size = req.BatchGet().Size()
case tikvrpc.CmdScan:
size = req.Scan().Size()
case tikvrpc.CmdCop:
size = req.Cop().Size()
case tikvrpc.CmdPrewrite:
size = req.Prewrite().Size()
case tikvrpc.CmdCommit:
size = req.Commit().Size()
case tikvrpc.CmdPessimisticLock:
size = req.PessimisticLock().Size()
case tikvrpc.CmdPessimisticRollback:
size = req.PessimisticRollback().Size()
case tikvrpc.CmdBatchRollback:
size = req.BatchRollback().Size()
case tikvrpc.CmdCheckSecondaryLocks:
size = req.CheckSecondaryLocks().Size()
case tikvrpc.CmdScanLock:
size = req.ScanLock().Size()
case tikvrpc.CmdResolveLock:
size = req.ResolveLock().Size()
case tikvrpc.CmdFlush:
size = req.Flush().Size()
case tikvrpc.CmdCheckTxnStatus:
size = req.CheckTxnStatus().Size()
case tikvrpc.CmdMPPTask:
size = req.DispatchMPPTask().Size()
default:
// ignore others
return
}
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
var total, crossZone *int64
if isTiflashTarget {
total = &details.UnpackedBytesSentMPPTotal
crossZone = &details.UnpackedBytesSentMPPCrossZone
} else {
total = &details.UnpackedBytesSentKVTotal
crossZone = &details.UnpackedBytesSentKVCrossZone
}
atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onReq(float64(size), isCrossZoneTraffic)
}
}
func (s *networkCollector) onResp(req *tikvrpc.Request, resp *tikvrpc.Response, details *util.ExecDetails) {
if resp == nil {
return
}
size := 0
switch req.Type {
case tikvrpc.CmdGet:
size += resp.Resp.(*kvrpcpb.GetResponse).Size()
case tikvrpc.CmdBatchGet:
size += resp.Resp.(*kvrpcpb.BatchGetResponse).Size()
case tikvrpc.CmdScan:
size += resp.Resp.(*kvrpcpb.ScanResponse).Size()
case tikvrpc.CmdCop:
size += resp.Resp.(*coprocessor.Response).Size()
case tikvrpc.CmdPrewrite:
size += resp.Resp.(*kvrpcpb.PrewriteResponse).Size()
case tikvrpc.CmdCommit:
size += resp.Resp.(*kvrpcpb.CommitResponse).Size()
case tikvrpc.CmdPessimisticLock:
size += resp.Resp.(*kvrpcpb.PessimisticLockResponse).Size()
case tikvrpc.CmdPessimisticRollback:
size += resp.Resp.(*kvrpcpb.PessimisticRollbackResponse).Size()
case tikvrpc.CmdBatchRollback:
size += resp.Resp.(*kvrpcpb.BatchRollbackResponse).Size()
case tikvrpc.CmdCheckSecondaryLocks:
size += resp.Resp.(*kvrpcpb.CheckSecondaryLocksResponse).Size()
case tikvrpc.CmdScanLock:
size += resp.Resp.(*kvrpcpb.ScanLockResponse).Size()
case tikvrpc.CmdResolveLock:
size += resp.Resp.(*kvrpcpb.ResolveLockResponse).Size()
case tikvrpc.CmdFlush:
size += resp.Resp.(*kvrpcpb.FlushResponse).Size()
case tikvrpc.CmdCheckTxnStatus:
size += resp.Resp.(*kvrpcpb.CheckTxnStatusResponse).Size()
case tikvrpc.CmdMPPTask:
// if is MPPDataPacket
if resp1, ok := resp.Resp.(*mpp.MPPDataPacket); ok && resp1 != nil {
size += resp1.Size()
}
// if is DispatchTaskResponse
if resp1, ok := resp.Resp.(*mpp.DispatchTaskResponse); ok && resp1 != nil {
size += resp1.Size()
}
default:
// ignore others
return
}
var total, crossZone *int64
isTiflashTarget := req.StoreTp == tikvrpc.TiFlash
if isTiflashTarget {
total = &details.UnpackedBytesReceivedMPPTotal
crossZone = &details.UnpackedBytesReceivedMPPCrossZone
} else {
total = &details.UnpackedBytesReceivedKVTotal
crossZone = &details.UnpackedBytesReceivedKVCrossZone
}
atomic.AddInt64(total, int64(size))
isCrossZoneTraffic := req.AccessLocation == kv.AccessCrossZone
if isCrossZoneTraffic {
atomic.AddInt64(crossZone, int64(size))
}
// stale read metrics
if req.StaleRead {
s.staleReadMetricsCollector.onResp(float64(size), isCrossZoneTraffic)
}
}