From 128ccbde43c45eda93947946723ce1690c43c5a0 Mon Sep 17 00:00:00 2001 From: Alkaid <38248129+jyz0309@users.noreply.github.com> Date: Mon, 11 Oct 2021 11:46:20 +0800 Subject: [PATCH] Add read throughput metric for SLI (#321) Signed-off-by: jyz0309 <45495947@qq.com> --- integration_tests/snapshot_test.go | 3 +++ metrics/metrics.go | 26 ++++++++++++++++++++++---- txnkv/txnsnapshot/snapshot.go | 6 ++++-- util/execdetails.go | 7 +++++++ 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 9f9e8c1a..e87e14cb 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -306,6 +306,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { }, ScanDetailV2: &kvrpcpb.ScanDetailV2{ ProcessedVersions: 10, + ProcessedVersionsSize: 10, TotalVersions: 15, RocksdbBlockReadCount: 20, RocksdbBlockReadByte: 15, @@ -318,6 +319,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " + "total_process_time: 100ms, total_wait_time: 100ms, " + "scan_detail: {total_process_keys: 10, " + + "total_process_keys_size: 10, " + "total_keys: 15, " + "rocksdb: {delete_skipped_count: 5, " + "key_skipped_count: 1, " + @@ -327,6 +329,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { expect = "Get:{num_rpc:4, total_time:2s},txnLockFast_backoff:{num:2, total_time:60ms}, " + "total_process_time: 200ms, total_wait_time: 200ms, " + "scan_detail: {total_process_keys: 20, " + + "total_process_keys_size: 20, " + "total_keys: 30, " + "rocksdb: {delete_skipped_count: 10, " + "key_skipped_count: 2, " + diff --git a/metrics/metrics.go b/metrics/metrics.go index 47191768..eda2113e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -89,6 +89,7 @@ var ( TiKVTxnCommitBackoffSeconds prometheus.Histogram TiKVTxnCommitBackoffCount prometheus.Histogram TiKVSmallReadDuration prometheus.Histogram + TiKVReadThroughput prometheus.Histogram TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec ) @@ -523,6 +524,15 @@ func initMetrics(namespace, subsystem string) { Buckets: prometheus.ExponentialBuckets(0.0005, 2, 28), // 0.5ms ~ 74h }) + TiKVReadThroughput = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: "sli", + Name: "tikv_read_throughput", + Help: "Read throughput of TiKV read in Bytes/s.", + Buckets: prometheus.ExponentialBuckets(1024, 2, 13), // 1MB/s ~ 4GB/s + }) + TiKVUnsafeDestroyRangeFailuresCounterVec = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -594,6 +604,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVTxnCommitBackoffSeconds) prometheus.MustRegister(TiKVTxnCommitBackoffCount) prometheus.MustRegister(TiKVSmallReadDuration) + prometheus.MustRegister(TiKVReadThroughput) } // readCounter reads the value of a prometheus.Counter. @@ -635,11 +646,18 @@ func GetTxnCommitCounter() TxnCommitCounter { } } -const smallTxnAffectRow = 20 +const ( + smallTxnReadRow = 20 + smallTxnReadSize = 1 * 1024 * 1024 //1MB +) // ObserveReadSLI observes the read SLI metric. -func ObserveReadSLI(readKeys uint64, readTime float64) { - if readKeys <= smallTxnAffectRow && readKeys != 0 && readTime != 0 { - TiKVSmallReadDuration.Observe(readTime) +func ObserveReadSLI(readKeys uint64, readTime float64, readSize float64) { + if readKeys != 0 && readTime != 0 { + if readKeys <= smallTxnReadRow && readSize < smallTxnReadSize { + TiKVSmallReadDuration.Observe(readTime) + } else { + TiKVReadThroughput.Observe(readSize / readTime) + } } } diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 07411b13..1f78603d 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -428,7 +428,8 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, if batchGetResp.ExecDetailsV2 != nil { readKeys := len(batchGetResp.Pairs) readTime := float64(batchGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000) - metrics.ObserveReadSLI(uint64(readKeys), readTime) + readSize := float64(batchGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize()) + metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize) s.mergeExecDetail(batchGetResp.ExecDetailsV2) } if len(lockedKeys) > 0 { @@ -568,7 +569,8 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] if cmdGetResp.ExecDetailsV2 != nil { readKeys := len(cmdGetResp.Value) readTime := float64(cmdGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000) - metrics.ObserveReadSLI(uint64(readKeys), readTime) + readSize := float64(cmdGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize()) + metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize) s.mergeExecDetail(cmdGetResp.ExecDetailsV2) } val := cmdGetResp.GetValue() diff --git a/util/execdetails.go b/util/execdetails.go index 549d849c..adb026bc 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -214,6 +214,9 @@ type ScanDetail struct { // It does not include deleted version or RocksDB tombstone keys. // For Coprocessor requests, it includes keys that has been filtered out by Selection. ProcessedKeys int64 + // Number of bytes of user key-value pairs scanned from the storage, i.e. + // total size of data returned from MVCC layer. + ProcessedKeysSize int64 // RocksdbDeleteSkippedCount is the total number of deletes and single deletes skipped over during // iteration, i.e. how many RocksDB tombstones are skipped. RocksdbDeleteSkippedCount uint64 @@ -231,6 +234,7 @@ type ScanDetail struct { func (sd *ScanDetail) Merge(scanDetail *ScanDetail) { atomic.AddInt64(&sd.TotalKeys, scanDetail.TotalKeys) atomic.AddInt64(&sd.ProcessedKeys, scanDetail.ProcessedKeys) + atomic.AddInt64(&sd.ProcessedKeysSize, scanDetail.ProcessedKeysSize) atomic.AddUint64(&sd.RocksdbDeleteSkippedCount, scanDetail.RocksdbDeleteSkippedCount) atomic.AddUint64(&sd.RocksdbKeySkippedCount, scanDetail.RocksdbKeySkippedCount) atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount) @@ -249,6 +253,8 @@ func (sd *ScanDetail) String() string { buf.WriteString("scan_detail: {") buf.WriteString("total_process_keys: ") buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10)) + buf.WriteString(", total_process_keys_size: ") + buf.WriteString(strconv.FormatInt(sd.ProcessedKeysSize, 10)) buf.WriteString(", total_keys: ") buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10)) buf.WriteString(", rocksdb: {") @@ -272,6 +278,7 @@ func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) { if scanDetail != nil { sd.TotalKeys += int64(scanDetail.TotalVersions) sd.ProcessedKeys += int64(scanDetail.ProcessedVersions) + sd.ProcessedKeysSize += int64(scanDetail.ProcessedVersionsSize) sd.RocksdbDeleteSkippedCount += scanDetail.RocksdbDeleteSkippedCount sd.RocksdbKeySkippedCount += scanDetail.RocksdbKeySkippedCount sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount