integrate circuitbreaker for region calls (#1543)

ref tikv/pd#8678

Signed-off-by: artem_danilov <artem_danilov@airbnb.com>

Co-authored-by: artem_danilov <artem_danilov@airbnb.com>
This commit is contained in:
Artem Danilov 2025-01-14 20:06:13 -08:00 committed by GitHub
parent a348c17c6b
commit be4b478c11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 42 additions and 5 deletions

View File

@ -69,6 +69,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/circuitbreaker"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -133,6 +134,24 @@ func nextTTL(ts int64) int64 {
return ts + regionCacheTTLSec + jitter
}
var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker("region-meta",
circuitbreaker.Settings{
ErrorRateWindow: 30,
MinQPSForOpen: 10,
CoolDownInterval: 10,
HalfOpenSuccessCount: 1,
})
// wrap context with circuit breaker for PD region metadata calls
func withPDCircuitBreaker(ctx context.Context) context.Context {
return circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
}
// ChangePDRegionMetaCircuitBreakerSettings changes circuit breaker changes for region metadata calls
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
pdRegionMetaCircuitBreaker.ChangeSettings(apply)
}
// nextTTLWithoutJitter is used for test.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
@ -2071,9 +2090,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
var reg *router.Region
var err error
if searchPrev {
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
reg, err = c.pdClient.GetPrevRegion(withPDCircuitBreaker(ctx), key, opts...)
} else {
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
reg, err = c.pdClient.GetRegion(withPDCircuitBreaker(ctx), key, opts...)
}
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
if err != nil {
@ -2121,7 +2140,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
}
}
start := time.Now()
reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets())
reg, err := c.pdClient.GetRegionByID(withPDCircuitBreaker(ctx), regionID, opt.WithBuckets())
metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds())
if err != nil {
metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
@ -2201,7 +2220,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
}
start := time.Now()
//nolint:staticcheck
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
regionsInfo, err := c.pdClient.ScanRegions(withPDCircuitBreaker(ctx), startKey, endKey, limit, opt.WithAllowFollowerHandle())
metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
if err != nil {
if apicodec.IsDecodeError(err) {
@ -2270,7 +2289,7 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []router.K
if batchOpt.needBuckets {
pdOpts = append(pdOpts, opt.WithBuckets())
}
regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...)
regionsInfo, err := c.pdClient.BatchScanRegions(withPDCircuitBreaker(ctx), keyRanges, limit, pdOpts...)
metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds())
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {

View File

@ -55,6 +55,7 @@ import (
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/circuitbreaker"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
@ -226,6 +227,7 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
}
func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key)
if len(opts) == 0 {
buckets = nil
@ -244,6 +246,7 @@ func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberUR
}
func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetPrevRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key)
if len(opts) == 0 {
buckets = nil
@ -252,16 +255,19 @@ func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.Ge
}
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetRegionByID", ctx)
region, peer, buckets, downPeers := c.cluster.GetRegionByID(regionID)
return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil
}
func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
enforceCircuitBreakerFor("ScanRegions", ctx)
regions := c.cluster.ScanRegions(startKey, endKey, limit, opts...)
return regions, nil
}
func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
enforceCircuitBreakerFor("BatchScanRegions", ctx)
if _, err := util.EvalFailpoint("mockBatchScanRegionsUnimplemented"); err == nil {
return nil, status.Errorf(codes.Unimplemented, "mock BatchScanRegions is not implemented")
}
@ -465,3 +471,9 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou
func (m *pdClient) GetServiceDiscovery() sd.ServiceDiscovery { return nil }
func (m *pdClient) WithCallerComponent(caller.Component) pd.Client { return m }
func enforceCircuitBreakerFor(name string, ctx context.Context) {
if circuitbreaker.FromContext(ctx) == nil {
panic(fmt.Errorf("CircuitBreaker must be configured for %s", name))
}
}

View File

@ -45,6 +45,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/circuitbreaker"
)
// RPCContext contains data that is needed to send RPC to a region.
@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) {
locate.SetRegionCacheTTLSec(t)
}
// ChangePDRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
locate.ChangePDRegionMetaCircuitBreakerSettings(apply)
}
// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
locate.SetRegionCacheTTLWithJitter(base, jitter)