mirror of https://github.com/tikv/client-go.git
region cache: fallback to ScanRegions when BatchScanRegions receive unimplement error (#1378)
* add fallback Signed-off-by: you06 <you1474600@gmail.com> * fallback to ScanRegions when BatchScanRegions receive unimplement error Signed-off-by: you06 <you1474600@gmail.com> * add comment & test Signed-off-by: you06 <you1474600@gmail.com> --------- Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
4a72526f6c
commit
0206a3c142
|
|
@ -2192,6 +2192,9 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa
|
||||||
regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...)
|
regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...)
|
||||||
metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds())
|
metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
|
||||||
|
return c.batchScanRegionsFallback(bo, keyRanges, limit, opts...)
|
||||||
|
}
|
||||||
if apicodec.IsDecodeError(err) {
|
if apicodec.IsDecodeError(err) {
|
||||||
return nil, errors.Errorf("failed to decode region range key, range num: %d, limit: %d, err: %v",
|
return nil, errors.Errorf("failed to decode region range key, range num: %d, limit: %d, err: %v",
|
||||||
len(keyRanges), limit, err)
|
len(keyRanges), limit, err)
|
||||||
|
|
@ -2216,6 +2219,40 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []pd.KeyRa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *RegionCache) batchScanRegionsFallback(bo *retry.Backoffer, keyRanges []pd.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) {
|
||||||
|
logutil.BgLogger().Warn("batch scan regions fallback to scan regions", zap.Int("range-num", len(keyRanges)))
|
||||||
|
res := make([]*Region, 0, len(keyRanges))
|
||||||
|
var lastRegion *Region
|
||||||
|
for _, keyRange := range keyRanges {
|
||||||
|
if lastRegion != nil {
|
||||||
|
endKey := lastRegion.EndKey()
|
||||||
|
if len(endKey) == 0 {
|
||||||
|
// end_key is empty means the last region is the last region of the store, which certainly contains all the rest ranges.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if bytes.Compare(endKey, keyRange.EndKey) >= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if bytes.Compare(endKey, keyRange.StartKey) > 0 {
|
||||||
|
keyRange.StartKey = endKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
regions, err := c.scanRegions(bo, keyRange.StartKey, keyRange.EndKey, limit)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(regions) > 0 {
|
||||||
|
lastRegion = regions[len(regions)-1]
|
||||||
|
}
|
||||||
|
res = append(res, regions...)
|
||||||
|
if len(regions) >= limit {
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
limit -= len(regions)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*pd.Region, needLeader bool) ([]*Region, error) {
|
func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*pd.Region, needLeader bool) ([]*Region, error) {
|
||||||
regions := make([]*Region, 0, len(regionsInfo))
|
regions := make([]*Region, 0, len(regionsInfo))
|
||||||
for _, r := range regionsInfo {
|
for _, r := range regionsInfo {
|
||||||
|
|
|
||||||
|
|
@ -2594,6 +2594,16 @@ func (s *testRegionCacheSuite) TestSplitKeyRanges() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testRegionCacheSuite) TestBatchScanRegions() {
|
func (s *testRegionCacheSuite) TestBatchScanRegions() {
|
||||||
|
s.testBatchScanRegions()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testRegionCacheSuite) TestBatchScanRegionsFallback() {
|
||||||
|
s.Nil(failpoint.Enable("tikvclient/mockBatchScanRegionsUnimplemented", `return`))
|
||||||
|
s.testBatchScanRegions()
|
||||||
|
s.Nil(failpoint.Disable("tikvclient/mockBatchScanRegionsUnimplemented"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testRegionCacheSuite) testBatchScanRegions() {
|
||||||
// Split at "a", "b", "c", "d", "e", "f", "g"
|
// Split at "a", "b", "c", "d", "e", "f", "g"
|
||||||
// nil --- 'a' --- 'b' --- 'c' --- 'd' --- 'e' --- 'f' --- 'g' --- nil
|
// nil --- 'a' --- 'b' --- 'c' --- 'd' --- 'e' --- 'f' --- 'g' --- nil
|
||||||
// <- 0 -> <- 1 -> <- 2 -> <- 3 -> <- 4 -> <- 5 -> <- 6 -> <- 7 ->
|
// <- 0 -> <- 1 -> <- 2 -> <- 3 -> <- 4 -> <- 5 -> <- 6 -> <- 7 ->
|
||||||
|
|
|
||||||
|
|
@ -49,8 +49,11 @@ import (
|
||||||
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
|
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/tikv/client-go/v2/oracle"
|
"github.com/tikv/client-go/v2/oracle"
|
||||||
|
"github.com/tikv/client-go/v2/util"
|
||||||
pd "github.com/tikv/pd/client"
|
pd "github.com/tikv/pd/client"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Use global variables to prevent pdClients from creating duplicate timestamps.
|
// Use global variables to prevent pdClients from creating duplicate timestamps.
|
||||||
|
|
@ -254,6 +257,9 @@ func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []by
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
|
func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []pd.KeyRange, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) {
|
||||||
|
if _, err := util.EvalFailpoint("mockBatchScanRegionsUnimplemented"); err == nil {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "mock BatchScanRegions is not implemented")
|
||||||
|
}
|
||||||
regions := make([]*pd.Region, 0, len(keyRanges))
|
regions := make([]*pd.Region, 0, len(keyRanges))
|
||||||
var lastRegion *pd.Region
|
var lastRegion *pd.Region
|
||||||
for _, keyRange := range keyRanges {
|
for _, keyRange := range keyRanges {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue