From 8bedd9d38ab545138313d4c0c97441f1fa74036d Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 8 May 2018 16:54:20 -0700 Subject: [PATCH] rfc: Make DestinationServiceQuery generic (#749) The goals of this change are: 1. Reduce the size/complexity of `control::discovery` in order to ease code reviews. 2. Extract a reusable grpc streaming utility. There are no intended functional changes. `control::discovery::DestinationServiceQuery` is used to track the state of a request (and streaming response) to the destination service. Very little of this logic is specific to the destination service. The `DestinationServiceQuery` and associated `UpdateRx` type have been moved to a new module, `control::remote_stream`, as `Remote` and `Receiver`, respectively. Both of these types are generic over the gRPC message type, so it will be possible to use this utility with additional API endpoints. The `Receiver::poll` implementation has been simplified to be more idiomatic with the rest of our code (namely, using `try_ready!`). --- proxy/src/control/discovery.rs | 108 ++++++----------------------- proxy/src/control/mod.rs | 1 + proxy/src/control/remote_stream.rs | 86 +++++++++++++++++++++++ 3 files changed, 108 insertions(+), 87 deletions(-) create mode 100644 proxy/src/control/remote_stream.rs diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index a5bc5d975..841b37c34 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -28,6 +28,7 @@ use conduit_proxy_controller_grpc::destination::client::{Destination as Destinat use transport::DnsNameAndPort; use control::cache::{Cache, CacheChange, Exists}; +use control::remote_stream::{Remote, Receiver}; use ::telemetry::metrics::DstLabels; @@ -45,6 +46,9 @@ pub struct Endpoint { pub type DstLabelsWatch = futures_watch::Watch>; +type DestinationServiceQuery = Remote; +type UpdateRx = Receiver; + /// A `tower_discover::Discover`, given to a `tower_balance::Balance`. #[derive(Debug)] pub struct Watch { @@ -95,44 +99,6 @@ struct DestinationSet> { txs: Vec>, } -enum DestinationServiceQuery> { - NeedsReconnect, - ConnectedOrConnecting { - rx: UpdateRx - }, -} - -/// Receiver for destination set updates. -/// -/// The destination RPC returns a `ResponseFuture` whose item is a -/// `Response`, so this type holds the state of that RPC call --- -/// either we're waiting for the future, or we have a stream --- and allows -/// us to implement `Stream` regardless of whether the RPC has returned yet -/// or not. -/// -/// Polling an `UpdateRx` polls the wrapped future while we are -/// `Waiting`, and the `Stream` if we are `Streaming`. If the future is `Ready`, -/// then we switch states to `Streaming`. -enum UpdateRx> { - Waiting(UpdateRsp), - Streaming(grpc::Streaming), -} - -type UpdateRsp = - grpc::client::server_streaming::ResponseFuture; - -/// Wraps the error types returned by `UpdateRx` polls. -/// -/// An `UpdateRx` error is either the error type of the `Future` in the -/// `UpdateRx::Waiting` state, or the `Stream` in the `UpdateRx::Streaming` -/// state. -// TODO: impl Error? -#[derive(Debug)] -enum RxError { - Future(grpc::Error), - Stream(grpc::Error), -} - #[derive(Debug, Clone)] enum Update { Insert(SocketAddr, Metadata), @@ -404,12 +370,11 @@ where set.txs.push(tx); } Entry::Vacant(vac) => { - let query = - DestinationServiceQuery::connect_maybe( - &self.default_destination_namespace, - client, - vac.key(), - "connect"); + let query = Self::query_destination_service_if_relevant( + &self.default_destination_namespace, + client, + vac.key(), + "connect"); let mut set = DestinationSet { addrs: Exists::Unknown, query, @@ -445,7 +410,7 @@ where while let Some(auth) = self.reconnects.pop_front() { if let Some(set) = self.destinations.get_mut(&auth) { - set.query = DestinationServiceQuery::connect_maybe( + set.query = Self::query_destination_service_if_relevant( &self.default_destination_namespace, client, &auth, @@ -462,10 +427,10 @@ where for (auth, set) in &mut self.destinations { // Query the Destination service first. let (new_query, found_by_destination_service) = match set.query.take() { - Some(DestinationServiceQuery::ConnectedOrConnecting{ rx }) => { + Some(Remote::ConnectedOrConnecting{ rx }) => { let (new_query, found_by_destination_service) = set.poll_destination_service(auth, rx); - if let DestinationServiceQuery::NeedsReconnect = new_query { + if let Remote::NeedsReconnect = new_query { set.reset_on_next_modification(); self.reconnects.push_back(auth.clone()); } @@ -500,21 +465,16 @@ where set.poll_dns(&self.dns_resolver, auth); } } -} - -// ===== impl DestinationServiceQuery ===== - -impl> DestinationServiceQuery { - // Initiates a query `query` to the Destination service and returns it as `Some(query)` if the - // given authority's host is of a form suitable for using to query the Destination service. - // Otherwise, returns `None`. - fn connect_maybe( + /// Initiates a query `query` to the Destination service and returns it as + /// `Some(query)` if the given authority's host is of a form suitable for using to + /// query the Destination service. Otherwise, returns `None`. + fn query_destination_service_if_relevant( default_destination_namespace: &str, client: &mut T, auth: &DnsNameAndPort, connect_or_reconnect: &str) - -> Option + -> Option> { trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth); FullyQualifiedAuthority::normalize(auth, default_destination_namespace) @@ -523,10 +483,9 @@ impl> Destination scheme: "k8s".into(), path: auth.without_trailing_dot().to_owned(), }; - // TODO: Can grpc::Request::new be removed? let mut svc = DestinationSvc::new(client.lift_ref()); let response = svc.get(grpc::Request::new(req)); - DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) } + Remote::ConnectedOrConnecting { rx: Receiver::new(response) } }) } } @@ -591,14 +550,14 @@ impl DestinationSet "Destination.Get stream ended for {:?}, must reconnect", auth ); - return (DestinationServiceQuery::NeedsReconnect, exists); + return (Remote::NeedsReconnect, exists); }, Ok(Async::NotReady) => { - return (DestinationServiceQuery::ConnectedOrConnecting { rx }, exists); + return (Remote::ConnectedOrConnecting { rx }, exists); }, Err(err) => { warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); - return (DestinationServiceQuery::NeedsReconnect, exists); + return (Remote::NeedsReconnect, exists); } }; } @@ -712,31 +671,6 @@ impl > DestinationSet { } } -// ===== impl UpdateRx ===== - -impl Stream for UpdateRx -where T: HttpService, - T::Error: fmt::Debug, -{ - type Item = PbUpdate; - type Error = RxError; - - fn poll(&mut self) -> Poll, Self::Error> { - // this is not ideal. - let stream = match *self { - UpdateRx::Waiting(ref mut future) => match future.poll() { - Ok(Async::Ready(response)) => response.into_inner(), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(RxError::Future(e)), - }, - UpdateRx::Streaming(ref mut stream) => - return stream.poll().map_err(RxError::Stream), - }; - *self = UpdateRx::Streaming(stream); - self.poll() - } -} - // ===== impl Metadata ===== impl Metadata { diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index c3c570d2c..6fd972597 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -25,6 +25,7 @@ pub mod discovery; mod fully_qualified_authority; mod observe; pub mod pb; +mod remote_stream; use self::discovery::{Background as DiscoBg, Discovery, Watch}; pub use self::discovery::Bind; diff --git a/proxy/src/control/remote_stream.rs b/proxy/src/control/remote_stream.rs new file mode 100644 index 000000000..b2d483315 --- /dev/null +++ b/proxy/src/control/remote_stream.rs @@ -0,0 +1,86 @@ +use futures::{Future, Poll, Stream}; +use http::HeaderMap; +use prost::Message; +use std::fmt; +use tower_h2::{HttpService, Body, Data}; +use tower_grpc::{ + self as grpc, + Streaming, + client::server_streaming::ResponseFuture, +}; + +/// Tracks the state of a gRPC response stream from a remote. +/// +/// A remote may hold a `Receiver` that can be used to read `M`-typed messages from the +/// remote stream. +pub enum Remote { + NeedsReconnect, + ConnectedOrConnecting { + rx: Receiver + }, +} + +/// Receives streaming RPCs updates. +/// +/// Streaming gRPC endpoints return a `ResponseFuture` whose item is a `Response`. +/// A `Receiver` holds the state of that RPC call, exposing a `Stream` that drives both +/// the gRPC response and its streaming body. +pub struct Receiver(Rx); + +enum Rx { + Waiting(ResponseFuture), + Streaming(Streaming), +} + +// ===== impl Receiver ===== + +impl Receiver +where + S::ResponseBody: Body, + S::Error: fmt::Debug, +{ + pub fn new(future: ResponseFuture) -> Self { + Receiver(Rx::Waiting(future)) + } + + // Coerces the stream's Error<()> to an Error. + fn coerce_stream_err(e: grpc::Error<()>) -> grpc::Error { + match e { + grpc::Error::Grpc(s, h) => grpc::Error::Grpc(s, h), + grpc::Error::Decode(e) => grpc::Error::Decode(e), + grpc::Error::Protocol(e) => grpc::Error::Protocol(e), + grpc::Error::Inner(()) => { + // `stream.poll` shouldn't return this variant. If it for + // some reason does, we report this as an unknown error. + warn!("unexpected gRPC stream error"); + debug_assert!(false); + grpc::Error::Grpc(grpc::Status::UNKNOWN, HeaderMap::new()) + } + } + } +} + +impl Stream for Receiver +where + S::ResponseBody: Body, + S::Error: fmt::Debug, +{ + type Item = M; + type Error = grpc::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + let stream = match self.0 { + Rx::Waiting(ref mut future) => { + try_ready!(future.poll()).into_inner() + } + + Rx::Streaming(ref mut stream) => { + return stream.poll().map_err(Self::coerce_stream_err); + } + }; + + self.0 = Rx::Streaming(stream); + } + } +}