From 8f6479b9ad1d6037fda1f3321e560ae4d927d0b0 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Thu, 1 Aug 2019 10:32:12 +0800 Subject: [PATCH] Address CI warnings and some comments Signed-off-by: Yilin Chen --- src/rpc/pd/client.rs | 10 +++++----- src/rpc/pd/timestamp.rs | 22 +++++++++++----------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index 83dec1f..fc6f083 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -16,8 +16,8 @@ use kvproto::{metapb, pdpb}; use crate::{ rpc::{ pd::{ - context::request_context, request::retry_request, timestamp::Tso, Region, RegionId, - StoreId, Timestamp, + context::request_context, request::retry_request, timestamp::TimestampOracle, Region, + RegionId, StoreId, Timestamp, }, security::SecurityManager, }, @@ -143,7 +143,7 @@ pub struct Cluster { pub id: u64, pub(super) client: pdpb::PdClient, members: pdpb::GetMembersResponse, - tso: Tso, + tso: TimestampOracle, } // These methods make a single attempt to make a request. @@ -282,7 +282,7 @@ impl Connection { let (client, members) = self.try_connect_leader(&members, timeout)?; let id = members.get_header().get_cluster_id(); - let tso = Tso::new(id, &client)?; + let tso = TimestampOracle::new(id, &client)?; let cluster = Cluster { id, members, @@ -307,7 +307,7 @@ impl Connection { warn!("updating pd client, blocking the tokio core"); let start = Instant::now(); let (client, members) = self.try_connect_leader(&old_cluster.members, timeout)?; - let tso = Tso::new(old_cluster.id, &client)?; + let tso = TimestampOracle::new(old_cluster.id, &client)?; let cluster = Cluster { id: old_cluster.id, diff --git a/src/rpc/pd/timestamp.rs b/src/rpc/pd/timestamp.rs index c917912..e31a992 100644 --- a/src/rpc/pd/timestamp.rs +++ b/src/rpc/pd/timestamp.rs @@ -22,22 +22,22 @@ const MAX_PENDING_COUNT: usize = 64; /// The timestamp oracle which provides monotonically increasing timestamps. #[derive(Clone)] -pub struct Tso { - /// The transmitter of a bounded channel which transports the sender of an oneshot channel to +pub struct TimestampOracle { + /// The transmitter of a bounded channel which transports the sender of a oneshot channel to /// the TSO working thread. /// In the working thread, the `oneshot::Sender` is used to send back timestamp results. result_sender_tx: mpsc::Sender>, } -impl Tso { - pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { +impl TimestampOracle { + pub fn new(cluster_id: u64, pd_client: &PdClient) -> Result { let (result_sender_tx, result_sender_rx) = mpsc::channel(MAX_PENDING_COUNT); // Start a background thread to handle TSO requests and responses let worker = TsoWorker::new(cluster_id, pd_client, result_sender_rx)?; thread::spawn(move || block_on(worker.run())); - Ok(Tso { result_sender_tx }) + Ok(TimestampOracle { result_sender_tx }) } pub async fn get_timestamp(mut self) -> Result { @@ -75,7 +75,6 @@ impl TsoWorker { async fn run(mut self) { let ctx = Rc::new(RefCell::new(TsoContext { - cluster_id: self.cluster_id, pending_queue: VecDeque::with_capacity(MAX_PENDING_COUNT), waker: None, })); @@ -83,6 +82,7 @@ impl TsoWorker { let result_sender_rx = self.result_sender_rx; pin_mut!(result_sender_rx); let mut request_stream = TsoRequestStream { + cluster_id: self.cluster_id, result_sender_rx, ctx: ctx.clone(), }; @@ -106,6 +106,7 @@ impl TsoWorker { } struct TsoRequestStream<'a> { + cluster_id: u64, result_sender_rx: Pin<&'a mut mpsc::Receiver>>, ctx: Rc>, } @@ -138,7 +139,7 @@ impl<'a> Stream for TsoRequestStream<'a> { if count > 0 { let req = TsoRequest { header: Some(RequestHeader { - cluster_id: ctx.cluster_id, + cluster_id: self.cluster_id, }), count, }; @@ -151,7 +152,6 @@ impl<'a> Stream for TsoRequestStream<'a> { } struct TsoContext { - cluster_id: u64, pending_queue: VecDeque>, waker: Option, } @@ -165,7 +165,7 @@ impl TsoContext { .timestamp .as_ref() .ok_or_else(|| Error::internal_error("No timestamp in TsoResponse"))?; - let mut offset = resp.count as i64; + let mut offset = i64::from(resp.count); while offset > 0 { offset -= 1; if let Some(sender) = self.pending_queue.pop_front() { @@ -177,9 +177,9 @@ impl TsoContext { // It doesn't matter if the receiver of the channel is dropped. let _ = sender.send(ts); } else { - Err(Error::internal_error( + return Err(Error::internal_error( "PD gives more timestamps than expected", - ))? + )); } } Ok(())