mirror of https://github.com/tikv/client-go.git
Support to integrate with the PD HTTP client (#1049)
Signed-off-by: JmPotato <ghzpotato@gmail.com> Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
parent
ea379b071f
commit
845e3b01a2
2
go.mod
2
go.mod
|
|
@ -2,6 +2,8 @@ module github.com/tikv/client-go/v2
|
|||
|
||||
go 1.21
|
||||
|
||||
replace github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864
|
||||
|
||||
require (
|
||||
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
|
||||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
|
||||
|
|
|
|||
5
go.sum
5
go.sum
|
|
@ -1,5 +1,8 @@
|
|||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
|
||||
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
|
|
@ -111,8 +114,6 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
|
|||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
|
||||
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
|
||||
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc h1:IUg0j2nWoGYj3FQ3vA3vg97fPSpJEZQrDpgF8RkMLEU=
|
||||
github.com/tikv/pd/client v0.0.0-20230724080549-de985b8e0afc/go.mod h1:wfHRc4iYaqJiOQZCHcrF+r4hYnkGDaYWDfcicee//pc=
|
||||
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
|
||||
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ module integration_tests
|
|||
|
||||
go 1.21
|
||||
|
||||
replace github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb => github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864
|
||||
|
||||
require (
|
||||
github.com/ninedraft/israce v0.0.3
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
|||
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
|
||||
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864 h1:q7k2boDgGSdnX/gXaIhk59V33J6GcxhpI53eoyVvMM0=
|
||||
github.com/JmPotato/pd/client v0.0.0-20231107024609-216b5b763864/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
|
||||
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
|
||||
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
|
||||
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
|
||||
|
|
@ -507,8 +509,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
|||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb h1:hAcH9tFjQzQ3+ofrAHm4ajOTLliYCOfXpj3+boKOtac=
|
||||
github.com/tikv/pd/client v0.0.0-20230912103610-2f57a9f050eb/go.mod h1:E+6qtPu8fJm5kNjvKWPVFqSgNAFPk07y2EjD03GWzuI=
|
||||
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
|
||||
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
|
||||
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
|
||||
|
|
|
|||
|
|
@ -48,14 +48,15 @@ type apiTestSuite struct {
|
|||
}
|
||||
|
||||
func (s *apiTestSuite) SetupTest() {
|
||||
require := s.Require()
|
||||
addrs := strings.Split(*pdAddrs, ",")
|
||||
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
|
||||
s.Require().NoError(err)
|
||||
require.NoError(err)
|
||||
rpcClient := tikv.NewRPCClient()
|
||||
s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
|
||||
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
|
||||
// Set PD HTTP client.
|
||||
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
|
||||
s.store = store
|
||||
s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil))
|
||||
require.NoError(err)
|
||||
storeID := uint64(1)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil)
|
||||
}
|
||||
|
|
@ -122,18 +123,17 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
|
|||
storeID := uint64(1)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
|
||||
// Try to get the minimum resolved timestamp of the stores from PD.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(dcLabel) != 100 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
|
||||
s.waitForMinSafeTS(dcLabel, 100)
|
||||
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) waitForMinSafeTS(txnScope string, ts uint64) {
|
||||
s.Eventually(func() bool {
|
||||
return s.store.GetMinSafeTS(txnScope) == ts
|
||||
}, time.Second, 200*time.Millisecond)
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
||||
|
|
@ -142,22 +142,15 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
|||
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
|
||||
s.store.SetTiKVClient(&mockClient)
|
||||
// Try to get the minimum resolved timestamp of the cluster from PD.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
|
||||
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
|
||||
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
|
||||
// Set DC label for store 1.
|
||||
// Mock PD server not support get min resolved ts by stores.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
|
||||
dcLabel := "testDC"
|
||||
restore := config.UpdateGlobal(func(conf *config.Config) {
|
||||
conf.TxnScope = dcLabel
|
||||
|
|
@ -173,18 +166,10 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
|||
storeID := uint64(1)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
|
||||
// Try to get the minimum resolved timestamp of the store from TiKV.
|
||||
retryCount = 0
|
||||
for s.store.GetMinSafeTS(dcLabel) != 150 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
|
||||
s.waitForMinSafeTS(dcLabel, 150)
|
||||
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
|
||||
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
|
||||
|
|
@ -196,47 +181,26 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
|
|||
// Make sure the store's min resolved ts is not initialized.
|
||||
mockClient.SetKVSafeTS(0)
|
||||
// Try to get the minimum resolved timestamp of the cluster from TiKV.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
|
||||
// Make sure the store's min resolved ts is not initialized.
|
||||
s.waitForMinSafeTS(oracle.GlobalTxnScope, math.MaxUint64)
|
||||
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
|
||||
// Try to get the minimum resolved timestamp of the cluster from PD.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
|
||||
retryCount = 0
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`))
|
||||
// Make sure the store's min resolved ts is not regarded as MaxUint64.
|
||||
s.waitForMinSafeTS(oracle.GlobalTxnScope, 100)
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
|
||||
// Fallback to KV Request when PD server not support get min resolved ts.
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
|
||||
require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`))
|
||||
mockClient.SetKVSafeTS(150)
|
||||
retryCount = 0
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
// Make sure the minSafeTS can advance.
|
||||
s.waitForMinSafeTS(oracle.GlobalTxnScope, 150)
|
||||
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS"))
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) TearDownTest() {
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ import (
|
|||
"github.com/tikv/client-go/v2/oracle"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/txnkv/txnlock"
|
||||
pdhttp "github.com/tikv/pd/client/http"
|
||||
)
|
||||
|
||||
// Storage represent the kv.Storage runs on TiKV.
|
||||
|
|
@ -69,6 +70,9 @@ type Storage interface {
|
|||
// GetTiKVClient gets the TiKV client.
|
||||
GetTiKVClient() Client
|
||||
|
||||
// GetPDHTTPClient gets the PD HTTP client.
|
||||
GetPDHTTPClient() pdhttp.Client
|
||||
|
||||
// Closed returns the closed channel.
|
||||
Closed() <-chan struct{}
|
||||
|
||||
|
|
|
|||
61
tikv/kv.go
61
tikv/kv.go
|
|
@ -66,6 +66,7 @@ import (
|
|||
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
pd "github.com/tikv/pd/client"
|
||||
pdhttp "github.com/tikv/pd/client/http"
|
||||
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
atomicutil "go.uber.org/atomic"
|
||||
|
|
@ -115,7 +116,7 @@ type KVStore struct {
|
|||
client Client
|
||||
}
|
||||
pdClient pd.Client
|
||||
pdHttpClient *util.PDHTTPClient
|
||||
pdHttpClient pdhttp.Client
|
||||
regionCache *locate.RegionCache
|
||||
lockResolver *txnlock.LockResolver
|
||||
txnLatches *latch.LatchesScheduler
|
||||
|
|
@ -147,6 +148,8 @@ type KVStore struct {
|
|||
gP Pool
|
||||
}
|
||||
|
||||
var _ Storage = (*KVStore)(nil)
|
||||
|
||||
// Go run the function in a separate goroutine.
|
||||
func (s *KVStore) Go(f func()) error {
|
||||
return s.gP.Run(f)
|
||||
|
|
@ -195,9 +198,9 @@ func WithPool(gp Pool) Option {
|
|||
}
|
||||
|
||||
// WithPDHTTPClient set the PD HTTP client with the given address and TLS config.
|
||||
func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option {
|
||||
func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option {
|
||||
return func(o *KVStore) {
|
||||
o.pdHttpClient = util.NewPDHTTPClient(tlsConf, pdaddrs)
|
||||
o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -444,6 +447,11 @@ func (s *KVStore) GetPDClient() pd.Client {
|
|||
return s.pdClient
|
||||
}
|
||||
|
||||
// GetPDHTTPClient returns the PD HTTP client.
|
||||
func (s *KVStore) GetPDHTTPClient() pdhttp.Client {
|
||||
return s.pdHttpClient
|
||||
}
|
||||
|
||||
// SupportDeleteRange gets the storage support delete range or not.
|
||||
func (s *KVStore) SupportDeleteRange() (supported bool) {
|
||||
return !s.mock
|
||||
|
|
@ -598,28 +606,31 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
err error
|
||||
storeMinResolvedTSs map[uint64]uint64
|
||||
)
|
||||
storeIDs := make([]string, len(stores))
|
||||
storeIDs := make([]uint64, len(stores))
|
||||
if s.pdHttpClient != nil {
|
||||
for i, store := range stores {
|
||||
storeIDs[i] = strconv.FormatUint(store.StoreID(), 10)
|
||||
storeIDs[i] = store.StoreID()
|
||||
}
|
||||
_, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
|
||||
_, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs)
|
||||
if err != nil {
|
||||
// If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV.
|
||||
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs))
|
||||
}
|
||||
}
|
||||
|
||||
for i, store := range stores {
|
||||
for _, store := range stores {
|
||||
storeID := store.StoreID()
|
||||
storeAddr := store.GetAddr()
|
||||
if store.IsTiFlash() {
|
||||
storeAddr = store.GetPeerAddr()
|
||||
}
|
||||
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) {
|
||||
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
|
||||
defer wg.Done()
|
||||
|
||||
var safeTS uint64
|
||||
var (
|
||||
safeTS uint64
|
||||
storeIDStr = strconv.FormatUint(storeID, 10)
|
||||
)
|
||||
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
|
||||
if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil {
|
||||
resp, err := tikvClient.SendRequest(
|
||||
|
|
@ -655,7 +666,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
|
||||
safeTSTime := oracle.GetTimeFromTS(safeTS)
|
||||
metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds())
|
||||
}(ctx, wg, storeID, storeAddr, storeIDs[i])
|
||||
}(ctx, wg, storeID, storeAddr)
|
||||
}
|
||||
|
||||
txnScopeMap := make(map[string][]uint64)
|
||||
|
|
@ -672,6 +683,34 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
|
||||
var (
|
||||
minResolvedTS uint64
|
||||
storeMinResolvedTSs map[uint64]uint64
|
||||
err error
|
||||
)
|
||||
minResolvedTS, storeMinResolvedTSs, err = s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, storeIDs)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil {
|
||||
injectedTS, ok := val.(int)
|
||||
if !ok {
|
||||
return minResolvedTS, storeMinResolvedTSs, err
|
||||
}
|
||||
minResolvedTS = uint64(injectedTS)
|
||||
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS)))
|
||||
// Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here.
|
||||
for storeID, v := range storeMinResolvedTSs {
|
||||
if v != 0 && v != math.MaxUint64 {
|
||||
storeMinResolvedTSs[storeID] = uint64(injectedTS)
|
||||
logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS)))
|
||||
}
|
||||
}
|
||||
}
|
||||
return minResolvedTS, storeMinResolvedTSs, err
|
||||
}
|
||||
|
||||
var (
|
||||
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
|
||||
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
|
||||
|
|
@ -684,7 +723,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
|
|||
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
|
||||
// Try to get the minimum resolved timestamp of the cluster from PD.
|
||||
if s.pdHttpClient != nil && isGlobal {
|
||||
clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil)
|
||||
clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
|
||||
} else if clusterMinSafeTS != 0 {
|
||||
|
|
|
|||
232
util/pd.go
232
util/pd.go
|
|
@ -1,232 +0,0 @@
|
|||
// Copyright 2023 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// NOTE: The code in this file is based on code from the
|
||||
// TiDB project, licensed under the Apache License v 2.0
|
||||
//
|
||||
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/execdetails.go
|
||||
//
|
||||
|
||||
// Copyright 2023 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
// pd request retry time when connection fail.
|
||||
pdRequestRetryTime = 10
|
||||
|
||||
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
|
||||
)
|
||||
|
||||
// PDHTTPClient is an HTTP client of pd.
|
||||
type PDHTTPClient struct {
|
||||
addrs []string
|
||||
cli *http.Client
|
||||
}
|
||||
|
||||
func NewPDHTTPClient(
|
||||
tlsConf *tls.Config,
|
||||
pdAddrs []string,
|
||||
) *PDHTTPClient {
|
||||
for i, addr := range pdAddrs {
|
||||
if !strings.HasPrefix(addr, "http") {
|
||||
if tlsConf != nil {
|
||||
addr = "https://" + addr
|
||||
} else {
|
||||
addr = "http://" + addr
|
||||
}
|
||||
pdAddrs[i] = addr
|
||||
}
|
||||
}
|
||||
|
||||
return &PDHTTPClient{
|
||||
addrs: pdAddrs,
|
||||
cli: httpClient(tlsConf),
|
||||
}
|
||||
}
|
||||
|
||||
// GetMinResolvedTSByStoresIDs get min-resolved-ts from pd by stores ids.
|
||||
func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []string) (uint64, map[uint64]uint64, error) {
|
||||
var err error
|
||||
for _, addr := range p.addrs {
|
||||
// scope is an optional parameter, it can be `cluster` or specified store IDs.
|
||||
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
|
||||
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
|
||||
// - When scope given a list of stores, min_resolved_ts will be provided for each store
|
||||
// and the scope-specific min_resolved_ts will be returned.
|
||||
query := minResolvedTSPrefix
|
||||
if len(storeIDs) != 0 {
|
||||
query = fmt.Sprintf("%s?scope=%s", query, strings.Join(storeIDs, ","))
|
||||
}
|
||||
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
|
||||
if e != nil {
|
||||
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
|
||||
err = e
|
||||
continue
|
||||
}
|
||||
logutil.BgLogger().Debug("min resolved ts", zap.String("resp", string(v)))
|
||||
d := struct {
|
||||
MinResolvedTS uint64 `json:"min_resolved_ts"`
|
||||
IsRealTime bool `json:"is_real_time,omitempty"`
|
||||
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
|
||||
}{}
|
||||
err = json.Unmarshal(v, &d)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Trace(err)
|
||||
}
|
||||
if !d.IsRealTime {
|
||||
message := fmt.Errorf("min resolved ts not enabled, addr: %s", addr)
|
||||
logutil.BgLogger().Debug(message.Error())
|
||||
return 0, nil, errors.Trace(message)
|
||||
}
|
||||
if val, e := EvalFailpoint("InjectMinResolvedTS"); e == nil {
|
||||
// Need to make sure successfully get from real pd.
|
||||
if d.StoresMinResolvedTS != nil {
|
||||
for storeID, v := range d.StoresMinResolvedTS {
|
||||
if v != 0 {
|
||||
// Should be val.(uint64) but failpoint doesn't support that.
|
||||
if tmp, ok := val.(int); ok {
|
||||
d.StoresMinResolvedTS[storeID] = uint64(tmp)
|
||||
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp)))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if tmp, ok := val.(int); ok {
|
||||
// Should be val.(uint64) but failpoint doesn't support that.
|
||||
// ci's store id is 1, we can change it if we have more stores.
|
||||
// but for pool ci it's no need to do that :(
|
||||
d.MinResolvedTS = uint64(tmp)
|
||||
logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp)))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return d.MinResolvedTS, d.StoresMinResolvedTS, nil
|
||||
}
|
||||
|
||||
return 0, nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// pdRequest is a func to send an HTTP to pd and return the result bytes.
|
||||
func pdRequest(
|
||||
ctx context.Context,
|
||||
addr string, prefix string,
|
||||
cli *http.Client, method string, body io.Reader) ([]byte, error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
reqURL := fmt.Sprintf("%s/%s", u, prefix)
|
||||
req, err := http.NewRequestWithContext(ctx, method, reqURL, body)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
var resp *http.Response
|
||||
count := 0
|
||||
for {
|
||||
resp, err = cli.Do(req)
|
||||
count++
|
||||
|
||||
if _, e := EvalFailpoint("InjectClosed"); e == nil {
|
||||
resp = nil
|
||||
err = &url.Error{
|
||||
Op: "read",
|
||||
Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
|
||||
}
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if count > pdRequestRetryTime || (resp != nil && resp.StatusCode < 500) ||
|
||||
err != nil {
|
||||
break
|
||||
}
|
||||
if resp != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
time.Sleep(pdRequestRetryInterval())
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
res, _ := io.ReadAll(resp.Body)
|
||||
return nil, errors.Errorf("[%d] %s %s", resp.StatusCode, res, reqURL)
|
||||
}
|
||||
|
||||
r, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
|
||||
func pdRequestRetryInterval() time.Duration {
|
||||
if _, e := EvalFailpoint("FastRetry"); e == nil {
|
||||
return 0
|
||||
}
|
||||
return time.Second
|
||||
}
|
||||
|
||||
// httpClient returns an HTTP(s) client.
|
||||
func httpClient(tlsConf *tls.Config) *http.Client {
|
||||
// defaultTimeout for non-context requests.
|
||||
const defaultTimeout = 30 * time.Second
|
||||
cli := &http.Client{Timeout: defaultTimeout}
|
||||
if tlsConf != nil {
|
||||
transport := http.DefaultTransport.(*http.Transport).Clone()
|
||||
transport.TLSClientConfig = tlsConf
|
||||
cli.Transport = transport
|
||||
}
|
||||
return cli
|
||||
}
|
||||
|
||||
func (p *PDHTTPClient) Close() {
|
||||
p.cli.CloseIdleConnections()
|
||||
logutil.BgLogger().Info("closed pd http client")
|
||||
}
|
||||
|
|
@ -1,97 +0,0 @@
|
|||
// Copyright 2023 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// NOTE: The code in this file is based on code from the
|
||||
// TiDB project, licensed under the Apache License v 2.0
|
||||
//
|
||||
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/util/rate_limit_test.go
|
||||
//
|
||||
|
||||
// Copyright 2023 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPDRequestRetry(t *testing.T) {
|
||||
EnableFailpoints()
|
||||
ctx := context.Background()
|
||||
require := require.New(t)
|
||||
require.Nil(failpoint.Enable("tikvclient/FastRetry", `return()`))
|
||||
defer func() {
|
||||
require.Nil(failpoint.Disable("tikvclient/FastRetry"))
|
||||
}()
|
||||
|
||||
count := 0
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
count++
|
||||
if count <= pdRequestRetryTime-1 {
|
||||
w.WriteHeader(http.StatusGatewayTimeout)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
cli := http.DefaultClient
|
||||
taddr := ts.URL
|
||||
_, reqErr := pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
|
||||
require.Nil(reqErr)
|
||||
ts.Close()
|
||||
|
||||
count = 0
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
count++
|
||||
if count <= pdRequestRetryTime+1 {
|
||||
w.WriteHeader(http.StatusGatewayTimeout)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
taddr = ts.URL
|
||||
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
|
||||
require.Error(reqErr)
|
||||
ts.Close()
|
||||
|
||||
require.Nil(failpoint.Enable("tikvclient/InjectClosed", fmt.Sprintf("return(%d)", 0)))
|
||||
defer func() {
|
||||
require.Nil(failpoint.Disable("tikvclient/InjectClosed"))
|
||||
}()
|
||||
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
taddr = ts.URL
|
||||
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
|
||||
require.Error(reqErr)
|
||||
ts.Close()
|
||||
}
|
||||
Loading…
Reference in New Issue