mirror of https://github.com/tikv/client-go.git
region_cache: more comments and minor refactor (#427)
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
This commit is contained in:
parent
79b962a84d
commit
f43cc3acc0
|
|
@ -126,12 +126,19 @@ type AccessIndex int
|
|||
// regionStore represents region stores info
|
||||
// it will be store as unsafe.Pointer and be load at once
|
||||
type regionStore struct {
|
||||
workTiKVIdx AccessIndex // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
|
||||
proxyTiKVIdx AccessIndex // point to the tikv peer that can forward requests to the leader. -1 means not using proxy
|
||||
workTiFlashIdx int32 // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
|
||||
stores []*Store // stores in this region
|
||||
storeEpochs []uint32 // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
|
||||
accessIndex [numAccessMode][]int // AccessMode => idx in stores
|
||||
// corresponding stores(in the same order) of Region.meta.Peers in this region.
|
||||
stores []*Store
|
||||
// snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
|
||||
storeEpochs []uint32
|
||||
// A region can consist of stores with different type(TiKV and TiFlash). It maintains AccessMode => idx in stores,
|
||||
// e.g., stores[accessIndex[tiKVOnly][workTiKVIdx]] is the current working TiKV.
|
||||
accessIndex [numAccessMode][]int
|
||||
// accessIndex[tiKVOnly][workTiKVIdx] is the index of the current working TiKV in stores.
|
||||
workTiKVIdx AccessIndex
|
||||
// accessIndex[tiKVOnly][proxyTiKVIdx] is the index of TiKV that can forward requests to the leader in stores, -1 means not using proxy.
|
||||
proxyTiKVIdx AccessIndex
|
||||
// accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores.
|
||||
workTiFlashIdx int32
|
||||
}
|
||||
|
||||
func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
|
||||
|
|
@ -155,6 +162,7 @@ func (r *regionStore) accessStoreNum(mode accessMode) int {
|
|||
// clone clones region store struct.
|
||||
func (r *regionStore) clone() *regionStore {
|
||||
storeEpochs := make([]uint32, len(r.stores))
|
||||
copy(storeEpochs, r.storeEpochs)
|
||||
rs := ®ionStore{
|
||||
workTiFlashIdx: r.workTiFlashIdx,
|
||||
proxyTiKVIdx: r.proxyTiKVIdx,
|
||||
|
|
@ -162,7 +170,6 @@ func (r *regionStore) clone() *regionStore {
|
|||
stores: r.stores,
|
||||
storeEpochs: storeEpochs,
|
||||
}
|
||||
copy(storeEpochs, r.storeEpochs)
|
||||
for i := 0; i < int(numAccessMode); i++ {
|
||||
rs.accessIndex[i] = make([]int, len(r.accessIndex[i]))
|
||||
copy(rs.accessIndex[i], r.accessIndex[i])
|
||||
|
|
@ -218,9 +225,9 @@ func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
|
|||
return s.IsLabelsMatch(op.labels)
|
||||
}
|
||||
|
||||
// init initializes region after constructed.
|
||||
func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
|
||||
// region store pull used store from global store map
|
||||
func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) {
|
||||
r := &Region{meta: pdRegion.Meta}
|
||||
// regionStore pull used store from global store map
|
||||
// to avoid acquire storeMu in later access.
|
||||
rs := ®ionStore{
|
||||
workTiKVIdx: 0,
|
||||
|
|
@ -229,6 +236,9 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
|
|||
stores: make([]*Store, 0, len(r.meta.Peers)),
|
||||
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
|
||||
}
|
||||
|
||||
leader := pdRegion.Leader
|
||||
var leaderAccessIdx AccessIndex
|
||||
availablePeers := r.meta.GetPeers()[:0]
|
||||
for _, p := range r.meta.Peers {
|
||||
c.storeMu.RLock()
|
||||
|
|
@ -239,12 +249,15 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
|
|||
}
|
||||
addr, err := store.initResolve(bo, c)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
// Filter the peer on a tombstone store.
|
||||
if addr == "" {
|
||||
continue
|
||||
}
|
||||
if isSamePeer(p, leader) {
|
||||
leaderAccessIdx = AccessIndex(len(rs.accessIndex[tiKVOnly]))
|
||||
}
|
||||
availablePeers = append(availablePeers, p)
|
||||
switch store.storeType {
|
||||
case tikvrpc.TiKV:
|
||||
|
|
@ -258,15 +271,16 @@ func (r *Region) init(bo *retry.Backoffer, c *RegionCache) error {
|
|||
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
|
||||
// Maybe we need backoff here.
|
||||
if len(availablePeers) == 0 {
|
||||
return errors.Errorf("no available peers, region: {%v}", r.meta)
|
||||
return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
|
||||
}
|
||||
rs.workTiKVIdx = leaderAccessIdx
|
||||
r.meta.Peers = availablePeers
|
||||
|
||||
atomic.StorePointer(&r.store, unsafe.Pointer(rs))
|
||||
|
||||
// mark region has been init accessed.
|
||||
r.lastAccess = time.Now().Unix()
|
||||
return nil
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *Region) getStore() (store *regionStore) {
|
||||
|
|
@ -781,7 +795,6 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
|
||||
// OnSendFailForTiFlash handles send request fail logic for tiflash.
|
||||
func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error, skipSwitchPeerLog bool) {
|
||||
|
||||
r := c.GetCachedRegionWithRLock(region)
|
||||
if r == nil {
|
||||
return
|
||||
|
|
@ -1064,7 +1077,7 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leader *metapb.Peer, cu
|
|||
return
|
||||
}
|
||||
|
||||
if !c.switchWorkLeaderToPeer(r, leader) {
|
||||
if !r.switchWorkLeaderToPeer(leader) {
|
||||
logutil.BgLogger().Info("invalidate region cache due to cannot find peer when updating leader",
|
||||
zap.Uint64("regionID", regionID.GetID()),
|
||||
zap.Int("currIdx", int(currentPeerIdx)),
|
||||
|
|
@ -1274,15 +1287,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
|
|||
searchPrev = true
|
||||
continue
|
||||
}
|
||||
region := &Region{meta: reg.Meta}
|
||||
err = region.init(bo, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if reg.Leader != nil {
|
||||
c.switchWorkLeaderToPeer(region, reg.Leader)
|
||||
}
|
||||
return region, nil
|
||||
return newRegion(bo, c, reg)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1322,15 +1327,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
|
|||
if len(reg.Meta.Peers) == 0 {
|
||||
return nil, errors.New("receive Region with no available peer")
|
||||
}
|
||||
region := &Region{meta: reg.Meta}
|
||||
err = region.init(bo, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if reg.Leader != nil {
|
||||
c.switchWorkLeaderToPeer(region, reg.Leader)
|
||||
}
|
||||
return region, nil
|
||||
return newRegion(bo, c, reg)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1376,17 +1373,15 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
|
|||
}
|
||||
regions := make([]*Region, 0, len(regionsInfo))
|
||||
for _, r := range regionsInfo {
|
||||
region := &Region{meta: r.Meta}
|
||||
err := region.init(bo, c)
|
||||
// Leader id = 0 indicates no leader.
|
||||
if r.Leader == nil || r.Leader.GetId() == 0 {
|
||||
continue
|
||||
}
|
||||
region, err := newRegion(bo, c, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
leader := r.Leader
|
||||
// Leader id = 0 indicates no leader.
|
||||
if leader != nil && leader.GetId() != 0 {
|
||||
c.switchWorkLeaderToPeer(region, leader)
|
||||
regions = append(regions, region)
|
||||
}
|
||||
regions = append(regions, region)
|
||||
}
|
||||
if len(regions) == 0 {
|
||||
return nil, errors.New("receive Regions with no peer")
|
||||
|
|
@ -1548,8 +1543,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
|
|||
return false, errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
|
||||
}
|
||||
}
|
||||
region := &Region{meta: meta}
|
||||
err := region.init(bo, c)
|
||||
region, err := newRegion(bo, c, &pd.Region{Meta: meta})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
|
@ -1559,7 +1553,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
|
|||
} else {
|
||||
initLeaderStoreID = ctx.Store.storeID
|
||||
}
|
||||
c.switchWorkLeaderToPeer(region, region.getPeerOnStore(initLeaderStoreID))
|
||||
region.switchWorkLeaderToPeer(region.getPeerOnStore(initLeaderStoreID))
|
||||
newRegions = append(newRegions, region)
|
||||
if ctx.Region == region.VerID() {
|
||||
needInvalidateOld = false
|
||||
|
|
@ -1728,10 +1722,24 @@ func (r *Region) EndKey() []byte {
|
|||
return r.meta.EndKey
|
||||
}
|
||||
|
||||
func (r *Region) getPeerStoreIndex(peer *metapb.Peer) (idx int, found bool) {
|
||||
if len(r.meta.Peers) == 0 || peer == nil {
|
||||
return
|
||||
}
|
||||
for i, p := range r.meta.Peers {
|
||||
if isSamePeer(p, peer) {
|
||||
idx = i
|
||||
found = true
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// switchWorkLeaderToPeer switches current store to the one on specific store. It returns
|
||||
// false if no peer matches the peer.
|
||||
func (c *RegionCache) switchWorkLeaderToPeer(r *Region, peer *metapb.Peer) (found bool) {
|
||||
globalStoreIdx, found := c.getPeerStoreIndex(r, peer)
|
||||
func (r *Region) switchWorkLeaderToPeer(peer *metapb.Peer) (found bool) {
|
||||
globalStoreIdx, found := r.getPeerStoreIndex(peer)
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
|
|
@ -1811,20 +1819,6 @@ func (r *Region) getPeerOnStore(storeID uint64) *metapb.Peer {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *RegionCache) getPeerStoreIndex(r *Region, peer *metapb.Peer) (idx int, found bool) {
|
||||
if len(r.meta.Peers) == 0 || peer == nil {
|
||||
return
|
||||
}
|
||||
for i, p := range r.meta.Peers {
|
||||
if isSamePeer(p, peer) {
|
||||
idx = i
|
||||
found = true
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Contains checks whether the key is in the region, for the maximum region endKey is empty.
|
||||
// startKey <= key < endKey.
|
||||
func (r *Region) Contains(key []byte) bool {
|
||||
|
|
@ -2226,7 +2220,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h
|
|||
|
||||
cfg := config.GetGlobalConfig()
|
||||
|
||||
opt := grpc.WithInsecure() //nolint
|
||||
opt := grpc.WithInsecure()
|
||||
if len(cfg.Security.ClusterSSLCA) != 0 {
|
||||
tlsConfig, err := cfg.Security.ToTLSConfig()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -1263,8 +1263,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
|
|||
DownPeers: []*metapb.Peer{{Id: s.peer1, StoreId: s.store1}},
|
||||
}
|
||||
filterUnavailablePeers(cpRegion)
|
||||
region := &Region{meta: cpRegion.Meta}
|
||||
err = region.init(s.bo, s.cache)
|
||||
region, err := newRegion(s.bo, s.cache, cpRegion)
|
||||
s.Nil(err)
|
||||
s.cache.insertRegionToCache(region)
|
||||
|
||||
|
|
|
|||
|
|
@ -387,7 +387,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
|
|||
}
|
||||
|
||||
func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
|
||||
if !selector.regionCache.switchWorkLeaderToPeer(selector.region, selector.targetReplica().peer) {
|
||||
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
|
||||
panic("the store must exist")
|
||||
}
|
||||
}
|
||||
|
|
@ -822,7 +822,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
|
|||
}
|
||||
s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
|
||||
// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
|
||||
if !s.regionCache.switchWorkLeaderToPeer(s.region, leader) {
|
||||
if !s.region.switchWorkLeaderToPeer(leader) {
|
||||
panic("the store must exist")
|
||||
}
|
||||
logutil.BgLogger().Debug("switch region leader to specific leader due to kv return NotLeader",
|
||||
|
|
|
|||
Loading…
Reference in New Issue