mirror of https://github.com/tikv/client-go.git
region cache: batch find regions by key ranges from cache (#1410)
* try add benchmark Signed-off-by: wshwsh12 <793703860@qq.com> * impl Signed-off-by: wshwsh12 <793703860@qq.com> * time Signed-off-by: wshwsh12 <793703860@qq.com> * fix ut Signed-off-by: wshwsh12 <793703860@qq.com> * address comments Signed-off-by: wshwsh12 <793703860@qq.com> * startkey Signed-off-by: wshwsh12 <793703860@qq.com> * add comments Signed-off-by: wshwsh12 <793703860@qq.com> --------- Signed-off-by: wshwsh12 <793703860@qq.com> Co-authored-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
aa8b33873f
commit
f0f57f28c4
|
@ -1240,24 +1240,50 @@ func (c *RegionCache) BatchLocateKeyRanges(bo *retry.Backoffer, keyRanges []kv.K
|
|||
keyRange.StartKey = lastRegion.EndKey()
|
||||
}
|
||||
}
|
||||
// TODO: find all the cached regions in the range.
|
||||
// Currently we only check whether the region is cached from the lower bound; if there is an uncached hole in the middle,
|
||||
// we will load the rest of the regions even if they are cached.
|
||||
r := c.tryFindRegionByKey(keyRange.StartKey, false)
|
||||
lastRegion = r
|
||||
if r == nil {
|
||||
// region cache miss, add the cut range to uncachedRanges, load from PD later.
|
||||
uncachedRanges = append(uncachedRanges, pd.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey})
|
||||
continue
|
||||
}
|
||||
// region cache hit, add the region to cachedRegions.
|
||||
cachedRegions = append(cachedRegions, r)
|
||||
if r.ContainsByEnd(keyRange.EndKey) {
|
||||
// the range is fully hit in the region cache.
|
||||
continue
|
||||
}
|
||||
keyRange.StartKey = r.EndKey()
|
||||
// Batch load rest regions from Cache.
|
||||
containsAll := false
|
||||
outer:
|
||||
for {
|
||||
// TODO: find all the cached regions in the range.
|
||||
// now we only check if the region is cached from the lower bound, if there is a uncached hole in the middle,
|
||||
// we will load the rest regions even they are cached.
|
||||
r := c.tryFindRegionByKey(keyRange.StartKey, false)
|
||||
lastRegion = r
|
||||
if r == nil {
|
||||
// region cache miss, add the cut range to uncachedRanges, load from PD later.
|
||||
uncachedRanges = append(uncachedRanges, pd.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey})
|
||||
batchRegionInCache, err := c.scanRegionsFromCache(bo, keyRange.StartKey, keyRange.EndKey, defaultRegionsPerBatch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, r = range batchRegionInCache {
|
||||
if !r.Contains(keyRange.StartKey) { // uncached hole, load the rest regions
|
||||
break outer
|
||||
}
|
||||
cachedRegions = append(cachedRegions, r)
|
||||
lastRegion = r
|
||||
if r.ContainsByEnd(keyRange.EndKey) {
|
||||
// the range is fully hit in the region cache.
|
||||
containsAll = true
|
||||
break outer
|
||||
}
|
||||
keyRange.StartKey = r.EndKey()
|
||||
}
|
||||
if len(batchRegionInCache) < defaultRegionsPerBatch { // region cache miss, load the rest regions
|
||||
break
|
||||
}
|
||||
// region cache hit, add the region to cachedRegions.
|
||||
cachedRegions = append(cachedRegions, r)
|
||||
if r.ContainsByEnd(keyRange.EndKey) {
|
||||
// the range is fully hit in the region cache.
|
||||
break
|
||||
}
|
||||
keyRange.StartKey = r.EndKey()
|
||||
}
|
||||
if !containsAll {
|
||||
uncachedRanges = append(uncachedRanges, pd.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2078,7 +2104,8 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(youjiali1995): for optimizing BatchLoadRegionsWithKeyRange, not used now.
|
||||
// For optimizing BatchLocateKeyRanges, scanRegionsFromCache scans at most `limit` regions from cache.
|
||||
// It is the caller's responsibility to make sure that startKey is a node in the B-tree; otherwise, startKey will not be included in the returned regions.
|
||||
func (c *RegionCache) scanRegionsFromCache(bo *retry.Backoffer, startKey, endKey []byte, limit int) ([]*Region, error) {
|
||||
if limit == 0 {
|
||||
return nil, nil
|
||||
|
@ -2089,9 +2116,6 @@ func (c *RegionCache) scanRegionsFromCache(bo *retry.Backoffer, startKey, endKey
|
|||
defer c.mu.RUnlock()
|
||||
regions = c.mu.sorted.AscendGreaterOrEqual(startKey, endKey, limit)
|
||||
|
||||
if len(regions) == 0 {
|
||||
return nil, errors.New("no regions in the cache")
|
||||
}
|
||||
return regions, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ package locate
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
@ -2254,18 +2255,57 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
|
|||
|
||||
region, _ := s.cache.LocateRegionByID(s.bo, s.region)
|
||||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{2}}
|
||||
st := newUninitializedStore(s.store)
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
|
||||
|
||||
// Since the region cache doesn't remove the first intersected region (it scans intersected regions with AscendGreaterOrEqual), the outdated region (-inf, inf) is still alive.
|
||||
// The newly inserted valid region [{2}, inf) is ignored because the first seen region (-inf, inf) contains all the required ranges.
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 1)
|
||||
s.Equal(r[0].StartKey(), []byte(nil))
|
||||
|
||||
// regions: (-inf,2), [2, +inf). Get all regions.
|
||||
v3 := region.Region.confVer + 2
|
||||
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v3}, StartKey: []byte{}, EndKey: []byte{2}}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 2)
|
||||
|
||||
// regions: (-inf,1), [2, +inf). Get region (-inf, 1).
|
||||
v4 := region.Region.confVer + 3
|
||||
r4 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v4}, StartKey: []byte{}, EndKey: []byte{1}}
|
||||
s.cache.insertRegionToCache(&Region{meta: &r4, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 1)
|
||||
|
||||
_ = s.cache.refreshRegionIndex(s.bo)
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 1)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToSingleStoreSuite) TestRegionCacheStartNonEmpty() {
|
||||
_ = s.cache.refreshRegionIndex(s.bo)
|
||||
r, _ := s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 1)
|
||||
|
||||
region, _ := s.cache.LocateRegionByID(s.bo, s.region)
|
||||
v2 := region.Region.confVer + 1
|
||||
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
|
||||
st := newUninitializedStore(s.store)
|
||||
|
||||
s.cache.mu.Lock()
|
||||
s.cache.mu.sorted.Clear()
|
||||
s.cache.mu.Unlock()
|
||||
// region cache after clear: []
|
||||
|
||||
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
|
||||
// region cache after insert: [[1, +inf)]
|
||||
|
||||
r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
s.Equal(len(r), 0)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToSingleStoreSuite) TestRefreshCacheConcurrency() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func(cache *RegionCache) {
|
||||
|
@ -2874,3 +2914,40 @@ func (s *testRegionCacheSuite) TestIssue1401() {
|
|||
}, 3*time.Second, time.Second)
|
||||
s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
|
||||
}
|
||||
|
||||
func BenchmarkBatchLocateKeyRangesFromCache(t *testing.B) {
|
||||
t.StopTimer()
|
||||
s := new(testRegionCacheSuite)
|
||||
s.SetT(&testing.T{})
|
||||
s.SetupTest()
|
||||
|
||||
regionNum := 10000
|
||||
regions := s.cluster.AllocIDs(regionNum)
|
||||
regions = append([]uint64{s.region1}, regions...)
|
||||
|
||||
peers := [][]uint64{{s.peer1, s.peer2}}
|
||||
for i := 0; i < regionNum-1; i++ {
|
||||
peers = append(peers, s.cluster.AllocIDs(2))
|
||||
}
|
||||
|
||||
for i := 0; i < regionNum-1; i++ {
|
||||
b := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(b, uint64(i*2))
|
||||
s.cluster.Split(regions[i], regions[i+1], b, peers[i+1], peers[i+1][0])
|
||||
}
|
||||
|
||||
// cache all regions
|
||||
keyLocation, err := s.cache.BatchLocateKeyRanges(s.bo, []kv.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}})
|
||||
if err != nil || len(keyLocation) != regionNum {
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
t.StartTimer()
|
||||
for i := 0; i < t.N; i++ {
|
||||
keyLocation, err := s.cache.BatchLocateKeyRanges(s.bo, []kv.KeyRange{{StartKey: []byte(""), EndKey: []byte("")}})
|
||||
if err != nil || len(keyLocation) != regionNum {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
s.TearDownTest()
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ package locate
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"time"
|
||||
|
||||
"github.com/google/btree"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
|
@ -79,12 +80,22 @@ func (s *SortedRegions) SearchByKey(key []byte, isEndKey bool) (r *Region) {
|
|||
}
|
||||
|
||||
// AscendGreaterOrEqual returns all items that are greater than or equal to the key.
|
||||
// It is the caller's responsibility to make sure that startKey is a node in the B-tree, otherwise, the startKey will not be included in the return regions.
|
||||
func (s *SortedRegions) AscendGreaterOrEqual(startKey, endKey []byte, limit int) (regions []*Region) {
|
||||
now := time.Now().Unix()
|
||||
lastStartKey := startKey
|
||||
s.b.AscendGreaterOrEqual(newBtreeSearchItem(startKey), func(item *btreeItem) bool {
|
||||
region := item.cachedRegion
|
||||
if len(endKey) > 0 && bytes.Compare(region.StartKey(), endKey) >= 0 {
|
||||
return false
|
||||
}
|
||||
if !region.checkRegionCacheTTL(now) {
|
||||
return false
|
||||
}
|
||||
if !region.Contains(lastStartKey) { // uncached hole
|
||||
return false
|
||||
}
|
||||
lastStartKey = region.EndKey()
|
||||
regions = append(regions, region)
|
||||
return len(regions) < limit
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue