Remove redundant reconnect logic (#87)

`bind::BoundService` wraps a `Reconnect` service and handles its Connect
errors. However, `BoundService` exposes `Reconnect`'s Error type to
callers even though these errors can never be returned.

Furthermore, `Reconnect` is allowed be polled after returning an error,
triggering the inner service to be rebuilt. We needlessly duplicate this
logic in `BoundService`.

Before splitting this file up into smaller chunks, let's update
`BoundService` to more narrowly adhere to `Reconnect`s API:

- Only the inner error type is returned. `unreachable!` assertions
  have been made where error variants cannot be returned.
- Do not "rebind" the stack explicitly. Instead, let `Reconnect` do
  this.
- Now BoundService::call may panic if invoked before poll_ready. It's a
  programming error, since `Reconnect` requires that `poll_ready` be
  called first.
This commit is contained in:
Oliver Gould 2018-08-31 15:58:28 -07:00 committed by GitHub
parent d98c83404b
commit b86694546a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 61 additions and 44 deletions

View File

@ -63,6 +63,14 @@ where
protocol: Protocol, protocol: Protocol,
} }
pub struct ResponseFuture<B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
inner: <Stack<B> as tower::Service>::Future,
}
/// A type of service binding. /// A type of service binding.
/// ///
/// Some services, for various reasons, may not be able to be used to serve multiple /// Some services, for various reasons, may not be able to be used to serve multiple
@ -141,9 +149,9 @@ pub type Service<B> = BoundService<B>;
pub type Stack<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>; pub type Stack<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>;
type StackInner<B> = Reconnect<orig_proto::Upgrade<NormalizeUri<NewHttp<B>>>>; type ReconnectStack<B> = Reconnect<NewHttp<B>>;
pub type NewHttp<B> = telemetry::http::service::NewHttp<Client<B>, B, HttpBody>; pub type NewHttp<B> = orig_proto::Upgrade<NormalizeUri<telemetry::http::service::NewHttp<Client<B>, B, HttpBody>>>;
pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<HttpBody>>; pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<HttpBody>>;
@ -236,13 +244,13 @@ where
/// ///
/// When the TLS client configuration is invalidated, this function will /// When the TLS client configuration is invalidated, this function will
/// be called again to bind a new stack. /// be called again to bind a new stack.
fn bind_inner_stack( fn bind_reconnect_stack(
&self, &self,
ep: &Endpoint, ep: &Endpoint,
protocol: &Protocol, protocol: &Protocol,
tls_client_config: &tls::ConditionalClientConfig, tls_client_config: &tls::ConditionalClientConfig,
)-> StackInner<B> { ) -> ReconnectStack<B> {
debug!("bind_inner_stack endpoint={:?}, protocol={:?}", ep, protocol); debug!("bind_reconnect_stack endpoint={:?}, protocol={:?}", ep, protocol);
let addr = ep.address(); let addr = ep.address();
let tls = ep.tls_identity().and_then(|identity| { let tls = ep.tls_identity().and_then(|identity| {
@ -292,7 +300,7 @@ where
/// Binds the endpoint stack used to construct a bound service. /// Binds the endpoint stack used to construct a bound service.
/// ///
/// This will wrap the service stack returned by `bind_inner_stack` /// This will wrap the service stack returned by `bind_reconnect_stack`
/// with a middleware layer that causes it to be re-constructed when /// with a middleware layer that causes it to be re-constructed when
/// the TLS client configuration changes. /// the TLS client configuration changes.
/// ///
@ -456,8 +464,8 @@ where
{ {
type Request = <Stack<B> as tower::Service>::Request; type Request = <Stack<B> as tower::Service>::Request;
type Response = <Stack<B> as tower::Service>::Response; type Response = <Stack<B> as tower::Service>::Response;
type Error = <Stack<B> as tower::Service>::Error; type Error = <NewHttp<B> as tower::NewService>::Error;
type Future = <Stack<B> as tower::Service>::Future; type Future = ResponseFuture<B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let ready = match self.binding { let ready = match self.binding {
@ -487,6 +495,20 @@ where
// If they *don't* call `poll_ready` again, that's ok, we won't ever // If they *don't* call `poll_ready` again, that's ok, we won't ever
// try to connect again. // try to connect again.
match ready { match ready {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(ready) => {
trace!("poll_ready: ready for business");
self.debounce_connect_error_log = false;
Ok(ready)
},
Err(ReconnectError::Inner(err)) => {
trace!("poll_ready: inner error");
self.debounce_connect_error_log = false;
Err(err)
},
Err(ReconnectError::Connect(err)) => { Err(ReconnectError::Connect(err)) => {
if !self.debounce_connect_error_log { if !self.debounce_connect_error_log {
self.debounce_connect_error_log = true; self.debounce_connect_error_log = true;
@ -494,55 +516,50 @@ where
} else { } else {
debug!("connect error to {:?}: {}", self.endpoint, err); debug!("connect error to {:?}: {}", self.endpoint, err);
} }
match self.binding {
Binding::Bound(ref mut svc) => {
trace!("poll_ready: binding stack after error");
*svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
},
Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: dropping bound stack after error");
next.take();
}
}
// So, this service isn't "ready" yet. Instead of trying to make // `Reconnect` is currently idle and needs to be polled to
// it ready, schedule the task for notification so the caller can // rebuild its inner service. Instead of doing this immediately,
// schedule the task for notification so the caller can
// determine whether readiness is still necessary (i.e. whether // determine whether readiness is still necessary (i.e. whether
// there are still requests to be sent). // there are still requests to be sent).
// //
// But, to return NotReady, we must notify the task. So, // This prevents busy-loops when the connection fails
// this notifies the task immediately, and figures that // immediately.
// whoever owns this service will call `poll_ready` if they
// are still interested.
task::current().notify(); task::current().notify();
Ok(Async::NotReady) Ok(Async::NotReady)
} }
// don't debounce on NotReady...
Ok(Async::NotReady) => Ok(Async::NotReady), Err(ReconnectError::NotReady) => {
other => { unreachable!("Reconnect::poll_ready cannot fail with NotReady");
trace!("poll_ready: ready for business"); }
self.debounce_connect_error_log = false;
other
},
} }
} }
fn call(&mut self, request: Self::Request) -> Self::Future { fn call(&mut self, request: Self::Request) -> Self::Future {
match self.binding { let inner = match self.binding {
Binding::Bound(ref mut svc) => svc.call(request), Binding::Bound(ref mut svc) => svc.call(request),
Binding::BindsPerRequest { ref mut next } => { Binding::BindsPerRequest { ref mut next } => {
// If a service has already been bound in `poll_ready`, consume it. let mut svc = next.take().expect("poll_ready must be called before call");
// Otherwise, bind a new service on-the-spot.
let bind = &self.bind;
let endpoint = &self.endpoint;
let protocol = &self.protocol;
let mut svc = next.take()
.unwrap_or_else(|| {
bind.bind_stack(endpoint, protocol)
});
svc.call(request) svc.call(request)
} }
} };
ResponseFuture { inner }
}
}
impl<B> Future for ResponseFuture<B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
type Item = <Stack<B> as tower::Service>::Response;
type Error = <NewHttp<B> as tower::NewService>::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(|e| match e {
ReconnectError::Inner(e) => e,
_ => unreachable!("Reconnect response futures can only fail with inner errors"),
})
} }
} }
@ -622,12 +639,12 @@ where
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send, <B::Data as ::bytes::IntoBuf>::Buf: Send,
{ {
type Service = StackInner<B>; type Service = ReconnectStack<B>;
fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service { fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service {
debug!( debug!(
"rebinding endpoint stack for {:?}:{:?} on TLS config change", "rebinding endpoint stack for {:?}:{:?} on TLS config change",
self.endpoint, self.protocol, self.endpoint, self.protocol,
); );
self.bind.bind_inner_stack(&self.endpoint, &self.protocol, tls) self.bind.bind_reconnect_stack(&self.endpoint, &self.protocol, tls)
} }
} }