address PR feedback

Signed-off-by: limbooverlambda <schakra1@gmail.com>
This commit is contained in:
limbooverlambda 2024-07-08 13:12:50 -07:00
parent dcb6b894e6
commit 181cde120a
1 changed files with 2 additions and 68 deletions

View File

@ -774,7 +774,7 @@ impl<PdC: PdClient> Client<PdC> {
let scan_args = ScanInnerArgs { let scan_args = ScanInnerArgs {
start_key: current_key.clone(), start_key: current_key.clone(),
range: range.clone(), range: range.clone(),
limit, limit: current_limit,
key_only, key_only,
reverse, reverse,
backoff: backoff.clone(), backoff: backoff.clone(),
@ -789,12 +789,7 @@ impl<PdC: PdClient> Client<PdC> {
current_limit -= kvs.len() as u32; current_limit -= kvs.len() as u32;
result.append(&mut kvs); result.append(&mut kvs);
} }
if end_key if end_key.clone().is_some_and(|ek| ek <= next_key) {
.as_ref()
.map(|ek| ek <= next_key.as_ref())
.unwrap_or(false)
|| next_key.is_empty()
{
break; break;
} else { } else {
current_key = next_key; current_key = next_key;
@ -928,13 +923,11 @@ struct ScanInnerArgs {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::any::Any; use std::any::Any;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use super::*; use super::*;
use crate::mock::MockKvClient; use crate::mock::MockKvClient;
use crate::mock::MockPdClient; use crate::mock::MockPdClient;
use crate::proto::errorpb::EpochNotMatch;
use crate::proto::kvrpcpb; use crate::proto::kvrpcpb;
use crate::Result; use crate::Result;
@ -1016,63 +1009,4 @@ mod tests {
); );
Ok(()) Ok(())
} }
#[tokio::test]
async fn test_raw_scan_retryer() -> Result<()> {
let epoch_not_match_error = EpochNotMatch {
current_regions: vec![],
};
let region_error = errorpb::Error {
epoch_not_match: Some(epoch_not_match_error),
..Default::default()
};
let flag = Arc::new(AtomicBool::new(true));
let tikv_flag = flag.clone();
let error_handler_flag = flag.clone();
let mock_tikv_client = MockKvClient::with_dispatch_hook(move |req: &dyn Any| {
if req.downcast_ref::<RawScanRequest>().is_some() {
let v = tikv_flag.clone().load(Ordering::Relaxed);
let resp = if v {
RawScanResponse {
region_error: Some(region_error.clone()),
..Default::default()
}
} else {
RawScanResponse {
..Default::default()
}
};
Ok(Box::new(resp) as Box<dyn Any>)
} else {
unreachable!()
}
});
let mock_pd_client = MockPdClient::new(mock_tikv_client);
let client = Client {
rpc: Arc::new(mock_pd_client),
cf: Some(ColumnFamily::Default),
backoff: DEFAULT_REGION_BACKOFF,
atomic: false,
keyspace: Keyspace::Enable { keyspace_id: 0 },
};
let scan_args = ScanInnerArgs {
start_key: "k1".to_string().into(),
range: BoundRange::from("k1".to_owned().."k2".to_owned()),
limit: 10,
key_only: false,
reverse: false,
backoff: Backoff::no_backoff(),
};
let error_handler = |_, _, _| {
error_handler_flag.clone().store(false, Ordering::Relaxed);
Box::pin(async move {
let res: Result<bool> = Ok(true);
res
})
} as _;
client.retryable_scan(scan_args, error_handler).await?;
assert!(!error_handler_flag.load(Ordering::Relaxed));
Ok(())
}
} }