mirror of https://github.com/tikv/client-go.git
expand min-resolved-ts to support stores
Signed-off-by: husharp <jinhao.hu@pingcap.com>
This commit is contained in:
parent
719e6456f7
commit
c5bf330532
|
|
@ -6,19 +6,19 @@ require (
|
|||
github.com/ninedraft/israce v0.0.3
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
|
||||
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
|
||||
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04
|
||||
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/tidwall/gjson v1.14.1
|
||||
github.com/tikv/client-go/v2 v2.0.8-0.20230714052714-85fc8f337565
|
||||
github.com/tikv/client-go/v2 v2.0.8-0.20230731032349-719e6456f7d5
|
||||
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc
|
||||
go.uber.org/goleak v1.2.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
|
||||
github.com/BurntSushi/toml v1.3.0 // indirect
|
||||
github.com/BurntSushi/toml v1.3.2 // indirect
|
||||
github.com/benbjohnson/clock v1.3.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
|
|
@ -65,14 +65,14 @@ require (
|
|||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
|
||||
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect
|
||||
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect
|
||||
github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04 // indirect
|
||||
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a // indirect
|
||||
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
|
||||
github.com/prometheus/client_golang v1.16.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.0 // indirect
|
||||
github.com/prometheus/common v0.44.0 // indirect
|
||||
github.com/prometheus/procfs v0.11.0 // indirect
|
||||
github.com/prometheus/procfs v0.11.1 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rogpeppe/go-internal v1.10.0 // indirect
|
||||
github.com/sasha-s/go-deadlock v0.2.0 // indirect
|
||||
|
|
@ -112,7 +112,5 @@ require (
|
|||
|
||||
replace (
|
||||
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
|
||||
github.com/pingcap/tidb => github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5
|
||||
github.com/pingcap/tidb/parser => github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5
|
||||
github.com/tikv/client-go/v2 => ../
|
||||
)
|
||||
|
|
|
|||
|
|
@ -14,16 +14,12 @@ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+
|
|||
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/toml v1.3.0 h1:Ws8e5YmnrGEHzZEzg0YvK/7COGYtTC5PbaH9oSSbgfA=
|
||||
github.com/BurntSushi/toml v1.3.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
|
||||
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw=
|
||||
github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
|
||||
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
|
||||
github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5 h1:wSOvDYbKkvHjlWWFBihIoeJ5yBc1jZe9Ehkku3Jn8cA=
|
||||
github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5/go.mod h1:C3tuWINS2/Vt/gxZ0OLdGI2x5crlN8E3/qNJJkIIkTI=
|
||||
github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5 h1:bxwmPI7ambmbOAaozdYz81HFpIeu6ctWo7TiXfOGE14=
|
||||
github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg=
|
||||
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
|
||||
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
|
||||
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
|
||||
|
|
@ -363,14 +359,18 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g=
|
|||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
|
||||
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
|
||||
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
|
||||
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a h1:5gSwd337GRaT1E0O3y10jb2ZDzv+h30pjCpRcCC5Phg=
|
||||
github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
|
||||
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
|
||||
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
|
||||
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
|
||||
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a h1:JBh4xHmZ6w33+4fFTWZi5J3yPhLnOWChlf0Za875z8Y=
|
||||
github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a/go.mod h1:KxfAxDSUAAgPyCpaAgQ7qTYaqDonaJA73eyGnUdMlcQ=
|
||||
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a h1:GEx1TEBP1O/mUumHhJodRa2Mk3w6Jtt+yNRq3yAxPjk=
|
||||
github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg=
|
||||
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 h1:D79RE4RVhq2ic8sqDSv7QdL0tT5aZV3CaCXUAT41iWc=
|
||||
github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
|
||||
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
|
||||
|
|
@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
|
|||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
|
||||
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
|
||||
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
|
||||
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
|
||||
|
|
@ -437,7 +437,7 @@ github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1K
|
|||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
|
||||
|
|
|
|||
|
|
@ -72,8 +72,11 @@ func (s *apiTestSuite) SetupTest() {
|
|||
// Set PD HTTP client.
|
||||
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
|
||||
s.store = store
|
||||
storeID := uint64(1)
|
||||
// Need to start cluster at least with 3 stores for test.
|
||||
for i := 1; i <= 3; i++ {
|
||||
storeID := uint64(i)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) storeAddr(id uint64) string {
|
||||
|
|
@ -120,7 +123,7 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
|
|||
}
|
||||
retryCount++
|
||||
}
|
||||
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
|
||||
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
|
||||
|
|
|
|||
26
tikv/kv.go
26
tikv/kv.go
|
|
@ -584,6 +584,23 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
tikvClient := s.GetTiKVClient()
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(stores))
|
||||
// Try to get the minimum resolved timestamp of the store from PD.
|
||||
var (
|
||||
err error
|
||||
)
|
||||
var storeIDs []string
|
||||
for _, store := range stores {
|
||||
storeIDs = append(storeIDs, strconv.FormatUint(store.StoreID(), 10))
|
||||
}
|
||||
StoreMinResolvedTSs := make(map[uint64]uint64)
|
||||
if s.pdHttpClient != nil {
|
||||
StoreMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
|
||||
if err != nil {
|
||||
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
|
||||
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
|
||||
}
|
||||
}
|
||||
|
||||
for _, store := range stores {
|
||||
storeID := store.StoreID()
|
||||
storeAddr := store.GetAddr()
|
||||
|
|
@ -595,16 +612,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
|
||||
var (
|
||||
safeTS uint64
|
||||
err error
|
||||
)
|
||||
storeIDStr := strconv.Itoa(int(storeID))
|
||||
// Try to get the minimum resolved timestamp of the store from PD.
|
||||
if s.pdHttpClient != nil {
|
||||
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
|
||||
}
|
||||
safeTS = StoreMinResolvedTSs[storeID]
|
||||
}
|
||||
storeIDStr := strconv.FormatUint(storeID, 10)
|
||||
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
|
||||
if safeTS == 0 || err != nil {
|
||||
resp, err := tikvClient.SendRequest(
|
||||
|
|
|
|||
36
util/pd.go
36
util/pd.go
|
|
@ -35,6 +35,7 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
|
|
@ -56,7 +57,7 @@ const (
|
|||
// pd request retry time when connection fail.
|
||||
pdRequestRetryTime = 10
|
||||
|
||||
storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
|
||||
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
|
||||
)
|
||||
|
||||
// PDHTTPClient is an HTTP client of pd.
|
||||
|
|
@ -86,45 +87,52 @@ func NewPDHTTPClient(
|
|||
}
|
||||
}
|
||||
|
||||
// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
|
||||
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
|
||||
// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids.
|
||||
func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (map[uint64]uint64, error) {
|
||||
var err error
|
||||
for _, addr := range p.addrs {
|
||||
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
|
||||
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
|
||||
data, err := json.Marshal(storeIDs)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Debug("failed to marshal store ids", zap.String("addr", addr), zap.Error(err))
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, bytes.NewBuffer(data))
|
||||
if e != nil {
|
||||
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
|
||||
err = e
|
||||
continue
|
||||
}
|
||||
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
|
||||
logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v)))
|
||||
d := struct {
|
||||
IsRealTime bool `json:"is_real_time,omitempty"`
|
||||
MinResolvedTS uint64 `json:"min_resolved_ts"`
|
||||
StoreMinResolvedTS map[uint64]uint64 `json:"store_min_resolved_ts"`
|
||||
}{}
|
||||
err = json.Unmarshal(v, &d)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if !d.IsRealTime {
|
||||
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
|
||||
message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr)
|
||||
logutil.BgLogger().Debug(message.Error())
|
||||
return 0, errors.Trace(message)
|
||||
return nil, errors.Trace(message)
|
||||
}
|
||||
if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil {
|
||||
// Need to make sure successfully get from real pd.
|
||||
if d.MinResolvedTS != 0 {
|
||||
for storeID, v := range d.StoreMinResolvedTS {
|
||||
if v != 0 {
|
||||
// Should be val.(uint64) but failpoint doesn't support that.
|
||||
if tmp, ok := val.(int); ok {
|
||||
d.MinResolvedTS = uint64(tmp)
|
||||
d.StoreMinResolvedTS[storeID] = uint64(tmp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return d.MinResolvedTS, nil
|
||||
}
|
||||
|
||||
return 0, errors.Trace(err)
|
||||
return d.StoreMinResolvedTS, nil
|
||||
}
|
||||
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// pdRequest is a func to send an HTTP to pd and return the result bytes.
|
||||
|
|
|
|||
Loading…
Reference in New Issue