Clear intersecting regions in the cache when inserting a region (#566)

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2022-08-15 17:47:24 +08:00 committed by GitHub
parent a31f03ebc4
commit 025596b7a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 0 deletions

View File

@ -1316,6 +1316,11 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
c.mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := c.mu.sorted.removeIntersecting(cachedRegion)
for _, r := range deleted {
c.removeVersionFromCache(r.cachedRegion.VerID(), r.cachedRegion.GetID())
}
}
// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,

View File

@ -1536,3 +1536,59 @@ func (s *testRegionCacheSuite) TestLocateBucket() {
s.True(b.Contains(key))
}
}
func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
// Split at "b", "c", "d", "e"
regions := s.cluster.AllocIDs(4)
regions = append([]uint64{s.region1}, regions...)
peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < 4; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}
for i := 0; i < 4; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte{'b' + byte(i)}, peers[i+1], peers[i+1][0])
}
for c := 'a'; c <= 'e'; c++ {
loc, err := s.cache.LocateKey(s.bo, []byte{byte(c)})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[c-'a'])
}
// merge all except the last region together
for i := 1; i <= 3; i++ {
s.cluster.Merge(regions[0], regions[i])
}
// Now the region cache contains stale information
loc, err := s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.NotEqual(loc.Region.GetID(), regions[0]) // This is incorrect, but is expected
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[4]) // 'e' is not merged yet, so it's still correct
// If we insert the new region into the cache, the old intersecting regions will be removed.
// And the result will be correct.
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
s.checkCache(2)
// Now, we merge the last region. This case tests against how we handle the empty end_key.
s.cluster.Merge(regions[0], regions[4])
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
s.checkCache(1)
}

View File

@ -91,6 +91,27 @@ func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int)
return regions
}
// removeIntersecting removes all items that have intersection with the key range of given region.
// If the region itself is in the cache, it's not removed.
func (s *SortedRegions) removeIntersecting(r *Region) []*btreeItem {
var deleted []*btreeItem
s.b.AscendGreaterOrEqual(newBtreeSearchItem(r.StartKey()), func(item *btreeItem) bool {
// Skip the item that is equal to the given region.
if item.cachedRegion.VerID() == r.VerID() {
return true
}
if len(r.EndKey()) > 0 && bytes.Compare(item.cachedRegion.StartKey(), r.EndKey()) >= 0 {
return false
}
deleted = append(deleted, item)
return true
})
for _, item := range deleted {
s.b.Delete(item)
}
return deleted
}
// Clear removes all items from the btree.
func (s *SortedRegions) Clear() {
s.b.Clear(false)