locate: implement SendReqAsync for RegionRequestSender (#1618)

ref tikv/client-go#1586

Signed-off-by: zyguan <zhongyangguan@gmail.com>
zyguan 2025-05-22 19:51:40 +08:00 committed by GitHub
parent 1880726302
commit ff15611bb9
6 changed files with 649 additions and 37 deletions
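The new entry point is callback-based: callers hand SendReqAsync an async.Callback bound to an executor (typically an async.RunLoop) and pump that loop until the callback fires. A minimal calling sketch, adapted from TestSendReqAsync and the SendReqCtx fallback in this diff; sender, bo, req, and region are assumed to be set up elsewhere:

// Sketch (assumed setup): drive SendReqAsync by pumping its run loop.
rl := async.NewRunLoop()
complete := false
cb := async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
	// Runs on the run-loop goroutine once the request (or its sync fallback) finishes.
	if err == nil {
		_ = resp.Addr // address of the store that served the request
	}
	complete = true
})
sender.SendReqAsync(bo, req, region, time.Second, cb)
for !complete {
	if _, err := rl.Exec(bo.GetCtx()); err != nil {
		break // backoffer context canceled or query killed
	}
}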

View File

@@ -64,6 +64,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
"github.com/tikv/pd/client/errs"
pderr "github.com/tikv/pd/client/errs"
)
@@ -458,6 +459,136 @@ func (s *RegionRequestSender) SendReq(
return resp, retryTimes, err
}
// SendReqAsync is like SendReq but sends the request asynchronously. It only tries the async API once and falls back
// to the sync API if a retry is needed.
func (s *RegionRequestSender) SendReqAsync(
bo *retry.Backoffer,
req *tikvrpc.Request,
regionID RegionVerID,
timeout time.Duration,
cb async.Callback[*tikvrpc.ResponseExt],
opts ...StoreSelectorOption,
) {
cli, ok := s.client.(client.ClientAsync)
if !ok {
cb.Invoke(nil, errors.Errorf("%T does not implement ClientAsync interface", s.client))
return
}
if err := s.validateReadTS(bo.GetCtx(), req); err != nil {
logutil.Logger(bo.GetCtx()).Error("validate read ts failed for request", zap.Stringer("reqType", req.Type), zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("context", &req.Context), zap.Stack("stack"), zap.Error(err))
cb.Invoke(nil, err)
return
}
if req.Context.MaxExecutionDurationMs == 0 {
req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
}
s.reset()
startTime := time.Now()
startBackOff := bo.GetTotalSleep()
state := &sendReqState{
RegionRequestSender: s,
args: sendReqArgs{
bo: bo,
req: req,
regionID: regionID,
timeout: timeout,
et: tikvrpc.TiKV,
opts: opts,
},
}
cb.Inject(func(resp *tikvrpc.ResponseExt, err error) (*tikvrpc.ResponseExt, error) {
retryTimes := 0
if state.vars.sendTimes > 1 {
retryTimes = state.vars.sendTimes - 1
}
// slow log
if len(state.vars.msg) > 0 || err != nil {
if cost := time.Since(startTime); cost > slowLogSendReqTime || cost > timeout || bo.GetTotalSleep() > 1000 {
msg := state.vars.msg
if len(msg) == 0 {
msg = fmt.Sprintf("send request failed: %v", err)
}
s.logSendReqError(bo, msg, regionID, retryTimes, req, cost, bo.GetTotalSleep()-startBackOff, timeout)
}
}
// metrics
if retryTimes > 0 {
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes))
}
if req.StaleRead {
if state.vars.sendTimes == 1 {
metrics.StaleReadHitCounter.Add(1)
} else {
metrics.StaleReadMissCounter.Add(1)
}
}
return resp, err
})
if !state.initForAsyncRequest() {
cb.Invoke(state.toResponseExt())
return
}
var (
cancels = make([]context.CancelFunc, 0, 3)
ctx = bo.GetCtx()
hookCtx = ctx
)
if limit := kv.StoreLimit.Load(); limit > 0 {
if state.vars.err = s.getStoreToken(state.vars.rpcCtx.Store, limit); state.vars.err != nil {
cb.Invoke(state.toResponseExt())
return
}
cancels = append(cancels, func() { s.releaseStoreToken(state.vars.rpcCtx.Store) })
}
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
var cancel context.CancelFunc
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
cancels = append(cancels, cancel)
hookCtx = ctx
}
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
cancels = append(cancels, cancel)
}
sendToAddr := state.vars.rpcCtx.Addr
if state.vars.rpcCtx.ProxyStore == nil {
req.ForwardedHost = ""
} else {
req.ForwardedHost = state.vars.rpcCtx.Addr
sendToAddr = state.vars.rpcCtx.ProxyAddr
}
cli.SendRequestAsync(ctx, sendToAddr, req, async.NewCallback(cb.Executor(), func(resp *tikvrpc.Response, err error) {
state.vars.sendTimes++
canceled := err != nil && hookCtx.Err() != nil && errors.Cause(hookCtx.Err()) == context.Canceled
if state.handleAsyncResponse(startTime, canceled, resp, err, cancels...) {
cb.Invoke(state.toResponseExt())
return
}
// retry
cb.Executor().Go(func() {
for !state.next() {
if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 && retryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", retryTimes))
}
}
cb.Schedule(state.toResponseExt())
})
}))
}
func (s *RegionRequestSender) recordRPCAccessInfo(req *tikvrpc.Request, rpcCtx *RPCContext, err string) {
if req == nil || rpcCtx == nil || rpcCtx.Peer == nil || rpcCtx.Store == nil {
return
@@ -727,11 +858,25 @@ func (s *RegionRequestSender) reset() {
const slowLogSendReqTime = 100 * time.Millisecond
// sendReqArgs defines the input arguments of the send request.
type sendReqArgs struct {
bo *retry.Backoffer
req *tikvrpc.Request
regionID RegionVerID
timeout time.Duration
et tikvrpc.EndpointType
opts []StoreSelectorOption
}
// sendReqState represents the state of sending a request with retries, which allows us to construct a state and
// resume retrying from it.
type sendReqState struct {
*RegionRequestSender
// args holds the input arguments of the send request.
args sendReqArgs
// vars maintains the local variables used in the retry loop.
vars struct {
rpcCtx *RPCContext
resp *tikvrpc.Response
@@ -746,14 +891,9 @@ type sendReqState struct {
// (s.vars.regionErr) if one of them exists. When the error is retriable, `next` then constructs a new RPCContext and
// sends the request again. `next` returns true if the retry loop should stop, either because the request is done or
// exhausted (cannot complete by retrying).
func (s *sendReqState) next(
bo *retry.Backoffer,
req *tikvrpc.Request,
regionID RegionVerID,
timeout time.Duration,
et tikvrpc.EndpointType,
opts []StoreSelectorOption,
) (done bool) {
func (s *sendReqState) next() (done bool) {
bo, req := s.args.bo, s.args.req
// check whether the session/query is killed during the Next()
if err := bo.CheckKilled(); err != nil {
s.vars.resp, s.vars.err = nil, err
@@ -791,7 +931,7 @@ func (s *sendReqState) next(
req.IsRetryRequest = true
}
s.vars.rpcCtx, s.vars.err = s.getRPCContext(bo, req, regionID, et, opts...)
s.vars.rpcCtx, s.vars.err = s.getRPCContext(bo, req, s.args.regionID, s.args.et, s.args.opts...)
if s.vars.err != nil {
return true
}
@@ -836,7 +976,7 @@ func (s *sendReqState) next(
}
}
logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, regionID.id, s.vars.rpcCtx.Addr)
logutil.Eventf(bo.GetCtx(), "send %s request to region %d at %s", req.Type, s.args.regionID.id, s.vars.rpcCtx.Addr)
s.storeAddr = s.vars.rpcCtx.Addr
req.Context.ClusterId = s.vars.rpcCtx.ClusterID
@@ -868,7 +1008,7 @@ func (s *sendReqState) next(
defer s.releaseStoreToken(s.vars.rpcCtx.Store)
}
canceled := s.send(bo, req, timeout)
canceled := s.send()
s.vars.sendTimes++
if s.vars.err != nil {
@@ -906,7 +1046,7 @@ func (s *sendReqState) next(
return true
}
func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout time.Duration) (canceled bool) {
func (s *sendReqState) send() (canceled bool) {
bo, req := s.args.bo, s.args.req
rpcCtx := s.vars.rpcCtx
ctx := bo.GetCtx()
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
@@ -968,7 +1109,7 @@ func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout t
if !injectFailOnSend {
start := time.Now()
s.vars.resp, s.vars.err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
s.vars.resp, s.vars.err = s.client.SendRequest(ctx, sendToAddr, req, s.args.timeout)
rpcDuration := time.Since(start)
if s.replicaSelector != nil {
recordAttemptedTime(s.replicaSelector, rpcDuration)
@@ -1058,6 +1199,135 @@ func (s *sendReqState) send(bo *retry.Backoffer, req *tikvrpc.Request, timeout t
return
}
// initForAsyncRequest initializes the state for an async request. It should be called once before the first `next`.
func (s *sendReqState) initForAsyncRequest() (ok bool) {
bo, req := s.args.bo, s.args.req
s.vars.rpcCtx, s.vars.err = s.getRPCContext(bo, req, s.args.regionID, s.args.et, s.args.opts...)
if s.vars.err != nil {
return false
}
if s.vars.rpcCtx == nil {
s.vars.regionErr = &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
s.vars.resp, s.vars.err = tikvrpc.GenRegionErrorResp(req, s.vars.regionErr)
s.vars.msg = "throwing pseudo region error due to no replica available"
return false
}
s.storeAddr = s.vars.rpcCtx.Addr
if s.replicaSelector != nil &&
s.replicaSelector.target != nil &&
req.AccessLocation == kv.AccessUnknown &&
len(s.replicaSelector.option.labels) != 0 {
// patch the access location if it has not been set before reaching the region request sender.
if s.replicaSelector.target.store.IsLabelsMatch(s.replicaSelector.option.labels) {
req.AccessLocation = kv.AccessLocalZone
} else {
req.AccessLocation = kv.AccessCrossZone
}
}
req.Context.ClusterId = s.vars.rpcCtx.ClusterID
if req.InputRequestSource != "" && s.replicaSelector != nil {
patchRequestSource(req, s.replicaSelector.replicaType())
}
if s.vars.err = tikvrpc.SetContextNoAttach(req, s.vars.rpcCtx.Meta, s.vars.rpcCtx.Peer); s.vars.err != nil {
return false
}
// Count the replica number as the RU cost factor.
req.ReplicaNumber = 1
if s.vars.rpcCtx.Meta != nil && len(s.vars.rpcCtx.Meta.GetPeers()) > 0 {
req.ReplicaNumber = 0
for _, peer := range s.vars.rpcCtx.Meta.GetPeers() {
role := peer.GetRole()
if role == metapb.PeerRole_Voter || role == metapb.PeerRole_Learner {
req.ReplicaNumber++
}
}
}
return true
}
// handleAsyncResponse handles the response of an async request.
func (s *sendReqState) handleAsyncResponse(start time.Time, canceled bool, resp *tikvrpc.Response, err error, cancels ...context.CancelFunc) (done bool) {
if len(cancels) > 0 {
defer func() {
for i := len(cancels) - 1; i >= 0; i-- {
cancels[i]()
}
}()
}
s.vars.resp, s.vars.err = resp, err
req := s.args.req
rpcDuration := time.Since(start)
if s.replicaSelector != nil {
recordAttemptedTime(s.replicaSelector, rpcDuration)
}
if s.Stats != nil {
s.Stats.RecordRPCRuntimeStats(req.Type, rpcDuration)
}
if s.vars.rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
s.vars.rpcCtx.Store.healthStatus.recordClientSideSlowScoreStat(rpcDuration)
}
if s.vars.rpcCtx.ProxyStore != nil {
fromStore := strconv.FormatUint(s.vars.rpcCtx.ProxyStore.storeID, 10)
toStore := strconv.FormatUint(s.vars.rpcCtx.Store.storeID, 10)
result := "ok"
if s.vars.err != nil {
result = "fail"
}
metrics.TiKVForwardRequestCounter.WithLabelValues(fromStore, toStore, req.Type.String(), result).Inc()
}
if err := s.vars.err; err != nil {
if isRPCError(err) {
s.rpcError = err
}
if s.Stats != nil {
errStr := getErrMsg(err)
s.Stats.RecordRPCErrorStats(errStr)
s.recordRPCAccessInfo(req, s.vars.rpcCtx, errStr)
}
if canceled {
metrics.TiKVRPCErrorCounter.WithLabelValues("context-canceled", storeIDLabel(s.vars.rpcCtx)).Inc()
return true
}
return false
}
s.vars.regionErr, s.vars.err = s.vars.resp.GetRegionError()
if s.vars.err != nil {
s.vars.rpcCtx, s.vars.resp = nil, nil
return true
} else if s.vars.regionErr != nil {
// need to handle region error
return false
}
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess(req)
}
return true
}
// toResponseExt converts the state to a ResponseExt.
func (s *sendReqState) toResponseExt() (*tikvrpc.ResponseExt, error) {
if s.vars.err != nil {
return nil, s.vars.err
}
if s.vars.resp == nil {
return nil, errors.New("invalid state: response is nil")
}
resp := &tikvrpc.ResponseExt{Response: *s.vars.resp}
if s.vars.rpcCtx != nil {
resp.Addr = s.vars.rpcCtx.Addr
}
return resp, nil
}
// SendReqCtx sends a request to the TiKV server and returns the response and the RPCCtx of this RPC.
func (s *RegionRequestSender) SendReqCtx(
bo *retry.Backoffer,
@@ -1072,6 +1342,27 @@ func (s *RegionRequestSender) SendReqCtx(
retryTimes int,
err error,
) {
if et == tikvrpc.TiKV {
if _, err := util.EvalFailpoint("useSendReqAsync"); err == nil {
complete := false
rl := async.NewRunLoop()
cb := async.NewCallback(rl, func(r *tikvrpc.ResponseExt, e error) {
if r != nil {
resp = &r.Response
}
err = e
complete = true
})
s.SendReqAsync(bo, req, regionID, timeout, cb, opts...)
for !complete {
if _, e := rl.Exec(bo.GetCtx()); e != nil {
return nil, nil, 0, e
}
}
return resp, nil, 0, err
}
}
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.SendReqCtx", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -1093,7 +1384,18 @@ func (s *RegionRequestSender) SendReqCtx(
req.Context.MaxExecutionDurationMs = uint64(timeout.Milliseconds())
}
state := &sendReqState{RegionRequestSender: s}
state := &sendReqState{
RegionRequestSender: s,
args: sendReqArgs{
bo: bo,
req: req,
regionID: regionID,
timeout: timeout,
et: et,
opts: opts,
},
}
defer func() {
if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 {
metrics.TiKVRequestRetryTimesHistogram.Observe(float64(retryTimes))
@@ -1111,7 +1413,7 @@ func (s *RegionRequestSender) SendReqCtx(
startTime := time.Now()
startBackOff := bo.GetTotalSleep()
for !state.next(bo, req, regionID, timeout, et, opts) {
for !state.next() {
if retryTimes := state.vars.sendTimes - 1; retryTimes > 0 && retryTimes%100 == 0 {
logutil.Logger(bo.GetCtx()).Warn("retry", zap.Uint64("region", regionID.GetID()), zap.Int("times", retryTimes))
}
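The test files that follow all cover the new path with one idiom: enable the tikvclient/useSendReqAsync failpoint so the SendReqCtx hook above routes requests through SendReqAsync, then rerun the unchanged sync test body. The shared toggle, with testBody standing in for whichever helper a given test calls:

// Shared pattern of the *UsingAsyncAPI tests below (testBody is a placeholder).
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testBody()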

View File

@@ -23,6 +23,7 @@ import (
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -256,6 +257,20 @@ func TestRegionCacheStaleRead(t *testing.T) {
defer func() {
retry.BoTiKVServerBusy = originBoTiKVServerBusy
}()
testRegionCacheStaleRead(t)
}
func TestRegionCacheStaleReadUsingAsyncAPI(t *testing.T) {
originBoTiKVServerBusy := retry.BoTiKVServerBusy
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer func() {
retry.BoTiKVServerBusy = originBoTiKVServerBusy
failpoint.Disable("tikvclient/useSendReqAsync")
}()
testRegionCacheStaleRead(t)
}
func testRegionCacheStaleRead(t *testing.T) {
retry.BoTiKVServerBusy = retry.NewConfig("tikvServerBusy", &metrics.BackoffHistogramServerBusy, retry.NewBackoffFnCfg(2, 10, retry.EqualJitter), tikverr.ErrTiKVServerBusy)
regionCacheTestCases := []RegionCacheTestCase{
{

View File

@@ -60,13 +60,16 @@ import (
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/config/retry"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/oracle/oracles"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
pd "github.com/tikv/pd/client"
pderr "github.com/tikv/pd/client/errs"
"google.golang.org/grpc"
@@ -136,6 +139,13 @@ func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Re
return f.fn(ctx, addr, req, timeout)
}
func (f *fnClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
go func() {
tikvrpc.AttachContext(req, req.Context)
cb.Schedule(f.fn(ctx, addr, req, 0))
}()
}
func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
@@ -146,7 +156,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
s.NotNil(region)
// test stale command retry.
func() {
test := func() {
oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
@@ -163,7 +173,13 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}()
}
s.Run("Default", test)
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.Run("AsyncAPI", test)
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() {
@@ -176,7 +192,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrot
s.NotNil(region)
// test that ErrClientResourceGroupThrottled is handled by regionRequestSender
func() {
test := func() {
oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
@@ -194,10 +210,26 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrot
s.Equal(epoch, storeNew.epoch)
// no rpc error if the error is ErrClientResourceGroupThrottled
s.Nil(s.regionRequestSender.rpcError)
}()
}
s.Run("Default", test)
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.Run("AsyncAPI", test)
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
s.testOnSendFailedWithStoreRestart()
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestartUsingAsyncAPI() {
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.testOnSendFailedWithStoreRestart()
}
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithStoreRestart() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -232,6 +264,16 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart()
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne() {
s.testOnSendFailedWithCloseKnownStoreThenUseNewOne()
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOneUsingAsyncAPI() {
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.testOnSendFailedWithCloseKnownStoreThenUseNewOne()
}
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCloseKnownStoreThenUseNewOne() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -265,26 +307,17 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStor
s.NotNil(resp.Resp)
}
func (s *testRegionRequestToSingleStoreSuite) TestSendReqCtx() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, ctx, _, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
req.ReplicaRead = true
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
s.testOnSendFailedWithCancelled()
}
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelledUsingAsyncAPI() {
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.testOnSendFailedWithCancelled()
}
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCancelled() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -315,6 +348,16 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
}
func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled() {
s.testNoReloadRegionWhenCtxCanceled()
}
func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceledUsingAsyncAPI() {
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
s.testNoReloadRegionWhenCtxCanceled()
}
func (s *testRegionRequestToSingleStoreSuite) testNoReloadRegionWhenCtxCanceled() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
@@ -335,6 +378,158 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled(
s.NotNil(r)
}
func (s *testRegionRequestToSingleStoreSuite) TestSendReqCtx() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
resp, ctx, _, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
req.ReplicaRead = true
resp, ctx, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp.Resp)
s.NotNil(ctx)
}
func (s *testRegionRequestToSingleStoreSuite) TestSendReqAsync() {
reachable.injectConstantLiveness(s.regionRequestSender.regionCache.stores)
ctx := context.Background()
rl := async.NewRunLoop()
s.Run("Basic", func() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
complete := false
s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
s.Nil(err)
s.NotNil(resp.Resp)
s.NotEmpty(resp.Addr)
complete = true
}))
for !complete {
_, err := rl.Exec(ctx)
s.Require().NoError(err)
}
})
s.Run("StoreLimit", func() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
store := s.cache.stores.getOrInsertDefault(s.store)
defer func(storeLimit int64, tokenCount int64) {
kv.StoreLimit.Store(storeLimit)
store.tokenCount.Store(tokenCount)
}(kv.StoreLimit.Load(), store.tokenCount.Load())
kv.StoreLimit.Store(100)
store.tokenCount.Store(100)
complete := false
s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
s.Nil(resp)
s.NotNil(err)
e, ok := errors.Cause(err).(*tikverr.ErrTokenLimit)
s.True(ok)
s.Equal(s.store, e.StoreID)
complete = true
}))
for !complete {
_, err := rl.Exec(ctx)
s.Require().NoError(err)
}
})
s.Run("RPCCancel", func() {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
Value: []byte("value"),
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
defer func(ctx context.Context, cli client.Client) {
s.bo.SetCtx(ctx)
s.regionRequestSender.client = cli
}(s.bo.GetCtx(), s.regionRequestSender.client)
var once sync.Once
rpcCanceller := NewRPCanceller()
s.bo.SetCtx(context.WithValue(s.bo.GetCtx(), RPCCancellerCtxKey{}, rpcCanceller))
s.regionRequestSender.client = &fnClient{
fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
once.Do(func() { rpcCanceller.CancelAll() })
return nil, context.Canceled
},
}
complete := false
s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
s.Nil(resp)
s.ErrorIs(err, context.Canceled)
complete = true
}))
for !complete {
_, err := rl.Exec(ctx)
s.Require().NoError(err)
}
})
s.Run("Timeout", func() {
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
Version: math.MaxUint64,
})
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
defer func(cli client.Client) {
s.regionRequestSender.client = cli
}(s.regionRequestSender.client)
s.regionRequestSender.client = &fnClient{
fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
<-ctx.Done()
return nil, ctx.Err()
},
}
complete := false
s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, 100*time.Millisecond, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
s.Nil(err)
s.NotNil(resp)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
complete = true
}))
for !complete {
_, err := rl.Exec(ctx)
s.Require().NoError(err)
}
})
}
// cancelContextClient wraps rpcClient and always cancels context before sending requests.
type cancelContextClient struct {
client.Client

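Both mock clients touched by this commit (fnClient above and mocktikv's RPCClient below) satisfy the async interface the same way: run the existing synchronous SendRequest in a goroutine and hand its result to the callback with Schedule, which defers completion onto the callback's executor. A condensed sketch of that adapter shape, with myClient as a hypothetical sync client:

// Sketch: adapting a synchronous client to SendRequestAsync, as the mocks here do.
func (c *myClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
	go func() {
		// Schedule (not Invoke) defers the result onto the callback's executor,
		// so all follow-up logic stays on the caller's run loop.
		cb.Schedule(c.SendRequest(ctx, addr, req, 0))
	}()
}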
View File

@@ -310,6 +310,20 @@ func TestReplicaReadAccessPathByCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByCase(s)
}
func TestReplicaReadAccessPathByCaseUsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByCase(s)
}
func testReplicaReadAccessPathByCase(s *testReplicaSelectorSuite) {
fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} // fake region error, caused by no replica being available.
ca := replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
@@ -735,6 +749,20 @@ func TestReplicaReadAccessPathByCase2(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByCase2(s)
}
func TestReplicaReadAccessPathByCase2UsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByCase2(s)
}
func testReplicaReadAccessPathByCase2(s *testReplicaSelectorSuite) {
fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
// The following cases were found by other tests; modify them with care.
ca := replicaSelectorAccessPathCase{
@@ -1000,6 +1028,20 @@ func TestReplicaReadAccessPathByBasicCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByBasicCase(s)
}
func TestReplicaReadAccessPathByBasicCaseUsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByBasicCase(s)
}
func testReplicaReadAccessPathByBasicCase(s *testReplicaSelectorSuite) {
retryableErrors := []RegionErrorType{ServerIsBusyErr, ServerIsBusyWithEstimatedWaitMsErr, StaleCommandErr, MaxTimestampNotSyncedErr, ProposalInMergingModeErr, ReadIndexNotReadyErr, RegionNotInitializedErr, DiskFullErr}
noRetryErrors := []RegionErrorType{RegionNotFoundErr, KeyNotInRegionErr, EpochNotMatchErr, StoreNotMatchErr, RaftEntryTooLargeErr, RecoveryInProgressErr, FlashbackNotPreparedErr, IsWitnessErr, MismatchPeerIdErr, BucketVersionNotMatchErr}
for _, reqType := range []tikvrpc.CmdType{tikvrpc.CmdGet, tikvrpc.CmdPrewrite} {
@@ -1144,6 +1186,20 @@ func TestReplicaReadAccessPathByLeaderCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByLeaderCase(s)
}
func TestReplicaReadAccessPathByLeaderCaseUsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByLeaderCase(s)
}
func testReplicaReadAccessPathByLeaderCase(s *testReplicaSelectorSuite) {
fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} // fake region error, caused by no replica being available.
ca := replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
@@ -1462,6 +1518,20 @@ func TestReplicaReadAccessPathByFollowerCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByFollowerCase(s)
}
func TestReplicaReadAccessPathByFollowerCaseUsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByFollowerCase(s)
}
func testReplicaReadAccessPathByFollowerCase(s *testReplicaSelectorSuite) {
fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
ca := replicaSelectorAccessPathCase{
reqType: tikvrpc.CmdGet,
@@ -1563,6 +1633,20 @@ func TestReplicaReadAccessPathByMixedAndPreferLeaderCase(t *testing.T) {
s.SetupTest(t)
defer s.TearDownTest()
testReplicaReadAccessPathByMixedAndPreferLeaderCase(s)
}
func TestReplicaReadAccessPathByMixedAndPreferLeaderCaseUsingAsyncAPI(t *testing.T) {
s := new(testReplicaSelectorSuite)
s.SetupTest(t)
defer s.TearDownTest()
failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
defer failpoint.Disable("tikvclient/useSendReqAsync")
testReplicaReadAccessPathByMixedAndPreferLeaderCase(s)
}
func testReplicaReadAccessPathByMixedAndPreferLeaderCase(s *testReplicaSelectorSuite) {
fakeEpochNotMatch := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
var ca replicaSelectorAccessPathCase
// since the leader is in store1, ReplicaReadMixed and ReplicaReadPreferLeader will have the same access path.

View File

@@ -52,6 +52,7 @@ import (
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
)
const requestMaxSize = 8 * 1024 * 1024
@@ -1090,6 +1091,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
// SendRequestAsync sends a request to the mock cluster asynchronously.
func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
go func() {
cb.Schedule(c.SendRequest(ctx, addr, req, 0))
}()
}
// Close closes the client.
func (c *RPCClient) Close() error {
if c.coprHandler != nil {

View File

@@ -672,6 +672,14 @@ type Response struct {
Resp interface{}
}
// ResponseExt is like Response but contains extra information.
type ResponseExt struct {
Response
// The address of the target store. When forwarding is enabled, it points to the target node handling the request
// rather than the node forwarding the message.
Addr string
}
// FromBatchCommandsResponse converts a BatchCommands response to Response.
func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Response, error) {
if res.GetCmd() == nil {
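ResponseExt is what async callers receive in place of a bare Response; its Addr field names the store that actually served the request, even when the message traveled through a forwarding proxy. An illustrative callback consuming it (the print is for demonstration only):

// Sketch: reading the extra address information in a callback.
cb := async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
	if err != nil {
		return // handle the error
	}
	// Addr is the handling store, not the proxy, when forwarding is enabled.
	fmt.Printf("served by %s\n", resp.Addr)
})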