region cache: retry scan or batch scan regions when returned region has no leader (#1480)

Signed-off-by: cfzjywxk <cfzjywxk@gmail.com>
This commit is contained in:
cfzjywxk 2024-10-23 10:31:20 +08:00 committed by GitHub
parent 8dfa86b5d1
commit 691e80ae0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 68 additions and 4 deletions

View File

@ -2223,7 +2223,18 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
backoffErr = errors.Errorf("PD returned regions have gaps, limit: %d", limit)
continue
}
return c.handleRegionInfos(bo, regionsInfo, true)
validRegions, err := c.handleRegionInfos(bo, regionsInfo, true)
if err != nil {
return nil, err
}
// If the region information is loaded from the local disk and the current leader has not
// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
// Retry if there is no valid regions with leaders.
if len(validRegions) == 0 {
backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
continue
}
return validRegions, nil
}
}
@ -2290,7 +2301,18 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa
)
continue
}
return c.handleRegionInfos(bo, regionsInfo, opt.needRegionHasLeaderPeer)
validRegions, err := c.handleRegionInfos(bo, regionsInfo, opt.needRegionHasLeaderPeer)
if err != nil {
return nil, err
}
// If the region information is loaded from the local disk and the current leader has not
// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
// Retry if there is no valid regions with leaders.
if len(validRegions) == 0 {
backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
continue
}
return validRegions, nil
}
}
@ -2397,7 +2419,7 @@ func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*pd.R
regions = append(regions, region)
}
if len(regions) == 0 {
return nil, errors.New("receive Regions with no peer")
return nil, nil
}
if len(regions) < len(regionsInfo) {
logutil.Logger(context.Background()).Debug(

View File

@ -66,7 +66,8 @@ import (
type inspectedPDClient struct {
pd.Client
getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error)
getRegion func(ctx context.Context, cli pd.Client, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error)
batchScanRegions func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error)
}
func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...pd.GetRegionOption) (*pd.Region, error) {
@ -76,6 +77,13 @@ func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...p
return c.Client.GetRegion(ctx, key, opts...)
}
func (c *inspectedPDClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
if c.batchScanRegions != nil {
return c.batchScanRegions(ctx, keyRanges, limit, opts...)
}
return c.Client.BatchScanRegions(ctx, keyRanges, limit, opts...)
}
func TestBackgroundRunner(t *testing.T) {
t.Run("ShutdownWait", func(t *testing.T) {
dur := 100 * time.Millisecond
@ -459,6 +467,40 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() {
s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
}
func (s *testRegionCacheSuite) TestReturnRegionWithNoLeader() {
region := s.getRegion([]byte("x"))
NoLeaderRegion := &pd.Region{
Meta: region.meta,
Leader: nil,
}
originalBatchScanRegions := s.cache.pdClient.BatchScanRegions
batchScanCnt := 0
s.cache.pdClient = &inspectedPDClient{
Client: s.cache.pdClient,
batchScanRegions: func(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
if batchScanCnt == 0 {
batchScanCnt++
return []*pd.Region{NoLeaderRegion}, nil
} else {
return originalBatchScanRegions(ctx, keyRanges, limit, opts...)
}
},
}
bo := retry.NewBackofferWithVars(context.Background(), 1000, nil)
returnedRegions, err := s.cache.scanRegions(bo, nil, nil, 100)
s.Nil(err)
s.Equal(len(returnedRegions), 1)
s.Equal(returnedRegions[0].meta.GetId(), region.GetID())
returnedRegions, err = s.cache.batchScanRegions(bo, []pd.KeyRange{{StartKey: nil, EndKey: nil}}, 100, WithNeedRegionHasLeaderPeer())
s.Nil(err)
s.Equal(len(returnedRegions), 1)
s.Equal(returnedRegions[0].meta.GetId(), region.GetID())
}
func (s *testRegionCacheSuite) TestNeedExpireRegionAfterTTL() {
s.onClosed = func() { SetRegionCacheTTLWithJitter(600, 60) }
SetRegionCacheTTLWithJitter(2, 0)