Address CI warnings and some comments

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
Yilin Chen 2019-08-01 10:32:12 +08:00
parent 3522d33696
commit 8f6479b9ad
No known key found for this signature in database
GPG Key ID: 353E7ED34BF326E0
2 changed files with 16 additions and 16 deletions

View File

@ -16,8 +16,8 @@ use kvproto::{metapb, pdpb};
use crate::{ use crate::{
rpc::{ rpc::{
pd::{ pd::{
context::request_context, request::retry_request, timestamp::Tso, Region, RegionId, context::request_context, request::retry_request, timestamp::TimestampOracle, Region,
StoreId, Timestamp, RegionId, StoreId, Timestamp,
}, },
security::SecurityManager, security::SecurityManager,
}, },
@ -143,7 +143,7 @@ pub struct Cluster {
pub id: u64, pub id: u64,
pub(super) client: pdpb::PdClient, pub(super) client: pdpb::PdClient,
members: pdpb::GetMembersResponse, members: pdpb::GetMembersResponse,
tso: Tso, tso: TimestampOracle,
} }
// These methods make a single attempt to make a request. // These methods make a single attempt to make a request.
@ -282,7 +282,7 @@ impl Connection {
let (client, members) = self.try_connect_leader(&members, timeout)?; let (client, members) = self.try_connect_leader(&members, timeout)?;
let id = members.get_header().get_cluster_id(); let id = members.get_header().get_cluster_id();
let tso = Tso::new(id, &client)?; let tso = TimestampOracle::new(id, &client)?;
let cluster = Cluster { let cluster = Cluster {
id, id,
members, members,
@ -307,7 +307,7 @@ impl Connection {
warn!("updating pd client, blocking the tokio core"); warn!("updating pd client, blocking the tokio core");
let start = Instant::now(); let start = Instant::now();
let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?; let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?;
let tso = Tso::new(old_cluster.id, &client)?; let tso = TimestampOracle::new(old_cluster.id, &client)?;
let cluster = Cluster { let cluster = Cluster {
id: old_cluster.id, id: old_cluster.id,

View File

@ -22,22 +22,22 @@ const MAX_PENDING_COUNT: usize = 64;
/// The timestamp oracle which provides monotonically increasing timestamps. /// The timestamp oracle which provides monotonically increasing timestamps.
#[derive(Clone)] #[derive(Clone)]
pub struct Tso { pub struct TimestampOracle {
/// The transmitter of a bounded channel which transports the sender of an oneshot channel to /// The transmitter of a bounded channel which transports the sender of a oneshot channel to
/// the TSO working thread. /// the TSO working thread.
/// In the working thread, the `oneshot::Sender` is used to send back timestamp results. /// In the working thread, the `oneshot::Sender` is used to send back timestamp results.
result_sender_tx: mpsc::Sender<oneshot::Sender<Timestamp>>, result_sender_tx: mpsc::Sender<oneshot::Sender<Timestamp>>,
} }
impl Tso { impl TimestampOracle {
pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<Tso> { pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result<TimestampOracle> {
let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT); let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT);
// Start a background thread to handle TSO requests and responses // Start a background thread to handle TSO requests and responses
let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?; let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?;
thread::spawn(move || block_on(worker.run())); thread::spawn(move || block_on(worker.run()));
Ok(Tso { result_sender_tx }) Ok(TimestampOracle { result_sender_tx })
} }
pub async fn get_timestamp(mut self) -> Result<Timestamp> { pub async fn get_timestamp(mut self) -> Result<Timestamp> {
@ -75,7 +75,6 @@ impl TsoWorker {
async fn run(mut self) { async fn run(mut self) {
let ctx = Rc::new(RefCell::new(TsoContext { let ctx = Rc::new(RefCell::new(TsoContext {
cluster_id: self.cluster_id,
pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT), pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT),
waker: None, waker: None,
})); }));
@ -83,6 +82,7 @@ impl TsoWorker {
let result_sender_rx = self.result_sender_rx; let result_sender_rx = self.result_sender_rx;
pin_mut!(result_sender_rx); pin_mut!(result_sender_rx);
let mut request_stream = TsoRequestStream { let mut request_stream = TsoRequestStream {
cluster_id: self.cluster_id,
result_sender_rx, result_sender_rx,
ctx: ctx.clone(), ctx: ctx.clone(),
}; };
@ -106,6 +106,7 @@ impl TsoWorker {
} }
struct TsoRequestStream<'a> { struct TsoRequestStream<'a> {
cluster_id: u64,
result_sender_rx: Pin<&'a mut mpsc::Receiver<oneshot::Sender<Timestamp>>>, result_sender_rx: Pin<&'a mut mpsc::Receiver<oneshot::Sender<Timestamp>>>,
ctx: Rc<RefCell<TsoContext>>, ctx: Rc<RefCell<TsoContext>>,
} }
@ -138,7 +139,7 @@ impl<'a> Stream for TsoRequestStream<'a> {
if count > 0 { if count > 0 {
let req = TsoRequest { let req = TsoRequest {
header: Some(RequestHeader { header: Some(RequestHeader {
cluster_id: ctx.cluster_id, cluster_id: self.cluster_id,
}), }),
count, count,
}; };
@ -151,7 +152,6 @@ impl<'a> Stream for TsoRequestStream<'a> {
} }
struct TsoContext { struct TsoContext {
cluster_id: u64,
pending_queue: VecDeque<oneshot::Sender<Timestamp>>, pending_queue: VecDeque<oneshot::Sender<Timestamp>>,
waker: Option<Waker>, waker: Option<Waker>,
} }
@ -165,7 +165,7 @@ impl TsoContext {
.timestamp .timestamp
.as_ref() .as_ref()
.ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?;
let mut offset = resp.count as i64; let mut offset = i64::from(resp.count);
while offset > 0 { while offset > 0 {
offset -= 1; offset -= 1;
if let Some(sender) = self.pending_queue.pop_front() { if let Some(sender) = self.pending_queue.pop_front() {
@ -177,9 +177,9 @@ impl TsoContext {
// It doesn't matter if the receiver of the channel is dropped. // It doesn't matter if the receiver of the channel is dropped.
let _ = sender.send(ts); let _ = sender.send(ts);
} else { } else {
Err(Error::internal_error( return Err(Error::internal_error(
"PD gives more timestamps than expected", "PD gives more timestamps than expected",
))? ));
} }
} }
Ok(()) Ok(())