Merge branch 'fix-tso' into enable-async-await

Yilin Chen 2019-08-05 16:28:47 +08:00
commit 67f741acb0
13 changed files with 198 additions and 350 deletions

View File

@@ -45,4 +45,4 @@ script:
 - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker ps; fi
 - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs pd; fi
 - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs kv; fi
-- if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDR="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
+- if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi

View File

@@ -20,7 +20,7 @@ name = "tikv_client"
 [dependencies]
 derive-new = "0.5"
 failure = "0.1"
-futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
+futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] }
 grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
 kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "ec9df5f208a768074c28d04bfb8b90e5321d2f69", features = [ "prost-codec" ], default-features = false }
 lazy_static = "1"
@@ -28,7 +28,6 @@ log = "0.3.9"
 regex = "1"
 serde = "1.0"
 serde_derive = "1.0"
-tokio-core = "0.1"
 tokio-timer = "0.2"

 [dependencies.prometheus]

View File

@@ -14,7 +14,9 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F
 ## Using the client

-The TiKV client is a Rust library (crate). It requires a nightly Rust compiler with async/await support.
+The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well.
+
+Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).

 To use this crate in your project, add it as a dependency in your `Cargo.toml`:
@@ -50,7 +52,7 @@ To check what version of Rust you are using, run
 rustc --version
 ```
-You'll see something like `rustc 1.38.0-nightly (4b65a86eb 2019-07-15)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
+You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
 ```bash
 rustup toolchain install nightly
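For orientation, a minimal consumer of the crate on such a nightly might look like the sketch below. This is illustrative only: it assumes the dependency from the snippet above, a PD endpoint at 127.0.0.1:2379, and the `Client::connect`/`Config::new` calls exercised by the integration tests later in this diff; the feature attribute is the 2019 nightly async/await gate, not something added by this commit.

```rust
#![feature(async_await)] // async/await syntax is feature-gated on 2019 nightlies

use futures::executor::block_on;
use tikv_client::{raw::Client, Config};

fn main() {
    block_on(async {
        // Assumes PD is reachable at 127.0.0.1:2379; adjust for your cluster.
        let _client = Client::connect(Config::new(vec!["127.0.0.1:2379".to_owned()]))
            .await
            .expect("failed to connect to the TiKV cluster");
        // `_client` exposes async key-value operations such as `put`, `get` and `scan`.
    });
}
```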

View File

@ -1 +1 @@
nightly-2019-07-16 nightly-2019-07-31

View File

@@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt {
 }

 impl<T: TryFuture> ClientFutureExt for T {}
-
-/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't
-/// work with Tokio `Handle`s due to ownership differences.
-pub(crate) trait SinkCompat<I, E> {
-    fn send_all_compat<S>(self, stream: S) -> SendAllCompat<Self, S>
-    where
-        S: Stream<Item = I> + Unpin,
-        Self: Sink<I, Error = E> + Sized + Unpin,
-    {
-        SendAllCompat::new(self, stream)
-    }
-}
-
-impl<T, E, S: Sink<T, Error = E>> SinkCompat<T, E> for S {}
-
-#[derive(Debug)]
-#[must_use = "futures do nothing unless polled"]
-pub(crate) struct SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    sink: Option<Si>,
-    stream: Option<stream::Fuse<St>>,
-    buffered: Option<St::Item>,
-}
-
-impl<Si, St> Unpin for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-}
-
-impl<Si, St> SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    pub(crate) fn new(sink: Si, stream: St) -> SendAllCompat<Si, St> {
-        SendAllCompat {
-            sink: Some(sink),
-            stream: Some(stream.fuse()),
-            buffered: None,
-        }
-    }
-
-    fn sink_mut(&mut self) -> Pin<&mut Si> {
-        Pin::new(
-            self.sink
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
-        Pin::new(
-            self.stream
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn take_result(&mut self) -> (Si, St) {
-        let sink = self
-            .sink
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        let fuse = self
-            .stream
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        (sink, fuse.into_inner())
-    }
-
-    fn try_start_send(
-        &mut self,
-        item: St::Item,
-        cx: &mut Context,
-    ) -> Poll<Result<(()), Si::Error>> {
-        debug_assert!(self.buffered.is_none());
-        match self.sink_mut().poll_ready(cx) {
-            Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)),
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => {
-                self.buffered = Some(item);
-                Poll::Pending
-            }
-        }
-    }
-}
-
-impl<Si, St> Future for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    type Output = Result<((Si, St)), Si::Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<((Si, St)), Si::Error>> {
-        if let Some(item) = self.buffered.take() {
-            ready!(self.try_start_send(item, cx))?
-        }
-
-        loop {
-            match self.stream_mut().poll_next(cx) {
-                Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?,
-                Poll::Ready(None) => {
-                    ready!(self.sink_mut().poll_close(cx))?;
-                    return Poll::Ready(Ok(self.take_result()));
-                }
-                Poll::Pending => {
-                    ready!(self.sink_mut().poll_flush(cx))?;
-                    return Poll::Pending;
-                }
-            }
-        }
-    }
-}
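Aside: the shim above existed because the futures 0.1 `send_all` had to cooperate with a Tokio 0.1 `Handle`. With the TSO rework in this merge the sink is driven from a dedicated thread, so the `send_all` shipped with the pinned `futures-preview` 0.3.0-alpha series is enough (note the released futures 0.3 later changed `send_all` to take a `TryStream`). A minimal illustrative sketch of that replacement pattern, not taken from the crate:

```rust
use futures::{channel::mpsc, executor::block_on, prelude::*};

fn main() {
    block_on(async {
        let (mut tx, rx) = mpsc::unbounded::<i32>();
        // `SinkExt::send_all` drives every item of a stream into the sink and flushes it;
        // this is what the removed `send_all_compat` emulated on top of futures 0.1.
        let mut source = stream::iter(vec![1, 2, 3]);
        tx.send_all(&mut source).await.expect("send_all failed");
        drop(tx); // close the channel so the collect below can finish
        let received: Vec<i32> = rx.collect().await;
        assert_eq!(received, vec![1, 2, 3]);
    });
}
```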

View File

@@ -198,8 +198,10 @@ impl Error {
         })
     }

-    pub(crate) fn internal_error(message: String) -> Self {
-        Error::from(ErrorKind::InternalError { message })
+    pub(crate) fn internal_error(message: impl Into<String>) -> Self {
+        Error::from(ErrorKind::InternalError {
+            message: message.into(),
+        })
     }
 }
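The widened `impl Into<String>` bound is what lets the new TSO code pass string literals directly, e.g. `Error::internal_error("TSO stream terminated")`. A standalone sketch of the same pattern, using illustrative names rather than the crate's own types:

```rust
// Illustrative only, not the crate's types: the same `impl Into<String>` pattern.
struct InternalError {
    message: String,
}

fn internal_error(message: impl Into<String>) -> InternalError {
    InternalError {
        message: message.into(),
    }
}

fn main() {
    // Both string literals and owned Strings are accepted without an explicit conversion.
    let a = internal_error("TSO stream terminated");
    let b = internal_error(String::from("No timestamp in TsoResponse"));
    println!("{} / {}", a.message, b.message);
}
```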

View File

@@ -9,7 +9,7 @@ use std::env::var;
 mod raw;

-pub(crate) const ENV_PD_ADDR: &str = "PD_ADDR";
+pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
 pub(crate) const PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
 pub(crate) const PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
 pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
@@ -22,9 +22,9 @@ pub fn arb_batch<T: core::fmt::Debug>(
     proptest::collection::vec(single_strategy, 0..max_batch_size)
 }

-pub fn pd_addr() -> Vec<String> {
-    var(ENV_PD_ADDR)
-        .expect(&format!("Expected {}:", ENV_PD_ADDR))
+pub fn pd_addrs() -> Vec<String> {
+    var(ENV_PD_ADDRS)
+        .expect(&format!("Expected {}:", ENV_PD_ADDRS))
         .split(",")
         .map(From::from)
         .collect()
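The plural name reflects that the variable holds a comma-separated list of PD endpoints. A purely illustrative sketch of how such a value splits:

```rust
use std::env::var;

// Illustrative only: mirrors how a comma-separated PD_ADDRS value becomes a list of endpoints.
fn main() {
    std::env::set_var("PD_ADDRS", "127.0.0.1:2379,127.0.0.2:2379");
    let addrs: Vec<String> = var("PD_ADDRS")
        .expect("Expected PD_ADDRS:")
        .split(',')
        .map(From::from)
        .collect();
    assert_eq!(addrs, vec!["127.0.0.1:2379", "127.0.0.2:2379"]);
}
```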

View File

@@ -1,6 +1,6 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

-use super::{arb_batch, pd_addr};
+use super::{arb_batch, pd_addrs};
 use crate::{raw::Client, Config, KvPair, Value};
 use futures::executor::block_on;
 use proptest::{arbitrary::any, proptest};
@@ -12,7 +12,7 @@ proptest! {
     fn point(
         pair in any::<KvPair>(),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();

         block_on(
             client.put(pair.key().clone(), pair.value().clone())
@@ -36,7 +36,7 @@ proptest! {
     fn batch(
         kvs in arb_batch(any::<KvPair>(), None),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
         let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::<Vec<_>>();

         block_on(

View File

@@ -226,7 +226,7 @@ impl<PdC: PdClient> RpcClient<PdC> {
     }

     pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
-        Arc::clone(&self.pd).get_timestamp()
+        self.pd.clone().get_timestamp()
     }

     // Returns a Stream which iterates over the contexts for each region covered by range.

View File

@@ -16,7 +16,7 @@ use kvproto::{metapb, pdpb};
 use crate::{
     rpc::{
         pd::{
-            context::request_context, request::retry_request, timestamp::PdReactor, Region,
+            context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
             RegionId, StoreId, Timestamp,
         },
         security::SecurityManager,
@@ -108,6 +108,7 @@ impl PdClient for RetryClient {
     }

     fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+        // FIXME: retry or reconnect on error
         Box::pin(self.cluster.read().unwrap().get_timestamp())
     }
 }
@@ -142,7 +143,7 @@ pub struct Cluster {
     pub id: u64,
     pub(super) client: pdpb::PdClient,
     members: pdpb::GetMembersResponse,
-    reactor: Arc<RwLock<PdReactor>>,
+    tso: TimestampOracle,
 }

 // These methods make a single attempt to make a request.
@@ -256,7 +257,7 @@ impl Cluster {
     }

     fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
-        self.reactor.write().unwrap().get_timestamp()
+        self.tso.clone().get_timestamp()
     }
 }
@@ -281,14 +282,13 @@ impl Connection {
         let (client, members) = self.try_connect_leader(&members, timeout)?;

         let id = members.get_header().get_cluster_id();
+        let tso = TimestampOracle::new(id, &client)?;
         let cluster = Cluster {
             id,
             members,
             client,
-            reactor: Arc::new(RwLock::new(PdReactor::new())),
+            tso,
         };
-        PdReactor::start(cluster.reactor.clone(), &cluster);
         Ok(cluster)
     }
@@ -307,14 +307,14 @@ impl Connection {
         warn!("updating pd client, blocking the tokio core");
         let start = Instant::now();
         let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
+        let tso = TimestampOracle::new(old_cluster.id, &client)?;

         let cluster = Cluster {
             id: old_cluster.id,
             client,
             members,
-            reactor: old_cluster.reactor.clone(),
+            tso,
         };
-        PdReactor::start(cluster.reactor.clone(), &cluster);

         *self.last_update.write().unwrap() = Instant::now();
         warn!("updating PD client done, spent {:?}", start.elapsed());
View File

@@ -2,227 +2,193 @@
 //! This module is the low-level mechanisms for getting timestamps from a PD
 //! cluster. It should be used via the `get_timestamp` API in `PdClient`.
+//!
+//! Once a `TimestampOracle` is created, there will be two futures running in a background working
+//! thread created automatically. The `get_timestamp` method creates a oneshot channel whose
+//! transmitter is served as a `TimestampRequest`. `TimestampRequest`s are sent to the working
+//! thread through a bounded multi-producer, single-consumer channel. The first future tries to
+//! exhaust the channel to get as many requests as possible and sends a single `TsoRequest` to the
+//! PD server. The other future receives `TsoResponse`s from the PD server and allocates timestamps
+//! for the requests.
-
-use std::{
-    sync::{Arc, RwLock},
-    thread::{self, JoinHandle},
-};
-
-use futures::channel::{
-    mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender},
-    oneshot,
-};
-use futures::compat::{Compat01As03, Compat01As03Sink};
-use futures::future::TryFutureExt;
-use futures::prelude::*;
-use grpcio::WriteFlags;
-use kvproto::pdpb;
-use tokio_core::reactor::{Core, Handle as TokioHandle};
-
-use crate::{
-    compat::SinkCompat,
-    rpc::pd::{
-        client::Cluster,
-        context::{observe_tso_batch, request_context},
-        Timestamp,
-    },
-    Result,
-};
-
-type TsoChannel = oneshot::Sender<Timestamp>;
-
-enum Task {
-    Init,
-    Request,
-    Response(Vec<oneshot::Sender<Timestamp>>, pdpb::TsoResponse),
-}
-
-impl Task {
-    fn dispatch(
-        self,
-        reactor: Arc<RwLock<PdReactor>>,
-        cluster_id: u64,
-        pd_client: &pdpb::PdClient,
-        handle: &TokioHandle,
-    ) {
-        match self {
-            Task::Request => reactor.write().unwrap().tso_request(cluster_id),
-            Task::Response(requests, response) => {
-                reactor.write().unwrap().tso_response(requests, response)
-            }
-            Task::Init => PdReactor::init(reactor, pd_client, handle),
-        }
-    }
-}
-
-/// A special-purpose event loop for asynchronously requesting timestamps. This is
-/// more complex than just sending a request since requests are batched on the client.
-pub(super) struct PdReactor {
-    // These fields are for communicating internally within the reactor to initialize
-    // it and send communicate between threads.
-    task_tx: Option<UnboundedSender<Task>>,
-    tso_tx: Sender<pdpb::TsoRequest>,
-    tso_rx: Option<Receiver<pdpb::TsoRequest>>,
-
-    // These fields are for communicating with the PD cluster to get batches of timestamps.
-    handle: Option<JoinHandle<()>>,
-    tso_pending: Option<Vec<TsoChannel>>,
-    tso_buffer: Option<Vec<TsoChannel>>,
-    tso_batch: Vec<TsoChannel>,
-}
-
-impl Drop for PdReactor {
-    fn drop(&mut self) {
-        if let Some(handle) = self.handle.take() {
-            handle.join().unwrap();
-        }
-    }
-}
-
-impl PdReactor {
-    pub(super) fn new() -> Self {
-        let (tso_tx, tso_rx) = mpsc::channel(1);
-        PdReactor {
-            task_tx: None,
-            tso_tx,
-            tso_rx: Some(tso_rx),
-            handle: None,
-            tso_buffer: Some(Vec::with_capacity(8)),
-            tso_batch: Vec::with_capacity(8),
-            tso_pending: None,
-        }
-    }
-
-    /// Startup the reactor, including the communication thread if necessary.
-    pub(super) fn start(r: Arc<RwLock<Self>>, cluster: &Cluster) {
-        let reactor = &mut r.write().unwrap();
-        if reactor.handle.is_none() {
-            info!("starting pd reactor thread");
-            let (task_tx, task_rx) = mpsc::unbounded();
-            task_tx.unbounded_send(Task::Init).unwrap();
-            let pd_client = cluster.client.clone();
-            let cluster_id = cluster.id;
-            reactor.task_tx = Some(task_tx);
-            let r2 = r.clone();
-            reactor.handle = Some(
-                thread::Builder::new()
-                    .name("dispatcher thread".to_owned())
-                    .spawn(move || PdReactor::poll(r2, cluster_id, pd_client, task_rx))
-                    .unwrap(),
-            )
-        } else {
-            warn!("tso sender and receiver are stale, refreshing...");
-            let (tso_tx, tso_rx) = mpsc::channel(1);
-            reactor.tso_tx = tso_tx;
-            reactor.tso_rx = Some(tso_rx);
-            reactor.schedule(Task::Init);
-        }
-    }
-
-    fn poll(
-        r: Arc<RwLock<Self>>,
-        cluster_id: u64,
-        pd_client: pdpb::PdClient,
-        task_rx: UnboundedReceiver<Task>,
-    ) {
-        let mut core = Core::new().unwrap();
-        let handle = core.handle();
-        let f = task_rx
-            .map(|t| {
-                t.dispatch(r.clone(), cluster_id, &pd_client, &handle);
-            })
-            .collect::<Vec<()>>();
-        core.run(TryFutureExt::compat(f.unit_error())).unwrap();
-    }
-
-    // Initialise client for communicating with the PD cluster.
-    fn init(r: Arc<RwLock<Self>>, pd_client: &pdpb::PdClient, handle: &TokioHandle) {
-        let (tx, rx) = pd_client.tso().unwrap();
-        let tx = Compat01As03Sink::new(tx);
-        let rx = Compat01As03::new(rx);
-        let tso_rx = r.write().unwrap().tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
-
-        handle.spawn(
-            tx.sink_map_err(Into::into)
-                .send_all_compat(tso_rx.map(|r| (r, WriteFlags::default())))
-                .map(|r: Result<_>| match r {
-                    Ok((_sender, _)) => {
-                        // FIXME(#54) the previous code doesn't work because we can't get mutable
-                        // access to the underlying StreamingCallSink to call `cancel`. But I think
-                        // that is OK because it will be canceled when it is dropped.
-                        //
-                        // _sender.get_mut().get_ref().cancel();
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("failed to send tso requests: {:?}", e);
-                        Err(())
-                    }
-                })
-                .compat(),
-        );
-        handle.spawn(
-            rx.try_for_each(move |resp| {
-                let reactor = &mut r.write().unwrap();
-                let tso_pending = reactor.tso_pending.take().unwrap();
-                reactor.schedule(Task::Response(tso_pending, resp));
-                if !reactor.tso_batch.is_empty() {
-                    // Schedule another tso_batch of request
-                    reactor.schedule(Task::Request);
-                }
-                future::ready(Ok(()))
-            })
-            .map_err(|e| panic!("unexpected error: {:?}", e))
-            .compat(),
-        );
-    }
-
-    fn tso_request(&mut self, cluster_id: u64) {
-        let mut tso_batch = self.tso_buffer.take().unwrap();
-        tso_batch.extend(self.tso_batch.drain(..));
-        let mut request = pd_request!(cluster_id, pdpb::TsoRequest);
-        let batch_size = tso_batch.len();
-        observe_tso_batch(batch_size);
-        request.count = batch_size as u32;
-        self.tso_pending = Some(tso_batch);
-        self.tso_tx
-            .try_send(request)
-            .expect("channel can never be full");
-    }
-
-    fn tso_response(&mut self, mut requests: Vec<TsoChannel>, response: pdpb::TsoResponse) {
-        let timestamp = response.get_timestamp();
-        for (offset, request) in requests.drain(..).enumerate() {
-            request
-                .send(Timestamp {
-                    physical: timestamp.physical,
-                    logical: timestamp.logical + offset as i64,
-                })
-                .unwrap();
-        }
-        self.tso_buffer = Some(requests);
-    }
-
-    fn schedule(&self, task: Task) {
-        self.task_tx
-            .as_ref()
-            .unwrap()
-            .unbounded_send(task)
-            .expect("unbounded send should never fail");
-    }
-
-    pub fn get_timestamp(&mut self) -> impl Future<Output = Result<Timestamp>> {
-        let context = request_context("get_ts");
-        let (tx, rx) = oneshot::channel::<Timestamp>();
-        self.tso_batch.push(tx);
-        if self.tso_pending.is_none() {
-            // Schedule tso request to run.
-            self.schedule(Task::Request);
-        }
-        rx.map_err(Into::into)
-            .into_future()
-            .map(move |r| context.done(r))
-    }
-}
+
+use super::Timestamp;
+use crate::{Error, Result};
+
+use futures::{
+    channel::{mpsc, oneshot},
+    compat::*,
+    executor::block_on,
+    join, pin_mut,
+    prelude::*,
+    task::{AtomicWaker, Context, Poll},
+};
+use grpcio::WriteFlags;
+use kvproto::pdpb::{PdClient, *};
+use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread};
+
+/// It is an empirical value.
+const MAX_BATCH_SIZE: usize = 64;
+
+/// TODO: This value should be adjustable.
+const MAX_PENDING_COUNT: usize = 1 << 16;
+
+type TimestampRequest = oneshot::Sender<Timestamp>;
+
+/// The timestamp oracle (TSO) which provides monotonically increasing timestamps.
+#[derive(Clone)]
+pub struct TimestampOracle {
+    /// The transmitter of a bounded channel which transports requests of getting a single
+    /// timestamp to the TSO working thread. A bounded channel is used to prevent using
+    /// too much memory unexpectedly.
+    /// In the working thread, the `TimestampRequest`, which is actually a one channel sender,
+    /// is used to send back the timestamp result.
+    request_tx: mpsc::Sender<TimestampRequest>,
+}
+
+impl TimestampOracle {
+    pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
+        let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE);
+        // FIXME: use tso_opt
+        let (rpc_sender, rpc_receiver) = pd_client.tso()?;
+
+        // Start a background thread to handle TSO requests and responses
+        thread::spawn(move || {
+            block_on(run_tso(
+                cluster_id,
+                rpc_sender.sink_compat().sink_err_into(),
+                rpc_receiver.compat().err_into(),
+                request_rx,
+            ))
+        });
+
+        Ok(TimestampOracle { request_tx })
+    }
+
+    pub async fn get_timestamp(mut self) -> Result<Timestamp> {
+        let (request, response) = oneshot::channel();
+        self.request_tx
+            .send(request)
+            .await
+            .map_err(|_| Error::internal_error("TimestampRequest channel is closed"))?;
+        Ok(response.await?)
+    }
+}
+
+async fn run_tso(
+    cluster_id: u64,
+    mut rpc_sender: impl Sink<(TsoRequest, WriteFlags), Error = Error> + Unpin,
+    mut rpc_receiver: impl Stream<Item = Result<TsoResponse>> + Unpin,
+    request_rx: mpsc::Receiver<TimestampRequest>,
+) {
+    // The `TimestampRequest`s which are waiting for the responses from the PD server
+    let pending_requests = Rc::new(RefCell::new(VecDeque::with_capacity(MAX_PENDING_COUNT)));
+
+    // When there are too many pending requests, the `send_request` future will refuse to fetch
+    // more requests from the bounded channel. This waker is used to wake up the sending future
+    // if the queue containing pending requests is no longer full.
+    let sending_future_waker = Rc::new(AtomicWaker::new());
+
+    pin_mut!(request_rx);
+    let mut request_stream = TsoRequestStream {
+        cluster_id,
+        request_rx,
+        pending_requests: pending_requests.clone(),
+        self_waker: sending_future_waker.clone(),
+    };
+
+    let send_requests = rpc_sender.send_all(&mut request_stream);
+
+    let receive_and_handle_responses = async move {
+        while let Some(Ok(resp)) = rpc_receiver.next().await {
+            let mut pending_requests = pending_requests.borrow_mut();
+
+            // Wake up the sending future blocked by too many pending requests as we are consuming
+            // some of them here.
+            if pending_requests.len() == MAX_PENDING_COUNT {
+                sending_future_waker.wake();
+            }
+
+            allocate_timestamps(&resp, &mut pending_requests)?;
+        }
+        Err(Error::internal_error("TSO stream terminated"))
+    };
+
+    let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses);
+    error!("TSO send error: {:?}", send_res);
+    error!("TSO receive error: {:?}", recv_res);
+}
+
+struct TsoRequestStream<'a> {
+    cluster_id: u64,
+    request_rx: Pin<&'a mut mpsc::Receiver<oneshot::Sender<Timestamp>>>,
+    pending_requests: Rc<RefCell<VecDeque<TimestampRequest>>>,
+    self_waker: Rc<AtomicWaker>,
+}
+
+impl<'a> Stream for TsoRequestStream<'a> {
+    type Item = (TsoRequest, WriteFlags);
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let pending_requests = self.pending_requests.clone();
+        let mut pending_requests = pending_requests.borrow_mut();
+        let mut count = 0;
+        while count < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT {
+            match self.request_rx.as_mut().poll_next(cx) {
+                Poll::Ready(Some(sender)) => {
+                    pending_requests.push_back(sender);
+                    count += 1;
+                }
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => break,
+            }
+        }
+
+        if count > 0 {
+            let req = TsoRequest {
+                header: Some(RequestHeader {
+                    cluster_id: self.cluster_id,
+                }),
+                count: count as u32,
+            };
+            let write_flags = WriteFlags::default().buffer_hint(false);
+            Poll::Ready(Some((req, write_flags)))
+        } else {
+            // Set the waker to the context, then the stream can be waked up after the pending queue
+            // is no longer full.
+            self.self_waker.register(cx.waker());
+            Poll::Pending
+        }
+    }
+}
+
+fn allocate_timestamps(
+    resp: &TsoResponse,
+    pending_requests: &mut VecDeque<TimestampRequest>,
+) -> Result<()> {
+    // PD returns the timestamp with the biggest logical value. We can send back timestamps
+    // whose logical value is from `logical - count + 1` to `logical` using the senders
+    // in `pending`.
+    let tail_ts = resp
+        .timestamp
+        .as_ref()
+        .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;
+
+    let mut offset = i64::from(resp.count);
+    while offset > 0 {
+        offset -= 1;
+        if let Some(sender) = pending_requests.pop_front() {
+            let ts = Timestamp {
+                physical: tail_ts.physical,
+                logical: tail_ts.logical - offset,
+            };
+
+            // It doesn't matter if the receiver of the channel is dropped.
+            let _ = sender.send(ts);
+        } else {
+            return Err(Error::internal_error(
+                "PD gives more timestamps than expected",
+            ));
+        }
+    }
+    Ok(())
+}
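To make the allocation contract concrete: a `TsoResponse` carries a single `(physical, logical)` pair holding the largest logical value granted plus a `count`, and `allocate_timestamps` gives the front of the pending queue the smallest logical value. A self-contained sketch of that arithmetic (illustrative types, not the crate's):

```rust
// Illustrative only: the allocation arithmetic used by `allocate_timestamps` above.
#[derive(Debug, PartialEq)]
struct Timestamp {
    physical: i64,
    logical: i64,
}

fn allocate(tail: &Timestamp, count: i64) -> Vec<Timestamp> {
    (0..count)
        .map(|i| Timestamp {
            physical: tail.physical,
            // The earliest pending request gets the smallest logical value,
            // `tail.logical - count + 1`; the last one gets `tail.logical`.
            logical: tail.logical - (count - 1 - i),
        })
        .collect()
}

fn main() {
    let tail = Timestamp { physical: 42, logical: 100 };
    let ts = allocate(&tail, 3);
    assert_eq!(
        ts,
        vec![
            Timestamp { physical: 42, logical: 98 },
            Timestamp { physical: 42, logical: 99 },
            Timestamp { physical: 42, logical: 100 },
        ]
    );
    println!("{:?}", ts);
}
```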

View File

@@ -102,7 +102,7 @@ impl Client {
     /// # });
     /// ```
     pub async fn current_timestamp(&self) -> Result<Timestamp> {
-        Arc::clone(&self.rpc).get_timestamp().await
+        self.rpc.clone().get_timestamp().await
     }
 }
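A hedged usage sketch of the API above, assuming a PD endpoint at 127.0.0.1:2379 and the `Client::connect` constructor used by the transaction tests:

```rust
use futures::executor::block_on;
use tikv_client::{transaction::Client, Config};

fn main() {
    block_on(async {
        // Assumes PD is reachable at 127.0.0.1:2379; adjust for your cluster.
        let client = Client::connect(Config::new(vec!["127.0.0.1:2379".to_owned()]))
            .await
            .expect("failed to connect");
        let ts = client
            .current_timestamp()
            .await
            .expect("failed to get a timestamp");
        println!("physical: {}, logical: {}", ts.physical, ts.logical);
    });
}
```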

View File

@@ -9,7 +9,7 @@ use tikv_client::{transaction::*, Config, Result};
 #[test]
 fn get_timestamp() -> Fallible<()> {
-    const COUNT: usize = 1 << 12;
+    const COUNT: usize = 1 << 16;
     let mut pool = ThreadPool::new()?;
     let config = Config::new(pd_addrs());
     let fut = async {
@@ -20,7 +20,7 @@ fn get_timestamp() -> Fallible<()> {
     let mut versions = pool
         .run(fut)?
         .into_iter()
-        .map(|res| res.map(|ts| ts.physical << 18 + ts.logical))
+        .map(|res| res.map(|ts| (ts.physical << 18) + ts.logical))
         .collect::<Result<Vec<_>>>()?;

     // Each version should be unique
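The parenthesization fix matters because of operator precedence: in Rust, `+` binds tighter than `<<`, so the old expression shifted `physical` by `18 + logical` instead of composing a TiKV-style version number. A small sketch demonstrating the difference:

```rust
// Illustrative only: why the parentheses matter in the version computation above.
fn main() {
    let physical: i64 = 3;
    let logical: i64 = 5;

    let wrong = physical << 18 + logical; // parsed as physical << (18 + logical)
    let right = (physical << 18) + logical; // physical in the high bits, logical added on

    assert_eq!(wrong, 3 << 23);
    assert_eq!(right, (3 << 18) + 5);
    assert_ne!(wrong, right);
    println!("wrong = {}, right = {}", wrong, right);
}
```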