From d4fcc7af1bf193514f934917e197b78384de6e6b Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 17:06:34 +0800 Subject: [PATCH] cargo fmt Signed-off-by: lance6716 --- src/raw/requests.rs | 19 ++++++++++--------- src/request/mod.rs | 5 ++--- src/request/shard.rs | 5 ++++- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 4de8df7..f9da742 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,6 @@ use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; -use crate::request::{Batchable, Collect}; use crate::request::CollectSingle; use crate::request::DefaultProcessor; use crate::request::KvRequest; @@ -19,6 +18,7 @@ use crate::request::Process; use crate::request::RangeRequest; use crate::request::Shardable; use crate::request::SingleKey; +use crate::request::{Batchable, Collect}; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; @@ -35,11 +35,11 @@ use crate::Result; use crate::Value; use async_trait::async_trait; use futures::stream::BoxStream; +use futures::{stream, StreamExt}; use std::any::Any; use std::ops::Range; use std::sync::Arc; use std::time::Duration; -use futures::{stream, StreamExt}; use tonic::transport::Channel; const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB @@ -216,13 +216,14 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) .flat_map(|result| match result { - Ok((keys, region)) => { - stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE)) - .map(move |batch| Ok((batch, region.clone()))) - .boxed() - } - Err(e) => stream::iter(Err(e)).boxed(), - }) + Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches( + keys, + RAW_KV_REQUEST_BATCH_SIZE, + )) + .map(move |batch| Ok((batch, region.clone()))) + .boxed(), + Err(e) => stream::iter(Err(e)).boxed(), + }) .boxed() } diff --git a/src/request/mod.rs b/src/request/mod.rs index 558fdd4..f7c81a7 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -129,9 +129,8 @@ mod test { impl HasLocks for MockRpcResponse {} - #[derive(Clone)] - #[derive(Default)] -struct MockKvRequest { + #[derive(Clone, Default)] + struct MockKvRequest { test_invoking_count: Arc, } diff --git a/src/request/shard.rs b/src/request/shard.rs index 2c03c10..1bac69e 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -117,7 +117,10 @@ impl Shardable for Dispatch { where Self: Sized + Clone, { - Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() } + Dispatch { + request: self.request.clone_then_apply_shard(shard), + kv_client: self.kv_client.clone(), + } } fn apply_store(&mut self, store: &RegionStore) -> Result<()> {