mirror of https://github.com/tikv/client-go.git
Add read throughput metric for SLI (#321)
Signed-off-by: jyz0309 <45495947@qq.com>
This commit is contained in:
parent
843a5378aa
commit
128ccbde43
|
|
@ -306,6 +306,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
|
|||
},
|
||||
ScanDetailV2: &kvrpcpb.ScanDetailV2{
|
||||
ProcessedVersions: 10,
|
||||
ProcessedVersionsSize: 10,
|
||||
TotalVersions: 15,
|
||||
RocksdbBlockReadCount: 20,
|
||||
RocksdbBlockReadByte: 15,
|
||||
|
|
@ -318,6 +319,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
|
|||
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " +
|
||||
"total_process_time: 100ms, total_wait_time: 100ms, " +
|
||||
"scan_detail: {total_process_keys: 10, " +
|
||||
"total_process_keys_size: 10, " +
|
||||
"total_keys: 15, " +
|
||||
"rocksdb: {delete_skipped_count: 5, " +
|
||||
"key_skipped_count: 1, " +
|
||||
|
|
@ -327,6 +329,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
|
|||
expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " +
|
||||
"total_process_time: 200ms, total_wait_time: 200ms, " +
|
||||
"scan_detail: {total_process_keys: 20, " +
|
||||
"total_process_keys_size: 20, " +
|
||||
"total_keys: 30, " +
|
||||
"rocksdb: {delete_skipped_count: 10, " +
|
||||
"key_skipped_count: 2, " +
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ var (
|
|||
TiKVTxnCommitBackoffSeconds prometheus.Histogram
|
||||
TiKVTxnCommitBackoffCount prometheus.Histogram
|
||||
TiKVSmallReadDuration prometheus.Histogram
|
||||
TiKVReadThroughput prometheus.Histogram
|
||||
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
|
||||
)
|
||||
|
||||
|
|
@ -523,6 +524,15 @@ func initMetrics(namespace, subsystem string) {
|
|||
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 74h
|
||||
})
|
||||
|
||||
TiKVReadThroughput = prometheus.NewHistogram(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: "sli",
|
||||
Name: "tikv_read_throughput",
|
||||
Help: "Read throughput of TiKV read in Bytes/s.",
|
||||
Buckets: prometheus.ExponentialBuckets(1024, 2, 13), // 1MB/s ~ 4GB/s
|
||||
})
|
||||
|
||||
TiKVUnsafeDestroyRangeFailuresCounterVec = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
|
|
@ -594,6 +604,7 @@ func RegisterMetrics() {
|
|||
prometheus.MustRegister(TiKVTxnCommitBackoffSeconds)
|
||||
prometheus.MustRegister(TiKVTxnCommitBackoffCount)
|
||||
prometheus.MustRegister(TiKVSmallReadDuration)
|
||||
prometheus.MustRegister(TiKVReadThroughput)
|
||||
}
|
||||
|
||||
// readCounter reads the value of a prometheus.Counter.
|
||||
|
|
@ -635,11 +646,18 @@ func GetTxnCommitCounter() TxnCommitCounter {
|
|||
}
|
||||
}
|
||||
|
||||
const smallTxnAffectRow = 20
|
||||
const (
|
||||
smallTxnReadRow = 20
|
||||
smallTxnReadSize = 1 * 1024 * 1024 //1MB
|
||||
)
|
||||
|
||||
// ObserveReadSLI observes the read SLI metric.
|
||||
func ObserveReadSLI(readKeys uint64, readTime float64) {
|
||||
if readKeys <= smallTxnAffectRow && readKeys != 0 && readTime != 0 {
|
||||
TiKVSmallReadDuration.Observe(readTime)
|
||||
func ObserveReadSLI(readKeys uint64, readTime float64, readSize float64) {
|
||||
if readKeys != 0 && readTime != 0 {
|
||||
if readKeys <= smallTxnReadRow && readSize < smallTxnReadSize {
|
||||
TiKVSmallReadDuration.Observe(readTime)
|
||||
} else {
|
||||
TiKVReadThroughput.Observe(readSize / readTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -428,7 +428,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
|
|||
if batchGetResp.ExecDetailsV2 != nil {
|
||||
readKeys := len(batchGetResp.Pairs)
|
||||
// NOTE(review): integer division bug — GetKvReadWallTimeMs()/1000 divides
// two integers BEFORE the float64 conversion, truncating to whole seconds.
// Any read faster than 1s yields readTime == 0 and ObserveReadSLI skips it.
// Should be float64(...GetKvReadWallTimeMs()) / 1000.0 — TODO confirm.
readTime := float64(batchGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000)
|
||||
// NOTE(review): diff residue — this two-argument call is the pre-change
// line; the commit replaces it with the three-argument call below.
metrics.ObserveReadSLI(uint64(readKeys), readTime)
|
||||
readSize := float64(batchGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize())
|
||||
metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize)
|
||||
s.mergeExecDetail(batchGetResp.ExecDetailsV2)
|
||||
}
|
||||
if len(lockedKeys) > 0 {
|
||||
|
|
@ -568,7 +569,8 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
|
|||
if cmdGetResp.ExecDetailsV2 != nil {
|
||||
// NOTE(review): len(cmdGetResp.Value) is the VALUE's byte length, not a
// key count — a point Get reads at most one key. Presumably intentional
// as a rough size proxy, but it feeds the readKeys parameter; verify.
readKeys := len(cmdGetResp.Value)
|
||||
// NOTE(review): same integer-division truncation as in
// batchGetSingleRegion — sub-second reads produce readTime == 0 and the
// SLI observation is skipped. Should convert to float64 before dividing.
readTime := float64(cmdGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000)
|
||||
// NOTE(review): diff residue — pre-change two-argument call; superseded
// by the three-argument call below.
metrics.ObserveReadSLI(uint64(readKeys), readTime)
|
||||
readSize := float64(cmdGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize())
|
||||
metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize)
|
||||
s.mergeExecDetail(cmdGetResp.ExecDetailsV2)
|
||||
}
|
||||
val := cmdGetResp.GetValue()
|
||||
|
|
|
|||
|
|
@ -214,6 +214,9 @@ type ScanDetail struct {
|
|||
// It does not include deleted version or RocksDB tombstone keys.
|
||||
// For Coprocessor requests, it includes keys that has been filtered out by Selection.
|
||||
ProcessedKeys int64
|
||||
// Number of bytes of user key-value pairs scanned from the storage, i.e.
|
||||
// total size of data returned from MVCC layer.
|
||||
ProcessedKeysSize int64
|
||||
// RocksdbDeleteSkippedCount is the total number of deletes and single deletes skipped over during
|
||||
// iteration, i.e. how many RocksDB tombstones are skipped.
|
||||
RocksdbDeleteSkippedCount uint64
|
||||
|
|
@ -231,6 +234,7 @@ type ScanDetail struct {
|
|||
func (sd *ScanDetail) Merge(scanDetail *ScanDetail) {
|
||||
atomic.AddInt64(&sd.TotalKeys, scanDetail.TotalKeys)
|
||||
atomic.AddInt64(&sd.ProcessedKeys, scanDetail.ProcessedKeys)
|
||||
atomic.AddInt64(&sd.ProcessedKeysSize, scanDetail.ProcessedKeysSize)
|
||||
atomic.AddUint64(&sd.RocksdbDeleteSkippedCount, scanDetail.RocksdbDeleteSkippedCount)
|
||||
atomic.AddUint64(&sd.RocksdbKeySkippedCount, scanDetail.RocksdbKeySkippedCount)
|
||||
atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount)
|
||||
|
|
@ -249,6 +253,8 @@ func (sd *ScanDetail) String() string {
|
|||
buf.WriteString("scan_detail: {")
|
||||
buf.WriteString("total_process_keys: ")
|
||||
buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10))
|
||||
buf.WriteString(", total_process_keys_size: ")
|
||||
buf.WriteString(strconv.FormatInt(sd.ProcessedKeysSize, 10))
|
||||
buf.WriteString(", total_keys: ")
|
||||
buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10))
|
||||
buf.WriteString(", rocksdb: {")
|
||||
|
|
@ -272,6 +278,7 @@ func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) {
|
|||
if scanDetail != nil {
|
||||
sd.TotalKeys += int64(scanDetail.TotalVersions)
|
||||
sd.ProcessedKeys += int64(scanDetail.ProcessedVersions)
|
||||
sd.ProcessedKeysSize += int64(scanDetail.ProcessedVersionsSize)
|
||||
sd.RocksdbDeleteSkippedCount += scanDetail.RocksdbDeleteSkippedCount
|
||||
sd.RocksdbKeySkippedCount += scanDetail.RocksdbKeySkippedCount
|
||||
sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount
|
||||
|
|
|
|||
Loading…
Reference in New Issue