mirror of https://github.com/tikv/client-go.git
258 lines
9.4 KiB
Go
258 lines
9.4 KiB
Go
// Copyright 2021 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/tikv/client-go/v2/internal/logutil"
|
|
"github.com/tikv/client-go/v2/internal/resourcecontrol"
|
|
"github.com/tikv/client-go/v2/tikvrpc"
|
|
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
|
|
"github.com/tikv/client-go/v2/util"
|
|
"github.com/tikv/client-go/v2/util/async"
|
|
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
|
|
)
|
|
|
|
func init() {
|
|
ResourceControlSwitch.Store(false)
|
|
}
|
|
|
|
var _ Client = interceptedClient{}
|
|
|
|
type interceptedClient struct {
|
|
Client
|
|
}
|
|
|
|
// NewInterceptedClient creates a Client which can execute interceptor.
|
|
func NewInterceptedClient(client Client) Client {
|
|
return interceptedClient{client}
|
|
}
|
|
|
|
func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
|
|
var ruDetails *util.RUDetails
|
|
|
|
resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(ctx, req)
|
|
if resourceControlInterceptor != nil {
|
|
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.GetResourceControlContext().Penalty = penalty
|
|
// override request priority with resource group priority if it's not set.
|
|
// Get the priority at tikv side has some performance issue, so we pass it
|
|
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
|
|
if req.GetResourceControlContext().OverridePriority == 0 {
|
|
req.GetResourceControlContext().OverridePriority = uint64(priority)
|
|
}
|
|
|
|
if val := ctx.Value(util.RUDetailsCtxKey); val != nil {
|
|
ruDetails = val.(*util.RUDetails)
|
|
ruDetails.Update(consumption, waitDuration)
|
|
}
|
|
}
|
|
|
|
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor == nil {
|
|
resp, err = r.Client.SendRequest(ctx, addr, req, timeout)
|
|
} else {
|
|
resp, err = ctxInterceptor.Wrap(func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
|
return r.Client.SendRequest(ctx, target, req, timeout)
|
|
})(addr, req)
|
|
}
|
|
|
|
if resourceControlInterceptor != nil && resp != nil {
|
|
respInfo := resourcecontrol.MakeResponseInfo(resp)
|
|
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ruDetails != nil {
|
|
ruDetails.Update(consumption, waitDuration)
|
|
}
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
|
|
cli, ok := r.Client.(ClientAsync)
|
|
if !ok {
|
|
cb.Invoke(nil, errors.Errorf("%T dose not implement ClientAsync interface", r.Client))
|
|
return
|
|
}
|
|
|
|
// since all async requests processed by one runloop share the same resource group, if the quota is exceeded, all
|
|
// requests/responses shall wait for the tokens, thus it's ok to call OnRequestWait/OnResponseWait synchronously.
|
|
resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(ctx, req)
|
|
if resourceControlInterceptor != nil {
|
|
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
|
if err != nil {
|
|
cb.Invoke(nil, err)
|
|
return
|
|
}
|
|
req.GetResourceControlContext().Penalty = penalty
|
|
// override request priority with resource group priority if it's not set.
|
|
// Get the priority at tikv side has some performance issue, so we pass it
|
|
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
|
|
if req.GetResourceControlContext().OverridePriority == 0 {
|
|
req.GetResourceControlContext().OverridePriority = uint64(priority)
|
|
}
|
|
|
|
var ruDetails *util.RUDetails
|
|
|
|
if val := ctx.Value(util.RUDetailsCtxKey); val != nil {
|
|
ruDetails = val.(*util.RUDetails)
|
|
ruDetails.Update(consumption, waitDuration)
|
|
}
|
|
|
|
cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
|
|
if resp != nil {
|
|
respInfo := resourcecontrol.MakeResponseInfo(resp)
|
|
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ruDetails != nil {
|
|
ruDetails.Update(consumption, waitDuration)
|
|
}
|
|
}
|
|
return resp, err
|
|
})
|
|
}
|
|
|
|
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil {
|
|
// TODO(zyguan): AsyncRPCInterceptor
|
|
logutil.Logger(ctx).Warn("SendRequestAsync with interceptor is unsupported")
|
|
}
|
|
|
|
cli.SendRequestAsync(ctx, addr, req, cb)
|
|
}
|
|
|
|
var (
|
|
// ResourceControlSwitch is used to control whether to enable the resource control.
|
|
ResourceControlSwitch atomic.Value
|
|
// ResourceControlInterceptor is used to build the resource control interceptor.
|
|
ResourceControlInterceptor atomic.Pointer[resourceControlClient.ResourceGroupKVInterceptor]
|
|
)
|
|
|
|
func getResourceControlInfo(ctx context.Context, req *tikvrpc.Request) (
|
|
string,
|
|
resourceControlClient.ResourceGroupKVInterceptor,
|
|
*resourcecontrol.RequestInfo,
|
|
) {
|
|
resourceGroupName := req.GetResourceControlContext().GetResourceGroupName()
|
|
if len(resourceGroupName) == 0 {
|
|
return "", nil, nil
|
|
}
|
|
if !ResourceControlSwitch.Load().(bool) {
|
|
return "", nil, nil
|
|
}
|
|
rcInterceptor := ResourceControlInterceptor.Load()
|
|
if rcInterceptor == nil {
|
|
return "", nil, nil
|
|
}
|
|
// bypass some internal requests and it's may influence user experience. For example, the
|
|
// request of `alter user password`, totally bypasses the resource control. it's not cost
|
|
// many resources, but it's may influence the user experience.
|
|
// If the resource group has background jobs, we should not record consumption and wait for it.
|
|
// Background jobs will record and report in tikv side.
|
|
resourceControlInterceptor := *rcInterceptor
|
|
if resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
|
|
return "", nil, nil
|
|
}
|
|
reqInfo := resourcecontrol.MakeRequestInfo(req)
|
|
if reqInfo.Bypass() {
|
|
return "", nil, nil
|
|
}
|
|
return resourceGroupName, resourceControlInterceptor, reqInfo
|
|
}
|
|
|
|
// buildResourceControlInterceptor builds a resource control interceptor with
|
|
// the given resource group name.
|
|
//
|
|
// Deprecated: embedded in `interceptedClient.SendRequest` directly to reduce overhead.
|
|
func buildResourceControlInterceptor( //nolint:unused
|
|
ctx context.Context,
|
|
req *tikvrpc.Request,
|
|
) interceptor.RPCInterceptor {
|
|
if !ResourceControlSwitch.Load().(bool) {
|
|
return nil
|
|
}
|
|
resourceGroupName := req.GetResourceControlContext().GetResourceGroupName()
|
|
// When the group name is empty, we don't need to perform the resource control.
|
|
if len(resourceGroupName) == 0 {
|
|
return nil
|
|
}
|
|
|
|
rcInterceptor := ResourceControlInterceptor.Load()
|
|
// No resource group interceptor is set.
|
|
if rcInterceptor == nil {
|
|
return nil
|
|
}
|
|
resourceControlInterceptor := *rcInterceptor
|
|
|
|
ruDetails := ctx.Value(util.RUDetailsCtxKey)
|
|
|
|
// Make the request info.
|
|
reqInfo := resourcecontrol.MakeRequestInfo(req)
|
|
// Build the interceptor.
|
|
interceptFn := func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
|
|
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
|
// bypass some internal requests and it's may influence user experience. For example, the
|
|
// request of `alter user password`, totally bypasses the resource control. it's not cost
|
|
// many resources, but it's may influence the user experience.
|
|
// If the resource group has background jobs, we should not record consumption and wait for it.
|
|
// Background jobs will record and report in tikv side.
|
|
if reqInfo.Bypass() || resourceControlInterceptor.IsBackgroundRequest(ctx, resourceGroupName, req.RequestSource) {
|
|
return next(target, req)
|
|
}
|
|
|
|
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.GetResourceControlContext().Penalty = penalty
|
|
// override request priority with resource group priority if it's not set.
|
|
// Get the priority at tikv side has some performance issue, so we pass it
|
|
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
|
|
if req.GetResourceControlContext().OverridePriority == 0 {
|
|
req.GetResourceControlContext().OverridePriority = uint64(priority)
|
|
}
|
|
if ruDetails != nil {
|
|
detail := ruDetails.(*util.RUDetails)
|
|
detail.Update(consumption, waitDuration)
|
|
}
|
|
|
|
resp, err := next(target, req)
|
|
if resp != nil {
|
|
respInfo := resourcecontrol.MakeResponseInfo(resp)
|
|
consumption, waitDuration, err = resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ruDetails != nil {
|
|
detail := ruDetails.(*util.RUDetails)
|
|
detail.Update(consumption, waitDuration)
|
|
}
|
|
}
|
|
return resp, err
|
|
}
|
|
}
|
|
return interceptor.NewRPCInterceptor("resource_control", interceptFn)
|
|
}
|