mirror of https://github.com/tikv/client-rust.git
Merge pull request #92 from sticnarf/fix-tso
Use a new TSO client implementation
Commit 2733e28ece
@@ -45,4 +45,4 @@ script:
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker ps; fi
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs pd; fi
   - if [[ $TRAVIS_OS_NAME == "linux" ]]; then docker logs kv; fi
-  - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDR="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
+  - if [[ $TRAVIS_OS_NAME == "linux" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
@@ -20,7 +20,7 @@ name = "tikv_client"
 [dependencies]
 derive-new = "0.5"
 failure = "0.1"
-futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
+futures-preview = { version = "0.3.0-alpha.17", features = ["compat", "async-await", "nightly"] }
 grpcio = { version = "0.5.0-alpha", features = [ "secure", "prost-codec" ], default-features = false }
 kvproto = { git = "https://github.com/pingcap/kvproto.git", rev = "ec9df5f208a768074c28d04bfb8b90e5321d2f69", features = [ "prost-codec" ], default-features = false }
 lazy_static = "1"
@@ -28,7 +28,6 @@ log = "0.3.9"
 regex = "1"
 serde = "1.0"
 serde_derive = "1.0"
-tokio-core = "0.1"
 tokio-timer = "0.2"

 [dependencies.prometheus]
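The dependency change above drops tokio-core and leans on the futures 0.3 alpha's "compat", "async-await" and "nightly" features. As a rough illustration of what the compat layer does, here is a minimal sketch, not part of this commit, of adapting a futures 0.1 stream (the kind grpcio returns) into a futures 0.3 one. It assumes a manifest with futures = { version = "0.3", features = ["compat"] } and futures01 = { package = "futures", version = "0.1" }; the stream of integers is a stand-in for a real gRPC stream.

use futures::compat::Stream01CompatExt;
use futures::executor::block_on;
use futures::StreamExt;

fn main() {
    // A futures 0.1 stream, standing in for the receiving half of a grpcio duplex call.
    let legacy = futures01::stream::iter_ok::<_, ()>(vec![1, 2, 3]);

    // `.compat()` (from the `compat` feature) adapts it into a futures 0.3 `Stream`
    // whose items are `Result`s.
    let mut modern = legacy.compat();

    block_on(async {
        while let Some(item) = modern.next().await {
            println!("received {:?}", item);
        }
    });
}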
src/compat.rs (121 lines changed)
@@ -94,124 +94,3 @@ pub(crate) trait ClientFutureExt {
 }

 impl<T: TryFuture> ClientFutureExt for T {}
-
-/// Emulate `send_all`/`SendAll` from futures 0.1 since the 0.3 versions don't
-/// work with Tokio `Handle`s due to ownership differences.
-pub(crate) trait SinkCompat<I, E> {
-    fn send_all_compat<S>(self, stream: S) -> SendAllCompat<Self, S>
-    where
-        S: Stream<Item = I> + Unpin,
-        Self: Sink<I, Error = E> + Sized + Unpin,
-    {
-        SendAllCompat::new(self, stream)
-    }
-}
-
-impl<T, E, S: Sink<T, Error = E>> SinkCompat<T, E> for S {}
-
-#[derive(Debug)]
-#[must_use = "futures do nothing unless polled"]
-pub(crate) struct SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    sink: Option<Si>,
-    stream: Option<stream::Fuse<St>>,
-    buffered: Option<St::Item>,
-}
-
-impl<Si, St> Unpin for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-}
-
-impl<Si, St> SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    pub(crate) fn new(sink: Si, stream: St) -> SendAllCompat<Si, St> {
-        SendAllCompat {
-            sink: Some(sink),
-            stream: Some(stream.fuse()),
-            buffered: None,
-        }
-    }
-
-    fn sink_mut(&mut self) -> Pin<&mut Si> {
-        Pin::new(
-            self.sink
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
-        Pin::new(
-            self.stream
-                .as_mut()
-                .take()
-                .expect("Attempted to poll SendAllCompat after completion"),
-        )
-    }
-
-    fn take_result(&mut self) -> (Si, St) {
-        let sink = self
-            .sink
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        let fuse = self
-            .stream
-            .take()
-            .expect("Attempted to poll SendAllCompat after completion");
-        (sink, fuse.into_inner())
-    }
-
-    fn try_start_send(
-        &mut self,
-        item: St::Item,
-        cx: &mut Context,
-    ) -> Poll<Result<(()), Si::Error>> {
-        debug_assert!(self.buffered.is_none());
-        match self.sink_mut().poll_ready(cx) {
-            Poll::Ready(Ok(())) => Poll::Ready(self.sink_mut().start_send(item)),
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => {
-                self.buffered = Some(item);
-                Poll::Pending
-            }
-        }
-    }
-}
-
-impl<Si, St> Future for SendAllCompat<Si, St>
-where
-    Si: Sink<St::Item> + Unpin,
-    St: Stream + Unpin,
-{
-    type Output = Result<((Si, St)), Si::Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<((Si, St)), Si::Error>> {
-        if let Some(item) = self.buffered.take() {
-            ready!(self.try_start_send(item, cx))?
-        }
-
-        loop {
-            match self.stream_mut().poll_next(cx) {
-                Poll::Ready(Some(item)) => ready!(self.try_start_send(item, cx))?,
-                Poll::Ready(None) => {
-                    ready!(self.sink_mut().poll_close(cx))?;
-                    return Poll::Ready(Ok(self.take_result()));
-                }
-                Poll::Pending => {
-                    ready!(self.sink_mut().poll_flush(cx))?;
-                    return Poll::Pending;
-                }
-            }
-        }
-    }
-}
@@ -198,8 +198,10 @@ impl Error {
         })
     }

-    pub(crate) fn internal_error(message: String) -> Self {
-        Error::from(ErrorKind::InternalError { message })
+    pub(crate) fn internal_error(message: impl Into<String>) -> Self {
+        Error::from(ErrorKind::InternalError {
+            message: message.into(),
+        })
     }
 }
@@ -9,7 +9,7 @@ use std::env::var;

 mod raw;

-pub(crate) const ENV_PD_ADDR: &str = "PD_ADDR";
+pub(crate) const ENV_PD_ADDRS: &str = "PD_ADDRS";
 pub(crate) const PROPTEST_KEY_MAX: usize = 1024 * 2; // 2 KB
 pub(crate) const PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB
 pub(crate) const PROPTEST_BATCH_SIZE_MAX: usize = 16;
@@ -22,9 +22,9 @@ pub fn arb_batch<T: core::fmt::Debug>(
     proptest::collection::vec(single_strategy, 0..max_batch_size)
 }

-pub fn pd_addr() -> Vec<String> {
-    var(ENV_PD_ADDR)
-        .expect(&format!("Expected {}:", ENV_PD_ADDR))
+pub fn pd_addrs() -> Vec<String> {
+    var(ENV_PD_ADDRS)
+        .expect(&format!("Expected {}:", ENV_PD_ADDRS))
         .split(",")
         .map(From::from)
         .collect()
@@ -1,6 +1,6 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

-use super::{arb_batch, pd_addr};
+use super::{arb_batch, pd_addrs};
 use crate::{raw::Client, Config, KvPair, Value};
 use futures::executor::block_on;
 use proptest::{arbitrary::any, proptest};
@@ -12,7 +12,7 @@ proptest! {
     fn point(
         pair in any::<KvPair>(),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();

         block_on(
             client.put(pair.key().clone(), pair.value().clone())
@@ -36,7 +36,7 @@ proptest! {
     fn batch(
         kvs in arb_batch(any::<KvPair>(), None),
     ) {
-        let client = block_on(Client::connect(Config::new(pd_addr()))).unwrap();
+        let client = block_on(Client::connect(Config::new(pd_addrs()))).unwrap();
         let keys = kvs.iter().map(|kv| kv.key()).cloned().collect::<Vec<_>>();

         block_on(
@@ -226,7 +226,7 @@ impl<PdC: PdClient> RpcClient<PdC> {
     }

     pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
-        Arc::clone(&self.pd).get_timestamp()
+        self.pd.clone().get_timestamp()
     }

     // Returns a Steam which iterates over the contexts for each region covered by range.
@@ -16,7 +16,7 @@ use kvproto::{metapb, pdpb};
 use crate::{
     rpc::{
         pd::{
-            context::request_context, request::retry_request, timestamp::PdReactor, Region,
+            context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
             RegionId, StoreId, Timestamp,
         },
         security::SecurityManager,
@@ -108,6 +108,7 @@ impl PdClient for RetryClient {
     }

     fn get_timestamp(self: Arc<Self>) -> BoxFuture<'static, Result<Timestamp>> {
+        // FIXME: retry or reconnect on error
         Box::pin(self.cluster.read().unwrap().get_timestamp())
     }
 }
@@ -142,7 +143,7 @@ pub struct Cluster {
     pub id: u64,
     pub(super) client: pdpb::PdClient,
     members: pdpb::GetMembersResponse,
-    reactor: Arc<RwLock<PdReactor>>,
+    tso: TimestampOracle,
 }

 // These methods make a single attempt to make a request.
@@ -256,7 +257,7 @@ impl Cluster {
     }

     fn get_timestamp(&self) -> impl Future<Output = Result<Timestamp>> {
-        self.reactor.write().unwrap().get_timestamp()
+        self.tso.clone().get_timestamp()
     }
 }

@@ -281,14 +282,13 @@ impl Connection {
         let (client, members) = self.try_connect_leader(&members, timeout)?;

         let id = members.get_header().get_cluster_id();
+        let tso = TimestampOracle::new(id, &client)?;
         let cluster = Cluster {
             id,
             members,
             client,
-            reactor: Arc::new(RwLock::new(PdReactor::new())),
+            tso,
         };

-        PdReactor::start(cluster.reactor.clone(), &cluster);
         Ok(cluster)
     }

@@ -307,14 +307,14 @@ impl Connection {
         warn!("updating pd client, blocking the tokio core");
         let start = Instant::now();
         let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
+        let tso = TimestampOracle::new(old_cluster.id, &client)?;

         let cluster = Cluster {
             id: old_cluster.id,
             client,
             members,
-            reactor: old_cluster.reactor.clone(),
+            tso,
         };
-        PdReactor::start(cluster.reactor.clone(), &cluster);

         *self.last_update.write().unwrap() = Instant::now();

         warn!("updating PD client done, spent {:?}", start.elapsed());
@@ -2,227 +2,193 @@

 //! This module is the low-level mechanisms for getting timestamps from a PD
 //! cluster. It should be used via the `get_timestamp` API in `PdClient`.
+//!
+//! Once a `TimestampOracle` is created, there will be two futures running in a background working
+//! thread created automatically. The `get_timestamp` method creates a oneshot channel whose
+//! transmitter is served as a `TimestampRequest`. `TimestampRequest`s are sent to the working
+//! thread through a bounded multi-producer, single-consumer channel. Every time the first future
+//! is polled, it tries to exhaust the channel to get as many requests as possible and sends a
+//! single `TsoRequest` to the PD server. The other future receives `TsoResponse`s from the PD
+//! server and allocates timestamps for the requests.

-use std::{
-    sync::{Arc, RwLock},
-    thread::{self, JoinHandle},
-};
-
-use futures::channel::{
-    mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender},
-    oneshot,
-};
-use futures::compat::{Compat01As03, Compat01As03Sink};
-use futures::future::TryFutureExt;
-use futures::prelude::*;
-use grpcio::WriteFlags;
-use kvproto::pdpb;
-use tokio_core::reactor::{Core, Handle as TokioHandle};
-
-use crate::{
-    compat::SinkCompat,
-    rpc::pd::{
-        client::Cluster,
-        context::{observe_tso_batch, request_context},
-        Timestamp,
-    },
-    Result,
-};
-
-type TsoChannel = oneshot::Sender<Timestamp>;
-
-enum Task {
-    Init,
-    Request,
-    Response(Vec<oneshot::Sender<Timestamp>>, pdpb::TsoResponse),
-}
-
-impl Task {
-    fn dispatch(
-        self,
-        reactor: Arc<RwLock<PdReactor>>,
-        cluster_id: u64,
-        pd_client: &pdpb::PdClient,
-        handle: &TokioHandle,
-    ) {
-        match self {
-            Task::Request => reactor.write().unwrap().tso_request(cluster_id),
-            Task::Response(requests, response) => {
-                reactor.write().unwrap().tso_response(requests, response)
-            }
-            Task::Init => PdReactor::init(reactor, pd_client, handle),
-        }
-    }
-}
-
-/// A special-purpose event loop for asynchronously requesting timestamps. This is
-/// more complex than just sending a request since requests are batched on the client.
-pub(super) struct PdReactor {
-    // These fields are for communicating internally within the reactor to initialize
-    // it and send communicate between threads.
-    task_tx: Option<UnboundedSender<Task>>,
-    tso_tx: Sender<pdpb::TsoRequest>,
-    tso_rx: Option<Receiver<pdpb::TsoRequest>>,
-
-    // These fields are for communicating with the PD cluster to get batches of timestamps.
-    handle: Option<JoinHandle<()>>,
-    tso_pending: Option<Vec<TsoChannel>>,
-    tso_buffer: Option<Vec<TsoChannel>>,
-    tso_batch: Vec<TsoChannel>,
-}
-
-impl Drop for PdReactor {
-    fn drop(&mut self) {
-        if let Some(handle) = self.handle.take() {
-            handle.join().unwrap();
-        }
-    }
-}
-
-impl PdReactor {
-    pub(super) fn new() -> Self {
-        let (tso_tx, tso_rx) = mpsc::channel(1);
-        PdReactor {
-            task_tx: None,
-            tso_tx,
-            tso_rx: Some(tso_rx),
-            handle: None,
-            tso_buffer: Some(Vec::with_capacity(8)),
-            tso_batch: Vec::with_capacity(8),
-            tso_pending: None,
-        }
-    }
-
-    /// Startup the reactor, including the communication thread if necessary.
-    pub(super) fn start(r: Arc<RwLock<Self>>, cluster: &Cluster) {
-        let reactor = &mut r.write().unwrap();
-        if reactor.handle.is_none() {
-            info!("starting pd reactor thread");
-            let (task_tx, task_rx) = mpsc::unbounded();
-            task_tx.unbounded_send(Task::Init).unwrap();
-            let pd_client = cluster.client.clone();
-            let cluster_id = cluster.id;
-            reactor.task_tx = Some(task_tx);
-            let r2 = r.clone();
-            reactor.handle = Some(
-                thread::Builder::new()
-                    .name("dispatcher thread".to_owned())
-                    .spawn(move || PdReactor::poll(r2, cluster_id, pd_client, task_rx))
-                    .unwrap(),
-            )
-        } else {
-            warn!("tso sender and receiver are stale, refreshing...");
-            let (tso_tx, tso_rx) = mpsc::channel(1);
-            reactor.tso_tx = tso_tx;
-            reactor.tso_rx = Some(tso_rx);
-            reactor.schedule(Task::Init);
-        }
-    }
-
-    fn poll(
-        r: Arc<RwLock<Self>>,
-        cluster_id: u64,
-        pd_client: pdpb::PdClient,
-        task_rx: UnboundedReceiver<Task>,
-    ) {
-        let mut core = Core::new().unwrap();
-        let handle = core.handle();
-        let f = task_rx
-            .map(|t| {
-                t.dispatch(r.clone(), cluster_id, &pd_client, &handle);
-            })
-            .collect::<Vec<()>>();
-        core.run(TryFutureExt::compat(f.unit_error())).unwrap();
-    }
-
-    // Initialise client for communicating with the PD cluster.
-    fn init(r: Arc<RwLock<Self>>, pd_client: &pdpb::PdClient, handle: &TokioHandle) {
-        let (tx, rx) = pd_client.tso().unwrap();
-        let tx = Compat01As03Sink::new(tx);
-        let rx = Compat01As03::new(rx);
-        let tso_rx = r.write().unwrap().tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
-
-        handle.spawn(
-            tx.sink_map_err(Into::into)
-                .send_all_compat(tso_rx.map(|r| (r, WriteFlags::default())))
-                .map(|r: Result<_>| match r {
-                    Ok((_sender, _)) => {
-                        // FIXME(#54) the previous code doesn't work because we can't get mutable
-                        // access to the underlying StreamingCallSink to call `cancel`. But I think
-                        // that is OK because it will be canceled when it is dropped.
-                        //
-                        // _sender.get_mut().get_ref().cancel();
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("failed to send tso requests: {:?}", e);
-                        Err(())
-                    }
-                })
-                .compat(),
-        );
-
-        handle.spawn(
-            rx.try_for_each(move |resp| {
-                let reactor = &mut r.write().unwrap();
-                let tso_pending = reactor.tso_pending.take().unwrap();
-                reactor.schedule(Task::Response(tso_pending, resp));
-                if !reactor.tso_batch.is_empty() {
-                    // Schedule another tso_batch of request
-                    reactor.schedule(Task::Request);
-                }
-                future::ready(Ok(()))
-            })
-            .map_err(|e| panic!("unexpected error: {:?}", e))
-            .compat(),
-        );
-    }
-
-    fn tso_request(&mut self, cluster_id: u64) {
-        let mut tso_batch = self.tso_buffer.take().unwrap();
-        tso_batch.extend(self.tso_batch.drain(..));
-        let mut request = pd_request!(cluster_id, pdpb::TsoRequest);
-        let batch_size = tso_batch.len();
-        observe_tso_batch(batch_size);
-        request.count = batch_size as u32;
-        self.tso_pending = Some(tso_batch);
-        self.tso_tx
-            .try_send(request)
-            .expect("channel can never be full");
-    }
-
-    fn tso_response(&mut self, mut requests: Vec<TsoChannel>, response: pdpb::TsoResponse) {
-        let timestamp = response.get_timestamp();
-        for (offset, request) in requests.drain(..).enumerate() {
-            request
-                .send(Timestamp {
-                    physical: timestamp.physical,
-                    logical: timestamp.logical + offset as i64,
-                })
-                .unwrap();
-        }
-        self.tso_buffer = Some(requests);
-    }
-
-    fn schedule(&self, task: Task) {
-        self.task_tx
-            .as_ref()
-            .unwrap()
-            .unbounded_send(task)
-            .expect("unbounded send should never fail");
-    }
-
-    pub fn get_timestamp(&mut self) -> impl Future<Output = Result<Timestamp>> {
-        let context = request_context("get_ts");
-        let (tx, rx) = oneshot::channel::<Timestamp>();
-        self.tso_batch.push(tx);
-        if self.tso_pending.is_none() {
-            // Schedule tso request to run.
-            self.schedule(Task::Request);
-        }
-        rx.map_err(Into::into)
-            .into_future()
-            .map(move |r| context.done(r))
-    }
-}
+use super::Timestamp;
+use crate::{Error, Result};
+
+use futures::{
+    channel::{mpsc, oneshot},
+    compat::*,
+    executor::block_on,
+    join, pin_mut,
+    prelude::*,
+    task::{AtomicWaker, Context, Poll},
+};
+use grpcio::WriteFlags;
+use kvproto::pdpb::*;
+use std::{cell::RefCell, collections::VecDeque, pin::Pin, rc::Rc, thread};
+
+/// It is an empirical value.
+const MAX_BATCH_SIZE: usize = 64;
+
+/// TODO: This value should be adjustable.
+const MAX_PENDING_COUNT: usize = 1 << 16;
+
+type TimestampRequest = oneshot::Sender<Timestamp>;
+
+/// The timestamp oracle (TSO) which provides monotonically increasing timestamps.
+#[derive(Clone)]
+pub struct TimestampOracle {
+    /// The transmitter of a bounded channel which transports requests of getting a single
+    /// timestamp to the TSO working thread. A bounded channel is used to prevent using
+    /// too much memory unexpectedly.
+    /// In the working thread, the `TimestampRequest`, which is actually a one channel sender,
+    /// is used to send back the timestamp result.
+    request_tx: mpsc::Sender<TimestampRequest>,
+}
+
+impl TimestampOracle {
+    pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
+        let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE);
+        // FIXME: use tso_opt
+        let (rpc_sender, rpc_receiver) = pd_client.tso()?;
+
+        // Start a background thread to handle TSO requests and responses
+        thread::spawn(move || {
+            block_on(run_tso(
+                cluster_id,
+                rpc_sender.sink_compat().sink_err_into(),
+                rpc_receiver.compat().err_into(),
+                request_rx,
+            ))
+        });
+
+        Ok(TimestampOracle { request_tx })
+    }
+
+    pub async fn get_timestamp(mut self) -> Result<Timestamp> {
+        let (request, response) = oneshot::channel();
+        self.request_tx
+            .send(request)
+            .await
+            .map_err(|_| Error::internal_error("TimestampRequest channel is closed"))?;
+        Ok(response.await?)
+    }
+}
+
+async fn run_tso(
+    cluster_id: u64,
+    mut rpc_sender: impl Sink<(TsoRequest, WriteFlags), Error = Error> + Unpin,
+    mut rpc_receiver: impl Stream<Item = Result<TsoResponse>> + Unpin,
+    request_rx: mpsc::Receiver<TimestampRequest>,
+) {
+    // The `TimestampRequest`s which are waiting for the responses from the PD server
+    let pending_requests = Rc::new(RefCell::new(VecDeque::with_capacity(MAX_PENDING_COUNT)));
+
+    // When there are too many pending requests, the `send_request` future will refuse to fetch
+    // more requests from the bounded channel. This waker is used to wake up the sending future
+    // if the queue containing pending requests is no longer full.
+    let sending_future_waker = Rc::new(AtomicWaker::new());
+
+    pin_mut!(request_rx);
+    let mut request_stream = TsoRequestStream {
+        cluster_id,
+        request_rx,
+        pending_requests: pending_requests.clone(),
+        self_waker: sending_future_waker.clone(),
+    };
+
+    let send_requests = rpc_sender.send_all(&mut request_stream);
+
+    let receive_and_handle_responses = async move {
+        while let Some(Ok(resp)) = rpc_receiver.next().await {
+            let mut pending_requests = pending_requests.borrow_mut();
+
+            // Wake up the sending future blocked by too many pending requests as we are consuming
+            // some of them here.
+            if pending_requests.len() == MAX_PENDING_COUNT {
+                sending_future_waker.wake();
+            }
+
+            allocate_timestamps(&resp, &mut pending_requests)?;
+        }
+        Err(Error::internal_error("TSO stream terminated"))
+    };
+
+    let (send_res, recv_res): (_, Result<()>) = join!(send_requests, receive_and_handle_responses);
+    error!("TSO send error: {:?}", send_res);
+    error!("TSO receive error: {:?}", recv_res);
+}
+
+struct TsoRequestStream<'a> {
+    cluster_id: u64,
+    request_rx: Pin<&'a mut mpsc::Receiver<oneshot::Sender<Timestamp>>>,
+    pending_requests: Rc<RefCell<VecDeque<TimestampRequest>>>,
+    self_waker: Rc<AtomicWaker>,
+}
+
+impl<'a> Stream for TsoRequestStream<'a> {
+    type Item = (TsoRequest, WriteFlags);
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let pending_requests = self.pending_requests.clone();
+        let mut pending_requests = pending_requests.borrow_mut();
+        let mut count = 0;
+        while count < MAX_BATCH_SIZE && pending_requests.len() < MAX_PENDING_COUNT {
+            match self.request_rx.as_mut().poll_next(cx) {
+                Poll::Ready(Some(sender)) => {
+                    pending_requests.push_back(sender);
+                    count += 1;
+                }
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => break,
+            }
+        }
+
+        if count > 0 {
+            let req = TsoRequest {
+                header: Some(RequestHeader {
+                    cluster_id: self.cluster_id,
+                }),
+                count: count as u32,
+            };
+            let write_flags = WriteFlags::default().buffer_hint(false);
+            Poll::Ready(Some((req, write_flags)))
+        } else {
+            // Set the waker to the context, then the stream can be waked up after the pending queue
+            // is no longer full.
+            self.self_waker.register(cx.waker());
+            Poll::Pending
+        }
+    }
+}
+
+fn allocate_timestamps(
+    resp: &TsoResponse,
+    pending_requests: &mut VecDeque<TimestampRequest>,
+) -> Result<()> {
+    // PD returns the timestamp with the biggest logical value. We can send back timestamps
+    // whose logical value is from logical - count + 1 to logical using the senders
+    // in pending.
+    let tail_ts = resp
+        .timestamp
+        .as_ref()
+        .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;
+
+    let mut offset = i64::from(resp.count);
+    while offset > 0 {
+        offset -= 1;
+        if let Some(sender) = pending_requests.pop_front() {
+            let ts = Timestamp {
+                physical: tail_ts.physical,
+                logical: tail_ts.logical - offset,
+            };
+
+            // It doesn't matter if the receiver of the channel is dropped.
+            let _ = sender.send(ts);
+        } else {
+            return Err(Error::internal_error(
+                "PD gives more timestamps than expected",
+            ));
+        }
+    }
+    Ok(())
+}
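To make the new design above easier to follow: get_timestamp hands a oneshot sender over a bounded mpsc channel, a background thread drains as many queued senders as it can, sends one TsoRequest whose count covers the batch, and allocate_timestamps answers the whole batch from the single returned timestamp by handing out logical values logical - count + 1 up to logical. The following self-contained sketch uses the stable futures 0.3 channel types and made-up Oracle/Timestamp stand-ins rather than the crate's API; it only illustrates the batching shape, with a counter in place of a real PD response.

// Illustrative sketch only (not the crate's API): callers enqueue oneshot senders,
// a worker thread drains a batch, performs one fake "TSO RPC", and answers every
// request in the batch from a single (physical, logical) tail.
use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::{SinkExt, StreamExt};
use std::thread;

#[derive(Debug)]
struct Timestamp {
    physical: i64,
    logical: i64,
}

type Request = oneshot::Sender<Timestamp>;

fn spawn_oracle() -> mpsc::Sender<Request> {
    // Bounded channel, like the real oracle, so callers exert backpressure.
    let (tx, mut rx) = mpsc::channel::<Request>(64);
    thread::spawn(move || {
        block_on(async move {
            let mut logical = 0i64;
            // Each iteration forms one batch: wait for the first request, then drain
            // whatever else is already queued.
            while let Some(first) = rx.next().await {
                let mut batch = vec![first];
                while let Ok(Some(req)) = rx.try_next() {
                    batch.push(req);
                }
                // Pretend PD answered with one tail value covering the whole batch;
                // hand out logical values tail - n + 1 ..= tail, one per request.
                let n = batch.len() as i64;
                logical += n;
                let (physical, tail) = (42i64, logical);
                for (i, sender) in batch.into_iter().enumerate() {
                    let _ = sender.send(Timestamp {
                        physical,
                        logical: tail - n + 1 + i as i64,
                    });
                }
            }
        });
    });
    tx
}

fn main() {
    let mut oracle = spawn_oracle();
    block_on(async {
        let (req, resp) = oneshot::channel();
        oracle.send(req).await.expect("oracle thread stopped");
        println!("got {:?}", resp.await.unwrap());
    });
}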
@@ -78,7 +78,7 @@ impl Client {
     /// # futures::executor::block_on(async {
     /// let connect = Client::connect(Config::default());
     /// let client = connect.await.unwrap();
-    /// let timestamp = Timestamp { physical: 1564474902, logical: 1 };
+    /// let timestamp = Timestamp { physical: 1564481750172, logical: 1 };
     /// let snapshot = client.snapshot_at(timestamp);
     /// // ... Issue some commands.
     /// # });
@@ -127,7 +127,10 @@ impl Future for Connect {
     type Output = Result<Client>;

     fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
-        let _config = &self.config;
-        unimplemented!()
+        let config = &self.config;
+        // TODO: RpcClient::connect currently uses a blocking implementation.
+        // Make it asynchronous later.
+        let rpc = Arc::new(RpcClient::connect(config)?);
+        Poll::Ready(Ok(Client { rpc }))
     }
 }
@@ -0,0 +1,41 @@
+#![cfg(feature = "integration-tests")]
+#![feature(async_await)]
+
+use failure::Fallible;
+use futures::executor::ThreadPool;
+use futures::prelude::*;
+use std::env;
+use tikv_client::{transaction::*, Config, Result};
+
+#[test]
+fn get_timestamp() -> Fallible<()> {
+    const COUNT: usize = 1 << 16;
+    let mut pool = ThreadPool::new()?;
+    let config = Config::new(pd_addrs());
+    let fut = async {
+        let client = Client::connect(config).await?;
+        Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await)
+    };
+    // Calculate each version of retrieved timestamp
+    let mut versions = pool
+        .run(fut)?
+        .into_iter()
+        .map(|res| res.map(|ts| (ts.physical << 18) + ts.logical))
+        .collect::<Result<Vec<_>>>()?;
+
+    // Each version should be unique
+    versions.sort_unstable();
+    versions.dedup();
+    assert_eq!(versions.len(), COUNT);
+    Ok(())
+}
+
+const ENV_PD_ADDRS: &str = "PD_ADDRS";
+
+fn pd_addrs() -> Vec<String> {
+    env::var(ENV_PD_ADDRS)
+        .expect(&format!("Expected {}:", ENV_PD_ADDRS))
+        .split(",")
+        .map(From::from)
+        .collect()
+}
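The new test above folds each timestamp into a single version with (ts.physical << 18) + ts.logical, i.e. the logical counter is assumed to occupy the low 18 bits of the version. A tiny hypothetical helper pair (not part of this commit) showing that composition and its inverse:

// Sketch of the version encoding used in the test: physical milliseconds in the
// high bits, the logical counter in the low 18 bits. The 18-bit split is taken
// from the test's `(ts.physical << 18) + ts.logical`.
const LOGICAL_BITS: u32 = 18;

fn compose(physical: i64, logical: i64) -> i64 {
    (physical << LOGICAL_BITS) + logical
}

fn decompose(version: i64) -> (i64, i64) {
    (version >> LOGICAL_BITS, version & ((1i64 << LOGICAL_BITS) - 1))
}

fn main() {
    let version = compose(1_564_481_750_172, 1);
    assert_eq!(decompose(version), (1_564_481_750_172, 1));
    println!("version = {}", version);
}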