Mirror of https://github.com/tikv/client-go.git

Commit d6f3951e9d: Merge branch 'master' into remove-log
@@ -217,10 +217,9 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
         atomic.AddInt64(&detail.BackoffCount, 1)
     }

-    if b.vars != nil && b.vars.Killed != nil {
-        if atomic.LoadUint32(b.vars.Killed) == 1 {
-            return errors.WithStack(tikverr.ErrQueryInterrupted)
-        }
+    err2 := b.CheckKilled()
+    if err2 != nil {
+        return err2
     }

     var startTs interface{}
@@ -382,3 +381,17 @@ func (b *Backoffer) longestSleepCfg() (*Config, int) {
     }
     return nil, 0
 }
+
+func (b *Backoffer) CheckKilled() error {
+    if b.vars != nil && b.vars.Killed != nil {
+        killed := atomic.LoadUint32(b.vars.Killed)
+        if killed != 0 {
+            logutil.BgLogger().Info(
+                "backoff stops because a killed signal is received",
+                zap.Uint32("signal", killed),
+            )
+            return errors.WithStack(tikverr.ErrQueryInterrupted)
+        }
+    }
+    return nil
+}
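For orientation, here is a minimal sketch (not part of this diff) of how a caller might wire the kill flag that CheckKilled inspects. It assumes an already-connected txnkv.Client; the SetVars/NewVariables calls are the same ones used by the test added later in this commit.

```go
package example

import (
    "context"

    "github.com/tikv/client-go/v2/kv"
    "github.com/tikv/client-go/v2/txnkv"
)

// interruptibleCommit shares a caller-owned kill flag with a transaction via
// kv.NewVariables. Once another goroutine stores a non-zero value into the
// flag (e.g. atomic.StoreUint32(killed, 1) from a kill handler), the next
// backoff inside Commit returns a "query interrupted" error instead of
// retrying further. Any non-zero value counts; the old code only honored the
// exact value 1, which is why the new test sets the flag to 2.
func interruptibleCommit(client *txnkv.Client, killed *uint32) error {
    txn, err := client.Begin()
    if err != nil {
        return err
    }
    txn.SetVars(kv.NewVariables(killed))
    if err := txn.Set([]byte("k"), []byte("v")); err != nil {
        return err
    }
    return txn.Commit(context.Background())
}
```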
@@ -77,8 +77,9 @@ func (s *Security) ToTLSConfig() (tlsConfig *tls.Config, err error) {
         return
     }
     tlsConfig = &tls.Config{
-        RootCAs:   certPool,
-        ClientCAs: certPool,
+        RootCAs:    certPool,
+        ClientCAs:  certPool,
+        MinVersion: tls.VersionTLS10,
     }

     if len(s.ClusterSSLCert) != 0 && len(s.ClusterSSLKey) != 0 {
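As a side note, the *tls.Config built here is what ultimately secures the client's gRPC connections; a minimal sketch of consuming it (the dialing code below is illustrative and not part of this change) could be:

```go
package example

import (
    "crypto/tls"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// dialWithTLS wraps a *tls.Config (such as the one returned by
// Security.ToTLSConfig) in gRPC transport credentials. The address is a
// placeholder; with MinVersion set as above, handshakes below TLS 1.0 are refused.
func dialWithTLS(addr string, cfg *tls.Config) (*grpc.ClientConn, error) {
    return grpc.Dial(addr, grpc.WithTransportCredentials(credentials.NewTLS(cfg)))
}
```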
@@ -64,13 +64,13 @@ var (
     // ErrTiFlashServerTimeout is the error when tiflash server is timeout.
     ErrTiFlashServerTimeout = errors.New("tiflash server timeout")
     // ErrQueryInterrupted is the error when the query is interrupted.
-    ErrQueryInterrupted = errors.New("query interruppted")
+    ErrQueryInterrupted = errors.New("query interrupted")
     // ErrTiKVStaleCommand is the error that the command is stale in tikv.
     ErrTiKVStaleCommand = errors.New("tikv stale command")
     // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced.
     ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced")
     // ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted.
-    ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted")
+    ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set")
     // ErrResolveLockTimeout is the error that resolve lock timeout.
     ErrResolveLockTimeout = errors.New("resolve lock timeout")
     // ErrLockWaitTimeout is the error that wait for the lock is timeout.
go.mod
@@ -14,22 +14,22 @@ require (
     github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
     github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
     github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-    github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130
+    github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
     github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
     github.com/pkg/errors v0.9.1
     github.com/prometheus/client_golang v1.15.1
     github.com/prometheus/client_model v0.3.0
     github.com/stretchr/testify v1.8.2
     github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a
-    github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2
+    github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe
     github.com/twmb/murmur3 v1.1.3
     go.etcd.io/etcd/api/v3 v3.5.10
     go.etcd.io/etcd/client/v3 v3.5.10
     go.uber.org/atomic v1.11.0
     go.uber.org/goleak v1.2.0
     go.uber.org/zap v1.26.0
-    golang.org/x/sync v0.3.0
-    google.golang.org/grpc v1.59.0
+    golang.org/x/sync v0.4.0
+    google.golang.org/grpc v1.60.1
 )

 require (
@ -50,13 +50,13 @@ require (
|
|||
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/net v0.19.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/tools v0.9.1 // indirect
|
||||
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
|
|
|||
go.sum
@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
|||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
|
||||
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
|
|
@ -75,8 +77,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
|
|||
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130 h1:qbLm5cOdCWxZ0mt6SaN2aXI+KFekbPqURd6YkNI+XRI=
|
||||
github.com/pingcap/kvproto v0.0.0-20230904082117-ecdbf1f8c130/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
|
||||
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
|
||||
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
|
||||
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
|
||||
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
|
@ -112,8 +114,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
|
|||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
|
||||
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
|
||||
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg=
|
||||
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
|
||||
github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe h1:Zth7WfrApHJdUZJYzrDNiTm1kGria//OTV/CL6aUT60=
|
||||
github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe/go.mod h1:f1CP4ERhypFqqAXtE/175CqAvKm7qsLnUM94aZDQfq8=
|
||||
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
|
||||
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
|
@ -162,8 +164,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
|
||||
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
|
@ -171,19 +173,19 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
|
||||
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
@ -204,15 +206,17 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
|
|||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA=
|
||||
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b h1:CIC2YMXmIhYw6evmhPxBKJ4fmLbOFtXQN/GV3XOZR8k=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:IBQ646DjkDkvUIsVq/cc03FUFQ9wbZu7yE396YcL870=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 h1:1hfbdAfFbkmpg41000wDVqr7jUpK/Yo+LPnIxxGzmkg=
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
|
||||
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
||||
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
|
||||
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9/go.mod h1:j5uROIAAgi3YmtiETMt1LW0d/lHqQ7wwrIY4uGRXLQ4=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||
|
|
|
|||
|
|
@@ -2502,3 +2502,13 @@ func (s *testCommitterSuite) TestExtractKeyExistsErr() {
     s.True(txn.GetMemBuffer().TryLock())
     txn.GetMemBuffer().Unlock()
 }
+
+func (s *testCommitterSuite) TestKillSignal() {
+    txn := s.begin()
+    err := txn.Set([]byte("key"), []byte("value"))
+    s.Nil(err)
+    var killed uint32 = 2
+    txn.SetVars(kv.NewVariables(&killed))
+    err = txn.Commit(context.Background())
+    s.ErrorContains(err, "query interrupted")
+}
|
|
|||
|
|
@ -4,15 +4,15 @@ go 1.21
|
|||
|
||||
require (
|
||||
github.com/ninedraft/israce v0.0.3
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
|
||||
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
|
||||
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
|
||||
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10
|
||||
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20231201112349-7353fbeea8c0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/tidwall/gjson v1.14.1
|
||||
github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0
|
||||
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2
|
||||
github.com/tikv/client-go/v2 v2.0.8-0.20231225015355-db2e85c4631a
|
||||
github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe
|
||||
go.uber.org/goleak v1.3.0
|
||||
)
|
||||
|
||||
|
|
@ -97,19 +97,19 @@ require (
|
|||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.26.0 // indirect
|
||||
golang.org/x/crypto v0.16.0 // indirect
|
||||
golang.org/x/crypto v0.17.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
|
||||
golang.org/x/net v0.19.0 // indirect
|
||||
golang.org/x/sync v0.5.0 // indirect
|
||||
golang.org/x/sys v0.15.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.15.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f // indirect
|
||||
google.golang.org/grpc v1.59.0 // indirect
|
||||
google.golang.org/protobuf v1.31.0 // indirect
|
||||
golang.org/x/tools v0.16.1 // indirect
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect
|
||||
google.golang.org/grpc v1.60.1 // indirect
|
||||
google.golang.org/protobuf v1.32.0 // indirect
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
|
@ -117,5 +117,8 @@ require (
|
|||
|
||||
replace (
|
||||
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
|
||||
// remove this after tidb https://github.com/pingcap/tidb/pull/49833 is merged.
|
||||
github.com/pingcap/tidb => github.com/glorv/tidb v1.1.0-beta.0.20231227062622-cf95945480de
|
||||
github.com/pingcap/tidb/pkg/parser => github.com/glorv/tidb/pkg/parser v0.0.0-20231227062622-cf95945480de
|
||||
github.com/tikv/client-go/v2 => ../
|
||||
)
|
||||
|
|
|
|||
|
|
@ -59,6 +59,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
|||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/blacktear23/go-proxyprotocol v1.0.6 h1:eTt6UMpEnq59NjON49b3Cay8Dm0sCs1nDliwgkyEsRM=
|
||||
github.com/blacktear23/go-proxyprotocol v1.0.6/go.mod h1:FSCbgnRZrQXazBLL5snfBbrcFSMtcmUDhSRb9OfFA1o=
|
||||
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
|
||||
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
|
||||
github.com/carlmjohnson/flagext v0.21.0 h1:/c4uK3ie786Z7caXLcIMvePNSSiH3bQVGDvmGLMme60=
|
||||
github.com/carlmjohnson/flagext v0.21.0/go.mod h1:Eenv0epIUAr4NuedNmkzI8WmBmjIxZC239XcKxYS2ac=
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
|
||||
|
|
@ -147,6 +149,10 @@ github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbS
|
|||
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
|
||||
github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
|
||||
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
|
||||
github.com/glorv/tidb v1.1.0-beta.0.20231227062622-cf95945480de h1:neEWeaJojIdQRr7fFnky2eL5aBDQnuW5X3UQA+ryQDk=
|
||||
github.com/glorv/tidb v1.1.0-beta.0.20231227062622-cf95945480de/go.mod h1:5QsYBq2fAXRU8X1Udongbursa7DwybrUg4QQQ8ylGwM=
|
||||
github.com/glorv/tidb/pkg/parser v0.0.0-20231227062622-cf95945480de h1:Aq/zT9aLXVE+Zan4MWp6NPmY3vfqullStb8gssWxiv4=
|
||||
github.com/glorv/tidb/pkg/parser v0.0.0-20231227062622-cf95945480de/go.mod h1:yRkiqLFwIqibYg2P7h4bclHjHcJiIFRLKhGRyBcKYus=
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.4 h1:vXT6d/FNDiELJnLb6hGNa309LMsrCoYFvpwHDF0+Y1A=
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.4/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
|
||||
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
|
||||
|
|
@ -162,6 +168,8 @@ github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AE
|
|||
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
|
||||
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
|
||||
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
|
||||
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
|
||||
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
|
||||
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
|
|
@ -395,8 +403,8 @@ github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZn
|
|||
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
|
||||
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb h1:yqyP+k0mgRPpXJQDOCrtaG2YZym0ZDD+vt5JzlBUkrw=
|
||||
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
|
||||
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=
|
||||
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
|
||||
github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g=
|
||||
|
|
@ -404,18 +412,14 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
|
|||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
|
||||
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10 h1:qnhfzwdWOy8oOSZYX7/aK9XKDs4hJ6P/Gg+s7Sr9VKY=
|
||||
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
|
||||
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
|
||||
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
|
||||
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
|
||||
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
|
||||
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
|
||||
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20231201112349-7353fbeea8c0 h1:QWOvH4VfaMqTw/B3nqT73FJ1C3JYgPUiZKiyBA8DrmI=
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20231201112349-7353fbeea8c0/go.mod h1:9SkqAFGe+/23yW0Qt7UhedUxzf/+0To1t3QLI8TXN08=
|
||||
github.com/pingcap/tidb/pkg/parser v0.0.0-20231020070330-48d69d39c3d0 h1:X1F/ScGDisfPvS8wsFqFMONBFm8XqmbprBQQVXnIHUk=
|
||||
github.com/pingcap/tidb/pkg/parser v0.0.0-20231020070330-48d69d39c3d0/go.mod h1:5s4ZS7VJ9W8ed0/hHpXZ9eKt3URTYQAsOLtgX6ysy/U=
|
||||
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f h1:NCiI4Wyu4GkViLGTu6cYcxt79LZ1SenBBQX1OwEV6Jg=
|
||||
github.com/pingcap/tipb v0.0.0-20230919054518-dfd7d194838f/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
|
||||
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
|
||||
|
|
@ -516,8 +520,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
|||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2 h1:7fnKwFC9pgiOolvnUnquEAb60liIpna+0hFRkopaOSg=
|
||||
github.com/tikv/pd/client v0.0.0-20231204034622-259435d93ae2/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
|
||||
github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe h1:Zth7WfrApHJdUZJYzrDNiTm1kGria//OTV/CL6aUT60=
|
||||
github.com/tikv/pd/client v0.0.0-20231227041826-d3551ea0bdbe/go.mod h1:f1CP4ERhypFqqAXtE/175CqAvKm7qsLnUM94aZDQfq8=
|
||||
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
|
||||
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
|
||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||
|
|
@ -610,8 +614,8 @@ go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A
|
|||
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo=
|
||||
go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
|
||||
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
|
||||
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
|
|
@ -636,8 +640,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
|
||||
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
|
||||
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
|
||||
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
|
||||
|
|
@ -767,8 +771,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
|
|||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8=
|
||||
golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk=
|
||||
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
|
||||
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
@ -783,19 +787,19 @@ google.golang.org/api v0.149.0 h1:b2CqT6kG+zqJIVKRQ3ELJVLN1PwHZ6DJ3dW8yl82rgY=
|
|||
google.golang.org/api v0.149.0/go.mod h1:Mwn1B7JTXrzXtnvmzQE2BD6bYZQ8DShKZDZbeN9I7qI=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
|
||||
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
|
||||
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
|
||||
google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ=
|
||||
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405 h1:HJMDndgxest5n2y77fnErkM62iUsptE/H8p0dC2Huo4=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231030173426-d783a09b4405/go.mod h1:oT32Z4o8Zv2xPQTg0pbVaPr0MPOH6f14RgXt7zfIpwg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f h1:ultW7fxlIvee4HYrtnaRPon9HpEgFk5zYpmfMgtKB5I=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231120223509-83a465c0220f/go.mod h1:L9KNLi232K1/xB6f7AlSX692koaRnKaWSR0stBki0Yc=
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3 h1:1hfbdAfFbkmpg41000wDVqr7jUpK/Yo+LPnIxxGzmkg=
|
||||
google.golang.org/genproto v0.0.0-20231211222908-989df2bf70f3/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA=
|
||||
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
||||
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
|
|
@ -804,8 +808,10 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
|
|||
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
|
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
|
||||
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
|
||||
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
|
||||
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
|
||||
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
||||
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
|
||||
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9/go.mod h1:j5uROIAAgi3YmtiETMt1LW0d/lHqQ7wwrIY4uGRXLQ4=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
|
@ -814,8 +820,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
|
|||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
|
||||
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
|
||||
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ func (s *apiTestSuite) SetupTest() {
|
|||
rpcClient := tikv.NewRPCClient()
|
||||
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
|
||||
// Set PD HTTP client.
|
||||
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs))
|
||||
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient("pd-api-test", addrs))
|
||||
require.NoError(err)
|
||||
storeID := uint64(1)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
|
||||
|
|
|
|||
|
|
@ -307,11 +307,11 @@ func (c *mockPDClient) UpdateKeyspaceState(ctx context.Context, id uint32, state
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *mockPDClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
|
||||
func (c *mockPDClient) ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *mockPDClient) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
|
||||
func (c *mockPDClient) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@@ -76,6 +76,11 @@ func (b *batchCommandsEntry) isCanceled() bool {
     return atomic.LoadInt32(&b.canceled) == 1
 }

+// TODO: implement by the request priority.
+func (b *batchCommandsEntry) priority() int {
+    return 0
+}
+
 func (b *batchCommandsEntry) error(err error) {
     b.err = err
     close(b.res)
@@ -87,7 +92,7 @@ type batchCommandsBuilder struct {
     // Each BatchCommandsRequest_Request sent to a store has a unique identity to
     // distinguish its response.
     idAlloc    uint64
-    entries    []*batchCommandsEntry
+    entries    *PriorityQueue
     requests   []*tikvpb.BatchCommandsRequest_Request
     requestIDs []uint64
     // In most cases, there isn't any forwardingReq.
@@ -95,11 +100,11 @@ type batchCommandsBuilder struct {
 }

 func (b *batchCommandsBuilder) len() int {
-    return len(b.entries)
+    return b.entries.Len()
 }

 func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
-    b.entries = append(b.entries, entry)
+    b.entries.Push(entry)
 }

 // build builds BatchCommandsRequests and calls collect() for each valid entry.
@@ -108,7 +113,8 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
 func (b *batchCommandsBuilder) build(
     collect func(id uint64, e *batchCommandsEntry),
 ) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
-    for _, e := range b.entries {
+    for _, entry := range b.entries.All() {
+        e := entry.(*batchCommandsEntry)
         if e.isCanceled() {
             continue
         }
@@ -140,8 +146,8 @@ func (b *batchCommandsBuilder) build(
 }

 func (b *batchCommandsBuilder) cancel(e error) {
-    for _, entry := range b.entries {
-        entry.error(e)
+    for _, entry := range b.entries.All() {
+        entry.(*batchCommandsEntry).error(e)
     }
 }

@@ -152,10 +158,7 @@ func (b *batchCommandsBuilder) reset() {
     // The data in the cap part of the slice would reference the prewrite keys whose
     // underlying memory is borrowed from memdb. The reference cause GC can't release
     // the memdb, leading to serious memory leak problems in the large transaction case.
-    for i := 0; i < len(b.entries); i++ {
-        b.entries[i] = nil
-    }
-    b.entries = b.entries[:0]
+    b.entries.Reset()
     for i := 0; i < len(b.requests); i++ {
         b.requests[i] = nil
     }
@@ -170,7 +173,7 @@ func (b *batchCommandsBuilder) reset() {
 func newBatchCommandsBuilder(maxBatchSize uint) *batchCommandsBuilder {
     return &batchCommandsBuilder{
         idAlloc:        0,
-        entries:        make([]*batchCommandsEntry, 0, maxBatchSize),
+        entries:        NewPriorityQueue(),
         requests:       make([]*tikvpb.BatchCommandsRequest_Request, 0, maxBatchSize),
         requestIDs:     make([]uint64, 0, maxBatchSize),
         forwardingReqs: make(map[string]*tikvpb.BatchCommandsRequest),
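The TODO above leaves priority() at a constant 0, so for now the new PriorityQueue does not actually reorder entries. Purely as an illustration of where this could go (this is an assumption, not something the commit implements, and it presumes the entry keeps its kvrpcpb.Context around), a request-derived priority might look like:

```go
package client

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// priorityFromContext is a hypothetical helper: it maps the protocol-level
// command priority onto the int that batchCommandsEntry.priority() returns.
// Both the mapping and the idea of keeping the kvrpcpb.Context on the entry
// are assumptions made for this sketch.
func priorityFromContext(ctx *kvrpcpb.Context) int {
    switch ctx.GetPriority() {
    case kvrpcpb.CommandPri_High:
        return 2
    case kvrpcpb.CommandPri_Normal:
        return 1
    default: // kvrpcpb.CommandPri_Low and unknown values
        return 0
    }
}
```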
@@ -475,7 +475,7 @@ func TestBatchCommandsBuilder(t *testing.T) {
     // Test reset
     builder.reset()
     assert.Equal(t, builder.len(), 0)
-    assert.Equal(t, len(builder.entries), 0)
+    assert.Equal(t, builder.entries.Len(), 0)
     assert.Equal(t, len(builder.requests), 0)
     assert.Equal(t, len(builder.requestIDs), 0)
     assert.Equal(t, len(builder.forwardingReqs), 0)
@@ -483,7 +483,6 @@
 }

 func TestTraceExecDetails(t *testing.T) {
-
     assert.Nil(t, buildSpanInfoFromResp(nil))
     assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{}))
     assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}))
@@ -0,0 +1,108 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import "container/heap"

// Item is the interface that all entries in a priority queue must implement.
type Item interface {
    priority() int
}

// entry is an entry in a priority queue.
type entry struct {
    entry Item
    index int
}

// prioritySlice implements heap.Interface and holds Entries.
type prioritySlice []entry

// Len returns the length of the priority queue.
func (ps prioritySlice) Len() int {
    return len(ps)
}

// Less compares two entries in the priority queue.
// The higher priority entry is the one with the lower value.
func (ps prioritySlice) Less(i, j int) bool {
    return ps[i].entry.priority() > ps[j].entry.priority()
}

// Swap swaps two entries in the priority queue.
func (ps prioritySlice) Swap(i, j int) {
    ps[i], ps[j] = ps[j], ps[i]
    ps[i].index = i
    ps[j].index = j
}

// Push adds an entry to the priority queue.
func (ps *prioritySlice) Push(x interface{}) {
    item := x.(entry)
    item.index = len(*ps)
    *ps = append(*ps, item)
}

// Pop removes the highest priority entry from the priority queue.
func (ps *prioritySlice) Pop() interface{} {
    old := *ps
    n := len(old)
    item := old[n-1]
    item.index = -1
    *ps = old[0 : n-1]
    return item
}

// PriorityQueue is a priority queue.
type PriorityQueue struct {
    ps prioritySlice
}

// NewPriorityQueue creates a new priority queue.
func NewPriorityQueue() *PriorityQueue {
    return &PriorityQueue{}
}

// Len returns the length of the priority queue.
func (pq *PriorityQueue) Len() int {
    return pq.ps.Len()
}

// Push adds an entry to the priority queue.
func (pq *PriorityQueue) Push(item Item) {
    heap.Push(&pq.ps, entry{entry: item})
}

// Pop removes the highest priority entry from the priority queue.
func (pq *PriorityQueue) Pop() Item {
    return heap.Pop(&pq.ps).(entry).entry
}

// All returns all entries in the priority queue not ensure the priority.
func (pq *PriorityQueue) All() []Item {
    items := make([]Item, 0, pq.Len())
    for i := 0; i < pq.Len(); i++ {
        items = append(items, pq.ps[i].entry)
    }
    return items
}

// Reset resets the priority queue.
func (pq *PriorityQueue) Reset() {
    for i := 0; i < pq.Len(); i++ {
        pq.ps[i].entry = nil
    }
    pq.ps = pq.ps[:0]
}
@@ -0,0 +1,49 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
    "testing"

    "github.com/stretchr/testify/require"
)

type FakeItem struct {
    pri   int
    value int
}

func (f *FakeItem) priority() int {
    return f.pri
}

func TestPriority(t *testing.T) {
    re := require.New(t)
    pq := NewPriorityQueue()
    for i := 1; i <= 5; i++ {
        pq.Push(&FakeItem{value: i, pri: i})
    }
    re.Equal(5, pq.Len())
    arr := pq.All()
    re.Len(arr, 5)
    pq.Reset()
    re.Equal(0, pq.Len())
    for i := 1; i <= 5; i++ {
        pq.Push(&FakeItem{value: i, pri: i})
    }
    for i := pq.Len(); i > 0; i-- {
        re.Equal(i, pq.Pop().(*FakeItem).value)
    }
}
|
@ -217,23 +217,38 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
|
|||
atomic.AddInt64(&detail.BackoffCount, 1)
|
||||
}
|
||||
|
||||
if b.vars != nil && b.vars.Killed != nil {
|
||||
if atomic.LoadUint32(b.vars.Killed) == 1 {
|
||||
return errors.WithStack(tikverr.ErrQueryInterrupted)
|
||||
}
|
||||
err2 := b.checkKilled()
|
||||
if err2 != nil {
|
||||
return err2
|
||||
}
|
||||
|
||||
var startTs interface{}
|
||||
if ts := b.ctx.Value(TxnStartKey); ts != nil {
|
||||
startTs = ts
|
||||
}
|
||||
logutil.Logger(b.ctx).Debug("retry later",
|
||||
logutil.Logger(b.ctx).Debug(
|
||||
"retry later",
|
||||
zap.Error(err),
|
||||
zap.Int("totalSleep", b.totalSleep),
|
||||
zap.Int("excludedSleep", b.excludedSleep),
|
||||
zap.Int("maxSleep", b.maxSleep),
|
||||
zap.Stringer("type", cfg),
|
||||
zap.Reflect("txnStartTS", startTs))
|
||||
zap.Reflect("txnStartTS", startTs),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Backoffer) checkKilled() error {
|
||||
if b.vars != nil && b.vars.Killed != nil {
|
||||
killed := atomic.LoadUint32(b.vars.Killed)
|
||||
if killed != 0 {
|
||||
logutil.BgLogger().Info(
|
||||
"backoff stops because a killed signal is received",
|
||||
zap.Uint32("signal", killed),
|
||||
)
|
||||
return errors.WithStack(tikverr.ErrQueryInterrupted)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@@ -408,6 +408,12 @@ func (r *Region) invalidate(reason InvalidReason) {
     atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
 }

+// invalidateWithoutMetrics invalidates a region without metrics, next time it will got null result.
+func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
+    atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
+    atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
+}
+
 // scheduleReload schedules reload region request in next LocateKey.
 func (r *Region) scheduleReload() {
     oldValue := atomic.LoadInt32(&r.syncFlag)
@ -448,7 +454,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
|
|||
r.latestVersions = make(map[uint64]RegionVerID)
|
||||
r.sorted = NewSortedRegions(btreeDegree)
|
||||
for _, region := range rs {
|
||||
r.insertRegionToCache(region, true)
|
||||
r.insertRegionToCache(region, true, false)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
@ -543,6 +549,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
|
|||
return c
|
||||
}
|
||||
|
||||
// only used fot test.
|
||||
func newTestRegionCache() *RegionCache {
|
||||
c := &RegionCache{}
|
||||
c.storeMu.stores = make(map[uint64]*Store)
|
||||
c.tiflashComputeStoreMu.needReload = true
|
||||
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
|
||||
c.notifyCheckCh = make(chan struct{}, 1)
|
||||
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
|
||||
c.mu = *newRegionIndexMu(nil)
|
||||
return c
|
||||
}
|
||||
|
||||
// clear clears all cached data in the RegionCache. It's only used in tests.
|
||||
func (c *RegionCache) clear() {
|
||||
c.mu = *newRegionIndexMu(nil)
|
||||
|
|
@ -552,8 +570,8 @@ func (c *RegionCache) clear() {
|
|||
}
|
||||
|
||||
// thread unsafe, should use with lock
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
|
||||
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
|
||||
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
|
||||
}
|
||||
|
||||
// Close releases region cache's resource.
|
||||
|
|
@ -1099,7 +1117,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
r = c.searchCachedRegion(key, isEndKey)
|
||||
if r == nil {
|
||||
// load region when it is not exists or expired.
|
||||
lr, err := c.loadRegion(bo, key, isEndKey)
|
||||
lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle())
|
||||
if err != nil {
|
||||
// no region data, return error if failure.
|
||||
return nil, err
|
||||
|
|
@ -1107,8 +1125,20 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
stale := !c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
// just retry once, it won't bring much overhead.
|
||||
if stale {
|
||||
lr, err = c.loadRegion(bo, key, isEndKey)
|
||||
if err != nil {
|
||||
// no region data, return error if failure.
|
||||
return nil, err
|
||||
}
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
} else if r.checkNeedReloadAndMarkUpdated() {
|
||||
// load region when it be marked as need reload.
|
||||
lr, err := c.loadRegion(bo, key, isEndKey)
|
||||
|
|
@ -1121,7 +1151,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1262,7 +1292,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
} else {
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1281,7 +1311,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
return &KeyLocation{
|
||||
Region: r.VerID(),
|
||||
|
|
@ -1319,7 +1349,7 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
|
|||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(lr, false)
|
||||
c.insertRegionToCache(lr, false, false)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
|
|
@ -1409,7 +1439,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
|
|||
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
|
||||
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
|
||||
for _, region := range regions {
|
||||
c.insertRegionToCache(region, true)
|
||||
c.insertRegionToCache(region, true, false)
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@@ -1485,9 +1515,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
 // It should be protected by c.mu.l.Lock().
 // if `invalidateOldRegion` is false, the old region cache should be still valid,
 // and it may still be used by some kv requests.
-func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
-    oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
-    if oldRegion != nil {
+// Moreover, it will return false if the region is stale.
+func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
+    newVer := cachedRegion.VerID()
+    oldVer, ok := mu.latestVersions[newVer.id]
+    // There are two or more situations in which the region we got is stale.
+    // The first case is that the process of getting a region is concurrent.
+    // The stale region may be returned later due to network reasons.
+    // The second case is that the region may be obtained from the PD follower,
+    // and there is the synchronization time between the pd follower and the leader.
+    // So we should check the epoch.
+    if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) {
+        logutil.BgLogger().Debug("get stale region",
+            zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()),
+            zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer()))
+        return false
+    }
+    // Also check and remove the intersecting regions including the old region.
+    intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer)
+    if stale {
+        return false
+    }
+    // Insert the region (won't replace because of above deletion).
+    mu.sorted.ReplaceOrInsert(cachedRegion)
+    // Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region.
+    if len(intersectedRegions) > 0 {
+        oldRegion := intersectedRegions[0].cachedRegion
         store := cachedRegion.getStore()
         oldRegionStore := oldRegion.getStore()
         // TODO(youjiali1995): remove this because the new retry logic can handle this issue.
@@ -1500,11 +1553,6 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
         if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
             store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
         }
-        // If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
-        if invalidateOldRegion {
-            // Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
-            oldRegion.invalidate(Other)
-        }
         // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
         // is under transferring regions.
         store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@@ -1513,21 +1561,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
        if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
            store.buckets = oldRegionStore.buckets
        }
        mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
    }
    mu.regions[cachedRegion.VerID()] = cachedRegion
    newVer := cachedRegion.VerID()
    latest, ok := mu.latestVersions[cachedRegion.VerID().id]
    if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
        mu.latestVersions[cachedRegion.VerID().id] = newVer
    }
    // The intersecting regions in the cache are probably stale, clear them.
    deleted := mu.sorted.removeIntersecting(cachedRegion)
    for _, region := range deleted {
    for _, region := range intersectedRegions {
        mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
        // If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
        if invalidateOldRegion {
            // Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
            if shouldCount {
                region.cachedRegion.invalidate(Other)
            } else {
                region.cachedRegion.invalidateWithoutMetrics(Other)
            }
        }
    }
    // update related vars.
    mu.regions[newVer] = cachedRegion
    mu.latestVersions[newVer.id] = newVer
    return true
}
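
The staleness decision above hinges on the region epoch: an incoming descriptor is rejected when the cached version is newer on either the split axis or the configuration axis. A minimal standalone sketch of that comparison, using an illustrative regionVer type rather than the cache's RegionVerID:

package main

import "fmt"

// regionVer mirrors the version/conf-version pair carried by a region epoch
// (a simplified stand-in for RegionVerID; names here are illustrative only).
type regionVer struct {
    ver     uint64 // bumped on split/merge
    confVer uint64 // bumped on peer/configuration changes
}

// isStale reports whether the freshly fetched epoch is older than the cached
// one on either axis, matching the check in insertRegionToCache.
func isStale(cached, fetched regionVer) bool {
    return cached.ver > fetched.ver || cached.confVer > fetched.confVer
}

func main() {
    cached := regionVer{ver: 10, confVer: 10}
    fmt.Println(isStale(cached, regionVer{ver: 9, confVer: 10}))  // true: older split version
    fmt.Println(isStale(cached, regionVer{ver: 10, confVer: 11})) // false: newer conf version
}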
// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
@@ -1631,7 +1685,7 @@ func filterUnavailablePeers(region *pd.Region) {
// loadRegion loads region from pd client, and picks the first peer as leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) (*Region, error) {
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) {
    ctx := bo.GetCtx()
    if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
        span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context()))

@@ -1641,6 +1695,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
    var backoffErr error
    searchPrev := false
    opts = append(opts, pd.WithBuckets())
    for {
        if backoffErr != nil {
            err := bo.Backoff(retry.BoPDRPC, backoffErr)

@@ -1652,9 +1707,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
        var reg *pd.Region
        var err error
        if searchPrev {
            reg, err = c.pdClient.GetPrevRegion(ctx, key, pd.WithBuckets())
            reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
        } else {
            reg, err = c.pdClient.GetRegion(ctx, key, pd.WithBuckets())
            reg, err = c.pdClient.GetRegion(ctx, key, opts...)
        }
        metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
        if err != nil {
@@ -1806,7 +1861,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
        }
    }
    start := time.Now()
    regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit)
    regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, pd.WithAllowFollowerHandle())
    metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
    if err != nil {
        if apicodec.IsDecodeError(err) {

@@ -2049,7 +2104,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
    c.mu.Lock()
    for _, region := range newRegions {
        c.insertRegionToCache(region, true)
        c.insertRegionToCache(region, true, true)
    }
    c.mu.Unlock()

@@ -2167,7 +2222,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
            return
        }
        c.mu.Lock()
        c.insertRegionToCache(new, true)
        c.insertRegionToCache(new, true, true)
        c.mu.Unlock()
    }()
}
@@ -55,6 +55,7 @@ import (
    "github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
    "github.com/tikv/client-go/v2/kv"
    pd "github.com/tikv/pd/client"
    uatomic "go.uber.org/atomic"
)

func TestRegionCache(t *testing.T) {
@@ -1017,7 +1018,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
    region := createSampleRegion([]byte("k1"), []byte("k2"))
    region.meta.Id = 1
    region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
    cache.insertRegionToCache(region, true)
    cache.insertRegionToCache(region, true, true)

    r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
    r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
@@ -1308,7 +1309,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
    filterUnavailablePeers(cpRegion)
    region, err := newRegion(s.bo, s.cache, cpRegion)
    s.Nil(err)
    s.cache.insertRegionToCache(region, true)
    s.cache.insertRegionToCache(region, true, true)

    // OnSendFail should not panic
    s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))

@@ -1344,7 +1345,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
    cpRegion := &pd.Region{Meta: cpMeta}
    region, err := newRegion(s.bo, s.cache, cpRegion)
    s.Nil(err)
    s.cache.insertRegionToCache(region, true)
    s.cache.insertRegionToCache(region, true, true)

    // OnSendFail should not panic
    s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
@@ -1517,12 +1518,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
    fakeRegion.setStore(cachedRegion.getStore().clone())
    // no buckets
    fakeRegion.getStore().buckets = nil
    s.cache.insertRegionToCache(fakeRegion, true)
    s.cache.insertRegionToCache(fakeRegion, true, true)
    cachedRegion = s.getRegion([]byte("a"))
    s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
    // stale buckets
    fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
    s.cache.insertRegionToCache(fakeRegion, true)
    s.cache.insertRegionToCache(fakeRegion, true, true)
    cachedRegion = s.getRegion([]byte("a"))
    s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
    // new buckets

@@ -1532,7 +1533,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
        Keys: buckets.Keys,
    }
    fakeRegion.getStore().buckets = newBuckets
    s.cache.insertRegionToCache(fakeRegion, true)
    s.cache.insertRegionToCache(fakeRegion, true, true)
    cachedRegion = s.getRegion([]byte("a"))
    s.Equal(newBuckets, cachedRegion.getStore().buckets)

@@ -1567,6 +1568,8 @@ func (s *testRegionCacheSuite) TestBuckets() {
    // update buckets if it's nil.
    cachedRegion.getStore().buckets = nil
    // we should replace the version of `cachedRegion` because it's stale.
    s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
    s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
    s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
    waitUpdateBuckets(defaultBuckets, []byte("a"))
@@ -1665,7 +1668,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
    region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
    s.Nil(err)
    s.Equal(region.GetID(), regions[0])
    s.cache.insertRegionToCache(region, true)
    s.cache.insertRegionToCache(region, true, true)
    loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
    s.Nil(err)
    s.Equal(loc.Region.GetID(), regions[0])

@@ -1676,7 +1679,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
    region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
    s.Nil(err)
    s.Equal(region.GetID(), regions[0])
    s.cache.insertRegionToCache(region, true)
    s.cache.insertRegionToCache(region, true, true)
    loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
    s.Nil(err)
    s.Equal(loc.Region.GetID(), regions[0])
@@ -1799,7 +1802,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
    v2 := region.Region.confVer + 1
    r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
    st := &Store{storeID: s.store}
    s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
    s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)

    r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
    s.Equal(len(r), 2)

@@ -1833,3 +1836,235 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {
    cancel()
}

func TestRegionCacheWithDelay(t *testing.T) {
    suite.Run(t, new(testRegionCacheWithDelaySuite))
}

type testRegionCacheWithDelaySuite struct {
    suite.Suite
    mvccStore mocktikv.MVCCStore
    cluster   *mocktikv.Cluster
    store     uint64 // store1 is leader
    region1   uint64
    bo        *retry.Backoffer

    delay      uatomic.Bool
    delayCache *RegionCache
    cache      *RegionCache
}

func (s *testRegionCacheWithDelaySuite) SetupTest() {
    s.mvccStore = mocktikv.MustNewMVCCStore()
    s.cluster = mocktikv.NewCluster(s.mvccStore)
    storeIDs, _, regionID, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 1)
    s.region1 = regionID
    s.store = storeIDs[0]
    pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
    s.cache = NewRegionCache(pdCli)
    pdCli2 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&s.delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
    s.delayCache = NewRegionCache(pdCli2)
    s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
}

func (s *testRegionCacheWithDelaySuite) TearDownTest() {
    s.cache.Close()
    s.delayCache.Close()
    s.mvccStore.Close()
}
func (s *testRegionCacheWithDelaySuite) TestInsertStaleRegion() {
    r, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
    s.NoError(err)
    fakeRegion := &Region{
        meta:          r.meta,
        syncFlag:      r.syncFlag,
        lastAccess:    r.lastAccess,
        invalidReason: r.invalidReason,
    }
    fakeRegion.setStore(r.getStore().clone())
    keya := mocktikv.NewMvccKey([]byte("a"))
    keyb := mocktikv.NewMvccKey([]byte("b"))
    keyc := mocktikv.NewMvccKey([]byte("c"))
    newRegionID := s.cluster.AllocID()
    newPeersIDs := s.cluster.AllocIDs(1)
    s.cluster.Split(r.GetID(), newRegionID, []byte("b"), newPeersIDs, newPeersIDs[0])
    newPeersIDs = s.cluster.AllocIDs(1)
    s.cluster.Split(newRegionID, s.cluster.AllocID(), []byte("c"), newPeersIDs, newPeersIDs[0])

    r.invalidate(Other)
    r2, err := s.cache.findRegionByKey(s.bo, keyc, false)
    s.NoError(err)
    s.Equal([]byte("c"), r2.StartKey())
    r2, err = s.cache.findRegionByKey(s.bo, keyb, false)
    s.NoError(err)
    s.Equal([]byte("b"), r2.StartKey())
    ra, err := s.cache.loadRegion(s.bo, keya, false)
    s.NoError(err)
    s.cache.mu.Lock()
    stale := s.cache.insertRegionToCache(ra, true, true)
    s.cache.mu.Unlock()
    s.True(stale)

    stale = !s.cache.insertRegionToCache(fakeRegion, true, true)
    s.True(stale)

    rs, err := s.cache.scanRegionsFromCache(s.bo, []byte(""), []byte(""), 100)
    s.NoError(err)
    s.Greater(len(rs), 1)
    s.NotEqual(rs[0].EndKey(), "")

    r3, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r3.EndKey())
}
func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
    r1, err := s.cache.findRegionByKey(s.bo, []byte("a"), false)
    s.NoError(err)
    r2, err := s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
    s.NoError(err)
    s.Equal(r1.meta, r2.meta)

    // simulate network delay
    s.delay.Store(true)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        r2.invalidate(Other)
        _, err := s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
        s.NoError(err)
        wg.Done()
    }()
    time.Sleep(30 * time.Millisecond)
    newPeersIDs := s.cluster.AllocIDs(1)
    s.cluster.Split(r1.GetID(), s.cluster.AllocID(), []byte("b"), newPeersIDs, newPeersIDs[0])
    r1.invalidate(Other)
    r, err := s.cache.findRegionByKey(s.bo, []byte("b"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r.meta.StartKey)
    r, err = s.cache.findRegionByKey(s.bo, []byte("c"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r.meta.StartKey)

    s.delay.Store(false)
    r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r.meta.StartKey)
    wg.Wait()
    // The delayed response is received, but the insert fails.
    r, err = s.delayCache.findRegionByKey(s.bo, []byte("b"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r.meta.StartKey)
    r, err = s.delayCache.findRegionByKey(s.bo, []byte("a"), false)
    s.NoError(err)
    s.Equal([]byte("b"), r.meta.EndKey)
}
func (s *testRegionCacheWithDelaySuite) TestFollowerGetStaleRegion() {
    var delay uatomic.Bool
    pdCli3 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
    followerDelayCache := NewRegionCache(pdCli3)

    delay.Store(true)
    var wg sync.WaitGroup
    wg.Add(1)
    var final *Region
    go func() {
        var err error
        // followerDelayCache is empty now, so the request will go to a follower.
        final, err = followerDelayCache.findRegionByKey(s.bo, []byte("z"), false)
        s.NoError(err)
        wg.Done()
    }()
    time.Sleep(30 * time.Millisecond)
    delay.Store(false)
    r, err := followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
    s.NoError(err)
    newPeersIDs := s.cluster.AllocIDs(1)
    s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("z"), newPeersIDs, newPeersIDs[0])
    r.invalidate(Other)
    r, err = followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
    s.NoError(err)
    s.Equal([]byte("z"), r.meta.EndKey)

    // no need to retry because
    wg.Wait()
    s.Equal([]byte("z"), final.meta.StartKey)

    followerDelayCache.Close()
}
func generateKeyForSimulator(id int, keyLen int) []byte {
    k := make([]byte, keyLen)
    copy(k, fmt.Sprintf("%010d", id))
    return k
}

func BenchmarkInsertRegionToCache(b *testing.B) {
    b.StopTimer()
    cache := newTestRegionCache()
    r := &Region{
        meta: &metapb.Region{
            Id:          1,
            RegionEpoch: &metapb.RegionEpoch{},
        },
    }
    rs := &regionStore{
        workTiKVIdx:              0,
        proxyTiKVIdx:             -1,
        stores:                   make([]*Store, 0, len(r.meta.Peers)),
        pendingTiFlashPeerStores: map[uint64]uint64{},
        storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
    }
    r.setStore(rs)
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        newMeta := proto.Clone(r.meta).(*metapb.Region)
        newMeta.Id = uint64(i + 1)
        newMeta.RegionEpoch.ConfVer = uint64(i+1) - uint64(rand.Intn(i+1))
        newMeta.RegionEpoch.Version = uint64(i+1) - uint64(rand.Intn(i+1))
        if i%2 == 0 {
            newMeta.StartKey = generateKeyForSimulator(rand.Intn(i+1), 56)
            newMeta.EndKey = []byte("")
        } else {
            newMeta.EndKey = generateKeyForSimulator(rand.Intn(i+1), 56)
            newMeta.StartKey = []byte("")
        }
        region := &Region{
            meta: newMeta,
        }
        region.setStore(r.getStore())
        cache.insertRegionToCache(region, true, true)
    }
}
func BenchmarkInsertRegionToCache2(b *testing.B) {
    b.StopTimer()
    cache := newTestRegionCache()
    r := &Region{
        meta: &metapb.Region{
            Id:          1,
            RegionEpoch: &metapb.RegionEpoch{},
        },
    }
    rs := &regionStore{
        workTiKVIdx:              0,
        proxyTiKVIdx:             -1,
        stores:                   make([]*Store, 0, len(r.meta.Peers)),
        pendingTiFlashPeerStores: map[uint64]uint64{},
        storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
    }
    r.setStore(rs)
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        newMeta := proto.Clone(r.meta).(*metapb.Region)
        newMeta.RegionEpoch.ConfVer = uint64(i + 1)
        newMeta.RegionEpoch.Version = uint64(i + 1)
        region := &Region{
            meta: newMeta,
        }
        region.setStore(r.getStore())
        cache.insertRegionToCache(region, true, true)
    }
}
@@ -733,16 +733,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
        leaderUnreachable := leader.store.getLivenessState() != reachable
        leaderExhausted := state.IsLeaderExhausted(leader)
        leaderInvalid := leaderEpochStale || leaderUnreachable || leaderExhausted
        if len(state.option.labels) > 0 {
            logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
        if len(state.option.labels) > 0 && !state.option.leaderOnly {
            logutil.Logger(bo.GetCtx()).Warn("unable to find a store with given labels",
                zap.Uint64("region", selector.region.GetID()),
                zap.Bool("leader-epoch-stale", leaderEpochStale),
                zap.Bool("leader-unreachable", leaderUnreachable),
                zap.Bool("leader-exhausted", leaderExhausted),
                zap.Bool("stale-read", state.isStaleRead),
                zap.Any("labels", state.option.labels))
        }
        if leaderInvalid || leader.deadlineErrUsingConfTimeout {
            logutil.Logger(bo.GetCtx()).Warn("unable to find valid leader",
                zap.Uint64("region", selector.region.GetID()),
                zap.Bool("epoch-stale", leaderEpochStale),
                zap.Bool("unreachable", leaderUnreachable),
                zap.Bool("exhausted", leaderExhausted),
                zap.Bool("kv-timeout", leader.deadlineErrUsingConfTimeout),
                zap.Bool("stale-read", state.isStaleRead))
            // In stale read, the request falls back to the leader after the local follower fails.
            // If the leader is also unavailable, we can fall back to the followers and use the replica-read flag again;
            // the remote followers have not been tried yet, and the local follower can retry without the stale-read flag.
@@ -1474,9 +1477,8 @@ func (s *RegionRequestSender) SendReqCtx(
    }

    // recheck whether the session/query is killed during the Next()
    boVars := bo.GetVars()
    if boVars != nil && boVars.Killed != nil && atomic.LoadUint32(boVars.Killed) == 1 {
        return nil, nil, retryTimes, errors.WithStack(tikverr.ErrQueryInterrupted)
    if err2 := bo.CheckKilled(); err2 != nil {
        return nil, nil, retryTimes, err2
    }
    if val, err := util.EvalFailpoint("mockRetrySendReqToRegion"); err == nil {
        if val.(bool) {
@@ -332,7 +332,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
    cache := NewRegionCache(s.cache.pdClient)
    defer cache.Close()
    cache.mu.Lock()
    cache.insertRegionToCache(region, true)
    cache.insertRegionToCache(region, true, true)
    cache.mu.Unlock()

    // Test accessFollower state with kv.ReplicaReadLearner request type.

@@ -383,7 +383,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
    cache := NewRegionCache(s.cache.pdClient)
    defer cache.Close()
    cache.mu.Lock()
    cache.insertRegionToCache(region, true)
    cache.insertRegionToCache(region, true, true)
    cache.mu.Unlock()

    // Verify creating the replicaSelector.
@@ -616,7 +616,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
    v2 := region.Region.confVer + 1
    r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
    st := &Store{storeID: s.store}
    s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
    s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
    region, err = s.cache.LocateRegionByID(s.bo, s.region)
    s.Nil(err)
    s.NotNil(region)

@@ -626,7 +626,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
    v3 := region.Region.confVer + 1
    r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
    st = &Store{storeID: s.store}
    s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
    s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
    region, err = s.cache.LocateRegionByID(s.bo, s.region)
    s.Nil(err)
    s.NotNil(region)
@@ -38,6 +38,8 @@ import (
    "bytes"

    "github.com/google/btree"
    "github.com/tikv/client-go/v2/internal/logutil"
    "go.uber.org/zap"
)

// SortedRegions is a sorted btree.
@@ -93,23 +95,30 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)

// removeIntersecting removes all items that intersect with the key range of the given region.
// If the region itself is in the cache, it's not removed.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
func (s *SortedRegions) removeIntersecting(r *Region, verID RegionVerID) ([]*btreeItem, bool) {
    var deleted []*btreeItem
    var stale bool
    s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
        // Skip the item that is equal to the given region.
        if item.cachedRegion.VerID() == r.VerID() {
            return true
        }
        if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
            return false
        }
        if item.cachedRegion.meta.GetRegionEpoch().GetVersion() > verID.ver {
            logutil.BgLogger().Debug("get stale region",
                zap.Uint64("region", verID.GetID()), zap.Uint64("ver", verID.GetVer()), zap.Uint64("conf", verID.GetConfVer()),
                zap.Uint64("intersecting-ver", item.cachedRegion.meta.GetRegionEpoch().GetVersion()))
            stale = true
            return false
        }
        deleted = append(deleted, item)
        return true
    })
    if stale {
        return nil, true
    }
    for _, item := range deleted {
        s.b.Delete(item)
    }
    return deleted
    return deleted, false
}
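
The removal above relies on a simple key-range overlap rule: walking the btree upward from the new region's start key, every item encountered before one that starts at or beyond the new region's end key overlaps the new region. A standalone sketch of that overlap rule for half-open, possibly right-unbounded ranges (the helper below is illustrative, not part of the package):

package main

import (
    "bytes"
    "fmt"
)

// intersects reports whether the half-open key range [aStart, aEnd) overlaps
// [bStart, bEnd); an empty end key means "unbounded to the right", mirroring
// how region end keys behave in the cache.
func intersects(aStart, aEnd, bStart, bEnd []byte) bool {
    aEndsBeforeB := len(aEnd) > 0 && bytes.Compare(aEnd, bStart) <= 0
    bEndsBeforeA := len(bEnd) > 0 && bytes.Compare(bEnd, aStart) <= 0
    return !aEndsBeforeB && !bEndsBeforeA
}

func main() {
    // A new region covering [b, d): a cached region [d, f) does not intersect it,
    // while a cached region [c, e) does.
    fmt.Println(intersects([]byte("b"), []byte("d"), []byte("d"), []byte("f"))) // false
    fmt.Println(intersects([]byte("b"), []byte("d"), []byte("c"), []byte("e"))) // true
}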
// Clear removes all items from the btree.
@@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP
    c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}

// PutRegion adds or replaces a region.
func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
    c.Lock()
    defer c.Unlock()

    c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver)
}

// AddPeer adds a new Peer for the Region on the Store.
func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
    c.Lock()
@@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer {
    }
}

func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region {
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region {
    if len(storeIDs) != len(peerIDs) {
        panic("len(storeIDs) != len(peerIds)")
    }

@@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64)
        Peers:       peers,
        RegionEpoch: &metapb.RegionEpoch{},
    }
    if len(epoch) == 2 {
        meta.RegionEpoch.ConfVer = epoch[0]
        meta.RegionEpoch.Version = epoch[1]
    }
    return &Region{
        Meta:   meta,
        leader: leaderPeerID,
@@ -61,6 +61,16 @@ var tsMu = struct {

const defaultResourceGroupName = "default"

var _ pd.Client = (*pdClient)(nil)

type MockPDOption func(*pdClient)

func WithDelay(delay *atomic.Bool) MockPDOption {
    return func(pc *pdClient) {
        pc.delay = delay
    }
}
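
WithDelay follows the functional-options pattern: each option is a function that mutates the client while it is being constructed. A pared-down, standalone sketch of that pattern (the types below are stand-ins, not the mocktikv ones; requires Go 1.19+ for sync/atomic.Bool):

package main

import (
    "fmt"
    "sync/atomic"
)

// fakePD stands in for the mock PD client; delay mirrors its injected delay flag.
type fakePD struct{ delay *atomic.Bool }

type option func(*fakePD)

// withDelay wires a shared flag into the client, just like WithDelay above.
func withDelay(d *atomic.Bool) option { return func(p *fakePD) { p.delay = d } }

// newFakePD applies each option to the client as it is built.
func newFakePD(ops ...option) *fakePD {
    p := &fakePD{}
    for _, op := range ops {
        op(p)
    }
    return p
}

func main() {
    var delay atomic.Bool
    p := newFakePD(withDelay(&delay))
    delay.Store(true) // the test flips this to make the fake PD respond slowly
    fmt.Println("delay enabled:", p.delay.Load())
}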
type pdClient struct {
    cluster *Cluster
    // SafePoint set by `UpdateGCSafePoint`. Not to be confused with SafePointKV.
@@ -73,11 +83,13 @@ type pdClient struct {
    externalTimestamp atomic.Uint64

    groups map[string]*rmpb.ResourceGroup

    delay *atomic.Bool
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
// from a Cluster.
func NewPDClient(cluster *Cluster) pd.Client {
func NewPDClient(cluster *Cluster, ops ...MockPDOption) *pdClient {
    mockCli := &pdClient{
        cluster:           cluster,
        serviceSafePoints: make(map[string]uint64),

@@ -97,6 +109,9 @@ func NewPDClient(cluster *Cluster) pd.Client {
        },
        Priority: 8,
    }
    for _, op := range ops {
        op(mockCli)
    }
    return mockCli
}
@@ -206,6 +221,12 @@ func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegi
    if len(opts) == 0 {
        buckets = nil
    }
    if c.delay != nil && c.delay.Load() {
        select {
        case <-ctx.Done():
        case <-time.After(200 * time.Millisecond):
        }
    }
    return &pd.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
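
The injected slowdown above is the usual "sleep unless the context is cancelled" select. A standalone sketch of the same pattern (the duration and names are illustrative):

package main

import (
    "context"
    "fmt"
    "time"
)

// delayUnlessCancelled blocks for d, but returns early if ctx is cancelled,
// the same select the mock PD client uses to simulate a slow GetRegion reply.
func delayUnlessCancelled(ctx context.Context, d time.Duration) {
    select {
    case <-ctx.Done():
    case <-time.After(d):
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    start := time.Now()
    delayUnlessCancelled(ctx, 200*time.Millisecond) // returns after ~50ms, when ctx expires
    fmt.Println("waited about", time.Since(start).Round(10*time.Millisecond))
}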
@@ -340,11 +361,11 @@ func (c *pdClient) UpdateKeyspaceState(ctx context.Context, id uint32, state key
    return nil, nil
}

func (c *pdClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
func (c *pdClient) ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) {
    return nil, nil
}

func (c *pdClient) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
func (c *pdClient) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
    group, ok := c.groups[resourceGroupName]
    if !ok {
        return nil, fmt.Errorf("the group %s does not exist", resourceGroupName)
@@ -44,6 +44,10 @@ type Variables struct {

    // Pointer to SessionVars.Killed
    // Killed is a flag that indicates whether this query has been killed.
    // It is an enum value rather than a boolean. See sqlkiller.go
    // in TiDB for its definition.
    // When its value is 0, the query is not killed.
    // When its value is not 0, the query is killed, and the value indicates the concrete reason.
    Killed *uint32
}
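
Only the session layer writes this flag; client-go just loads it and aborts when it sees a non-zero reason. A standalone sketch of that handshake (the reason code 1 below is a placeholder, since the real codes are defined in TiDB):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    // The upper layer owns the flag and stores a non-zero kill reason into it;
    // client-go only ever loads it atomically.
    var killed uint32
    vars := struct{ Killed *uint32 }{Killed: &killed}

    atomic.StoreUint32(vars.Killed, 1) // upper layer marks the query as killed

    if v := atomic.LoadUint32(vars.Killed); v != 0 {
        fmt.Println("query interrupted, signal =", v)
    }
}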
@@ -191,12 +191,15 @@ func WithPool(gp Pool) Option {
}

// WithPDHTTPClient sets the PD HTTP client with the given PD addresses and options.
// The source marks where the HTTP client is created; it is used for metrics and logs.
func WithPDHTTPClient(
    source string,
    pdAddrs []string,
    opts ...pdhttp.ClientOption,
) Option {
    return func(o *KVStore) {
        o.pdHttpClient = pdhttp.NewClient(
            source,
            pdAddrs,
            opts...,
        )
@@ -982,6 +982,16 @@ type getRegionError interface {
    GetRegionError() *errorpb.Error
}

func isResponseOKToNotImplGetRegionError(resp interface{}) bool {
    switch resp.(type) {
    case *MPPStreamResponse, *mpp.CancelTaskResponse, *mpp.IsAliveResponse, *mpp.ReportTaskStatusResponse,
        *mpp.DispatchTaskResponse, *BatchCopStreamResponse, *tikvpb.BatchCommandsEmptyResponse:
        return true
    default:
        return false
    }
}

// GetRegionError returns the RegionError of the underlying concrete response.
func (resp *Response) GetRegionError() (*errorpb.Error, error) {
    if resp.Resp == nil {

@@ -989,7 +999,7 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
    }
    err, ok := resp.Resp.(getRegionError)
    if !ok {
        if _, isEmpty := resp.Resp.(*tikvpb.BatchCommandsEmptyResponse); isEmpty {
        if isResponseOKToNotImplGetRegionError(resp.Resp) {
            return nil, nil
        }
        return nil, errors.Errorf("invalid response type %v", resp)
@@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
}

// doActionOnBatches performs the action on the batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
func (c *twoPhaseCommitter) doActionOnBatches(
    bo *retry.Backoffer, action twoPhaseCommitAction,
    batches []batchMutations,
) error {
    // killSignal should never be nil for TiDB
    if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
        // Do not reset the killed flag here. Let the upper layer reset it.
        // Until it is reset, any request is considered valid to be killed.
        status := atomic.LoadUint32(c.txn.vars.Killed)
        if status != 0 {
            logutil.BgLogger().Info(
                "query is killed", zap.Uint32(
                    "signal",
                    status,
                ),
            )
            // TODO: There might be various signals besides a query interruption,
            // but we are unable to differentiate them, because the definition is in TiDB.
            return errors.WithStack(tikverr.ErrQueryInterrupted)
        }
    }
    if len(batches) == 0 {
        return nil
    }
@@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch(
                return nil
            }
        }

        // Handle the killed flag when waiting for the pessimistic lock.
        // When a txn runs into LockKeys() and backoff here, it has no chance to call
        // executor.Next() and check the killed flag.
        if action.Killed != nil {
            // Do not reset the killed flag here!
            // actionPessimisticLock runs on each region parallelly, we have to consider that
            // the error may be dropped.
            if atomic.LoadUint32(action.Killed) == 1 {
                return errors.WithStack(tikverr.ErrQueryInterrupted)
            }
        }
    }
}
@@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
        lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
        lockCtx.Stats.Mu.Unlock()
    }
    if lockCtx.Killed != nil {
        // If the kill signal is received during waiting for pessimisticLock,
        // pessimisticLockKeys would handle the error but it doesn't reset the flag.
        // We need to reset the killed flag here.
        atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
    }
    if txn.IsInAggressiveLockingMode() {
        if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
            txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
@@ -703,6 +703,16 @@ func NewRUDetails() *RUDetails {
    }
}

// NewRUDetailsWith creates a new RUDetails with the given values.
// It is used in TiDB's unit tests.
func NewRUDetailsWith(rru, wru float64, waitDur time.Duration) *RUDetails {
    return &RUDetails{
        readRU:         uatomic.NewFloat64(rru),
        writeRU:        uatomic.NewFloat64(wru),
        ruWaitDuration: uatomic.NewDuration(waitDur),
    }
}
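
Assuming this helper lands in the public util package of client-go, a TiDB-side unit test could seed RU details directly instead of replaying requests; a minimal sketch:

package main

import (
    "fmt"
    "time"

    "github.com/tikv/client-go/v2/util"
)

func main() {
    // Seed read/write RU and the RU wait duration with fixed values,
    // then take a copy the way execution-detail collectors do.
    rd := util.NewRUDetailsWith(1.5, 0.5, 2*time.Millisecond)
    cloned := rd.Clone()
    fmt.Printf("%+v\n", cloned)
}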
// Clone implements the RuntimeStats interface.
func (rd *RUDetails) Clone() *RUDetails {
    return &RUDetails{