mirror of https://github.com/tikv/client-go.git
*: fix issue of configure kv timeout not work when disable batch client (#980)
* *: fix issue of configure kv timeout not work when disable batch client Signed-off-by: crazycs520 <crazycs520@gmail.com> * refine test Signed-off-by: crazycs520 <crazycs520@gmail.com> * address comment Signed-off-by: crazycs520 <crazycs520@gmail.com> --------- Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
001735b0b5
commit
342301689f
|
|
@ -36,7 +36,6 @@ package client
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -47,6 +46,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/internal/client/mock_server"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
)
|
||||
|
||||
|
|
@ -54,11 +54,11 @@ func TestPanicInRecvLoop(t *testing.T) {
|
|||
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
|
||||
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))
|
||||
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
defer server.Stop()
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
|
||||
addr := server.Addr()
|
||||
rpcClient := NewRPCClient()
|
||||
defer rpcClient.Close()
|
||||
rpcClient.option.dialTimeout = time.Second / 3
|
||||
|
|
@ -81,10 +81,10 @@ func TestPanicInRecvLoop(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
defer server.Stop()
|
||||
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
|
||||
addr := server.Addr()
|
||||
|
||||
// Enable batch and limit the connection count to 1 so that
|
||||
// there is only one BatchCommands stream for each host or forwarded host.
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/internal/client/mock_server"
|
||||
"github.com/tikv/client-go/v2/internal/logutil"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
"go.uber.org/zap"
|
||||
|
|
@ -118,12 +119,12 @@ func TestCancelTimeoutRetErr(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSendWhenReconnect(t *testing.T) {
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
|
||||
rpcClient := NewRPCClient()
|
||||
defer rpcClient.Close()
|
||||
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
|
||||
addr := server.Addr()
|
||||
conn, err := rpcClient.getConnArray(addr, true)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
|
@ -242,7 +243,7 @@ func TestCollapseResolveLock(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestForwardMetadataByUnaryCall(t *testing.T) {
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
defer server.Stop()
|
||||
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
|
||||
|
|
@ -257,7 +258,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
|
|||
|
||||
var checkCnt uint64
|
||||
// Check no corresponding metadata if ForwardedHost is empty.
|
||||
server.setMetaChecker(func(ctx context.Context) error {
|
||||
server.SetMetaChecker(func(ctx context.Context) error {
|
||||
atomic.AddUint64(&checkCnt, 1)
|
||||
// gRPC may set some metadata by default, e.g. "context-type".
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
|
|
@ -285,7 +286,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
|
|||
checkCnt = 0
|
||||
forwardedHost := "127.0.0.1:6666"
|
||||
// Check the metadata exists.
|
||||
server.setMetaChecker(func(ctx context.Context) error {
|
||||
server.SetMetaChecker(func(ctx context.Context) error {
|
||||
atomic.AddUint64(&checkCnt, 1)
|
||||
// gRPC may set some metadata by default, e.g. "context-type".
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
|
|
@ -310,10 +311,10 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestForwardMetadataByBatchCommands(t *testing.T) {
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
defer server.Stop()
|
||||
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
|
||||
addr := server.Addr()
|
||||
|
||||
// Enable batch and limit the connection count to 1 so that
|
||||
// there is only one BatchCommands stream for each host or forwarded host.
|
||||
|
|
@ -326,7 +327,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) {
|
|||
|
||||
var checkCnt uint64
|
||||
setCheckHandler := func(forwardedHost string) {
|
||||
server.setMetaChecker(func(ctx context.Context) error {
|
||||
server.SetMetaChecker(func(ctx context.Context) error {
|
||||
atomic.AddUint64(&checkCnt, 1)
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if forwardedHost == "" {
|
||||
|
|
@ -649,10 +650,10 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
|
|||
conf.TiKVClient.MaxBatchSize = 128
|
||||
})()
|
||||
|
||||
server, port := startMockTikvService()
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
require.True(t, port > 0)
|
||||
require.True(t, server.IsRunning())
|
||||
addr := server.addr
|
||||
addr := server.Addr()
|
||||
client := NewRPCClient()
|
||||
defer func() {
|
||||
err := client.Close()
|
||||
|
|
@ -689,7 +690,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) {
|
|||
logutil.BgLogger().Info("restart mock tikv server")
|
||||
server.Start(addr)
|
||||
require.True(t, server.IsRunning())
|
||||
require.Equal(t, addr, server.addr)
|
||||
require.Equal(t, addr, server.Addr())
|
||||
|
||||
// Wait batch client to auto reconnect.
|
||||
start := time.Now()
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@
|
|||
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go
|
||||
//
|
||||
|
||||
package client
|
||||
package mock_server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
|
@ -36,7 +36,7 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type server struct {
|
||||
type MockServer struct {
|
||||
tikvpb.TikvServer
|
||||
grpcServer *grpc.Server
|
||||
addr string
|
||||
|
|
@ -49,21 +49,28 @@ type server struct {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
|
||||
func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
|
||||
if err := s.checkMetadata(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kvrpcpb.GetResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
|
||||
if err := s.checkMetadata(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &kvrpcpb.PrewriteResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *server) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
|
||||
func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error {
|
||||
if err := s.checkMetadata(ss.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
return ss.Send(&coprocessor.Response{})
|
||||
}
|
||||
|
||||
func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
|
||||
func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
|
||||
if err := s.checkMetadata(ss.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -94,13 +101,13 @@ func (s *server) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *server) setMetaChecker(check func(context.Context) error) {
|
||||
func (s *MockServer) SetMetaChecker(check func(context.Context) error) {
|
||||
s.metaChecker.Lock()
|
||||
s.metaChecker.check = check
|
||||
s.metaChecker.Unlock()
|
||||
}
|
||||
|
||||
func (s *server) checkMetadata(ctx context.Context) error {
|
||||
func (s *MockServer) checkMetadata(ctx context.Context) error {
|
||||
s.metaChecker.Lock()
|
||||
defer s.metaChecker.Unlock()
|
||||
if s.metaChecker.check != nil {
|
||||
|
|
@ -109,16 +116,20 @@ func (s *server) checkMetadata(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *server) IsRunning() bool {
|
||||
func (s *MockServer) IsRunning() bool {
|
||||
return atomic.LoadInt64(&s.running) == 1
|
||||
}
|
||||
|
||||
func (s *server) Stop() {
|
||||
func (s *MockServer) Addr() string {
|
||||
return s.addr
|
||||
}
|
||||
|
||||
func (s *MockServer) Stop() {
|
||||
s.grpcServer.Stop()
|
||||
atomic.StoreInt64(&s.running, 0)
|
||||
}
|
||||
|
||||
func (s *server) Start(addr string) int {
|
||||
func (s *MockServer) Start(addr string) int {
|
||||
if addr == "" {
|
||||
addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0)
|
||||
}
|
||||
|
|
@ -148,9 +159,9 @@ func (s *server) Start(addr string) int {
|
|||
return port
|
||||
}
|
||||
|
||||
// Try to start a gRPC server and retrun the server instance and binded port.
|
||||
func startMockTikvService() (*server, int) {
|
||||
server := &server{}
|
||||
// StartMockTikvService try to start a gRPC server and retrun the server instance and binded port.
|
||||
func StartMockTikvService() (*MockServer, int) {
|
||||
server := &MockServer{}
|
||||
port := server.Start("")
|
||||
return server, port
|
||||
}
|
||||
|
|
@ -1776,7 +1776,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
|
|||
return errors.WithStack(err)
|
||||
} else if LoadShuttingDown() > 0 {
|
||||
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
|
||||
} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
|
||||
} else if isCauseByDeadlineExceeded(err) && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
|
||||
if s.replicaSelector != nil {
|
||||
s.replicaSelector.onDeadlineExceeded()
|
||||
return nil
|
||||
|
|
@ -1834,6 +1834,12 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, r
|
|||
return err
|
||||
}
|
||||
|
||||
func isCauseByDeadlineExceeded(err error) bool {
|
||||
causeErr := errors.Cause(err)
|
||||
return causeErr == context.DeadlineExceeded || // batch-client will return this error.
|
||||
status.Code(causeErr) == codes.DeadlineExceeded // when batch-client is disabled, grpc will return this error.
|
||||
}
|
||||
|
||||
// NeedReloadRegion checks is all peers has sent failed, if so need reload.
|
||||
func (s *RegionRequestSender) NeedReloadRegion(ctx *RPCContext) (need bool) {
|
||||
if s.failStoreIDs == nil {
|
||||
|
|
|
|||
|
|
@ -52,8 +52,10 @@ import (
|
|||
"github.com/pingcap/kvproto/pkg/tikvpb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/internal/apicodec"
|
||||
"github.com/tikv/client-go/v2/internal/client"
|
||||
"github.com/tikv/client-go/v2/internal/client/mock_server"
|
||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||
"github.com/tikv/client-go/v2/internal/retry"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
|
|
@ -697,3 +699,35 @@ func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
|
|||
s.Nil(regionErr)
|
||||
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() {
|
||||
config.UpdateGlobal(func(conf *config.Config) {
|
||||
conf.TiKVClient.MaxBatchSize = 0
|
||||
})()
|
||||
|
||||
server, port := mock_server.StartMockTikvService()
|
||||
s.True(port > 0)
|
||||
server.SetMetaChecker(func(ctx context.Context) error {
|
||||
return context.DeadlineExceeded
|
||||
})
|
||||
rpcClient := client.NewRPCClient()
|
||||
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||
return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
|
||||
}}
|
||||
defer func() {
|
||||
rpcClient.Close()
|
||||
server.Stop()
|
||||
}()
|
||||
|
||||
bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
|
||||
region, err := s.cache.LocateRegionByID(bo, s.region)
|
||||
s.Nil(err)
|
||||
s.NotNil(region)
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1})
|
||||
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10)
|
||||
s.Nil(err)
|
||||
s.NotNil(resp)
|
||||
regionErr, _ := resp.GetRegionError()
|
||||
s.True(IsFakeRegionError(regionErr))
|
||||
s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0.
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue