From 6b34ce7edc3ac15406bfe726805a1cb096e37a06 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 25 Aug 2025 16:28:05 +0800 Subject: [PATCH] split batches for RawKV requests Signed-off-by: lance6716 --- examples/bench_batch_put.rs | 91 +++++++++++++++++++++++++++++++++++++ src/raw/requests.rs | 34 +++++++++++++- src/request/mod.rs | 3 +- src/request/plan.rs | 5 +- src/request/shard.rs | 17 +++++++ 5 files changed, 145 insertions(+), 5 deletions(-) create mode 100644 examples/bench_batch_put.rs diff --git a/examples/bench_batch_put.rs b/examples/bench_batch_put.rs new file mode 100644 index 0000000..223afa6 --- /dev/null +++ b/examples/bench_batch_put.rs @@ -0,0 +1,91 @@ +// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. + +#![type_length_limit = "8165158"] + +mod common; + +use std::time::Instant; +use tikv_client::Config; +use tikv_client::KvPair; +use tikv_client::RawClient as Client; +use tikv_client::Result; + +use crate::common::parse_args; + +const TARGET_SIZE_MB: usize = 40; +const KEY_SIZE: usize = 32; +const VALUE_SIZE: usize = 1024; // 1KB per value + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + + // Parse command line arguments + let args = parse_args("raw"); + + // Create a configuration to use for the example + let config = if let (Some(ca), Some(cert), Some(key)) = (args.ca, args.cert, args.key) { + Config::default().with_security(ca, cert, key) + } else { + Config::default() + }; + + // Create the client + let client = Client::new_with_config(args.pd, config).await?; + + // Calculate how many key-value pairs we need to reach 100MB + let pair_size = KEY_SIZE + VALUE_SIZE; + let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024; + let num_pairs = target_size_bytes / pair_size; + + println!("Preparing to create {} key-value pairs", num_pairs); + println!("Key size: {} bytes, Value size: {} bytes", KEY_SIZE, VALUE_SIZE); + println!("Total data size: ~{} MB", (num_pairs * pair_size) / (1024 * 1024)); + + // Generate key-value pairs + println!("Generating key-value pairs..."); + let generation_start = Instant::now(); + + let mut pairs = Vec::with_capacity(num_pairs); + for i in 0..num_pairs { + // Generate key: "bench_key_" + zero-padded number + let key = format!("bench_key_{:010}", i); + + // Generate value: repeat pattern to reach VALUE_SIZE + let pattern = format!("value_{}", i % 1000); + let mut value = String::new(); + while value.len() < VALUE_SIZE { + value.push_str(&pattern); + } + value.truncate(VALUE_SIZE); + + pairs.push(KvPair::from((key, value))); + } + + let generation_duration = generation_start.elapsed(); + println!("Generated {} pairs in {:?}", pairs.len(), generation_duration); + + // Perform batch_put and measure timing + println!("Starting batch_put operation..."); + let batch_put_start = Instant::now(); + + client.batch_put(pairs).await.expect("Failed to perform batch_put"); + + let batch_put_duration = batch_put_start.elapsed(); + + // Calculate statistics + let total_bytes = num_pairs * pair_size; + let throughput_mb_per_sec = (total_bytes as f64 / (1024.0 * 1024.0)) / batch_put_duration.as_secs_f64(); + let ops_per_sec = num_pairs as f64 / batch_put_duration.as_secs_f64(); + + // Print results + println!("\n=== Batch Put Benchmark Results ==="); + println!("Total key-value pairs: {}", num_pairs); + println!("Total data size: {:.2} MB", total_bytes as f64 / (1024.0 * 1024.0)); + println!("Batch put duration: {:?}", batch_put_duration); + println!("Throughput: {:.2} MB/s", throughput_mb_per_sec); + println!("Operations per second: {:.2} ops/s", ops_per_sec); + println!("Average latency per operation: {:.2} μs", batch_put_duration.as_micros() as f64 / num_pairs as f64); + + Ok(()) +} diff --git a/src/raw/requests.rs b/src/raw/requests.rs index f6f32f3..4de8df7 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,7 @@ use crate::proto::tikvpb::tikv_client::TikvClient; use crate::range_request; use crate::region::RegionWithLeader; use crate::request::plan::ResponseWithShard; -use crate::request::Collect; +use crate::request::{Batchable, Collect}; use crate::request::CollectSingle; use crate::request::DefaultProcessor; use crate::request::KvRequest; @@ -39,8 +39,11 @@ use std::any::Any; use std::ops::Range; use std::sync::Arc; use std::time::Duration; +use futures::{stream, StreamExt}; use tonic::transport::Channel; +const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB + pub fn new_raw_get_request(key: Vec, cf: Option) -> kvrpcpb::RawGetRequest { let mut req = kvrpcpb::RawGetRequest::default(); req.key = key; @@ -188,6 +191,14 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest { type Response = kvrpcpb::RawBatchPutResponse; } +impl Batchable for kvrpcpb::RawBatchPutRequest { + type Item = (kvrpcpb::KvPair, u64); + + fn item_size(item: &Self::Item) -> u64 { + (item.0.key.len() + item.0.value.len()) as u64 + } +} + impl Shardable for kvrpcpb::RawBatchPutRequest { type Shard = Vec<(kvrpcpb::KvPair, u64)>; @@ -204,6 +215,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { .collect(); kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key)); region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone()) + .flat_map(|result| match result { + Ok((keys, region)) => { + stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE)) + .map(move |batch| Ok((batch, region.clone()))) + .boxed() + } + Err(e) => stream::iter(Err(e)).boxed(), + }) + .boxed() } fn apply_shard(&mut self, shard: Self::Shard) { @@ -212,6 +232,18 @@ impl Shardable for kvrpcpb::RawBatchPutRequest { self.ttls = ttls; } + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + let mut cloned = Self::default(); + cloned.context = self.context.clone(); + cloned.cf = self.cf.clone(); + cloned.for_cas = self.for_cas; + cloned.apply_shard(shard); + cloned + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.set_leader(&store.region_with_leader) } diff --git a/src/request/mod.rs b/src/request/mod.rs index a9913e7..558fdd4 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -130,7 +130,8 @@ mod test { impl HasLocks for MockRpcResponse {} #[derive(Clone)] - struct MockKvRequest { + #[derive(Default)] +struct MockKvRequest { test_invoking_count: Arc, } diff --git a/src/request/plan.rs b/src/request/plan.rs index be915f7..c4a1a36 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -117,11 +117,10 @@ where ) -> Result<::Result> { let shards = current_plan.shards(&pd_client).collect::>().await; debug!("single_plan_handler, shards: {}", shards.len()); - let mut handles = Vec::new(); + let mut handles = Vec::with_capacity(shards.len()); for shard in shards { let (shard, region) = shard?; - let mut clone = current_plan.clone(); - clone.apply_shard(shard); + let clone = current_plan.clone_then_apply_shard(shard); let handle = tokio::spawn(Self::single_shard_handler( pd_client.clone(), clone, diff --git a/src/request/shard.rs b/src/request/shard.rs index a68f446..2c03c10 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -48,6 +48,16 @@ pub trait Shardable { fn apply_shard(&mut self, shard: Self::Shard); + /// Implementation can skip unnecessary fields clone if fields will be overwritten by `apply_shard`. + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + let mut cloned = self.clone(); + cloned.apply_shard(shard); + cloned + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()>; } @@ -103,6 +113,13 @@ impl Shardable for Dispatch { self.request.apply_shard(shard); } + fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self + where + Self: Sized + Clone, + { + Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() } + } + fn apply_store(&mut self, store: &RegionStore) -> Result<()> { self.kv_client = Some(store.client.clone()); self.request.apply_store(store)