From 94f080e269922e8c72c016e98937c4e325df9327 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 30 Jul 2019 18:54:10 +0800 Subject: [PATCH 1/8] Add integration tests for get TS Signed-off-by: Yilin Chen --- src/transaction/client.rs | 9 ++++++--- tests/integration_tests.rs | 41 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 tests/integration_tests.rs diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d59e78f..d82da83 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -78,7 +78,7 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let timestamp = Timestamp { physical: 1564474902, logical: 1 }; + /// let timestamp = Timestamp { physical: 1564481750172, logical: 1 }; /// let snapshot = client.snapshot_at(timestamp); /// // ... Issue some commands. /// # }); @@ -127,7 +127,10 @@ impl Future for Connect { type Output = Result; fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _config = &self.config; - unimplemented!() + let config = &self.config; + // TODO: RpcClient::connect currently uses a blocking implementation. + // Make it asynchronous later. + let rpc = Arc::new(RpcClient::connect(config)?); + Poll::Ready(Ok(Client { rpc })) } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs new file mode 100644 index 0000000..ac8c032 --- /dev/null +++ b/tests/integration_tests.rs @@ -0,0 +1,41 @@ +#![cfg(feature = "integration-tests")] +#![feature(async_await)] + +use failure::Fallible; +use futures::executor::ThreadPool; +use futures::prelude::*; +use std::env; +use tikv_client::{transaction::*, Config, Result}; + +#[test] +fn get_timestamp() -> Fallible<()> { + const COUNT: usize = 1 << 12; + let mut pool = ThreadPool::new()?; + let config = Config::new(pd_addrs()); + let fut = async { + let client = Client::connect(config).await?; + Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await) + }; + // Calculate each version of retrieved timestamp + let mut versions = pool + .run(fut)? 
+ .into_iter() + .map(|res| res.map(|ts| ts.physical << 18 + ts.logical)) + .collect::>>()?; + + // Each version should be unique + versions.sort_unstable(); + versions.dedup(); + assert_eq!(versions.len(), COUNT); + Ok(()) +} + +const ENV_PD_ADDRS: &str = "PD_ADDRS"; + +fn pd_addrs() -> Vec { + env::var(ENV_PD_ADDRS) + .expect(&format!("Expected {}:", ENV_PD_ADDRS)) + .split(",") + .map(From::from) + .collect() +} From e652a6aeb45001fdd471ed0438aedfd2f2724243 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 31 Jul 2019 15:00:37 +0800 Subject: [PATCH 2/8] Add a new TSO implementation Signed-off-by: Yilin Chen --- Cargo.toml | 3 +- README.md | 6 +- rust-toolchain | 2 +- src/compat.rs | 121 ------------ src/errors.rs | 6 +- src/rpc/pd/client.rs | 18 +- src/rpc/pd/timestamp.rs | 375 +++++++++++++++++-------------------- src/transaction/client.rs | 2 +- tests/integration_tests.rs | 4 +- 9 files changed, 189 insertions(+), 348 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9f9c2e5..a2de081 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ name = "tikv_client" [dependencies] derive-new = "0.5" failure = "0.1" -futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] } +futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] } grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false } kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "ec9df5f208a768074c28d04bfb8b90e5321d2f69", features = [ "prost-codec" ], default-features = false } lazy_static = "1" @@ -28,7 +28,6 @@ log = "0.3.9" regex = "1" serde = "1.0" serde_derive = "1.0" -tokio-core = "0.1" tokio-timer = "0.2" [dependencies.prometheus] diff --git a/README.md b/README.md index b34d367..7557384 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,9 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F ## Using the client -The TiKV client is a Rust library (crate). It requires a nightly Rust compiler with async/await support. +The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well. + +Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below). To use this crate in your project, add it as a dependency in your `Cargo.toml`: @@ -50,7 +52,7 @@ To check what version of Rust you are using, run rustc --version ``` -You'll see something like `rustc 1.38.0-nightly (4b65a86eb 2019-07-15)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use +You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use ```bash rustup toolchain install nightly diff --git a/rust-toolchain b/rust-toolchain index 664d942..ed6f0c1 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2019-07-16 +nightly-2019-07-31 diff --git a/src/compat.rs b/src/compat.rs index b36f765..66e2fe1 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt { } impl ClientFutureExt for T {} - -/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't -/// work with Tokio `Handle`s due to ownership differences. 
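// Note (illustrative): the new TSO code later in this series no longer needs this emulation.
// It adapts the grpcio duplex sink with `sink_compat()` and then drives it with the
// futures 0.3 `SinkExt::send_all` combinator, roughly:
//
//     rpc_sender.sink_compat().send_all(&mut request_stream).await
//
// which is why `SinkCompat`/`SendAllCompat` can be deleted here.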
-pub(crate) trait SinkCompat { - fn send_all_compat(self, stream: S) -> SendAllCompat - where - S: Stream + Unpin, - Self: Sink + Sized + Unpin, - { - SendAllCompat::new(self, stream) - } -} - -impl> SinkCompat for S {} - -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub(crate) struct SendAllCompat -where - Si: Sink + Unpin, - St: Stream + Unpin, -{ - sink: Option, - stream: Option>, - buffered: Option, -} - -impl Unpin for SendAllCompat -where - Si: Sink + Unpin, - St: Stream + Unpin, -{ -} - -impl SendAllCompat -where - Si: Sink + Unpin, - St: Stream + Unpin, -{ - pub(crate) fn new(sink: Si, stream: St) -> SendAllCompat { - SendAllCompat { - sink: Some(sink), - stream: Some(stream.fuse()), - buffered: None, - } - } - - fn sink_mut(&mut self) -> Pin<&mut Si> { - Pin::new( - self.sink - .as_mut() - .take() - .expect("Attempted to poll SendAllCompat after completion"), - ) - } - - fn stream_mut(&mut self) -> Pin<&mut stream::Fuse> { - Pin::new( - self.stream - .as_mut() - .take() - .expect("Attempted to poll SendAllCompat after completion"), - ) - } - - fn take_result(&mut self) -> (Si, St) { - let sink = self - .sink - .take() - .expect("Attempted to poll SendAllCompat after completion"); - let fuse = self - .stream - .take() - .expect("Attempted to poll SendAllCompat after completion"); - (sink, fuse.into_inner()) - } - - fn try_start_send( - &mut self, - item: St::Item, - cx: &mut Context, - ) -> Poll> { - debug_assert!(self.buffered.is_none()); - match self.sink_mut().poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)), - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => { - self.buffered = Some(item); - Poll::Pending - } - } - } -} - -impl Future for SendAllCompat -where - Si: Sink + Unpin, - St: Stream + Unpin, -{ - type Output = Result<((Si, St)), Si::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if let Some(item) = self.buffered.take() { - ready!(self.try_start_send(item, cx))? 
- } - - loop { - match self.stream_mut().poll_next(cx) { - Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?, - Poll::Ready(None) => { - ready!(self.sink_mut().poll_close(cx))?; - return Poll::Ready(Ok(self.take_result())); - } - Poll::Pending => { - ready!(self.sink_mut().poll_flush(cx))?; - return Poll::Pending; - } - } - } - } -} diff --git a/src/errors.rs b/src/errors.rs index 65d12e1..0adad84 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -198,8 +198,10 @@ impl Error { }) } - pub(crate) fn internal_error(message: String) -> Self { - Error::from(ErrorKind::InternalError { message }) + pub(crate) fn internal_error(message: impl Into) -> Self { + Error::from(ErrorKind::InternalError { + message: message.into(), + }) } } diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 08ac6cc..83dec1f 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -16,8 +16,8 @@ use kvproto::{metapb, pdpb}; use crate::{ rpc::{ pd::{ - context::request_context, request::retry_request, timestamp::PdReactor, Region, - RegionId, StoreId, Timestamp, + context::request_context, request::retry_request, timestamp::Tso, Region, RegionId, + StoreId, Timestamp, }, security::SecurityManager, }, @@ -108,6 +108,7 @@ impl PdClient for RetryClient { } fn get_timestamp(self: Arc) -> BoxFuture<'static, Result> { + // FIXME: retry or reconnect on error Box::pin(self.cluster.read().unwrap().get_timestamp()) } } @@ -142,7 +143,7 @@ pub struct Cluster { pub id: u64, pub(super) client: pdpb::PdClient, members: pdpb::GetMembersResponse, - reactor: Arc>, + tso: Tso, } // These methods make a single attempt to make a request. @@ -256,7 +257,7 @@ impl Cluster { } fn get_timestamp(&self) -> impl Future> { - self.reactor.write().unwrap().get_timestamp() + self.tso.clone().get_timestamp() } } @@ -281,14 +282,13 @@ impl Connection { let (client, members) = self.try_connect_leader(&members, timeout)?; let id = members.get_header().get_cluster_id(); + let tso = Tso::new(id, &client)?; let cluster = Cluster { id, members, client, - reactor: Arc::new(RwLock::new(PdReactor::new())), + tso, }; - - PdReactor::start(cluster.reactor.clone(), &cluster); Ok(cluster) } @@ -307,14 +307,14 @@ impl Connection { warn!("updating pd client, blocking the tokio core"); let start = Instant::now(); let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?; + let tso = Tso::new(old_cluster.id, &client)?; let cluster = Cluster { id: old_cluster.id, client, members, - reactor: old_cluster.reactor.clone(), + tso, }; - PdReactor::start(cluster.reactor.clone(), &cluster); *self.last_update.write().unwrap() = Instant::now(); warn!("updating PD client done, spent {:?}", start.elapsed()); diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index 07e26fd..c917912 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -3,226 +3,185 @@ //! This module is the low-level mechanisms for getting timestamps from a PD //! cluster. It should be used via the `get_timestamp` API in `PdClient`. 
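//!
//! For illustration only (the PD address and the `unwrap` error handling below are
//! placeholders), the expected call path through the public transaction API looks
//! roughly like this:
//!
//! ```rust,no_run
//! # use tikv_client::{transaction::Client, Config};
//! # futures::executor::block_on(async {
//! let client = Client::connect(Config::new(vec!["127.0.0.1:2379".to_owned()]))
//!     .await
//!     .unwrap();
//! // Internally this ends up in `PdClient::get_timestamp`, which is backed by this module.
//! let timestamp = client.current_timestamp().await.unwrap();
//! assert!(timestamp.physical > 0);
//! # });
//! ```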
-use std::{ - sync::{Arc, RwLock}, - thread::{self, JoinHandle}, +use super::Timestamp; +use crate::{Error, Result}; + +use futures::{ + channel::{mpsc, oneshot}, + compat::*, + executor::block_on, + join, pin_mut, + prelude::*, + task::{Context, Poll, Waker}, }; +use grpcio::{ClientDuplexReceiver, ClientDuplexSender, WriteFlags}; +use kvproto::pdpb::{PdClient, *}; +use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread}; -use futures::channel::{ - mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}, - oneshot, -}; -use futures::compat::{Compat01As03, Compat01As03Sink}; -use futures::future::TryFutureExt; -use futures::prelude::*; -use grpcio::WriteFlags; -use kvproto::pdpb; -use tokio_core::reactor::{Core, Handle as TokioHandle}; +const MAX_PENDING_COUNT: usize = 64; -use crate::{ - compat::SinkCompat, - rpc::pd::{ - client::Cluster, - context::{observe_tso_batch, request_context}, - Timestamp, - }, - Result, -}; - -type TsoChannel = oneshot::Sender; - -enum Task { - Init, - Request, - Response(Vec>, pdpb::TsoResponse), +/// The timestamp oracle which provides monotonically increasing timestamps. +#[derive(Clone)] +pub struct Tso { + /// The transmitter of a bounded channel which transports the sender of an oneshot channel to + /// the TSO working thread. + /// In the working thread, the `oneshot::Sender` is used to send back timestamp results. + result_sender_tx: mpsc::Sender>, } -impl Task { - fn dispatch( - self, - reactor: Arc>, +impl Tso { + pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { + let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT); + + // Start a background thread to handle TSO requests and responses + let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?; + thread::spawn(move || block_on(worker.run())); + + Ok(Tso { result_sender_tx }) + } + + pub async fn get_timestamp(mut self) -> Result { + let (result_sender, result_receiver) = oneshot::channel(); + self.result_sender_tx + .send(result_sender) + .await + .map_err(|_| Error::internal_error("Result sender channel is closed"))?; + Ok(result_receiver.await?) + } +} + +struct TsoWorker { + cluster_id: u64, + result_sender_rx: mpsc::Receiver>, + rpc_sender: Compat01As03Sink, (TsoRequest, WriteFlags)>, + rpc_receiver: Compat01As03>, +} + +impl TsoWorker { + fn new( cluster_id: u64, - pd_client: &pdpb::PdClient, - handle: &TokioHandle, - ) { - match self { - Task::Request => reactor.write().unwrap().tso_request(cluster_id), - Task::Response(requests, response) => { - reactor.write().unwrap().tso_response(requests, response) - } - Task::Init => PdReactor::init(reactor, pd_client, handle), - } - } -} - -/// A special-purpose event loop for asynchronously requesting timestamps. This is -/// more complex than just sending a request since requests are batched on the client. -pub(super) struct PdReactor { - // These fields are for communicating internally within the reactor to initialize - // it and send communicate between threads. - task_tx: Option>, - tso_tx: Sender, - tso_rx: Option>, - - // These fields are for communicating with the PD cluster to get batches of timestamps. 
- handle: Option>, - tso_pending: Option>, - tso_buffer: Option>, - tso_batch: Vec, -} - -impl Drop for PdReactor { - fn drop(&mut self) { - if let Some(handle) = self.handle.take() { - handle.join().unwrap(); - } - } -} - -impl PdReactor { - pub(super) fn new() -> Self { - let (tso_tx, tso_rx) = mpsc::channel(1); - PdReactor { - task_tx: None, - tso_tx, - tso_rx: Some(tso_rx), - handle: None, - tso_buffer: Some(Vec::with_capacity(8)), - tso_batch: Vec::with_capacity(8), - tso_pending: None, - } + pd_client: &PdClient, + result_sender_rx: mpsc::Receiver>, + ) -> Result { + // FIXME: use tso_opt + let (rpc_sender, rpc_receiver) = pd_client.tso()?; + Ok(TsoWorker { + cluster_id, + result_sender_rx, + rpc_sender: rpc_sender.sink_compat(), + rpc_receiver: rpc_receiver.compat(), + }) } - /// Startup the reactor, including the communication thread if necessary. - pub(super) fn start(r: Arc>, cluster: &Cluster) { - let reactor = &mut r.write().unwrap(); - if reactor.handle.is_none() { - info!("starting pd reactor thread"); - let (task_tx, task_rx) = mpsc::unbounded(); - task_tx.unbounded_send(Task::Init).unwrap(); - let pd_client = cluster.client.clone(); - let cluster_id = cluster.id; - reactor.task_tx = Some(task_tx); - let r2 = r.clone(); - reactor.handle = Some( - thread::Builder::new() - .name("dispatcher thread".to_owned()) - .spawn(move || PdReactor::poll(r2, cluster_id, pd_client, task_rx)) - .unwrap(), - ) - } else { - warn!("tso sender and receiver are stale, refreshing..."); - let (tso_tx, tso_rx) = mpsc::channel(1); - reactor.tso_tx = tso_tx; - reactor.tso_rx = Some(tso_rx); - reactor.schedule(Task::Init); - } - } + async fn run(mut self) { + let ctx = Rc::new(RefCell::new(TsoContext { + cluster_id: self.cluster_id, + pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT), + waker: None, + })); - fn poll( - r: Arc>, - cluster_id: u64, - pd_client: pdpb::PdClient, - task_rx: UnboundedReceiver, - ) { - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let result_sender_rx = self.result_sender_rx; + pin_mut!(result_sender_rx); + let mut request_stream = TsoRequestStream { + result_sender_rx, + ctx: ctx.clone(), + }; - let f = task_rx - .map(|t| { - t.dispatch(r.clone(), cluster_id, &pd_client, &handle); - }) - .collect::>(); - core.run(TryFutureExt::compat(f.unit_error())).unwrap(); - } + let send_requests = self.rpc_sender.send_all(&mut request_stream); - // Initialise client for communicating with the PD cluster. - fn init(r: Arc>, pd_client: &pdpb::PdClient, handle: &TokioHandle) { - let (tx, rx) = pd_client.tso().unwrap(); - let tx = Compat01As03Sink::new(tx); - let rx = Compat01As03::new(rx); - let tso_rx = r.write().unwrap().tso_rx.take().unwrap(); // Receiver: Stream - - handle.spawn( - tx.sink_map_err(Into::into) - .send_all_compat(tso_rx.map(|r| (r, WriteFlags::default()))) - .map(|r: Result<_>| match r { - Ok((_sender, _)) => { - // FIXME(#54) the previous code doesn't work because we can't get mutable - // access to the underlying StreamingCallSink to call `cancel`. But I think - // that is OK because it will be canceled when it is dropped. 
- // - // _sender.get_mut().get_ref().cancel(); - Ok(()) - } - Err(e) => { - error!("failed to send tso requests: {:?}", e); - Err(()) - } - }) - .compat(), - ); - - handle.spawn( - rx.try_for_each(move |resp| { - let reactor = &mut r.write().unwrap(); - let tso_pending = reactor.tso_pending.take().unwrap(); - reactor.schedule(Task::Response(tso_pending, resp)); - if !reactor.tso_batch.is_empty() { - // Schedule another tso_batch of request - reactor.schedule(Task::Request); + let mut rpc_receiver = self.rpc_receiver; + let receive_and_handle_responses = async move { + while let Some(Ok(resp)) = rpc_receiver.next().await { + let mut ctx = ctx.borrow_mut(); + ctx.allocate_timestamps(&resp)?; + if let Some(waker) = &ctx.waker { + waker.wake_by_ref(); } - future::ready(Ok(())) - }) - .map_err(|e| panic!("unexpected error: {:?}", e)) - .compat(), - ); - } + } + Err(Error::internal_error("TSO stream terminated")) + }; - fn tso_request(&mut self, cluster_id: u64) { - let mut tso_batch = self.tso_buffer.take().unwrap(); - tso_batch.extend(self.tso_batch.drain(..)); - let mut request = pd_request!(cluster_id, pdpb::TsoRequest); - let batch_size = tso_batch.len(); - observe_tso_batch(batch_size); - request.count = batch_size as u32; - self.tso_pending = Some(tso_batch); - self.tso_tx - .try_send(request) - .expect("channel can never be full"); - } - - fn tso_response(&mut self, mut requests: Vec, response: pdpb::TsoResponse) { - let timestamp = response.get_timestamp(); - for (offset, request) in requests.drain(..).enumerate() { - request - .send(Timestamp { - physical: timestamp.physical, - logical: timestamp.logical + offset as i64, - }) - .unwrap(); - } - self.tso_buffer = Some(requests); - } - - fn schedule(&self, task: Task) { - self.task_tx - .as_ref() - .unwrap() - .unbounded_send(task) - .expect("unbounded send should never fail"); - } - - pub fn get_timestamp(&mut self) -> impl Future> { - let context = request_context("get_ts"); - let (tx, rx) = oneshot::channel::(); - self.tso_batch.push(tx); - if self.tso_pending.is_none() { - // Schedule tso request to run. - self.schedule(Task::Request); - } - rx.map_err(Into::into) - .into_future() - .map(move |r| context.done(r)) + let _: (_, Result<()>) = join!(send_requests, receive_and_handle_responses); + } +} + +struct TsoRequestStream<'a> { + result_sender_rx: Pin<&'a mut mpsc::Receiver>>, + ctx: Rc>, +} + +impl<'a> Stream for TsoRequestStream<'a> { + type Item = (TsoRequest, WriteFlags); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let ctx = self.ctx.clone(); + let mut ctx = ctx.borrow_mut(); + + // Set the waker to the context, then the stream can be waked up after the pending queue + // is no longer full. 
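// Illustrative note on the waker lifecycle: only the waker from the first poll is kept,
// which assumes `send_all` keeps polling this stream from the same task. The
// response-handling future calls `waker.wake_by_ref()` after consuming pending requests,
// which re-polls `send_all` and lets this loop pull more senders out of the channel.
// Later patches in this series replace the field with an `AtomicWaker` that re-registers
// the current waker on every pending poll.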
+ if ctx.waker.is_none() { + ctx.waker = Some(cx.waker().clone()); + } + + let mut count = 0; + while ctx.pending_queue.len() < MAX_PENDING_COUNT { + match self.result_sender_rx.as_mut().poll_next(cx) { + Poll::Ready(Some(sender)) => { + ctx.pending_queue.push_back(sender); + count += 1; + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => break, + } + } + + if count > 0 { + let req = TsoRequest { + header: Some(RequestHeader { + cluster_id: ctx.cluster_id, + }), + count, + }; + let write_flags = WriteFlags::default().buffer_hint(false); + Poll::Ready(Some((req, write_flags))) + } else { + Poll::Pending + } + } +} + +struct TsoContext { + cluster_id: u64, + pending_queue: VecDeque>, + waker: Option, +} + +impl TsoContext { + fn allocate_timestamps(&mut self, resp: &TsoResponse) -> Result<()> { + // PD returns the timestamp with the biggest logical value. We can send back timestamps + // whose logical value is from `logical - count + 1` to `logical` using the senders + // in `pending`. + let tail_ts = resp + .timestamp + .as_ref() + .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; + let mut offset = resp.count as i64; + while offset > 0 { + offset -= 1; + if let Some(sender) = self.pending_queue.pop_front() { + let ts = Timestamp { + physical: tail_ts.physical, + logical: tail_ts.logical - offset, + }; + + // It doesn't matter if the receiver of the channel is dropped. + let _ = sender.send(ts); + } else { + Err(Error::internal_error( + "PD gives more timestamps than expected", + ))? + } + } + Ok(()) } } diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d82da83..ff59190 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -100,7 +100,7 @@ impl Client { /// # }); /// ``` pub async fn current_timestamp(&self) -> Result { - Arc::clone(&self.rpc).get_timestamp().await + self.rpc.clone().get_timestamp().await } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index ac8c032..88a6b15 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -9,7 +9,7 @@ use tikv_client::{transaction::*, Config, Result}; #[test] fn get_timestamp() -> Fallible<()> { - const COUNT: usize = 1 << 12; + const COUNT: usize = 1 << 16; let mut pool = ThreadPool::new()?; let config = Config::new(pd_addrs()); let fut = async { @@ -20,7 +20,7 @@ fn get_timestamp() -> Fallible<()> { let mut versions = pool .run(fut)? 
.into_iter() - .map(|res| res.map(|ts| ts.physical << 18 + ts.logical)) + .map(|res| res.map(|ts| (ts.physical << 18) + ts.logical)) .collect::>>()?; // Each version should be unique From 52ea5dd60071692e239da983191c5e97ce6dcf17 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 1 Aug 2019 02:08:31 +0800 Subject: [PATCH 3/8] Fix CI by changing PD_ADDR to PD_ADDRS Signed-off-by: Yilin Chen --- .travis.yml | 2 +- src/proptests/mod.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 82eaad6..ed763ac 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,4 +45,4 @@ script: - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker ps; fi - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs pd; fi - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs kv; fi - - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDR="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi + - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi diff --git a/src/proptests/mod.rs b/src/proptests/mod.rs index 7e0144d..6760ddc 100644 --- a/src/proptests/mod.rs +++ b/src/proptests/mod.rs @@ -9,7 +9,7 @@ use std::env::var; mod raw; -pub(crate) const ENV_PD_ADDR: &str = "PD_ADDR"; +pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS"; pub(crate) const PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB pub(crate) const PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16; @@ -22,9 +22,9 @@ pub fn arb_batch( proptest::collection::vec(single_strategy, 0..max_batch_size) } -pub fn pd_addr() -> Vec { - var(ENV_PD_ADDR) - .expect(&format!("Expected {}:", ENV_PD_ADDR)) +pub fn pd_addrs() -> Vec { + var(ENV_PD_ADDRS) + .expect(&format!("Expected {}:", ENV_PD_ADDRS)) .split(",") .map(From::from) .collect() From 3522d33696e66d364d7423be928844adf8d622fa Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 1 Aug 2019 02:44:18 +0800 Subject: [PATCH 4/8] Fix CI by changing PD_ADDR to PD_ADDRS Signed-off-by: Yilin Chen --- src/proptests/raw.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/proptests/raw.rs b/src/proptests/raw.rs index 972016e..a76c83b 100644 --- a/src/proptests/raw.rs +++ b/src/proptests/raw.rs @@ -1,6 +1,6 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use super::{arb_batch, pd_addr}; +use super::{arb_batch, pd_addrs}; use crate::{raw::Client, Config, KvPair, Value}; use futures::executor::block_on; use proptest::{arbitrary::any, proptest}; @@ -12,7 +12,7 @@ proptest! { fn point( pair in any::(), ) { - let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap(); + let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap(); block_on( client.put(pair.key().clone(), pair.value().clone()) @@ -36,7 +36,7 @@ proptest! 
{ fn batch( kvs in arb_batch(any::(), None), ) { - let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap(); + let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap(); let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::>(); block_on( From 8f6479b9ad1d6037fda1f3321e560ae4d927d0b0 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 1 Aug 2019 10:32:12 +0800 Subject: [PATCH 5/8] Address CI warnings and some comments Signed-off-by: Yilin Chen --- src/rpc/pd/client.rs | 10 +++++----- src/rpc/pd/timestamp.rs | 22 +++++++++++----------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 83dec1f..fc6f083 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -16,8 +16,8 @@ use kvproto::{metapb, pdpb}; use crate::{ rpc::{ pd::{ - context::request_context, request::retry_request, timestamp::Tso, Region, RegionId, - StoreId, Timestamp, + context::request_context, request::retry_request, timestamp::TimestampOracle, Region, + RegionId, StoreId, Timestamp, }, security::SecurityManager, }, @@ -143,7 +143,7 @@ pub struct Cluster { pub id: u64, pub(super) client: pdpb::PdClient, members: pdpb::GetMembersResponse, - tso: Tso, + tso: TimestampOracle, } // These methods make a single attempt to make a request. @@ -282,7 +282,7 @@ impl Connection { let (client, members) = self.try_connect_leader(&members, timeout)?; let id = members.get_header().get_cluster_id(); - let tso = Tso::new(id, &client)?; + let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { id, members, @@ -307,7 +307,7 @@ impl Connection { warn!("updating pd client, blocking the tokio core"); let start = Instant::now(); let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?; - let tso = Tso::new(old_cluster.id, &client)?; + let tso = TimestampOracle::new(old_cluster.id, &client)?; let cluster = Cluster { id: old_cluster.id, diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index c917912..e31a992 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -22,22 +22,22 @@ const MAX_PENDING_COUNT: usize = 64; /// The timestamp oracle which provides monotonically increasing timestamps. #[derive(Clone)] -pub struct Tso { - /// The transmitter of a bounded channel which transports the sender of an oneshot channel to +pub struct TimestampOracle { + /// The transmitter of a bounded channel which transports the sender of a oneshot channel to /// the TSO working thread. /// In the working thread, the `oneshot::Sender` is used to send back timestamp results. 
result_sender_tx: mpsc::Sender>, } -impl Tso { - pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { +impl TimestampOracle { + pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT); // Start a background thread to handle TSO requests and responses let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?; thread::spawn(move || block_on(worker.run())); - Ok(Tso { result_sender_tx }) + Ok(TimestampOracle { result_sender_tx }) } pub async fn get_timestamp(mut self) -> Result { @@ -75,7 +75,6 @@ impl TsoWorker { async fn run(mut self) { let ctx = Rc::new(RefCell::new(TsoContext { - cluster_id: self.cluster_id, pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT), waker: None, })); @@ -83,6 +82,7 @@ impl TsoWorker { let result_sender_rx = self.result_sender_rx; pin_mut!(result_sender_rx); let mut request_stream = TsoRequestStream { + cluster_id: self.cluster_id, result_sender_rx, ctx: ctx.clone(), }; @@ -106,6 +106,7 @@ impl TsoWorker { } struct TsoRequestStream<'a> { + cluster_id: u64, result_sender_rx: Pin<&'a mut mpsc::Receiver>>, ctx: Rc>, } @@ -138,7 +139,7 @@ impl<'a> Stream for TsoRequestStream<'a> { if count > 0 { let req = TsoRequest { header: Some(RequestHeader { - cluster_id: ctx.cluster_id, + cluster_id: self.cluster_id, }), count, }; @@ -151,7 +152,6 @@ impl<'a> Stream for TsoRequestStream<'a> { } struct TsoContext { - cluster_id: u64, pending_queue: VecDeque>, waker: Option, } @@ -165,7 +165,7 @@ impl TsoContext { .timestamp .as_ref() .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; - let mut offset = resp.count as i64; + let mut offset = i64::from(resp.count); while offset > 0 { offset -= 1; if let Some(sender) = self.pending_queue.pop_front() { @@ -177,9 +177,9 @@ impl TsoContext { // It doesn't matter if the receiver of the channel is dropped. let _ = sender.send(ts); } else { - Err(Error::internal_error( + return Err(Error::internal_error( "PD gives more timestamps than expected", - ))? + )); } } Ok(()) From aea6538919146c3d30597aca7ae60d0bfc0557d5 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 1 Aug 2019 10:57:24 +0800 Subject: [PATCH 6/8] Clarify that the abbreviation TSO refers to timestamp oracle Signed-off-by: Yilin Chen --- src/rpc/pd/timestamp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index e31a992..5d4015d 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -20,7 +20,7 @@ use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread}; const MAX_PENDING_COUNT: usize = 64; -/// The timestamp oracle which provides monotonically increasing timestamps. +/// The timestamp oracle (TSO) which provides monotonically increasing timestamps. 
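///
/// Note (illustrative): the oracle is cheap to clone because it only wraps the sending half
/// of the request channel; `Cluster::get_timestamp` relies on this by calling
/// `self.tso.clone().get_timestamp()` for every request.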
#[derive(Clone)] pub struct TimestampOracle { /// The transmitter of a bounded channel which transports the sender of a oneshot channel to From 31511aa9f0c1fb8b8f9915b3fbe405518f234bca Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 5 Aug 2019 16:04:38 +0800 Subject: [PATCH 7/8] Do some refactoring and add some more docs Signed-off-by: Yilin Chen --- src/rpc/client.rs | 2 +- src/rpc/pd/timestamp.rs | 217 +++++++++++++++++++++------------------- 2 files changed, 113 insertions(+), 106 deletions(-) diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 02984ae..6b2df52 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -226,7 +226,7 @@ impl RpcClient { } pub fn get_timestamp(self: Arc) -> impl Future> { - Arc::clone(&self.pd).get_timestamp() + self.pd.clone().get_timestamp() } // Returns a Steam which iterates over the contexts for each region covered by range. diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index 5d4015d..2b38392 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -2,6 +2,14 @@ //! This module is the low-level mechanisms for getting timestamps from a PD //! cluster. It should be used via the `get_timestamp` API in `PdClient`. +//! +//! Once a `TimestampOracle` is created, there will be two futures running in a background working +//! thread created automatically. The `get_timestamp` method creates a oneshot channel whose +//! transmitter is served as a `TimestampRequest`. `TimestampRequest`s are sent to the working +//! thread through a bounded multi-producer, single-consumer channel. The first future tries to +//! exhaust the channel to get as many requests as possible and sends a single `TsoRequest` to the +//! PD server. The other future receives `TsoResponse`s from the PD server and allocates timestamps +//! for the requests. use super::Timestamp; use crate::{Error, Result}; @@ -12,123 +20,122 @@ use futures::{ executor::block_on, join, pin_mut, prelude::*, - task::{Context, Poll, Waker}, + task::{AtomicWaker, Context, Poll}, }; -use grpcio::{ClientDuplexReceiver, ClientDuplexSender, WriteFlags}; +use grpcio::WriteFlags; use kvproto::pdpb::{PdClient, *}; use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread}; -const MAX_PENDING_COUNT: usize = 64; +/// It is an empirical value. +const MAX_BATCH_SIZE: usize = 64; + +/// TODO: This value should be adjustable. +const MAX_PENDING_COUNT: usize = 1 << 16; + +type TimestampRequest = oneshot::Sender; /// The timestamp oracle (TSO) which provides monotonically increasing timestamps. #[derive(Clone)] pub struct TimestampOracle { - /// The transmitter of a bounded channel which transports the sender of a oneshot channel to - /// the TSO working thread. - /// In the working thread, the `oneshot::Sender` is used to send back timestamp results. - result_sender_tx: mpsc::Sender>, + /// The transmitter of a bounded channel which transports requests of getting a single + /// timestamp to the TSO working thread. A bounded channel is used to prevent using + /// too much memory unexpectedly. + /// In the working thread, the `TimestampRequest`, which is actually a one channel sender, + /// is used to send back the timestamp result. 
+ request_tx: mpsc::Sender, } impl TimestampOracle { pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { - let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT); + let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE); + // FIXME: use tso_opt + let (rpc_sender, rpc_receiver) = pd_client.tso()?; // Start a background thread to handle TSO requests and responses - let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?; - thread::spawn(move || block_on(worker.run())); + thread::spawn(move || { + block_on(run_tso( + cluster_id, + rpc_sender.sink_compat().sink_err_into(), + rpc_receiver.compat().err_into(), + request_rx, + )) + }); - Ok(TimestampOracle { result_sender_tx }) + Ok(TimestampOracle { request_tx }) } pub async fn get_timestamp(mut self) -> Result { - let (result_sender, result_receiver) = oneshot::channel(); - self.result_sender_tx - .send(result_sender) + let (request, response) = oneshot::channel(); + self.request_tx + .send(request) .await - .map_err(|_| Error::internal_error("Result sender channel is closed"))?; - Ok(result_receiver.await?) + .map_err(|_| Error::internal_error("TimestampRequest channel is closed"))?; + Ok(response.await?) } } -struct TsoWorker { +async fn run_tso( cluster_id: u64, - result_sender_rx: mpsc::Receiver>, - rpc_sender: Compat01As03Sink, (TsoRequest, WriteFlags)>, - rpc_receiver: Compat01As03>, -} + mut rpc_sender: impl Sink<(TsoRequest, WriteFlags), Error = Error> + Unpin, + mut rpc_receiver: impl Stream> + Unpin, + request_rx: mpsc::Receiver, +) { + // The `TimestampRequest`s which are waiting for the responses from the PD server + let pending_requests = Rc::new(RefCell::new(VecDeque::with_capacity(MAX_PENDING_COUNT))); -impl TsoWorker { - fn new( - cluster_id: u64, - pd_client: &PdClient, - result_sender_rx: mpsc::Receiver>, - ) -> Result { - // FIXME: use tso_opt - let (rpc_sender, rpc_receiver) = pd_client.tso()?; - Ok(TsoWorker { - cluster_id, - result_sender_rx, - rpc_sender: rpc_sender.sink_compat(), - rpc_receiver: rpc_receiver.compat(), - }) - } + // When there are too many pending requests, the `send_request` future will refuse to fetch + // more requests from the bounded channel. This waker is used to wake up the sending future + // if the queue containing pending requests is no longer full. 
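    // Illustrative summary of the flow control: callers are throttled twice. The bounded
    // `request_rx` channel buffers roughly `MAX_BATCH_SIZE` queued requests, and
    // `TsoRequestStream` stops pulling from it once `MAX_PENDING_COUNT` requests are
    // waiting for PD responses; draining responses below that cap wakes the sending
    // future again through the waker created below.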
+ let sending_future_waker = Rc::new(AtomicWaker::new()); - async fn run(mut self) { - let ctx = Rc::new(RefCell::new(TsoContext { - pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT), - waker: None, - })); + pin_mut!(request_rx); + let mut request_stream = TsoRequestStream { + cluster_id, + request_rx, + pending_requests: pending_requests.clone(), + self_waker: sending_future_waker.clone(), + }; - let result_sender_rx = self.result_sender_rx; - pin_mut!(result_sender_rx); - let mut request_stream = TsoRequestStream { - cluster_id: self.cluster_id, - result_sender_rx, - ctx: ctx.clone(), - }; + let send_requests = rpc_sender.send_all(&mut request_stream); - let send_requests = self.rpc_sender.send_all(&mut request_stream); + let receive_and_handle_responses = async move { + while let Some(Ok(resp)) = rpc_receiver.next().await { + let mut pending_requests = pending_requests.borrow_mut(); - let mut rpc_receiver = self.rpc_receiver; - let receive_and_handle_responses = async move { - while let Some(Ok(resp)) = rpc_receiver.next().await { - let mut ctx = ctx.borrow_mut(); - ctx.allocate_timestamps(&resp)?; - if let Some(waker) = &ctx.waker { - waker.wake_by_ref(); - } + // Wake up the sending future blocked by too many pending requests as we are consuming + // some of them here. + if pending_requests.len() == MAX_PENDING_COUNT { + sending_future_waker.wake(); } - Err(Error::internal_error("TSO stream terminated")) - }; - let _: (_, Result<()>) = join!(send_requests, receive_and_handle_responses); - } + allocate_timestamps(&resp, &mut pending_requests)?; + } + Err(Error::internal_error("TSO stream terminated")) + }; + + let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses); + error!("TSO send error: {:?}", send_res); + error!("TSO receive error: {:?}", recv_res); } struct TsoRequestStream<'a> { cluster_id: u64, - result_sender_rx: Pin<&'a mut mpsc::Receiver>>, - ctx: Rc>, + request_rx: Pin<&'a mut mpsc::Receiver>>, + pending_requests: Rc>>, + self_waker: Rc, } impl<'a> Stream for TsoRequestStream<'a> { type Item = (TsoRequest, WriteFlags); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let ctx = self.ctx.clone(); - let mut ctx = ctx.borrow_mut(); - - // Set the waker to the context, then the stream can be waked up after the pending queue - // is no longer full. - if ctx.waker.is_none() { - ctx.waker = Some(cx.waker().clone()); - } - + let pending_requests = self.pending_requests.clone(); + let mut pending_requests = pending_requests.borrow_mut(); let mut count = 0; - while ctx.pending_queue.len() < MAX_PENDING_COUNT { - match self.result_sender_rx.as_mut().poll_next(cx) { + while count < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT { + match self.request_rx.as_mut().poll_next(cx) { Poll::Ready(Some(sender)) => { - ctx.pending_queue.push_back(sender); + pending_requests.push_back(sender); count += 1; } Poll::Ready(None) => return Poll::Ready(None), @@ -141,47 +148,47 @@ impl<'a> Stream for TsoRequestStream<'a> { header: Some(RequestHeader { cluster_id: self.cluster_id, }), - count, + count: count as u32, }; let write_flags = WriteFlags::default().buffer_hint(false); Poll::Ready(Some((req, write_flags))) } else { + // Set the waker to the context, then the stream can be waked up after the pending queue + // is no longer full. 
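            // Unlike the earlier `Option<Waker>` field, `AtomicWaker::register` is called on
            // every poll that returns `Pending`, so the receiving future always wakes whichever
            // task most recently polled `send_all`.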
+ self.self_waker.register(cx.waker()); Poll::Pending } } } -struct TsoContext { - pending_queue: VecDeque>, - waker: Option, -} +fn allocate_timestamps( + resp: &TsoResponse, + pending_requests: &mut VecDeque, +) -> Result<()> { + // PD returns the timestamp with the biggest logical value. We can send back timestamps + // whose logical value is from `logical - count + 1` to `logical` using the senders + // in `pending`. + let tail_ts = resp + .timestamp + .as_ref() + .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; -impl TsoContext { - fn allocate_timestamps(&mut self, resp: &TsoResponse) -> Result<()> { - // PD returns the timestamp with the biggest logical value. We can send back timestamps - // whose logical value is from `logical - count + 1` to `logical` using the senders - // in `pending`. - let tail_ts = resp - .timestamp - .as_ref() - .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; - let mut offset = i64::from(resp.count); - while offset > 0 { - offset -= 1; - if let Some(sender) = self.pending_queue.pop_front() { - let ts = Timestamp { - physical: tail_ts.physical, - logical: tail_ts.logical - offset, - }; + let mut offset = i64::from(resp.count); + while offset > 0 { + offset -= 1; + if let Some(sender) = pending_requests.pop_front() { + let ts = Timestamp { + physical: tail_ts.physical, + logical: tail_ts.logical - offset, + }; - // It doesn't matter if the receiver of the channel is dropped. - let _ = sender.send(ts); - } else { - return Err(Error::internal_error( - "PD gives more timestamps than expected", - )); - } + // It doesn't matter if the receiver of the channel is dropped. + let _ = sender.send(ts); + } else { + return Err(Error::internal_error( + "PD gives more timestamps than expected", + )); } - Ok(()) } + Ok(()) } From c64281fbd493791091668ad5ee11bd9b954d3d6e Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 5 Aug 2019 20:44:45 +0800 Subject: [PATCH 8/8] Refine some docs Signed-off-by: Yilin Chen --- src/rpc/pd/timestamp.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index 2b38392..ee44499 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -6,10 +6,10 @@ //! Once a `TimestampOracle` is created, there will be two futures running in a background working //! thread created automatically. The `get_timestamp` method creates a oneshot channel whose //! transmitter is served as a `TimestampRequest`. `TimestampRequest`s are sent to the working -//! thread through a bounded multi-producer, single-consumer channel. The first future tries to -//! exhaust the channel to get as many requests as possible and sends a single `TsoRequest` to the -//! PD server. The other future receives `TsoResponse`s from the PD server and allocates timestamps -//! for the requests. +//! thread through a bounded multi-producer, single-consumer channel. Every time the first future +//! is polled, it tries to exhaust the channel to get as many requests as possible and sends a +//! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD +//! server and allocates timestamps for the requests. use super::Timestamp; use crate::{Error, Result}; @@ -23,7 +23,7 @@ use futures::{ task::{AtomicWaker, Context, Poll}, }; use grpcio::WriteFlags; -use kvproto::pdpb::{PdClient, *}; +use kvproto::pdpb::*; use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread}; /// It is an empirical value.
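// Worked example of the allocation scheme above (illustrative values): if PD replies with
// `count = 3` and a tail timestamp of `{ physical: p, logical: 10 }`, the three pending
// requests receive logical values 8, 9 and 10, i.e. `logical - count + 1 ..= logical`.
// The integration test then folds each timestamp into a single version,
//
//     let version = (ts.physical << 18) + ts.logical;
//
// which is why the `get_timestamp` test expects every collected version to be unique.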