diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 97636a2f..b89e1af2 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,19 +6,19 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 - github.com/pingcap/tidb v1.1.0-beta.0.20230619015310-8b1006f1af04 + github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a + github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 github.com/tidwall/gjson v1.14.1 - github.com/tikv/client-go/v2 v2.0.8-0.20230714052714-85fc8f337565 + github.com/tikv/client-go/v2 v2.0.8-0.20230731032349-719e6456f7d5 github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc go.uber.org/goleak v1.2.1 ) require ( github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/BurntSushi/toml v1.3.0 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -65,14 +65,14 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb/parser v0.0.0-20230619015310-8b1006f1af04 // indirect + github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a // indirect github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.11.0 // indirect + github.com/prometheus/procfs v0.11.1 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/sasha-s/go-deadlock v0.2.0 // indirect @@ -112,7 +112,5 @@ require ( replace ( github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117 - github.com/pingcap/tidb => github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5 - github.com/pingcap/tidb/parser => github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5 github.com/tikv/client-go/v2 => ../ ) diff --git a/integration_tests/go.sum b/integration_tests/go.sum index ac10fa09..96255c9e 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -14,16 +14,12 @@ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+ github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v1.3.0 h1:Ws8e5YmnrGEHzZEzg0YvK/7COGYtTC5PbaH9oSSbgfA= -github.com/BurntSushi/toml v1.3.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw= github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= -github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5 h1:wSOvDYbKkvHjlWWFBihIoeJ5yBc1jZe9Ehkku3Jn8cA= -github.com/HuSharp/tidb v1.1.0-beta.0.20230726045237-a2b0085ad7c5/go.mod h1:C3tuWINS2/Vt/gxZ0OLdGI2x5crlN8E3/qNJJkIIkTI= -github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5 h1:bxwmPI7ambmbOAaozdYz81HFpIeu6ctWo7TiXfOGE14= -github.com/HuSharp/tidb/parser v0.0.0-20230726045237-a2b0085ad7c5/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= @@ -363,14 +359,18 @@ github.com/pingcap/fn v1.0.0 h1:CyA6AxcOZkQh52wIqYlAmaVmF6EvrcqFywP463pjA8g= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM= -github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a h1:5gSwd337GRaT1E0O3y10jb2ZDzv+h30pjCpRcCC5Phg= +github.com/pingcap/kvproto v0.0.0-20230728080053-8a9db88bc88a/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= +github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a h1:JBh4xHmZ6w33+4fFTWZi5J3yPhLnOWChlf0Za875z8Y= +github.com/pingcap/tidb v1.1.0-beta.0.20230801093416-2d23240b376a/go.mod h1:KxfAxDSUAAgPyCpaAgQ7qTYaqDonaJA73eyGnUdMlcQ= +github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a h1:GEx1TEBP1O/mUumHhJodRa2Mk3w6Jtt+yNRq3yAxPjk= +github.com/pingcap/tidb/parser v0.0.0-20230801093416-2d23240b376a/go.mod h1:ENXEsaVS6N3CTMpL4txc6m93y6XaztF9W4SFLjhPWJg= github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6 h1:D79RE4RVhq2ic8sqDSv7QdL0tT5aZV3CaCXUAT41iWc= github.com/pingcap/tipb v0.0.0-20230607071926-bda24015c2d6/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= @@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk= -github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= +github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= @@ -437,7 +437,7 @@ github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1K github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index aa5d3855..31dd942c 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -72,8 +72,11 @@ func (s *apiTestSuite) SetupTest() { // Set PD HTTP client. store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) s.store = store - storeID := uint64(1) - s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil) + // Need to start cluster at least with 3 stores for test. + for i := 1; i <= 3; i++ { + storeID := uint64(i) + s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil) + } } func (s *apiTestSuite) storeAddr(id uint64) string { @@ -120,7 +123,7 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() { } retryCount++ } - require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) + require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS")) diff --git a/tikv/kv.go b/tikv/kv.go index 3e975361..752e70bb 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -584,6 +584,23 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { tikvClient := s.GetTiKVClient() wg := &sync.WaitGroup{} wg.Add(len(stores)) + // Try to get the minimum resolved timestamp of the store from PD. + var ( + err error + ) + var storeIDs []string + for _, store := range stores { + storeIDs = append(storeIDs, strconv.FormatUint(store.StoreID(), 10)) + } + StoreMinResolvedTSs := make(map[uint64]uint64) + if s.pdHttpClient != nil { + StoreMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs) + if err != nil { + // If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV. + logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs)) + } + } + for _, store := range stores { storeID := store.StoreID() storeAddr := store.GetAddr() @@ -595,16 +612,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { var ( safeTS uint64 - err error ) - storeIDStr := strconv.Itoa(int(storeID)) - // Try to get the minimum resolved timestamp of the store from PD. if s.pdHttpClient != nil { - safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID) - if err != nil { - logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID)) - } + safeTS = StoreMinResolvedTSs[storeID] } + storeIDStr := strconv.FormatUint(storeID, 10) // If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV. if safeTS == 0 || err != nil { resp, err := tikvClient.SendRequest( diff --git a/util/pd.go b/util/pd.go index b4c405c1..9ac0ea28 100644 --- a/util/pd.go +++ b/util/pd.go @@ -35,6 +35,7 @@ package util import ( + "bytes" "context" "crypto/tls" "encoding/json" @@ -56,7 +57,7 @@ const ( // pd request retry time when connection fail. pdRequestRetryTime = 10 - storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts" + minResolvedTSPrefix = "pd/api/v1/min-resolved-ts" ) // PDHTTPClient is an HTTP client of pd. @@ -86,45 +87,52 @@ func NewPDHTTPClient( } } -// GetStoreMinResolvedTS get store-level min-resolved-ts from pd. -func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) { +// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids. +func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (map[uint64]uint64, error) { var err error for _, addr := range p.addrs { - query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID) - v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil) + data, err := json.Marshal(storeIDs) + if err != nil { + logutil.BgLogger().Debug("failed to marshal store ids", zap.String("addr", addr), zap.Error(err)) + return nil, errors.Trace(err) + } + v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, bytes.NewBuffer(data)) if e != nil { logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e)) err = e continue } - logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v))) + logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v))) d := struct { - IsRealTime bool `json:"is_real_time,omitempty"` - MinResolvedTS uint64 `json:"min_resolved_ts"` + IsRealTime bool `json:"is_real_time,omitempty"` + StoreMinResolvedTS map[uint64]uint64 `json:"store_min_resolved_ts"` }{} err = json.Unmarshal(v, &d) if err != nil { - return 0, errors.Trace(err) + return nil, errors.Trace(err) } if !d.IsRealTime { - message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr) + message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr) logutil.BgLogger().Debug(message.Error()) - return 0, errors.Trace(message) + return nil, errors.Trace(message) } if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil { // Need to make sure successfully get from real pd. - if d.MinResolvedTS != 0 { - // Should be val.(uint64) but failpoint doesn't support that. - if tmp, ok := val.(int); ok { - d.MinResolvedTS = uint64(tmp) + for storeID, v := range d.StoreMinResolvedTS { + if v != 0 { + // Should be val.(uint64) but failpoint doesn't support that. + if tmp, ok := val.(int); ok { + d.StoreMinResolvedTS[storeID] = uint64(tmp) + } } } + } - return d.MinResolvedTS, nil + return d.StoreMinResolvedTS, nil } - return 0, errors.Trace(err) + return nil, errors.Trace(err) } // pdRequest is a func to send an HTTP to pd and return the result bytes.