client: cleanup RPCClient options (#268)

Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2021-08-16 15:11:08 +08:00 committed by GitHub
parent b7d1044eb5
commit df2119f51b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 34 additions and 19 deletions

View File

@ -74,7 +74,7 @@ func NewTestStore(t *testing.T) *tikv.KVStore {
require.Nil(t, err)
spKV, err := tikv.NewEtcdSafePointKV(addrs, tlsConfig)
require.Nil(t, err)
store, err := tikv.NewKVStore("test-store", &tikv.CodecPDClient{Client: pdClient}, spKV, tikv.NewRPCClient(securityConfig))
store, err := tikv.NewKVStore("test-store", &tikv.CodecPDClient{Client: pdClient}, spKV, tikv.NewRPCClient())
require.Nil(t, err)
err = clearStorage(store)
require.Nil(t, err)

View File

@ -240,6 +240,16 @@ func (a *connArray) Close() {
close(a.done)
}
// Opt is the option for the client.
type Opt func(*RPCClient)
// WithSecurity is used to set the security config.
func WithSecurity(security config.Security) Opt {
return func(c *RPCClient) {
c.security = security
}
}
// RPCClient is RPC client struct.
// TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV.
// Since we use shared client connection to communicate to the same TiKV, it's possible
@ -261,10 +271,9 @@ type RPCClient struct {
}
// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(security config.Security, opts ...func(c *RPCClient)) *RPCClient {
func NewRPCClient(opts ...Opt) *RPCClient {
cli := &RPCClient{
conns: make(map[string]*connArray),
security: security,
dialTimeout: dialTimeout,
}
for _, opt := range opts {

View File

@ -59,9 +59,8 @@ func TestPanicInRecvLoop(t *testing.T) {
defer server.Stop()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
rpcClient := NewRPCClient(config.Security{}, func(c *RPCClient) {
c.dialTimeout = time.Second / 3
})
rpcClient := NewRPCClient()
rpcClient.dialTimeout = time.Second / 3
// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr, true, func(cfg *config.TiKVClient) { cfg.GrpcConnectionCount = 1 })
@ -94,7 +93,7 @@ func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 128
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient(config.Security{})
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
// Create 4 BatchCommands streams.

View File

@ -59,7 +59,7 @@ func TestConn(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 0
})()
client := NewRPCClient(config.Security{})
client := NewRPCClient()
addr := "127.0.0.1:6379"
conn1, err := client.getConnArray(addr, true)
@ -92,7 +92,7 @@ func TestSendWhenReconnect(t *testing.T) {
server, port := startMockTikvService()
require.True(t, port > 0)
rpcClient := NewRPCClient(config.Security{})
rpcClient := NewRPCClient()
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
conn, err := rpcClient.getConnArray(addr, true)
assert.Nil(t, err)
@ -219,7 +219,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 0
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient(config.Security{})
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
var checkCnt uint64
@ -288,7 +288,7 @@ func TestForwardMetadataByBatchCommands(t *testing.T) {
conf.TiKVClient.MaxBatchSize = 128
conf.TiKVClient.GrpcConnectionCount = 1
})()
rpcClient := NewRPCClient(config.Security{})
rpcClient := NewRPCClient()
defer rpcClient.closeConns()
var checkCnt uint64

View File

@ -51,7 +51,6 @@ import (
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
@ -469,7 +468,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa
wg.Done()
}()
cli := client.NewRPCClient(config.Security{})
cli := client.NewRPCClient()
sender := NewRegionRequestSender(s.cache, cli)
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
@ -486,7 +485,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCa
// Just for covering error code = codes.Canceled.
client1 := &cancelContextClient{
Client: client.NewRPCClient(config.Security{}),
Client: client.NewRPCClient(),
redirectAddr: addr,
}
sender = NewRegionRequestSender(s.cache, client1)

View File

@ -96,7 +96,7 @@ func NewClient(ctx context.Context, pdAddrs []string, security config.Security,
clusterID: pdCli.GetClusterID(ctx),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
rpcClient: client.NewRPCClient(security),
rpcClient: client.NewRPCClient(client.WithSecurity(security)),
}, nil
}

View File

@ -43,6 +43,14 @@ import (
// It should not be used after calling Close().
type Client = client.Client
// ClientOpt defines the option to create RPC client.
type ClientOpt = client.Opt
// WithSecurity is used to set security config.
func WithSecurity(security config.Security) ClientOpt {
return client.WithSecurity(security)
}
// Timeout durations.
const (
ReadTimeoutMedium = client.ReadTimeoutMedium
@ -50,6 +58,6 @@ const (
)
// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(security config.Security, opts ...func(c *client.RPCClient)) *client.RPCClient {
return client.NewRPCClient(security, opts...)
func NewRPCClient(opts ...ClientOpt) *client.RPCClient {
return client.NewRPCClient(opts...)
}

View File

@ -211,7 +211,7 @@ func NewTxnClient(pdAddrs []string) (*KVStore, error) {
return nil, errors.Trace(err)
}
s, err := NewKVStore(uuid, pdClient, spkv, NewRPCClient(cfg.Security))
s, err := NewKVStore(uuid, pdClient, spkv, NewRPCClient(WithSecurity(cfg.Security)))
if err != nil {
return nil, errors.Trace(err)
}
@ -609,7 +609,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
return nil, errors.Trace(err)
}
s, err := NewKVStore(uuid, locate.NewCodeCPDClient(pdCli), spkv, client.NewRPCClient(security))
s, err := NewKVStore(uuid, locate.NewCodeCPDClient(pdCli), spkv, client.NewRPCClient(WithSecurity(security)))
if err != nil {
return nil, errors.Trace(err)
}