mirror of https://github.com/tikv/client-go.git
region_request: ignore resource group errors that not relative storage layer (#1354)
Signed-off-by: nolouch <nolouch@gmail.com>
This commit is contained in:
parent
ac8fa1d73a
commit
4f2562f987
|
|
@ -63,6 +63,7 @@ import (
|
||||||
"github.com/tikv/client-go/v2/metrics"
|
"github.com/tikv/client-go/v2/metrics"
|
||||||
"github.com/tikv/client-go/v2/tikvrpc"
|
"github.com/tikv/client-go/v2/tikvrpc"
|
||||||
"github.com/tikv/client-go/v2/util"
|
"github.com/tikv/client-go/v2/util"
|
||||||
|
"github.com/tikv/pd/client/errs"
|
||||||
pderr "github.com/tikv/pd/client/errs"
|
pderr "github.com/tikv/pd/client/errs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1186,7 +1187,9 @@ func (s *RegionRequestSender) sendReqToRegion(
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.rpcError = err
|
if isRPCError(err) {
|
||||||
|
s.rpcError = err
|
||||||
|
}
|
||||||
if s.Stats != nil {
|
if s.Stats != nil {
|
||||||
errStr := getErrMsg(err)
|
errStr := getErrMsg(err)
|
||||||
s.Stats.RecordRPCErrorStats(errStr)
|
s.Stats.RecordRPCErrorStats(errStr)
|
||||||
|
|
@ -1213,6 +1216,11 @@ func (s *RegionRequestSender) sendReqToRegion(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isRPCError(err error) bool {
|
||||||
|
// exclude ErrClientResourceGroupThrottled
|
||||||
|
return err != nil && errs.ErrClientResourceGroupThrottled.NotEqual(err)
|
||||||
|
}
|
||||||
|
|
||||||
func storeIDLabel(rpcCtx *RPCContext) string {
|
func storeIDLabel(rpcCtx *RPCContext) string {
|
||||||
if rpcCtx != nil && rpcCtx.Store != nil {
|
if rpcCtx != nil && rpcCtx.Store != nil {
|
||||||
return strconv.FormatUint(rpcCtx.Store.storeID, 10)
|
return strconv.FormatUint(rpcCtx.Store.storeID, 10)
|
||||||
|
|
@ -1290,16 +1298,6 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
|
||||||
metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc()
|
metrics.TiKVRPCErrorCounter.WithLabelValues("unknown", storeLabel).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute {
|
|
||||||
s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err)
|
|
||||||
} else if ctx.Meta != nil {
|
|
||||||
if s.replicaSelector != nil {
|
|
||||||
s.replicaSelector.onSendFailure(bo, err)
|
|
||||||
} else {
|
|
||||||
s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// don't need to retry for ResourceGroup error
|
// don't need to retry for ResourceGroup error
|
||||||
if errors.Is(err, pderr.ErrClientResourceGroupThrottled) {
|
if errors.Is(err, pderr.ErrClientResourceGroupThrottled) {
|
||||||
return err
|
return err
|
||||||
|
|
@ -1312,6 +1310,16 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute {
|
||||||
|
s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err)
|
||||||
|
} else if ctx.Meta != nil {
|
||||||
|
if s.replicaSelector != nil {
|
||||||
|
s.replicaSelector.onSendFailure(bo, err)
|
||||||
|
} else {
|
||||||
|
s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Retry on send request failure when it's not canceled.
|
// Retry on send request failure when it's not canceled.
|
||||||
// When a store is not available, the leader of related region should be elected quickly.
|
// When a store is not available, the leader of related region should be elected quickly.
|
||||||
// TODO: the number of retry time should be limited:since region may be unavailable
|
// TODO: the number of retry time should be limited:since region may be unavailable
|
||||||
|
|
|
||||||
|
|
@ -65,6 +65,7 @@ import (
|
||||||
"github.com/tikv/client-go/v2/internal/client/mockserver"
|
"github.com/tikv/client-go/v2/internal/client/mockserver"
|
||||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||||
"github.com/tikv/client-go/v2/tikvrpc"
|
"github.com/tikv/client-go/v2/tikvrpc"
|
||||||
|
pderr "github.com/tikv/pd/client/errs"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -160,6 +161,37 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() {
|
||||||
|
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
||||||
|
Key: []byte("key"),
|
||||||
|
Value: []byte("value"),
|
||||||
|
})
|
||||||
|
region, err := s.cache.LocateRegionByID(s.bo, s.region)
|
||||||
|
s.Nil(err)
|
||||||
|
s.NotNil(region)
|
||||||
|
|
||||||
|
// test ErrClientResourceGroupThrottled handled by regionRequestSender
|
||||||
|
func() {
|
||||||
|
oc := s.regionRequestSender.client
|
||||||
|
defer func() {
|
||||||
|
s.regionRequestSender.client = oc
|
||||||
|
}()
|
||||||
|
storeOld, _ := s.regionRequestSender.regionCache.stores.get(1)
|
||||||
|
epoch := storeOld.epoch
|
||||||
|
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||||
|
return nil, pderr.ErrClientResourceGroupThrottled
|
||||||
|
}}
|
||||||
|
bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
|
||||||
|
_, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
|
||||||
|
s.NotNil(err)
|
||||||
|
storeNew, _ := s.regionRequestSender.regionCache.stores.get(1)
|
||||||
|
// not mark the store need be refill, then the epoch should not be changed.
|
||||||
|
s.Equal(epoch, storeNew.epoch)
|
||||||
|
// no rpc error if the error is ErrClientResourceGroupThrottled
|
||||||
|
s.Nil(s.regionRequestSender.rpcError)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
|
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
|
||||||
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
|
||||||
Key: []byte("key"),
|
Key: []byte("key"),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue