schedule async reload for region that has unavailable tiflash peers to avoid load un-balance issue (#1029)

Signed-off-by: xufei <xufei@pingcap.com>
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
This commit is contained in:
xufei 2023-10-25 10:24:11 +08:00 committed by GitHub
parent 33e722ef0b
commit cad3142206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 183 additions and 82 deletions

View File

@ -559,7 +559,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed() {
func (s *testCommitterSuite) TestWrittenKeysOnConflict() {
// This test checks that when there is a write conflict, written keys is collected,
// so we can use it to clean up keys.
region, _, _ := s.cluster.GetRegionByKey([]byte("x"))
region, _, _, _ := s.cluster.GetRegionByKey([]byte("x"))
newRegionID := s.cluster.AllocID()
newPeerID := s.cluster.AllocID()
s.cluster.Split(region.Id, newRegionID, []byte("y"), []uint64{newPeerID}, newPeerID)
@ -590,7 +590,7 @@ func (s *testCommitterSuite) TestWrittenKeysOnConflict() {
func (s *testCommitterSuite) TestPrewriteTxnSize() {
// Prepare two regions first: (, 100) and [100, )
region, _, _ := s.cluster.GetRegionByKey([]byte{50})
region, _, _, _ := s.cluster.GetRegionByKey([]byte{50})
newRegionID := s.cluster.AllocID()
newPeerID := s.cluster.AllocID()
s.cluster.Split(region.Id, newRegionID, []byte{100}, []uint64{newPeerID}, newPeerID)
@ -1868,8 +1868,8 @@ func (s *testCommitterSuite) TestCommitDeadLock() {
k1 := []byte("a_deadlock_k1")
k2 := []byte("y_deadlock_k2")
region1, _, _ := s.cluster.GetRegionByKey(k1)
region2, _, _ := s.cluster.GetRegionByKey(k2)
region1, _, _, _ := s.cluster.GetRegionByKey(k1)
region2, _, _, _ := s.cluster.GetRegionByKey(k2)
s.True(region1.Id != region2.Id)
txn1 := s.begin()
@ -2029,7 +2029,7 @@ func (s *testCommitterSuite) TestResolveMixed() {
// accurate list of secondary keys.
func (s *testCommitterSuite) TestPrewriteSecondaryKeys() {
// Prepare two regions first: (, 100) and [100, )
region, _, _ := s.cluster.GetRegionByKey([]byte{50})
region, _, _, _ := s.cluster.GetRegionByKey([]byte{50})
newRegionID := s.cluster.AllocID()
newPeerID := s.cluster.AllocID()
s.cluster.Split(region.Id, newRegionID, []byte{100}, []uint64{newPeerID}, newPeerID)

View File

@ -44,7 +44,7 @@ import (
"time"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/pkg/errors"
"github.com/stretchr/testify/suite"
tikverr "github.com/tikv/client-go/v2/error"

View File

@ -6,12 +6,12 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130
github.com/pingcap/tidb v1.1.0-beta.0.20230927025416-38023987346f
github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071
github.com/pingcap/tidb v1.1.0-beta.0.20231020070330-48d69d39c3d0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.8-0.20230925032502-44b0cf7aba2b
github.com/tikv/client-go/v2 v2.0.8-0.20231010061802-07432ef6c031
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb
go.uber.org/goleak v1.2.1
)
@ -24,7 +24,6 @@ require (
require (
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudfoundry/gosigar v1.3.6 // indirect
@ -54,7 +53,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/influxdata/tdigest v0.0.1 // indirect
github.com/jellydator/ttlcache/v3 v3.0.1 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/cpuid v1.3.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@ -70,12 +69,12 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
github.com/pingcap/tidb/parser v0.0.0-20230927025416-38023987346f // indirect
github.com/pingcap/tidb/pkg/parser v0.0.0-20231020070330-48d69d39c3d0 // indirect
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
@ -98,7 +97,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect
golang.org/x/net v0.17.0 // indirect
@ -109,7 +108,7 @@ require (
golang.org/x/tools v0.10.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
@ -119,3 +118,5 @@ replace (
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
github.com/tikv/client-go/v2 => ../
)
replace github.com/pingcap/tidb => github.com/windtalker/tidb v1.1.0-beta.0.20231020063218-4d1c15539f3f

View File

@ -62,8 +62,6 @@ github.com/aws/aws-sdk-go v1.44.259/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
@ -318,8 +316,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g=
github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
@ -333,6 +331,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/ks3sdklib/aws-sdk-go v1.2.6 h1:X0Du7oVv+YBp08vhLZnDeCqOd1Ge17cx5kYG8wC7WN8=
github.com/ks3sdklib/aws-sdk-go v1.2.6/go.mod h1:xBNbOrxSnd36AQpZ8o99mGGu+blblUd9rI0MKGmeufo=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g=
@ -428,18 +428,16 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 h1:qbLm5cOdCWxZ0mt6SaN2aXI+KFekbPqURd6YkNI+XRI=
github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071 h1:giqmIJSWHs+jhHfd+rth8CXWR18KAtqJu4imY1YdA6o=
github.com/pingcap/kvproto v0.0.0-20230925123611-87bebcc0d071/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb v1.1.0-beta.0.20230927025416-38023987346f h1:dn8jgHp2Xv/SfBMDvRAIZiRnQsEqI25Hzi89BHnhXa0=
github.com/pingcap/tidb v1.1.0-beta.0.20230927025416-38023987346f/go.mod h1:xlM+ek4NXAh8wgUF14D3K4S6Ba6O2eohp3glt55KeiE=
github.com/pingcap/tidb/parser v0.0.0-20230927025416-38023987346f h1:FjlKZ4IoMbxkwOAMvF8nE/ARq4t27VPEqBO9ijSdYtc=
github.com/pingcap/tidb/parser v0.0.0-20230927025416-38023987346f/go.mod h1:cwq4bKUlftpWuznB+rqNwbN0xy6/i5SL/nYvEKeJn4s=
github.com/pingcap/tidb/pkg/parser v0.0.0-20231020070330-48d69d39c3d0 h1:X1F/ScGDisfPvS8wsFqFMONBFm8XqmbprBQQVXnIHUk=
github.com/pingcap/tidb/pkg/parser v0.0.0-20231020070330-48d69d39c3d0/go.mod h1:5s4ZS7VJ9W8ed0/hHpXZ9eKt3URTYQAsOLtgX6ysy/U=
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f h1:NCiI4Wyu4GkViLGTu6cYcxt79LZ1SenBBQX1OwEV6Jg=
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
@ -465,8 +463,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY=
github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.0.0-20181020173914-7e9e6cabbd39/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
@ -583,6 +581,8 @@ github.com/vbauerster/mpb/v7 v7.5.3 h1:BkGfmb6nMrrBQDFECR/Q7RkKCw7ylMetCb4079CGs
github.com/vbauerster/mpb/v7 v7.5.3/go.mod h1:i+h4QY6lmLvBNK2ah1fSreiw3ajskRlBp9AhY/PnuOE=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI=
github.com/windtalker/tidb v1.1.0-beta.0.20231020063218-4d1c15539f3f h1:zBhycVdXkUzyRFgr+7GW8YlBfNLYW5mCWvWYgO7YtZc=
github.com/windtalker/tidb v1.1.0-beta.0.20231020063218-4d1c15539f3f/go.mod h1:Zp1mtUK8/IRg3alayGtFPnYPc95/dscSlweFfB81+3o=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
@ -654,6 +654,8 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/mock v0.2.0 h1:TaP3xedm7JaAgScZO7tlvlKrqT0p7I6OsdGB5YNSMDU=
go.uber.org/mock v0.2.0/go.mod h1:J0y0rp9L3xiff1+ZBfKxlC1fz2+aO16tw0tsDOixfuM=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@ -669,8 +671,8 @@ go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c=
go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -879,8 +881,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -45,9 +45,9 @@ import (
"unsafe"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
txndriver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/pkg/kv"
txndriver "github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/store/mockstore/unistore"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"github.com/tikv/client-go/v2/config"

View File

@ -148,12 +148,14 @@ const (
// Region presents kv region
type Region struct {
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
meta *metapb.Region // raw region meta from PD, immutable after init
store unsafe.Pointer // point to region store info, see RegionStore
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
lastLoad int64 // last region load time
hasUnavailableTiFlashStore bool // has unavailable TiFlash store, if yes, need to trigger async reload periodically
}
// AccessIndex represent the index for accessIndex array
@ -330,6 +332,29 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
if len(availablePeers) == 0 {
return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
}
for _, p := range pdRegion.DownPeers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
if !exists {
store = c.getStoreByStoreID(p.StoreId)
}
addr, err := store.initResolve(bo, c)
if err != nil {
continue
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}
if store.storeType == tikvrpc.TiFlash {
r.hasUnavailableTiFlashStore = true
break
}
}
rs.workTiKVIdx = leaderAccessIdx
r.meta.Peers = availablePeers
@ -337,6 +362,7 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
// mark region has been init accessed.
r.lastAccess = time.Now().Unix()
r.lastLoad = r.lastAccess
return r, nil
}
@ -722,18 +748,8 @@ func WithMatchStores(stores []uint64) StoreSelectorOption {
// GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32, opts ...StoreSelectorOption) (*RPCContext, error) {
ts := time.Now().Unix()
cachedRegion := c.GetCachedRegionWithRLock(id)
if cachedRegion == nil {
return nil, nil
}
if cachedRegion.checkNeedReload() {
return nil, nil
}
if !cachedRegion.checkRegionCacheTTL(ts) {
if !cachedRegion.isValid() {
return nil, nil
}
@ -867,14 +883,15 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
// must be out of date and already dropped from cache or not flash store found.
// `loadBalance` is an option. For batch cop, it is pointless and might cause the failed store to be tried repeatedly.
func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
ts := time.Now().Unix()
cachedRegion := c.GetCachedRegionWithRLock(id)
if cachedRegion == nil {
if !cachedRegion.isValid() {
return nil, nil
}
if !cachedRegion.checkRegionCacheTTL(ts) {
return nil, nil
if cachedRegion.hasUnavailableTiFlashStore && time.Now().Unix()-cachedRegion.lastLoad > regionCacheTTLSec {
// Schedule an async reload to avoid the load-balance issue; see https://github.com/pingcap/tidb/issues/35418 for details.
c.scheduleReloadRegion(cachedRegion)
}
regionStore := cachedRegion.getStore()

View File

@ -40,6 +40,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
@ -291,6 +292,56 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
}
// TestTiFlashDownPeersAndAsyncReload checks two things about a region that has
// TiFlash peers:
//
//   - after loading the region, hasUnavailableTiFlashStore reflects whether one
//     of its TiFlash peers was marked down in the mock cluster;
//   - once more than the region cache TTL has elapsed since the load,
//     GetTiFlashRPCContext flags such a region for async reload.
func (s *testRegionCacheSuite) TestTiFlashDownPeersAndAsyncReload() {
	// Add two stores labelled as TiFlash, each holding a peer of region1.
	store3 := s.cluster.AllocID()
	peer3 := s.cluster.AllocID()
	s.cluster.AddStore(store3, s.storeAddr(store3))
	s.cluster.AddPeer(s.region1, store3, peer3)
	s.cluster.UpdateStoreAddr(store3, s.storeAddr(store3), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
	store4 := s.cluster.AllocID()
	peer4 := s.cluster.AllocID()
	s.cluster.AddStore(store4, s.storeAddr(store4))
	s.cluster.AddPeer(s.region1, store4, peer4)
	s.cluster.UpdateStoreAddr(store4, s.storeAddr(store4), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})

	// Load the region into the cache while no TiFlash peer is down.
	loc, err := s.cache.LocateKey(s.bo, []byte("a"))
	s.Nil(err)
	s.Equal(s.region1, loc.Region.id)
	ctx, err := s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
	s.Nil(err)
	s.NotNil(ctx)
	region := s.cache.GetCachedRegionWithRLock(loc.Region)
	s.False(region.hasUnavailableTiFlashStore)
	s.False(region.asyncReload.Load())

	// Mark one TiFlash peer down and load the region again from scratch.
	s.cache.clear()
	s.cluster.MarkPeerDown(peer3)
	s.cache.reloadRegion(loc.Region.id)
	loc, err = s.cache.LocateKey(s.bo, []byte("a"))
	s.Nil(err)
	s.Equal(s.region1, loc.Region.id)
	region = s.cache.GetCachedRegionWithRLock(loc.Region)
	s.True(region.hasUnavailableTiFlashStore)
	s.False(region.asyncReload.Load())

	// Shorten the TTL so the reload condition is reachable quickly, and put the
	// previous value back when the test ends so later tests are not affected.
	// The deferred call captures the current value because defer evaluates its
	// arguments immediately.
	// NOTE(review): assumes SetRegionCacheTTLSec assigns the package-level
	// regionCacheTTLSec (int64) — confirm.
	defer SetRegionCacheTTLSec(regionCacheTTLSec)
	SetRegionCacheTTLSec(3)

	// Keep using the region for four seconds: the reload check compares whole
	// seconds with a strict ">", so three seconds would not be enough.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i <= 3; i++ {
			// Only the side effect on the cached region matters here.
			_, _ = s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
			time.Sleep(1 * time.Second)
		}
	}()
	wg.Wait()

	// One more access after the TTL has passed must flag the region.
	_, _ = s.cache.GetTiFlashRPCContext(s.bo, loc.Region, true, LabelFilterNoTiFlashWriteNode)
	s.True(region.hasUnavailableTiFlashStore)
	s.True(region.asyncReload.Load())
}
// TestFilterDownPeersOrPeersOnTombstoneOrDroppedStore verifies the RegionCache filter
// region's down peers and peers on tombstone or dropped stores. RegionCache shouldn't
// report errors in such cases if there are available peers.

View File

@ -190,7 +190,7 @@ func (s *testRegionCacheStaleReadSuite) setClient() {
} else {
// follower read leader
if !req.ReplicaRead && !req.StaleRead {
_, leaderPeer, _ := s.cluster.GetRegionByID(s.regionID)
_, leaderPeer, _, _ := s.cluster.GetRegionByID(s.regionID)
response.Resp = &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: req.RegionId,

View File

@ -47,7 +47,7 @@ type Cluster interface {
// StoreID, RegionID, or PeerID.
AllocID() uint64
// GetRegionByKey returns the Region whose range contains the key, together with its leader, Buckets and down peers.
GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets)
GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets, []*metapb.Peer)
// GetAllStores returns all Stores' meta.
GetAllStores() []*metapb.Store
// ScheduleDelay schedules a delay event for a transaction on a region.

View File

@ -63,9 +63,10 @@ var _ cluster.Cluster = &Cluster{}
// to client's request.
type Cluster struct {
sync.RWMutex
id uint64
stores map[uint64]*Store
regions map[uint64]*Region
id uint64
stores map[uint64]*Store
regions map[uint64]*Region
downPeers map[uint64]struct{}
mvccStore MVCCStore
@ -85,6 +86,7 @@ func NewCluster(mvccStore MVCCStore) *Cluster {
return &Cluster{
stores: make(map[uint64]*Store),
regions: make(map[uint64]*Region),
downPeers: make(map[uint64]struct{}),
delayEvents: make(map[delayKey]time.Duration),
mvccStore: mvccStore,
}
@ -244,6 +246,18 @@ func (c *Cluster) MarkTombstone(storeID uint64) {
c.stores[storeID].meta = &nm
}
// MarkPeerDown adds peerID to the cluster's set of down peers. Region lookups
// that go through getDownPeers will then report that peer as down for the
// region it belongs to. Marking an already-down peer is a no-op.
func (c *Cluster) MarkPeerDown(peerID uint64) {
	c.Lock()
	defer c.Unlock()

	// The map is used as a set; only the key carries information.
	var present struct{}
	c.downPeers[peerID] = present
}
// RemoveDownPeer drops peerID from the cluster's set of down peers, undoing a
// previous MarkPeerDown. Removing a peer that is not marked down is a no-op.
func (c *Cluster) RemoveDownPeer(peerID uint64) {
	c.Lock()
	// delete never panics, so the lock can be released explicitly.
	delete(c.downPeers, peerID)
	c.Unlock()
}
// UpdateStoreAddr updates store address for cluster.
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
@ -272,7 +286,7 @@ func (c *Cluster) GetRegion(regionID uint64) (*metapb.Region, uint64) {
}
// GetRegionByKey returns the Region whose range contains the key, together with its leader, Buckets and down peers.
func (c *Cluster) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets) {
func (c *Cluster) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets, []*metapb.Peer) {
c.RLock()
defer c.RUnlock()
@ -280,43 +294,58 @@ func (c *Cluster) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *met
}
// getRegionByKeyNoLock returns the Region and its leader whose range contains the key without Lock.
func (c *Cluster) getRegionByKeyNoLock(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets) {
func (c *Cluster) getRegionByKeyNoLock(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets, []*metapb.Peer) {
for _, r := range c.regions {
if regionContains(r.Meta.StartKey, r.Meta.EndKey, key) {
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer), proto.Clone(r.Buckets).(*metapb.Buckets)
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer),
proto.Clone(r.Buckets).(*metapb.Buckets), c.getDownPeers(r)
}
}
return nil, nil, nil
return nil, nil, nil, nil
}
// GetPrevRegionByKey returns the previous Region and its leader whose range contains the key.
func (c *Cluster) GetPrevRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets) {
func (c *Cluster) GetPrevRegionByKey(key []byte) (*metapb.Region, *metapb.Peer, *metapb.Buckets, []*metapb.Peer) {
c.RLock()
defer c.RUnlock()
currentRegion, _, _ := c.getRegionByKeyNoLock(key)
currentRegion, _, _, _ := c.getRegionByKeyNoLock(key)
if len(currentRegion.StartKey) == 0 {
return nil, nil, nil
return nil, nil, nil, nil
}
for _, r := range c.regions {
if bytes.Equal(r.Meta.EndKey, currentRegion.StartKey) {
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer), proto.Clone(r.Buckets).(*metapb.Buckets)
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer),
proto.Clone(r.Buckets).(*metapb.Buckets), c.getDownPeers(r)
}
}
return nil, nil, nil
return nil, nil, nil, nil
}
// getDownPeers returns clones of those peers of region whose IDs are in
// c.downPeers, in the order they appear in region.Meta.Peers. The result is
// nil when none of the region's peers is marked down.
//
// It reads c.downPeers without taking the lock itself.
// NOTE(review): callers are expected to hold c's lock, as the lookups that
// call it under RLock do — confirm for every call site.
func (c *Cluster) getDownPeers(region *Region) []*metapb.Peer {
	var downPeers []*metapb.Peer
	// Walk the region's peers and probe the set rather than ranging over the
	// map: map iteration order is random, which would make the order of the
	// returned peers differ from call to call.
	for _, peer := range region.Meta.Peers {
		if _, down := c.downPeers[peer.GetId()]; down {
			downPeers = append(downPeers, proto.Clone(peer).(*metapb.Peer))
		}
	}
	return downPeers
}
// GetRegionByID returns the Region and its leader whose ID is regionID.
func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, *metapb.Buckets) {
func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer, *metapb.Buckets, []*metapb.Peer) {
c.RLock()
defer c.RUnlock()
for _, r := range c.regions {
if r.Meta.GetId() == regionID {
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer), proto.Clone(r.Buckets).(*metapb.Buckets)
return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer),
proto.Clone(r.Buckets).(*metapb.Buckets), c.getDownPeers(r)
}
}
return nil, nil, nil
return nil, nil, nil, nil
}
// ScanRegions returns at most `limit` regions from given `key` and their leaders.
@ -362,8 +391,9 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*pd.Region {
}
r := &pd.Region{
Meta: proto.Clone(region.Meta).(*metapb.Region),
Leader: leader,
Meta: proto.Clone(region.Meta).(*metapb.Region),
Leader: leader,
DownPeers: c.getDownPeers(region),
}
result = append(result, r)
}

View File

@ -202,11 +202,11 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
}
func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, peer, buckets := c.cluster.GetRegionByKey(key)
region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key)
if len(opts) == 0 {
buckets = nil
}
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets}, nil
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*pd.Region, error) {
@ -214,16 +214,16 @@ func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberUR
}
func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, peer, buckets := c.cluster.GetPrevRegionByKey(key)
region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key)
if len(opts) == 0 {
buckets = nil
}
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets}, nil
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...pd.GetRegionOption) (*pd.Region, error) {
region, peer, buckets := c.cluster.GetRegionByID(regionID)
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets}, nil
region, peer, buckets, downPeers := c.cluster.GetRegionByID(regionID)
return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*pd.Region, error) {

View File

@ -645,7 +645,7 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _, _ := h.cluster.GetRegionByKey(k)
region, _, _, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}

View File

@ -142,7 +142,7 @@ func (s *Session) CheckRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
}
// Region epoch does not match.
if !proto.Equal(region.GetRegionEpoch(), ctx.GetRegionEpoch()) {
nextRegion, _, _ := s.cluster.GetRegionByKey(region.GetEndKey())
nextRegion, _, _, _ := s.cluster.GetRegionByKey(region.GetEndKey())
currentRegions := []*metapb.Region{region}
if nextRegion != nil {
currentRegions = append(currentRegions, nextRegion)