update nightly (#124)

* update nightly

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Resolve clippy

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Fix integration tests

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* Reflect comments

Signed-off-by: Ana Hobden <operator@hoverbear.org>

* fmt

Signed-off-by: Ana Hobden <operator@hoverbear.org>
This commit is contained in:
Ana Hobden 2019-11-21 18:04:15 -08:00 committed by GitHub
parent e97407a817
commit 8d2c784c88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 94 additions and 91 deletions

View File

@ -20,7 +20,7 @@ name = "tikv_client"
[dependencies]
derive-new = "0.5"
failure = "0.1"
futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "async-await", "nightly"] }
futures = { version = "0.3.1", features = ["compat", "async-await", "thread-pool"] }
grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "2fc6229ed6097b59dbe51525c7a65b19543a94ca", features = [ "prost-codec" ], default-features = false }
lazy_static = "1"
@ -28,7 +28,7 @@ log = "0.3.9"
regex = "1"
serde = "1.0"
serde_derive = "1.0"
futures-timer = "0.3"
futures-timer = "2.0"
[dependencies.prometheus]
version = "0.4.2"
@ -38,7 +38,7 @@ features = ["push", "process"]
[dev-dependencies]
clap = "2.32"
tempdir = "0.3"
runtime = { version = "0.3.0-alpha.7", default-features = false }
runtime = { version = "0.3.0-alpha.8", default-features = false }
runtime-tokio = "0.3.0-alpha.6"
proptest = "0.9"
proptest-derive = "0.1.0"

View File

@ -1 +1 @@
nightly-2019-08-25
nightly-2019-11-16

View File

@ -44,7 +44,7 @@ pub trait PdClient: Send + Sync + 'static {
key: &Key,
) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
self.region_for_key(key)
.and_then(move |region| self.clone().map_region_to_store(region))
.and_then(move |region| self.map_region_to_store(region))
.boxed()
}
@ -53,7 +53,7 @@ pub trait PdClient: Send + Sync + 'static {
id: RegionId,
) -> BoxFuture<'static, Result<Store<Self::KvClient>>> {
self.region_for_id(id)
.and_then(move |region| self.clone().map_region_to_store(region).boxed())
.and_then(move |region| self.map_region_to_store(region).boxed())
.boxed()
}

View File

@ -155,7 +155,7 @@ where
if reconnect_count == 0 {
return Err(e);
}
Delay::new(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await?;
Delay::new(Duration::from_secs(RECONNECT_INTERVAL_SEC)).await;
}
}

View File

@ -93,7 +93,8 @@ async fn run_tso(
request_rx,
pending_requests: pending_requests.clone(),
self_waker: sending_future_waker.clone(),
};
}
.map(Ok);
let send_requests = rpc_sender.send_all(&mut request_stream);

View File

@ -83,7 +83,7 @@ pub trait KvRequest: Sync + Send + 'static + Sized {
) -> BoxStream<'static, Result<Self::RpcResponse>> {
// Retry on region errors by default
// TODO: Add backoff and retry limit
self.response_stream(pd_client.clone())
self.response_stream(pd_client)
}
fn store_stream<PdC: PdClient>(
@ -163,7 +163,15 @@ pub trait DispatchHook: KvRequest {
}
}
impl<T: KvRequest> DispatchHook for T {}
impl<T: KvRequest> DispatchHook for T {
#[cfg(test)]
default fn dispatch_hook(
&self,
_opt: CallOption,
) -> Option<BoxFuture<'static, Result<Self::RpcResponse>>> {
None
}
}
pub trait KvRpcRequest: Default {
fn set_context(&mut self, context: kvrpcpb::Context);

View File

@ -1,24 +1,19 @@
#![cfg(feature = "integration-tests")]
use failure::Fallible;
use futures::executor::{block_on, ThreadPool};
use futures::prelude::*;
use std::collections::HashMap;
use std::env;
use tikv_client::{Config, Key, Result, TransactionClient, Value};
#[test]
fn get_timestamp() -> Fallible<()> {
#[runtime::test(runtime_tokio::Tokio)]
async fn get_timestamp() -> Fallible<()> {
const COUNT: usize = 1 << 16;
let mut pool = ThreadPool::new()?;
let config = Config::new(pd_addrs());
let fut = async {
let client = TransactionClient::new(config).await?;
Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await)
};
// Calculate each version of retrieved timestamp
let mut versions = pool
.run(fut)?
let client = TransactionClient::new(config).await?;
let mut versions = future::join_all((0..COUNT).map(|_| client.current_timestamp()))
.await
.into_iter()
.map(|res| res.map(|ts| (ts.physical << 18) + ts.logical))
.collect::<Result<Vec<_>>>()?;
@ -30,81 +25,80 @@ fn get_timestamp() -> Fallible<()> {
Ok(())
}
#[test]
fn crud() -> Fallible<()> {
#[runtime::test(runtime_tokio::Tokio)]
async fn crud() -> Fallible<()> {
let config = Config::new(pd_addrs());
block_on(async move {
let client = TransactionClient::new(config).await?;
let mut txn = client.begin().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
assert_eq!(
txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter(|(_, v)| v.is_some())
.count(),
0
);
txn.set("foo".to_owned(), "bar".to_owned()).await?;
txn.set("bar".to_owned(), "foo".to_owned()).await?;
// Read buffered values
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
let client = TransactionClient::new(config).await?;
let mut txn = client.begin().await?;
// Get non-existent keys
assert!(txn.get("foo".to_owned()).await?.is_none());
assert_eq!(
txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.commit().await?;
.filter(|(_, v)| v.is_some())
.count(),
0
);
// Read from TiKV then update and delete
let mut txn = client.begin().await?;
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.set("foo".to_owned(), "foo".to_owned()).await?;
txn.delete("bar".to_owned()).await?;
txn.commit().await?;
txn.set("foo".to_owned(), "bar".to_owned()).await?;
txn.set("bar".to_owned(), "foo".to_owned()).await?;
// Read buffered values
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.commit().await?;
// Read again from TiKV
let txn = client.begin().await?;
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
assert_eq!(batch_get_res.get(&Key::from("bar".to_owned())), None);
Fallible::Ok(())
})
// Read from TiKV then update and delete
let mut txn = client.begin().await?;
assert_eq!(
txn.get("foo".to_owned()).await?,
Some("bar".to_owned().into())
);
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("bar".to_owned())).as_ref()
);
assert_eq!(
batch_get_res.get(&Key::from("bar".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
txn.set("foo".to_owned(), "foo".to_owned()).await?;
txn.delete("bar".to_owned()).await?;
txn.commit().await?;
// Read again from TiKV
let txn = client.begin().await?;
let batch_get_res: HashMap<Key, Value> = txn
.batch_get(vec!["foo".to_owned(), "bar".to_owned()])
.await?
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect();
assert_eq!(
batch_get_res.get(&Key::from("foo".to_owned())),
Some(Value::from("foo".to_owned())).as_ref()
);
assert_eq!(batch_get_res.get(&Key::from("bar".to_owned())), None);
Fallible::Ok(())
}
const ENV_PD_ADDRS: &str = "PD_ADDRS";