mirror of https://github.com/tikv/client-go.git
fix issue of doesn't fast fail request when no available connections (#1339)
Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
6cb0704fce
commit
c82e921992
|
|
@ -44,9 +44,10 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefStoreLivenessTimeout is the default value for store liveness timeout.
|
// DefStoreLivenessTimeout is the default value for store liveness timeout.
|
||||||
DefStoreLivenessTimeout = "1s"
|
DefStoreLivenessTimeout = "1s"
|
||||||
DefGrpcInitialWindowSize = 1 << 27 // 128MiB
|
DefGrpcInitialWindowSize = 1 << 27 // 128MiB
|
||||||
DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB
|
DefGrpcInitialConnWindowSize = 1 << 27 // 128MiB
|
||||||
|
DefMaxConcurrencyRequestLimit = math.MaxInt64
|
||||||
)
|
)
|
||||||
|
|
||||||
// TiKVClient is the config for tikv client.
|
// TiKVClient is the config for tikv client.
|
||||||
|
|
@ -174,7 +175,7 @@ func DefaultTiKVClient() TiKVClient {
|
||||||
CoprReqTimeout: 60 * time.Second,
|
CoprReqTimeout: 60 * time.Second,
|
||||||
|
|
||||||
ResolveLockLiteThreshold: 16,
|
ResolveLockLiteThreshold: 16,
|
||||||
MaxConcurrencyRequestLimit: math.MaxInt64,
|
MaxConcurrencyRequestLimit: DefMaxConcurrencyRequestLimit,
|
||||||
EnableReplicaSelectorV2: true,
|
EnableReplicaSelectorV2: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -418,6 +418,12 @@ func (a *batchConn) getClientAndSend() {
|
||||||
if cli == nil {
|
if cli == nil {
|
||||||
logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
|
logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
|
||||||
metrics.TiKVNoAvailableConnectionCounter.Inc()
|
metrics.TiKVNoAvailableConnectionCounter.Inc()
|
||||||
|
if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit {
|
||||||
|
// Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions.
|
||||||
|
// TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout.
|
||||||
|
// This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time.
|
||||||
|
a.reqBuilder.cancel(errors.New("no available connections"))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer cli.unlockForSend()
|
defer cli.unlockForSend()
|
||||||
|
|
|
||||||
|
|
@ -128,9 +128,15 @@ func TestCancelTimeoutRetErr(t *testing.T) {
|
||||||
func TestSendWhenReconnect(t *testing.T) {
|
func TestSendWhenReconnect(t *testing.T) {
|
||||||
server, port := mockserver.StartMockTikvService()
|
server, port := mockserver.StartMockTikvService()
|
||||||
require.True(t, port > 0)
|
require.True(t, port > 0)
|
||||||
|
restoreFn := config.UpdateGlobal(func(conf *config.Config) {
|
||||||
|
conf.TiKVClient.MaxConcurrencyRequestLimit = 10000
|
||||||
|
})
|
||||||
|
|
||||||
rpcClient := NewRPCClient()
|
rpcClient := NewRPCClient()
|
||||||
defer rpcClient.Close()
|
defer func() {
|
||||||
|
rpcClient.Close()
|
||||||
|
restoreFn()
|
||||||
|
}()
|
||||||
addr := server.Addr()
|
addr := server.Addr()
|
||||||
conn, err := rpcClient.getConnArray(addr, true)
|
conn, err := rpcClient.getConnArray(addr, true)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
@ -142,7 +148,7 @@ func TestSendWhenReconnect(t *testing.T) {
|
||||||
|
|
||||||
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
||||||
_, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second)
|
_, err = rpcClient.SendRequest(context.Background(), addr, req, 5*time.Second)
|
||||||
assert.True(t, strings.Contains(err.Error(), "timeout"))
|
require.Regexp(t, "wait recvLoop timeout,timeout:5s, wait_duration:.* context deadline exceeded", err.Error())
|
||||||
server.Stop()
|
server.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -950,6 +956,7 @@ func TestRandomRestartStoreAndForwarding(t *testing.T) {
|
||||||
err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||
|
err.Error() == "rpc error: code = Unavailable desc = error reading from server: EOF" ||
|
||||||
strings.Contains(err.Error(), "context deadline exceeded") ||
|
strings.Contains(err.Error(), "context deadline exceeded") ||
|
||||||
strings.Contains(err.Error(), "connect: connection refused") ||
|
strings.Contains(err.Error(), "connect: connection refused") ||
|
||||||
|
strings.Contains(err.Error(), "no available connections") ||
|
||||||
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") {
|
strings.Contains(err.Error(), "rpc error: code = Unavailable desc = error reading from server") {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -1010,3 +1017,33 @@ func TestErrConn(t *testing.T) {
|
||||||
assert.True(t, errors.As(err1, &errMsg))
|
assert.True(t, errors.As(err1, &errMsg))
|
||||||
assert.EqualError(t, err1, errMsg.Error())
|
assert.EqualError(t, err1, errMsg.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFastFailWhenNoAvailableConn(t *testing.T) {
|
||||||
|
server, port := mockserver.StartMockTikvService()
|
||||||
|
require.True(t, port > 0)
|
||||||
|
require.True(t, server.IsRunning())
|
||||||
|
addr := server.Addr()
|
||||||
|
client := NewRPCClient()
|
||||||
|
defer func() {
|
||||||
|
err := client.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
server.Stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
req := &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: &coprocessor.Request{}}}
|
||||||
|
conn, err := client.getConnArray(addr, true)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, time.Second, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for _, c := range conn.batchConn.batchCommandsClients {
|
||||||
|
// mock all client a in recreate.
|
||||||
|
c.lockForRecreate()
|
||||||
|
}
|
||||||
|
start := time.Now()
|
||||||
|
timeout := time.Second
|
||||||
|
_, err = sendBatchRequest(context.Background(), addr, "", conn.batchConn, req, timeout, 0)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, "no available connections", err.Error())
|
||||||
|
require.Less(t, time.Since(start), timeout)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue