diff --git a/.travis.yml b/.travis.yml
index 82eaad6..ed763ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -45,4 +45,4 @@ script:
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker ps; fi
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs pd; fi
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs kv; fi
-  - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDR="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
+  - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
diff --git a/Cargo.toml b/Cargo.toml
index 9f9c2e5..a2de081 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,7 @@ name = "tikv_client"
 [dependencies]
 derive-new = "0.5"
 failure = "0.1"
-futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
+futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] }
 grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
 kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "ec9df5f208a768074c28d04bfb8b90e5321d2f69", features = [ "prost-codec" ], default-features = false }
 lazy_static = "1"
@@ -28,7 +28,6 @@ log = "0.3.9"
 regex = "1"
 serde = "1.0"
 serde_derive = "1.0"
-tokio-core = "0.1"
 tokio-timer = "0.2"
 
 [dependencies.prometheus]
diff --git a/src/compat.rs b/src/compat.rs
index b36f765..66e2fe1 100644
--- a/src/compat.rs
+++ b/src/compat.rs
@@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt {
 }
 
 impl<T> ClientFutureExt for T {}
-
-/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't
-/// work with Tokio `Handle`s due to ownership differences.
-pub(crate) trait SinkCompat<I, E> {
-    fn send_all_compat<S>(self, stream: S) -> SendAllCompat<Self, S>
-    where
-        S: Stream<Item = I> + Unpin,
-        Self: Sink<I, Error = E> + Sized + Unpin,
-    {
-        SendAllCompat::new(self, stream)
-    }
-}
-
-impl<I, E, S: Sink<I, Error = E>> SinkCompat<I, E> for S {}
-
-#[derive(Debug)]
-#[must_use = "futures do nothing unless polled"]
-pub(crate) struct SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    sink: Option<Si>,
-    stream: Option<stream::Fuse<St>>,
-    buffered: Option<St::Item>,
-}
-
-impl<Si, St> Unpin for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-}
-
-impl<Si, St> SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    pub(crate) fn new(sink: Si, stream: St) -> SendAllCompat<Si, St> {
-        SendAllCompat {
-            sink: Some(sink),
-            stream: Some(stream.fuse()),
-            buffered: None,
-        }
-    }
-
-    fn sink_mut(&mut self) -> Pin<&mut Si> {
-        Pin::new(
-            self.sink
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
-        Pin::new(
-            self.stream
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn take_result(&mut self) -> (Si, St) {
-        let sink = self
-            .sink
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        let fuse = self
-            .stream
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        (sink, fuse.into_inner())
-    }
-
-    fn try_start_send(
-        &mut self,
-        item: St::Item,
-        cx: &mut Context,
-    ) -> Poll<Result<(), Si::Error>> {
-        debug_assert!(self.buffered.is_none());
-        match self.sink_mut().poll_ready(cx) {
-            Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)),
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => {
-                self.buffered = Some(item);
-                Poll::Pending
-            }
-        }
-    }
-}
-
-impl<Si, St> Future for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    type Output = Result<((Si, St)), Si::Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<((Si, St)), Si::Error>> {
-        if let Some(item) = self.buffered.take() {
-            ready!(self.try_start_send(item, cx))?
-        }
-
-        loop {
-            match self.stream_mut().poll_next(cx) {
-                Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?,
-                Poll::Ready(None) => {
-                    ready!(self.sink_mut().poll_close(cx))?;
-                    return Poll::Ready(Ok(self.take_result()));
-                }
-                Poll::Pending => {
-                    ready!(self.sink_mut().poll_flush(cx))?;
-                    return Poll::Pending;
-                }
-            }
-        }
-    }
-}
diff --git a/src/errors.rs b/src/errors.rs
index 65d12e1..0adad84 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -198,8 +198,10 @@ impl Error {
         })
     }
 
-    pub(crate) fn internal_error(message: String) -> Self {
-        Error::from(ErrorKind::InternalError { message })
+    pub(crate) fn internal_error(message: impl Into<String>) -> Self {
+        Error::from(ErrorKind::InternalError {
+            message: message.into(),
+        })
     }
 }
 
diff --git a/src/proptests/mod.rs b/src/proptests/mod.rs
index 7e0144d..6760ddc 100644
--- a/src/proptests/mod.rs
+++ b/src/proptests/mod.rs
@@ -9,7 +9,7 @@ use std::env::var;
 
 mod raw;
 
-pub(crate) const ENV_PD_ADDR: &str = "PD_ADDR";
+pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
 pub(crate) const PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
 pub(crate) const PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
 pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
@@ -22,9 +22,9 @@ pub fn arb_batch(
     proptest::collection::vec(single_strategy, 0..max_batch_size)
 }
 
-pub fn pd_addr() -> Vec<String> {
-    var(ENV_PD_ADDR)
-        .expect(&format!("Expected {}:", ENV_PD_ADDR))
+pub fn pd_addrs() -> Vec<String> {
+    var(ENV_PD_ADDRS)
+        .expect(&format!("Expected {}:", ENV_PD_ADDRS))
         .split(",")
         .map(From::from)
         .collect()
diff --git a/src/proptests/raw.rs b/src/proptests/raw.rs
index 972016e..a76c83b 100644
--- a/src/proptests/raw.rs
+++ b/src/proptests/raw.rs
@@ -1,6 +1,6 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
-use super::{arb_batch, pd_addr};
+use super::{arb_batch, pd_addrs};
 use crate::{raw::Client, Config, KvPair, Value};
 use futures::executor::block_on;
 use proptest::{arbitrary::any, proptest};
@@ -12,7 +12,7 @@ proptest! {
     fn point(
         pair in any::<KvPair>(),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
 
         block_on(
             client.put(pair.key().clone(), pair.value().clone())
@@ -36,7 +36,7 @@ proptest! {
     fn batch(
         kvs in arb_batch(any::<KvPair>(), None),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
 
         let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::<Vec<_>>();
         block_on(
diff --git a/src/rpc/client.rs b/src/rpc/client.rs
index 02984ae..6b2df52 100644
--- a/src/rpc/client.rs
+++ b/src/rpc/client.rs
@@ -226,7 +226,7 @@ impl RpcClient {
     }
 
     pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
-        Arc::clone(&self.pd).get_timestamp()
+        self.pd.clone().get_timestamp()
     }
 
     // Returns a Steam which iterates over the contexts for each region covered by range.
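Note: the `errors.rs` change above makes `internal_error` accept anything convertible to a `String`, which lets the new timestamp code below pass `&str` literals without an explicit `.to_owned()`. A minimal, self-contained sketch of that constructor pattern (the `Error` type here is a stand-in for illustration, not the crate's actual error type):

```rust
// Illustrative only: a stand-in `Error` showing the `impl Into<String>` pattern.
#[derive(Debug)]
struct Error {
    message: String,
}

impl Error {
    fn internal_error(message: impl Into<String>) -> Self {
        // `Into<String>` is satisfied by both `&str` and `String`.
        Error {
            message: message.into(),
        }
    }
}

fn main() {
    let from_str = Error::internal_error("TSO stream terminated");
    let from_string = Error::internal_error(format!("unexpected count: {}", 3));
    println!("{:?}\n{:?}", from_str, from_string);
}
```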
diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs
index 08ac6cc..fc6f083 100644
--- a/src/rpc/pd/client.rs
+++ b/src/rpc/pd/client.rs
@@ -16,7 +16,7 @@ use kvproto::{metapb, pdpb};
 use crate::{
     rpc::{
         pd::{
-            context::request_context, request::retry_request, timestamp::PdReactor, Region,
+            context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
             RegionId, StoreId, Timestamp,
         },
         security::SecurityManager,
@@ -108,6 +108,7 @@ impl PdClient for RetryClient {
     }
 
     fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+        // FIXME: retry or reconnect on error
         Box::pin(self.cluster.read().unwrap().get_timestamp())
     }
 }
@@ -142,7 +143,7 @@ pub struct Cluster {
     pub id: u64,
     pub(super) client: pdpb::PdClient,
     members: pdpb::GetMembersResponse,
-    reactor: Arc<RwLock<PdReactor>>,
+    tso: TimestampOracle,
 }
 
 // These methods make a single attempt to make a request.
@@ -256,7 +257,7 @@ impl Cluster {
     }
 
     fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
-        self.reactor.write().unwrap().get_timestamp()
+        self.tso.clone().get_timestamp()
     }
 }
 
@@ -281,14 +282,13 @@ impl Connection {
         let (client, members) = self.try_connect_leader(&members, timeout)?;
 
         let id = members.get_header().get_cluster_id();
+        let tso = TimestampOracle::new(id, &client)?;
         let cluster = Cluster {
             id,
             members,
             client,
-            reactor: Arc::new(RwLock::new(PdReactor::new())),
+            tso,
         };
-
-        PdReactor::start(cluster.reactor.clone(), &cluster);
         Ok(cluster)
     }
 
@@ -307,14 +307,14 @@ impl Connection {
         warn!("updating pd client, blocking the tokio core");
         let start = Instant::now();
         let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
+        let tso = TimestampOracle::new(old_cluster.id, &client)?;
 
         let cluster = Cluster {
             id: old_cluster.id,
             client,
             members,
-            reactor: old_cluster.reactor.clone(),
+            tso,
         };
-        PdReactor::start(cluster.reactor.clone(), &cluster);
 
         *self.last_update.write().unwrap() = Instant::now();
         warn!("updating PD client done, spent {:?}", start.elapsed());
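Note: the rewritten `src/rpc/pd/timestamp.rs` below replaces the `PdReactor` event loop with a `TimestampOracle` that forwards oneshot senders over a bounded mpsc channel to a background worker. A minimal, self-contained sketch of that request/response pattern using futures 0.3 channels (the `worker` and `client` names are illustrative, not part of the crate):

```rust
use futures::{
    channel::{mpsc, oneshot},
    executor::block_on,
    join,
    sink::SinkExt,
    stream::StreamExt,
};

fn main() {
    block_on(async {
        // Requests are oneshot senders travelling over a bounded mpsc channel,
        // analogous to `TimestampRequest` in the module below.
        let (mut request_tx, mut request_rx) = mpsc::channel::<oneshot::Sender<u64>>(64);

        // Worker: drain requests and answer each one on its oneshot sender.
        let worker = async move {
            let mut counter = 0u64;
            while let Some(reply_to) = request_rx.next().await {
                counter += 1;
                // The receiver may already be gone; ignoring the error is fine here.
                let _ = reply_to.send(counter);
            }
        };

        // Client: create a oneshot channel, send its transmitter as the request,
        // then await the response.
        let client = async move {
            let (reply_tx, reply_rx) = oneshot::channel();
            request_tx.send(reply_tx).await.expect("worker stopped");
            drop(request_tx); // close the channel so the worker loop ends
            let value = reply_rx.await.expect("worker dropped the request");
            assert_eq!(value, 1);
        };

        join!(worker, client);
    });
}
```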
diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs
index 07e26fd..ee44499 100644
--- a/src/rpc/pd/timestamp.rs
+++ b/src/rpc/pd/timestamp.rs
@@ -2,227 +2,193 @@
 //! This module is the low-level mechanisms for getting timestamps from a PD
 //! cluster. It should be used via the `get_timestamp` API in `PdClient`.
+//!
+//! Once a `TimestampOracle` is created, there will be two futures running in a background working
+//! thread created automatically. The `get_timestamp` method creates a oneshot channel whose
+//! transmitter is served as a `TimestampRequest`. `TimestampRequest`s are sent to the working
+//! thread through a bounded multi-producer, single-consumer channel. Every time the first future
+//! is polled, it tries to exhaust the channel to get as many requests as possible and sends a
+//! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD
+//! server and allocates timestamps for the requests.
 
-use std::{
-    sync::{Arc, RwLock},
-    thread::{self, JoinHandle},
-};
+use super::Timestamp;
+use crate::{Error, Result};
 
-use futures::channel::{
-    mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender},
-    oneshot,
+use futures::{
+    channel::{mpsc, oneshot},
+    compat::*,
+    executor::block_on,
+    join, pin_mut,
+    prelude::*,
+    task::{AtomicWaker, Context, Poll},
 };
-use futures::compat::{Compat01As03, Compat01As03Sink};
-use futures::future::TryFutureExt;
-use futures::prelude::*;
 use grpcio::WriteFlags;
-use kvproto::pdpb;
-use tokio_core::reactor::{Core, Handle as TokioHandle};
+use kvproto::pdpb::*;
+use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread};
 
-use crate::{
-    compat::SinkCompat,
-    rpc::pd::{
-        client::Cluster,
-        context::{observe_tso_batch, request_context},
-        Timestamp,
-    },
-    Result,
-};
+/// It is an empirical value.
+const MAX_BATCH_SIZE: usize = 64;
 
-type TsoChannel = oneshot::Sender<Timestamp>;
+/// TODO: This value should be adjustable.
+const MAX_PENDING_COUNT: usize = 1 << 16;
 
-enum Task {
-    Init,
-    Request,
-    Response(Vec<oneshot::Sender<Timestamp>>, pdpb::TsoResponse),
-}
+type TimestampRequest = oneshot::Sender<Timestamp>;
+
+/// The timestamp oracle (TSO) which provides monotonically increasing timestamps.
+#[derive(Clone)]
+pub struct TimestampOracle {
+    /// The transmitter of a bounded channel which transports requests of getting a single
+    /// timestamp to the TSO working thread. A bounded channel is used to prevent using
+    /// too much memory unexpectedly.
+    /// In the working thread, the `TimestampRequest`, which is actually a one channel sender,
+    /// is used to send back the timestamp result.
+    request_tx: mpsc::Sender<TimestampRequest>,
+}
 
-impl Task {
-    fn dispatch(
-        self,
-        reactor: Arc<RwLock<PdReactor>>,
-        cluster_id: u64,
-        pd_client: &pdpb::PdClient,
-        handle: &TokioHandle,
-    ) {
-        match self {
-            Task::Request => reactor.write().unwrap().tso_request(cluster_id),
-            Task::Response(requests, response) => {
-                reactor.write().unwrap().tso_response(requests, response)
-            }
-            Task::Init => PdReactor::init(reactor, pd_client, handle),
-        }
-    }
-}
+impl TimestampOracle {
+    pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
+        let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE);
+        // FIXME: use tso_opt
+        let (rpc_sender, rpc_receiver) = pd_client.tso()?;
+
+        // Start a background thread to handle TSO requests and responses
+        thread::spawn(move || {
+            block_on(run_tso(
+                cluster_id,
+                rpc_sender.sink_compat().sink_err_into(),
+                rpc_receiver.compat().err_into(),
+                request_rx,
+            ))
+        });
+
+        Ok(TimestampOracle { request_tx })
+    }
+
+    pub async fn get_timestamp(mut self) -> Result<Timestamp> {
+        let (request, response) = oneshot::channel();
+        self.request_tx
+            .send(request)
+            .await
+            .map_err(|_| Error::internal_error("TimestampRequest channel is closed"))?;
+        Ok(response.await?)
+    }
+}
+
+async fn run_tso(
+    cluster_id: u64,
+    mut rpc_sender: impl Sink<(TsoRequest, WriteFlags), Error = Error> + Unpin,
+    mut rpc_receiver: impl Stream<Item = Result<TsoResponse>> + Unpin,
+    request_rx: mpsc::Receiver<TimestampRequest>,
+) {
+    // The `TimestampRequest`s which are waiting for the responses from the PD server
+    let pending_requests = Rc::new(RefCell::new(VecDeque::with_capacity(MAX_PENDING_COUNT)));
+
+    // When there are too many pending requests, the `send_request` future will refuse to fetch
+    // more requests from the bounded channel. This waker is used to wake up the sending future
+    // if the queue containing pending requests is no longer full.
+    let sending_future_waker = Rc::new(AtomicWaker::new());
+
+    pin_mut!(request_rx);
+    let mut request_stream = TsoRequestStream {
+        cluster_id,
+        request_rx,
+        pending_requests: pending_requests.clone(),
+        self_waker: sending_future_waker.clone(),
+    };
+
+    let send_requests = rpc_sender.send_all(&mut request_stream);
+
+    let receive_and_handle_responses = async move {
+        while let Some(Ok(resp)) = rpc_receiver.next().await {
+            let mut pending_requests = pending_requests.borrow_mut();
+
+            // Wake up the sending future blocked by too many pending requests as we are consuming
+            // some of them here.
+            if pending_requests.len() == MAX_PENDING_COUNT {
+                sending_future_waker.wake();
+            }
+
+            allocate_timestamps(&resp, &mut pending_requests)?;
+        }
+        Err(Error::internal_error("TSO stream terminated"))
+    };
+
+    let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses);
+    error!("TSO send error: {:?}", send_res);
+    error!("TSO receive error: {:?}", recv_res);
+}
 
-/// A special-purpose event loop for asynchronously requesting timestamps. This is
-/// more complex than just sending a request since requests are batched on the client.
-pub(super) struct PdReactor {
-    // These fields are for communicating internally within the reactor to initialize
-    // it and send communicate between threads.
-    task_tx: Option<UnboundedSender<Task>>,
-    tso_tx: Sender<pdpb::TsoRequest>,
-    tso_rx: Option<Receiver<pdpb::TsoRequest>>,
-
-    // These fields are for communicating with the PD cluster to get batches of timestamps.
-    handle: Option<JoinHandle<()>>,
-    tso_pending: Option<Vec<TsoChannel>>,
-    tso_buffer: Option<Vec<TsoChannel>>,
-    tso_batch: Vec<TsoChannel>,
-}
+struct TsoRequestStream<'a> {
+    cluster_id: u64,
+    request_rx: Pin<&'a mut mpsc::Receiver<TimestampRequest>>,
+    pending_requests: Rc<RefCell<VecDeque<TimestampRequest>>>,
+    self_waker: Rc<AtomicWaker>,
+}
 
-impl Drop for PdReactor {
-    fn drop(&mut self) {
-        if let Some(handle) = self.handle.take() {
-            handle.join().unwrap();
-        }
-    }
-}
-
-impl PdReactor {
-    pub(super) fn new() -> Self {
-        let (tso_tx, tso_rx) = mpsc::channel(1);
-        PdReactor {
-            task_tx: None,
-            tso_tx,
-            tso_rx: Some(tso_rx),
-            handle: None,
-            tso_buffer: Some(Vec::with_capacity(8)),
-            tso_batch: Vec::with_capacity(8),
-            tso_pending: None,
-        }
-    }
-
-    /// Startup the reactor, including the communication thread if necessary.
-    pub(super) fn start(r: Arc<RwLock<PdReactor>>, cluster: &Cluster) {
-        let reactor = &mut r.write().unwrap();
-        if reactor.handle.is_none() {
-            info!("starting pd reactor thread");
-            let (task_tx, task_rx) = mpsc::unbounded();
-            task_tx.unbounded_send(Task::Init).unwrap();
-            let pd_client = cluster.client.clone();
-            let cluster_id = cluster.id;
-            reactor.task_tx = Some(task_tx);
-            let r2 = r.clone();
-            reactor.handle = Some(
-                thread::Builder::new()
-                    .name("dispatcher thread".to_owned())
-                    .spawn(move || PdReactor::poll(r2, cluster_id, pd_client, task_rx))
-                    .unwrap(),
-            )
-        } else {
-            warn!("tso sender and receiver are stale, refreshing...");
-            let (tso_tx, tso_rx) = mpsc::channel(1);
-            reactor.tso_tx = tso_tx;
-            reactor.tso_rx = Some(tso_rx);
-            reactor.schedule(Task::Init);
-        }
-    }
-
-    fn poll(
-        r: Arc<RwLock<PdReactor>>,
-        cluster_id: u64,
-        pd_client: pdpb::PdClient,
-        task_rx: UnboundedReceiver<Task>,
-    ) {
-        let mut core = Core::new().unwrap();
-        let handle = core.handle();
-
-        let f = task_rx
-            .map(|t| {
-                t.dispatch(r.clone(), cluster_id, &pd_client, &handle);
-            })
-            .collect::<Vec<()>>();
-        core.run(TryFutureExt::compat(f.unit_error())).unwrap();
-    }
-
-    // Initialise client for communicating with the PD cluster.
-    fn init(r: Arc<RwLock<PdReactor>>, pd_client: &pdpb::PdClient, handle: &TokioHandle) {
-        let (tx, rx) = pd_client.tso().unwrap();
-        let tx = Compat01As03Sink::new(tx);
-        let rx = Compat01As03::new(rx);
-        let tso_rx = r.write().unwrap().tso_rx.take().unwrap(); // Receiver<pdpb::TsoRequest>: Stream
-
-        handle.spawn(
-            tx.sink_map_err(Into::into)
-                .send_all_compat(tso_rx.map(|r| (r, WriteFlags::default())))
-                .map(|r: Result<_>| match r {
-                    Ok((_sender, _)) => {
-                        // FIXME(#54) the previous code doesn't work because we can't get mutable
-                        // access to the underlying StreamingCallSink to call `cancel`. But I think
-                        // that is OK because it will be canceled when it is dropped.
-                        //
-                        // _sender.get_mut().get_ref().cancel();
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("failed to send tso requests: {:?}", e);
-                        Err(())
-                    }
-                })
-                .compat(),
-        );
-
-        handle.spawn(
-            rx.try_for_each(move |resp| {
-                let reactor = &mut r.write().unwrap();
-                let tso_pending = reactor.tso_pending.take().unwrap();
-                reactor.schedule(Task::Response(tso_pending, resp));
-                if !reactor.tso_batch.is_empty() {
-                    // Schedule another tso_batch of request
-                    reactor.schedule(Task::Request);
-                }
-                future::ready(Ok(()))
-            })
-            .map_err(|e| panic!("unexpected error: {:?}", e))
-            .compat(),
-        );
-    }
-
-    fn tso_request(&mut self, cluster_id: u64) {
-        let mut tso_batch = self.tso_buffer.take().unwrap();
-        tso_batch.extend(self.tso_batch.drain(..));
-        let mut request = pd_request!(cluster_id, pdpb::TsoRequest);
-        let batch_size = tso_batch.len();
-        observe_tso_batch(batch_size);
-        request.count = batch_size as u32;
-        self.tso_pending = Some(tso_batch);
-        self.tso_tx
-            .try_send(request)
-            .expect("channel can never be full");
-    }
-
-    fn tso_response(&mut self, mut requests: Vec<TsoChannel>, response: pdpb::TsoResponse) {
-        let timestamp = response.get_timestamp();
-        for (offset, request) in requests.drain(..).enumerate() {
-            request
-                .send(Timestamp {
-                    physical: timestamp.physical,
-                    logical: timestamp.logical + offset as i64,
-                })
-                .unwrap();
-        }
-        self.tso_buffer = Some(requests);
-    }
-
-    fn schedule(&self, task: Task) {
-        self.task_tx
-            .as_ref()
-            .unwrap()
-            .unbounded_send(task)
-            .expect("unbounded send should never fail");
-    }
-
-    pub fn get_timestamp(&mut self) -> impl Future<Output = Result<Timestamp>> {
-        let context = request_context("get_ts");
-        let (tx, rx) = oneshot::channel::<Timestamp>();
-        self.tso_batch.push(tx);
-        if self.tso_pending.is_none() {
-            // Schedule tso request to run.
-            self.schedule(Task::Request);
-        }
-        rx.map_err(Into::into)
-            .into_future()
-            .map(move |r| context.done(r))
-    }
-}
+impl<'a> Stream for TsoRequestStream<'a> {
+    type Item = (TsoRequest, WriteFlags);
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let pending_requests = self.pending_requests.clone();
+        let mut pending_requests = pending_requests.borrow_mut();
+        let mut count = 0;
+        while count < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT {
+            match self.request_rx.as_mut().poll_next(cx) {
+                Poll::Ready(Some(sender)) => {
+                    pending_requests.push_back(sender);
+                    count += 1;
+                }
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => break,
+            }
+        }
+
+        if count > 0 {
+            let req = TsoRequest {
+                header: Some(RequestHeader {
+                    cluster_id: self.cluster_id,
+                }),
+                count: count as u32,
+            };
+            let write_flags = WriteFlags::default().buffer_hint(false);
+            Poll::Ready(Some((req, write_flags)))
+        } else {
+            // Set the waker to the context, then the stream can be waked up after the pending queue
+            // is no longer full.
+            self.self_waker.register(cx.waker());
+            Poll::Pending
+        }
+    }
+}
+
+fn allocate_timestamps(
+    resp: &TsoResponse,
+    pending_requests: &mut VecDeque<TimestampRequest>,
+) -> Result<()> {
+    // PD returns the timestamp with the biggest logical value. We can send back timestamps
+    // whose logical value is from `logical - count + 1` to `logical` using the senders
+    // in `pending`.
+    let tail_ts = resp
+        .timestamp
+        .as_ref()
+        .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;
+
+    let mut offset = i64::from(resp.count);
+    while offset > 0 {
+        offset -= 1;
+        if let Some(sender) = pending_requests.pop_front() {
+            let ts = Timestamp {
+                physical: tail_ts.physical,
+                logical: tail_ts.logical - offset,
+            };
+
+            // It doesn't matter if the receiver of the channel is dropped.
+            let _ = sender.send(ts);
+        } else {
+            return Err(Error::internal_error(
+                "PD gives more timestamps than expected",
+            ));
+        }
+    }
+    Ok(())
+}
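Note: `allocate_timestamps` above hands the batched requests logical values counting down from the one PD returns, so a response with `count = n` covers `logical - n + 1 ..= logical`. A tiny standalone check of that arithmetic with plain integers (nothing here depends on the crate):

```rust
// Illustrative only: mirrors the offset loop in `allocate_timestamps`.
fn allocated_logicals(tail_logical: i64, count: i64) -> Vec<i64> {
    let mut out = Vec::with_capacity(count as usize);
    let mut offset = count;
    while offset > 0 {
        offset -= 1;
        // The first pending request gets the smallest logical value.
        out.push(tail_logical - offset);
    }
    out
}

fn main() {
    // A response with logical = 41 and count = 3 covers logicals 39, 40 and 41.
    assert_eq!(allocated_logicals(41, 3), vec![39, 40, 41]);
}
```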
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 8577a47..ff59190 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -78,7 +78,7 @@ impl Client {
     /// # futures::executor::block_on(async {
     /// let connect = Client::connect(Config::default());
     /// let client = connect.await.unwrap();
-    /// let timestamp = Timestamp { physical: 1564474902, logical: 1 };
+    /// let timestamp = Timestamp { physical: 1564481750172, logical: 1 };
     /// let snapshot = client.snapshot_at(timestamp);
     /// // ... Issue some commands.
     /// # });
@@ -127,7 +127,10 @@ impl Future for Connect {
     type Output = Result<Client>;
 
     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
-        let _config = &self.config;
-        unimplemented!()
+        let config = &self.config;
+        // TODO: RpcClient::connect currently uses a blocking implementation.
+        //       Make it asynchronous later.
+        let rpc = Arc::new(RpcClient::connect(config)?);
+        Poll::Ready(Ok(Client { rpc }))
     }
 }
diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs
new file mode 100644
index 0000000..88a6b15
--- /dev/null
+++ b/tests/integration_tests.rs
@@ -0,0 +1,41 @@
+#![cfg(feature = "integration-tests")]
+#![feature(async_await)]
+
+use failure::Fallible;
+use futures::executor::ThreadPool;
+use futures::prelude::*;
+use std::env;
+use tikv_client::{transaction::*, Config, Result};
+
+#[test]
+fn get_timestamp() -> Fallible<()> {
+    const COUNT: usize = 1 << 16;
+    let mut pool = ThreadPool::new()?;
+    let config = Config::new(pd_addrs());
+    let fut = async {
+        let client = Client::connect(config).await?;
+        Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await)
+    };
+    // Calculate each version of retrieved timestamp
+    let mut versions = pool
+        .run(fut)?
+        .into_iter()
+        .map(|res| res.map(|ts| (ts.physical << 18) + ts.logical))
+        .collect::<Result<Vec<_>>>()?;
+
+    // Each version should be unique
+    versions.sort_unstable();
+    versions.dedup();
+    assert_eq!(versions.len(), COUNT);
+    Ok(())
+}
+
+const ENV_PD_ADDRS: &str = "PD_ADDRS";
+
+fn pd_addrs() -> Vec<String> {
+    env::var(ENV_PD_ADDRS)
+        .expect(&format!("Expected {}:", ENV_PD_ADDRS))
+        .split(",")
+        .map(From::from)
+        .collect()
+}
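Note: the `get_timestamp` test above packs each timestamp into a single version number as `(physical << 18) + logical`, i.e. it assumes the logical counter fits in the low 18 bits (the layout TiKV's TSO uses). A small standalone sketch of that packing and its inverse, with hypothetical `compose`/`decompose` helpers that are not part of `tikv_client`:

```rust
// Illustrative only: not part of the tikv_client API.
const LOGICAL_BITS: u32 = 18;

fn compose(physical: i64, logical: i64) -> i64 {
    (physical << LOGICAL_BITS) + logical
}

fn decompose(version: i64) -> (i64, i64) {
    (version >> LOGICAL_BITS, version & ((1 << LOGICAL_BITS) - 1))
}

fn main() {
    // The physical part is a millisecond clock, e.g. the value used in the
    // `snapshot_at` doc example above.
    let version = compose(1_564_481_750_172, 1);
    assert_eq!(decompose(version), (1_564_481_750_172, 1));
}
```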