diff --git a/internal/client/client_interceptor.go b/internal/client/client_interceptor.go index b4ceab38..6ad7acd3 100644 --- a/internal/client/client_interceptor.go +++ b/internal/client/client_interceptor.go @@ -19,7 +19,6 @@ import ( "sync/atomic" "time" - "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/internal/resourcecontrol" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/tikvrpc/interceptor" @@ -114,6 +113,14 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re } cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) { + if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil { + // TODO(zyguan): In async API, the interceptor is only triggered upon receiving the response. Maybe + // support AsyncRPCInterceptor later. + getResp := func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { + return resp, err + } + resp, err = ctxInterceptor.Wrap(getResp)(addr, req) + } if resp != nil { respInfo := resourcecontrol.MakeResponseInfo(resp) consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo) @@ -128,11 +135,6 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re }) } - if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil { - // TODO(zyguan): support AsyncRPCInterceptor - logutil.Logger(ctx).Warn("SendRequestAsync with interceptor is unsupported") - } - r.Client.SendRequestAsync(ctx, addr, req, cb) }