From ccd5d219786ce79df572b25e1ee5072e2d6e2068 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 11 Sep 2018 16:47:25 -0700 Subject: [PATCH] Introduce a "server stack" (#90) Previously, `proxy::Server` was generic over a `NewService` (constructed in `lib.rs`) that instruments error handling around the router and metrics. In preparation of adding a metrics module into the server stack (that is configured by the source connection), `Server` should be changed to instantaneously build clients with a `MakeClient>`. In order to do this, the following changes were made: 1. The `svc::NewClient` type was changed to `svc::MakeClient`. The naming change ("New" to "Make") is intended to differentiate the type from `NewService`, which is asynchronous and does not accept a `Target` argument. 2. The `proxy::h2_router` module was split from `lib.rs` and `map_err.rs`. `MapErr` tried to be generic, though we only used it in once place. Now, the `h2_router::Make` type supports cloning routers and handling their errors. 3. The `TimestampRequestOpen` middleware was split into its own file and given a `MakeClient` implementation. 4. The `svc::Layer` trait has been introduced to support layering middlewares like `TimestampRequestOpen`. This is analogous to Finagle's `Stack.Module` type. There are no functional changes. --- src/bind.rs | 7 +- src/control/destination/mod.rs | 8 +- src/inbound.rs | 4 +- src/lib.rs | 50 ++---- src/map_err.rs | 92 ---------- src/outbound.rs | 6 +- src/proxy/h2_router.rs | 168 ++++++++++++++++++ src/proxy/mod.rs | 1 + src/proxy/server.rs | 174 +++++++++---------- src/svc/layer.rs | 50 ++++++ src/svc/mod.rs | 60 +++++-- src/telemetry/http/mod.rs | 1 + src/telemetry/http/service.rs | 68 +------- src/telemetry/http/timestamp_request_open.rs | 90 ++++++++++ 14 files changed, 467 insertions(+), 312 deletions(-) delete mode 100644 src/map_err.rs create mode 100644 src/proxy/h2_router.rs create mode 100644 src/svc/layer.rs create mode 100644 src/telemetry/http/timestamp_request_open.rs diff --git a/src/bind.rs b/src/bind.rs index bd3f5239d..042263285 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -11,7 +11,7 @@ use tower_h2; use control::destination::Endpoint; use ctx; -use svc::{NewClient, Reconnect}; +use svc::{MakeClient, Reconnect}; use telemetry; use proxy::{self, HttpBody, h1, orig_proto}; use transport; @@ -343,16 +343,15 @@ impl Bind { } } -impl NewClient for BindProtocol +impl MakeClient for BindProtocol where B: tower_h2::Body + Send + 'static, ::Buf: Send, { - type Target = Endpoint; type Error = (); type Client = BoundService; - fn new_client(&mut self, ep: &Endpoint) -> Result { + fn make_client(&self, ep: &Endpoint) -> Result { Ok(self.bind.bind_service(ep, &self.protocol)) } } diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index 288c4ef9f..afc76ceb7 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -40,7 +40,7 @@ use tower_discover::{Change, Discover}; use tower_service::Service; use dns; -use svc::NewClient; +use svc::MakeClient; use tls; use transport::{DnsNameAndPort, HostAndPort}; @@ -158,7 +158,7 @@ impl Resolver { /// Start watching for address changes for a certain authority. pub fn resolve(&self, authority: &DnsNameAndPort, new_endpoint: N) -> Resolution where - N: NewClient, + N: MakeClient, { trace!("resolve; authority={:?}", authority); let (update_tx, update_rx) = mpsc::unbounded(); @@ -189,7 +189,7 @@ impl Resolver { impl Discover for Resolution where - N: NewClient, + N: MakeClient, { type Key = SocketAddr; type Request = ::Request; @@ -212,7 +212,7 @@ where // existing ones can be handled in the same way. let endpoint = Endpoint::new(addr, meta); - let service = self.new_endpoint.new_client(&endpoint).map_err(|_| ())?; + let service = self.new_endpoint.make_client(&endpoint).map_err(|_| ())?; return Ok(Async::Ready(Change::Insert(addr, service))); }, diff --git a/src/inbound.rs b/src/inbound.rs index 86c5b0ba1..f23383e7a 100644 --- a/src/inbound.rs +++ b/src/inbound.rs @@ -5,10 +5,10 @@ use tower_service as tower; use tower_buffer::Buffer; use tower_in_flight_limit::InFlightLimit; use tower_h2; -use linkerd2_proxy_router::Recognize; use bind; use ctx; +use proxy::h2_router::Recognize; use proxy::orig_proto; type Bind = bind::Bind; @@ -100,7 +100,7 @@ mod tests { use std::net; use http; - use linkerd2_proxy_router::Recognize; + use proxy::h2_router::Recognize; use super::Inbound; use bind::{self, Bind, Host}; diff --git a/src/lib.rs b/src/lib.rs index f5ebb2e12..68d07580e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,14 +53,12 @@ extern crate try_lock; #[macro_use] extern crate linkerd2_metrics; extern crate linkerd2_proxy_api; -extern crate linkerd2_proxy_router; use futures::*; use std::error::Error; use std::io; use std::net::SocketAddr; -use std::sync::Arc; use std::thread; use std::time::{Duration, SystemTime}; @@ -70,8 +68,6 @@ use tokio::{ runtime::current_thread, }; use tower_service::NewService; -use tower_fn::*; -use linkerd2_proxy_router::{Recognize, Router, Error as RouteError}; pub mod app; mod bind; @@ -85,7 +81,6 @@ mod drain; pub mod fs_watch; mod inbound; mod logging; -mod map_err; mod outbound; pub mod stream; mod svc; @@ -94,15 +89,16 @@ pub mod telemetry; mod proxy; mod transport; pub mod timeout; -mod tower_fn; // TODO: move to tower-fn mod watch_service; // TODO: move to tower use bind::Bind; use conditional::Conditional; use inbound::Inbound; -use map_err::MapErr; use task::MainRuntime; use proxy::{HttpBody, Server}; +use proxy::h2_router::{self, Router, Recognize}; +use svc::Layer; +use telemetry::http::timestamp_request_open; use transport::{BoundPort, Connection}; pub use transport::{AddrInfo, GetOriginalDst, SoOriginalDst, tls}; use outbound::Outbound; @@ -433,39 +429,16 @@ where Router: Send, G: GetOriginalDst + Send + 'static, { - let stack = Arc::new(NewServiceFn::new(move || { - // Clone the router handle - let router = router.clone(); - // Map errors to appropriate response error codes. - let map_err = MapErr::new(router, |e| { - match e { - RouteError::Route(r) => { - error!(" turning route error: {} into 500", r); - http::StatusCode::INTERNAL_SERVER_ERROR - } - RouteError::Inner(i) => { - error!("turning {} into 500", i); - http::StatusCode::INTERNAL_SERVER_ERROR - } - RouteError::NotRecognized => { - error!("turning route not recognized error into 500"); - http::StatusCode::INTERNAL_SERVER_ERROR - } - RouteError::NoCapacity(capacity) => { - // TODO For H2 streams, we should probably signal a protocol-level - // capacity change. - error!("router at capacity ({}); returning a 503", capacity); - http::StatusCode::SERVICE_UNAVAILABLE - } - } - }); - // Install the request open timestamp module at the very top - // of the stack, in order to take the timestamp as close as - // possible to the beginning of the request's lifetime. - telemetry::http::service::TimestampRequestOpen::new(map_err) - })); + // Install the request open timestamp module at the very top of the + // stack, in order to take the timestamp as close as possible to the + // beginning of the request's lifetime. + // + // TODO replace with a metrics module that is registered to the server + // transport. + let stack = timestamp_request_open::Layer::new() + .bind(h2_router::Make::new(router)); let listen_addr = bound_port.local_addr(); let server = Server::new( @@ -477,6 +450,7 @@ where tcp_connect_timeout, disable_protocol_detection_ports, drain_rx.clone(), + h2::server::Builder::default(), ); let log = server.log().clone(); diff --git a/src/map_err.rs b/src/map_err.rs deleted file mode 100644 index f0b38208f..000000000 --- a/src/map_err.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::marker::PhantomData; -use std::sync::Arc; - -use futures::{Future, Poll}; -use h2; -use http; -use http::header::CONTENT_LENGTH; -use tower_service::Service; - -/// Map an HTTP service's error to an appropriate 500 response. -pub struct MapErr { - inner: T, - f: Arc, - _p: PhantomData, -} - -/// Catches errors from the inner future and maps them to 500 responses. -pub struct ResponseFuture { - inner: T, - f: Arc, - _p: PhantomData, -} - - -// ===== impl MapErr ===== - -impl MapErr -where - T: Service, - F: Fn(E) -> http::StatusCode, -{ - /// Crete a new `MapErr` - pub fn new(inner: T, f: F) -> Self { - MapErr { - inner, - f: Arc::new(f), - _p: PhantomData, - } - } -} - -impl Service for MapErr -where - T: Service, Error = E>, - B: Default, - F: Fn(E) -> http::StatusCode, -{ - type Request = T::Request; - type Response = T::Response; - type Error = h2::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - // TODO: Do something with the original error - .map_err(|_| h2::Reason::INTERNAL_ERROR.into()) - } - - fn call(&mut self, request: Self::Request) -> Self::Future { - let inner = self.inner.call(request); - ResponseFuture { - inner, - f: self.f.clone(), - _p: PhantomData, - } - } -} - -// ===== impl ResponseFuture ===== - -impl Future for ResponseFuture -where - T: Future, Error = E>, - B: Default, - F: Fn(E) -> http::StatusCode, -{ - type Item = T::Item; - type Error = h2::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll().or_else(|e| { - let status = (self.f)(e); - let response = http::Response::builder() - .status(status) - .header(CONTENT_LENGTH, "0") - .body(Default::default()) - .unwrap(); - - Ok(response.into()) - }) - } -} diff --git a/src/outbound.rs b/src/outbound.rs index ed06c0633..88f0964f6 100644 --- a/src/outbound.rs +++ b/src/outbound.rs @@ -12,12 +12,12 @@ use tower_discover::{Change, Discover}; use tower_in_flight_limit::InFlightLimit; use tower_h2; use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; -use linkerd2_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::destination::{self, Resolution}; -use svc::NewClient; use ctx; +use proxy::h2_router::Recognize; +use svc::MakeClient; use telemetry::http::service::{ResponseBody as SensorBody}; use timeout::Timeout; use proxy::{h1, HttpBody}; @@ -225,7 +225,7 @@ where // circuit-breaking, this should be able to take care of itself, // closing down when the connection is no longer usable. if let Some((addr, mut bind)) = opt.take() { - let svc = bind.new_client(&addr.into()) + let svc = bind.make_client(&addr.into()) .map_err(|_| BindError::External { addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { diff --git a/src/proxy/h2_router.rs b/src/proxy/h2_router.rs new file mode 100644 index 000000000..4de8c70f9 --- /dev/null +++ b/src/proxy/h2_router.rs @@ -0,0 +1,168 @@ +use futures::{Future, Poll}; +use h2; +use http; +use http::header::CONTENT_LENGTH; +use std::{fmt, error}; +use std::sync::Arc; + +use ctx; +use svc::{MakeClient, Service}; + +extern crate linkerd2_proxy_router; + +use self::linkerd2_proxy_router::Error; +pub use self::linkerd2_proxy_router::{Recognize, Router}; + +pub struct Make +where + R: Recognize, + R::Error: error::Error, + R::RouteError: fmt::Display, +{ + router: Router, +} + +pub struct H2Router +where + R: Recognize, + R::Error: error::Error, + R::RouteError: fmt::Display, +{ + inner: Router, +} + +/// Catches errors from the inner future and maps them to 500 responses. +pub struct ResponseFuture +where + R: Recognize, + R::Error: error::Error, + R::RouteError: fmt::Display, +{ + inner: as Service>::Future, +} + +// ===== impl Make ===== + +impl Make +where + R: Recognize, Response = http::Response>, + R: Send + Sync + 'static, + R::Error: error::Error + Send + 'static, + R::RouteError: fmt::Display + Send + 'static, + A: Send + 'static, + B: Default + Send + 'static, +{ + pub fn new(router: Router) -> Self { + Self { router } + } +} + +impl Clone for Make +where + R: Recognize, + R::Error: error::Error, + R::RouteError: fmt::Display, +{ + fn clone(&self) -> Self { + Self { + router: self.router.clone(), + } + } +} + +impl MakeClient> for Make +where + R: Recognize, Response = http::Response>, + R: Send + Sync + 'static, + R::Error: error::Error + Send + 'static, + R::RouteError: fmt::Display + Send + 'static, + A: Send + 'static, + B: Default + Send + 'static, +{ + type Error = (); + type Client = H2Router; + + fn make_client(&self, _: &Arc) -> Result { + let inner = self.router.clone(); + Ok(H2Router { inner }) + } +} + +fn route_err_to_5xx(e: Error) -> http::StatusCode +where + E: fmt::Display, + F: fmt::Display, +{ + match e { + Error::Route(r) => { + error!("router error: {}", r); + http::StatusCode::INTERNAL_SERVER_ERROR + } + Error::Inner(i) => { + error!("service error: {}", i); + http::StatusCode::INTERNAL_SERVER_ERROR + } + Error::NotRecognized => { + error!("could not recognize request"); + http::StatusCode::INTERNAL_SERVER_ERROR + } + Error::NoCapacity(capacity) => { + // TODO For H2 streams, we should probably signal a protocol-level + // capacity change. + error!("router at capacity ({})", capacity); + http::StatusCode::SERVICE_UNAVAILABLE + } + } +} + +// ===== impl Router ===== + +impl Service for H2Router +where + R: Recognize>, + R::Error: error::Error, + R::RouteError: fmt::Display, + B: Default, +{ + type Request = as Service>::Request; + type Response = as Service>::Response; + type Error = h2::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready().map_err(|e| { + error!("router failed to become ready: {}", e); + h2::Reason::INTERNAL_ERROR.into() + }) + } + + fn call(&mut self, request: Self::Request) -> Self::Future { + let inner = self.inner.call(request); + ResponseFuture { inner } + } +} + +// ===== impl ResponseFuture ===== + +impl Future for ResponseFuture +where + R: Recognize>, + R::Error: error::Error, + R::RouteError: fmt::Display, + B: Default, +{ + type Item = R::Response; + type Error = h2::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll().or_else(|e| { + let response = http::Response::builder() + .status(route_err_to_5xx(e)) + .header(CONTENT_LENGTH, "0") + .body(B::default()) + .unwrap(); + + Ok(response.into()) + }) + } +} diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 8ea7cb539..36969edac 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -16,6 +16,7 @@ mod client; mod glue; pub mod h1; +pub mod h2_router; mod upgrade; pub mod orig_proto; mod protocol; diff --git a/src/proxy/server.rs b/src/proxy/server.rs index c8ba5bc59..53eed22f8 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -1,22 +1,22 @@ use std::{ - error::Error, - fmt, + error, net::SocketAddr, sync::Arc, time::Duration, }; -use futures::{future::Either, Future}; +use futures::{future::{self, Either}, Future}; +use h2; use http; use hyper; use indexmap::IndexSet; use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service::NewService; use tower_h2; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; +use svc::{MakeClient, Service}; use transport::{self, Connection, GetOriginalDst, Peek}; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use super::protocol::Protocol; @@ -27,44 +27,40 @@ use super::tcp; /// This type can `serve` new connections, determine what protocol /// the connection is speaking, and route it to the corresponding /// service. -pub struct Server +pub struct Server where - S: NewService>, - S::Future: 'static, + M: MakeClient, Error = ()> + Clone, + M::Client: Service< + Request = http::Request, + Response = http::Response, + >, B: tower_h2::Body, + G: GetOriginalDst, { disable_protocol_detection_ports: IndexSet, drain_signal: drain::Watch, get_orig_dst: G, h1: hyper::server::conn::Http, - h2: tower_h2::Server< - HttpBodyNewSvc, - ::logging::ServerExecutor, - B - >, + h2_settings: h2::server::Builder, listen_addr: SocketAddr, - new_service: S, + make_client: M, proxy_ctx: ProxyCtx, transport_registry: transport::metrics::Registry, tcp: tcp::Forward, log: ::logging::Server, } -impl Server +impl Server where - S: NewService< + M: MakeClient, Error = ()> + Clone, + M::Client: Service< Request = http::Request, - Response = http::Response - > + Clone + Send + 'static, - S::Future: 'static, - ::Service: Send, - <::Service as ::tower_service::Service>::Future: Send, - S::InitError: fmt::Debug, - S::Future: Send + 'static, - B: tower_h2::Body + 'static, - S::Error: Error + Send + Sync + 'static, - S::InitError: Send + fmt::Debug, - B: tower_h2::Body + Send + Default + 'static, + Response = http::Response, + >, + M::Client: Send + 'static, + ::Error: error::Error + Send + Sync + 'static, + ::Future: Send + 'static, + B: tower_h2::Body + Default + Send + 'static, B::Data: Send, ::Buf: Send, G: GetOriginalDst, @@ -76,12 +72,12 @@ where proxy_ctx: ProxyCtx, transport_registry: transport::metrics::Registry, get_orig_dst: G, - stack: S, + make_client: M, tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, drain_signal: drain::Watch, + h2_settings: h2::server::Builder, ) -> Self { - let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let tcp = tcp::Forward::new(tcp_connect_timeout, transport_registry.clone()); let log = ::logging::Server::proxy(proxy_ctx, listen_addr); Server { @@ -89,13 +85,9 @@ where drain_signal, get_orig_dst, h1: hyper::server::conn::Http::new(), - h2: tower_h2::Server::new( - recv_body_svc, - Default::default(), - log.clone().executor(), - ), + h2_settings, listen_addr, - new_service: stack, + make_client, proxy_ctx, transport_registry, tcp, @@ -153,69 +145,73 @@ where return log.future(Either::B(fut)); } - // try to sniff protocol + let detect_protocol = io.peek() + .map_err(|e| debug!("peek error: {}", e)) + .map(|io| { + let p = Protocol::detect(io.peeked()); + (p, io) + }); + let h1 = self.h1.clone(); - let h2 = self.h2.clone(); + let h2_settings = self.h2_settings.clone(); + let make_client = self.make_client.clone(); let tcp = self.tcp.clone(); - let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); let log_clone = log.clone(); - let fut = Either::A(io.peek() - .map_err(|e| debug!("peek error: {}", e)) - .and_then(move |io| match Protocol::detect(io.peeked()) { - Some(Protocol::Http1) => Either::A({ - trace!("detected HTTP/1"); + let serve = detect_protocol + .and_then(move |(proto, io)| match proto { + None => Either::A({ + trace!("did not detect protocol; forwarding TCP"); + tcp_serve(&tcp, io, srv_ctx, drain_signal) + }), - let fut = new_service.new_service() - .map_err(|e| trace!("h1 new_service error: {:?}", e)) - .and_then(move |s| { - let svc = HyperServerSvc::new( - s, - srv_ctx, - drain_signal.clone(), - log_clone.executor(), - ); - let conn = h1 - .serve_connection(io, svc) - // Since using `Connection`s, enable - // support for HTTP upgrades (CONNECT - // and websockets). - .with_upgrades(); - drain_signal - .watch(conn, |conn| { - conn.graceful_shutdown(); - }) - .map(|_| ()) - .map_err(|e| trace!("http1 server error: {:?}", e)) + Some(proto) => Either::B(match proto { + Protocol::Http1 => Either::A({ + trace!("detected HTTP/1"); + match make_client.make_client(&srv_ctx) { + Err(()) => Either::A({ + error!("failed to build HTTP/1 client"); + future::err(()) + }), + Ok(s) => Either::B({ + let svc = HyperServerSvc::new( + s, + srv_ctx, + drain_signal.clone(), + log_clone.executor(), + ); + // Enable support for HTTP upgrades (CONNECT and websockets). + let conn = h1 + .serve_connection(io, svc) + .with_upgrades(); + drain_signal + .watch(conn, |conn| { + conn.graceful_shutdown(); + }) + .map(|_| ()) + .map_err(|e| trace!("http1 server error: {:?}", e)) + }), + } + }), + Protocol::Http2 => Either::B({ + trace!("detected HTTP/2"); + let new_service = make_client.into_new_service(srv_ctx.clone()); + let h2 = tower_h2::Server::new( + HttpBodyNewSvc::new(new_service), + h2_settings, + log_clone.executor(), + ); + let serve = h2.serve_modified(io, move |r: &mut http::Request<()>| { + r.extensions_mut().insert(srv_ctx.clone()); }); - Either::A(fut) + drain_signal + .watch(serve, |conn| conn.graceful_shutdown()) + .map_err(|e| trace!("h2 server error: {:?}", e)) + }), }), - Some(Protocol::Http2) => Either::A({ - trace!("detected HTTP/2"); - let set_ctx = move |request: &mut http::Request<()>| { - request.extensions_mut().insert(srv_ctx.clone()); - }; + }); - let fut = drain_signal - .watch(h2.serve_modified(io, set_ctx), |conn| { - conn.graceful_shutdown(); - }) - .map_err(|e| trace!("h2 server error: {:?}", e)); - - Either::B(fut) - }), - None => { - trace!("did not detect protocol, treating as TCP"); - Either::B(tcp_serve( - &tcp, - io, - srv_ctx, - drain_signal, - )) - } - })); - - log.future(fut) + log.future(Either::A(serve)) } } diff --git a/src/svc/layer.rs b/src/svc/layer.rs new file mode 100644 index 000000000..4d2a95a18 --- /dev/null +++ b/src/svc/layer.rs @@ -0,0 +1,50 @@ +use std::marker::PhantomData; + +/// A stackable element. +/// +/// Given a `Next`-typed inner value, produces a `Bound`-typed value. +/// This is especially useful for composable types like `MakeClient`s. +pub trait Layer { + type Bound; + + /// Produce a `Bound` value from a `Next` value. + fn bind(&self, next: Next) -> Self::Bound; + + /// Compose this `Layer` with another. + fn and_then(self, inner: M) -> AndThen + where + Self: Layer + Sized, + M: Layer, + { + AndThen { + outer: self, + inner, + _p: PhantomData, + } + } +} + +/// Combines two `Layers` into one layer. +#[derive(Debug, Clone)] +pub struct AndThen +where + Outer: Layer, + Inner: Layer, +{ + outer: Outer, + inner: Inner, + // `AndThen` should be Send/Sync independently of `Next`. + _p: PhantomData Next>, +} + +impl Layer for AndThen +where + Outer: Layer, + Inner: Layer, +{ + type Bound = Outer::Bound; + + fn bind(&self, next: Next) -> Self::Bound { + self.outer.bind(self.inner.bind(next)) + } +} diff --git a/src/svc/mod.rs b/src/svc/mod.rs index 906a2d51e..d263b22c7 100644 --- a/src/svc/mod.rs +++ b/src/svc/mod.rs @@ -21,25 +21,23 @@ //! //! * Move HTTP-specific service infrastructure into `svc::http`. +use futures::future; pub use tower_service::{NewService, Service}; -mod reconnect; +pub mod reconnect; +pub mod layer; pub use self::reconnect::Reconnect; +pub use self::layer::Layer; -pub trait NewClient { - - /// Describes a resource to which the client will be attached. - /// - /// Depending on the implementation, the target may describe a logical name - /// to be resolved (i.e. via DNS) and load balanced, or it may describe a - /// specific network address to which one or more connections will be - /// established, or it may describe an entirely arbitrary "virtual" service - /// (i.e. that exists locally in memory). - type Target; - - /// Indicates why the provided `Target` cannot be used to instantiate a client. - type Error; +/// `Target` describes a resource to which the client will be attached. +/// +/// Depending on the implementation, the target may describe a logical name +/// to be resolved (i.e. via DNS) and load balanced, or it may describe a +/// specific network address to which one or more connections will be +/// established, or it may describe an entirely arbitrary "virtual" service +/// (i.e. that exists locally in memory). + pub trait MakeClient { /// Serves requests on behalf of a target. /// @@ -51,10 +49,42 @@ pub trait NewClient { /// client must be discarded. type Client: Service; + /// Indicates why the provided `Target` cannot be used to instantiate a client. + type Error; + /// Creates a client /// /// If the provided `Target` is valid, immediately return a `Client` that may /// become ready lazily, i.e. as the target is resolved and connections are /// established. - fn new_client(&mut self, t: &Self::Target) -> Result; + fn make_client(&self, t: &Target) -> Result; + + fn into_new_service(self, target: Target) -> IntoNewService + where + Self: Sized, + { + IntoNewService { + target, + make_client: self, + } + } +} + +#[derive(Clone, Debug)] +pub struct IntoNewService> { + target: T, + make_client: M, +} + +impl> NewService for IntoNewService { + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; + type Service = M::Client; + type InitError = M::Error; + type Future = future::FutureResult; + + fn new_service(&self) -> Self::Future { + future::result(self.make_client.make_client(&self.target)) + } } diff --git a/src/telemetry/http/mod.rs b/src/telemetry/http/mod.rs index ea1f393c8..2c075fa3b 100644 --- a/src/telemetry/http/mod.rs +++ b/src/telemetry/http/mod.rs @@ -16,6 +16,7 @@ mod labels; mod record; mod sensors; pub mod service; +pub mod timestamp_request_open; use self::labels::{RequestLabels, ResponseLabels}; use self::record::Record; diff --git a/src/telemetry/http/service.rs b/src/telemetry/http/service.rs index ec4d22a49..1a9920003 100644 --- a/src/telemetry/http/service.rs +++ b/src/telemetry/http/service.rs @@ -1,12 +1,12 @@ use bytes::{Buf, IntoBuf}; -use futures::{future, Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll, Stream}; use h2; use http; use std::default::Default; use std::marker::PhantomData; use std::sync::Arc; use std::time::Instant; -use tower_service::{NewService, Service}; +use tower_service::Service; use tower_h2::Body; use ctx; @@ -14,28 +14,10 @@ use proxy::ClientError; use super::event::{self, Event}; use super::sensors::Handle; +use super::timestamp_request_open::RequestOpen; const GRPC_STATUS: &str = "grpc-status"; -/// A `RequestOpen` timestamp. -/// -/// This is added to a request's `Extensions` by the `TimestampRequestOpen` -/// middleware. It's a newtype in order to distinguish it from other -/// `Instant`s that may be added as request extensions. -#[derive(Copy, Clone, Debug)] -pub struct RequestOpen(pub Instant); - -/// Middleware that adds a `RequestOpen` timestamp to requests. -/// -/// This is a separate middleware from `sensor::Http`, because we want -/// to install it at the earliest point in the stack. This is in order -/// to ensure that request latency metrics cover the overhead added by -/// the proxy as accurately as possible. -#[derive(Copy, Clone, Debug)] -pub struct TimestampRequestOpen { - inner: S, -} - /// Wraps a transport with telemetry. #[derive(Debug)] pub struct Http { @@ -553,47 +535,3 @@ impl BodySensor for RequestBodyInner { ) } } - -impl TimestampRequestOpen { - pub fn new(inner: S) -> Self { - Self { inner } - } -} - -impl Service for TimestampRequestOpen -where - S: Service>, -{ - type Request = http::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, mut req: Self::Request) -> Self::Future { - req.extensions_mut().insert(RequestOpen(Instant::now())); - self.inner.call(req) - } -} - -impl NewService for TimestampRequestOpen -where - S: NewService>, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type InitError = S::InitError; - type Future = future::Map< - S::Future, - fn(S::Service) -> Self::Service - >; - type Service = TimestampRequestOpen; - - fn new_service(&self) -> Self::Future { - self.inner.new_service().map(TimestampRequestOpen::new) - } -} diff --git a/src/telemetry/http/timestamp_request_open.rs b/src/telemetry/http/timestamp_request_open.rs new file mode 100644 index 000000000..fdaafaaa3 --- /dev/null +++ b/src/telemetry/http/timestamp_request_open.rs @@ -0,0 +1,90 @@ +use futures::Poll; +use http; +use std::marker::PhantomData; +use std::time::Instant; + +use svc::{self, Service, MakeClient}; + +/// A `RequestOpen` timestamp. +/// +/// This is added to a request's `Extensions` by the `TimestampRequestOpen` +/// middleware. It's a newtype in order to distinguish it from other +/// `Instant`s that may be added as request extensions. +#[derive(Copy, Clone, Debug)] +pub struct RequestOpen(pub Instant); + +/// Middleware that adds a `RequestOpen` timestamp to requests. +/// +/// This is a separate middleware from `sensor::Http`, because we want +/// to install it at the earliest point in the stack. This is in order +/// to ensure that request latency metrics cover the overhead added by +/// the proxy as accurately as possible. +#[derive(Copy, Clone, Debug)] +pub struct TimestampRequestOpen { + inner: S, +} + +/// Layers a `TimestampRequestOpen` middleware on an HTTP client. +#[derive(Clone, Debug)] +pub struct Layer(PhantomData (T, B)>); + +/// Uses an `M`-typed `MakeClient` to build a `TimestampRequestOpen` service. +#[derive(Clone, Debug)] +pub struct Make(M); + +// === impl TimestampRequsetOpen === + +impl Service for TimestampRequestOpen +where + S: Service>, +{ + type Request = http::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, mut req: Self::Request) -> Self::Future { + req.extensions_mut().insert(RequestOpen(Instant::now())); + self.inner.call(req) + } +} + +// === impl Layer === + +impl Layer { + pub fn new() -> Self { + Layer(PhantomData) + } +} + +impl svc::Layer for Layer +where + N: MakeClient, + N::Client: Service>, +{ + type Bound = Make; + + fn bind(&self, next: N) -> Make { + Make(next) + } +} + +// === impl Make === + +impl MakeClient for Make +where + N: MakeClient, + N::Client: Service>, +{ + type Client = TimestampRequestOpen; + type Error = N::Error; + + fn make_client(&self, target: &T) -> Result { + let inner = self.0.make_client(target)?; + Ok(TimestampRequestOpen { inner }) + } +}