mirror of https://github.com/tikv/client-go.git
region_cache: allow pd follower handle region api (#1072)
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
This commit is contained in:
parent
85ca0a4a3f
commit
1a33acab05
|
|
@ -1117,7 +1117,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
r = c.searchCachedRegion(key, isEndKey)
|
||||
if r == nil {
|
||||
// load region when it is not exists or expired.
|
||||
lr, err := c.loadRegion(bo, key, isEndKey)
|
||||
lr, err := c.loadRegion(bo, key, isEndKey, pd.WithAllowFollowerHandle())
|
||||
if err != nil {
|
||||
// no region data, return error if failure.
|
||||
return nil, err
|
||||
|
|
@ -1125,8 +1125,20 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
|
|||
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true, true)
|
||||
stale := !c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
// just retry once, it won't bring much overhead.
|
||||
if stale {
|
||||
lr, err = c.loadRegion(bo, key, isEndKey)
|
||||
if err != nil {
|
||||
// no region data, return error if failure.
|
||||
return nil, err
|
||||
}
|
||||
r = lr
|
||||
c.mu.Lock()
|
||||
c.insertRegionToCache(r, true, true)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
} else if r.checkNeedReloadAndMarkUpdated() {
|
||||
// load region when it be marked as need reload.
|
||||
lr, err := c.loadRegion(bo, key, isEndKey)
|
||||
|
|
@ -1673,7 +1685,7 @@ func filterUnavailablePeers(region *pd.Region) {
|
|||
// loadRegion loads region from pd client, and picks the first peer as leader.
|
||||
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
|
||||
// when processing in reverse order.
|
||||
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool) (*Region, error) {
|
||||
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) {
|
||||
ctx := bo.GetCtx()
|
||||
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
|
||||
span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context()))
|
||||
|
|
@ -1683,6 +1695,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
|
|||
|
||||
var backoffErr error
|
||||
searchPrev := false
|
||||
opts = append(opts, pd.WithBuckets())
|
||||
for {
|
||||
if backoffErr != nil {
|
||||
err := bo.Backoff(retry.BoPDRPC, backoffErr)
|
||||
|
|
@ -1694,9 +1707,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool)
|
|||
var reg *pd.Region
|
||||
var err error
|
||||
if searchPrev {
|
||||
reg, err = c.pdClient.GetPrevRegion(ctx, key, pd.WithBuckets())
|
||||
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
|
||||
} else {
|
||||
reg, err = c.pdClient.GetRegion(ctx, key, pd.WithBuckets())
|
||||
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
|
||||
}
|
||||
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
|
||||
if err != nil {
|
||||
|
|
@ -1848,7 +1861,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
|
|||
}
|
||||
}
|
||||
start := time.Now()
|
||||
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit)
|
||||
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, pd.WithAllowFollowerHandle())
|
||||
metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
|
||||
if err != nil {
|
||||
if apicodec.IsDecodeError(err) {
|
||||
|
|
|
|||
|
|
@ -1952,6 +1952,40 @@ func (s *testRegionCacheWithDelaySuite) TestStaleGetRegion() {
|
|||
s.Equal([]byte("b"), r.meta.EndKey)
|
||||
}
|
||||
|
||||
func (s *testRegionCacheWithDelaySuite) TestFollowerGetStaleRegion() {
|
||||
var delay uatomic.Bool
|
||||
pdCli3 := &CodecPDClient{mocktikv.NewPDClient(s.cluster, mocktikv.WithDelay(&delay)), apicodec.NewCodecV1(apicodec.ModeTxn)}
|
||||
followerDelayCache := NewRegionCache(pdCli3)
|
||||
|
||||
delay.Store(true)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
var final *Region
|
||||
go func() {
|
||||
var err error
|
||||
// followerDelayCache is empty now, so it will go follower.
|
||||
final, err = followerDelayCache.findRegionByKey(s.bo, []byte("z"), false)
|
||||
s.NoError(err)
|
||||
wg.Done()
|
||||
}()
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
delay.Store(false)
|
||||
r, err := followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
|
||||
s.NoError(err)
|
||||
newPeersIDs := s.cluster.AllocIDs(1)
|
||||
s.cluster.Split(r.GetID(), s.cluster.AllocID(), []byte("z"), newPeersIDs, newPeersIDs[0])
|
||||
r.invalidate(Other)
|
||||
r, err = followerDelayCache.findRegionByKey(s.bo, []byte("y"), false)
|
||||
s.NoError(err)
|
||||
s.Equal([]byte("z"), r.meta.EndKey)
|
||||
|
||||
// no need to retry because
|
||||
wg.Wait()
|
||||
s.Equal([]byte("z"), final.meta.StartKey)
|
||||
|
||||
followerDelayCache.Close()
|
||||
}
|
||||
|
||||
func generateKeyForSimulator(id int, keyLen int) []byte {
|
||||
k := make([]byte, keyLen)
|
||||
copy(k, fmt.Sprintf("%010d", id))
|
||||
|
|
|
|||
Loading…
Reference in New Issue