region_cache: support check pending tiflash peer (#821)

Signed-off-by: guo-shaoge <shaoge1994@163.com>
Co-authored-by: disksing <i@disksing.com>
guo-shaoge 2023-07-13 14:08:45 +08:00 committed by GitHub
parent e540aa3b96
commit 51633ada95
1 changed file with 27 additions and 10 deletions

@@ -177,6 +177,9 @@ type regionStore struct {
// buckets is not accurate and it can change even if the region is not changed.
// It can be stale and buckets keys can be out of the region range.
buckets *metapb.Buckets
// record all store IDs on which pending peers reside.
// key is storeID, value is peerID.
pendingTiFlashPeerStores map[uint64]uint64
}
func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
@@ -269,11 +272,12 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
// regionStore pulls the used stores from the global store map
// to avoid acquiring storeMu in later accesses.
rs := &regionStore{
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
buckets: pdRegion.Buckets,
workTiKVIdx: 0,
proxyTiKVIdx: -1,
stores: make([]*Store, 0, len(r.meta.Peers)),
pendingTiFlashPeerStores: map[uint64]uint64{},
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
buckets: pdRegion.Buckets,
}
leader := pdRegion.Leader
@@ -314,6 +318,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
}
rs.stores = append(rs.stores, store)
rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
for _, pendingPeer := range pdRegion.PendingPeers {
if pendingPeer.Id == p.Id {
rs.pendingTiFlashPeerStores[store.storeID] = p.Id
}
}
}
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
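
For illustration only (this sketch is not part of the diff), the loop added above can be read as the following self-contained Go function: a store is recorded when one of its peers appears in PD's pending-peer list. The Peer type here is a simplified stand-in for metapb.Peer, and collectPendingPeerStores is a hypothetical name.

package main

import "fmt"

// Peer is a simplified stand-in for metapb.Peer (only Id and StoreId).
type Peer struct {
	Id      uint64
	StoreId uint64
}

// collectPendingPeerStores mirrors the loop added in newRegion above:
// for every region peer whose ID appears in PD's pending-peer list,
// remember the store it lives on (key: store ID, value: peer ID).
func collectPendingPeerStores(peers, pendingPeers []Peer) map[uint64]uint64 {
	pending := make(map[uint64]uint64, len(pendingPeers))
	for _, p := range peers {
		for _, pp := range pendingPeers {
			if pp.Id == p.Id {
				pending[p.StoreId] = p.Id
			}
		}
	}
	return pending
}

func main() {
	peers := []Peer{{Id: 1, StoreId: 10}, {Id: 2, StoreId: 20}, {Id: 3, StoreId: 30}}
	pending := []Peer{{Id: 3, StoreId: 30}}
	fmt.Println(collectPendingPeerStores(peers, pending)) // map[30:3]
}
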
@@ -780,23 +789,26 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
}
// GetAllValidTiFlashStores returns the store IDs of all valid TiFlash stores; the store ID of currentStore is always the first one
func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
// Callers may use `nonPendingStores` first; this avoids tasks having to wait for TiFlash replicas that are still syncing from TiKV.
// But if all TiFlash peers are pending (len(nonPendingStores) == 0), using `allStores` is also fine.
func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) ([]uint64, []uint64) {
// set the cap to 2 because a TiFlash table usually has 2 replicas
allStores := make([]uint64, 0, 2)
nonPendingStores := make([]uint64, 0, 2)
// make sure currentStore id is always the first in allStores
allStores = append(allStores, currentStore.storeID)
ts := time.Now().Unix()
cachedRegion := c.GetCachedRegionWithRLock(id)
if cachedRegion == nil {
return allStores
return allStores, nonPendingStores
}
if !cachedRegion.checkRegionCacheTTL(ts) {
return allStores
return allStores, nonPendingStores
}
regionStore := cachedRegion.getStore()
currentIndex := regionStore.getAccessIndex(tiFlashOnly, currentStore)
if currentIndex == -1 {
return allStores
return allStores, nonPendingStores
}
for startOffset := 1; startOffset < regionStore.accessStoreNum(tiFlashOnly); startOffset++ {
accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(tiFlashOnly))
@@ -813,7 +825,12 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
}
allStores = append(allStores, store.storeID)
}
return allStores
for _, storeID := range allStores {
if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok {
nonPendingStores = append(nonPendingStores, storeID)
}
}
return allStores, nonPendingStores
}
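
A hedged caller-side sketch of how the two return values are intended to be used, following the comment above; pickTiFlashCandidates is a hypothetical helper, not part of this change:

// pickTiFlashCandidates prefers stores without pending TiFlash peers and falls
// back to the full store list when every peer is pending, so a request is
// never left without candidates.
func pickTiFlashCandidates(allStores, nonPendingStores []uint64) []uint64 {
	if len(nonPendingStores) > 0 {
		return nonPendingStores
	}
	return allStores
}

A caller would pass the slices returned by GetAllValidTiFlashStores and dispatch the read to one of the resulting store IDs.
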
// GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region