diff --git a/examples/transaction.rs b/examples/transaction.rs index 3d258fa..72aaf7a 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -5,11 +5,7 @@ mod common; use crate::common::parse_args; -use futures::{ - future, - prelude::{StreamExt, TryStreamExt}, - stream, TryFutureExt, -}; +use futures::prelude::*; use std::ops::RangeBounds; use tikv_client::{ transaction::{Client, IsolationLevel}, diff --git a/src/compat.rs b/src/compat.rs index 1b4e2ae..e7b3d09 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -3,8 +3,7 @@ //! This module contains utility types and functions for making the transition //! from futures 0.1 to 1.0 easier. -use futures::prelude::{Future, Sink, Stream, StreamExt}; -use futures::stream::Fuse; +use futures::prelude::*; use futures::task::{Context, Poll}; use futures::try_ready; use std::pin::Pin; @@ -85,7 +84,7 @@ where St: Stream + Unpin, { sink: Option, - stream: Option>, + stream: Option>, buffered: Option, } @@ -118,7 +117,7 @@ where ) } - fn stream_mut(&mut self) -> Pin<&mut Fuse> { + fn stream_mut(&mut self) -> Pin<&mut stream::Fuse> { Pin::new( self.stream .as_mut() diff --git a/src/raw.rs b/src/raw.rs index 9806249..e7c06f7 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -10,7 +10,8 @@ //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace. //! use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvPair, Result, Value}; -use futures::{future, task::Context, Future, Poll}; +use futures::prelude::*; +use futures::task::{Context, Poll}; use std::{fmt, ops::Bound, pin::Pin, sync::Arc, u32}; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 73ae8ca..686d433 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -10,8 +10,7 @@ use std::{ time::Duration, }; -use futures::future::{self, ready, Either, Future}; -use futures::prelude::TryFutureExt; +use futures::prelude::*; use grpcio::{EnvBuilder, Environment}; use kvproto::kvrpcpb; use log::*; @@ -153,33 +152,31 @@ impl RpcClient { let inner = self.inner(); loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| { if index == tasks.len() { - Either::Left(future::ok(Loop::Break(result))) + future::Either::Left(future::ok(Loop::Break(result))) } else { let inner = Arc::clone(&inner); - Either::Right( - inner - .locate_key(tasks[index].key()) - .map_ok(move |location| { - while let Some(item) = tasks.get(index) { - if !location.contains(item.key()) { - break; - } - let ver_id = location.ver_id(); - let item = item.clone(); - if let Some(ref mut grouped) = result { - grouped.add(ver_id, item); - } else { - result = Some(GroupedTasks::new(ver_id, item)); - } - index += 1; + future::Either::Right(inner.locate_key(tasks[index].key()).map_ok( + move |location| { + while let Some(item) = tasks.get(index) { + if !location.contains(item.key()) { + break; } - if index == tasks.len() { - Loop::Break(result) + let ver_id = location.ver_id(); + let item = item.clone(); + if let Some(ref mut grouped) = result { + grouped.add(ver_id, item); } else { - Loop::Continue((index, tasks, result)) + result = Some(GroupedTasks::new(ver_id, item)); } - }), - ) + index += 1; + } + if index == tasks.len() { + Loop::Break(result) + } else { + Loop::Continue((index, tasks, result)) + } + }, + )) } }) .map_ok(|r| r.unwrap_or_default()) @@ -200,7 +197,7 @@ impl RpcClient { store, }) }) - .and_then(move |region| ready(inner2.kv_client(region))) + .and_then(move |region| future::ready(inner2.kv_client(region))) } fn region_context_by_id( @@ -217,7 +214,7 @@ impl RpcClient { .load_store(store_id) .map_ok(|store| RegionContext { region, store }) }) - .and_then(move |region| ready(inner2.kv_client(region))) + .and_then(move |region| future::ready(inner2.kv_client(region))) } fn raw( @@ -279,9 +276,9 @@ impl RpcClient { cf: Option, ) -> impl Future> { if value.is_empty() { - Either::Left(future::err(Error::empty_value())) + future::Either::Left(future::err(Error::empty_value())) } else { - Either::Right( + future::Either::Right( Self::raw(self.inner(), &key, cf) .and_then(|context| context.client().raw_put(context, key, value)), ) @@ -294,10 +291,10 @@ impl RpcClient { cf: Option, ) -> impl Future> { if pairs.iter().any(|p| p.value().is_empty()) { - Either::Left(future::err(Error::empty_value())) + future::Either::Left(future::err(Error::empty_value())) } else { let inner = self.inner(); - Either::Right( + future::Either::Right( self.group_tasks_by_region(pairs) .and_then(move |task_groups| { let mut tasks = Vec::with_capacity(task_groups.len()); diff --git a/src/rpc/pd/client.rs b/src/rpc/pd/client.rs index cb4842a..737871c 100644 --- a/src/rpc/pd/client.rs +++ b/src/rpc/pd/client.rs @@ -7,8 +7,7 @@ use std::{ }; use futures::compat::Compat01As03; -use futures::future::{ready, Future}; -use futures::prelude::{FutureExt, TryFutureExt}; +use futures::prelude::*; use grpcio::{CallOption, Environment}; use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient}; @@ -93,14 +92,14 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return ready(Err(Error::region_for_key_not_found(key))); + return future::ready(Err(Error::region_for_key_not_found(key))); }; let leader = if resp.has_leader() { Some(resp.take_leader()) } else { None }; - ready(Ok((region, leader))) + future::ready(Ok((region, leader))) }) } @@ -122,14 +121,14 @@ impl PdClient { let region = if resp.has_region() { resp.take_region() } else { - return ready(Err(Error::region_not_found(region_id, None))); + return future::ready(Err(Error::region_not_found(region_id, None))); }; let leader = if resp.has_leader() { Some(resp.take_leader()) } else { None }; - ready(Ok((region, leader))) + future::ready(Ok((region, leader))) }) } diff --git a/src/rpc/pd/leader.rs b/src/rpc/pd/leader.rs index 4778ad7..eaf2949 100644 --- a/src/rpc/pd/leader.rs +++ b/src/rpc/pd/leader.rs @@ -7,21 +7,16 @@ use std::{ time::{Duration, Instant}, }; -use futures::prelude::{FutureExt, SinkExt, StreamExt, TryFutureExt}; -use futures::{ - channel::{ - mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, - oneshot, - }, - compat::{Compat01As03, Compat01As03Sink}, - future::ready, - stream::TryStreamExt, - Future, +use futures::channel::{ + mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender}, + oneshot, }; +use futures::compat::{Compat01As03, Compat01As03Sink}; +use futures::prelude::*; use grpcio::{CallOption, Environment, WriteFlags}; use kvproto::pdpb; use log::*; -use tokio_core::reactor::{Core, Handle as OtherHandle}; +use tokio_core::reactor::{Core, Handle as TokioHandle}; use crate::{ compat::SinkCompat, @@ -119,15 +114,15 @@ impl PdReactor { let mut core = Core::new().unwrap(); let handle = core.handle(); { - let f = rx.take_while(|t| ready(t.is_some())).for_each(|t| { + let f = rx.take_while(|t| future::ready(t.is_some())).for_each(|t| { Self::dispatch(&client, t.unwrap(), &handle); - ready(()) + future::ready(()) }); core.run(TryFutureExt::compat(f.unit_error())).unwrap(); } } - fn init(client: &Arc>, handle: &OtherHandle) { + fn init(client: &Arc>, handle: &TokioHandle) { let client = Arc::clone(client); let (tx, rx) = client.write().unwrap().client.tso().unwrap(); let tx = Compat01As03Sink::new(tx); @@ -164,7 +159,7 @@ impl PdReactor { // Schedule another tso_batch of request reactor.schedule(PdTask::Request); } - ready(Ok(())) + future::ready(Ok(())) }) .map_err(|e| panic!("unexpected error: {:?}", e)) .compat(), @@ -204,7 +199,7 @@ impl PdReactor { client.write().unwrap().reactor.tso_buffer = Some(requests); } - fn dispatch(client: &Arc>, task: PdTask, handle: &OtherHandle) { + fn dispatch(client: &Arc>, task: PdTask, handle: &TokioHandle) { match task { PdTask::Request => Self::tso_request(client), PdTask::Response(requests, response) => Self::tso_response(client, requests, &response), diff --git a/src/rpc/pd/request.rs b/src/rpc/pd/request.rs index 0fe9575..68a2e43 100644 --- a/src/rpc/pd/request.rs +++ b/src/rpc/pd/request.rs @@ -7,8 +7,7 @@ use std::{ }; use futures::compat::Compat01As03; -use futures::future::{ok, ready, Either, Future, TryFutureExt}; -use futures::prelude::FutureExt; +use futures::prelude::*; use log::*; use tokio_timer::timer::Handle; @@ -61,7 +60,7 @@ where debug!("reconnect remains: {}", self.reconnect_count); if self.request_sent < MAX_REQUEST_COUNT { - return Either::Left(ok(self)); + return future::Either::Left(future::ok(self)); } // Updating client. @@ -71,9 +70,9 @@ where match (self.reconnect)(&self.client, RECONNECT_INTERVAL_SEC) { Ok(_) => { self.request_sent = 0; - Either::Left(ok(self)) + future::Either::Left(future::ok(self)) } - Err(_) => Either::Right( + Err(_) => future::Either::Right( Compat01As03::new( self.timer .delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC)), @@ -87,7 +86,7 @@ where self.request_sent += 1; debug!("request sent: {}", self.request_sent); - ok(self).and_then(|mut ctx| { + future::ok(self).and_then(|mut ctx| { let req = (ctx.func)(&ctx.client); req.map(|resp| match resp { Ok(resp) => { @@ -128,6 +127,6 @@ where .and_then(Self::send_and_receive) .map(Self::break_or_continue) }) - .and_then(|r| ready(r.post_loop())) + .and_then(|r| future::ready(r.post_loop())) } } diff --git a/src/rpc/tikv/client.rs b/src/rpc/tikv/client.rs index 49b1ea7..e3f4fc1 100644 --- a/src/rpc/tikv/client.rs +++ b/src/rpc/tikv/client.rs @@ -6,8 +6,7 @@ use std::{fmt, sync::Arc, time::Duration}; use futures::compat::Compat01As03; -use futures::future::Future; -use futures::prelude::{FutureExt, TryFutureExt}; +use futures::prelude::*; use grpcio::{CallOption, Environment}; use kvproto::{errorpb, kvrpcpb, tikvpb::TikvClient}; diff --git a/src/transaction.rs b/src/transaction.rs index c5ab9e7..1d0c986 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -10,7 +10,9 @@ //! use crate::{Config, Error, Key, KvPair, Value}; -use futures::{task::Context, Future, Poll, Stream}; + +use futures::prelude::*; +use futures::task::{Context, Poll}; use std::ops::RangeBounds; use std::pin::Pin;