split batches for RawKV requests

Signed-off-by: lance6716 <lance6716@gmail.com>
This commit is contained in:
lance6716 2025-08-25 16:28:05 +08:00
parent 7c78aadf44
commit 6b34ce7edc
5 changed files with 145 additions and 5 deletions

View File

@ -0,0 +1,91 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.
#![type_length_limit = "8165158"]
mod common;
use std::time::Instant;
use tikv_client::Config;
use tikv_client::KvPair;
use tikv_client::RawClient as Client;
use tikv_client::Result;
use crate::common::parse_args;
const TARGET_SIZE_MB: usize = 40;
const KEY_SIZE: usize = 32;
const VALUE_SIZE: usize = 1024; // 1KB per value
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
// Parse command line arguments
let args = parse_args("raw");
// Create a configuration to use for the example
let config = if let (Some(ca), Some(cert), Some(key)) = (args.ca, args.cert, args.key) {
Config::default().with_security(ca, cert, key)
} else {
Config::default()
};
// Create the client
let client = Client::new_with_config(args.pd, config).await?;
// Calculate how many key-value pairs we need to reach 100MB
let pair_size = KEY_SIZE + VALUE_SIZE;
let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024;
let num_pairs = target_size_bytes / pair_size;
println!("Preparing to create {} key-value pairs", num_pairs);
println!("Key size: {} bytes, Value size: {} bytes", KEY_SIZE, VALUE_SIZE);
println!("Total data size: ~{} MB", (num_pairs * pair_size) / (1024 * 1024));
// Generate key-value pairs
println!("Generating key-value pairs...");
let generation_start = Instant::now();
let mut pairs = Vec::with_capacity(num_pairs);
for i in 0..num_pairs {
// Generate key: "bench_key_" + zero-padded number
let key = format!("bench_key_{:010}", i);
// Generate value: repeat pattern to reach VALUE_SIZE
let pattern = format!("value_{}", i % 1000);
let mut value = String::new();
while value.len() < VALUE_SIZE {
value.push_str(&pattern);
}
value.truncate(VALUE_SIZE);
pairs.push(KvPair::from((key, value)));
}
let generation_duration = generation_start.elapsed();
println!("Generated {} pairs in {:?}", pairs.len(), generation_duration);
// Perform batch_put and measure timing
println!("Starting batch_put operation...");
let batch_put_start = Instant::now();
client.batch_put(pairs).await.expect("Failed to perform batch_put");
let batch_put_duration = batch_put_start.elapsed();
// Calculate statistics
let total_bytes = num_pairs * pair_size;
let throughput_mb_per_sec = (total_bytes as f64 / (1024.0 * 1024.0)) / batch_put_duration.as_secs_f64();
let ops_per_sec = num_pairs as f64 / batch_put_duration.as_secs_f64();
// Print results
println!("\n=== Batch Put Benchmark Results ===");
println!("Total key-value pairs: {}", num_pairs);
println!("Total data size: {:.2} MB", total_bytes as f64 / (1024.0 * 1024.0));
println!("Batch put duration: {:?}", batch_put_duration);
println!("Throughput: {:.2} MB/s", throughput_mb_per_sec);
println!("Operations per second: {:.2} ops/s", ops_per_sec);
println!("Average latency per operation: {:.2} μs", batch_put_duration.as_micros() as f64 / num_pairs as f64);
Ok(())
}

View File

@ -10,7 +10,7 @@ use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::range_request;
use crate::region::RegionWithLeader;
use crate::request::plan::ResponseWithShard;
use crate::request::Collect;
use crate::request::{Batchable, Collect};
use crate::request::CollectSingle;
use crate::request::DefaultProcessor;
use crate::request::KvRequest;
@ -39,8 +39,11 @@ use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use futures::{stream, StreamExt};
use tonic::transport::Channel;
const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
let mut req = kvrpcpb::RawGetRequest::default();
req.key = key;
@ -188,6 +191,14 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
type Response = kvrpcpb::RawBatchPutResponse;
}
impl Batchable for kvrpcpb::RawBatchPutRequest {
type Item = (kvrpcpb::KvPair, u64);
fn item_size(item: &Self::Item) -> u64 {
(item.0.key.len() + item.0.value.len()) as u64
}
}
impl Shardable for kvrpcpb::RawBatchPutRequest {
type Shard = Vec<(kvrpcpb::KvPair, u64)>;
@ -204,6 +215,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
.collect();
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
.flat_map(|result| match result {
Ok((keys, region)) => {
stream::iter(kvrpcpb::RawBatchPutRequest::batches(keys, RAW_KV_REQUEST_BATCH_SIZE))
.map(move |batch| Ok((batch, region.clone())))
.boxed()
}
Err(e) => stream::iter(Err(e)).boxed(),
})
.boxed()
}
fn apply_shard(&mut self, shard: Self::Shard) {
@ -212,6 +232,18 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
self.ttls = ttls;
}
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
where
Self: Sized + Clone,
{
let mut cloned = Self::default();
cloned.context = self.context.clone();
cloned.cf = self.cf.clone();
cloned.for_cas = self.for_cas;
cloned.apply_shard(shard);
cloned
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}

View File

@ -130,6 +130,7 @@ mod test {
impl HasLocks for MockRpcResponse {}
#[derive(Clone)]
#[derive(Default)]
struct MockKvRequest {
test_invoking_count: Arc<AtomicUsize>,
}

View File

@ -117,11 +117,10 @@ where
) -> Result<<Self as Plan>::Result> {
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
debug!("single_plan_handler, shards: {}", shards.len());
let mut handles = Vec::new();
let mut handles = Vec::with_capacity(shards.len());
for shard in shards {
let (shard, region) = shard?;
let mut clone = current_plan.clone();
clone.apply_shard(shard);
let clone = current_plan.clone_then_apply_shard(shard);
let handle = tokio::spawn(Self::single_shard_handler(
pd_client.clone(),
clone,

View File

@ -48,6 +48,16 @@ pub trait Shardable {
fn apply_shard(&mut self, shard: Self::Shard);
/// Implementation can skip unnecessary fields clone if fields will be overwritten by `apply_shard`.
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
where
Self: Sized + Clone,
{
let mut cloned = self.clone();
cloned.apply_shard(shard);
cloned
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
}
@ -103,6 +113,13 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
self.request.apply_shard(shard);
}
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
where
Self: Sized + Clone,
{
Dispatch{ request: self.request.clone_then_apply_shard(shard), kv_client: self.kv_client.clone() }
}
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.kv_client = Some(store.client.clone());
self.request.apply_store(store)