mirror of https://github.com/tikv/client-go.git
Introduce the switch
Signed-off-by: JmPotato <ghzpotato@gmail.com>
This commit is contained in:
parent
88f730126e
commit
320af6501e
|
|
@ -2,6 +2,8 @@ module integration_tests
|
|||
|
||||
go 1.18
|
||||
|
||||
replace github.com/tikv/pd => github.com/CabinfeverB/pd v1.1.0-beta.0.20230118184315-8efd2e043066
|
||||
|
||||
require (
|
||||
github.com/ninedraft/israce v0.0.3
|
||||
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
|
||||
|
|
@ -72,6 +74,7 @@ require (
|
|||
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/tikv/pd v1.1.0-beta.0.20230118040950-082fc6a9bc2e // indirect
|
||||
github.com/twmb/murmur3 v1.1.3 // indirect
|
||||
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
|
||||
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
|
||||
|
|
|
|||
|
|
@ -388,6 +388,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
|||
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
|
||||
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/CabinfeverB/pd v1.1.0-beta.0.20230118184315-8efd2e043066 h1:Dzy4RAD15MmCOgHChzal7H+z+5uvwpVmmAB3C65YiEw=
|
||||
github.com/CabinfeverB/pd v1.1.0-beta.0.20230118184315-8efd2e043066/go.mod h1:ZJUZ8xZBjCWZAQixiX0y2EE5+TFiNJ9lnP0bfB3gOu8=
|
||||
github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw=
|
||||
github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
|
||||
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
||||
|
|
@ -772,14 +774,14 @@ github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1ls
|
|||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||
github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q=
|
||||
github.com/opentracing/basictracer-go v1.1.0 h1:Oa1fTSBvAl8pa3U+IJYqrKm0NALwH9OsgwOqDv4xJW0=
|
||||
github.com/opentracing/basictracer-go v1.1.0/go.mod h1:V2HZueSJEp879yv285Aap1BS69fQMD+MNP1mRs6mBQc=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/petermattis/goid v0.0.0-20170504144140-0ded85884ba5 h1:rUMC+oZ89Om6l9wvUNjzI0ZrKrSnXzV+opsgAohYUNc=
|
||||
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 h1:64bxqeTEN0/xoEqhKGowgihNuzISS9rEG6YUMU4bzJo=
|
||||
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY=
|
||||
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs=
|
||||
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
|
|
@ -850,7 +852,7 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
|
|||
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
|
||||
github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d h1:yVBZEAirqhDYAc7xftf/swe8eHcg63jqfwdqN8KSoR8=
|
||||
github.com/sasha-s/go-deadlock v0.2.0 h1:lMqc+fUb7RrFS3gQLtoQsJ7/6TV/pAIFvBsqX73DK8Y=
|
||||
github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw=
|
||||
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA=
|
||||
|
|
@ -942,6 +944,7 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
|
|||
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
|
||||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
|
||||
go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793 h1:fqmtdYQlwZ/vKWSz5amW+a4cnjg23ojz5iL7rjf08Wg=
|
||||
go.etcd.io/etcd/api/v3 v3.5.2 h1:tXok5yLlKyuQ/SXSjtqHc4uzNaMqZi2XsoSPr/LlJXI=
|
||||
go.etcd.io/etcd/api/v3 v3.5.2/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.2 h1:4hzqQ6hIb3blLyQ8usCU4h3NghkqcsohEQ3o3VetYxE=
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tikv/client-go/v2/internal/resource_control"
|
||||
|
|
@ -36,31 +37,10 @@ func NewInterceptedClient(client Client) Client {
|
|||
}
|
||||
|
||||
func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
|
||||
// Build the resource control interceptor if there is one and the resource group name is given.
|
||||
var rcInterceptor interceptor.RPCInterceptor
|
||||
// Build the resource control interceptor.
|
||||
resourceGroupName := req.GetResourceGroupName()
|
||||
if resourceControlInterceptor != nil && len(resourceGroupName) > 0 {
|
||||
rcInterceptor = func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
|
||||
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
reqInfo := resource_control.MakeRequestInfo(req)
|
||||
err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := next(target, req)
|
||||
if resp != nil {
|
||||
respInfo := resource_control.MakeResponseInfo(resp)
|
||||
resourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
}
|
||||
var finalInterceptor interceptor.RPCInterceptor = buildResourceControlInterceptor(ctx, req, resourceGroupName)
|
||||
// Chain the interceptors if there are multiple interceptors.
|
||||
var finalInterceptor interceptor.RPCInterceptor
|
||||
if rcInterceptor != nil {
|
||||
finalInterceptor = rcInterceptor
|
||||
}
|
||||
if it := interceptor.GetRPCInterceptorFromCtx(ctx); it != nil {
|
||||
if finalInterceptor != nil {
|
||||
finalInterceptor = interceptor.ChainRPCInterceptors(finalInterceptor, it)
|
||||
|
|
@ -76,7 +56,20 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
|
|||
return r.Client.SendRequest(ctx, addr, req, timeout)
|
||||
}
|
||||
|
||||
var resourceControlInterceptor client.ResourceGroupKVInterceptor
|
||||
var (
|
||||
resourceControlSwitch atomic.Bool
|
||||
resourceControlInterceptor client.ResourceGroupKVInterceptor
|
||||
)
|
||||
|
||||
// EnableResourceControl enables the resource control.
|
||||
func EnableResourceControl() {
|
||||
resourceControlSwitch.Store(true)
|
||||
}
|
||||
|
||||
// DisableResourceControl disables the resource control.
|
||||
func DisableResourceControl() {
|
||||
resourceControlSwitch.Store(false)
|
||||
}
|
||||
|
||||
// SetResourceControlInterceptor sets the interceptor for resource control.
|
||||
func SetResourceControlInterceptor(interceptor client.ResourceGroupKVInterceptor) {
|
||||
|
|
@ -87,3 +80,41 @@ func SetResourceControlInterceptor(interceptor client.ResourceGroupKVInterceptor
|
|||
func UnsetResourceControlInterceptor() {
|
||||
resourceControlInterceptor = nil
|
||||
}
|
||||
|
||||
// buildResourceControlInterceptor builds a resource control interceptor with
|
||||
// the given resource group name.
|
||||
func buildResourceControlInterceptor(
|
||||
ctx context.Context,
|
||||
req *tikvrpc.Request,
|
||||
resourceGroupName string,
|
||||
) interceptor.RPCInterceptor {
|
||||
if !resourceControlSwitch.Load() {
|
||||
return nil
|
||||
}
|
||||
// When the group name is empty or "default", we don't need to
|
||||
// perform the resource control.
|
||||
if len(resourceGroupName) == 0 || resourceGroupName == "default" {
|
||||
return nil
|
||||
}
|
||||
// No resource group interceptor is set.
|
||||
if resourceControlInterceptor == nil {
|
||||
return nil
|
||||
}
|
||||
// Make the request info.
|
||||
reqInfo := resource_control.MakeRequestInfo(req)
|
||||
// Build the interceptor.
|
||||
return func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
|
||||
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := next(target, req)
|
||||
if resp != nil {
|
||||
respInfo := resource_control.MakeResponseInfo(resp)
|
||||
resourceControlInterceptor.OnResponse(ctx, resourceGroupName, reqInfo, respInfo)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue