[to #556] metrics: attach cluster label to metrics (#558)

This commit is contained in:
iosmanthus 2022-03-28 09:20:50 +08:00 committed by GitHub
parent f4e7c302ad
commit 6cbf56aede
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 470 additions and 237 deletions

View File

@ -113,7 +113,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -214,7 +214,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_raw_requests_failure_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_raw_requests_failure_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -315,7 +315,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"format": "time_series", "format": "time_series",
"hide": false, "hide": false,
"interval": "", "interval": "",
@ -325,7 +325,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(1, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(1, sum(rate(client_java_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}} - max", "legendFormat": "{{type}} - max",
@ -435,7 +435,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"hide": false, "hide": false,
"interval": "", "interval": "",
@ -561,7 +561,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(1, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(1, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"format": "time_series", "format": "time_series",
"instant": false, "instant": false,
"interval": "", "interval": "",
@ -572,7 +572,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99, sum(rate(client_java_grpc_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"format": "time_series", "format": "time_series",
"hide": false, "hide": false,
"instant": false, "instant": false,
@ -686,7 +686,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_grpc_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_grpc_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -796,7 +796,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(1,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(1,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))",
"interval": "", "interval": "",
"legendFormat": "{{ type }} -- max", "legendFormat": "{{ type }} -- max",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -804,7 +804,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{ type }} -- 99", "legendFormat": "{{ type }} -- 99",
@ -914,7 +914,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_grpc_single_requests_latency_sum{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_grpc_single_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type) / sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"pdpb.PD/GetRegion\", type!=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)",
"interval": "", "interval": "",
"legendFormat": "{{ type }}", "legendFormat": "{{ type }}",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1023,7 +1023,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_seek_leader_store_duration_sum{instance=~\"$instance\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_leader_store_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le)", "expr": "sum(rate(client_java_seek_leader_store_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_leader_store_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le)",
"interval": "", "interval": "",
"legendFormat": "seek-leader-store-avg", "legendFormat": "seek-leader-store-avg",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1031,7 +1031,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_seek_leader_store_duration_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_leader_store_duration_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "seek-leader-store-99", "legendFormat": "seek-leader-store-99",
@ -1140,7 +1140,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_seek_proxy_store_duration_sum{instance=~\"$instance\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_proxy_store_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le)", "expr": "sum(rate(client_java_seek_proxy_store_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le) / sum(rate(client_java_seek_proxy_store_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le)",
"interval": "", "interval": "",
"legendFormat": "seek-proxy-store-avg", "legendFormat": "seek-proxy-store-avg",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1148,7 +1148,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_seek_proxy_store_duration_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_seek_proxy_store_duration_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "seek-proxy-store-99", "legendFormat": "seek-proxy-store-99",
@ -1259,7 +1259,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}}-total", "legendFormat": "{{type}}-total",
@ -1358,7 +1358,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type)",
"interval": "", "interval": "",
"legendFormat": "{{type}}-count", "legendFormat": "{{type}}-count",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1459,7 +1459,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type!=\"BoPDRPC\"}[$__rate_interval])) by (le, type)",
"interval": "", "interval": "",
"legendFormat": "{{type}}-avg", "legendFormat": "{{type}}-avg",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1573,7 +1573,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_get_region_by_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\"}[$__rate_interval]))", "expr": "sum(rate(client_java_get_region_by_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval]))",
"interval": "", "interval": "",
"legendFormat": "avg", "legendFormat": "avg",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1581,7 +1581,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99, sum(rate(client_java_get_region_by_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le))", "expr": "histogram_quantile(0.99, sum(rate(client_java_get_region_by_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "99th", "legendFormat": "99th",
@ -1680,7 +1680,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "1 - sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\"}[$__rate_interval]))", "expr": "1 - sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) / sum(rate(client_java_get_region_by_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval]))",
"interval": "", "interval": "",
"legendFormat": "hit ratio", "legendFormat": "hit ratio",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1788,7 +1788,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (le, type))",
"interval": "", "interval": "",
"legendFormat": "{{ type }}-99th", "legendFormat": "{{ type }}-99th",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -1796,7 +1796,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{ type }}-99th", "legendFormat": "{{ type }}-99th",
@ -1804,7 +1804,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99,sum(rate(client_java_grpc_single_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (le, type))",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{ type }}-99th", "legendFormat": "{{ type }}-99th",
@ -1904,7 +1904,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetRegion\"}[$__rate_interval])) by (type)",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}}", "legendFormat": "{{type}}",
@ -1912,7 +1912,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetStore\"}[$__rate_interval])) by (type)",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}}", "legendFormat": "{{type}}",
@ -1920,7 +1920,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_grpc_single_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"pdpb.PD/GetMembers\"}[$__rate_interval])) by (type)",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}}", "legendFormat": "{{type}}",
@ -2021,7 +2021,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type) / sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)",
"interval": "", "interval": "",
"legendFormat": "{{type}}-avg", "legendFormat": "{{type}}-avg",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -2029,7 +2029,7 @@
}, },
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_sum{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)",
"hide": false, "hide": false,
"interval": "", "interval": "",
"legendFormat": "{{type}}-sum", "legendFormat": "{{type}}-sum",
@ -2128,7 +2128,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)", "expr": "sum(rate(client_java_backoff_duration_count{instance=~\"$instance\", cluster=~\"$cluster\", type=\"BoPDRPC\"}[$__rate_interval])) by (le, type)",
"interval": "", "interval": "",
"legendFormat": "{{type}}", "legendFormat": "{{type}}",
"queryType": "randomWalk", "queryType": "randomWalk",
@ -2350,7 +2350,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -2451,7 +2451,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_smart_raw_requests_failure_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_smart_raw_requests_failure_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -2592,7 +2592,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "histogram_quantile(0.99, sum(rate(client_java_smart_raw_requests_latency_bucket{instance=~\"$instance\"}[$__rate_interval])) by (le, type))", "expr": "histogram_quantile(0.99, sum(rate(client_java_smart_raw_requests_latency_bucket{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (le, type))",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -2742,7 +2742,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_smart_raw_requests_latency_sum{instance=~\"$instance\"}[$__rate_interval])) by (type) / sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_smart_raw_requests_latency_sum{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type) / sum(rate(client_java_smart_raw_requests_latency_count{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"hide": false, "hide": false,
"interval": "", "interval": "",
@ -2853,7 +2853,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_smart_raw_circuit_breaker_opened_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_smart_raw_circuit_breaker_opened_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -2954,7 +2954,7 @@
"targets": [ "targets": [
{ {
"exemplar": true, "exemplar": true,
"expr": "sum(rate(client_java_circuit_breaker_attempt_counter_total{instance=~\"$instance\"}[$__rate_interval])) by (type)", "expr": "sum(rate(client_java_circuit_breaker_attempt_counter_total{instance=~\"$instance\", cluster=~\"$cluster\"}[$__rate_interval])) by (type)",
"format": "time_series", "format": "time_series",
"interval": "", "interval": "",
"intervalFactor": 1, "intervalFactor": 1,
@ -5151,6 +5151,37 @@
"tagsQuery": "", "tagsQuery": "",
"type": "query", "type": "query",
"useTags": false "useTags": false
},
{
"allValue": ".*",
"current": {
"selected": true,
"text": "All",
"value": "$__all"
},
"datasource": "${DS_TEST-CLUSTER}",
"definition": "label_values(client_java_raw_requests_latency_count, cluster)",
"description": null,
"error": null,
"hide": 0,
"includeAll": true,
"label": "cluster",
"multi": false,
"name": "cluster",
"options": [],
"query": {
"query": "label_values(client_java_raw_requests_latency_count, cluster)",
"refId": "StandardVariableQuery"
},
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"sort": 0,
"tagValuesQuery": "",
"tags": [],
"tagsQuery": "",
"type": "query",
"useTags": false
} }
] ]
}, },

View File

@ -82,18 +82,16 @@ public abstract class AbstractGRPCClient<
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(String.format("Calling %s...", method.getFullMethodName())); logger.trace(String.format("Calling %s...", method.getFullMethodName()));
} }
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer); RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
RespT resp = RespT resp =
builder policy.callWithRetry(
.create(handler) () -> {
.callWithRetry( BlockingStubT stub = getBlockingStub();
() -> { return ClientCalls.blockingUnaryCall(
BlockingStubT stub = getBlockingStub(); stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
return ClientCalls.blockingUnaryCall( },
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()); method.getFullMethodName(),
}, backOffer);
method.getFullMethodName(),
backOffer);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName())); logger.trace(String.format("leaving %s...", method.getFullMethodName()));
@ -109,20 +107,18 @@ public abstract class AbstractGRPCClient<
ErrorHandler<RespT> handler) { ErrorHandler<RespT> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName())); logger.debug(String.format("Calling %s...", method.getFullMethodName()));
RetryPolicy.Builder<RespT> builder = new Builder<>(backOffer); RetryPolicy<RespT> policy = new Builder<RespT>(backOffer).create(handler);
builder policy.callWithRetry(
.create(handler) () -> {
.callWithRetry( FutureStubT stub = getAsyncStub();
() -> { ClientCalls.asyncUnaryCall(
FutureStubT stub = getAsyncStub(); stub.getChannel().newCall(method, stub.getCallOptions()),
ClientCalls.asyncUnaryCall( requestFactory.get(),
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
requestFactory.get(), return null;
responseObserver); },
return null; method.getFullMethodName(),
}, backOffer);
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName())); logger.debug(String.format("leaving %s...", method.getFullMethodName()));
} }
@ -133,18 +129,17 @@ public abstract class AbstractGRPCClient<
ErrorHandler<StreamObserver<ReqT>> handler) { ErrorHandler<StreamObserver<ReqT>> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName())); logger.debug(String.format("Calling %s...", method.getFullMethodName()));
RetryPolicy.Builder<StreamObserver<ReqT>> builder = new Builder<>(backOffer); RetryPolicy<StreamObserver<ReqT>> policy =
new Builder<StreamObserver<ReqT>>(backOffer).create(handler);
StreamObserver<ReqT> observer = StreamObserver<ReqT> observer =
builder policy.callWithRetry(
.create(handler) () -> {
.callWithRetry( FutureStubT stub = getAsyncStub();
() -> { return asyncBidiStreamingCall(
FutureStubT stub = getAsyncStub(); stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
return asyncBidiStreamingCall( },
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver); method.getFullMethodName(),
}, backOffer);
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName())); logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer; return observer;
} }
@ -156,19 +151,18 @@ public abstract class AbstractGRPCClient<
ErrorHandler<StreamingResponse> handler) { ErrorHandler<StreamingResponse> handler) {
logger.debug(String.format("Calling %s...", method.getFullMethodName())); logger.debug(String.format("Calling %s...", method.getFullMethodName()));
RetryPolicy.Builder<StreamingResponse> builder = new Builder<>(backOffer); RetryPolicy<StreamingResponse> policy =
new Builder<StreamingResponse>(backOffer).create(handler);
StreamingResponse response = StreamingResponse response =
builder policy.callWithRetry(
.create(handler) () -> {
.callWithRetry( BlockingStubT stub = getBlockingStub();
() -> { return new StreamingResponse(
BlockingStubT stub = getBlockingStub(); blockingServerStreamingCall(
return new StreamingResponse( stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
blockingServerStreamingCall( },
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get())); method.getFullMethodName(),
}, backOffer);
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName())); logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response; return response;
} }

View File

@ -65,7 +65,9 @@ public class KVClient implements AutoCloseable {
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/ */
public ByteString get(ByteString key, long version) throws GrpcException { public ByteString get(ByteString key, long version) throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(
clientBuilder.getRegionManager().getPDClient().getClusterId());
while (true) { while (true) {
RegionStoreClient client = clientBuilder.build(key); RegionStoreClient client = clientBuilder.build(key);
try { try {

View File

@ -127,6 +127,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_pd_get_region_by_requests_latency") .name("client_java_pd_get_region_by_requests_latency")
.help("pd getRegionByKey request latency.") .help("pd getRegionByKey request latency.")
.labelNames("cluster")
.register(); .register();
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) { private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
@ -281,7 +282,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
() -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build(); () -> GetOperatorRequest.newBuilder().setHeader(header).setRegionId(regionId).build();
// get operator no need to handle error and no need back offer. // get operator no need to handle error and no need back offer.
return callWithRetry( return callWithRetry(
ConcreteBackOffer.newCustomBackOff(0), ConcreteBackOffer.newCustomBackOff(0, getClusterId()),
PDGrpc.getGetOperatorMethod(), PDGrpc.getGetOperatorMethod(),
request, request,
new NoopHandler<>()); new NoopHandler<>());
@ -309,7 +310,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
@Override @Override
public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) { public Pair<Metapb.Region, Metapb.Peer> getRegionByKey(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer = PD_GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); Histogram.Timer requestTimer =
PD_GET_REGION_BY_KEY_REQUEST_LATENCY.labels(getClusterId().toString()).startTimer();
try { try {
if (conf.isTxnKVMode()) { if (conf.isTxnKVMode()) {
CodecDataOutput cdo = new CodecDataOutput(); CodecDataOutput cdo = new CodecDataOutput();
@ -841,7 +843,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
return builder.build(); return builder.build();
} }
public long getClusterId() { public Long getClusterId() {
return header.getClusterId(); return header.getClusterId();
} }

View File

@ -67,4 +67,6 @@ public interface ReadOnlyPDClient {
List<Store> getAllStores(BackOffer backOffer); List<Store> getAllStores(BackOffer backOffer);
TiConfiguration.ReplicaRead getReplicaRead(); TiConfiguration.ReplicaRead getReplicaRead();
Long getClusterId();
} }

View File

@ -80,7 +80,9 @@ public class Snapshot {
try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) { try (KVClient client = new KVClient(session, session.getRegionStoreClientBuilder())) {
List<KvPair> kvPairList = List<KvPair> kvPairList =
client.batchGet( client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion()); ConcreteBackOffer.newCustomBackOff(backOffer, session.getPDClient().getClusterId()),
list,
timestamp.getVersion());
return kvPairList return kvPairList
.stream() .stream()
.map( .map(

View File

@ -62,7 +62,8 @@ public class StoreVersion {
public static boolean minTiKVVersion(String version, PDClient pdClient) { public static boolean minTiKVVersion(String version, PDClient pdClient) {
StoreVersion storeVersion = new StoreVersion(version); StoreVersion storeVersion = new StoreVersion(version);
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> storeList = List<Metapb.Store> storeList =
pdClient pdClient
.getAllStores(bo) .getAllStores(bo)

View File

@ -158,7 +158,7 @@ public class TiSession implements AutoCloseable {
if (conf.isWarmUpEnable() && conf.isRawKVMode()) { if (conf.isWarmUpEnable() && conf.isRawKVMode()) {
warmUp(); warmUp();
} }
this.circuitBreaker = new CircuitBreakerImpl(conf); this.circuitBreaker = new CircuitBreakerImpl(conf, client.getClusterId());
logger.info("TiSession initialized in " + conf.getKvMode() + " mode"); logger.info("TiSession initialized in " + conf.getKvMode() + " mode");
} }
@ -179,7 +179,7 @@ public class TiSession implements AutoCloseable {
private synchronized void warmUp() { private synchronized void warmUp() {
long warmUpStartTime = System.nanoTime(); long warmUpStartTime = System.nanoTime();
BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(); BackOffer backOffer = ConcreteBackOffer.newRawKVBackOff(getPDClient().getClusterId());
try { try {
// let JVM ClassLoader load gRPC error related classes // let JVM ClassLoader load gRPC error related classes
// this operation may cost 100ms // this operation may cost 100ms
@ -329,7 +329,8 @@ public class TiSession implements AutoCloseable {
public TiTimestamp getTimestamp() { public TiTimestamp getTimestamp() {
checkIsClosed(); checkIsClosed();
return getPDClient().getTimestamp(ConcreteBackOffer.newTsoBackOff()); return getPDClient()
.getTimestamp(ConcreteBackOffer.newTsoBackOff(getPDClient().getClusterId()));
} }
public Snapshot createSnapshot() { public Snapshot createSnapshot() {
@ -586,13 +587,16 @@ public class TiSession implements AutoCloseable {
.stream() .stream()
.map(k -> Key.toRawKey(k).toByteString()) .map(k -> Key.toRawKey(k).toByteString())
.collect(Collectors.toList()), .collect(Collectors.toList()),
ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS)); ConcreteBackOffer.newCustomBackOff(splitRegionBackoffMS, getPDClient().getClusterId()));
// scatter region // scatter region
for (Metapb.Region newRegion : newRegions) { for (Metapb.Region newRegion : newRegions) {
try { try {
getPDClient() getPDClient()
.scatterRegion(newRegion, ConcreteBackOffer.newCustomBackOff(scatterRegionBackoffMS)); .scatterRegion(
newRegion,
ConcreteBackOffer.newCustomBackOff(
scatterRegionBackoffMS, getPDClient().getClusterId()));
} catch (Exception e) { } catch (Exception e) {
logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e); logger.warn(String.format("failed to scatter region: %d", newRegion.getId()), e);
} }
@ -609,7 +613,9 @@ public class TiSession implements AutoCloseable {
return; return;
} }
getPDClient() getPDClient()
.waitScatterRegionFinish(newRegion, ConcreteBackOffer.newCustomBackOff((int) remainMS)); .waitScatterRegionFinish(
newRegion,
ConcreteBackOffer.newCustomBackOff((int) remainMS, getPDClient().getClusterId()));
} }
} else { } else {
logger.info("skip to wait scatter region finish"); logger.info("skip to wait scatter region finish");

View File

@ -259,7 +259,9 @@ public class ImporterClient {
} }
Object writeResponse = clientLeader.getWriteResponse(); Object writeResponse = clientLeader.getWriteResponse();
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(BackOffer.INGEST_BACKOFF); BackOffer backOffer =
ConcreteBackOffer.newCustomBackOff(
BackOffer.INGEST_BACKOFF, tiSession.getPDClient().getClusterId());
ingestWithRetry(writeResponse, backOffer); ingestWithRetry(writeResponse, backOffer);
} }

View File

@ -73,7 +73,8 @@ public class SwitchTiKVModeClient {
} }
private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) { private void doSwitchTiKVMode(ImportSstpb.SwitchMode mode) {
BackOffer bo = ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF); BackOffer bo =
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF, pdClient.getClusterId());
List<Metapb.Store> allStores = pdClient.getAllStores(bo); List<Metapb.Store> allStores = pdClient.getAllStores(bo);
for (Metapb.Store store : allStores) { for (Metapb.Store store : allStores) {
ImporterStoreClient client = builder.build(new TiStore(store)); ImporterStoreClient client = builder.build(new TiStore(store));

View File

@ -36,5 +36,7 @@ public interface SlowLog {
return withFields(ImmutableMap.of(key, value)); return withFields(ImmutableMap.of(key, value));
} }
Object getField(String key);
void log(); void log();
} }

View File

@ -47,6 +47,11 @@ public class SlowLogEmptyImpl implements SlowLog {
return this; return this;
} }
@Override
public Object getField(String key) {
return null;
}
@Override @Override
public void log() {} public void log() {}
} }

View File

@ -92,6 +92,11 @@ public class SlowLogImpl implements SlowLog {
return this; return this;
} }
@Override
public Object getField(String key) {
return fields.get(key);
}
@Override @Override
public void log() { public void log() {
recordTime(); recordTime();

View File

@ -88,7 +88,8 @@ public class ConcreteScanIterator extends ScanIterator {
builder.getRegionManager().getRegionStorePairByKey(current.getKey()); builder.getRegionManager().getRegionStorePairByKey(current.getKey());
TiRegion region = pair.first; TiRegion region = pair.first;
TiStore store = pair.second; TiStore store = pair.second;
BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(builder.getRegionManager().getPDClient().getClusterId());
try (RegionStoreClient client = builder.build(region, store)) { try (RegionStoreClient client = builder.build(region, store)) {
return client.get(backOffer, current.getKey(), version); return client.get(backOffer, current.getKey(), version);
} catch (Exception e) { } catch (Exception e) {

View File

@ -35,19 +35,19 @@ public abstract class RetryPolicy<RespT> {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_grpc_single_requests_latency") .name("client_java_grpc_single_requests_latency")
.help("grpc request latency.") .help("grpc request latency.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
public static final Histogram CALL_WITH_RETRY_DURATION = public static final Histogram CALL_WITH_RETRY_DURATION =
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_call_with_retry_duration") .name("client_java_call_with_retry_duration")
.help("callWithRetry duration.") .help("callWithRetry duration.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
public static final Counter GRPC_REQUEST_RETRY_NUM = public static final Counter GRPC_REQUEST_RETRY_NUM =
Counter.build() Counter.build()
.name("client_java_grpc_requests_retry_num") .name("client_java_grpc_requests_retry_num")
.help("grpc request retry num.") .help("grpc request retry num.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
// handles PD and TiKV's error. // handles PD and TiKV's error.
@ -72,7 +72,8 @@ public abstract class RetryPolicy<RespT> {
} }
public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) { public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer(); String[] labels = new String[] {methodName, backOffer.getClusterId().toString()};
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(labels).startTimer();
SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry"); SlowLogSpan callWithRetrySlowLogSpan = backOffer.getSlowLog().start("callWithRetry");
callWithRetrySlowLogSpan.addProperty("method", methodName); callWithRetrySlowLogSpan.addProperty("method", methodName);
try { try {
@ -80,8 +81,7 @@ public abstract class RetryPolicy<RespT> {
RespT result = null; RespT result = null;
try { try {
// add single request duration histogram // add single request duration histogram
Histogram.Timer requestTimer = Histogram.Timer requestTimer = GRPC_SINGLE_REQUEST_LATENCY.labels(labels).startTimer();
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC");
slowLogSpan.addProperty("method", methodName); slowLogSpan.addProperty("method", methodName);
try { try {
@ -96,7 +96,7 @@ public abstract class RetryPolicy<RespT> {
backOffer.checkTimeout(); backOffer.checkTimeout();
boolean retry = handler.handleRequestError(backOffer, e); boolean retry = handler.handleRequestError(backOffer, e);
if (retry) { if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue; continue;
} else { } else {
return result; return result;
@ -107,7 +107,7 @@ public abstract class RetryPolicy<RespT> {
if (handler != null) { if (handler != null) {
boolean retry = handler.handleResponseError(backOffer, result); boolean retry = handler.handleResponseError(backOffer, result);
if (retry) { if (retry) {
GRPC_REQUEST_RETRY_NUM.labels(methodName).inc(); GRPC_REQUEST_RETRY_NUM.labels(labels).inc();
continue; continue;
} }
} }

View File

@ -55,12 +55,14 @@ public abstract class AbstractRegionStoreClient
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_seek_leader_store_duration") .name("client_java_seek_leader_store_duration")
.help("seek leader store duration.") .help("seek leader store duration.")
.labelNames("cluster")
.register(); .register();
public static final Histogram SEEK_PROXY_STORE_DURATION = public static final Histogram SEEK_PROXY_STORE_DURATION =
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_seek_proxy_store_duration") .name("client_java_seek_proxy_store_duration")
.help("seek proxy store duration.") .help("seek proxy store duration.")
.labelNames("cluster")
.register(); .register();
protected final RegionManager regionManager; protected final RegionManager regionManager;
@ -202,7 +204,10 @@ public abstract class AbstractRegionStoreClient
} }
private Boolean seekLeaderStore(BackOffer backOffer) { private Boolean seekLeaderStore(BackOffer backOffer) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer(); Histogram.Timer switchLeaderDurationTimer =
SEEK_LEADER_STORE_DURATION
.labels(regionManager.getPDClient().getClusterId().toString())
.startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekLeaderStore");
try { try {
List<Metapb.Peer> peers = region.getFollowerList(); List<Metapb.Peer> peers = region.getFollowerList();
@ -251,7 +256,10 @@ public abstract class AbstractRegionStoreClient
private boolean seekProxyStore(BackOffer backOffer) { private boolean seekProxyStore(BackOffer backOffer) {
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer(); Histogram.Timer grpcForwardDurationTimer =
SEEK_PROXY_STORE_DURATION
.labels(regionManager.getPDClient().getClusterId().toString())
.startTimer();
try { try {
logger.info(String.format("try grpc forward: region[%d]", region.getId())); logger.info(String.format("try grpc forward: region[%d]", region.getId()));
// when current leader cannot be reached // when current leader cannot be reached

View File

@ -52,11 +52,13 @@ public class RegionManager {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_get_region_by_requests_latency") .name("client_java_get_region_by_requests_latency")
.help("getRegionByKey request latency.") .help("getRegionByKey request latency.")
.labelNames("cluster")
.register(); .register();
public static final Histogram SCAN_REGIONS_REQUEST_LATENCY = public static final Histogram SCAN_REGIONS_REQUEST_LATENCY =
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_scan_regions_request_latency") .name("client_java_scan_regions_request_latency")
.help("scanRegions request latency.") .help("scanRegions request latency.")
.labelNames("cluster")
.register(); .register();
// TODO: the region cache logic need rewrite. // TODO: the region cache logic need rewrite.
@ -105,7 +107,9 @@ public class RegionManager {
public List<Pdpb.Region> scanRegions( public List<Pdpb.Region> scanRegions(
BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) { BackOffer backOffer, ByteString startKey, ByteString endKey, int limit) {
Histogram.Timer requestTimer = SCAN_REGIONS_REQUEST_LATENCY.startTimer(); Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer =
SCAN_REGIONS_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("scanRegions");
try { try {
return pdClient.scanRegions(backOffer, startKey, endKey, limit); return pdClient.scanRegions(backOffer, startKey, endKey, limit);
@ -122,7 +126,9 @@ public class RegionManager {
} }
public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) { public TiRegion getRegionByKey(ByteString key, BackOffer backOffer) {
Histogram.Timer requestTimer = GET_REGION_BY_KEY_REQUEST_LATENCY.startTimer(); Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer =
GET_REGION_BY_KEY_REQUEST_LATENCY.labels(clusterId.toString()).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey"); SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("getRegionByKey");
TiRegion region = cache.getRegionByKey(key, backOffer); TiRegion region = cache.getRegionByKey(key, backOffer);
try { try {
@ -316,6 +322,7 @@ public class RegionManager {
} }
private BackOffer defaultBackOff() { private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); return ConcreteBackOffer.newCustomBackOff(
conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
} }
} }

View File

@ -18,7 +18,9 @@
package org.tikv.common.region; package org.tikv.common.region;
import static org.tikv.common.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG; import static org.tikv.common.region.RegionStoreClient.RequestTypes.REQ_TYPE_DAG;
import static org.tikv.common.util.BackOffFunction.BackOffFuncType.*; import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLock;
import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLockFast;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
@ -29,7 +31,17 @@ import io.grpc.ManagedChannel;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils; import io.grpc.stub.MetadataUtils;
import io.prometheus.client.Histogram; import io.prometheus.client.Histogram;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,15 +49,64 @@ import org.tikv.common.PDClient;
import org.tikv.common.StoreVersion; import org.tikv.common.StoreVersion;
import org.tikv.common.TiConfiguration; import org.tikv.common.TiConfiguration;
import org.tikv.common.Version; import org.tikv.common.Version;
import org.tikv.common.exception.*; import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.SelectException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.log.SlowLogEmptyImpl; import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.operation.KVErrorHandler; import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.operation.RegionErrorHandler; import org.tikv.common.operation.RegionErrorHandler;
import org.tikv.common.streaming.StreamingResponse; import org.tikv.common.streaming.StreamingResponse;
import org.tikv.common.util.*; import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.Batch;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb; import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Kvrpcpb.*; import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.CommitRequest;
import org.tikv.kvproto.Kvrpcpb.CommitResponse;
import org.tikv.kvproto.Kvrpcpb.GetRequest;
import org.tikv.kvproto.Kvrpcpb.GetResponse;
import org.tikv.kvproto.Kvrpcpb.KeyError;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.kvproto.Kvrpcpb.Mutation;
import org.tikv.kvproto.Kvrpcpb.PrewriteRequest;
import org.tikv.kvproto.Kvrpcpb.PrewriteResponse;
import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteRequest;
import org.tikv.kvproto.Kvrpcpb.RawBatchDeleteResponse;
import org.tikv.kvproto.Kvrpcpb.RawBatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.RawBatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.RawBatchPutRequest;
import org.tikv.kvproto.Kvrpcpb.RawBatchPutResponse;
import org.tikv.kvproto.Kvrpcpb.RawCASRequest;
import org.tikv.kvproto.Kvrpcpb.RawCASResponse;
import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeRequest;
import org.tikv.kvproto.Kvrpcpb.RawDeleteRangeResponse;
import org.tikv.kvproto.Kvrpcpb.RawDeleteRequest;
import org.tikv.kvproto.Kvrpcpb.RawDeleteResponse;
import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLRequest;
import org.tikv.kvproto.Kvrpcpb.RawGetKeyTTLResponse;
import org.tikv.kvproto.Kvrpcpb.RawGetRequest;
import org.tikv.kvproto.Kvrpcpb.RawGetResponse;
import org.tikv.kvproto.Kvrpcpb.RawPutRequest;
import org.tikv.kvproto.Kvrpcpb.RawPutResponse;
import org.tikv.kvproto.Kvrpcpb.RawScanRequest;
import org.tikv.kvproto.Kvrpcpb.RawScanResponse;
import org.tikv.kvproto.Kvrpcpb.ScanRequest;
import org.tikv.kvproto.Kvrpcpb.ScanResponse;
import org.tikv.kvproto.Kvrpcpb.SplitRegionRequest;
import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
import org.tikv.kvproto.Metapb; import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc; import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub; import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
@ -78,7 +139,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_grpc_raw_requests_latency") .name("client_java_grpc_raw_requests_latency")
.help("grpc raw request latency.") .help("grpc raw request latency.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private synchronized Boolean getIsV4() { private synchronized Boolean getIsV4() {
@ -742,7 +803,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
StreamingResponse responseIterator = StreamingResponse responseIterator =
this.callServerStreamingWithRetry( this.callServerStreamingWithRetry(
ConcreteBackOffer.newCopNextMaxBackOff(), ConcreteBackOffer.newCopNextMaxBackOff(pdClient.getClusterId()),
TikvGrpc.getCoprocessorStreamMethod(), TikvGrpc.getCoprocessorStreamMethod(),
reqToSend, reqToSend,
handler); handler);
@ -778,7 +839,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
SplitRegionResponse resp = SplitRegionResponse resp =
callWithRetry( callWithRetry(
ConcreteBackOffer.newGetBackOff(), TikvGrpc.getSplitRegionMethod(), request, handler); ConcreteBackOffer.newGetBackOff(pdClient.getClusterId()),
TikvGrpc.getSplitRegionMethod(),
request,
handler);
if (resp == null) { if (resp == null) {
this.regionManager.onRequestFail(region); this.regionManager.onRequestFail(region);
@ -798,8 +862,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
// APIs for Raw Scan/Put/Get/Delete // APIs for Raw Scan/Put/Get/Delete
public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) { public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer(); GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get", clusterId.toString()).startTimer();
try { try {
Supplier<RawGetRequest> factory = Supplier<RawGetRequest> factory =
() -> () ->
@ -837,8 +902,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
} }
public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) { public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_get_key_ttl", clusterId.toString())
.startTimer();
try { try {
Supplier<RawGetKeyTTLRequest> factory = Supplier<RawGetKeyTTLRequest> factory =
() -> () ->
@ -876,8 +944,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
} }
public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) { public void rawDelete(BackOffer backOffer, ByteString key, boolean atomicForCAS) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_delete", clusterId.toString())
.startTimer();
try { try {
Supplier<RawDeleteRequest> factory = Supplier<RawDeleteRequest> factory =
() -> () ->
@ -914,8 +985,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
public void rawPut( public void rawPut(
BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) { BackOffer backOffer, ByteString key, ByteString value, long ttl, boolean atomicForCAS) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put").startTimer(); GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put", clusterId.toString()).startTimer();
try { try {
Supplier<RawPutRequest> factory = Supplier<RawPutRequest> factory =
() -> () ->
@ -958,8 +1030,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
ByteString value, ByteString value,
long ttl) long ttl)
throws RawCASConflictException { throws RawCASConflictException {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_put_if_absent", clusterId.toString())
.startTimer();
try { try {
Supplier<RawCASRequest> factory = Supplier<RawCASRequest> factory =
() -> () ->
@ -1008,8 +1083,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
} }
public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) { public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_get").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_batch_get", clusterId.toString())
.startTimer();
try { try {
if (keys.isEmpty()) { if (keys.isEmpty()) {
return new ArrayList<>(); return new ArrayList<>();
@ -1044,8 +1122,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
public void rawBatchPut( public void rawBatchPut(
BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomicForCAS) { BackOffer backOffer, List<KvPair> kvPairs, long ttl, boolean atomicForCAS) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_put").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_batch_put", clusterId.toString())
.startTimer();
try { try {
if (kvPairs.isEmpty()) { if (kvPairs.isEmpty()) {
return; return;
@ -1097,8 +1178,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
} }
public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomicForCAS) { public void rawBatchDelete(BackOffer backoffer, List<ByteString> keys, boolean atomicForCAS) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_batch_delete").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_batch_delete", clusterId.toString())
.startTimer();
try { try {
if (keys.isEmpty()) { if (keys.isEmpty()) {
return; return;
@ -1145,8 +1229,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @return KvPair list * @return KvPair list
*/ */
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) { public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan").startTimer(); GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
try { try {
Supplier<RawScanRequest> factory = Supplier<RawScanRequest> factory =
() -> () ->
@ -1191,8 +1276,11 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
* @param endKey endKey * @param endKey endKey
*/ */
public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) { public void rawDeleteRange(BackOffer backOffer, ByteString startKey, ByteString endKey) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer = Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_delete_range").startTimer(); GRPC_RAW_REQUEST_LATENCY
.labels("client_grpc_raw_delete_range", clusterId.toString())
.startTimer();
try { try {
Supplier<RawDeleteRangeRequest> factory = Supplier<RawDeleteRangeRequest> factory =
() -> () ->
@ -1349,7 +1437,10 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
} }
private BackOffer defaultBackOff() { private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); BackOffer backoffer =
ConcreteBackOffer.newCustomBackOff(
conf.getRawKVDefaultBackoffInMS(), pdClient.getClusterId());
return backoffer;
} }
} }
} }

View File

@ -89,7 +89,9 @@ public class StoreHealthyChecker implements Runnable {
private boolean checkStoreTombstone(TiStore store) { private boolean checkStoreTombstone(TiStore store) {
try { try {
Metapb.Store newStore = pdClient.getStore(ConcreteBackOffer.newRawKVBackOff(), store.getId()); Metapb.Store newStore =
pdClient.getStore(
ConcreteBackOffer.newRawKVBackOff(pdClient.getClusterId()), store.getId());
if (newStore != null && newStore.getState() == Metapb.StoreState.Tombstone) { if (newStore != null && newStore.getState() == Metapb.StoreState.Tombstone) {
return true; return true;
} }

View File

@ -70,4 +70,6 @@ public interface BackOffer {
} }
SlowLog getSlowLog(); SlowLog getSlowLog();
Long getClusterId();
} }

View File

@ -39,6 +39,7 @@ import org.tikv.common.log.SlowLogSpan;
public class ConcreteBackOffer implements BackOffer { public class ConcreteBackOffer implements BackOffer {
private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class); private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
private final int maxSleep; private final int maxSleep;
private final Long clusterId;
@VisibleForTesting @VisibleForTesting
public final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap; public final Map<BackOffFunction.BackOffFuncType, BackOffFunction> backOffFunctionMap;
@ -52,14 +53,15 @@ public class ConcreteBackOffer implements BackOffer {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_backoff_duration") .name("client_java_backoff_duration")
.help("backoff duration.") .help("backoff duration.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) { private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog, long clusterId) {
Preconditions.checkArgument( Preconditions.checkArgument(
maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0."); maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0.");
Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0."); Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0."); Preconditions.checkArgument(deadline >= 0, "Deadline cannot be less than 0.");
this.clusterId = clusterId;
this.maxSleep = maxSleep; this.maxSleep = maxSleep;
this.errors = Collections.synchronizedList(new ArrayList<>()); this.errors = Collections.synchronizedList(new ArrayList<>());
this.backOffFunctionMap = new ConcurrentHashMap<>(); this.backOffFunctionMap = new ConcurrentHashMap<>();
@ -68,6 +70,7 @@ public class ConcreteBackOffer implements BackOffer {
} }
private ConcreteBackOffer(ConcreteBackOffer source) { private ConcreteBackOffer(ConcreteBackOffer source) {
this.clusterId = source.clusterId;
this.maxSleep = source.maxSleep; this.maxSleep = source.maxSleep;
this.totalSleep = source.totalSleep; this.totalSleep = source.totalSleep;
this.errors = source.errors; this.errors = source.errors;
@ -76,37 +79,54 @@ public class ConcreteBackOffer implements BackOffer {
this.slowLog = source.slowLog; this.slowLog = source.slowLog;
} }
public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) { public static ConcreteBackOffer newDeadlineBackOff(
int timeoutInMs, SlowLog slowLog, long clusterId) {
long deadline = System.currentTimeMillis() + timeoutInMs; long deadline = System.currentTimeMillis() + timeoutInMs;
return new ConcreteBackOffer(0, deadline, slowLog); return new ConcreteBackOffer(0, deadline, slowLog, clusterId);
}
public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) {
return newDeadlineBackOff(timeoutInMs, slowLog, 0);
}
public static ConcreteBackOffer newCustomBackOff(int maxSleep, long clusterId) {
return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
} }
public static ConcreteBackOffer newCustomBackOff(int maxSleep) { public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE); return newCustomBackOff(maxSleep, 0);
} }
public static ConcreteBackOffer newScannerNextMaxBackOff() { public static ConcreteBackOffer newScannerNextMaxBackOff() {
return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
} }
public static ConcreteBackOffer newBatchGetMaxBackOff() { public static ConcreteBackOffer newBatchGetMaxBackOff() {
return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, 0);
} }
public static ConcreteBackOffer newCopNextMaxBackOff() { public static ConcreteBackOffer newCopNextMaxBackOff() {
return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return newCopNextMaxBackOff(0);
} }
public static ConcreteBackOffer newGetBackOff() { public static ConcreteBackOffer newCopNextMaxBackOff(long clusterId) {
return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
}
public static ConcreteBackOffer newGetBackOff(long clusterId) {
return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
}
public static ConcreteBackOffer newRawKVBackOff(long clusterId) {
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
} }
public static ConcreteBackOffer newRawKVBackOff() { public static ConcreteBackOffer newRawKVBackOff() {
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return newRawKVBackOff(0);
} }
public static ConcreteBackOffer newTsoBackOff() { public static ConcreteBackOffer newTsoBackOff(long clusterId) {
return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE); return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE, clusterId);
} }
public static ConcreteBackOffer create(BackOffer source) { public static ConcreteBackOffer create(BackOffer source) {
@ -173,7 +193,8 @@ public class ConcreteBackOffer implements BackOffer {
} }
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) { public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer(); String[] labels = new String[] {funcType.name(), clusterId.toString()};
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(labels).startTimer();
SlowLogSpan slowLogSpan = getSlowLog().start("backoff"); SlowLogSpan slowLogSpan = getSlowLog().start("backoff");
slowLogSpan.addProperty("type", funcType.name()); slowLogSpan.addProperty("type", funcType.name());
BackOffFunction backOffFunction = BackOffFunction backOffFunction =
@ -239,4 +260,8 @@ public class ConcreteBackOffer implements BackOffer {
public SlowLog getSlowLog() { public SlowLog getSlowLog() {
return slowLog; return slowLog;
} }
public Long getClusterId() {
return clusterId;
}
} }

View File

@ -73,7 +73,7 @@ import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb.KvPair; import org.tikv.kvproto.Kvrpcpb.KvPair;
public class RawKVClient implements RawKVClientBase { public class RawKVClient implements RawKVClientBase {
private final long clusterId; private final Long clusterId;
private final List<URI> pdAddresses; private final List<URI> pdAddresses;
private final TiSession tiSession; private final TiSession tiSession;
private final RegionStoreClientBuilder clientBuilder; private final RegionStoreClientBuilder clientBuilder;
@ -90,21 +90,21 @@ public class RawKVClient implements RawKVClientBase {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_raw_requests_latency") .name("client_java_raw_requests_latency")
.help("client raw request latency.") .help("client raw request latency.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
public static final Counter RAW_REQUEST_SUCCESS = public static final Counter RAW_REQUEST_SUCCESS =
Counter.build() Counter.build()
.name("client_java_raw_requests_success") .name("client_java_raw_requests_success")
.help("client raw request success.") .help("client raw request success.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
public static final Counter RAW_REQUEST_FAILURE = public static final Counter RAW_REQUEST_FAILURE =
Counter.build() Counter.build()
.name("client_java_raw_requests_failure") .name("client_java_raw_requests_failure")
.help("client raw request failure.") .help("client raw request failure.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED = private static final TiKVException ERR_MAX_SCAN_LIMIT_EXCEEDED =
@ -130,6 +130,10 @@ public class RawKVClient implements RawKVClientBase {
return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses); return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
} }
private String[] withClusterId(String label) {
return new String[] {label, clusterId.toString()};
}
@Override @Override
public void close() {} public void close() {}
@ -140,21 +144,21 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public void put(ByteString key, ByteString value, long ttl) { public void put(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put"; String[] labels = withClusterId("client_raw_put");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("put"); SlowLogSpan span = slowLog.start("put");
span.addProperty("key", KeyUtils.formatBytesUTF8(key)); span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
try { try {
while (true) { while (true) {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
span.addProperty("region", client.getRegion().toString()); span.addProperty("region", client.getRegion().toString());
client.rawPut(backOffer, key, value, ttl, atomicForCAS); client.rawPut(backOffer, key, value, ttl, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return; return;
} catch (final TiKVException e) { } catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -162,7 +166,7 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -202,21 +206,21 @@ public class RawKVClient implements RawKVClientBase {
"To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas."); "To use compareAndSet or putIfAbsent, please enable the config tikv.enable_atomic_for_cas.");
} }
String label = "client_raw_compare_and_set"; String[] labels = withClusterId("client_raw_compare_and_set");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("putIfAbsent"); SlowLogSpan span = slowLog.start("putIfAbsent");
span.addProperty("key", KeyUtils.formatBytesUTF8(key)); span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
try { try {
while (true) { while (true) {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
span.addProperty("region", client.getRegion().toString()); span.addProperty("region", client.getRegion().toString());
client.rawCompareAndSet(backOffer, key, prevValue, value, ttl); client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return; return;
} catch (final TiKVException e) { } catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -224,7 +228,7 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -241,21 +245,22 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) { public void batchPut(Map<ByteString, ByteString> kvPairs, long ttl) {
String label = "client_raw_batch_put"; String[] labels = withClusterId("client_raw_batch_put");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchPut"); SlowLogSpan span = slowLog.start("batchPut");
span.addProperty("keySize", String.valueOf(kvPairs.size())); span.addProperty("keySize", String.valueOf(kvPairs.size()));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
try { try {
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchPut(backOffer, kvPairs, ttl, deadline); doSendBatchPut(backOffer, kvPairs, ttl, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -267,21 +272,21 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public Optional<ByteString> get(ByteString key) { public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get"; String[] labels = withClusterId("client_raw_get");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("get"); SlowLogSpan span = slowLog.start("get");
span.addProperty("key", KeyUtils.formatBytesUTF8(key)); span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
try { try {
while (true) { while (true) {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
span.addProperty("region", client.getRegion().toString()); span.addProperty("region", client.getRegion().toString());
Optional<ByteString> result = client.rawGet(backOffer, key); Optional<ByteString> result = client.rawGet(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (final TiKVException e) { } catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -289,7 +294,7 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -301,20 +306,21 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public List<KvPair> batchGet(List<ByteString> keys) { public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get"; String[] labels = withClusterId("client_raw_batch_get");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchGet"); SlowLogSpan span = slowLog.start("batchGet");
span.addProperty("keySize", String.valueOf(keys.size())); span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVBatchReadTimeoutInMS(), slowLog, clusterId);
try { try {
long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS(); long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS();
List<KvPair> result = doSendBatchGet(backOffer, keys, deadline); List<KvPair> result = doSendBatchGet(backOffer, keys, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -326,20 +332,20 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public void batchDelete(List<ByteString> keys) { public void batchDelete(List<ByteString> keys) {
String label = "client_raw_batch_delete"; String[] labels = withClusterId("client_raw_batch_delete");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("batchDelete"); SlowLogSpan span = slowLog.start("batchDelete");
span.addProperty("keySize", String.valueOf(keys.size())); span.addProperty("keySize", String.valueOf(keys.size()));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVBatchWriteTimeoutInMS(), slowLog, clusterId);
try { try {
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS(); long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchDelete(backOffer, keys, deadline); doSendBatchDelete(backOffer, keys, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -351,19 +357,19 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public Optional<Long> getKeyTTL(ByteString key) { public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl"; String[] labels = withClusterId("client_raw_get_key_ttl");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
SlowLogSpan span = slowLog.start("getKeyTTL"); SlowLogSpan span = slowLog.start("getKeyTTL");
span.addProperty("key", KeyUtils.formatBytesUTF8(key)); span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog, clusterId);
try { try {
while (true) { while (true) {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
span.addProperty("region", client.getRegion().toString()); span.addProperty("region", client.getRegion().toString());
Optional<Long> result = client.rawGetKeyTTL(backOffer, key); Optional<Long> result = client.rawGetKeyTTL(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (final TiKVException e) { } catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -371,7 +377,7 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -403,8 +409,8 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public List<List<KvPair>> batchScan(List<ScanOption> ranges) { public List<List<KvPair>> batchScan(List<ScanOption> ranges) {
String label = "client_raw_batch_scan"; String[] labels = withClusterId("client_raw_batch_scan");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS(); long deadline = System.currentTimeMillis() + conf.getRawKVScanTimeoutInMS();
List<Future<Pair<Integer, List<KvPair>>>> futureList = new ArrayList<>(); List<Future<Pair<Integer, List<KvPair>>>> futureList = new ArrayList<>();
try { try {
@ -439,10 +445,10 @@ public class RawKVClient implements RawKVClientBase {
throw new TiKVException("Execution exception met.", e); throw new TiKVException("Execution exception met.", e);
} }
} }
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return scanResults; return scanResults;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
for (Future<Pair<Integer, List<KvPair>>> future : futureList) { for (Future<Pair<Integer, List<KvPair>>> future : futureList) {
future.cancel(true); future.cancel(true);
} }
@ -459,8 +465,8 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan"; String[] labels = withClusterId("client_raw_scan");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan"); SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
@ -468,16 +474,16 @@ public class RawKVClient implements RawKVClientBase {
span.addProperty("limit", String.valueOf(limit)); span.addProperty("limit", String.valueOf(limit));
span.addProperty("keyOnly", String.valueOf(keyOnly)); span.addProperty("keyOnly", String.valueOf(keyOnly));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
try { try {
Iterator<KvPair> iterator = Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer); rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer);
List<KvPair> result = new ArrayList<>(); List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add); iterator.forEachRemaining(result::add);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -504,15 +510,15 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) { public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit"; String[] labels = withClusterId("client_raw_scan_without_limit");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
SlowLogSpan span = slowLog.start("scan"); SlowLogSpan span = slowLog.start("scan");
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey)); span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey)); span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
span.addProperty("keyOnly", String.valueOf(keyOnly)); span.addProperty("keyOnly", String.valueOf(keyOnly));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog, clusterId);
try { try {
ByteString newStartKey = startKey; ByteString newStartKey = startKey;
List<KvPair> result = new ArrayList<>(); List<KvPair> result = new ArrayList<>();
@ -532,10 +538,10 @@ public class RawKVClient implements RawKVClientBase {
iterator.forEachRemaining(result::add); iterator.forEachRemaining(result::add);
newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString(); newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
} }
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -570,19 +576,19 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public void delete(ByteString key) { public void delete(ByteString key) {
String label = "client_raw_delete"; String[] labels = withClusterId("client_raw_delete");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS())); SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
SlowLogSpan span = slowLog.start("delete"); SlowLogSpan span = slowLog.start("delete");
span.addProperty("key", KeyUtils.formatBytesUTF8(key)); span.addProperty("key", KeyUtils.formatBytesUTF8(key));
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog); ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog, clusterId);
try { try {
while (true) { while (true) {
try (RegionStoreClient client = clientBuilder.build(key, backOffer)) { try (RegionStoreClient client = clientBuilder.build(key, backOffer)) {
span.addProperty("region", client.getRegion().toString()); span.addProperty("region", client.getRegion().toString());
client.rawDelete(backOffer, key, atomicForCAS); client.rawDelete(backOffer, key, atomicForCAS);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return; return;
} catch (final TiKVException e) { } catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e); backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
@ -590,7 +596,7 @@ public class RawKVClient implements RawKVClientBase {
} }
} }
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
slowLog.setError(e); slowLog.setError(e);
throw e; throw e;
} finally { } finally {
@ -602,17 +608,17 @@ public class RawKVClient implements RawKVClientBase {
@Override @Override
public synchronized void deleteRange(ByteString startKey, ByteString endKey) { public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
String label = "client_raw_delete_range"; String[] labels = withClusterId("client_raw_delete_range");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
ConcreteBackOffer backOffer = ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff( ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE); conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE, clusterId);
try { try {
long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS(); long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS();
doSendDeleteRange(backOffer, startKey, endKey, deadline); doSendDeleteRange(backOffer, startKey, endKey, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
throw e; throw e;
} finally { } finally {
requestTimer.observeDuration(); requestTimer.observeDuration();
@ -625,6 +631,11 @@ public class RawKVClient implements RawKVClientBase {
deleteRange(key, endKey); deleteRange(key, endKey);
} }
@Override
public TiSession getSession() {
return tiSession;
}
/** /**
* Ingest KV pairs to RawKV using StreamKV API. * Ingest KV pairs to RawKV using StreamKV API.
* *
@ -1048,15 +1059,15 @@ public class RawKVClient implements RawKVClientBase {
*/ */
public Iterator<KvPair> scan0( public Iterator<KvPair> scan0(
ByteString startKey, ByteString endKey, int limit, boolean keyOnly) { ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan"; String[] labels = withClusterId("client_raw_scan");
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer(); Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(labels).startTimer();
try { try {
Iterator<KvPair> iterator = Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff()); rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, defaultBackOff());
RAW_REQUEST_SUCCESS.labels(label).inc(); RAW_REQUEST_SUCCESS.labels(labels).inc();
return iterator; return iterator;
} catch (Exception e) { } catch (Exception e) {
RAW_REQUEST_FAILURE.labels(label).inc(); RAW_REQUEST_FAILURE.labels(labels).inc();
throw e; throw e;
} finally { } finally {
requestTimer.observeDuration(); requestTimer.observeDuration();
@ -1171,6 +1182,6 @@ public class RawKVClient implements RawKVClientBase {
} }
private BackOffer defaultBackOff() { private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS()); return ConcreteBackOffer.newCustomBackOff(conf.getRawKVDefaultBackoffInMS(), clusterId);
} }
} }

View File

@ -21,6 +21,7 @@ import com.google.protobuf.ByteString;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.tikv.common.TiSession;
import org.tikv.common.util.Pair; import org.tikv.common.util.Pair;
import org.tikv.common.util.ScanOption; import org.tikv.common.util.ScanOption;
import org.tikv.kvproto.Kvrpcpb; import org.tikv.kvproto.Kvrpcpb;
@ -271,4 +272,7 @@ public interface RawKVClientBase extends AutoCloseable {
* @param key prefix of keys to be deleted * @param key prefix of keys to be deleted
*/ */
void deletePrefix(ByteString key); void deletePrefix(ByteString key);
/** Get the session of the current client */
TiSession getSession();
} }

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.tikv.common.TiSession;
import org.tikv.common.exception.CircuitBreakerOpenException; import org.tikv.common.exception.CircuitBreakerOpenException;
import org.tikv.common.util.HistogramUtils; import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair; import org.tikv.common.util.Pair;
@ -39,28 +40,28 @@ public class SmartRawKVClient implements RawKVClientBase {
HistogramUtils.buildDuration() HistogramUtils.buildDuration()
.name("client_java_smart_raw_requests_latency") .name("client_java_smart_raw_requests_latency")
.help("client smart raw request latency.") .help("client smart raw request latency.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private static final Counter REQUEST_SUCCESS = private static final Counter REQUEST_SUCCESS =
Counter.build() Counter.build()
.name("client_java_smart_raw_requests_success") .name("client_java_smart_raw_requests_success")
.help("client smart raw request success.") .help("client smart raw request success.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private static final Counter REQUEST_FAILURE = private static final Counter REQUEST_FAILURE =
Counter.build() Counter.build()
.name("client_java_smart_raw_requests_failure") .name("client_java_smart_raw_requests_failure")
.help("client smart raw request failure.") .help("client smart raw request failure.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private static final Counter CIRCUIT_BREAKER_OPENED = private static final Counter CIRCUIT_BREAKER_OPENED =
Counter.build() Counter.build()
.name("client_java_smart_raw_circuit_breaker_opened") .name("client_java_smart_raw_circuit_breaker_opened")
.help("client smart raw circuit breaker opened.") .help("client smart raw circuit breaker opened.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private final RawKVClientBase client; private final RawKVClientBase client;
@ -204,14 +205,22 @@ public class SmartRawKVClient implements RawKVClientBase {
callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key)); callWithCircuitBreaker("deletePrefix", () -> client.deletePrefix(key));
} }
@Override
public TiSession getSession() {
return client.getSession();
}
<T> T callWithCircuitBreaker(String funcName, Function1<T> func) { <T> T callWithCircuitBreaker(String funcName, Function1<T> func) {
Histogram.Timer requestTimer = REQUEST_LATENCY.labels(funcName).startTimer(); String[] labels =
new String[] {funcName, client.getSession().getPDClient().getClusterId().toString()};
Histogram.Timer requestTimer = REQUEST_LATENCY.labels(labels).startTimer();
try { try {
T result = callWithCircuitBreaker0(funcName, func); T result = callWithCircuitBreaker0(funcName, func);
REQUEST_SUCCESS.labels(funcName).inc(); REQUEST_SUCCESS.labels(labels).inc();
return result; return result;
} catch (Exception e) { } catch (Exception e) {
REQUEST_FAILURE.labels(funcName).inc(); REQUEST_FAILURE.labels(labels).inc();
throw e; throw e;
} finally { } finally {
requestTimer.observeDuration(); requestTimer.observeDuration();
@ -244,7 +253,9 @@ public class SmartRawKVClient implements RawKVClientBase {
} }
} else { } else {
logger.debug("Circuit Breaker Opened"); logger.debug("Circuit Breaker Opened");
CIRCUIT_BREAKER_OPENED.labels(funcName).inc(); CIRCUIT_BREAKER_OPENED
.labels(funcName, client.getSession().getPDClient().getClusterId().toString())
.inc();
throw new CircuitBreakerOpenException(); throw new CircuitBreakerOpenException();
} }
} }

View File

@ -32,9 +32,10 @@ public class CircuitBreakerImpl implements CircuitBreaker {
Counter.build() Counter.build()
.name("client_java_circuit_breaker_attempt_counter") .name("client_java_circuit_breaker_attempt_counter")
.help("client circuit breaker attempt counter.") .help("client circuit breaker attempt counter.")
.labelNames("type") .labelNames("type", "cluster")
.register(); .register();
private final Long clusterId;
private final boolean enable; private final boolean enable;
private final int windowInSeconds; private final int windowInSeconds;
private final int errorThresholdPercentage; private final int errorThresholdPercentage;
@ -49,14 +50,15 @@ public class CircuitBreakerImpl implements CircuitBreaker {
private final CircuitBreakerMetrics metrics; private final CircuitBreakerMetrics metrics;
public CircuitBreakerImpl(TiConfiguration conf) { public CircuitBreakerImpl(TiConfiguration conf, long clusterId) {
this( this(
conf.isCircuitBreakEnable(), conf.isCircuitBreakEnable(),
conf.getCircuitBreakAvailabilityWindowInSeconds(), conf.getCircuitBreakAvailabilityWindowInSeconds(),
conf.getCircuitBreakAvailabilityErrorThresholdPercentage(), conf.getCircuitBreakAvailabilityErrorThresholdPercentage(),
conf.getCircuitBreakAvailabilityRequestVolumnThreshold(), conf.getCircuitBreakAvailabilityRequestVolumnThreshold(),
conf.getCircuitBreakSleepWindowInSeconds(), conf.getCircuitBreakSleepWindowInSeconds(),
conf.getCircuitBreakAttemptRequestCount()); conf.getCircuitBreakAttemptRequestCount(),
clusterId);
} }
public CircuitBreakerImpl( public CircuitBreakerImpl(
@ -65,8 +67,10 @@ public class CircuitBreakerImpl implements CircuitBreaker {
int errorThresholdPercentage, int errorThresholdPercentage,
int requestVolumeThreshold, int requestVolumeThreshold,
int sleepWindowInSeconds, int sleepWindowInSeconds,
int attemptRequestCount) { int attemptRequestCount,
long clusterId) {
this.enable = enable; this.enable = enable;
this.clusterId = clusterId;
this.windowInSeconds = windowInSeconds; this.windowInSeconds = windowInSeconds;
this.errorThresholdPercentage = errorThresholdPercentage; this.errorThresholdPercentage = errorThresholdPercentage;
this.requestVolumeThreshold = requestVolumeThreshold; this.requestVolumeThreshold = requestVolumeThreshold;
@ -125,7 +129,7 @@ public class CircuitBreakerImpl implements CircuitBreaker {
@Override @Override
public void recordAttemptSuccess() { public void recordAttemptSuccess() {
CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success").inc(); CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("success", clusterId.toString()).inc();
if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) { if (attemptSuccessCount.incrementAndGet() >= this.attemptRequestCount) {
halfOpen2Close(); halfOpen2Close();
} }
@ -133,7 +137,7 @@ public class CircuitBreakerImpl implements CircuitBreaker {
@Override @Override
public void recordAttemptFailure() { public void recordAttemptFailure() {
CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure").inc(); CIRCUIT_BREAKER_ATTEMPT_COUNTER.labels("failure", clusterId.toString()).inc();
halfOpen2Open(); halfOpen2Open();
} }

View File

@ -77,7 +77,9 @@ public class KVClient implements AutoCloseable {
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist * @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/ */
public ByteString get(ByteString key, long version) throws GrpcException { public ByteString get(ByteString key, long version) throws GrpcException {
BackOffer backOffer = ConcreteBackOffer.newGetBackOff(); BackOffer backOffer =
ConcreteBackOffer.newGetBackOff(
clientBuilder.getRegionManager().getPDClient().getClusterId());
while (true) { while (true) {
RegionStoreClient client = clientBuilder.build(key); RegionStoreClient client = clientBuilder.build(key);
try { try {
@ -178,7 +180,9 @@ public class KVClient implements AutoCloseable {
List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList()); List<ByteString> keyList = list.stream().map(pair -> pair.first).collect(Collectors.toList());
Map<TiRegion, List<ByteString>> groupKeys = Map<TiRegion, List<ByteString>> groupKeys =
groupKeysByRegion( groupKeysByRegion(
clientBuilder.getRegionManager(), keyList, ConcreteBackOffer.newRawKVBackOff()); clientBuilder.getRegionManager(),
keyList,
ConcreteBackOffer.newRawKVBackOff(tiSession.getPDClient().getClusterId()));
// ingest for each region // ingest for each region
for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) { for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {

View File

@ -92,7 +92,9 @@ public class TTLManager {
} }
private void doKeepAlive() { private void doKeepAlive() {
BackOffer bo = ConcreteBackOffer.newCustomBackOff(MANAGED_LOCK_TTL); BackOffer bo =
ConcreteBackOffer.newCustomBackOff(
MANAGED_LOCK_TTL, regionManager.getPDClient().getClusterId());
long uptime = kvClient.getTimestamp().getPhysical() - TiTimestamp.extractPhysical(startTS); long uptime = kvClient.getTimestamp().getPhysical() - TiTimestamp.extractPhysical(startTS);
long ttl = uptime + MANAGED_LOCK_TTL; long ttl = uptime + MANAGED_LOCK_TTL;

View File

@ -284,7 +284,9 @@ public class TwoPhaseCommitter implements AutoCloseable {
// consume one task if reaches task limit // consume one task if reaches task limit
completionService.take().get(); completionService.take().get();
} }
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(maxBackOfferMS); BackOffer backOffer =
ConcreteBackOffer.newCustomBackOff(
maxBackOfferMS, regionManager.getPDClient().getClusterId());
completionService.submit( completionService.submit(
() -> { () -> {
doPrewriteSecondaryKeysInBatchesWithRetry( doPrewriteSecondaryKeysInBatchesWithRetry(
@ -541,7 +543,9 @@ public class TwoPhaseCommitter implements AutoCloseable {
// consume one task if reaches task limit // consume one task if reaches task limit
completionService.take().get(); completionService.take().get();
} }
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(commitBackOfferMS); BackOffer backOffer =
ConcreteBackOffer.newCustomBackOff(
commitBackOfferMS, regionManager.getPDClient().getClusterId());
completionService.submit( completionService.submit(
() -> { () -> {
doCommitSecondaryKeysWithRetry(backOffer, keyBytes, curSize, commitTs); doCommitSecondaryKeysWithRetry(backOffer, keyBytes, curSize, commitTs);

View File

@ -70,7 +70,7 @@ public class TxnKVClient implements AutoCloseable {
} }
public TiTimestamp getTimestamp() { public TiTimestamp getTimestamp() {
BackOffer bo = ConcreteBackOffer.newTsoBackOff(); BackOffer bo = ConcreteBackOffer.newTsoBackOff(pdClient.getClusterId());
TiTimestamp timestamp = new TiTimestamp(0, 0); TiTimestamp timestamp = new TiTimestamp(0, 0);
try { try {
while (true) { while (true) {

View File

@ -201,7 +201,8 @@ public class RawKVClientTest extends BaseRawKVTest {
public void testDeadlineBackOff() { public void testDeadlineBackOff() {
int timeout = 2000; int timeout = 2000;
int sleep = 150; int sleep = 150;
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE); BackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE, 0);
long s = System.currentTimeMillis(); long s = System.currentTimeMillis();
try { try {
while (true) { while (true) {

View File

@ -42,7 +42,8 @@ public class CircuitBreakerTest {
errorThresholdPercentage, errorThresholdPercentage,
requestVolumeThreshold, requestVolumeThreshold,
sleepWindowInSeconds, sleepWindowInSeconds,
attemptRequestCount); attemptRequestCount,
1024);
CircuitBreakerMetrics metrics = circuitBreaker.getMetrics(); CircuitBreakerMetrics metrics = circuitBreaker.getMetrics();
// initial state: CLOSE // initial state: CLOSE