mirror of https://github.com/tikv/client-go.git
client: implement SendRequestAsync for RPCClient (#1604)
ref tikv/client-go#1586 Signed-off-by: zyguan <zhongyangguan@gmail.com>
This commit is contained in:
parent
bdaed3eba7
commit
70e1ca6d5c
|
|
@ -0,0 +1,196 @@
|
|||
// Copyright 2025 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime/trace"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
"github.com/tikv/client-go/v2/util/async"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ClientAsync interface {
|
||||
Client
|
||||
// SendRequestAsync sends a request to the target address asynchronously.
|
||||
SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response])
|
||||
}
|
||||
|
||||
// SendRequestAsync implements the ClientAsync interface.
|
||||
func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
|
||||
var err error
|
||||
|
||||
if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
|
||||
go c.recycleIdleConnArray()
|
||||
}
|
||||
|
||||
if config.GetGlobalConfig().TiKVClient.MaxBatchSize == 0 {
|
||||
cb.Invoke(nil, errors.New("batch client is disabled"))
|
||||
return
|
||||
}
|
||||
if req.StoreTp != tikvrpc.TiKV {
|
||||
cb.Invoke(nil, errors.New("unsupported store type: "+req.StoreTp.Name()))
|
||||
return
|
||||
}
|
||||
|
||||
batchReq := req.ToBatchCommandsRequest()
|
||||
if batchReq == nil {
|
||||
cb.Invoke(nil, errors.New("unsupported request type: "+req.Type.String()))
|
||||
return
|
||||
}
|
||||
|
||||
regionRPC := trace.StartRegion(ctx, req.Type.String())
|
||||
spanRPC := opentracing.SpanFromContext(ctx)
|
||||
if spanRPC != nil && spanRPC.Tracer() != nil {
|
||||
spanRPC = spanRPC.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequestAsync, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(spanRPC.Context()))
|
||||
ctx = opentracing.ContextWithSpan(ctx, spanRPC)
|
||||
}
|
||||
|
||||
useCodec := c.option != nil && c.option.codec != nil
|
||||
if useCodec {
|
||||
req, err = c.option.codec.EncodeRequest(req)
|
||||
if err != nil {
|
||||
cb.Invoke(nil, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
tikvrpc.AttachContext(req, req.Context)
|
||||
|
||||
// TODO(zyguan): If the client created `WithGRPCDialOptions(grpc.WithBlock())`, `getConnArray` might be blocked for
|
||||
// a while when the corresponding conn array is uninitialized. However, since tidb won't set this option, we just
|
||||
// keep `getConnArray` synchronous for now.
|
||||
connArray, err := c.getConnArray(addr, true)
|
||||
if err != nil {
|
||||
cb.Invoke(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
entry = &batchCommandsEntry{
|
||||
ctx: ctx,
|
||||
req: batchReq,
|
||||
cb: cb,
|
||||
forwardedHost: req.ForwardedHost,
|
||||
canceled: 0,
|
||||
err: nil,
|
||||
pri: req.GetResourceControlContext().GetOverridePriority(),
|
||||
start: time.Now(),
|
||||
}
|
||||
stop func() bool
|
||||
)
|
||||
|
||||
// defer post actions
|
||||
entry.cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
|
||||
if stop != nil {
|
||||
stop()
|
||||
}
|
||||
|
||||
elapsed := time.Since(entry.start)
|
||||
|
||||
// batch client metrics
|
||||
if sendLat := atomic.LoadInt64(&entry.sendLat); sendLat > 0 {
|
||||
metrics.BatchRequestDurationSend.Observe(time.Duration(sendLat).Seconds())
|
||||
}
|
||||
if recvLat := atomic.LoadInt64(&entry.recvLat); recvLat > 0 {
|
||||
metrics.BatchRequestDurationRecv.Observe(time.Duration(recvLat).Seconds())
|
||||
}
|
||||
metrics.BatchRequestDurationDone.Observe(elapsed.Seconds())
|
||||
|
||||
// rpc metrics
|
||||
connArray.updateRPCMetrics(req, resp, elapsed)
|
||||
|
||||
// resource control
|
||||
if stmtExec := ctx.Value(util.ExecDetailsKey); stmtExec != nil {
|
||||
execDetails := stmtExec.(*util.ExecDetails)
|
||||
atomic.AddInt64(&execDetails.WaitKVRespDuration, int64(elapsed))
|
||||
execNetworkCollector := networkCollector{}
|
||||
execNetworkCollector.onReq(req, execDetails)
|
||||
execNetworkCollector.onResp(req, resp, execDetails)
|
||||
}
|
||||
|
||||
// tracing
|
||||
if spanRPC != nil {
|
||||
if util.TraceExecDetailsEnabled(ctx) {
|
||||
if si := buildSpanInfoFromResp(resp); si != nil {
|
||||
si.addTo(spanRPC, entry.start)
|
||||
}
|
||||
}
|
||||
spanRPC.Finish()
|
||||
}
|
||||
regionRPC.End()
|
||||
|
||||
// codec
|
||||
if useCodec && err == nil {
|
||||
resp, err = c.option.codec.DecodeResponse(req, resp)
|
||||
}
|
||||
|
||||
return resp, WrapErrConn(err, connArray)
|
||||
})
|
||||
|
||||
stop = context.AfterFunc(ctx, func() {
|
||||
logutil.Logger(ctx).Debug("async send request cancelled (context done)", zap.String("to", addr), zap.Error(ctx.Err()))
|
||||
entry.error(ctx.Err())
|
||||
})
|
||||
|
||||
batchConn := connArray.batchConn
|
||||
if val, err := util.EvalFailpoint("mockBatchCommandsChannelFullOnAsyncSend"); err == nil {
|
||||
mockBatchCommandsChannelFullOnAsyncSend(ctx, batchConn, cb, val)
|
||||
}
|
||||
select {
|
||||
case batchConn.batchCommandsCh <- entry:
|
||||
// will be fulfilled in batch send/recv loop.
|
||||
case <-ctx.Done():
|
||||
// will be fulfilled by the after callback of ctx.
|
||||
case <-batchConn.closed:
|
||||
logutil.Logger(ctx).Debug("async send request cancelled (conn closed)", zap.String("to", addr))
|
||||
cb.Invoke(nil, errors.New("batchConn closed"))
|
||||
}
|
||||
}
|
||||
|
||||
func mockBatchCommandsChannelFullOnAsyncSend(ctx context.Context, batchConn *batchConn, cb async.Callback[*tikvrpc.Response], val any) {
|
||||
var dur time.Duration
|
||||
switch v := val.(type) {
|
||||
case int:
|
||||
dur = time.Duration(v) * time.Millisecond
|
||||
case string:
|
||||
dur, _ = time.ParseDuration(v)
|
||||
}
|
||||
if dur > 0 {
|
||||
logutil.Logger(ctx).Info("[failpoint] mock channel full for " + dur.String())
|
||||
select {
|
||||
case <-time.After(dur):
|
||||
case <-ctx.Done():
|
||||
case <-batchConn.closed:
|
||||
cb.Invoke(nil, errors.New("batchConn closed"))
|
||||
}
|
||||
} else {
|
||||
logutil.Logger(ctx).Info("[failpoint] mock channel full")
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-batchConn.closed:
|
||||
cb.Invoke(nil, errors.New("batchConn closed"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,269 @@
|
|||
// Copyright 2025 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/tikvpb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/internal/client/mockserver"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/util/async"
|
||||
)
|
||||
|
||||
func TestSendRequestAsyncBasic(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, port := mockserver.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
require.True(t, srv.IsRunning())
|
||||
addr := srv.Addr()
|
||||
|
||||
cli := NewRPCClient()
|
||||
defer func() {
|
||||
cli.Close()
|
||||
srv.Stop()
|
||||
}()
|
||||
|
||||
t.Run("BatchDisabled", func(t *testing.T) {
|
||||
defer config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 0 })()
|
||||
called := false
|
||||
cb := async.NewCallback(nil, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorContains(t, err, "batch client is disabled")
|
||||
})
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("UnsupportedStoreType", func(t *testing.T) {
|
||||
called := false
|
||||
cb := async.NewCallback(nil, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorContains(t, err, "unsupported store type")
|
||||
})
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
req.StoreTp = tikvrpc.TiFlash
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("UnsupportedRequestType", func(t *testing.T) {
|
||||
called := false
|
||||
cb := async.NewCallback(nil, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorContains(t, err, "unsupported request type")
|
||||
})
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByKey, &kvrpcpb.MvccGetByKeyRequest{})
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
rl := async.NewRunLoop()
|
||||
ok := false
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
require.NoError(t, err)
|
||||
ok = true
|
||||
})
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
|
||||
rl.Exec(ctx)
|
||||
require.True(t, ok)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendRequestAsyncTimeout(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, port := mockserver.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
require.True(t, srv.IsRunning())
|
||||
addr := srv.Addr()
|
||||
|
||||
cli := NewRPCClient()
|
||||
defer func() {
|
||||
cli.Close()
|
||||
srv.Stop()
|
||||
}()
|
||||
|
||||
makeBatchResponse := func(req *tikvpb.BatchCommandsRequest) *tikvpb.BatchCommandsResponse {
|
||||
resp := &tikvpb.BatchCommandsResponse{RequestIds: req.GetRequestIds()}
|
||||
for range req.GetRequestIds() {
|
||||
resp.Responses = append(resp.Responses, &tikvpb.BatchCommandsResponse_Response{
|
||||
Cmd: &tikvpb.BatchCommandsResponse_Response_Empty{},
|
||||
})
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
t.Run("TimeoutOnHandle", func(t *testing.T) {
|
||||
sendCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
var received atomic.Bool
|
||||
handle := func(req *tikvpb.BatchCommandsRequest) (*tikvpb.BatchCommandsResponse, error) {
|
||||
received.Store(true)
|
||||
<-sendCtx.Done()
|
||||
return makeBatchResponse(req), nil
|
||||
}
|
||||
srv.OnBatchCommandsRequest.Store(&handle)
|
||||
defer srv.OnBatchCommandsRequest.Store(nil)
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
called := false
|
||||
rl := async.NewRunLoop()
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
})
|
||||
cli.SendRequestAsync(sendCtx, addr, req, cb)
|
||||
rl.Exec(ctx)
|
||||
require.True(t, received.Load())
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("CanceledOnHandle", func(t *testing.T) {
|
||||
sendCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||
|
||||
var received atomic.Bool
|
||||
handle := func(req *tikvpb.BatchCommandsRequest) (*tikvpb.BatchCommandsResponse, error) {
|
||||
received.Store(true)
|
||||
<-sendCtx.Done()
|
||||
return makeBatchResponse(req), nil
|
||||
}
|
||||
srv.OnBatchCommandsRequest.Store(&handle)
|
||||
defer srv.OnBatchCommandsRequest.Store(nil)
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
called := false
|
||||
rl := async.NewRunLoop()
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
})
|
||||
cli.SendRequestAsync(sendCtx, addr, req, cb)
|
||||
time.AfterFunc(time.Millisecond, cancel)
|
||||
rl.Exec(ctx)
|
||||
require.True(t, received.Load())
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("TimeoutBeforeSend", func(t *testing.T) {
|
||||
sendCtx, cancel := context.WithTimeout(ctx, time.Millisecond)
|
||||
defer cancel()
|
||||
failpoint.Enable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend", `1*return(100)`)
|
||||
defer failpoint.Disable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend")
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
called := false
|
||||
rl := async.NewRunLoop()
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
})
|
||||
cli.SendRequestAsync(sendCtx, addr, req, cb)
|
||||
rl.Exec(ctx)
|
||||
require.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("CanceledBeforeSend", func(t *testing.T) {
|
||||
sendCtx, cancel := context.WithTimeout(ctx, time.Millisecond)
|
||||
failpoint.Enable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend", `1*return(100)`)
|
||||
defer failpoint.Disable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend")
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
called := false
|
||||
rl := async.NewRunLoop()
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorIs(t, err, context.Canceled)
|
||||
})
|
||||
cancel()
|
||||
cli.SendRequestAsync(sendCtx, addr, req, cb)
|
||||
rl.Exec(ctx)
|
||||
require.True(t, called)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSendRequestAsyncAndCloseClientOnHandle(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, port := mockserver.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
require.True(t, srv.IsRunning())
|
||||
defer srv.Stop()
|
||||
addr := srv.Addr()
|
||||
cli := NewRPCClient()
|
||||
|
||||
var received atomic.Bool
|
||||
handle := func(req *tikvpb.BatchCommandsRequest) (*tikvpb.BatchCommandsResponse, error) {
|
||||
received.Store(true)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return nil, errors.New("mock server error")
|
||||
}
|
||||
srv.OnBatchCommandsRequest.Store(&handle)
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
rl, called := async.NewRunLoop(), false
|
||||
cb := async.NewCallback(rl, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorContains(t, err, "batch client closed")
|
||||
})
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
time.AfterFunc(10*time.Millisecond, func() { cli.Close() })
|
||||
rl.Exec(ctx)
|
||||
require.True(t, received.Load())
|
||||
require.True(t, called)
|
||||
}
|
||||
|
||||
func TestSendRequestAsyncAndCloseClientBeforeSend(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
srv, port := mockserver.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
require.True(t, srv.IsRunning())
|
||||
defer srv.Stop()
|
||||
addr := srv.Addr()
|
||||
cli := NewRPCClient()
|
||||
|
||||
failpoint.Enable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend", `1*return(100)`)
|
||||
defer failpoint.Disable("tikvclient/mockBatchCommandsChannelFullOnAsyncSend")
|
||||
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||
called := false
|
||||
cb := async.NewCallback(nil, func(resp *tikvrpc.Response, err error) {
|
||||
called = true
|
||||
require.Nil(t, resp)
|
||||
require.ErrorContains(t, err, "batchConn closed")
|
||||
})
|
||||
time.AfterFunc(10*time.Millisecond, func() { cli.Close() })
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
require.True(t, called)
|
||||
}
|
||||
|
|
@ -58,6 +58,7 @@ import (
|
|||
"github.com/tikv/client-go/v2/metrics"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
"github.com/tikv/client-go/v2/util/async"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
|
|
@ -68,6 +69,7 @@ type batchCommandsEntry struct {
|
|||
ctx context.Context
|
||||
req *tikvpb.BatchCommandsRequest_Request
|
||||
res chan *tikvpb.BatchCommandsResponse_Response
|
||||
cb async.Callback[*tikvrpc.Response]
|
||||
// forwardedHost is the address of a store which will handle the request.
|
||||
// It's different from the address the request sent to.
|
||||
forwardedHost string
|
||||
|
|
@ -90,9 +92,25 @@ func (b *batchCommandsEntry) priority() uint64 {
|
|||
return b.pri
|
||||
}
|
||||
|
||||
func (b *batchCommandsEntry) async() bool {
|
||||
return b.cb != nil
|
||||
}
|
||||
|
||||
func (b *batchCommandsEntry) response(resp *tikvpb.BatchCommandsResponse_Response) {
|
||||
if b.async() {
|
||||
b.cb.Schedule(tikvrpc.FromBatchCommandsResponse(resp))
|
||||
} else {
|
||||
b.res <- resp
|
||||
}
|
||||
}
|
||||
|
||||
func (b *batchCommandsEntry) error(err error) {
|
||||
b.err = err
|
||||
close(b.res)
|
||||
if b.async() {
|
||||
b.cb.Schedule(nil, err)
|
||||
} else {
|
||||
close(b.res)
|
||||
}
|
||||
}
|
||||
|
||||
// batchCommandsBuilder collects a batch of `batchCommandsEntry`s to build
|
||||
|
|
@ -834,6 +852,19 @@ func (c *batchCommandsClient) failPendingRequests(err error, forwardedHost strin
|
|||
})
|
||||
}
|
||||
|
||||
// failAsyncRequestsOnClose fails all async requests when the client is closed.
|
||||
func (c *batchCommandsClient) failAsyncRequestsOnClose() {
|
||||
err := errors.New("batch client closed")
|
||||
c.batched.Range(func(key, value interface{}) bool {
|
||||
id, _ := key.(uint64)
|
||||
entry, _ := value.(*batchCommandsEntry)
|
||||
if entry.async() {
|
||||
c.failRequest(err, id, entry)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// failRequestsByIDs fails requests by requestID.
|
||||
func (c *batchCommandsClient) failRequestsByIDs(err error, requestIDs []uint64) {
|
||||
for _, requestID := range requestIDs {
|
||||
|
|
@ -913,6 +944,8 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
|
|||
zap.Stack("stack"))
|
||||
logutil.BgLogger().Info("restart batchRecvLoop")
|
||||
go c.batchRecvLoop(cfg, tikvTransportLayerLoad, connMetrics, streamClient)
|
||||
} else {
|
||||
c.failAsyncRequestsOnClose()
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -970,7 +1003,7 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
|
|||
logutil.Eventf(entry.ctx, "receive %T response with other %d batched requests from %s", responses[i].GetCmd(), len(responses), c.target)
|
||||
if atomic.LoadInt32(&entry.canceled) == 0 {
|
||||
// Put the response only if the request is not canceled.
|
||||
entry.res <- responses[i]
|
||||
entry.response(responses[i])
|
||||
}
|
||||
c.batched.Delete(requestID)
|
||||
c.sent.Add(-1)
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ import (
|
|||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/util/async"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
|
|
@ -74,6 +75,41 @@ func (r reqCollapse) SendRequest(ctx context.Context, addr string, req *tikvrpc.
|
|||
return r.Client.SendRequest(ctx, addr, req, timeout)
|
||||
}
|
||||
|
||||
func (r reqCollapse) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
|
||||
if r.Client == nil {
|
||||
panic("client should not be nil")
|
||||
}
|
||||
cli, ok := r.Client.(ClientAsync)
|
||||
if !ok {
|
||||
cb.Invoke(nil, errors.Errorf("%T dose not implement ClientAsync interface", r.Client))
|
||||
return
|
||||
}
|
||||
if req.Type == tikvrpc.CmdResolveLock && len(req.ResolveLock().Keys) == 0 && len(req.ResolveLock().TxnInfos) == 0 {
|
||||
// try collapse resolve lock request.
|
||||
key := strconv.FormatUint(req.Context.RegionId, 10) + "-" + strconv.FormatUint(req.ResolveLock().StartVersion, 10)
|
||||
copyReq := *req
|
||||
rsC := resolveRegionSf.DoChan(key, func() (interface{}, error) {
|
||||
// resolveRegionSf will call this function in a goroutine, thus use SendRequest directly.
|
||||
return r.Client.SendRequest(context.Background(), addr, ©Req, ReadTimeoutShort)
|
||||
})
|
||||
// waiting the response in another goroutine.
|
||||
cb.Executor().Go(func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cb.Schedule(nil, errors.WithStack(ctx.Err()))
|
||||
case rs := <-rsC:
|
||||
if rs.Err != nil {
|
||||
cb.Schedule(nil, errors.WithStack(rs.Err))
|
||||
return
|
||||
}
|
||||
cb.Schedule(rs.Val.(*tikvrpc.Response), nil)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
}
|
||||
}
|
||||
|
||||
func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (canCollapse bool, resp *tikvrpc.Response, err error) {
|
||||
switch req.Type {
|
||||
case tikvrpc.CmdResolveLock:
|
||||
|
|
|
|||
|
|
@ -19,10 +19,13 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/internal/resourcecontrol"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
|
||||
"github.com/tikv/client-go/v2/util"
|
||||
"github.com/tikv/client-go/v2/util/async"
|
||||
resourceControlClient "github.com/tikv/pd/client/resource_group/controller"
|
||||
)
|
||||
|
||||
|
|
@ -86,6 +89,60 @@ func (r interceptedClient) SendRequest(ctx context.Context, addr string, req *ti
|
|||
return resp, err
|
||||
}
|
||||
|
||||
func (r interceptedClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
|
||||
cli, ok := r.Client.(ClientAsync)
|
||||
if !ok {
|
||||
cb.Invoke(nil, errors.Errorf("%T dose not implement ClientAsync interface", r.Client))
|
||||
return
|
||||
}
|
||||
|
||||
// since all async requests processed by one runloop share the same resource group, if the quota is exceeded, all
|
||||
// requests/responses shall wait for the tokens, thus it's ok to call OnRequestWait/OnResponseWait synchronously.
|
||||
resourceGroupName, resourceControlInterceptor, reqInfo := getResourceControlInfo(ctx, req)
|
||||
if resourceControlInterceptor != nil {
|
||||
consumption, penalty, waitDuration, priority, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
|
||||
if err != nil {
|
||||
cb.Invoke(nil, err)
|
||||
return
|
||||
}
|
||||
req.GetResourceControlContext().Penalty = penalty
|
||||
// override request priority with resource group priority if it's not set.
|
||||
// Get the priority at tikv side has some performance issue, so we pass it
|
||||
// at client side. See: https://github.com/tikv/tikv/issues/15994 for more details.
|
||||
if req.GetResourceControlContext().OverridePriority == 0 {
|
||||
req.GetResourceControlContext().OverridePriority = uint64(priority)
|
||||
}
|
||||
|
||||
var ruDetails *util.RUDetails
|
||||
|
||||
if val := ctx.Value(util.RUDetailsCtxKey); val != nil {
|
||||
ruDetails = val.(*util.RUDetails)
|
||||
ruDetails.Update(consumption, waitDuration)
|
||||
}
|
||||
|
||||
cb.Inject(func(resp *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
|
||||
if resp != nil {
|
||||
respInfo := resourcecontrol.MakeResponseInfo(resp)
|
||||
consumption, waitDuration, err := resourceControlInterceptor.OnResponseWait(ctx, resourceGroupName, reqInfo, respInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ruDetails != nil {
|
||||
ruDetails.Update(consumption, waitDuration)
|
||||
}
|
||||
}
|
||||
return resp, err
|
||||
})
|
||||
}
|
||||
|
||||
if ctxInterceptor := interceptor.GetRPCInterceptorFromCtx(ctx); ctxInterceptor != nil {
|
||||
// TODO(zyguan): AsyncRPCInterceptor
|
||||
logutil.Logger(ctx).Warn("SendRequestAsync with interceptor is unsupported")
|
||||
}
|
||||
|
||||
cli.SendRequestAsync(ctx, addr, req, cb)
|
||||
}
|
||||
|
||||
var (
|
||||
// ResourceControlSwitch is used to control whether to enable the resource control.
|
||||
ResourceControlSwitch atomic.Value
|
||||
|
|
|
|||
|
|
@ -48,6 +48,8 @@ type MockServer struct {
|
|||
sync.Mutex
|
||||
check func(context.Context) error
|
||||
}
|
||||
|
||||
OnBatchCommandsRequest atomic.Pointer[func(*tikvpb.BatchCommandsRequest) (*tikvpb.BatchCommandsResponse, error)]
|
||||
}
|
||||
|
||||
// KvGet implements the TikvServer interface.
|
||||
|
|
@ -86,27 +88,35 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
|
|||
logutil.BgLogger().Error("batch commands receive fail", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
handle := s.OnBatchCommandsRequest.Load()
|
||||
if handle != nil {
|
||||
var resp *tikvpb.BatchCommandsResponse
|
||||
resp, err = (*handle)(req)
|
||||
if err == nil {
|
||||
err = ss.Send(resp)
|
||||
}
|
||||
} else {
|
||||
responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds()))
|
||||
for i := 0; i < len(req.GetRequestIds()); i++ {
|
||||
responses = append(responses, &tikvpb.BatchCommandsResponse_Response{
|
||||
Cmd: &tikvpb.BatchCommandsResponse_Response_Empty{
|
||||
Empty: &tikvpb.BatchCommandsEmptyResponse{},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
responses := make([]*tikvpb.BatchCommandsResponse_Response, 0, len(req.GetRequestIds()))
|
||||
for i := 0; i < len(req.GetRequestIds()); i++ {
|
||||
responses = append(responses, &tikvpb.BatchCommandsResponse_Response{
|
||||
Cmd: &tikvpb.BatchCommandsResponse_Response_Empty{
|
||||
Empty: &tikvpb.BatchCommandsEmptyResponse{},
|
||||
err = ss.Send(&tikvpb.BatchCommandsResponse{
|
||||
Responses: responses,
|
||||
RequestIds: req.GetRequestIds(),
|
||||
HealthFeedback: &kvrpcpb.HealthFeedback{
|
||||
StoreId: 1,
|
||||
FeedbackSeqNo: feedbackSeq,
|
||||
SlowScore: 1,
|
||||
},
|
||||
})
|
||||
feedbackSeq++
|
||||
}
|
||||
|
||||
err = ss.Send(&tikvpb.BatchCommandsResponse{
|
||||
Responses: responses,
|
||||
RequestIds: req.GetRequestIds(),
|
||||
HealthFeedback: &kvrpcpb.HealthFeedback{
|
||||
StoreId: 1,
|
||||
FeedbackSeqNo: feedbackSeq,
|
||||
SlowScore: 1,
|
||||
},
|
||||
})
|
||||
feedbackSeq++
|
||||
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("batch commands send fail", zap.Error(err))
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -44,6 +44,9 @@ import (
|
|||
// It should not be used after calling Close().
|
||||
type Client = client.Client
|
||||
|
||||
// ClientAsync is a client that can send RPC asynchronously.
|
||||
type ClientAsync = client.ClientAsync
|
||||
|
||||
// ClientEventListener is a listener to handle events produced by `Client`.
|
||||
type ClientEventListener = client.ClientEventListener
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue