mirror of https://github.com/tikv/client-go.git
client: support interceptor in SendRequestAsync (#1735)
Signed-off-by: zyguan <zhongyangguan@gmail.com>
This commit is contained in:
parent
a05a5382fb
commit
c0b6188ffd
|
|
@ -19,7 +19,6 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/internal/resourcecontrol"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
|
||||
|
|
@ -114,6 +113,14 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re
|
|||
}
|
||||
|
||||
cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
|
||||
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil {
|
||||
// TODO(zyguan): In async API, the interceptor is only triggered upon receiving the response. Maybe
|
||||
// support AsyncRPCInterceptor later.
|
||||
getResp := func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
return resp, err
|
||||
}
|
||||
resp, err = ctxInterceptor.Wrap(getResp)(addr, req)
|
||||
}
|
||||
if resp != nil {
|
||||
respInfo := resourcecontrol.MakeResponseInfo(resp)
|
||||
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
|
||||
|
|
@ -128,11 +135,6 @@ func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, re
|
|||
})
|
||||
}
|
||||
|
||||
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil {
|
||||
// TODO(zyguan): support AsyncRPCInterceptor
|
||||
logutil.Logger(ctx).Warn("SendRequestAsync with interceptor is unsupported")
|
||||
}
|
||||
|
||||
r.Client.SendRequestAsync(ctx, addr, req, cb)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue