From 216fe16523a2a4434348b7f46b6ccd40acba21ac Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 10 Sep 2018 14:27:36 -0700 Subject: [PATCH] Reorder endpoint-specific HTTP middlewares (#88) Currently, the layered service implementations that comprise the HTTP stack are a mix of `Service` and `NewService` types. In the endpoint-specific stack. `transparency::Client` is the only layer that actually needs to be a `NewService` if it is wrapped immediately with a `Reconnect`. This allows us to remove several `NewService` implementations. This extracts a new `svc::Reconnect` middleware from `bind`, handling connection error logging and hiding `tower_reconnect::Error` from outer layers. Furthermore, two HTTP/1-specific middlewares have been moved outside of the TLS rebinding layer, since they are not dependent on TLS configuration. Finally, `bind`'s type aliases have been simplified, removing the `HttpRequest` and `HttpResponse` aliases. By removing these, and removing `transparency::Client`'s dependency on the telemetry body types, it should be easier to change type signatures going forward. --- src/bind.rs | 271 +++++++++------------------------ src/inbound.rs | 17 +-- src/outbound.rs | 6 +- src/svc/mod.rs | 6 +- src/svc/reconnect.rs | 122 +++++++++++++++ src/telemetry/http/sensors.rs | 16 +- src/telemetry/http/service.rs | 87 +---------- src/transparency/client.rs | 17 +-- src/transparency/orig_proto.rs | 31 +--- 9 files changed, 233 insertions(+), 340 deletions(-) create mode 100644 src/svc/reconnect.rs diff --git a/src/bind.rs b/src/bind.rs index aac4ba72d..2681e7bd6 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -2,16 +2,16 @@ use std::error::Error; use std::fmt; use std::marker::PhantomData; use std::net::SocketAddr; +use std::sync::Arc; -use futures::{Async, Future, Poll, future, task}; +use futures::Poll; use http::{self, uri}; use tower_service as tower; use tower_h2; -use tower_reconnect::{Reconnect, Error as ReconnectError}; use control::destination::Endpoint; use ctx; -use svc::NewClient; +use svc::{NewClient, Reconnect}; use telemetry; use transparency::{self, HttpBody, h1, orig_proto}; use transport; @@ -19,6 +19,24 @@ use tls; use ctx::transport::TlsStatus; use watch_service::{WatchService, Rebind}; +/// An HTTP `Service` that is created for each `Endpoint` and `Protocol`. +pub type Stack = orig_proto::Upgrade>>; + +type WatchTls = WatchService>; + +/// An HTTP `Service` that is created for each `Endpoint`, `Protocol`, and client +/// TLS configuration. +pub type TlsStack = telemetry::http::service::Http, B, HttpBody>; + +type HttpService = Reconnect< + Arc, + transparency::Client< + transport::metrics::Connect, + ::logging::ClientExecutor<&'static str, SocketAddr>, + telemetry::http::service::RequestBody, + > +>; + /// Binds a `Service` from a `SocketAddr`. /// /// The returned `Service` buffers request until a connection is established. @@ -55,22 +73,10 @@ where { bind: Bind, binding: Binding, - /// Prevents logging repeated connect errors. - /// - /// Set back to false after a connect succeeds, to log about future errors. - debounce_connect_error_log: bool, endpoint: Endpoint, protocol: Protocol, } -pub struct ResponseFuture -where - B: tower_h2::Body + Send + 'static, - ::Buf: Send, -{ - inner: as tower::Service>::Future, -} - /// A type of service binding. /// /// Some services, for various reasons, may not be able to be used to serve multiple @@ -83,7 +89,7 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - Bound(WatchService>), + Bound(Stack), BindsPerRequest { // When `poll_ready` is called, the _next_ service to be used may be bound // ahead-of-time. This stack is used only to serve the next request to this @@ -145,24 +151,6 @@ pub struct RebindTls { endpoint: Endpoint, } -pub type Service = BoundService; - -pub type Stack = WatchService>; - -type ReconnectStack = Reconnect>; - -pub type NewHttp = orig_proto::Upgrade, B, HttpBody>>>; - -pub type HttpResponse = http::Response>; - -pub type HttpRequest = http::Request>; - -pub type Client = transparency::Client< - transport::metrics::Connect, - ::logging::ClientExecutor<&'static str, SocketAddr>, - B, ->; - #[derive(Copy, Clone, Debug)] pub enum BufferSpawnError { Inbound, @@ -232,27 +220,24 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - /// Binds the "inner" layers of the stack. + /// Binds the innermost layers of the stack with a TLS configuration. /// - /// This binds a service stack that comprises the client for that individual - /// endpoint. It will have to be rebuilt if the TLS configuration changes. + /// A reconnecting HTTP client is established with the given endpont, + /// protocol, and TLS configuration. /// - /// This includes: - /// + Reconnects - /// + URI normalization - /// + HTTP sensors - /// - /// When the TLS client configuration is invalidated, this function will - /// be called again to bind a new stack. - fn bind_reconnect_stack( + /// This client is instrumented with metrics. + fn bind_with_tls( &self, ep: &Endpoint, protocol: &Protocol, tls_client_config: &tls::ConditionalClientConfig, - ) -> ReconnectStack { - debug!("bind_reconnect_stack endpoint={:?}, protocol={:?}", ep, protocol); + ) -> TlsStack { + debug!("bind_with_tls endpoint={:?}, protocol={:?}", ep, protocol); let addr = ep.address(); + let log = ::logging::Client::proxy(self.ctx, addr) + .with_protocol(protocol.clone()); + let tls = ep.tls_identity().and_then(|identity| { tls_client_config.as_ref().map(|config| { tls::ConnectionConfig { @@ -273,54 +258,45 @@ where let connect = self.transport_registry .new_connect(client_ctx.as_ref(), transport::Connect::new(addr, tls)); + // TODO: Add some sort of backoff logic between reconnects. + self.sensors.http( + client_ctx.clone(), + Reconnect::new( + client_ctx.clone(), + transparency::Client::new(protocol, connect, log.executor()) + ) + ) + } - let log = ::logging::Client::proxy(self.ctx, addr) - .with_protocol(protocol.clone()); - let client = transparency::Client::new( - protocol, - connect, - log.executor(), - ); - - let sensors = self.sensors.http( - client, - &client_ctx - ); - - // Rewrite the HTTP/1 URI, if the authorities in the Host header - // and request URI are not in agreement, or are not present. - let normalize_uri = NormalizeUri::new(sensors, protocol.was_absolute_form()); - let upgrade_orig_proto = orig_proto::Upgrade::new(normalize_uri, protocol.is_http2()); - - // Automatically perform reconnects if the connection fails. - // - // TODO: Add some sort of backoff logic. - Reconnect::new(upgrade_orig_proto) - } - - /// Binds the endpoint stack used to construct a bound service. + /// Build a `Service` for the given endpoint and `Protocol`. /// - /// This will wrap the service stack returned by `bind_reconnect_stack` - /// with a middleware layer that causes it to be re-constructed when - /// the TLS client configuration changes. + /// The service attempts to upgrade HTTP/1 requests to HTTP/2 (if it's known + /// with prior knowledge that the endpoint supports HTTP/2). /// - /// This function will itself be called again by `BoundService` if the - /// service binds per request, or if the initial connection to the - /// endpoint fails. + /// As `tls_client_config` updates, `bind_with_tls` is called to rebuild the + /// client with the appropriate TLS configuraiton. fn bind_stack(&self, ep: &Endpoint, protocol: &Protocol) -> Stack { debug!("bind_stack: endpoint={:?}, protocol={:?}", ep, protocol); - // TODO: Since `BindsPerRequest` bindings are only used for a - // single request, it seems somewhat unnecessary to wrap them in a - // `WatchService` middleware so that they can be rebound when the TLS - // config changes, since they _always_ get rebound regardless. For now, - // we still add the `WatchService` layer so that the per-request and - // bound service stacks have the same type. let rebind = RebindTls { bind: self.clone(), endpoint: ep.clone(), protocol: protocol.clone(), }; - WatchService::new(self.tls_client_config.clone(), rebind) + let watch_tls = WatchService::new(self.tls_client_config.clone(), rebind); + + // HTTP/1.1 middlewares + // + // TODO make this conditional based on `protocol` + // TODO extract HTTP/1 rebinding logic up here + + // Rewrite the HTTP/1 URI, if the authorities in the Host header + // and request URI are not in agreement, or are not present. + // + // TODO move this into transparency::Client? + let normalize_uri = NormalizeUri::new(watch_tls, protocol.was_absolute_form()); + + // Upgrade HTTP/1.1 requests to be HTTP/2 if the endpoint supports HTTP/2. + orig_proto::Upgrade::new(normalize_uri, protocol.is_http2()) } pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> BoundService { @@ -347,7 +323,6 @@ where BoundService { bind: self.clone(), binding, - debounce_connect_error_log: false, endpoint: ep.clone(), protocol: protocol.clone(), } @@ -375,7 +350,7 @@ where { type Target = Endpoint; type Error = (); - type Client = Service; + type Client = BoundService; fn new_client(&mut self, ep: &Endpoint) -> Result { Ok(self.bind.bind_service(ep, &self.protocol)) @@ -385,58 +360,18 @@ where // ===== impl NormalizeUri ===== - impl NormalizeUri { fn new(inner: S, was_absolute_form: bool) -> Self { Self { inner, was_absolute_form } } } -impl tower::NewService for NormalizeUri -where - S: tower::NewService< - Request=http::Request, - Response=HttpResponse, - >, - S::Service: tower::Service< - Request=http::Request, - Response=HttpResponse, - >, - NormalizeUri: tower::Service, - B: tower_h2::Body, -{ - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Service = NormalizeUri; - type InitError = S::InitError; - type Future = future::Map< - S::Future, - fn(S::Service) -> NormalizeUri - >; - fn new_service(&self) -> Self::Future { - let s = self.inner.new_service(); - // This weird dance is so that the closure doesn't have to - // capture `self` and can just be a `fn` (so the `Map`) - // can be returned unboxed. - if self.was_absolute_form { - s.map(|inner| NormalizeUri::new(inner, true)) - } else { - s.map(|inner| NormalizeUri::new(inner, false)) - } - } -} - impl tower::Service for NormalizeUri where - S: tower::Service< - Request=http::Request, - Response=HttpResponse, - >, - B: tower_h2::Body, + S: tower::Service>, { type Request = S::Request; - type Response = HttpResponse; + type Response = S::Response; type Error = S::Error; type Future = S::Future; @@ -464,11 +399,11 @@ where { type Request = as tower::Service>::Request; type Response = as tower::Service>::Response; - type Error = as tower::NewService>::Error; - type Future = ResponseFuture; + type Error = as tower::Service>::Error; + type Future = as tower::Service>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let ready = match self.binding { + match self.binding { // A service is already bound, so poll its readiness. Binding::Bound(ref mut svc) | Binding::BindsPerRequest { next: Some(ref mut svc) } => { @@ -485,81 +420,17 @@ where *next = Some(svc); ready } - }; - - // If there was a connect error, don't terminate this BoundService - // completely. Instead, simply clean up the inner service, prepare to - // make a new one, and tell our caller that we could maybe be ready - // if they call `poll_ready` again. - // - // If they *don't* call `poll_ready` again, that's ok, we won't ever - // try to connect again. - match ready { - Ok(Async::NotReady) => Ok(Async::NotReady), - - Ok(ready) => { - trace!("poll_ready: ready for business"); - self.debounce_connect_error_log = false; - Ok(ready) - }, - - Err(ReconnectError::Inner(err)) => { - trace!("poll_ready: inner error"); - self.debounce_connect_error_log = false; - Err(err) - }, - - Err(ReconnectError::Connect(err)) => { - if !self.debounce_connect_error_log { - self.debounce_connect_error_log = true; - warn!("connect error to {:?}: {}", self.endpoint, err); - } else { - debug!("connect error to {:?}: {}", self.endpoint, err); - } - - // `Reconnect` is currently idle and needs to be polled to - // rebuild its inner service. Instead of doing this immediately, - // schedule the task for notification so the caller can - // determine whether readiness is still necessary (i.e. whether - // there are still requests to be sent). - // - // This prevents busy-loops when the connection fails - // immediately. - task::current().notify(); - Ok(Async::NotReady) - } - - Err(ReconnectError::NotReady) => { - unreachable!("Reconnect::poll_ready cannot fail with NotReady"); - } } } fn call(&mut self, request: Self::Request) -> Self::Future { - let inner = match self.binding { + match self.binding { Binding::Bound(ref mut svc) => svc.call(request), Binding::BindsPerRequest { ref mut next } => { let mut svc = next.take().expect("poll_ready must be called before call"); svc.call(request) } - }; - ResponseFuture { inner } - } -} - -impl Future for ResponseFuture -where - B: tower_h2::Body + Send + 'static, - ::Buf: Send, -{ - type Item = as tower::Service>::Response; - type Error = as tower::NewService>::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll().map_err(|e| match e { - ReconnectError::Inner(e) => e, - _ => unreachable!("Reconnect response futures can only fail with inner errors"), - }) + } } } @@ -639,12 +510,12 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - type Service = ReconnectStack; + type Service = TlsStack; fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service { debug!( "rebinding endpoint stack for {:?}:{:?} on TLS config change", self.endpoint, self.protocol, ); - self.bind.bind_reconnect_stack(&self.endpoint, &self.protocol, tls) + self.bind.bind_with_tls(&self.endpoint, &self.protocol, tls) } } diff --git a/src/inbound.rs b/src/inbound.rs index 28b7fc765..934e9ac0c 100644 --- a/src/inbound.rs +++ b/src/inbound.rs @@ -1,10 +1,9 @@ use std::net::{SocketAddr}; use std::sync::Arc; -use http; use tower_service as tower; -use tower_buffer::{self, Buffer}; -use tower_in_flight_limit::{self, InFlightLimit}; +use tower_buffer::Buffer; +use tower_in_flight_limit::InFlightLimit; use tower_h2; use linkerd2_proxy_router::Recognize; @@ -49,16 +48,12 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - type Request = http::Request; - type Response = bind::HttpResponse; - type Error = tower_in_flight_limit::Error< - tower_buffer::Error< - as tower::Service>::Error - > - >; type Key = (SocketAddr, bind::Protocol); + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; type RouteError = bind::BufferSpawnError; - type Service = InFlightLimit>>>; + type Service = InFlightLimit>>>; fn recognize(&self, req: &Self::Request) -> Option { let key = req.extensions() diff --git a/src/outbound.rs b/src/outbound.rs index 9aa129e00..aaf848a70 100644 --- a/src/outbound.rs +++ b/src/outbound.rs @@ -208,10 +208,10 @@ where ::Buf: Send, { type Key = SocketAddr; - type Request = http::Request; - type Response = bind::HttpResponse; + type Request = ::Request; + type Response = ::Response; type Error = ::Error; - type Service = bind::Service; + type Service = bind::BoundService; type DiscoverError = BindError; fn poll(&mut self) -> Poll, Self::DiscoverError> { diff --git a/src/svc/mod.rs b/src/svc/mod.rs index 31336192e..906a2d51e 100644 --- a/src/svc/mod.rs +++ b/src/svc/mod.rs @@ -21,7 +21,11 @@ //! //! * Move HTTP-specific service infrastructure into `svc::http`. -pub use tower_service::Service; +pub use tower_service::{NewService, Service}; + +mod reconnect; + +pub use self::reconnect::Reconnect; pub trait NewClient { diff --git a/src/svc/reconnect.rs b/src/svc/reconnect.rs new file mode 100644 index 000000000..8e8d0f610 --- /dev/null +++ b/src/svc/reconnect.rs @@ -0,0 +1,122 @@ +use std::fmt; + +use futures::{task, Async, Future, Poll}; +use tower_reconnect; + +use super::{NewService, Service}; + +/// Wraps `tower_reconnect`, handling errors. +/// +/// Ensures that the underlying service is ready and, if the underlying service +/// fails to become ready, rebuilds the inner stack. +pub struct Reconnect +where + T: fmt::Debug, + N: NewService, +{ + inner: tower_reconnect::Reconnect, + + /// The target, used for debug logging. + target: T, + + /// Prevents logging repeated connect errors. + /// + /// Set back to false after a connect succeeds, to log about future errors. + mute_connect_error_log: bool, +} + +pub struct ResponseFuture { + inner: as Service>::Future, +} + +// ===== impl Reconnect ===== + +impl Reconnect +where + T: fmt::Debug, + N: NewService, + N::InitError: fmt::Display, +{ + pub fn new(target: T, new_service: N) -> Self { + let inner = tower_reconnect::Reconnect::new(new_service); + Self { + target, + inner, + mute_connect_error_log: false, + } + } +} + +impl Service for Reconnect +where + T: fmt::Debug, + N: NewService, + N::InitError: fmt::Display, +{ + type Request = N::Request; + type Response = N::Response; + type Error = N::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.inner.poll_ready() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(ready) => { + trace!("poll_ready: ready for business"); + self.mute_connect_error_log = false; + Ok(ready) + } + + Err(tower_reconnect::Error::Inner(err)) => { + trace!("poll_ready: inner error, debouncing"); + self.mute_connect_error_log = false; + Err(err) + } + + Err(tower_reconnect::Error::Connect(err)) => { + // A connection could not be established to the target. + + // This is only logged as a warning at most once. Subsequent + // errors are logged at debug. + if !self.mute_connect_error_log { + self.mute_connect_error_log = true; + warn!("connect error to {:?}: {}", self.target, err); + } else { + debug!("connect error to {:?}: {}", self.target, err); + } + + // The inner service is now idle and will renew its internal + // state on the next poll. Instead of doing this immediately, + // the task is scheduled to be polled again only if the caller + // decides not to drop it. + // + // This prevents busy-looping when the connect error is + // instantaneous. + task::current().notify(); + Ok(Async::NotReady) + } + + Err(tower_reconnect::Error::NotReady) => { + unreachable!("poll_ready can't fail with NotReady"); + } + } + } + + fn call(&mut self, request: Self::Request) -> Self::Future { + ResponseFuture { + inner: self.inner.call(request), + } + } +} + +impl Future for ResponseFuture { + type Item = N::Response; + type Error = N::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll().map_err(|e| match e { + tower_reconnect::Error::Inner(err) => err, + _ => unreachable!("response future must fail with inner error"), + }) + } +} diff --git a/src/telemetry/http/sensors.rs b/src/telemetry/http/sensors.rs index d0c80600f..14de1a9ba 100644 --- a/src/telemetry/http/sensors.rs +++ b/src/telemetry/http/sensors.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Mutex}; use http::{Request, Response}; -use tower_service::NewService; +use tower_service::Service; use tower_h2::Body; use ctx; @@ -9,7 +9,7 @@ use telemetry::{http::event, tap}; use transparency::ClientError; use super::record::Record; -use super::service::{NewHttp, RequestBody}; +use super::service::{Http, RequestBody}; #[derive(Clone, Debug)] struct Inner { @@ -54,21 +54,21 @@ impl Sensors { Self::new(Record::for_test(), &Default::default()) } - pub fn http( + pub fn http( &self, - new_service: N, - client_ctx: &Arc, - ) -> NewHttp + client_ctx: Arc, + service: S, + ) -> Http where A: Body + 'static, B: Body + 'static, - N: NewService< + S: Service< Request = Request>, Response = Response, Error = ClientError > + 'static, { - NewHttp::new(new_service, Handle(self.0.clone()), client_ctx) + Http::new(service, Handle(self.0.clone()), client_ctx) } } diff --git a/src/telemetry/http/service.rs b/src/telemetry/http/service.rs index 0c78bda7f..10fc5d540 100644 --- a/src/telemetry/http/service.rs +++ b/src/telemetry/http/service.rs @@ -36,20 +36,6 @@ pub struct TimestampRequestOpen { inner: S, } -pub struct NewHttp { - new_service: N, - handle: Handle, - client_ctx: Arc, - _p: PhantomData<(A, B)>, -} - -pub struct Init { - future: F, - handle: Handle, - client_ctx: Arc, - _p: PhantomData<(A, B)>, -} - /// Wraps a transport with telemetry. #[derive(Debug)] pub struct Http { @@ -112,13 +98,13 @@ pub struct RequestBodyInner { request_open_at: Instant, } -// === NewHttp === +// === Http === -impl NewHttp +impl Http where A: Body + 'static, B: Body + 'static, - N: NewService< + S: Service< Request = http::Request>, Response = http::Response, Error = ClientError, @@ -126,76 +112,19 @@ where + 'static, { pub(super) fn new( - new_service: N, + service: S, handle: Handle, - client_ctx: &Arc, + client_ctx: Arc, ) -> Self { Self { - new_service, - handle, - client_ctx: Arc::clone(client_ctx), - _p: PhantomData, - } - } -} - -impl NewService for NewHttp -where - A: Body + 'static, - B: Body + 'static, - N: NewService< - Request = http::Request>, - Response = http::Response, - Error = ClientError, - > - + 'static, -{ - type Request = http::Request; - type Response = http::Response>; - type Error = N::Error; - type InitError = N::InitError; - type Future = Init; - type Service = Http; - - fn new_service(&self) -> Self::Future { - Init { - future: self.new_service.new_service(), - handle: self.handle.clone(), - client_ctx: Arc::clone(&self.client_ctx), - _p: PhantomData, - } - } -} - -// === Init === - -impl Future for Init -where - A: Body + 'static, - B: Body + 'static, - F: Future, - F::Item: Service< - Request = http::Request>, - Response = http::Response - >, -{ - type Item = Http; - type Error = F::Error; - - fn poll(&mut self) -> Poll { - let service = try_ready!(self.future.poll()); - - Ok(Async::Ready(Http { service, - handle: self.handle.clone(), - client_ctx: self.client_ctx.clone(), + handle, + client_ctx, _p: PhantomData, - })) + } } } -// === Http === - impl Service for Http where A: Body + 'static, diff --git a/src/transparency/client.rs b/src/transparency/client.rs index 0b5de8d7a..57374e578 100644 --- a/src/transparency/client.rs +++ b/src/transparency/client.rs @@ -10,7 +10,6 @@ use tower_h2; use bind; use task::BoxExecutor; -use telemetry::http::service::RequestBody; use super::glue::{BodyPayload, HttpBody, HyperConnect}; use super::h1; use super::upgrade::{HttpConnect, Http11Upgrade}; @@ -18,7 +17,7 @@ use super::upgrade::{HttpConnect, Http11Upgrade}; use std::{self, fmt}; type HyperClient = - hyper::Client, BodyPayload>>; + hyper::Client, BodyPayload>; /// A wrapper around the error types produced by the HTTP/1 and HTTP/2 clients. /// @@ -52,7 +51,7 @@ where E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(HyperClient), - Http2(tower_h2::client::Connect, RequestBody>), + Http2(tower_h2::client::Connect, B>), } /// A `Future` returned from `Client::new_service()`. @@ -74,7 +73,7 @@ where E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(Option>), - Http2(tower_h2::client::ConnectFuture, RequestBody>), + Http2(tower_h2::client::ConnectFuture, B>), } /// The `Service` yielded by `Client::new_service()`. @@ -99,7 +98,7 @@ where Http2(tower_h2::client::Connection< ::Connected, BoxExecutor, - RequestBody, + B, >), } @@ -154,9 +153,9 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Request = bind::HttpRequest; - type Response = http::Response; - type Error = Error; + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; type InitError = tower_h2::client::ConnectError; type Service = ClientService; type Future = ClientNewServiceFuture; @@ -216,7 +215,7 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Request = bind::HttpRequest; + type Request = http::Request; type Response = http::Response; type Error = Error; type Future = ClientServiceFuture; diff --git a/src/transparency/orig_proto.rs b/src/transparency/orig_proto.rs index 13045bf68..5c986ea20 100644 --- a/src/transparency/orig_proto.rs +++ b/src/transparency/orig_proto.rs @@ -1,7 +1,7 @@ use futures::{future, Future, Poll}; use http; use http::header::{TRANSFER_ENCODING, HeaderValue}; -use tower_service::{Service, NewService}; +use tower_service::Service; use bind; use super::h1; @@ -133,34 +133,7 @@ where } } -impl NewService for Upgrade -where - S: NewService, Response = http::Response>, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Service = Upgrade; - type InitError = S::InitError; - type Future = future::Map< - S::Future, - fn(S::Service) -> Upgrade - >; - - fn new_service(&self) -> Self::Future { - let s = self.inner.new_service(); - // This weird dance is so that the closure doesn't have to - // capture `self` and can just be a `fn` (so the `Map`) - // can be returned unboxed. - if self.upgrade_h1 { - s.map(|inner| Upgrade::new(inner, true)) - } else { - s.map(|inner| Upgrade::new(inner, false)) - } - } -} - -// ===== impl Upgrade ===== +// ===== impl Downgrade ===== impl Downgrade { pub fn new(inner: S) -> Self {