diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index d8907ab8..c54ef7f3 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -126,12 +126,19 @@ type AccessIndex int
 // regionStore represents region stores info
 // it will be store as unsafe.Pointer and be load at once
 type regionStore struct {
-	workTiKVIdx    AccessIndex          // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
-	proxyTiKVIdx   AccessIndex          // point to the tikv peer that can forward requests to the leader. -1 means not using proxy
-	workTiFlashIdx int32                // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
-	stores         []*Store             // stores in this region
-	storeEpochs    []uint32             // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
-	accessIndex    [numAccessMode][]int // AccessMode => idx in stores
+	// corresponding stores(in the same order) of Region.meta.Peers in this region.
+	stores []*Store
+	// snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
+	storeEpochs []uint32
+	// A region can consist of stores with different type(TiKV and TiFlash). It maintains AccessMode => idx in stores,
+	// e.g., stores[accessIndex[tiKVOnly][workTiKVIdx]] is the current working TiKV.
+	accessIndex [numAccessMode][]int
+	// accessIndex[tiKVOnly][workTiKVIdx] is the index of the current working TiKV in stores.
+	workTiKVIdx AccessIndex
+	// accessIndex[tiKVOnly][proxyTiKVIdx] is the index of TiKV that can forward requests to the leader in stores, -1 means not using proxy.
+	proxyTiKVIdx AccessIndex
+	// accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores.
+	workTiFlashIdx int32
 }
 
 func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
@@ -155,6 +162,7 @@ func (r *regionStore) accessStoreNum(mode accessMode) int {
 // clone clones region store struct.
 func (r *regionStore) clone() *regionStore {
 	storeEpochs := make([]uint32, len(r.stores))
+	copy(storeEpochs, r.storeEpochs)
 	rs := &regionStore{
 		workTiFlashIdx: r.workTiFlashIdx,
 		proxyTiKVIdx:   r.proxyTiKVIdx,
@@ -162,7 +170,6 @@ func (r *regionStore) clone() *regionStore {
 		stores:         r.stores,
 		storeEpochs:    storeEpochs,
 	}
-	copy(storeEpochs, r.storeEpochs)
 	for i := 0; i < int(numAccessMode); i++ {
 		rs.accessIndex[i] = make([]int, len(r.accessIndex[i]))
 		copy(rs.accessIndex[i], r.accessIndex[i])
@@ -218,9 +225,9 @@ func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
 	return s.IsLabelsMatch(op.labels)
 }
 
-// init initializes region after constructed.
-func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
-	// region store pull used store from global store map
+func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) {
+	r := &Region{meta: pdRegion.Meta}
+	// regionStore pull used store from global store map
 	// to avoid acquire storeMu in later access.
 	rs := &regionStore{
 		workTiKVIdx:    0,
@@ -229,6 +236,9 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
 		stores:         make([]*Store, 0, len(r.meta.Peers)),
 		storeEpochs:    make([]uint32, 0, len(r.meta.Peers)),
 	}
+
+	leader := pdRegion.Leader
+	var leaderAccessIdx AccessIndex
 	availablePeers := r.meta.GetPeers()[:0]
 	for _, p := range r.meta.Peers {
 		c.storeMu.RLock()
@@ -239,12 +249,15 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
 		}
 		addr, err := store.initResolve(bo, c)
 		if err != nil {
-			return err
+			return nil, err
 		}
 		// Filter the peer on a tombstone store.
 		if addr == "" {
			continue
 		}
+		if isSamePeer(p, leader) {
+			leaderAccessIdx = AccessIndex(len(rs.accessIndex[tiKVOnly]))
+		}
 		availablePeers = append(availablePeers, p)
 		switch store.storeType {
 		case tikvrpc.TiKV:
@@ -258,15 +271,16 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
 	// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
 	// Maybe we need backoff here.
 	if len(availablePeers) == 0 {
-		return errors.Errorf("no available peers, region: {%v}", r.meta)
+		return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
 	}
+	rs.workTiKVIdx = leaderAccessIdx
 	r.meta.Peers = availablePeers
 
 	atomic.StorePointer(&r.store, unsafe.Pointer(rs))
 
 	// mark region has been init accessed.
 	r.lastAccess = time.Now().Unix()
-	return nil
+	return r, nil
 }
 
 func (r *Region) getStore() (store *regionStore) {
@@ -781,7 +795,6 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
 
 // OnSendFailForTiFlash handles send request fail logic for tiflash.
 func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error, skipSwitchPeerLog bool) {
-
 	r := c.GetCachedRegionWithRLock(region)
 	if r == nil {
 		return
@@ -1064,7 +1077,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leader *metapb.Peer, cu
 		return
 	}
 
-	if !c.switchWorkLeaderToPeer(r, leader) {
+	if !r.switchWorkLeaderToPeer(leader) {
 		logutil.BgLogger().Info("invalidate region cache due to cannot find peer when updating leader",
 			zap.Uint64("regionID", regionID.GetID()),
 			zap.Int("currIdx", int(currentPeerIdx)),
@@ -1274,15 +1287,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
 			searchPrev = true
 			continue
 		}
-		region := &Region{meta: reg.Meta}
-		err = region.init(bo, c)
-		if err != nil {
-			return nil, err
-		}
-		if reg.Leader != nil {
-			c.switchWorkLeaderToPeer(region, reg.Leader)
-		}
-		return region, nil
+		return newRegion(bo, c, reg)
 	}
 }
 
@@ -1322,15 +1327,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 		if len(reg.Meta.Peers) == 0 {
 			return nil, errors.New("receive Region with no available peer")
 		}
-		region := &Region{meta: reg.Meta}
-		err = region.init(bo, c)
-		if err != nil {
-			return nil, err
-		}
-		if reg.Leader != nil {
-			c.switchWorkLeaderToPeer(region, reg.Leader)
-		}
-		return region, nil
+		return newRegion(bo, c, reg)
 	}
 }
 
@@ -1376,17 +1373,15 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
 	}
 	regions := make([]*Region, 0, len(regionsInfo))
 	for _, r := range regionsInfo {
-		region := &Region{meta: r.Meta}
-		err := region.init(bo, c)
+		// Leader id = 0 indicates no leader.
+		if r.Leader == nil || r.Leader.GetId() == 0 {
+			continue
+		}
+		region, err := newRegion(bo, c, r)
 		if err != nil {
 			return nil, err
 		}
-		leader := r.Leader
-		// Leader id = 0 indicates no leader.
-		if leader != nil && leader.GetId() != 0 {
-			c.switchWorkLeaderToPeer(region, leader)
-			regions = append(regions, region)
-		}
+		regions = append(regions, region)
 	}
 	if len(regions) == 0 {
 		return nil, errors.New("receive Regions with no peer")
 	}
@@ -1548,8 +1543,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
 			return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
 		}
 	}
-	region := &Region{meta: meta}
-	err := region.init(bo, c)
+	region, err := newRegion(bo, c, &pd.Region{Meta: meta})
 	if err != nil {
 		return false, err
 	}
@@ -1559,7 +1553,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
 		} else {
 			initLeaderStoreID = ctx.Store.storeID
 		}
-		c.switchWorkLeaderToPeer(region, region.getPeerOnStore(initLeaderStoreID))
+		region.switchWorkLeaderToPeer(region.getPeerOnStore(initLeaderStoreID))
 		newRegions = append(newRegions, region)
 		if ctx.Region == region.VerID() {
			needInvalidateOld = false
@@ -1728,10 +1722,24 @@ func (r *Region) EndKey() []byte {
 	return r.meta.EndKey
 }
 
+func (r *Region) getPeerStoreIndex(peer *metapb.Peer) (idx int, found bool) {
+	if len(r.meta.Peers) == 0 || peer == nil {
+		return
+	}
+	for i, p := range r.meta.Peers {
+		if isSamePeer(p, peer) {
+			idx = i
+			found = true
+			return
+		}
+	}
+	return
+}
+
 // switchWorkLeaderToPeer switches current store to the one on specific store. It returns
 // false if no peer matches the peer.
-func (c *RegionCache) switchWorkLeaderToPeer(r *Region, peer *metapb.Peer) (found bool) {
-	globalStoreIdx, found := c.getPeerStoreIndex(r, peer)
+func (r *Region) switchWorkLeaderToPeer(peer *metapb.Peer) (found bool) {
+	globalStoreIdx, found := r.getPeerStoreIndex(peer)
 	if !found {
 		return
 	}
@@ -1811,20 +1819,6 @@ func (r *Region) getPeerOnStore(storeID uint64) *metapb.Peer {
 	return nil
 }
 
-func (c *RegionCache) getPeerStoreIndex(r *Region, peer *metapb.Peer) (idx int, found bool) {
-	if len(r.meta.Peers) == 0 || peer == nil {
-		return
-	}
-	for i, p := range r.meta.Peers {
-		if isSamePeer(p, peer) {
-			idx = i
-			found = true
-			return
-		}
-	}
-	return
-}
-
 // Contains checks whether the key is in the region, for the maximum region endKey is empty.
 // startKey <= key < endKey.
 func (r *Region) Contains(key []byte) bool {
@@ -2226,7 +2220,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h
 	cfg := config.GetGlobalConfig()
-	opt := grpc.WithInsecure() //nolint
+	opt := grpc.WithInsecure()
 	if len(cfg.Security.ClusterSSLCA) != 0 {
 		tlsConfig, err := cfg.Security.ToTLSConfig()
 		if err != nil {
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 91f18dc1..ddd3c925 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -1263,8 +1263,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
 		DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}},
 	}
 	filterUnavailablePeers(cpRegion)
-	region := &Region{meta: cpRegion.Meta}
-	err = region.init(s.bo, s.cache)
+	region, err := newRegion(s.bo, s.cache, cpRegion)
 	s.Nil(err)
 	s.cache.insertRegionToCache(region)
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 10f25913..8c5ba21a 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -387,7 +387,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if !selector.regionCache.switchWorkLeaderToPeer(selector.region, selector.targetReplica().peer) {
+	if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
 		panic("the store must exist")
 	}
 }
@@ -822,7 +822,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 	}
 	s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
 	// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
-	if !s.regionCache.switchWorkLeaderToPeer(s.region, leader) {
+	if !s.region.switchWorkLeaderToPeer(leader) {
 		logutil.BgLogger().Debug("switch region leader to specific leader due to kv return NotLeader",
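The comments added to regionStore above document an indirection that is easy to miss: stores[accessIndex[tiKVOnly][workTiKVIdx]] is the current working TiKV, and newRegion now derives workTiKVIdx from the leader reported by PD instead of a later switchWorkLeaderToPeer call. Below is a minimal standalone sketch of that bookkeeping; the types here are simplified stand-ins for illustration only, not the real internal ones.

```go
package main

import "fmt"

// Simplified stand-ins for the regionStore bookkeeping described in the diff.
type store struct {
	id     uint64
	isTiKV bool
}

const (
	tiKVOnly = iota
	tiFlashOnly
	numAccessMode
)

type regionStore struct {
	stores      []*store             // same order as the region's peers
	accessIndex [numAccessMode][]int // AccessMode => index into stores
	workTiKVIdx int                  // index into accessIndex[tiKVOnly]
}

func main() {
	// Peers 0 and 2 are TiKV, peer 1 is TiFlash.
	rs := regionStore{
		stores: []*store{{id: 1, isTiKV: true}, {id: 2, isTiKV: false}, {id: 3, isTiKV: true}},
	}
	// Build the per-mode index, as newRegion does while iterating peers.
	for i, s := range rs.stores {
		if s.isTiKV {
			rs.accessIndex[tiKVOnly] = append(rs.accessIndex[tiKVOnly], i)
		} else {
			rs.accessIndex[tiFlashOnly] = append(rs.accessIndex[tiFlashOnly], i)
		}
	}
	// Suppose PD reported the second TiKV peer as leader, so workTiKVIdx = 1,
	// which resolves through accessIndex to stores[2].
	rs.workTiKVIdx = 1
	leader := rs.stores[rs.accessIndex[tiKVOnly][rs.workTiKVIdx]]
	fmt.Println("working TiKV store id:", leader.id) // prints 3
}
```

Keeping a per-mode index slice lets workTiKVIdx stay a dense index over TiKV peers even when TiFlash peers are interleaved in stores, which is why the leader position is recorded as leaderAccessIdx while the peers are being filtered.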