also split batch delete

Signed-off-by: lance6716 <lance6716@gmail.com>
This commit is contained in:
lance6716 2025-08-25 20:59:28 +08:00
parent 669774e9ff
commit c546874f64
2 changed files with 66 additions and 7 deletions

View File

@ -290,7 +290,56 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
type Response = kvrpcpb::RawBatchDeleteResponse;
}
shardable_keys!(kvrpcpb::RawBatchDeleteRequest);
impl Batchable for kvrpcpb::RawBatchDeleteRequest {
type Item = Vec<u8>;
fn item_size(item: &Self::Item) -> u64 {
item.len() as u64
}
}
impl Shardable for kvrpcpb::RawBatchDeleteRequest {
type Shard = Vec<Vec<u8>>;
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let mut keys = self.keys.clone();
keys.sort();
region_stream_for_keys(keys.into_iter(), pd_client.clone())
.flat_map(|result| match result {
Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchDeleteRequest::batches(
keys,
RAW_KV_REQUEST_BATCH_SIZE,
))
.map(move |batch| Ok((batch, region.clone())))
.boxed(),
Err(e) => stream::iter(Err(e)).boxed(),
})
.boxed()
}
fn apply_shard(&mut self, shard: Self::Shard) {
self.keys = shard;
}
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
where
Self: Sized + Clone,
{
let mut cloned = Self::default();
cloned.context = self.context.clone();
cloned.cf = self.cf.clone();
cloned.for_cas = self.for_cas;
cloned.apply_shard(shard);
cloned
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}
pub fn new_raw_delete_range_request(
start_key: Vec<u8>,

View File

@ -889,11 +889,8 @@ async fn raw_large_batch_put() -> Result<()> {
// Generate value: repeat pattern to reach VALUE_SIZE
let pattern = format!("value_{}", i % 1000);
let mut value = String::new();
while value.len() < VALUE_SIZE {
value.push_str(&pattern);
}
value.truncate(VALUE_SIZE);
let repeat_count = (VALUE_SIZE + pattern.len() - 1) / pattern.len();
let value = pattern.repeat(repeat_count);
pairs.push(KvPair::from((key, value)));
}
@ -902,7 +899,20 @@ async fn raw_large_batch_put() -> Result<()> {
let client =
RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?;
client.batch_put(pairs).await?;
client.batch_put(pairs.clone()).await?;
let keys = pairs.iter().map(|pair| pair.0.clone()).collect::<Vec<_>>();
// split into multiple batch_get to avoid response too large error
const BATCH_SIZE: usize = 1000;
let mut got = Vec::with_capacity(num_pairs);
for chunk in keys.chunks(BATCH_SIZE) {
let mut partial = client.batch_get(chunk.to_vec()).await?;
got.append(&mut partial);
}
assert_eq!(got, pairs);
client.batch_delete(keys).await?;
Ok(())
}