diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 2f77422..681fca7 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,11 +10,10 @@ use crate::{ transaction::HasLocks, BoundRange, ColumnFamily, Key, KvPair, Result, Value, }; -use tikv_client_store::{KvClient, RpcFnType, Store}; - use futures::{future::BoxFuture, prelude::*, stream::BoxStream}; use kvproto::{kvrpcpb, tikvpb::TikvClient}; use std::{mem, sync::Arc}; +use tikv_client_store::{KvClient, RpcFnType, Store}; impl KvRequest for kvrpcpb::RawGetRequest { type Result = Option; @@ -331,10 +330,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest { fn reduce( results: BoxStream<'static, Result>, ) -> BoxFuture<'static, Result> { - results - .into_future() - .map(|(f, _)| f.expect("no results should be impossible")) - .boxed() + results.try_for_each(|_| future::ready(Ok(()))).boxed() } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index c826ccc..08d0663 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -346,10 +346,7 @@ impl KvRequest for kvrpcpb::PrewriteRequest { fn reduce( results: BoxStream<'static, Result>, ) -> BoxFuture<'static, Result> { - results - .into_future() - .map(|(f, _)| f.expect("no results should be impossible")) - .boxed() + results.try_for_each(|_| future::ready(Ok(()))).boxed() } } @@ -408,10 +405,7 @@ impl KvRequest for kvrpcpb::CommitRequest { fn reduce( results: BoxStream<'static, Result>, ) -> BoxFuture<'static, Result> { - results - .into_future() - .map(|(f, _)| f.expect("no results should be impossible")) - .boxed() + results.try_for_each(|_| future::ready(Ok(()))).boxed() } } @@ -456,10 +450,7 @@ impl KvRequest for kvrpcpb::BatchRollbackRequest { fn reduce( results: BoxStream<'static, Result>, ) -> BoxFuture<'static, Result> { - results - .into_future() - .map(|(f, _)| f.expect("no results should be impossible")) - .boxed() + results.try_for_each(|_| future::ready(Ok(()))).boxed() } }