mirror of https://github.com/tikv/client-go.git
reload region cache when store is resolved from invalid status (#843)
Signed-off-by: you06 <you1474600@gmail.com> Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
parent
51aab264f6
commit
85fc8f3375
|
|
@ -246,7 +246,7 @@ type ErrAssertionFailed struct {
|
|||
*kvrpcpb.AssertionFailed
|
||||
}
|
||||
|
||||
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues“ is not.
|
||||
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
|
||||
type ErrLockOnlyIfExistsNoReturnValue struct {
|
||||
StartTS uint64
|
||||
ForUpdateTs uint64
|
||||
|
|
|
|||
|
|
@ -153,6 +153,7 @@ type Region struct {
|
|||
syncFlag int32 // region need be sync in next turn
|
||||
lastAccess int64 // last region access time, see checkRegionCacheTTL
|
||||
invalidReason InvalidReason // the reason why the region is invalidated
|
||||
asyncReload atomic.Bool // the region need to be reloaded in async mode
|
||||
}
|
||||
|
||||
// AccessIndex represent the index for accessIndex array
|
||||
|
|
@ -420,7 +421,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
|
|||
r.latestVersions = make(map[uint64]RegionVerID)
|
||||
r.sorted = NewSortedRegions(btreeDegree)
|
||||
for _, region := range rs {
|
||||
r.insertRegionToCache(region)
|
||||
r.insertRegionToCache(region, true)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
@ -466,6 +467,11 @@ type RegionCache struct {
|
|||
// requestLiveness always returns unreachable.
|
||||
mockRequestLiveness atomic.Pointer[livenessFunc]
|
||||
}
|
||||
|
||||
regionsNeedReload struct {
|
||||
sync.Mutex
|
||||
regions []uint64
|
||||
}
|
||||
}
|
||||
|
||||
// NewRegionCache creates a RegionCache.
|
||||
|
|
@ -519,8 +525,8 @@ func (c *RegionCache) clear() {
|
|||
}
|
||||
|
||||
// thread unsafe, should use with lock
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
|
||||
c.mu.insertRegionToCache(cachedRegion)
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
|
||||
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
|
||||
}
|
||||
|
||||
// Close releases region cache's resource.
|
||||
|
|
@ -531,8 +537,13 @@ func (c *RegionCache) Close() {
|
|||
// asyncCheckAndResolveLoop with
|
||||
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
reloadRegionTicker := time.NewTicker(10 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
reloadRegionTicker.Stop()
|
||||
}()
|
||||
var needCheckStores []*Store
|
||||
reloadNextLoop := make(map[uint64]struct{})
|
||||
for {
|
||||
needCheckStores = needCheckStores[:0]
|
||||
select {
|
||||
|
|
@ -550,6 +561,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
|
|||
// there's a deleted store in the stores map which guaranteed by reReslve().
|
||||
return state != unresolved && state != tombstone && state != deleted
|
||||
})
|
||||
case <-reloadRegionTicker.C:
|
||||
for regionID := range reloadNextLoop {
|
||||
c.reloadRegion(regionID)
|
||||
delete(reloadNextLoop, regionID)
|
||||
}
|
||||
c.regionsNeedReload.Lock()
|
||||
for _, regionID := range c.regionsNeedReload.regions {
|
||||
// will reload in next tick, wait a while for two reasons:
|
||||
// 1. there may an unavailable duration while recreating the connection.
|
||||
// 2. the store may just be started, and wait safe ts synced to avoid the
|
||||
// possible dataIsNotReady error.
|
||||
reloadNextLoop[regionID] = struct{}{}
|
||||
}
|
||||
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
|
||||
c.regionsNeedReload.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1060,7 +1086,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r)
|
||||
c.insertRegionToCache(r, true)
|
||||
c.mu.Unlock()
|
||||
} else if r.checkNeedReloadAndMarkUpdated() {
|
||||
// load region when it be marked as need reload.
|
||||
|
|
@ -1073,7 +1099,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r)
|
||||
c.insertRegionToCache(r, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1214,7 +1240,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
} else {
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r)
|
||||
c.insertRegionToCache(r, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1233,7 +1259,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r)
|
||||
c.insertRegionToCache(r, true)
|
||||
c.mu.Unlock()
|
||||
return &KeyLocation{
|
||||
Region: r.VerID(),
|
||||
|
|
@ -1243,6 +1269,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (c *RegionCache) scheduleReloadRegion(region *Region) {
|
||||
if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
|
||||
// async reload scheduled by other thread.
|
||||
return
|
||||
}
|
||||
regionID := region.GetID()
|
||||
if regionID > 0 {
|
||||
c.regionsNeedReload.Lock()
|
||||
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
|
||||
c.regionsNeedReload.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RegionCache) reloadRegion(regionID uint64) {
|
||||
bo := retry.NewNoopBackoff(context.Background())
|
||||
lr, err := c.loadRegionByID(bo, regionID)
|
||||
if err != nil {
|
||||
// ignore error and use old region info.
|
||||
logutil.Logger(bo.GetCtx()).Error("load region failure",
|
||||
zap.Uint64("regionID", regionID), zap.Error(err))
|
||||
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
|
||||
oldRegion.asyncReload.Store(false)
|
||||
}
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(lr, false)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// GroupKeysByRegion separates keys into groups by their belonging Regions.
|
||||
// Specially it also returns the first key's region which may be used as the
|
||||
// 'PrimaryLockKey' and should be committed ahead of others.
|
||||
|
|
@ -1327,7 +1383,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
|
|||
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
|
||||
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
|
||||
for _, region := range regions {
|
||||
c.insertRegionToCache(region)
|
||||
c.insertRegionToCache(region, true)
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@ -1401,7 +1457,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
|
|||
|
||||
// insertRegionToCache tries to insert the Region to cache.
|
||||
// It should be protected by c.mu.l.Lock().
|
||||
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
|
||||
// if `invalidateOldRegion` is false, the old region cache should be still valid,
|
||||
// and it may still be used by some kv requests.
|
||||
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
|
||||
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
|
||||
if oldRegion != nil {
|
||||
store := cachedRegion.getStore()
|
||||
|
|
@ -1416,8 +1474,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
|
|||
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
|
||||
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
|
||||
}
|
||||
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
|
||||
if invalidateOldRegion {
|
||||
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
|
||||
oldRegion.invalidate(Other)
|
||||
}
|
||||
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
|
||||
// is under transferring regions.
|
||||
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
|
||||
|
|
@ -1939,7 +2000,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
|
|||
|
||||
c.mu.Lock()
|
||||
for _, region := range newRegions {
|
||||
c.insertRegionToCache(region)
|
||||
c.insertRegionToCache(region, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
|
|
@ -2057,7 +2118,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
|
|||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(new)
|
||||
c.insertRegionToCache(new, true)
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
|
@ -2527,8 +2588,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
|
|||
}
|
||||
|
||||
func (s *Store) getResolveState() resolveState {
|
||||
var state resolveState
|
||||
if s == nil {
|
||||
var state resolveState
|
||||
return state
|
||||
}
|
||||
return resolveState(atomic.LoadUint64(&s.state))
|
||||
|
|
|
|||
|
|
@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
|
|||
region := createSampleRegion([]byte("k1"), []byte("k2"))
|
||||
region.meta.Id = 1
|
||||
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
|
||||
cache.insertRegionToCache(region)
|
||||
cache.insertRegionToCache(region, true)
|
||||
|
||||
r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
|
||||
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
|
||||
|
|
@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
|
|||
filterUnavailablePeers(cpRegion)
|
||||
region, err := newRegion(s.bo, s.cache, cpRegion)
|
||||
s.Nil(err)
|
||||
s.cache.insertRegionToCache(region)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
|
||||
// OnSendFail should not panic
|
||||
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
|
||||
|
|
@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
|
|||
cpRegion := &pd.Region{Meta: cpMeta}
|
||||
region, err := newRegion(s.bo, s.cache, cpRegion)
|
||||
s.Nil(err)
|
||||
s.cache.insertRegionToCache(region)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
|
||||
// OnSendFail should not panic
|
||||
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
|
||||
|
|
@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
|
|||
fakeRegion.setStore(cachedRegion.getStore().clone())
|
||||
// no buckets
|
||||
fakeRegion.getStore().buckets = nil
|
||||
s.cache.insertRegionToCache(fakeRegion)
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
|
||||
// stale buckets
|
||||
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
|
||||
s.cache.insertRegionToCache(fakeRegion)
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
|
||||
// new buckets
|
||||
|
|
@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
|
|||
Keys: buckets.Keys,
|
||||
}
|
||||
fakeRegion.getStore().buckets = newBuckets
|
||||
s.cache.insertRegionToCache(fakeRegion)
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(newBuckets, cachedRegion.getStore().buckets)
|
||||
|
||||
|
|
@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
|
|||
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
|
||||
s.Nil(err)
|
||||
s.Equal(region.GetID(), regions[0])
|
||||
s.cache.insertRegionToCache(region)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
|
||||
s.Nil(err)
|
||||
s.Equal(loc.Region.GetID(), regions[0])
|
||||
|
|
@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
|
|||
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
|
||||
s.Nil(err)
|
||||
s.Equal(region.GetID(), regions[0])
|
||||
s.cache.insertRegionToCache(region)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
|
||||
s.Nil(err)
|
||||
s.Equal(loc.Region.GetID(), regions[0])
|
||||
|
|
@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
|
|||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
st := &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 2)
|
||||
|
|
|
|||
|
|
@ -572,18 +572,39 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
|
|||
if state.option.preferLeader {
|
||||
state.lastIdx = state.leaderIdx
|
||||
}
|
||||
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
|
||||
idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
|
||||
// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
|
||||
// as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution.
|
||||
for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ {
|
||||
idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize)
|
||||
var offset int
|
||||
if state.lastIdx >= 0 {
|
||||
offset = rand.Intn(replicaSize)
|
||||
}
|
||||
if state.isCandidate(idx, selector.replicas[idx]) {
|
||||
reloadRegion := false
|
||||
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
|
||||
var idx AccessIndex
|
||||
if state.option.preferLeader {
|
||||
if i == 0 {
|
||||
idx = state.lastIdx
|
||||
} else {
|
||||
// randomly select next replica, but skip state.lastIdx
|
||||
if (i+offset)%replicaSize == 0 {
|
||||
offset++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
|
||||
}
|
||||
selectReplica := selector.replicas[idx]
|
||||
if state.isCandidate(idx, selectReplica) {
|
||||
state.lastIdx = idx
|
||||
selector.targetIdx = idx
|
||||
break
|
||||
}
|
||||
if selectReplica.isEpochStale() &&
|
||||
selectReplica.store.getResolveState() == resolved &&
|
||||
selectReplica.store.getLivenessState() == reachable {
|
||||
reloadRegion = true
|
||||
}
|
||||
}
|
||||
if reloadRegion {
|
||||
selector.regionCache.scheduleReloadRegion(selector.region)
|
||||
}
|
||||
// If there is no candidate, fallback to the leader.
|
||||
if selector.targetIdx < 0 {
|
||||
|
|
|
|||
|
|
@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
|
|||
cache := NewRegionCache(s.cache.pdClient)
|
||||
defer cache.Close()
|
||||
cache.mu.Lock()
|
||||
cache.insertRegionToCache(region)
|
||||
cache.insertRegionToCache(region, true)
|
||||
cache.mu.Unlock()
|
||||
|
||||
// Test accessFollower state with kv.ReplicaReadLearner request type.
|
||||
|
|
@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
|
|||
cache := NewRegionCache(s.cache.pdClient)
|
||||
defer cache.Close()
|
||||
cache.mu.Lock()
|
||||
cache.insertRegionToCache(region)
|
||||
cache.insertRegionToCache(region, true)
|
||||
cache.mu.Unlock()
|
||||
|
||||
// Verify creating the replicaSelector.
|
||||
|
|
|
|||
|
|
@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
|
|||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
st := &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
region, err = s.cache.LocateRegionByID(s.bo, s.region)
|
||||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
|
|
@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
|
|||
v3 := region.Region.confVer + 1
|
||||
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
|
||||
st = &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
|
||||
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
region, err = s.cache.LocateRegionByID(s.bo, s.region)
|
||||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
|
|
|
|||
Loading…
Reference in New Issue