mirror of https://github.com/tikv/client-go.git
region_cache: no need to metrics when inserting for scan regions (#1077)
* set false when scan region Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com> * set false when scan region Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com> --------- Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
This commit is contained in:
parent
e80e9ca1fe
commit
8deef63a94
|
|
@ -408,6 +408,12 @@ func (r *Region) invalidate(reason InvalidReason) {
|
|||
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
|
||||
}
|
||||
|
||||
// invalidateWithoutMetrics invalidates a region without metrics, next time it will got null result.
|
||||
func (r *Region) invalidateWithoutMetrics(reason InvalidReason) {
|
||||
atomic.StoreInt32((*int32)(&r.invalidReason), int32(reason))
|
||||
atomic.StoreInt64(&r.lastAccess, invalidatedLastAccessTime)
|
||||
}
|
||||
|
||||
// scheduleReload schedules reload region request in next LocateKey.
|
||||
func (r *Region) scheduleReload() {
|
||||
oldValue := atomic.LoadInt32(&r.syncFlag)
|
||||
|
|
@ -448,7 +454,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
|
|||
r.latestVersions = make(map[uint64]RegionVerID)
|
||||
r.sorted = NewSortedRegions(btreeDegree)
|
||||
for _, region := range rs {
|
||||
r.insertRegionToCache(region, true)
|
||||
r.insertRegionToCache(region, true, false)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
|
@ -552,8 +558,8 @@ func (c *RegionCache) clear() {
|
|||
}
|
||||
|
||||
// thread unsafe, should use with lock
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
|
||||
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
|
||||
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
|
||||
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
|
||||
}
|
||||
|
||||
// Close releases region cache's resource.
|
||||
|
|
@ -1107,7 +1113,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
} else if r.checkNeedReloadAndMarkUpdated() {
|
||||
// load region when it be marked as need reload.
|
||||
|
|
@ -1121,7 +1127,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1262,7 +1268,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
} else {
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
@ -1281,7 +1287,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
|
|||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true)
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
return &KeyLocation{
|
||||
Region: r.VerID(),
|
||||
|
|
@ -1319,7 +1325,7 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
|
|||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(lr, false)
|
||||
c.insertRegionToCache(lr, false, false)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
|
|
@ -1409,7 +1415,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
|
|||
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
|
||||
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
|
||||
for _, region := range regions {
|
||||
c.insertRegionToCache(region, true)
|
||||
c.insertRegionToCache(region, true, false)
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@ -1485,7 +1491,7 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
|
|||
// It should be protected by c.mu.l.Lock().
|
||||
// if `invalidateOldRegion` is false, the old region cache should be still valid,
|
||||
// and it may still be used by some kv requests.
|
||||
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
|
||||
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) {
|
||||
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
|
||||
if oldRegion != nil {
|
||||
store := cachedRegion.getStore()
|
||||
|
|
@ -1503,7 +1509,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
|
|||
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
|
||||
if invalidateOldRegion {
|
||||
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
|
||||
oldRegion.invalidate(Other)
|
||||
if shouldCount {
|
||||
oldRegion.invalidate(Other)
|
||||
} else {
|
||||
oldRegion.invalidateWithoutMetrics(Other)
|
||||
}
|
||||
}
|
||||
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
|
||||
// is under transferring regions.
|
||||
|
|
@ -2049,7 +2059,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext
|
|||
|
||||
c.mu.Lock()
|
||||
for _, region := range newRegions {
|
||||
c.insertRegionToCache(region, true)
|
||||
c.insertRegionToCache(region, true, true)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
|
|
@ -2167,7 +2177,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
|
|||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(new, true)
|
||||
c.insertRegionToCache(new, true, true)
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1017,7 +1017,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
|
|||
region := createSampleRegion([]byte("k1"), []byte("k2"))
|
||||
region.meta.Id = 1
|
||||
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
|
||||
cache.insertRegionToCache(region, true)
|
||||
cache.insertRegionToCache(region, true, true)
|
||||
|
||||
r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
|
||||
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
|
||||
|
|
@ -1308,7 +1308,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
|
|||
filterUnavailablePeers(cpRegion)
|
||||
region, err := newRegion(s.bo, s.cache, cpRegion)
|
||||
s.Nil(err)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
s.cache.insertRegionToCache(region, true, true)
|
||||
|
||||
// OnSendFail should not panic
|
||||
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
|
||||
|
|
@ -1344,7 +1344,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
|
|||
cpRegion := &pd.Region{Meta: cpMeta}
|
||||
region, err := newRegion(s.bo, s.cache, cpRegion)
|
||||
s.Nil(err)
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
s.cache.insertRegionToCache(region, true, true)
|
||||
|
||||
// OnSendFail should not panic
|
||||
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
|
||||
|
|
@ -1517,12 +1517,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
|
|||
fakeRegion.setStore(cachedRegion.getStore().clone())
|
||||
// no buckets
|
||||
fakeRegion.getStore().buckets = nil
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
s.cache.insertRegionToCache(fakeRegion, true, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
|
||||
// stale buckets
|
||||
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
s.cache.insertRegionToCache(fakeRegion, true, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
|
||||
// new buckets
|
||||
|
|
@ -1532,7 +1532,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
|
|||
Keys: buckets.Keys,
|
||||
}
|
||||
fakeRegion.getStore().buckets = newBuckets
|
||||
s.cache.insertRegionToCache(fakeRegion, true)
|
||||
s.cache.insertRegionToCache(fakeRegion, true, true)
|
||||
cachedRegion = s.getRegion([]byte("a"))
|
||||
s.Equal(newBuckets, cachedRegion.getStore().buckets)
|
||||
|
||||
|
|
@ -1665,7 +1665,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
|
|||
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
|
||||
s.Nil(err)
|
||||
s.Equal(region.GetID(), regions[0])
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
s.cache.insertRegionToCache(region, true, true)
|
||||
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
|
||||
s.Nil(err)
|
||||
s.Equal(loc.Region.GetID(), regions[0])
|
||||
|
|
@ -1676,7 +1676,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
|
|||
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
|
||||
s.Nil(err)
|
||||
s.Equal(region.GetID(), regions[0])
|
||||
s.cache.insertRegionToCache(region, true)
|
||||
s.cache.insertRegionToCache(region, true, true)
|
||||
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
|
||||
s.Nil(err)
|
||||
s.Equal(loc.Region.GetID(), regions[0])
|
||||
|
|
@ -1799,7 +1799,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
|
|||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
st := &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
|
||||
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 2)
|
||||
|
|
|
|||
|
|
@ -332,7 +332,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
|
|||
cache := NewRegionCache(s.cache.pdClient)
|
||||
defer cache.Close()
|
||||
cache.mu.Lock()
|
||||
cache.insertRegionToCache(region, true)
|
||||
cache.insertRegionToCache(region, true, true)
|
||||
cache.mu.Unlock()
|
||||
|
||||
// Test accessFollower state with kv.ReplicaReadLearner request type.
|
||||
|
|
@ -383,7 +383,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
|
|||
cache := NewRegionCache(s.cache.pdClient)
|
||||
defer cache.Close()
|
||||
cache.mu.Lock()
|
||||
cache.insertRegionToCache(region, true)
|
||||
cache.insertRegionToCache(region, true, true)
|
||||
cache.mu.Unlock()
|
||||
|
||||
// Verify creating the replicaSelector.
|
||||
|
|
|
|||
|
|
@ -616,7 +616,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
|
|||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
st := &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
|
||||
region, err = s.cache.LocateRegionByID(s.bo, s.region)
|
||||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
|
|
@ -626,7 +626,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
|
|||
v3 := region.Region.confVer + 1
|
||||
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
|
||||
st = &Store{storeID: s.store}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
|
||||
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true, true)
|
||||
region, err = s.cache.LocateRegionByID(s.bo, s.region)
|
||||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
|
|
|
|||
Loading…
Reference in New Issue