From 6fbd134007d594841f10bfaa60853f540b32f3e2 Mon Sep 17 00:00:00 2001 From: Nick Cameron Date: Thu, 29 Oct 2020 10:30:54 +1300 Subject: [PATCH] MOve tikv-store client to its own module Signed-off-by: Nick Cameron --- tikv-client-store/src/client.rs | 103 ++++++++++++++++++++++++++++++ tikv-client-store/src/lib.rs | 109 ++------------------------------ 2 files changed, 109 insertions(+), 103 deletions(-) create mode 100644 tikv-client-store/src/client.rs diff --git a/tikv-client-store/src/client.rs b/tikv-client-store/src/client.rs new file mode 100644 index 0000000..561b66c --- /dev/null +++ b/tikv-client-store/src/client.rs @@ -0,0 +1,103 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +use crate::{ErrorKind, HasError, Region, Result, SecurityManager}; +use async_trait::async_trait; +use derive_new::new; +use grpcio::{CallOption, Environment}; +use kvproto::tikvpb::TikvClient; +use std::{sync::Arc, time::Duration}; +use tikv_client_common::stats::tikv_stats; + +/// A trait for connecting to TiKV stores. +pub trait KvConnect: Sized + Send + Sync + 'static { + type KvClient: KvClient + Clone + Send + Sync + 'static; + + fn connect(&self, address: &str) -> Result; + + fn connect_to_store(&self, region: Region, address: String) -> Result> { + info!("connect to tikv endpoint: {:?}", &address); + let client = self.connect(address.as_str())?; + Ok(Store::new(region, client)) + } +} + +pub type RpcFnType = + for<'a, 'b> fn( + &'a TikvClient, + &'b Req, + CallOption, + ) + -> std::result::Result<::grpcio::ClientUnaryReceiver, ::grpcio::Error>; + +#[derive(new, Clone)] +pub struct TikvConnect { + env: Arc, + security_mgr: Arc, + timeout: Duration, +} + +impl KvConnect for TikvConnect { + type KvClient = impl KvClient + Clone + Send + Sync + 'static; + + fn connect(&self, address: &str) -> Result { + self.security_mgr + .connect(self.env.clone(), address, TikvClient::new) + .map(|c| KvRpcClient::new(Arc::new(c), self.timeout)) + } +} + +#[async_trait] +pub trait KvClient { + async fn dispatch(&self, fun: RpcFnType, request: Req) -> Result + where + Req: Send + Sync + 'static, + Resp: HasError + Sized + Clone + Send + 'static; +} + +/// This client handles requests for a single TiKV node. It converts the data +/// types and abstractions of the client program into the grpc data types. +#[derive(new, Clone)] +struct KvRpcClient { + rpc_client: Arc, + timeout: Duration, +} + +#[async_trait] +impl KvClient for KvRpcClient { + async fn dispatch(&self, fun: RpcFnType, request: Req) -> Result + where + Req: Send + Sync + 'static, + Resp: HasError + Sized + Clone + Send + 'static, + { + fun( + &self.rpc_client, + &request, + CallOption::default().timeout(self.timeout), + )? + .await + .map_err(|e| ErrorKind::Grpc(e).into()) + } +} + +#[derive(new)] +pub struct Store { + pub region: Region, + pub client: Client, +} + +impl Store { + pub async fn dispatch( + &self, + request_name: &'static str, + fun: RpcFnType, + request: Req, + ) -> Result + where + Req: Send + Sync + 'static, + Resp: HasError + Sized + Clone + Send + 'static, + { + let result = self.client.dispatch(fun, request).await; + + tikv_stats(request_name).done(result) + } +} diff --git a/tikv-client-store/src/lib.rs b/tikv-client-store/src/lib.rs index cff4445..28dbc76 100644 --- a/tikv-client-store/src/lib.rs +++ b/tikv-client-store/src/lib.rs @@ -4,111 +4,14 @@ #[macro_use] extern crate log; +mod client; mod errors; pub mod region; -pub use crate::errors::{HasError, HasRegionError}; #[doc(inline)] -pub use crate::region::{Region, RegionId, RegionVerId, StoreId}; -pub use kvproto::tikvpb::TikvClient; +pub use crate::{ + client::{KvClient, KvConnect, RpcFnType, Store, TikvConnect}, + errors::{HasError, HasRegionError}, + region::{Region, RegionId, RegionVerId, StoreId}, +}; pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Key, Result}; - -use async_trait::async_trait; -use derive_new::new; -use grpcio::{CallOption, Environment}; -use std::{sync::Arc, time::Duration}; -use tikv_client_common::stats::tikv_stats; - -/// A trait for connecting to TiKV stores. -pub trait KvConnect: Sized + Send + Sync + 'static { - type KvClient: KvClient + Clone + Send + Sync + 'static; - - fn connect(&self, address: &str) -> Result; - - fn connect_to_store(&self, region: Region, address: String) -> Result> { - info!("connect to tikv endpoint: {:?}", &address); - let client = self.connect(address.as_str())?; - Ok(Store::new(region, client)) - } -} - -pub type RpcFnType = - for<'a, 'b> fn( - &'a TikvClient, - &'b Req, - CallOption, - ) - -> std::result::Result<::grpcio::ClientUnaryReceiver, ::grpcio::Error>; - -#[derive(new, Clone)] -pub struct TikvConnect { - env: Arc, - security_mgr: Arc, - timeout: Duration, -} - -impl KvConnect for TikvConnect { - type KvClient = impl KvClient + Clone + Send + Sync + 'static; - - fn connect(&self, address: &str) -> Result { - self.security_mgr - .connect(self.env.clone(), address, TikvClient::new) - .map(|c| KvRpcClient::new(Arc::new(c), self.timeout)) - } -} - -#[async_trait] -pub trait KvClient { - async fn dispatch(&self, fun: RpcFnType, request: Req) -> Result - where - Req: Send + Sync + 'static, - Resp: HasError + Sized + Clone + Send + 'static; -} - -/// This client handles requests for a single TiKV node. It converts the data -/// types and abstractions of the client program into the grpc data types. -#[derive(new, Clone)] -struct KvRpcClient { - rpc_client: Arc, - timeout: Duration, -} - -#[async_trait] -impl KvClient for KvRpcClient { - async fn dispatch(&self, fun: RpcFnType, request: Req) -> Result - where - Req: Send + Sync + 'static, - Resp: HasError + Sized + Clone + Send + 'static, - { - fun( - &self.rpc_client, - &request, - CallOption::default().timeout(self.timeout), - )? - .await - .map_err(|e| ErrorKind::Grpc(e).into()) - } -} - -#[derive(new)] -pub struct Store { - pub region: Region, - pub client: Client, -} - -impl Store { - pub async fn dispatch( - &self, - request_name: &'static str, - fun: RpcFnType, - request: Req, - ) -> Result - where - Req: Send + Sync + 'static, - Resp: HasError + Sized + Clone + Send + 'static, - { - let result = self.client.dispatch(fun, request).await; - - tikv_stats(request_name).done(result) - } -}