diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 1454c629..8581dc5b 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -177,6 +177,9 @@ type regionStore struct {
 	// buckets is not accurate and it can change even if the region is not changed.
 	// It can be stale and buckets keys can be out of the region range.
 	buckets *metapb.Buckets
+	// pendingTiFlashPeerStores records all store IDs on which pending peers reside.
+	// The key is the store ID and the value is the peer ID.
+	pendingTiFlashPeerStores map[uint64]uint64
 }
 
 func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
@@ -269,11 +272,12 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 	// regionStore pull used store from global store map
 	// to avoid acquire storeMu in later access.
 	rs := &regionStore{
-		workTiKVIdx:  0,
-		proxyTiKVIdx: -1,
-		stores:       make([]*Store, 0, len(r.meta.Peers)),
-		storeEpochs:  make([]uint32, 0, len(r.meta.Peers)),
-		buckets:      pdRegion.Buckets,
+		workTiKVIdx:              0,
+		proxyTiKVIdx:             -1,
+		stores:                   make([]*Store, 0, len(r.meta.Peers)),
+		pendingTiFlashPeerStores: map[uint64]uint64{},
+		storeEpochs:              make([]uint32, 0, len(r.meta.Peers)),
+		buckets:                  pdRegion.Buckets,
 	}
 
 	leader := pdRegion.Leader
@@ -314,6 +318,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
 		}
 		rs.stores = append(rs.stores, store)
 		rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
+		for _, pendingPeer := range pdRegion.PendingPeers {
+			if pendingPeer.Id == p.Id {
+				rs.pendingTiFlashPeerStores[store.storeID] = p.Id
+			}
+		}
 	}
 	// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
 	// Maybe we need backoff here.
@@ -780,23 +789,26 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
 }
 
 // GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores, the store id of currentStore is always the first one
-func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
+// Callers may prefer `nonPendingStores`, so that tasks do not have to wait for TiFlash replicas that are still syncing data from TiKV.
+// If all TiFlash peers are pending (len(nonPendingStores) == 0), falling back to `allStores` is also fine.
+func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) ([]uint64, []uint64) {
 	// set the cap to 2 because usually, TiFlash table will have 2 replicas
 	allStores := make([]uint64, 0, 2)
+	nonPendingStores := make([]uint64, 0, 2)
 	// make sure currentStore id is always the first in allStores
 	allStores = append(allStores, currentStore.storeID)
 	ts := time.Now().Unix()
 	cachedRegion := c.GetCachedRegionWithRLock(id)
 	if cachedRegion == nil {
-		return allStores
+		return allStores, nonPendingStores
 	}
 	if !cachedRegion.checkRegionCacheTTL(ts) {
-		return allStores
+		return allStores, nonPendingStores
 	}
 	regionStore := cachedRegion.getStore()
 	currentIndex := regionStore.getAccessIndex(tiFlashOnly, currentStore)
 	if currentIndex == -1 {
-		return allStores
+		return allStores, nonPendingStores
 	}
 	for startOffset := 1; startOffset < regionStore.accessStoreNum(tiFlashOnly); startOffset++ {
 		accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(tiFlashOnly))
@@ -813,7 +825,12 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 		}
 		allStores = append(allStores, store.storeID)
 	}
-	return allStores
+	for _, storeID := range allStores {
+		if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok {
+			nonPendingStores = append(nonPendingStores, storeID)
+		}
+	}
+	return allStores, nonPendingStores
 }
 
 // GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region
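Caller-side sketch (not part of this diff): the new two-value return is meant to be consumed as described in the added doc comment, preferring non-pending stores and falling back to the full list only when every TiFlash peer is pending. The helper name `pickTiFlashStores` below is hypothetical and assumes the same package-level types (`RegionCache`, `RegionVerID`, `Store`, `LabelFilter`).

// pickTiFlashStores is a hypothetical helper illustrating intended usage of
// GetAllValidTiFlashStores after this change; it is not part of the diff.
func pickTiFlashStores(cache *RegionCache, id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
	allStores, nonPendingStores := cache.GetAllValidTiFlashStores(id, currentStore, labelFilter)
	if len(nonPendingStores) > 0 {
		// Some TiFlash replicas are fully caught up; dispatch to those first so
		// the task does not wait for a replica still syncing data from TiKV.
		return nonPendingStores
	}
	// Every TiFlash peer of this region is pending, so waiting is unavoidable;
	// use all valid stores (currentStore's ID is always first in the slice).
	return allStores
}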