rfc: Make DestinationServiceQuery generic (#749)

The goals of this change are:
1. Reduce the size/complexity of `control::discovery` in order to ease code reviews.
2. Extract a reusable grpc streaming utility.

There are no intended functional changes.

`control::discovery::DestinationServiceQuery` is used to track the state of a request (and
streaming response) to the destination service. Very little of this logic is specific to
the destination service.

The `DestinationServiceQuery` and associated `UpdateRx` type have been moved to a new
module, `control::remote_stream`, as `Remote` and `Receiver`, respectively. Both of these
types are generic over the gRPC message type, so it will be possible to use this utility
with additional API endpoints.

The `Receiver::poll` implementation has been simplified to be more idiomatic with the rest
of our code (namely, using `try_ready!`).
This commit is contained in:
Oliver Gould 2018-05-08 16:54:20 -07:00 committed by GitHub
parent 1275b1ae89
commit 8bedd9d38a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 87 deletions

View File

@ -28,6 +28,7 @@ use conduit_proxy_controller_grpc::destination::client::{Destination as Destinat
use transport::DnsNameAndPort;
use control::cache::{Cache, CacheChange, Exists};
use control::remote_stream::{Remote, Receiver};
use ::telemetry::metrics::DstLabels;
@ -45,6 +46,9 @@ pub struct Endpoint {
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
type UpdateRx<T> = Receiver<PbUpdate, T>;
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
#[derive(Debug)]
pub struct Watch<B> {
@ -95,44 +99,6 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
txs: Vec<mpsc::UnboundedSender<Update>>,
}
enum DestinationServiceQuery<T: HttpService<ResponseBody = RecvBody>> {
NeedsReconnect,
ConnectedOrConnecting {
rx: UpdateRx<T>
},
}
/// Receiver for destination set updates.
///
/// The destination RPC returns a `ResponseFuture` whose item is a
/// `Response<Stream>`, so this type holds the state of that RPC call ---
/// either we're waiting for the future, or we have a stream --- and allows
/// us to implement `Stream` regardless of whether the RPC has returned yet
/// or not.
///
/// Polling an `UpdateRx` polls the wrapped future while we are
/// `Waiting`, and the `Stream` if we are `Streaming`. If the future is `Ready`,
/// then we switch states to `Streaming`.
enum UpdateRx<T: HttpService<ResponseBody = RecvBody>> {
Waiting(UpdateRsp<T::Future>),
Streaming(grpc::Streaming<PbUpdate, T::ResponseBody>),
}
type UpdateRsp<F> =
grpc::client::server_streaming::ResponseFuture<PbUpdate, F>;
/// Wraps the error types returned by `UpdateRx` polls.
///
/// An `UpdateRx` error is either the error type of the `Future` in the
/// `UpdateRx::Waiting` state, or the `Stream` in the `UpdateRx::Streaming`
/// state.
// TODO: impl Error?
#[derive(Debug)]
enum RxError<T> {
Future(grpc::Error<T>),
Stream(grpc::Error),
}
#[derive(Debug, Clone)]
enum Update {
Insert(SocketAddr, Metadata),
@ -404,12 +370,11 @@ where
set.txs.push(tx);
}
Entry::Vacant(vac) => {
let query =
DestinationServiceQuery::connect_maybe(
&self.default_destination_namespace,
client,
vac.key(),
"connect");
let query = Self::query_destination_service_if_relevant(
&self.default_destination_namespace,
client,
vac.key(),
"connect");
let mut set = DestinationSet {
addrs: Exists::Unknown,
query,
@ -445,7 +410,7 @@ where
while let Some(auth) = self.reconnects.pop_front() {
if let Some(set) = self.destinations.get_mut(&auth) {
set.query = DestinationServiceQuery::connect_maybe(
set.query = Self::query_destination_service_if_relevant(
&self.default_destination_namespace,
client,
&auth,
@ -462,10 +427,10 @@ where
for (auth, set) in &mut self.destinations {
// Query the Destination service first.
let (new_query, found_by_destination_service) = match set.query.take() {
Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => {
Some(Remote::ConnectedOrConnecting{ rx }) => {
let (new_query, found_by_destination_service) =
set.poll_destination_service(auth, rx);
if let DestinationServiceQuery::NeedsReconnect = new_query {
if let Remote::NeedsReconnect = new_query {
set.reset_on_next_modification();
self.reconnects.push_back(auth.clone());
}
@ -500,21 +465,16 @@ where
set.poll_dns(&self.dns_resolver, auth);
}
}
}
// ===== impl DestinationServiceQuery =====
impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> {
// Initiates a query `query` to the Destination service and returns it as `Some(query)` if the
// given authority's host is of a form suitable for using to query the Destination service.
// Otherwise, returns `None`.
fn connect_maybe(
/// Initiates a query `query` to the Destination service and returns it as
/// `Some(query)` if the given authority's host is of a form suitable for using to
/// query the Destination service. Otherwise, returns `None`.
fn query_destination_service_if_relevant(
default_destination_namespace: &str,
client: &mut T,
auth: &DnsNameAndPort,
connect_or_reconnect: &str)
-> Option<Self>
-> Option<DestinationServiceQuery<T>>
{
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
FullyQualifiedAuthority::normalize(auth, default_destination_namespace)
@ -523,10 +483,9 @@ impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> Destination
scheme: "k8s".into(),
path: auth.without_trailing_dot().to_owned(),
};
// TODO: Can grpc::Request::new be removed?
let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req));
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
Remote::ConnectedOrConnecting { rx: Receiver::new(response) }
})
}
}
@ -591,14 +550,14 @@ impl<T> DestinationSet<T>
"Destination.Get stream ended for {:?}, must reconnect",
auth
);
return (DestinationServiceQuery::NeedsReconnect, exists);
return (Remote::NeedsReconnect, exists);
},
Ok(Async::NotReady) => {
return (DestinationServiceQuery::ConnectedOrConnecting { rx }, exists);
return (Remote::ConnectedOrConnecting { rx }, exists);
},
Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
return (DestinationServiceQuery::NeedsReconnect, exists);
return (Remote::NeedsReconnect, exists);
}
};
}
@ -712,31 +671,6 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
}
}
// ===== impl UpdateRx =====
impl<T> Stream for UpdateRx<T>
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug,
{
type Item = PbUpdate;
type Error = RxError<T::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// this is not ideal.
let stream = match *self {
UpdateRx::Waiting(ref mut future) => match future.poll() {
Ok(Async::Ready(response)) => response.into_inner(),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(RxError::Future(e)),
},
UpdateRx::Streaming(ref mut stream) =>
return stream.poll().map_err(RxError::Stream),
};
*self = UpdateRx::Streaming(stream);
self.poll()
}
}
// ===== impl Metadata =====
impl Metadata {

View File

@ -25,6 +25,7 @@ pub mod discovery;
mod fully_qualified_authority;
mod observe;
pub mod pb;
mod remote_stream;
use self::discovery::{Background as DiscoBg, Discovery, Watch};
pub use self::discovery::Bind;

View File

@ -0,0 +1,86 @@
use futures::{Future, Poll, Stream};
use http::HeaderMap;
use prost::Message;
use std::fmt;
use tower_h2::{HttpService, Body, Data};
use tower_grpc::{
self as grpc,
Streaming,
client::server_streaming::ResponseFuture,
};
/// Tracks the state of a gRPC response stream from a remote.
///
/// A remote may hold a `Receiver` that can be used to read `M`-typed messages from the
/// remote stream.
pub enum Remote<M, S: HttpService> {
NeedsReconnect,
ConnectedOrConnecting {
rx: Receiver<M, S>
},
}
/// Receives streaming RPCs updates.
///
/// Streaming gRPC endpoints return a `ResponseFuture` whose item is a `Response<Stream>`.
/// A `Receiver` holds the state of that RPC call, exposing a `Stream` that drives both
/// the gRPC response and its streaming body.
pub struct Receiver<M, S: HttpService>(Rx<M, S>);
enum Rx<M, S: HttpService> {
Waiting(ResponseFuture<M, S::Future>),
Streaming(Streaming<M, S::ResponseBody>),
}
// ===== impl Receiver =====
impl<M: Message + Default, S: HttpService> Receiver<M, S>
where
S::ResponseBody: Body<Data = Data>,
S::Error: fmt::Debug,
{
pub fn new(future: ResponseFuture<M, S::Future>) -> Self {
Receiver(Rx::Waiting(future))
}
// Coerces the stream's Error<()> to an Error<S::Error>.
fn coerce_stream_err(e: grpc::Error<()>) -> grpc::Error<S::Error> {
match e {
grpc::Error::Grpc(s, h) => grpc::Error::Grpc(s, h),
grpc::Error::Decode(e) => grpc::Error::Decode(e),
grpc::Error::Protocol(e) => grpc::Error::Protocol(e),
grpc::Error::Inner(()) => {
// `stream.poll` shouldn't return this variant. If it for
// some reason does, we report this as an unknown error.
warn!("unexpected gRPC stream error");
debug_assert!(false);
grpc::Error::Grpc(grpc::Status::UNKNOWN, HeaderMap::new())
}
}
}
}
impl<M: Message + Default, S: HttpService> Stream for Receiver<M, S>
where
S::ResponseBody: Body<Data = Data>,
S::Error: fmt::Debug,
{
type Item = M;
type Error = grpc::Error<S::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let stream = match self.0 {
Rx::Waiting(ref mut future) => {
try_ready!(future.poll()).into_inner()
}
Rx::Streaming(ref mut stream) => {
return stream.poll().map_err(Self::coerce_stream_err);
}
};
self.0 = Rx::Streaming(stream);
}
}
}