Replace tower-h2 tap service with hyper

This untangles some of the HTTP/gRPC glue, providing services/stacks
that have more specific focuses. The `HyperServerSvc` now *only*
converts to a `tower::Service`, and the HTTP/1.1 and Upgrade pieces were
moved to a specific `proxy::http::upgrade::Service`.

Several stack modules were added to `proxy::grpc`, which can map request
and response bodies into `Payload`, or into `grpc::Body`, as needed.

Signed-off-by: Sean McArthur <sean@buoyant.io>
This commit is contained in:
Sean McArthur 2018-12-18 11:48:30 -08:00 committed by Sean McArthur
parent 0a085f98e8
commit 792c04b7d1
13 changed files with 378 additions and 247 deletions

View File

@ -568,7 +568,6 @@ dependencies = [
"tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)",
"tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)",
"tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)",
"tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)",

View File

@ -68,8 +68,7 @@ tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-service = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }
tower-http = { git = "https://github.com/tower-rs/tower-http" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] }
# FIXME update to a release when available (>0.10)
trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", rev = "c017c114", default-features = false }

View File

@ -455,104 +455,3 @@ pub mod client {
}
}
pub mod grpc_request_payload {
use bytes::Bytes;
use http;
use futures::Poll;
use std::marker::PhantomData;
use tower_grpc::{Body};
use proxy::http::GrpcBody as GlueBody;
use svc;
#[derive(Debug)]
pub struct Layer<B>(PhantomData<fn() -> B>);
#[derive(Debug)]
pub struct Stack<B, M> {
inner: M,
_p: PhantomData<fn() -> B>,
}
#[derive(Debug)]
pub struct Service<B, S> {
inner: S,
_p: PhantomData<fn() -> B>,
}
// === impl Layer ===
pub fn layer<B>() -> Layer<B>
where
B: Body + Send + 'static,
{
Layer(PhantomData)
}
impl<B> Clone for Layer<B> {
fn clone(&self) -> Self {
Layer(PhantomData)
}
}
impl<B, T, M> svc::Layer<T, T, M> for Layer<B>
where
B: Body<Data = Bytes> + Send + 'static,
M: svc::Stack<T>,
M::Value: svc::Service<http::Request<GlueBody<B>>>,
{
type Value = <Stack<B, M> as svc::Stack<T>>::Value;
type Error = <Stack<B, M> as svc::Stack<T>>::Error;
type Stack = Stack<B, M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack { inner, _p: PhantomData }
}
}
// === impl Stack ===
impl<B, M: Clone> Clone for Stack<B, M> {
fn clone(&self) -> Self {
Stack {
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<B, T, M> svc::Stack<T> for Stack<B, M>
where
B: Body<Data = Bytes> + Send + 'static,
M: svc::Stack<T>,
M::Value: svc::Service<http::Request<GlueBody<B>>>,
{
type Value = Service<B, M::Value>;
type Error = M::Error;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
let inner = self.inner.make(target)?;
Ok(Service { inner, _p: PhantomData })
}
}
// === impl Service ===
impl<B, S> svc::Service<http::Request<B>> for Service<B, S>
where
B: Body<Data = Bytes> + Send + 'static,
S: svc::Service<http::Request<GlueBody<B>>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready()
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.inner.call(req.map(GlueBody::new))
}
}
}

View File

@ -1,6 +1,4 @@
use bytes;
use futures::{self, future, Future, Poll};
use h2;
use http;
use hyper;
use indexmap::IndexSet;
@ -10,7 +8,7 @@ use std::time::{Duration, SystemTime};
use std::{error, fmt, io};
use tokio::executor::{self, DefaultExecutor, Executor};
use tokio::runtime::current_thread;
use tower_h2;
use tower_grpc as grpc;
use app::classify::{self, Class};
use app::metric_labels::{ControlLabels, EndpointLabels, RouteLabels};
@ -247,7 +245,7 @@ where
.push(http_metrics::layer::<_, classify::Response>(
ctl_http_metrics,
))
.push(control::grpc_request_payload::layer())
.push(proxy::grpc::req_body_as_payload::layer())
.push(svc::watch::layer(tls_client_config.clone()))
.push(phantom_data::layer())
.push(control::add_origin::layer())
@ -733,29 +731,42 @@ fn serve_tap<N, B>(
new_service: N,
) -> impl Future<Item = (), Error = ()> + 'static
where
B: tower_h2::Body + Send + 'static,
<B::Data as bytes::IntoBuf>::Buf: Send,
N: svc::MakeService<(), http::Request<tower_h2::RecvBody>, Response = http::Response<B>>
B: tower_grpc::Body + Send + 'static,
B::Data: Send + 'static,
<B::Data as bytes::IntoBuf>::Buf: Send + 'static,
N: svc::MakeService<(), http::Request<grpc::BoxBody>, Response = http::Response<B>>
+ Send
+ 'static,
tower_h2::server::Connection<Connection, N, ::logging::ServerExecutor, B, ()>:
Future<Item = ()>,
N::Error: error::Error + Send + Sync,
N::MakeError: error::Error,
<N::Service as svc::Service<http::Request<grpc::BoxBody>>>::Future: Send + 'static,
{
let log = logging::admin().server("tap", bound_port.local_addr());
let h2_builder = h2::server::Builder::default();
let server = tower_h2::Server::new(new_service, h2_builder, log.clone().executor());
let fut = {
let log = log.clone();
// TODO: serve over TLS.
bound_port
.listen_and_fold(server, move |mut server, (session, remote)| {
.listen_and_fold(new_service, move |mut new_service, (session, remote)| {
let log = log.clone().with_remote(remote);
let serve = server.serve(session).map_err(|_| ());
let log_clone = log.clone();
let serve = new_service
.make_service(())
.map_err(|err| error!("tap MakeService error: {}", err))
.and_then(move |svc| {
let svc = proxy::grpc::req_box_body::Service::new(svc);
let svc = proxy::grpc::res_body_as_payload::Service::new(svc);
let svc = proxy::http::HyperServerSvc::new(svc);
hyper::server::conn::Http::new()
.with_executor(log_clone.executor())
.http2_only(true)
.serve_connection(session, svc)
.map_err(|err| debug!("tap connection error: {}", err))
});
let r = executor::current_thread::TaskExecutor::current()
.spawn_local(Box::new(log.future(serve)))
.map(move |_| server)
.map(|()| new_service)
.map_err(task::Error::into_io);
future::result(r)
})
@ -764,3 +775,5 @@ where
log.future(fut)
}

View File

@ -37,7 +37,6 @@ extern crate ring;
extern crate tokio;
extern crate tokio_timer;
extern crate tower_grpc;
extern crate tower_h2;
extern crate tower_http;
extern crate tower_util;
extern crate trust_dns_resolver;

58
src/proxy/grpc/body.rs Normal file
View File

@ -0,0 +1,58 @@
use bytes::IntoBuf;
use futures::{Poll};
use http;
use hyper::body::Payload;
use tower_grpc as grpc;
#[derive(Debug)]
pub struct GrpcBody<B>(B);
// ===== impl GrpcBody =====
impl<B> GrpcBody<B> {
pub fn new(inner: B) -> Self {
GrpcBody(inner)
}
}
impl<B> Payload for GrpcBody<B>
where
B: grpc::Body + Send + 'static,
B::Data: Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
type Data = <B::Data as IntoBuf>::Buf;
type Error = h2::Error;
fn is_end_stream(&self) -> bool {
grpc::Body::is_end_stream(&self.0)
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
let data = try_ready!(grpc::Body::poll_data(&mut self.0));
Ok(data.map(IntoBuf::into_buf).into())
}
fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
grpc::Body::poll_metadata(&mut self.0).map_err(From::from)
}
}
impl<B> grpc::Body for GrpcBody<B>
where
B: grpc::Body,
{
type Data = B::Data;
fn is_end_stream(&self) -> bool {
grpc::Body::is_end_stream(&self.0)
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, grpc::Error> {
grpc::Body::poll_data(&mut self.0)
}
fn poll_metadata(&mut self) -> Poll<Option<http::HeaderMap>, grpc::Error> {
grpc::Body::poll_metadata(&mut self.0)
}
}

9
src/proxy/grpc/mod.rs Normal file
View File

@ -0,0 +1,9 @@
mod body;
mod service;
pub use self::body::GrpcBody;
pub use self::service::{
req_body_as_payload,
req_box_body,
res_body_as_payload,
};

152
src/proxy/grpc/service.rs Normal file
View File

@ -0,0 +1,152 @@
pub mod req_body_as_payload {
use bytes::Bytes;
use http;
use futures::Poll;
use tower_grpc::Body;
use super::super::GrpcBody;
use svc;
#[derive(Clone, Debug)]
pub struct Layer;
#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
}
#[derive(Debug)]
pub struct Service<S>(S);
// === impl Layer ===
pub fn layer() -> Layer {
Layer
}
impl<T, M> svc::Layer<T, T, M> for Layer
where
M: svc::Stack<T>,
{
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack { inner }
}
}
// === impl Stack ===
impl<T, M> svc::Stack<T> for Stack<M>
where
M: svc::Stack<T>,
{
type Value = Service<M::Value>;
type Error = M::Error;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
let inner = self.inner.make(target)?;
Ok(Service(inner))
}
}
// === impl Service ===
impl<B, S> svc::Service<http::Request<B>> for Service<S>
where
B: Body<Data = Bytes> + Send + 'static,
S: svc::Service<http::Request<GrpcBody<B>>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.0.call(req.map(GrpcBody::new))
}
}
}
pub mod req_box_body {
use bytes::Bytes;
use http;
use futures::Poll;
use tower_grpc::{Body, BoxBody};
use svc;
pub struct Service<S>(S);
impl<S> Service<S> {
pub fn new(service: S) -> Self {
Service(service)
}
}
impl<B, S> svc::Service<http::Request<B>> for Service<S>
where
B: Body<Data = Bytes> + Send + 'static,
S: svc::Service<http::Request<BoxBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.0.call(req.map(|b| BoxBody::new(Box::new(b))))
}
}
}
pub mod res_body_as_payload {
use http;
use futures::{future, Future, Poll};
use tower_grpc::Body;
use super::super::GrpcBody;
use svc;
pub struct Service<S>(S);
impl<S> Service<S> {
pub fn new(service: S) -> Self {
Service(service)
}
}
impl<B1, B2, S> svc::Service<http::Request<B1>> for Service<S>
where
B2: Body,
S: svc::Service<
http::Request<B1>,
Response = http::Response<B2>,
>,
{
type Response = http::Response<GrpcBody<B2>>;
type Error = S::Error;
type Future = future::Map<S::Future, fn(http::Response<B2>) -> http::Response<GrpcBody<B2>>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn call(&mut self, req: http::Request<B1>) -> Self::Future {
self.0.call(req)
.map(|res| {
res.map(GrpcBody::new)
})
}
}
}

View File

@ -1,6 +1,5 @@
use bytes::IntoBuf;
use futures::{future, Async, Future, Poll};
use futures::future::Either;
use bytes::{Bytes};
use futures::{Async, Future, Poll};
use http;
use h2;
use hyper::{self, body::Payload};
@ -8,10 +7,8 @@ use hyper::client::connect as hyper_connect;
use std::{error::Error as StdError, fmt};
use tower_grpc as grpc;
use drain;
use proxy::http::{HasH2Reason, h1, upgrade::Http11Upgrade};
use proxy::http::{HasH2Reason, upgrade::Http11Upgrade};
use svc;
use task::{BoxSendFuture, ErasedExecutor, Executor};
use transport::Connect;
/// Provides optional HTTP/1.1 upgrade support on the body.
@ -23,18 +20,10 @@ pub struct HttpBody {
pub(super) upgrade: Option<Http11Upgrade>
}
#[derive(Debug)]
pub struct GrpcBody<B>(B);
/// Glue for a `tower::Service` to used as a `hyper::server::Service`.
#[derive(Debug)]
pub(in proxy) struct HyperServerSvc<S, E> {
pub struct HyperServerSvc<S> {
service: S,
/// Watch any spawned HTTP/1.1 upgrade tasks.
upgrade_drain_signal: drain::Watch,
/// Executor used to spawn HTTP/1.1 upgrade tasks, and TCP proxies
/// after they succeed.
upgrade_executor: E,
}
/// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`.
@ -98,6 +87,28 @@ impl Payload for HttpBody {
}
}
impl grpc::Body for HttpBody {
type Data = Bytes;
fn is_end_stream(&self) -> bool {
Payload::is_end_stream(self)
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, grpc::Error> {
match Payload::poll_data(self) {
Ok(Async::Ready(Some(chunk))) => Ok(Async::Ready(Some(chunk.into()))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
}
fn poll_metadata(&mut self) -> Poll<Option<http::HeaderMap>, grpc::Error> {
Payload::poll_trailers(self)
.map_err(From::from)
}
}
impl Default for HttpBody {
fn default() -> HttpBody {
HttpBody {
@ -121,127 +132,35 @@ impl Drop for HttpBody {
}
}
// ===== impl GrpcBody =====
impl<B> GrpcBody<B> {
pub fn new(inner: B) -> Self {
GrpcBody(inner)
}
}
impl<B> Payload for GrpcBody<B>
where
B: grpc::Body + Send + 'static,
B::Data: Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
type Data = <B::Data as IntoBuf>::Buf;
type Error = h2::Error;
fn is_end_stream(&self) -> bool {
grpc::Body::is_end_stream(&self.0)
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
let data = try_ready!(grpc::Body::poll_data(&mut self.0));
Ok(data.map(IntoBuf::into_buf).into())
}
fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, Self::Error> {
grpc::Body::poll_metadata(&mut self.0).map_err(From::from)
}
}
impl<B> grpc::Body for GrpcBody<B>
where
B: grpc::Body,
{
type Data = B::Data;
fn is_end_stream(&self) -> bool {
grpc::Body::is_end_stream(&self.0)
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, grpc::Error> {
grpc::Body::poll_data(&mut self.0)
}
fn poll_metadata(&mut self) -> Poll<Option<http::HeaderMap>, grpc::Error> {
grpc::Body::poll_metadata(&mut self.0)
}
}
// ===== impl HyperServerSvc =====
impl<S, E> HyperServerSvc<S, E> {
pub(in proxy) fn new(
service: S,
upgrade_drain_signal: drain::Watch,
upgrade_executor: E,
) -> Self {
impl<S> HyperServerSvc<S> {
pub fn new(service: S) -> Self {
HyperServerSvc {
service,
upgrade_drain_signal,
upgrade_executor,
}
}
}
impl<S, E, B> hyper::service::Service for HyperServerSvc<S, E>
impl<S, B> hyper::service::Service for HyperServerSvc<S>
where
S: svc::Service<
http::Request<HttpBody>,
Response=http::Response<B>,
>,
S::Error: StdError + Send + Sync + 'static,
B: Payload + Default + Send + 'static,
E: Executor<BoxSendFuture> + Clone + Send + Sync + 'static,
B: Payload,
{
type ReqBody = hyper::Body;
type ResBody = B;
type Error = S::Error;
type Future = Either<
S::Future,
future::FutureResult<http::Response<Self::ResBody>, Self::Error>,
>;
type Future = S::Future;
fn call(&mut self, mut req: http::Request<Self::ReqBody>) -> Self::Future {
// Should this rejection happen later in the Service stack?
//
// Rejecting here means telemetry doesn't record anything about it...
//
// At the same time, this stuff is specifically HTTP1, so it feels
// proper to not have the HTTP2 requests going through it...
if h1::is_bad_request(&req) {
let mut res = http::Response::default();
*res.status_mut() = http::StatusCode::BAD_REQUEST;
return Either::B(future::ok(res));
}
let upgrade = if h1::wants_upgrade(&req) {
trace!("server request wants HTTP/1.1 upgrade");
// Upgrade requests include several "connection" headers that
// cannot be removed.
// Setup HTTP Upgrade machinery.
let halves = Http11Upgrade::new(
self.upgrade_drain_signal.clone(),
ErasedExecutor::erase(self.upgrade_executor.clone()),
);
req.extensions_mut().insert(halves.client);
Some(halves.server)
} else {
h1::strip_connection_headers(req.headers_mut());
None
};
let req = req.map(move |b| HttpBody {
fn call(&mut self, req: http::Request<Self::ReqBody>) -> Self::Future {
self.service.call(req.map(|b| HttpBody {
body: Some(b),
upgrade,
});
Either::A(self.service.call(req))
upgrade: None,
}))
}
}

View File

@ -14,7 +14,7 @@ pub mod settings;
pub mod upgrade;
pub use self::client::Client;
pub use self::glue::{Error, HttpBody as Body, GrpcBody};
pub use self::glue::{Error, HttpBody as Body, HyperServerSvc};
pub use self::settings::Settings;
use svc::Either;

View File

@ -3,13 +3,15 @@ use std::fmt;
use std::mem;
use std::sync::Arc;
use futures::Future;
use futures::{Future, Poll, future::{self, Either}};
use hyper::upgrade::OnUpgrade;
use try_lock::TryLock;
use drain;
use proxy::tcp;
use task::{ErasedExecutor, Executor};
use super::{h1, glue::HttpBody};
use svc;
use task::{BoxSendFuture, ErasedExecutor, Executor};
/// A type inserted into `http::Extensions` to bridge together HTTP Upgrades.
///
@ -60,6 +62,16 @@ enum Half {
Client,
}
#[derive(Debug)]
pub struct Service<S, E> {
service: S,
/// Watch any spawned HTTP/1.1 upgrade tasks.
upgrade_drain_signal: drain::Watch,
/// Executor used to spawn HTTP/1.1 upgrade tasks, and TCP proxies
/// after they succeed.
upgrade_executor: E,
}
// ===== impl Http11Upgrade =====
@ -169,3 +181,74 @@ impl Drop for Inner {
}
}
// ===== impl Service =====
impl<S, E> Service<S, E> {
pub(in proxy) fn new(
service: S,
upgrade_drain_signal: drain::Watch,
upgrade_executor: E,
) -> Self {
Service {
service,
upgrade_drain_signal,
upgrade_executor,
}
}
}
impl<S, E, B> svc::Service<http::Request<HttpBody>> for Service<S, E>
where
S: svc::Service<
http::Request<HttpBody>,
Response = http::Response<B>,
>,
E: Executor<BoxSendFuture> + Clone + Send + Sync + 'static,
B: Default,
{
type Response = S::Response;
type Error = S::Error;
type Future = Either<
S::Future,
future::FutureResult<http::Response<B>, Self::Error>,
>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, mut req: http::Request<HttpBody>) -> Self::Future {
// Should this rejection happen later in the Service stack?
//
// Rejecting here means telemetry doesn't record anything about it...
//
// At the same time, this stuff is specifically HTTP1, so it feels
// proper to not have the HTTP2 requests going through it...
if h1::is_bad_request(&req) {
let mut res = http::Response::default();
*res.status_mut() = http::StatusCode::BAD_REQUEST;
return Either::B(future::ok(res));
}
let upgrade = if h1::wants_upgrade(&req) {
trace!("server request wants HTTP/1.1 upgrade");
// Upgrade requests include several "connection" headers that
// cannot be removed.
// Setup HTTP Upgrade machinery.
let halves = Http11Upgrade::new(
self.upgrade_drain_signal.clone(),
ErasedExecutor::erase(self.upgrade_executor.clone()),
);
req.extensions_mut().insert(halves.client);
Some(halves.server)
} else {
h1::strip_connection_headers(req.headers_mut());
None
};
req.body_mut().upgrade = upgrade;
Either::A(self.service.call(req))
}
}

View File

@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
pub mod buffer;
pub mod canonicalize;
pub mod http;
pub mod grpc;
pub mod limit;
mod protocol;
pub mod reconnect;

View File

@ -9,7 +9,7 @@ use drain;
use never::Never;
use svc::{Stack, Service};
use transport::{connect, tls, Connection, Peek};
use proxy::http::glue::{HttpBody, HyperServerSvc};
use proxy::http::{glue::{HttpBody, HyperServerSvc}, upgrade};
use proxy::protocol::Protocol;
use proxy::tcp;
use super::Accept;
@ -262,7 +262,7 @@ where
(p, io)
});
let http = self.http.clone();
let mut http = self.http.clone();
let route = self.route.clone();
let connect = self.connect.clone();
let drain_signal = self.drain_signal.clone();
@ -281,13 +281,15 @@ where
match route.make(&source) {
Err(never) => match never {},
Ok(s) => {
let svc = HyperServerSvc::new(
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = upgrade::Service::new(
s,
drain_signal.clone(),
log_clone.executor(),
);
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = HyperServerSvc::new(svc);
let conn = http
.http1_only(true)
.serve_connection(io, svc)
.with_upgrades();
drain_signal
@ -304,12 +306,10 @@ where
match route.make(&source) {
Err(never) => match never {},
Ok(s) => {
let svc = HyperServerSvc::new(
s,
drain_signal.clone(),
log_clone.executor(),
);
let svc = HyperServerSvc::new(s);
let conn = http
.with_executor(log_clone.executor())
.http2_only(true)
.serve_connection(io, svc);
drain_signal
.watch(conn, |conn| {