mirror of https://github.com/tikv/client-rust.git
parent
e78d803aba
commit
d4fcc7af1b
|
|
@ -10,7 +10,6 @@ use crate::proto::tikvpb::tikv_client::TikvClient;
|
|||
use crate::range_request;
|
||||
use crate::region::RegionWithLeader;
|
||||
use crate::request::plan::ResponseWithShard;
|
||||
use crate::request::{Batchable, Collect};
|
||||
use crate::request::CollectSingle;
|
||||
use crate::request::DefaultProcessor;
|
||||
use crate::request::KvRequest;
|
||||
|
|
@ -19,6 +18,7 @@ use crate::request::Process;
|
|||
use crate::request::RangeRequest;
|
||||
use crate::request::Shardable;
|
||||
use crate::request::SingleKey;
|
||||
use crate::request::{Batchable, Collect};
|
||||
use crate::shardable_key;
|
||||
use crate::shardable_keys;
|
||||
use crate::shardable_range;
|
||||
|
|
@ -35,11 +35,11 @@ use crate::Result;
|
|||
use crate::Value;
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{stream, StreamExt};
|
||||
use std::any::Any;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use futures::{stream, StreamExt};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB
|
||||
|
|
@ -216,13 +216,14 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
|
|||
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
|
||||
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
|
||||
.flat_map(|result| match result {
|
||||
Ok((keys, region)) => {
|
||||
stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE))
|
||||
.map(move |batch| Ok((batch, region.clone())))
|
||||
.boxed()
|
||||
}
|
||||
Err(e) => stream::iter(Err(e)).boxed(),
|
||||
})
|
||||
Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches(
|
||||
keys,
|
||||
RAW_KV_REQUEST_BATCH_SIZE,
|
||||
))
|
||||
.map(move |batch| Ok((batch, region.clone())))
|
||||
.boxed(),
|
||||
Err(e) => stream::iter(Err(e)).boxed(),
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -129,9 +129,8 @@ mod test {
|
|||
|
||||
impl HasLocks for MockRpcResponse {}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Default)]
|
||||
struct MockKvRequest {
|
||||
#[derive(Clone, Default)]
|
||||
struct MockKvRequest {
|
||||
test_invoking_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -117,7 +117,10 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
|
|||
where
|
||||
Self: Sized + Clone,
|
||||
{
|
||||
Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() }
|
||||
Dispatch {
|
||||
request: self.request.clone_then_apply_shard(shard),
|
||||
kv_client: self.kv_client.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
|
||||
|
|
|
|||
Loading…
Reference in New Issue