Merge pull request #193 from ekexium/move-store

Refactor: move some modules to main crate
This commit is contained in:
Nick Cameron 2020-11-03 16:20:25 +13:00 committed by GitHub
commit c7eeff9e4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 68 additions and 77 deletions

6
Cargo.lock generated
View File

@ -884,7 +884,6 @@ dependencies = [
"futures 0.3.5",
"grpcio",
"log",
"tikv-client-common",
"tikv-client-proto",
]
@ -1854,7 +1853,6 @@ name = "tikv-client-common"
version = "0.0.0"
dependencies = [
"clap",
"derive-new",
"fail",
"failure",
"futures 0.3.5",
@ -1864,8 +1862,6 @@ dependencies = [
"proptest",
"proptest-derive",
"regex",
"serde",
"serde_derive",
"tempdir",
"tikv-client-proto",
"tokio",
@ -1877,7 +1873,6 @@ version = "0.0.0"
dependencies = [
"async-trait",
"clap",
"derive-new",
"fail",
"futures 0.3.5",
"grpcio",
@ -1887,7 +1882,6 @@ dependencies = [
"tempdir",
"tikv-client-common",
"tikv-client-proto",
"tokio",
]
[[package]]

View File

@ -8,5 +8,4 @@ derive-new = "0.5.8"
futures = "0.3"
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
log = "0.4"
tikv-client-common = { path = "../tikv-client-common"}
tikv-client-proto = { path = "../tikv-client-proto"}

View File

@ -27,7 +27,7 @@ use tikv_client_proto::kvrpcpb;
/// ```rust
/// # use std::ops::{Range, RangeInclusive, RangeTo, RangeToInclusive, RangeFrom, RangeFull, Bound};
/// # use std::convert::TryInto;
/// # use tikv_client_common::{Key, BoundRange};
/// # use tikv_client::{Key, BoundRange};
///
/// let explict_range: Range<Key> = Range { start: Key::from("Rust".to_owned()), end: Key::from("TiKV".to_owned()) };
/// let from_explict_range: BoundRange = explict_range.into();
@ -78,7 +78,7 @@ impl BoundRange {
/// The **end** of a scan is exclusive, unless appended with an '\0', then it is inclusive.
///
/// ```rust
/// use tikv_client_common::{BoundRange, Key, ToOwnedRange};
/// use tikv_client::{BoundRange, Key, ToOwnedRange};
/// // Exclusive
/// let range = "a".."z";
/// assert_eq!(

View File

@ -1,5 +1,7 @@
use crate::{errors::Result, Error};
use std::{io::Write, ptr};
use tikv_client_common::Error;
use crate::Result;
const ENC_GROUP_SIZE: usize = 8;
const ENC_MARKER: u8 = 0xff;

View File

@ -21,7 +21,7 @@ const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
/// This type wraps around an owned value, so it should be treated it like `String` or `Vec<u8>`.
///
/// ```rust
/// use tikv_client_common::Key;
/// use tikv_client::Key;
///
/// let static_str: &'static str = "TiKV";
/// let from_static_str = Key::from(static_str.to_owned());
@ -44,7 +44,7 @@ const _PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
/// these cases using the fully-qualified-syntax is useful:
///
/// ```rust
/// use tikv_client_common::Key;
/// use tikv_client::Key;
///
/// let buf = "TiKV".as_bytes().to_owned();
/// let key = Key::from(buf.clone());

View File

@ -9,7 +9,7 @@ use tikv_client_proto::kvrpcpb;
/// A key/value pair.
///
/// ```rust
/// # use tikv_client_common::{Key, Value, KvPair};
/// # use tikv_client::{Key, Value, KvPair};
/// let key = "key".to_owned();
/// let value = "value".to_owned();
/// let constructed = KvPair::new(key.clone(), value.clone());

View File

@ -13,7 +13,7 @@ const _PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
/// This type wraps around an owned value, so it should be treated it like `String` or `Vec<u8>`.
///
/// ```rust
/// use tikv_client_common::Value;
/// use tikv_client::Value;
///
/// let static_str: &'static str = "TiKV";
/// let from_static_str = Value::from(static_str.to_owned());
@ -36,7 +36,7 @@ const _PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
/// these cases using the fully-qualified-syntax is useful:
///
/// ```rust
/// use tikv_client_common::Value;
/// use tikv_client::Value;
///
/// let buf = "TiKV".as_bytes().to_owned();
/// let value = Value::from(buf.clone());

View File

@ -79,10 +79,14 @@ mod transaction;
mod backoff;
mod compat;
mod config;
mod kv;
mod pd;
mod raw;
mod region;
mod request;
mod stats;
mod store;
mod timestamp;
#[cfg(test)]
mod mock;
@ -92,15 +96,16 @@ mod proptests;
#[macro_use]
extern crate log;
#[doc(inline)]
pub use crate::kv::{BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::raw::{Client as RawClient, ColumnFamily};
#[doc(inline)]
pub use crate::timestamp::Timestamp;
#[doc(inline)]
pub use crate::transaction::{Client as TransactionClient, Snapshot, Transaction};
pub use config::Config;
#[doc(inline)]
pub use tikv_client_common::{
security::SecurityManager, BoundRange, Error, ErrorKind, Key, KvPair, Result, Timestamp,
ToOwnedRange, Value,
};
pub use region::{Region, RegionId, RegionVerId, StoreId};
#[doc(inline)]
pub use tikv_client_store::region::{Region, RegionId, RegionVerId, StoreId};
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Result};

View File

@ -7,13 +7,14 @@
use crate::{
pd::{PdClient, PdRpcClient, RetryClient},
Config, Error, Key, Result, Timestamp,
store::Store,
Config, Error, Key, Region, RegionId, Result, Timestamp,
};
use async_trait::async_trait;
use derive_new::new;
use std::{any::Any, sync::Arc};
use tikv_client_proto::metapb;
use tikv_client_store::{KvClient, KvConnect, Region, RegionId, Request, Store};
use tikv_client_store::{KvClient, KvConnect, Request};
/// Create a `PdRpcClient` with it's internals replaced with mocks so that the
/// client can be tested without doing any RPC calls.

View File

@ -1,8 +1,8 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{
compat::stream_fn, pd::RetryClient, BoundRange, Config, Key, Region, RegionId, Result,
SecurityManager,
compat::stream_fn, kv::codec, pd::RetryClient, store::Store, BoundRange, Config, Key, Region,
RegionId, Result, SecurityManager, Timestamp,
};
use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
@ -12,9 +12,8 @@ use std::{
sync::{Arc, RwLock},
thread,
};
use tikv_client_common::{codec, Timestamp};
use tikv_client_pd::Cluster;
use tikv_client_store::{KvClient, KvConnect, Store, TikvConnect};
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "tikv-client";
@ -315,7 +314,6 @@ pub mod test {
use crate::mock::*;
use futures::{executor, executor::block_on};
use tikv_client_common::BoundRange;
#[test]
fn test_kv_client_caching() {

View File

@ -7,6 +7,7 @@ use crate::{
store_stream_for_key, store_stream_for_keys, store_stream_for_range,
store_stream_for_ranges, KvRequest,
},
store::Store,
transaction::HasLocks,
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
};
@ -14,7 +15,6 @@ use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use std::{mem, sync::Arc};
use tikv_client_proto::kvrpcpb;
use tikv_client_store::Store;
#[async_trait]
impl KvRequest for kvrpcpb::RawGetRequest {

View File

@ -4,6 +4,7 @@ use crate::{
backoff::{Backoff, NoBackoff, NoJitterBackoff},
pd::PdClient,
stats::tikv_stats,
store::Store,
transaction::{resolve_locks, HasLocks},
BoundRange, Error, ErrorKind, Key, Result,
};
@ -13,7 +14,7 @@ use std::{
cmp::{max, min},
sync::Arc,
};
use tikv_client_store::{HasError, HasRegionError, Request, Store};
use tikv_client_store::{HasError, HasRegionError, Request};
const DEFAULT_REGION_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10);
pub const OPTIMISTIC_BACKOFF: NoJitterBackoff = NoJitterBackoff::new(2, 500, 10);

31
src/store.rs Normal file
View File

@ -0,0 +1,31 @@
use crate::{Region, Result};
use derive_new::new;
use std::any::Any;
use tikv_client_store::{KvClient, KvConnect, Request, TikvConnect};
#[derive(new)]
pub struct Store {
pub region: Region,
pub client: Box<dyn KvClient + Send + Sync>,
}
impl Store {
pub async fn dispatch<Req: Request, Resp: Any>(&self, request: &Req) -> Result<Box<Resp>> {
Ok(self
.client
.dispatch(request)
.await?
.downcast()
.expect("Downcast failed"))
}
}
pub trait KvConnectStore: KvConnect {
fn connect_to_store(&self, region: Region, address: String) -> Result<Store> {
info!("connect to tikv endpoint: {:?}", &address);
let client = self.connect(address.as_str())?;
Ok(Store::new(region, Box::new(client)))
}
}
impl KvConnectStore for TikvConnect {}

View File

@ -5,12 +5,12 @@ use crate::{
config::Config,
pd::{PdClient, PdRpcClient},
request::{KvRequest, OPTIMISTIC_BACKOFF},
timestamp::TimestampExt,
transaction::{Snapshot, Transaction},
Result,
};
use futures::executor::ThreadPool;
use std::{mem, sync::Arc};
use tikv_client_common::TimestampExt;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
const SCAN_LOCK_BATCH_SIZE: u32 = 1024; // TODO: cargo-culted value

View File

@ -1,6 +1,7 @@
use crate::{
pd::PdClient,
request::{KvRequest, OPTIMISTIC_BACKOFF},
timestamp::TimestampExt,
transaction::requests,
ErrorKind, Key, RegionVerId, Result,
};
@ -8,7 +9,6 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tikv_client_common::TimestampExt;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;

View File

@ -4,15 +4,15 @@ use crate::{
backoff::Backoff,
pd::PdClient,
request::{store_stream_for_key, store_stream_for_keys, store_stream_for_range, KvRequest},
store::Store,
timestamp::TimestampExt,
transaction::HasLocks,
BoundRange, Error, Key, KvPair, Result, Value,
};
use async_trait::async_trait;
use futures::{prelude::*, stream::BoxStream};
use std::{iter, mem, sync::Arc};
use tikv_client_common::TimestampExt;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
use tikv_client_store::Store;
#[async_trait]
impl KvRequest for kvrpcpb::GetRequest {

View File

@ -3,13 +3,13 @@
use crate::{
pd::{PdClient, PdRpcClient},
request::{KvRequest, OPTIMISTIC_BACKOFF, PESSIMISTIC_BACKOFF},
timestamp::TimestampExt,
transaction::{buffer::Buffer, requests::*},
BoundRange, Error, ErrorKind, Key, KvPair, Result, Value,
};
use derive_new::new;
use futures::{executor::ThreadPool, prelude::*, stream::BoxStream};
use std::{iter, mem, ops::RangeBounds, sync::Arc};
use tikv_client_common::TimestampExt;
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
/// A undo-able set of actions on the dataset.

View File

@ -5,15 +5,12 @@ edition = "2018"
[dependencies]
derive-new = "0.5"
failure = "0.1"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
lazy_static = "1"
log = "0.4"
regex = "1"
serde = "1.0"
serde_derive = "1.0"
tikv-client-proto = { path = "../tikv-client-proto" }
[dev-dependencies]

View File

@ -1,15 +1,9 @@
#[macro_use]
mod errors;
mod kv;
pub mod security;
mod timestamp;
#[macro_use]
extern crate log;
#[doc(inline)]
pub use crate::errors::{Error, ErrorKind, Result};
#[doc(inline)]
pub use crate::kv::{codec, BoundRange, Key, KvPair, ToOwnedRange, Value};
#[doc(inline)]
pub use crate::timestamp::{Timestamp, TimestampExt};

View File

@ -5,13 +5,11 @@ edition = "2018"
[dependencies]
async-trait = "0.1"
derive-new = "0.5"
futures = { version = "0.3.5", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.6", features = [ "secure", "prost-codec" ], default-features = false }
log = "0.4"
tikv-client-common = { path = "../tikv-client-common" }
tikv-client-proto = { path = "../tikv-client-proto" }
tokio = { version = "0.2", features = ["sync"] }
[dev-dependencies]
clap = "2.32"
@ -19,4 +17,3 @@ fail = { version = "0.3", features = [ "failpoints" ] }
proptest = "0.9"
proptest-derive = "0.1.0"
tempdir = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "macros"] }

View File

@ -1,6 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use crate::{request::Request, Region, Result, SecurityManager};
use crate::{request::Request, Result, SecurityManager};
use async_trait::async_trait;
use derive_new::new;
use grpcio::{CallOption, Environment};
@ -12,12 +12,6 @@ pub trait KvConnect: Sized + Send + Sync + 'static {
type KvClient: KvClient + Clone + Send + Sync + 'static;
fn connect(&self, address: &str) -> Result<Self::KvClient>;
fn connect_to_store(&self, region: Region, address: String) -> Result<Store> {
info!("connect to tikv endpoint: {:?}", &address);
let client = self.connect(address.as_str())?;
Ok(Store::new(region, Box::new(client)))
}
}
#[derive(new, Clone)]
@ -61,20 +55,3 @@ impl KvClient for KvRpcClient {
.await
}
}
#[derive(new)]
pub struct Store {
pub region: Region,
pub client: Box<dyn KvClient + Send + Sync>,
}
impl Store {
pub async fn dispatch<Req: Request, Resp: Any>(&self, request: &Req) -> Result<Box<Resp>> {
Ok(self
.client
.dispatch(request)
.await?
.downcast()
.expect("Downcast failed"))
}
}

View File

@ -1,18 +1,13 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#[macro_use]
extern crate log;
mod client;
mod errors;
pub mod region;
mod request;
#[doc(inline)]
pub use crate::{
client::{KvClient, KvConnect, Store, TikvConnect},
client::{KvClient, KvConnect, TikvConnect},
errors::{HasError, HasRegionError},
region::{Region, RegionId, RegionVerId, StoreId},
request::Request,
};
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Key, Result};
pub use tikv_client_common::{security::SecurityManager, Error, ErrorKind, Result};