Reorganise futures imports

Signed-off-by: Nick Cameron <nrc@ncameron.org>
This commit is contained in:
Nick Cameron 2019-05-07 14:11:22 +12:00
parent 98108bf1fe
commit 18f538c008
9 changed files with 59 additions and 72 deletions

View File

@ -5,11 +5,7 @@
mod common; mod common;
use crate::common::parse_args; use crate::common::parse_args;
use futures::{ use futures::prelude::*;
future,
prelude::{StreamExt, TryStreamExt},
stream, TryFutureExt,
};
use std::ops::RangeBounds; use std::ops::RangeBounds;
use tikv_client::{ use tikv_client::{
transaction::{Client, IsolationLevel}, transaction::{Client, IsolationLevel},

View File

@ -3,8 +3,7 @@
//! This module contains utility types and functions for making the transition //! This module contains utility types and functions for making the transition
//! from futures 0.1 to 1.0 easier. //! from futures 0.1 to 1.0 easier.
use futures::prelude::{Future, Sink, Stream, StreamExt}; use futures::prelude::*;
use futures::stream::Fuse;
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use futures::try_ready; use futures::try_ready;
use std::pin::Pin; use std::pin::Pin;
@ -85,7 +84,7 @@ where
St: Stream + Unpin, St: Stream + Unpin,
{ {
sink: Option<Si>, sink: Option<Si>,
stream: Option<Fuse<St>>, stream: Option<stream::Fuse<St>>,
buffered: Option<St::Item>, buffered: Option<St::Item>,
} }
@ -118,7 +117,7 @@ where
) )
} }
fn stream_mut(&mut self) -> Pin<&mut Fuse<St>> { fn stream_mut(&mut self) -> Pin<&mut stream::Fuse<St>> {
Pin::new( Pin::new(
self.stream self.stream
.as_mut() .as_mut()

View File

@ -10,7 +10,8 @@
//! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace. //! **Warning:** It is not advisable to use both raw and transactional functionality in the same keyspace.
//! //!
use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvPair, Result, Value}; use crate::{rpc::RpcClient, Config, Error, Key, KeyRange, KvPair, Result, Value};
use futures::{future, task::Context, Future, Poll}; use futures::prelude::*;
use futures::task::{Context, Poll};
use std::{fmt, ops::Bound, pin::Pin, sync::Arc, u32}; use std::{fmt, ops::Bound, pin::Pin, sync::Arc, u32};
const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240; const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;

View File

@ -10,8 +10,7 @@ use std::{
time::Duration, time::Duration,
}; };
use futures::future::{self, ready, Either, Future}; use futures::prelude::*;
use futures::prelude::TryFutureExt;
use grpcio::{EnvBuilder, Environment}; use grpcio::{EnvBuilder, Environment};
use kvproto::kvrpcpb; use kvproto::kvrpcpb;
use log::*; use log::*;
@ -153,33 +152,31 @@ impl RpcClient {
let inner = self.inner(); let inner = self.inner();
loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| { loop_fn((0, tasks, result), move |(mut index, tasks, mut result)| {
if index == tasks.len() { if index == tasks.len() {
Either::Left(future::ok(Loop::Break(result))) future::Either::Left(future::ok(Loop::Break(result)))
} else { } else {
let inner = Arc::clone(&inner); let inner = Arc::clone(&inner);
Either::Right( future::Either::Right(inner.locate_key(tasks[index].key()).map_ok(
inner move |location| {
.locate_key(tasks[index].key()) while let Some(item) = tasks.get(index) {
.map_ok(move |location| { if !location.contains(item.key()) {
while let Some(item) = tasks.get(index) { break;
if !location.contains(item.key()) {
break;
}
let ver_id = location.ver_id();
let item = item.clone();
if let Some(ref mut grouped) = result {
grouped.add(ver_id, item);
} else {
result = Some(GroupedTasks::new(ver_id, item));
}
index += 1;
} }
if index == tasks.len() { let ver_id = location.ver_id();
Loop::Break(result) let item = item.clone();
if let Some(ref mut grouped) = result {
grouped.add(ver_id, item);
} else { } else {
Loop::Continue((index, tasks, result)) result = Some(GroupedTasks::new(ver_id, item));
} }
}), index += 1;
) }
if index == tasks.len() {
Loop::Break(result)
} else {
Loop::Continue((index, tasks, result))
}
},
))
} }
}) })
.map_ok(|r| r.unwrap_or_default()) .map_ok(|r| r.unwrap_or_default())
@ -200,7 +197,7 @@ impl RpcClient {
store, store,
}) })
}) })
.and_then(move |region| ready(inner2.kv_client(region))) .and_then(move |region| future::ready(inner2.kv_client(region)))
} }
fn region_context_by_id( fn region_context_by_id(
@ -217,7 +214,7 @@ impl RpcClient {
.load_store(store_id) .load_store(store_id)
.map_ok(|store| RegionContext { region, store }) .map_ok(|store| RegionContext { region, store })
}) })
.and_then(move |region| ready(inner2.kv_client(region))) .and_then(move |region| future::ready(inner2.kv_client(region)))
} }
fn raw( fn raw(
@ -279,9 +276,9 @@ impl RpcClient {
cf: Option<ColumnFamily>, cf: Option<ColumnFamily>,
) -> impl Future<Output = Result<()>> { ) -> impl Future<Output = Result<()>> {
if value.is_empty() { if value.is_empty() {
Either::Left(future::err(Error::empty_value())) future::Either::Left(future::err(Error::empty_value()))
} else { } else {
Either::Right( future::Either::Right(
Self::raw(self.inner(), &key, cf) Self::raw(self.inner(), &key, cf)
.and_then(|context| context.client().raw_put(context, key, value)), .and_then(|context| context.client().raw_put(context, key, value)),
) )
@ -294,10 +291,10 @@ impl RpcClient {
cf: Option<ColumnFamily>, cf: Option<ColumnFamily>,
) -> impl Future<Output = Result<()>> { ) -> impl Future<Output = Result<()>> {
if pairs.iter().any(|p| p.value().is_empty()) { if pairs.iter().any(|p| p.value().is_empty()) {
Either::Left(future::err(Error::empty_value())) future::Either::Left(future::err(Error::empty_value()))
} else { } else {
let inner = self.inner(); let inner = self.inner();
Either::Right( future::Either::Right(
self.group_tasks_by_region(pairs) self.group_tasks_by_region(pairs)
.and_then(move |task_groups| { .and_then(move |task_groups| {
let mut tasks = Vec::with_capacity(task_groups.len()); let mut tasks = Vec::with_capacity(task_groups.len());

View File

@ -7,8 +7,7 @@ use std::{
}; };
use futures::compat::Compat01As03; use futures::compat::Compat01As03;
use futures::future::{ready, Future}; use futures::prelude::*;
use futures::prelude::{FutureExt, TryFutureExt};
use grpcio::{CallOption, Environment}; use grpcio::{CallOption, Environment};
use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient}; use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient};
@ -93,14 +92,14 @@ impl PdClient {
let region = if resp.has_region() { let region = if resp.has_region() {
resp.take_region() resp.take_region()
} else { } else {
return ready(Err(Error::region_for_key_not_found(key))); return future::ready(Err(Error::region_for_key_not_found(key)));
}; };
let leader = if resp.has_leader() { let leader = if resp.has_leader() {
Some(resp.take_leader()) Some(resp.take_leader())
} else { } else {
None None
}; };
ready(Ok((region, leader))) future::ready(Ok((region, leader)))
}) })
} }
@ -122,14 +121,14 @@ impl PdClient {
let region = if resp.has_region() { let region = if resp.has_region() {
resp.take_region() resp.take_region()
} else { } else {
return ready(Err(Error::region_not_found(region_id, None))); return future::ready(Err(Error::region_not_found(region_id, None)));
}; };
let leader = if resp.has_leader() { let leader = if resp.has_leader() {
Some(resp.take_leader()) Some(resp.take_leader())
} else { } else {
None None
}; };
ready(Ok((region, leader))) future::ready(Ok((region, leader)))
}) })
} }

View File

@ -7,21 +7,16 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use futures::prelude::{FutureExt, SinkExt, StreamExt, TryFutureExt}; use futures::channel::{
use futures::{ mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
channel::{ oneshot,
mpsc::{channel, unbounded, Receiver, Sender, UnboundedReceiver, UnboundedSender},
oneshot,
},
compat::{Compat01As03, Compat01As03Sink},
future::ready,
stream::TryStreamExt,
Future,
}; };
use futures::compat::{Compat01As03, Compat01As03Sink};
use futures::prelude::*;
use grpcio::{CallOption, Environment, WriteFlags}; use grpcio::{CallOption, Environment, WriteFlags};
use kvproto::pdpb; use kvproto::pdpb;
use log::*; use log::*;
use tokio_core::reactor::{Core, Handle as OtherHandle}; use tokio_core::reactor::{Core, Handle as TokioHandle};
use crate::{ use crate::{
compat::SinkCompat, compat::SinkCompat,
@ -119,15 +114,15 @@ impl PdReactor {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let handle = core.handle(); let handle = core.handle();
{ {
let f = rx.take_while(|t| ready(t.is_some())).for_each(|t| { let f = rx.take_while(|t| future::ready(t.is_some())).for_each(|t| {
Self::dispatch(&client, t.unwrap(), &handle); Self::dispatch(&client, t.unwrap(), &handle);
ready(()) future::ready(())
}); });
core.run(TryFutureExt::compat(f.unit_error())).unwrap(); core.run(TryFutureExt::compat(f.unit_error())).unwrap();
} }
} }
fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) { fn init(client: &Arc<RwLock<LeaderClient>>, handle: &TokioHandle) {
let client = Arc::clone(client); let client = Arc::clone(client);
let (tx, rx) = client.write().unwrap().client.tso().unwrap(); let (tx, rx) = client.write().unwrap().client.tso().unwrap();
let tx = Compat01As03Sink::new(tx); let tx = Compat01As03Sink::new(tx);
@ -164,7 +159,7 @@ impl PdReactor {
// Schedule another tso_batch of request // Schedule another tso_batch of request
reactor.schedule(PdTask::Request); reactor.schedule(PdTask::Request);
} }
ready(Ok(())) future::ready(Ok(()))
}) })
.map_err(|e| panic!("unexpected error: {:?}", e)) .map_err(|e| panic!("unexpected error: {:?}", e))
.compat(), .compat(),
@ -204,7 +199,7 @@ impl PdReactor {
client.write().unwrap().reactor.tso_buffer = Some(requests); client.write().unwrap().reactor.tso_buffer = Some(requests);
} }
fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) { fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &TokioHandle) {
match task { match task {
PdTask::Request => Self::tso_request(client), PdTask::Request => Self::tso_request(client),
PdTask::Response(requests, response) => Self::tso_response(client, requests, &response), PdTask::Response(requests, response) => Self::tso_response(client, requests, &response),

View File

@ -7,8 +7,7 @@ use std::{
}; };
use futures::compat::Compat01As03; use futures::compat::Compat01As03;
use futures::future::{ok, ready, Either, Future, TryFutureExt}; use futures::prelude::*;
use futures::prelude::FutureExt;
use log::*; use log::*;
use tokio_timer::timer::Handle; use tokio_timer::timer::Handle;
@ -61,7 +60,7 @@ where
debug!("reconnect remains: {}", self.reconnect_count); debug!("reconnect remains: {}", self.reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT { if self.request_sent < MAX_REQUEST_COUNT {
return Either::Left(ok(self)); return future::Either::Left(future::ok(self));
} }
// Updating client. // Updating client.
@ -71,9 +70,9 @@ where
match (self.reconnect)(&self.client, RECONNECT_INTERVAL_SEC) { match (self.reconnect)(&self.client, RECONNECT_INTERVAL_SEC) {
Ok(_) => { Ok(_) => {
self.request_sent = 0; self.request_sent = 0;
Either::Left(ok(self)) future::Either::Left(future::ok(self))
} }
Err(_) => Either::Right( Err(_) => future::Either::Right(
Compat01As03::new( Compat01As03::new(
self.timer self.timer
.delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC)), .delay(Instant::now() + Duration::from_secs(RECONNECT_INTERVAL_SEC)),
@ -87,7 +86,7 @@ where
self.request_sent += 1; self.request_sent += 1;
debug!("request sent: {}", self.request_sent); debug!("request sent: {}", self.request_sent);
ok(self).and_then(|mut ctx| { future::ok(self).and_then(|mut ctx| {
let req = (ctx.func)(&ctx.client); let req = (ctx.func)(&ctx.client);
req.map(|resp| match resp { req.map(|resp| match resp {
Ok(resp) => { Ok(resp) => {
@ -128,6 +127,6 @@ where
.and_then(Self::send_and_receive) .and_then(Self::send_and_receive)
.map(Self::break_or_continue) .map(Self::break_or_continue)
}) })
.and_then(|r| ready(r.post_loop())) .and_then(|r| future::ready(r.post_loop()))
} }
} }

View File

@ -6,8 +6,7 @@
use std::{fmt, sync::Arc, time::Duration}; use std::{fmt, sync::Arc, time::Duration};
use futures::compat::Compat01As03; use futures::compat::Compat01As03;
use futures::future::Future; use futures::prelude::*;
use futures::prelude::{FutureExt, TryFutureExt};
use grpcio::{CallOption, Environment}; use grpcio::{CallOption, Environment};
use kvproto::{errorpb, kvrpcpb, tikvpb::TikvClient}; use kvproto::{errorpb, kvrpcpb, tikvpb::TikvClient};

View File

@ -10,7 +10,9 @@
//! //!
use crate::{Config, Error, Key, KvPair, Value}; use crate::{Config, Error, Key, KvPair, Value};
use futures::{task::Context, Future, Poll, Stream};
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::ops::RangeBounds; use std::ops::RangeBounds;
use std::pin::Pin; use std::pin::Pin;