Introduce a "server stack" (#90)

Previously, `proxy::Server` was generic over a `NewService` (constructed
in `lib.rs`) that instruments error handling around the router and metrics. In
preparation of adding a metrics module into the server stack (that is configured
by the source connection), `Server` should be changed to instantaneously build
clients with a `MakeClient<Arc<ctx::transport::Server>>`.

In order to do this, the following changes were made:

1. The `svc::NewClient` type was changed to `svc::MakeClient<Target>`. The
    naming change ("New" to "Make") is intended to differentiate the type from
    `NewService`, which is asynchronous and does not accept a `Target` argument.
2. The `proxy::h2_router` module was split from `lib.rs` and `map_err.rs`.  `MapErr`
    tried to be generic, though we only used it in once place. Now, the `h2_router::Make`
    type supports cloning routers and handling their errors.
3. The `TimestampRequestOpen` middleware was split into its own file and given a
    `MakeClient` implementation.
4. The `svc::Layer` trait has been introduced to support layering middlewares like
    `TimestampRequestOpen`. This is analogous to Finagle's `Stack.Module` type.

There are no functional changes.
This commit is contained in:
Oliver Gould 2018-09-11 16:47:25 -07:00 committed by GitHub
parent 9f9518a98f
commit ccd5d21978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 467 additions and 312 deletions

View File

@ -11,7 +11,7 @@ use tower_h2;
use control::destination::Endpoint;
use ctx;
use svc::{NewClient, Reconnect};
use svc::{MakeClient, Reconnect};
use telemetry;
use proxy::{self, HttpBody, h1, orig_proto};
use transport;
@ -343,16 +343,15 @@ impl<C, B> Bind<C, B> {
}
}
impl<B> NewClient for BindProtocol<ctx::Proxy, B>
impl<B> MakeClient<Endpoint> for BindProtocol<ctx::Proxy, B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
{
type Target = Endpoint;
type Error = ();
type Client = BoundService<B>;
fn new_client(&mut self, ep: &Endpoint) -> Result<Self::Client, ()> {
fn make_client(&self, ep: &Endpoint) -> Result<Self::Client, ()> {
Ok(self.bind.bind_service(ep, &self.protocol))
}
}

View File

@ -40,7 +40,7 @@ use tower_discover::{Change, Discover};
use tower_service::Service;
use dns;
use svc::NewClient;
use svc::MakeClient;
use tls;
use transport::{DnsNameAndPort, HostAndPort};
@ -158,7 +158,7 @@ impl Resolver {
/// Start watching for address changes for a certain authority.
pub fn resolve<N>(&self, authority: &DnsNameAndPort, new_endpoint: N) -> Resolution<N>
where
N: NewClient,
N: MakeClient<Endpoint>,
{
trace!("resolve; authority={:?}", authority);
let (update_tx, update_rx) = mpsc::unbounded();
@ -189,7 +189,7 @@ impl Resolver {
impl<N> Discover for Resolution<N>
where
N: NewClient<Target = Endpoint>,
N: MakeClient<Endpoint>,
{
type Key = SocketAddr;
type Request = <N::Client as Service>::Request;
@ -212,7 +212,7 @@ where
// existing ones can be handled in the same way.
let endpoint = Endpoint::new(addr, meta);
let service = self.new_endpoint.new_client(&endpoint).map_err(|_| ())?;
let service = self.new_endpoint.make_client(&endpoint).map_err(|_| ())?;
return Ok(Async::Ready(Change::Insert(addr, service)));
},

View File

@ -5,10 +5,10 @@ use tower_service as tower;
use tower_buffer::Buffer;
use tower_in_flight_limit::InFlightLimit;
use tower_h2;
use linkerd2_proxy_router::Recognize;
use bind;
use ctx;
use proxy::h2_router::Recognize;
use proxy::orig_proto;
type Bind<B> = bind::Bind<ctx::Proxy, B>;
@ -100,7 +100,7 @@ mod tests {
use std::net;
use http;
use linkerd2_proxy_router::Recognize;
use proxy::h2_router::Recognize;
use super::Inbound;
use bind::{self, Bind, Host};

View File

@ -53,14 +53,12 @@ extern crate try_lock;
#[macro_use]
extern crate linkerd2_metrics;
extern crate linkerd2_proxy_api;
extern crate linkerd2_proxy_router;
use futures::*;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime};
@ -70,8 +68,6 @@ use tokio::{
runtime::current_thread,
};
use tower_service::NewService;
use tower_fn::*;
use linkerd2_proxy_router::{Recognize, Router, Error as RouteError};
pub mod app;
mod bind;
@ -85,7 +81,6 @@ mod drain;
pub mod fs_watch;
mod inbound;
mod logging;
mod map_err;
mod outbound;
pub mod stream;
mod svc;
@ -94,15 +89,16 @@ pub mod telemetry;
mod proxy;
mod transport;
pub mod timeout;
mod tower_fn; // TODO: move to tower-fn
mod watch_service; // TODO: move to tower
use bind::Bind;
use conditional::Conditional;
use inbound::Inbound;
use map_err::MapErr;
use task::MainRuntime;
use proxy::{HttpBody, Server};
use proxy::h2_router::{self, Router, Recognize};
use svc::Layer;
use telemetry::http::timestamp_request_open;
use transport::{BoundPort, Connection};
pub use transport::{AddrInfo, GetOriginalDst, SoOriginalDst, tls};
use outbound::Outbound;
@ -433,39 +429,16 @@ where
Router<R>: Send,
G: GetOriginalDst + Send + 'static,
{
let stack = Arc::new(NewServiceFn::new(move || {
// Clone the router handle
let router = router.clone();
// Map errors to appropriate response error codes.
let map_err = MapErr::new(router, |e| {
match e {
RouteError::Route(r) => {
error!(" turning route error: {} into 500", r);
http::StatusCode::INTERNAL_SERVER_ERROR
}
RouteError::Inner(i) => {
error!("turning {} into 500", i);
http::StatusCode::INTERNAL_SERVER_ERROR
}
RouteError::NotRecognized => {
error!("turning route not recognized error into 500");
http::StatusCode::INTERNAL_SERVER_ERROR
}
RouteError::NoCapacity(capacity) => {
// TODO For H2 streams, we should probably signal a protocol-level
// capacity change.
error!("router at capacity ({}); returning a 503", capacity);
http::StatusCode::SERVICE_UNAVAILABLE
}
}
});
// Install the request open timestamp module at the very top
// of the stack, in order to take the timestamp as close as
// possible to the beginning of the request's lifetime.
telemetry::http::service::TimestampRequestOpen::new(map_err)
}));
// Install the request open timestamp module at the very top of the
// stack, in order to take the timestamp as close as possible to the
// beginning of the request's lifetime.
//
// TODO replace with a metrics module that is registered to the server
// transport.
let stack = timestamp_request_open::Layer::new()
.bind(h2_router::Make::new(router));
let listen_addr = bound_port.local_addr();
let server = Server::new(
@ -477,6 +450,7 @@ where
tcp_connect_timeout,
disable_protocol_detection_ports,
drain_rx.clone(),
h2::server::Builder::default(),
);
let log = server.log().clone();

View File

@ -1,92 +0,0 @@
use std::marker::PhantomData;
use std::sync::Arc;
use futures::{Future, Poll};
use h2;
use http;
use http::header::CONTENT_LENGTH;
use tower_service::Service;
/// Map an HTTP service's error to an appropriate 500 response.
pub struct MapErr<T, E, F> {
inner: T,
f: Arc<F>,
_p: PhantomData<E>,
}
/// Catches errors from the inner future and maps them to 500 responses.
pub struct ResponseFuture<T, E, F> {
inner: T,
f: Arc<F>,
_p: PhantomData<E>,
}
// ===== impl MapErr =====
impl<T, E, F> MapErr<T, E, F>
where
T: Service<Error = E>,
F: Fn(E) -> http::StatusCode,
{
/// Crete a new `MapErr`
pub fn new(inner: T, f: F) -> Self {
MapErr {
inner,
f: Arc::new(f),
_p: PhantomData,
}
}
}
impl<T, B, E, F> Service for MapErr<T, E, F>
where
T: Service<Response = http::Response<B>, Error = E>,
B: Default,
F: Fn(E) -> http::StatusCode,
{
type Request = T::Request;
type Response = T::Response;
type Error = h2::Error;
type Future = ResponseFuture<T::Future, E, F>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
// TODO: Do something with the original error
.map_err(|_| h2::Reason::INTERNAL_ERROR.into())
}
fn call(&mut self, request: Self::Request) -> Self::Future {
let inner = self.inner.call(request);
ResponseFuture {
inner,
f: self.f.clone(),
_p: PhantomData,
}
}
}
// ===== impl ResponseFuture =====
impl<T, B, E, F> Future for ResponseFuture<T, E, F>
where
T: Future<Item = http::Response<B>, Error = E>,
B: Default,
F: Fn(E) -> http::StatusCode,
{
type Item = T::Item;
type Error = h2::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().or_else(|e| {
let status = (self.f)(e);
let response = http::Response::builder()
.status(status)
.header(CONTENT_LENGTH, "0")
.body(Default::default())
.unwrap();
Ok(response.into())
})
}
}

View File

@ -12,12 +12,12 @@ use tower_discover::{Change, Discover};
use tower_in_flight_limit::InFlightLimit;
use tower_h2;
use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody};
use linkerd2_proxy_router::Recognize;
use bind::{self, Bind, Protocol};
use control::destination::{self, Resolution};
use svc::NewClient;
use ctx;
use proxy::h2_router::Recognize;
use svc::MakeClient;
use telemetry::http::service::{ResponseBody as SensorBody};
use timeout::Timeout;
use proxy::{h1, HttpBody};
@ -225,7 +225,7 @@ where
// circuit-breaking, this should be able to take care of itself,
// closing down when the connection is no longer usable.
if let Some((addr, mut bind)) = opt.take() {
let svc = bind.new_client(&addr.into())
let svc = bind.make_client(&addr.into())
.map_err(|_| BindError::External { addr })?;
Ok(Async::Ready(Change::Insert(addr, svc)))
} else {

168
src/proxy/h2_router.rs Normal file
View File

@ -0,0 +1,168 @@
use futures::{Future, Poll};
use h2;
use http;
use http::header::CONTENT_LENGTH;
use std::{fmt, error};
use std::sync::Arc;
use ctx;
use svc::{MakeClient, Service};
extern crate linkerd2_proxy_router;
use self::linkerd2_proxy_router::Error;
pub use self::linkerd2_proxy_router::{Recognize, Router};
pub struct Make<R>
where
R: Recognize,
R::Error: error::Error,
R::RouteError: fmt::Display,
{
router: Router<R>,
}
pub struct H2Router<R>
where
R: Recognize,
R::Error: error::Error,
R::RouteError: fmt::Display,
{
inner: Router<R>,
}
/// Catches errors from the inner future and maps them to 500 responses.
pub struct ResponseFuture<R>
where
R: Recognize,
R::Error: error::Error,
R::RouteError: fmt::Display,
{
inner: <Router<R> as Service>::Future,
}
// ===== impl Make =====
impl<R, A, B> Make<R>
where
R: Recognize<Request = http::Request<A>, Response = http::Response<B>>,
R: Send + Sync + 'static,
R::Error: error::Error + Send + 'static,
R::RouteError: fmt::Display + Send + 'static,
A: Send + 'static,
B: Default + Send + 'static,
{
pub fn new(router: Router<R>) -> Self {
Self { router }
}
}
impl<R> Clone for Make<R>
where
R: Recognize,
R::Error: error::Error,
R::RouteError: fmt::Display,
{
fn clone(&self) -> Self {
Self {
router: self.router.clone(),
}
}
}
impl<R, A, B> MakeClient<Arc<ctx::transport::Server>> for Make<R>
where
R: Recognize<Request = http::Request<A>, Response = http::Response<B>>,
R: Send + Sync + 'static,
R::Error: error::Error + Send + 'static,
R::RouteError: fmt::Display + Send + 'static,
A: Send + 'static,
B: Default + Send + 'static,
{
type Error = ();
type Client = H2Router<R>;
fn make_client(&self, _: &Arc<ctx::transport::Server>) -> Result<Self::Client, Self::Error> {
let inner = self.router.clone();
Ok(H2Router { inner })
}
}
fn route_err_to_5xx<E, F>(e: Error<E, F>) -> http::StatusCode
where
E: fmt::Display,
F: fmt::Display,
{
match e {
Error::Route(r) => {
error!("router error: {}", r);
http::StatusCode::INTERNAL_SERVER_ERROR
}
Error::Inner(i) => {
error!("service error: {}", i);
http::StatusCode::INTERNAL_SERVER_ERROR
}
Error::NotRecognized => {
error!("could not recognize request");
http::StatusCode::INTERNAL_SERVER_ERROR
}
Error::NoCapacity(capacity) => {
// TODO For H2 streams, we should probably signal a protocol-level
// capacity change.
error!("router at capacity ({})", capacity);
http::StatusCode::SERVICE_UNAVAILABLE
}
}
}
// ===== impl Router =====
impl<R, B> Service for H2Router<R>
where
R: Recognize<Response = http::Response<B>>,
R::Error: error::Error,
R::RouteError: fmt::Display,
B: Default,
{
type Request = <Router<R> as Service>::Request;
type Response = <Router<R> as Service>::Response;
type Error = h2::Error;
type Future = ResponseFuture<R>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(|e| {
error!("router failed to become ready: {}", e);
h2::Reason::INTERNAL_ERROR.into()
})
}
fn call(&mut self, request: Self::Request) -> Self::Future {
let inner = self.inner.call(request);
ResponseFuture { inner }
}
}
// ===== impl ResponseFuture =====
impl<R, B> Future for ResponseFuture<R>
where
R: Recognize<Response = http::Response<B>>,
R::Error: error::Error,
R::RouteError: fmt::Display,
B: Default,
{
type Item = R::Response;
type Error = h2::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().or_else(|e| {
let response = http::Response::builder()
.status(route_err_to_5xx(e))
.header(CONTENT_LENGTH, "0")
.body(B::default())
.unwrap();
Ok(response.into())
})
}
}

View File

@ -16,6 +16,7 @@
mod client;
mod glue;
pub mod h1;
pub mod h2_router;
mod upgrade;
pub mod orig_proto;
mod protocol;

View File

@ -1,22 +1,22 @@
use std::{
error::Error,
fmt,
error,
net::SocketAddr,
sync::Arc,
time::Duration,
};
use futures::{future::Either, Future};
use futures::{future::{self, Either}, Future};
use h2;
use http;
use hyper;
use indexmap::IndexSet;
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::NewService;
use tower_h2;
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
use drain;
use svc::{MakeClient, Service};
use transport::{self, Connection, GetOriginalDst, Peek};
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
use super::protocol::Protocol;
@ -27,44 +27,40 @@ use super::tcp;
/// This type can `serve` new connections, determine what protocol
/// the connection is speaking, and route it to the corresponding
/// service.
pub struct Server<S, B, G>
pub struct Server<M, B, G>
where
S: NewService<Request=http::Request<HttpBody>>,
S::Future: 'static,
M: MakeClient<Arc<ServerCtx>, Error = ()> + Clone,
M::Client: Service<
Request = http::Request<HttpBody>,
Response = http::Response<B>,
>,
B: tower_h2::Body,
G: GetOriginalDst,
{
disable_protocol_detection_ports: IndexSet<u16>,
drain_signal: drain::Watch,
get_orig_dst: G,
h1: hyper::server::conn::Http,
h2: tower_h2::Server<
HttpBodyNewSvc<S>,
::logging::ServerExecutor,
B
>,
h2_settings: h2::server::Builder,
listen_addr: SocketAddr,
new_service: S,
make_client: M,
proxy_ctx: ProxyCtx,
transport_registry: transport::metrics::Registry,
tcp: tcp::Forward,
log: ::logging::Server,
}
impl<S, B, G> Server<S, B, G>
impl<M, B, G> Server<M, B, G>
where
S: NewService<
M: MakeClient<Arc<ServerCtx>, Error = ()> + Clone,
M::Client: Service<
Request = http::Request<HttpBody>,
Response = http::Response<B>
> + Clone + Send + 'static,
S::Future: 'static,
<S as NewService>::Service: Send,
<<S as NewService>::Service as ::tower_service::Service>::Future: Send,
S::InitError: fmt::Debug,
S::Future: Send + 'static,
B: tower_h2::Body + 'static,
S::Error: Error + Send + Sync + 'static,
S::InitError: Send + fmt::Debug,
B: tower_h2::Body + Send + Default + 'static,
Response = http::Response<B>,
>,
M::Client: Send + 'static,
<M::Client as Service>::Error: error::Error + Send + Sync + 'static,
<M::Client as Service>::Future: Send + 'static,
B: tower_h2::Body + Default + Send + 'static,
B::Data: Send,
<B::Data as ::bytes::IntoBuf>::Buf: Send,
G: GetOriginalDst,
@ -76,12 +72,12 @@ where
proxy_ctx: ProxyCtx,
transport_registry: transport::metrics::Registry,
get_orig_dst: G,
stack: S,
make_client: M,
tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>,
drain_signal: drain::Watch,
h2_settings: h2::server::Builder,
) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Forward::new(tcp_connect_timeout, transport_registry.clone());
let log = ::logging::Server::proxy(proxy_ctx, listen_addr);
Server {
@ -89,13 +85,9 @@ where
drain_signal,
get_orig_dst,
h1: hyper::server::conn::Http::new(),
h2: tower_h2::Server::new(
recv_body_svc,
Default::default(),
log.clone().executor(),
),
h2_settings,
listen_addr,
new_service: stack,
make_client,
proxy_ctx,
transport_registry,
tcp,
@ -153,69 +145,73 @@ where
return log.future(Either::B(fut));
}
// try to sniff protocol
let detect_protocol = io.peek()
.map_err(|e| debug!("peek error: {}", e))
.map(|io| {
let p = Protocol::detect(io.peeked());
(p, io)
});
let h1 = self.h1.clone();
let h2 = self.h2.clone();
let h2_settings = self.h2_settings.clone();
let make_client = self.make_client.clone();
let tcp = self.tcp.clone();
let new_service = self.new_service.clone();
let drain_signal = self.drain_signal.clone();
let log_clone = log.clone();
let fut = Either::A(io.peek()
.map_err(|e| debug!("peek error: {}", e))
.and_then(move |io| match Protocol::detect(io.peeked()) {
Some(Protocol::Http1) => Either::A({
trace!("detected HTTP/1");
let serve = detect_protocol
.and_then(move |(proto, io)| match proto {
None => Either::A({
trace!("did not detect protocol; forwarding TCP");
tcp_serve(&tcp, io, srv_ctx, drain_signal)
}),
let fut = new_service.new_service()
.map_err(|e| trace!("h1 new_service error: {:?}", e))
.and_then(move |s| {
let svc = HyperServerSvc::new(
s,
srv_ctx,
drain_signal.clone(),
log_clone.executor(),
);
let conn = h1
.serve_connection(io, svc)
// Since using `Connection`s, enable
// support for HTTP upgrades (CONNECT
// and websockets).
.with_upgrades();
drain_signal
.watch(conn, |conn| {
conn.graceful_shutdown();
})
.map(|_| ())
.map_err(|e| trace!("http1 server error: {:?}", e))
Some(proto) => Either::B(match proto {
Protocol::Http1 => Either::A({
trace!("detected HTTP/1");
match make_client.make_client(&srv_ctx) {
Err(()) => Either::A({
error!("failed to build HTTP/1 client");
future::err(())
}),
Ok(s) => Either::B({
let svc = HyperServerSvc::new(
s,
srv_ctx,
drain_signal.clone(),
log_clone.executor(),
);
// Enable support for HTTP upgrades (CONNECT and websockets).
let conn = h1
.serve_connection(io, svc)
.with_upgrades();
drain_signal
.watch(conn, |conn| {
conn.graceful_shutdown();
})
.map(|_| ())
.map_err(|e| trace!("http1 server error: {:?}", e))
}),
}
}),
Protocol::Http2 => Either::B({
trace!("detected HTTP/2");
let new_service = make_client.into_new_service(srv_ctx.clone());
let h2 = tower_h2::Server::new(
HttpBodyNewSvc::new(new_service),
h2_settings,
log_clone.executor(),
);
let serve = h2.serve_modified(io, move |r: &mut http::Request<()>| {
r.extensions_mut().insert(srv_ctx.clone());
});
Either::A(fut)
drain_signal
.watch(serve, |conn| conn.graceful_shutdown())
.map_err(|e| trace!("h2 server error: {:?}", e))
}),
}),
Some(Protocol::Http2) => Either::A({
trace!("detected HTTP/2");
let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(srv_ctx.clone());
};
});
let fut = drain_signal
.watch(h2.serve_modified(io, set_ctx), |conn| {
conn.graceful_shutdown();
})
.map_err(|e| trace!("h2 server error: {:?}", e));
Either::B(fut)
}),
None => {
trace!("did not detect protocol, treating as TCP");
Either::B(tcp_serve(
&tcp,
io,
srv_ctx,
drain_signal,
))
}
}));
log.future(fut)
log.future(Either::A(serve))
}
}

50
src/svc/layer.rs Normal file
View File

@ -0,0 +1,50 @@
use std::marker::PhantomData;
/// A stackable element.
///
/// Given a `Next`-typed inner value, produces a `Bound`-typed value.
/// This is especially useful for composable types like `MakeClient`s.
pub trait Layer<Next> {
type Bound;
/// Produce a `Bound` value from a `Next` value.
fn bind(&self, next: Next) -> Self::Bound;
/// Compose this `Layer` with another.
fn and_then<M>(self, inner: M) -> AndThen<Next, Self, M>
where
Self: Layer<M::Bound> + Sized,
M: Layer<Next>,
{
AndThen {
outer: self,
inner,
_p: PhantomData,
}
}
}
/// Combines two `Layers` into one layer.
#[derive(Debug, Clone)]
pub struct AndThen<Next, Outer, Inner>
where
Outer: Layer<Inner::Bound>,
Inner: Layer<Next>,
{
outer: Outer,
inner: Inner,
// `AndThen` should be Send/Sync independently of `Next`.
_p: PhantomData<fn() -> Next>,
}
impl<Next, Outer, Inner> Layer<Next> for AndThen<Next, Outer, Inner>
where
Outer: Layer<Inner::Bound>,
Inner: Layer<Next>,
{
type Bound = Outer::Bound;
fn bind(&self, next: Next) -> Self::Bound {
self.outer.bind(self.inner.bind(next))
}
}

View File

@ -21,25 +21,23 @@
//!
//! * Move HTTP-specific service infrastructure into `svc::http`.
use futures::future;
pub use tower_service::{NewService, Service};
mod reconnect;
pub mod reconnect;
pub mod layer;
pub use self::reconnect::Reconnect;
pub use self::layer::Layer;
pub trait NewClient {
/// Describes a resource to which the client will be attached.
///
/// Depending on the implementation, the target may describe a logical name
/// to be resolved (i.e. via DNS) and load balanced, or it may describe a
/// specific network address to which one or more connections will be
/// established, or it may describe an entirely arbitrary "virtual" service
/// (i.e. that exists locally in memory).
type Target;
/// Indicates why the provided `Target` cannot be used to instantiate a client.
type Error;
/// `Target` describes a resource to which the client will be attached.
///
/// Depending on the implementation, the target may describe a logical name
/// to be resolved (i.e. via DNS) and load balanced, or it may describe a
/// specific network address to which one or more connections will be
/// established, or it may describe an entirely arbitrary "virtual" service
/// (i.e. that exists locally in memory).
pub trait MakeClient<Target> {
/// Serves requests on behalf of a target.
///
@ -51,10 +49,42 @@ pub trait NewClient {
/// client must be discarded.
type Client: Service;
/// Indicates why the provided `Target` cannot be used to instantiate a client.
type Error;
/// Creates a client
///
/// If the provided `Target` is valid, immediately return a `Client` that may
/// become ready lazily, i.e. as the target is resolved and connections are
/// established.
fn new_client(&mut self, t: &Self::Target) -> Result<Self::Client, Self::Error>;
fn make_client(&self, t: &Target) -> Result<Self::Client, Self::Error>;
fn into_new_service(self, target: Target) -> IntoNewService<Target, Self>
where
Self: Sized,
{
IntoNewService {
target,
make_client: self,
}
}
}
#[derive(Clone, Debug)]
pub struct IntoNewService<T, M: MakeClient<T>> {
target: T,
make_client: M,
}
impl<T, M: MakeClient<T>> NewService for IntoNewService<T, M> {
type Request = <M::Client as Service>::Request;
type Response = <M::Client as Service>::Response;
type Error = <M::Client as Service>::Error;
type Service = M::Client;
type InitError = M::Error;
type Future = future::FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
future::result(self.make_client.make_client(&self.target))
}
}

View File

@ -16,6 +16,7 @@ mod labels;
mod record;
mod sensors;
pub mod service;
pub mod timestamp_request_open;
use self::labels::{RequestLabels, ResponseLabels};
use self::record::Record;

View File

@ -1,12 +1,12 @@
use bytes::{Buf, IntoBuf};
use futures::{future, Async, Future, Poll, Stream};
use futures::{Async, Future, Poll, Stream};
use h2;
use http;
use std::default::Default;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use tower_service::{NewService, Service};
use tower_service::Service;
use tower_h2::Body;
use ctx;
@ -14,28 +14,10 @@ use proxy::ClientError;
use super::event::{self, Event};
use super::sensors::Handle;
use super::timestamp_request_open::RequestOpen;
const GRPC_STATUS: &str = "grpc-status";
/// A `RequestOpen` timestamp.
///
/// This is added to a request's `Extensions` by the `TimestampRequestOpen`
/// middleware. It's a newtype in order to distinguish it from other
/// `Instant`s that may be added as request extensions.
#[derive(Copy, Clone, Debug)]
pub struct RequestOpen(pub Instant);
/// Middleware that adds a `RequestOpen` timestamp to requests.
///
/// This is a separate middleware from `sensor::Http`, because we want
/// to install it at the earliest point in the stack. This is in order
/// to ensure that request latency metrics cover the overhead added by
/// the proxy as accurately as possible.
#[derive(Copy, Clone, Debug)]
pub struct TimestampRequestOpen<S> {
inner: S,
}
/// Wraps a transport with telemetry.
#[derive(Debug)]
pub struct Http<S, A, B> {
@ -553,47 +535,3 @@ impl BodySensor for RequestBodyInner {
)
}
}
impl<S> TimestampRequestOpen<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}
impl<S, B> Service for TimestampRequestOpen<S>
where
S: Service<Request = http::Request<B>>,
{
type Request = http::Request<B>;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
req.extensions_mut().insert(RequestOpen(Instant::now()));
self.inner.call(req)
}
}
impl<S, B> NewService for TimestampRequestOpen<S>
where
S: NewService<Request = http::Request<B>>,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = S::InitError;
type Future = future::Map<
S::Future,
fn(S::Service) -> Self::Service
>;
type Service = TimestampRequestOpen<S::Service>;
fn new_service(&self) -> Self::Future {
self.inner.new_service().map(TimestampRequestOpen::new)
}
}

View File

@ -0,0 +1,90 @@
use futures::Poll;
use http;
use std::marker::PhantomData;
use std::time::Instant;
use svc::{self, Service, MakeClient};
/// A `RequestOpen` timestamp.
///
/// This is added to a request's `Extensions` by the `TimestampRequestOpen`
/// middleware. It's a newtype in order to distinguish it from other
/// `Instant`s that may be added as request extensions.
#[derive(Copy, Clone, Debug)]
pub struct RequestOpen(pub Instant);
/// Middleware that adds a `RequestOpen` timestamp to requests.
///
/// This is a separate middleware from `sensor::Http`, because we want
/// to install it at the earliest point in the stack. This is in order
/// to ensure that request latency metrics cover the overhead added by
/// the proxy as accurately as possible.
#[derive(Copy, Clone, Debug)]
pub struct TimestampRequestOpen<S> {
inner: S,
}
/// Layers a `TimestampRequestOpen` middleware on an HTTP client.
#[derive(Clone, Debug)]
pub struct Layer<T, B>(PhantomData<fn() -> (T, B)>);
/// Uses an `M`-typed `MakeClient` to build a `TimestampRequestOpen` service.
#[derive(Clone, Debug)]
pub struct Make<M>(M);
// === impl TimestampRequsetOpen ===
impl<S, B> Service for TimestampRequestOpen<S>
where
S: Service<Request = http::Request<B>>,
{
type Request = http::Request<B>;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
req.extensions_mut().insert(RequestOpen(Instant::now()));
self.inner.call(req)
}
}
// === impl Layer ===
impl<T, B> Layer<T, B> {
pub fn new() -> Self {
Layer(PhantomData)
}
}
impl<N, T, B> svc::Layer<N> for Layer<T, B>
where
N: MakeClient<T>,
N::Client: Service<Request = http::Request<B>>,
{
type Bound = Make<N>;
fn bind(&self, next: N) -> Make<N> {
Make(next)
}
}
// === impl Make ===
impl<N, T, B> MakeClient<T> for Make<N>
where
N: MakeClient<T>,
N::Client: Service<Request = http::Request<B>>,
{
type Client = TimestampRequestOpen<N::Client>;
type Error = N::Error;
fn make_client(&self, target: &T) -> Result<Self::Client, Self::Error> {
let inner = self.0.make_client(target)?;
Ok(TimestampRequestOpen { inner })
}
}