region_cache: extract store related fields to store cache (#1279)

Signed-off-by: zyguan <zhongyangguan@gmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
This commit is contained in:
zyguan 2024-04-18 21:37:35 +08:00 committed by GitHub
parent 09b120cdf7
commit 36c8d2c668
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 146 additions and 133 deletions

View File

@ -326,11 +326,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
var leaderAccessIdx AccessIndex
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
store, exists := c.getStore(p.StoreId)
store, exists := c.stores.get(p.StoreId)
if !exists {
store = c.getStoreOrInsertDefault(p.StoreId)
store = c.stores.getOrInsertDefault(p.StoreId)
}
addr, err := store.initResolve(bo, c)
addr, err := store.initResolve(bo, c.stores)
if err != nil {
return nil, err
}
@ -633,25 +633,11 @@ type RegionCache struct {
mu regionIndexMu
storeMu struct {
sync.RWMutex
stores map[uint64]*Store
}
tiflashComputeStoreMu struct {
sync.RWMutex
needReload bool
stores []*Store
}
notifyCheckCh chan struct{}
stores storeCache
// runner for background jobs
bg *bgRunner
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness atomic.Pointer[livenessFunc]
}
clusterID uint64
}
@ -681,10 +667,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
c.codec = codecPDClient.GetCodec()
}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.stores = newStoreCache(pdClient)
c.bg = newBackgroundRunner(context.Background())
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
if c.pdClient != nil {
@ -719,7 +702,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
}, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents())
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
}
@ -741,10 +724,6 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
// only used fot test.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.bg = newBackgroundRunner(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
@ -753,7 +732,7 @@ func newTestRegionCache() *RegionCache {
// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu.refresh(nil)
c.clearStores()
c.stores.clear()
}
// thread unsafe, should use with lock
@ -778,9 +757,9 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}
}()
needCheckStores = c.filterStores(needCheckStores, needCheck)
needCheckStores = c.stores.filter(needCheckStores, needCheck)
for _, store := range needCheckStores {
_, err := store.reResolve(c)
_, err := store.reResolve(c.stores)
tikverr.Log(err)
}
return needCheckStores
@ -788,12 +767,13 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
c.stores.put(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
}
// SetPDClient replaces pd client,for testing only
func (c *RegionCache) SetPDClient(client pd.Client) {
c.pdClient = client
c.stores = newStoreCache(client)
}
// RPCContext contains data that is needed to send RPC to a region.
@ -1057,7 +1037,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
return nil, nil
}
if store.getResolveState() == needCheck {
_, err := store.reResolve(c)
_, err := store.reResolve(c.stores)
tikverr.Log(err)
}
regionStore.workTiFlashIdx.Store(int32(accessIdx))
@ -1406,7 +1386,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
c.markStoreNeedCheck(s)
c.stores.markStoreNeedCheck(s)
return incEpochStoreIdx
}
@ -1759,14 +1739,14 @@ func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {
// GetStoresByType gets stores by type `typ`
func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
return c.filterStores(nil, func(s *Store) bool {
return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && s.storeType == typ
})
}
// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
return c.filterStores(nil, func(s *Store) bool {
return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash)
})
}
@ -2037,7 +2017,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
addr = store.addr
return
case unresolved:
addr, err = store.initResolve(bo, c)
addr, err = store.initResolve(bo, c.stores)
return
case deleted:
addr = c.changeToActiveStore(region, store.storeID)
@ -2094,7 +2074,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
// changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) {
store, _ := c.getStore(storeID)
store, _ := c.stores.get(storeID)
for {
oldRegionStore := region.getStore()
newRegionStore := oldRegionStore.clone()
@ -2204,19 +2184,19 @@ func (c *RegionCache) PDClient() pd.Client {
// GetTiFlashStores returns the information of all tiflash nodes. Like `GetAllStores`, the method only returns resolved
// stores so that users won't be bothered by tombstones. (related issue: https://github.com/pingcap/tidb/issues/46602)
func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
return c.filterStores(nil, func(s *Store) bool {
return c.stores.filter(nil, func(s *Store) bool {
return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) && s.getResolveState() == resolved
})
}
// GetTiFlashComputeStores returns all stores with lable <engine, tiflash_compute>.
func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) {
stores, needReload := c.listTiflashComputeStores()
stores, needReload := c.stores.listTiflashComputeStores()
if needReload {
stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c)
stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c.stores)
if err == nil {
c.setTiflashComputeStores(stores)
c.stores.setTiflashComputeStores(stores)
}
return stores, err
}
@ -2266,7 +2246,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool
// InvalidateTiFlashComputeStores set needReload be true,
// and will refresh tiflash_compute store cache next time.
func (c *RegionCache) InvalidateTiFlashComputeStores() {
c.markTiflashComputeStoresNeedReload()
c.stores.markTiflashComputeStoresNeedReload()
}
// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
@ -2652,7 +2632,7 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
}()
healthDetails := make(map[uint64]HealthStatusDetail)
now := time.Now()
c.forEachStore(func(store *Store) {
c.stores.forEach(func(store *Store) {
store.healthStatus.tick(now)
healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
})
@ -2664,7 +2644,7 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) reportStoreReplicaFlows() {
c.forEachStore(func(store *Store) {
c.stores.forEach(func(store *Store) {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
@ -2683,7 +2663,7 @@ func contains(startKey, endKey, key []byte) bool {
}
func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
store, ok := c.getStore(feedback.GetStoreId())
store, ok := c.stores.get(feedback.GetStoreId())
if !ok {
logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId()))
return

View File

@ -331,8 +331,8 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
}
for _, testcase := range testcases {
s.T().Log(testcase.storeID)
store := s.cache.getStoreOrInsertDefault(testcase.storeID)
_, err := store.initResolve(s.bo, s.cache)
store := s.cache.stores.getOrInsertDefault(testcase.storeID)
_, err := store.initResolve(s.bo, s.cache.stores)
s.Nil(err)
labels := []*metapb.StoreLabel{
{
@ -340,7 +340,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() {
Value: fmt.Sprintf("%v", testcase.storeID),
},
}
stores := s.cache.filterStores(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) })
stores := s.cache.stores.filter(nil, func(s *Store) bool { return s.IsLabelsMatch(labels) })
s.Equal(len(stores), 1)
s.Equal(stores[0].labels, labels)
}
@ -372,9 +372,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Check resolving normal stores. The resolve state should be resolved.
for _, storeMeta := range s.cluster.GetAllStores() {
store := cache.getStoreOrInsertDefault(storeMeta.GetId())
store := cache.stores.getOrInsertDefault(storeMeta.GetId())
s.Equal(store.getResolveState(), unresolved)
addr, err := store.initResolve(bo, cache)
addr, err := store.initResolve(bo, cache.stores)
s.Nil(err)
s.Equal(addr, storeMeta.GetAddress())
s.Equal(store.getResolveState(), resolved)
@ -390,26 +390,26 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
}
// Mark the store needCheck. The resolve state should be resolved soon.
store := cache.getStoreOrInsertDefault(s.store1)
cache.markStoreNeedCheck(store)
store := cache.stores.getOrInsertDefault(s.store1)
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), resolved)
// Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone.
s.cluster.MarkTombstone(s.store1)
cache.markStoreNeedCheck(store)
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), tombstone)
s.cluster.StartStore(s.store1)
// Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone.
cache.clear()
store = cache.getStoreOrInsertDefault(s.store1)
store.initResolve(bo, cache)
store = cache.stores.getOrInsertDefault(s.store1)
store.initResolve(bo, cache.stores)
s.Equal(store.getResolveState(), resolved)
storeMeta := s.cluster.GetStore(s.store1)
s.cluster.RemoveStore(s.store1)
cache.markStoreNeedCheck(store)
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), tombstone)
s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
@ -417,14 +417,14 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Mark the store needCheck and its address and labels are changed.
// The resolve state should be deleted and a new store is added to the cache.
cache.clear()
store = cache.getStoreOrInsertDefault(s.store1)
store.initResolve(bo, cache)
store = cache.stores.getOrInsertDefault(s.store1)
store.initResolve(bo, cache.stores)
s.Equal(store.getResolveState(), resolved)
s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
cache.markStoreNeedCheck(store)
cache.stores.markStoreNeedCheck(store)
waitResolve(store)
s.Equal(store.getResolveState(), deleted)
newStore := cache.getStoreOrInsertDefault(s.store1)
newStore := cache.stores.getOrInsertDefault(s.store1)
s.Equal(newStore.getResolveState(), resolved)
s.Equal(newStore.addr, store.addr+"0")
s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}})
@ -432,9 +432,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
// Check initResolve()ing a tombstone store. The resolve state should be tombstone.
cache.clear()
s.cluster.MarkTombstone(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
store = cache.stores.getOrInsertDefault(s.store1)
for i := 0; i < 2; i++ {
addr, err := store.initResolve(bo, cache)
addr, err := store.initResolve(bo, cache.stores)
s.Nil(err)
s.Equal(addr, "")
s.Equal(store.getResolveState(), tombstone)
@ -446,9 +446,9 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
cache.clear()
storeMeta = s.cluster.GetStore(s.store1)
s.cluster.RemoveStore(s.store1)
store = cache.getStoreOrInsertDefault(s.store1)
store = cache.stores.getOrInsertDefault(s.store1)
for i := 0; i < 2; i++ {
addr, err := store.initResolve(bo, cache)
addr, err := store.initResolve(bo, cache.stores)
s.Nil(err)
s.Equal(addr, "")
s.Equal(store.getResolveState(), tombstone)
@ -485,7 +485,7 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() {
s.Run("WithStaleStores", func() {
cntGetRegion = 0
s.cache.clear()
store2 := s.cache.getStoreOrInsertDefault(s.store2)
store2 := s.cache.stores.getOrInsertDefault(s.store2)
for i := 0; i < 50; i++ {
atomic.StoreUint32(&store2.epoch, uint32(i))
@ -499,7 +499,7 @@ func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() {
s.Run("WithUnreachableStores", func() {
cntGetRegion = 0
s.cache.clear()
store2 := s.cache.getStoreOrInsertDefault(s.store2)
store2 := s.cache.stores.getOrInsertDefault(s.store2)
atomic.StoreUint32(&store2.livenessState, uint32(unreachable))
defer atomic.StoreUint32(&store2.livenessState, uint32(reachable))
@ -1774,7 +1774,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
newMeta := proto.Clone(cachedRegion.meta).(*metapb.Region)
newMeta.RegionEpoch.Version++
newMeta.RegionEpoch.ConfVer++
_, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreOrInsertDefault(s.store1)}, []*metapb.Region{newMeta})
_, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.stores.getOrInsertDefault(s.store1)}, []*metapb.Region{newMeta})
s.Nil(err)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)
@ -2049,13 +2049,13 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// init region cache
s.cache.LocateKey(s.bo, []byte("a"))
store1, _ := s.cache.getStore(s.store1)
store1, _ := s.cache.stores.get(s.store1)
s.Require().NotNil(store1)
s.Require().Equal(resolved, store1.getResolveState())
// setup mock liveness func
store1Liveness := uint32(unreachable)
s.cache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
s.cache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
if s.storeID == store1.storeID {
return livenessState(atomic.LoadUint32(&store1Liveness))
}
@ -2064,7 +2064,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
// start health check loop
atomic.StoreUint32(&store1.livenessState, store1Liveness)
startHealthCheckLoop(s.cache, store1, livenessState(store1Liveness), time.Second)
startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, livenessState(store1Liveness), time.Second)
// update store meta
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)
@ -2075,7 +2075,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
}, 3*time.Second, time.Second)
// assert that the new store should be added and it's also not reachable
newStore1, _ := s.cache.getStore(store1.storeID)
newStore1, _ := s.cache.stores.get(store1.storeID)
s.Require().NotEqual(reachable, newStore1.getLivenessState())
// recover store1
@ -2156,7 +2156,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
_, err := s.cache.LocateKey(s.bo, []byte("k"))
s.Nil(err)
store1, exists := s.cache.getStore(s.store1)
store1, exists := s.cache.stores.get(s.store1)
s.True(exists)
s.False(store1.healthStatus.IsSlow())
@ -2197,7 +2197,7 @@ func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() {
s.False(store1.healthStatus.IsSlow())
s.Equal(int64(50), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore)
store2, exists := s.cache.getStore(s.store2)
store2, exists := s.cache.stores.get(s.store2)
s.True(exists)
// Store 2 is never affected by updating store 1
s.LessOrEqual(store2.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore, int64(1))

View File

@ -1273,7 +1273,7 @@ func (s *baseReplicaSelector) getBaseReplicaSelector() *baseReplicaSelector {
}
func (s *baseReplicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache)
return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache.bg, s.regionCache.stores)
}
func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause error) {
@ -1287,7 +1287,7 @@ func (s *baseReplicaSelector) invalidateReplicaStore(replica *replica, cause err
)
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
// schedule a store addr resolve.
s.regionCache.markStoreNeedCheck(store)
s.regionCache.stores.markStoreNeedCheck(store)
store.healthStatus.markAlreadySlow()
}
}
@ -2329,7 +2329,7 @@ func (s *RegionRequestSender) onRegionError(
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx),
)
s.regionCache.markStoreNeedCheck(ctx.Store)
s.regionCache.stores.markStoreNeedCheck(ctx.Store)
s.regionCache.InvalidateCachedRegion(ctx.Region)
// It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment,
// so we always reconnect in this case.

View File

@ -110,7 +110,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
s.NotNil(region)
oldStoreLimit := kv.StoreLimit.Load()
kv.StoreLimit.Store(500)
s.cache.getStoreOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500)
s.cache.stores.getOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500)
// cause there is only one region in this cluster, regionID maps this leader.
resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
s.NotNil(err)
@ -256,7 +256,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() {
return innerClient.SendRequest(ctx, addr, req, timeout)
}}
var storeState = uint32(unreachable)
sender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
sender.regionCache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
if s.addr == leaderAddr {
return livenessState(atomic.LoadUint32(&storeState))
}
@ -536,7 +536,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
unreachable.injectConstantLiveness(cache)
unreachable.injectConstantLiveness(cache.stores)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
_, err = replicaSelector.next(s.bo, req)
s.Nil(err)
@ -572,7 +572,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Do not try to use proxy if livenessState is unknown instead of unreachable.
refreshEpochs(regionStore)
cache.enableForwarding = true
unknown.injectConstantLiveness(cache)
unknown.injectConstantLiveness(cache.stores)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
@ -594,7 +594,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
unreachable.injectConstantLiveness(cache)
unreachable.injectConstantLiveness(cache.stores)
s.Eventually(func() bool {
return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable
}, 3*time.Second, 200*time.Millisecond)
@ -878,7 +878,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}
return nil
}
reachable.injectConstantLiveness(s.cache)
reachable.injectConstantLiveness(s.cache.stores)
s.Eventually(func() bool {
stores := getReplicaSelectorRegionStores()
return stores[0].getLivenessState() == reachable &&
@ -1004,7 +1004,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
}
// Runs out of all replicas and then returns a send error.
unreachable.injectConstantLiveness(s.cache)
unreachable.injectConstantLiveness(s.cache.stores)
reloadRegion()
for _, store := range s.storeIDs {
s.cluster.StopStore(store)
@ -1292,7 +1292,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
}
// Test for write request.
reachable.injectConstantLiveness(s.cache)
reachable.injectConstantLiveness(s.cache.stores)
resetStats()
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
req.ReplicaReadType = kv.ReplicaReadLeader

View File

@ -85,7 +85,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() {
}
func (s *testRegionCacheStaleReadSuite) TearDownTest() {
s.cache.setMockRequestLiveness(nil)
s.cache.stores.setMockRequestLiveness(nil)
s.cache.Close()
s.mvccStore.Close()
}
@ -223,7 +223,7 @@ func (s *testRegionCacheStaleReadSuite) setClient() {
return
}}
s.cache.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState {
s.cache.stores.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState {
_, ok := s.injection.unavailableStoreIDs[store.storeID]
if ok {
return unreachable

View File

@ -765,7 +765,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
}()
req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
regionRequestSender := NewRegionRequestSender(s.cache, fnClient)
reachable.injectConstantLiveness(regionRequestSender.regionCache)
reachable.injectConstantLiveness(regionRequestSender.regionCache.stores)
regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
}
}()

View File

@ -2589,7 +2589,7 @@ func TestReplicaReadAvoidSlowStore(t *testing.T) {
defer s.TearDownTest()
s.changeRegionLeader(3)
store, exists := s.cache.getStore(1)
store, exists := s.cache.stores.get(1)
s.True(exists)
for _, staleRead := range []bool{false, true} {
@ -3056,7 +3056,7 @@ func (ca *replicaSelectorAccessPathCase) genAccessErr(regionCache *RegionCache,
}
if err != nil {
// inject unreachable liveness.
unreachable.injectConstantLiveness(regionCache)
unreachable.injectConstantLiveness(regionCache.stores)
}
return regionErr, err
}
@ -3094,7 +3094,7 @@ func (c *replicaSelectorAccessPathCase) Format() string {
func (s *testReplicaSelectorSuite) resetStoreState() {
// reset slow score, since serverIsBusyErr will mark the store is slow, and affect remaining test cases.
reachable.injectConstantLiveness(s.cache) // inject reachable liveness.
reachable.injectConstantLiveness(s.cache.stores) // inject reachable liveness.
rc := s.getRegion()
s.NotNil(rc)
for _, store := range rc.getStore().stores {

View File

@ -32,6 +32,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
@ -47,39 +48,20 @@ type testingKnobs interface {
setMockRequestLiveness(f livenessFunc)
}
func (c *RegionCache) getMockRequestLiveness() livenessFunc {
f := c.testingKnobs.mockRequestLiveness.Load()
if f == nil {
return nil
}
return *f
}
func (c *RegionCache) setMockRequestLiveness(f livenessFunc) {
c.testingKnobs.mockRequestLiveness.Store(&f)
}
type storeRegistry interface {
fetchStore(ctx context.Context, id uint64) (*metapb.Store, error)
fetchAllStores(ctx context.Context) ([]*metapb.Store, error)
}
func (c *RegionCache) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) {
return c.pdClient.GetStore(ctx, id)
}
func (c *RegionCache) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) {
return c.pdClient.GetAllStores(ctx)
}
type storeCache interface {
testingKnobs
storeRegistry
getStore(id uint64) (store *Store, exists bool)
getStoreOrInsertDefault(id uint64) *Store
putStore(store *Store)
clearStores()
forEachStore(f func(*Store))
filterStores(dst []*Store, predicate func(*Store) bool) []*Store
get(id uint64) (store *Store, exists bool)
getOrInsertDefault(id uint64) *Store
put(store *Store)
clear()
forEach(f func(*Store))
filter(dst []*Store, predicate func(*Store) bool) []*Store
listTiflashComputeStores() (stores []*Store, needReload bool)
setTiflashComputeStores(stores []*Store)
markTiflashComputeStoresNeedReload()
@ -87,14 +69,65 @@ type storeCache interface {
getCheckStoreEvents() <-chan struct{}
}
func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) {
func newStoreCache(pdClient pd.Client) *storeCacheImpl {
c := &storeCacheImpl{pdClient: pdClient}
c.notifyCheckCh = make(chan struct{}, 1)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.stores = make([]*Store, 0)
return c
}
type storeCacheImpl struct {
pdClient pd.Client
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness atomic.Pointer[livenessFunc]
}
notifyCheckCh chan struct{}
storeMu struct {
sync.RWMutex
stores map[uint64]*Store
}
tiflashComputeStoreMu struct {
sync.RWMutex
needReload bool
stores []*Store
}
}
func (c *storeCacheImpl) getMockRequestLiveness() livenessFunc {
f := c.testingKnobs.mockRequestLiveness.Load()
if f == nil {
return nil
}
return *f
}
func (c *storeCacheImpl) setMockRequestLiveness(f livenessFunc) {
c.testingKnobs.mockRequestLiveness.Store(&f)
}
func (c *storeCacheImpl) fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) {
return c.pdClient.GetStore(ctx, id)
}
func (c *storeCacheImpl) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) {
return c.pdClient.GetAllStores(ctx)
}
func (c *storeCacheImpl) get(id uint64) (store *Store, exists bool) {
c.storeMu.RLock()
store, exists = c.storeMu.stores[id]
c.storeMu.RUnlock()
return
}
func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store {
func (c *storeCacheImpl) getOrInsertDefault(id uint64) *Store {
c.storeMu.Lock()
store, exists := c.storeMu.stores[id]
if !exists {
@ -105,19 +138,19 @@ func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store {
return store
}
func (c *RegionCache) putStore(store *Store) {
func (c *storeCacheImpl) put(store *Store) {
c.storeMu.Lock()
c.storeMu.stores[store.storeID] = store
c.storeMu.Unlock()
}
func (c *RegionCache) clearStores() {
func (c *storeCacheImpl) clear() {
c.storeMu.Lock()
c.storeMu.stores = make(map[uint64]*Store)
c.storeMu.Unlock()
}
func (c *RegionCache) forEachStore(f func(*Store)) {
func (c *storeCacheImpl) forEach(f func(*Store)) {
c.storeMu.RLock()
defer c.storeMu.RUnlock()
for _, s := range c.storeMu.stores {
@ -125,7 +158,7 @@ func (c *RegionCache) forEachStore(f func(*Store)) {
}
}
func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []*Store {
func (c *storeCacheImpl) filter(dst []*Store, predicate func(*Store) bool) []*Store {
c.storeMu.RLock()
for _, store := range c.storeMu.stores {
if predicate == nil || predicate(store) {
@ -136,7 +169,7 @@ func (c *RegionCache) filterStores(dst []*Store, predicate func(*Store) bool) []
return dst
}
func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) {
func (c *storeCacheImpl) listTiflashComputeStores() (stores []*Store, needReload bool) {
c.tiflashComputeStoreMu.RLock()
needReload = c.tiflashComputeStoreMu.needReload
stores = c.tiflashComputeStoreMu.stores
@ -144,20 +177,20 @@ func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bo
return
}
func (c *RegionCache) setTiflashComputeStores(stores []*Store) {
func (c *storeCacheImpl) setTiflashComputeStores(stores []*Store) {
c.tiflashComputeStoreMu.Lock()
c.tiflashComputeStoreMu.stores = stores
c.tiflashComputeStoreMu.needReload = false
c.tiflashComputeStoreMu.Unlock()
}
func (c *RegionCache) markTiflashComputeStoresNeedReload() {
func (c *storeCacheImpl) markTiflashComputeStoresNeedReload() {
c.tiflashComputeStoreMu.Lock()
c.tiflashComputeStoreMu.needReload = true
c.tiflashComputeStoreMu.Unlock()
}
func (c *RegionCache) markStoreNeedCheck(store *Store) {
func (c *storeCacheImpl) markStoreNeedCheck(store *Store) {
if store.changeResolveStateTo(resolved, needCheck) {
select {
case c.notifyCheckCh <- struct{}{}:
@ -166,7 +199,7 @@ func (c *RegionCache) markStoreNeedCheck(store *Store) {
}
}
func (c *RegionCache) getCheckStoreEvents() <-chan struct{} {
func (c *storeCacheImpl) getCheckStoreEvents() <-chan struct{} {
return c.notifyCheckCh
}
@ -435,7 +468,7 @@ func (s *Store) reResolve(c storeCache) (bool, error) {
if s.addr == addr {
newStore.healthStatus = s.healthStatus
}
c.putStore(newStore)
c.put(newStore)
s.setResolveState(deleted)
return false, nil
}
@ -536,7 +569,7 @@ func (s *Store) getLivenessState() livenessState {
return livenessState(atomic.LoadUint32(&s.livenessState))
}
func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) {
func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, scheduler *bgRunner, c storeCache) (liveness livenessState) {
liveness = requestLiveness(bo.GetCtx(), s, c)
if liveness == reachable {
return
@ -560,15 +593,15 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
return
}
startHealthCheckLoop(c, s, liveness, reResolveInterval)
startHealthCheckLoop(scheduler, c, s, liveness, reResolveInterval)
}
return
}
func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) {
func startHealthCheckLoop(scheduler *bgRunner, c storeCache, s *Store, liveness livenessState, reResolveInterval time.Duration) {
lastCheckPDTime := time.Now()
c.bg.schedule(func(ctx context.Context, t time.Time) bool {
scheduler.schedule(func(ctx context.Context, t time.Time) bool {
if t.Sub(lastCheckPDTime) > reResolveInterval {
lastCheckPDTime = t
@ -584,7 +617,7 @@ func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reRe
return true
}
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
newStore, _ := c.getStore(s.storeID)
newStore, _ := c.get(s.storeID)
logutil.BgLogger().Info("[health check] store meta changed",
zap.Uint64("storeID", s.storeID),
zap.String("oldAddr", s.addr),