Move HTTP-specific `proxy` modules into proxy::http (#93)

To prepare to move http-specific proxying logic from bind.rs into
proxy, let's carve out a proxy::http module for HTTP-specific behavior.
This commit is contained in:
Oliver Gould 2018-09-13 13:45:56 -07:00 committed by GitHub
parent ccd5d21978
commit bbf296668b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 63 additions and 62 deletions

View File

@ -13,24 +13,24 @@ use control::destination::Endpoint;
use ctx; use ctx;
use svc::{MakeClient, Reconnect}; use svc::{MakeClient, Reconnect};
use telemetry; use telemetry;
use proxy::{self, HttpBody, h1, orig_proto}; use proxy;
use transport; use transport;
use tls; use tls;
use ctx::transport::TlsStatus; use ctx::transport::TlsStatus;
use watch_service::{WatchService, Rebind}; use watch_service::{WatchService, Rebind};
/// An HTTP `Service` that is created for each `Endpoint` and `Protocol`. /// An HTTP `Service` that is created for each `Endpoint` and `Protocol`.
pub type Stack<B> = orig_proto::Upgrade<NormalizeUri<WatchTls<B>>>; pub type Stack<B> = proxy::http::orig_proto::Upgrade<NormalizeUri<WatchTls<B>>>;
type WatchTls<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>; type WatchTls<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>;
/// An HTTP `Service` that is created for each `Endpoint`, `Protocol`, and client /// An HTTP `Service` that is created for each `Endpoint`, `Protocol`, and client
/// TLS configuration. /// TLS configuration.
pub type TlsStack<B> = telemetry::http::service::Http<HttpService<B>, B, HttpBody>; pub type TlsStack<B> = telemetry::http::service::Http<HttpService<B>, B, proxy::http::Body>;
type HttpService<B> = Reconnect< type HttpService<B> = Reconnect<
Arc<ctx::transport::Client>, Arc<ctx::transport::Client>,
proxy::Client< proxy::http::Client<
transport::metrics::Connect<transport::Connect>, transport::metrics::Connect<transport::Connect>,
::logging::ClientExecutor<&'static str, SocketAddr>, ::logging::ClientExecutor<&'static str, SocketAddr>,
telemetry::http::service::RequestBody<B>, telemetry::http::service::RequestBody<B>,
@ -263,7 +263,7 @@ where
client_ctx.clone(), client_ctx.clone(),
Reconnect::new( Reconnect::new(
client_ctx.clone(), client_ctx.clone(),
proxy::Client::new(protocol, connect, log.executor()) proxy::http::Client::new(protocol, connect, log.executor())
) )
) )
} }
@ -296,7 +296,7 @@ where
let normalize_uri = NormalizeUri::new(watch_tls, protocol.was_absolute_form()); let normalize_uri = NormalizeUri::new(watch_tls, protocol.was_absolute_form());
// Upgrade HTTP/1.1 requests to be HTTP/2 if the endpoint supports HTTP/2. // Upgrade HTTP/1.1 requests to be HTTP/2 if the endpoint supports HTTP/2.
orig_proto::Upgrade::new(normalize_uri, protocol.is_http2()) proxy::http::orig_proto::Upgrade::new(normalize_uri, protocol.is_http2())
} }
pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> BoundService<B> { pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> BoundService<B> {
@ -384,7 +384,7 @@ where
// absolute form. // absolute form.
!self.was_absolute_form !self.was_absolute_form
{ {
h1::normalize_our_view_of_uri(&mut request); proxy::http::h1::normalize_our_view_of_uri(&mut request);
} }
self.inner.call(request) self.inner.call(request)
} }
@ -442,7 +442,7 @@ impl Protocol {
return Protocol::Http2; return Protocol::Http2;
} }
let was_absolute_form = h1::is_absolute_form(req.uri()); let was_absolute_form = proxy::http::h1::is_absolute_form(req.uri());
trace!( trace!(
"Protocol::detect(); req.uri='{:?}'; was_absolute_form={:?};", "Protocol::detect(); req.uri='{:?}'; was_absolute_form={:?};",
req.uri(), was_absolute_form req.uri(), was_absolute_form
@ -451,7 +451,7 @@ impl Protocol {
// the key for an HTTP/1.x request. // the key for an HTTP/1.x request.
let host = Host::detect(req); let host = Host::detect(req);
let is_h1_upgrade = h1::wants_upgrade(req); let is_h1_upgrade = proxy::http::h1::wants_upgrade(req);
Protocol::Http1 { Protocol::Http1 {
host, host,
@ -496,7 +496,7 @@ impl Host {
.uri() .uri()
.authority_part() .authority_part()
.cloned() .cloned()
.or_else(|| h1::authority_from_host(req)) .or_else(|| proxy::http::h1::authority_from_host(req))
.map(Host::Authority) .map(Host::Authority)
.unwrap_or_else(|| Host::NoAuthority) .unwrap_or_else(|| Host::NoAuthority)
} }

View File

@ -8,8 +8,8 @@ use tower_h2;
use bind; use bind;
use ctx; use ctx;
use proxy::h2_router::Recognize; use proxy::http::router::Recognize;
use proxy::orig_proto; use proxy::http::orig_proto;
type Bind<B> = bind::Bind<ctx::Proxy, B>; type Bind<B> = bind::Bind<ctx::Proxy, B>;
@ -100,7 +100,7 @@ mod tests {
use std::net; use std::net;
use http; use http;
use proxy::h2_router::Recognize; use proxy::http::router::Recognize;
use super::Inbound; use super::Inbound;
use bind::{self, Bind, Host}; use bind::{self, Bind, Host};

View File

@ -95,8 +95,7 @@ use bind::Bind;
use conditional::Conditional; use conditional::Conditional;
use inbound::Inbound; use inbound::Inbound;
use task::MainRuntime; use task::MainRuntime;
use proxy::{HttpBody, Server}; use proxy::http::router::{Router, Recognize};
use proxy::h2_router::{self, Router, Recognize};
use svc::Layer; use svc::Layer;
use telemetry::http::timestamp_request_open; use telemetry::http::timestamp_request_open;
use transport::{BoundPort, Connection}; use transport::{BoundPort, Connection};
@ -417,7 +416,7 @@ where
E: Error + Send + 'static, E: Error + Send + 'static,
F: Error + Send + 'static, F: Error + Send + 'static,
R: Recognize< R: Recognize<
Request = http::Request<HttpBody>, Request = http::Request<proxy::http::Body>,
Response = http::Response<B>, Response = http::Response<B>,
Error = E, Error = E,
RouteError = F, RouteError = F,
@ -438,10 +437,10 @@ where
// TODO replace with a metrics module that is registered to the server // TODO replace with a metrics module that is registered to the server
// transport. // transport.
let stack = timestamp_request_open::Layer::new() let stack = timestamp_request_open::Layer::new()
.bind(h2_router::Make::new(router)); .bind(proxy::http::router::Make::new(router));
let listen_addr = bound_port.local_addr(); let listen_addr = bound_port.local_addr();
let server = Server::new( let server = proxy::Server::new(
listen_addr, listen_addr,
proxy_ctx, proxy_ctx,
transport_registry, transport_registry,

View File

@ -16,11 +16,11 @@ use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody};
use bind::{self, Bind, Protocol}; use bind::{self, Bind, Protocol};
use control::destination::{self, Resolution}; use control::destination::{self, Resolution};
use ctx; use ctx;
use proxy::h2_router::Recognize; use proxy::{self, http::h1};
use proxy::http::router::Recognize;
use svc::MakeClient; use svc::MakeClient;
use telemetry::http::service::{ResponseBody as SensorBody}; use telemetry::http::service::{ResponseBody as SensorBody};
use timeout::Timeout; use timeout::Timeout;
use proxy::{h1, HttpBody};
use transport::{DnsNameAndPort, Host, HostAndPort}; use transport::{DnsNameAndPort, Host, HostAndPort};
type BindProtocol<B> = bind::BindProtocol<ctx::Proxy, B>; type BindProtocol<B> = bind::BindProtocol<ctx::Proxy, B>;
@ -142,7 +142,7 @@ where
type Request = http::Request<B>; type Request = http::Request<B>;
type Response = http::Response<PendingUntilFirstDataBody< type Response = http::Response<PendingUntilFirstDataBody<
load::peak_ewma::Handle, load::peak_ewma::Handle,
SensorBody<HttpBody>, SensorBody<proxy::http::Body>,
>>; >>;
type Error = <Self::Service as tower::Service>::Error; type Error = <Self::Service as tower::Service>::Error;
type Key = (Destination, Protocol); type Key = (Destination, Protocol);

View File

@ -9,10 +9,10 @@ use tower_service::{Service, NewService};
use tower_h2; use tower_h2;
use bind; use bind;
use proxy::http::glue::{BodyPayload, HttpBody, HyperConnect};
use proxy::http::h1;
use proxy::http::upgrade::{HttpConnect, Http11Upgrade};
use task::BoxExecutor; use task::BoxExecutor;
use super::glue::{BodyPayload, HttpBody, HyperConnect};
use super::h1;
use super::upgrade::{HttpConnect, Http11Upgrade};
use std::{self, fmt}; use std::{self, fmt};

View File

@ -17,8 +17,8 @@ use tower_h2;
use ctx::transport::{Server as ServerCtx}; use ctx::transport::{Server as ServerCtx};
use drain; use drain;
use super::h1; use proxy::http::h1;
use super::upgrade::Http11Upgrade; use proxy::http::upgrade::Http11Upgrade;
use task::{BoxSendFuture, ErasedExecutor, Executor}; use task::{BoxSendFuture, ErasedExecutor, Executor};
/// Glue between `hyper::Body` and `tower_h2::RecvBody`. /// Glue between `hyper::Body` and `tower_h2::RecvBody`.
@ -35,13 +35,13 @@ pub enum HttpBody {
/// Glue for `tower_h2::Body`s to be used in hyper. /// Glue for `tower_h2::Body`s to be used in hyper.
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub(super) struct BodyPayload<B> { pub(in proxy) struct BodyPayload<B> {
body: B, body: B,
} }
/// Glue for a `tower::Service` to used as a `hyper::server::Service`. /// Glue for a `tower::Service` to used as a `hyper::server::Service`.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct HyperServerSvc<S, E> { pub(in proxy) struct HyperServerSvc<S, E> {
service: S, service: S,
srv_ctx: Arc<ServerCtx>, srv_ctx: Arc<ServerCtx>,
/// Watch any spawned HTTP/1.1 upgrade tasks. /// Watch any spawned HTTP/1.1 upgrade tasks.
@ -52,36 +52,36 @@ pub(super) struct HyperServerSvc<S, E> {
} }
/// Future returned by `HyperServerSvc`. /// Future returned by `HyperServerSvc`.
pub(super) struct HyperServerSvcFuture<F> { pub(in proxy) struct HyperServerSvcFuture<F> {
inner: F, inner: F,
} }
/// Glue for any `Service` taking an h2 body to receive an `HttpBody`. /// Glue for any `Service` taking an h2 body to receive an `HttpBody`.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct HttpBodySvc<S> { pub(in proxy) struct HttpBodySvc<S> {
service: S, service: S,
} }
/// Glue for any `NewService` taking an h2 body to receive an `HttpBody`. /// Glue for any `NewService` taking an h2 body to receive an `HttpBody`.
#[derive(Clone)] #[derive(Clone)]
pub(super) struct HttpBodyNewSvc<N> { pub(in proxy) struct HttpBodyNewSvc<N> {
new_service: N, new_service: N,
} }
/// Future returned by `HttpBodyNewSvc`. /// Future returned by `HttpBodyNewSvc`.
pub(super) struct HttpBodyNewSvcFuture<F> { pub(in proxy) struct HttpBodyNewSvcFuture<F> {
inner: F, inner: F,
} }
/// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`. /// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(super) struct HyperConnect<C> { pub(in proxy) struct HyperConnect<C> {
connect: C, connect: C,
absolute_form: bool, absolute_form: bool,
} }
/// Future returned by `HyperConnect`. /// Future returned by `HyperConnect`.
pub(super) struct HyperConnectFuture<F> { pub(in proxy) struct HyperConnectFuture<F> {
inner: F, inner: F,
absolute_form: bool, absolute_form: bool,
} }
@ -183,7 +183,7 @@ impl Drop for HttpBody {
impl<B> BodyPayload<B> { impl<B> BodyPayload<B> {
/// Wrap a `tower_h2::Body` into a `Stream` hyper can understand. /// Wrap a `tower_h2::Body` into a `Stream` hyper can understand.
pub fn new(body: B) -> Self { pub(in proxy) fn new(body: B) -> Self {
BodyPayload { BodyPayload {
body, body,
} }
@ -222,7 +222,7 @@ where
// ===== impl HyperServerSvc ===== // ===== impl HyperServerSvc =====
impl<S, E> HyperServerSvc<S, E> { impl<S, E> HyperServerSvc<S, E> {
pub fn new( pub(in proxy) fn new(
service: S, service: S,
srv_ctx: Arc<ServerCtx>, srv_ctx: Arc<ServerCtx>,
upgrade_drain_signal: drain::Watch, upgrade_drain_signal: drain::Watch,
@ -347,7 +347,7 @@ impl<N> HttpBodyNewSvc<N>
where where
N: NewService<Request=http::Request<HttpBody>>, N: NewService<Request=http::Request<HttpBody>>,
{ {
pub fn new(new_service: N) -> Self { pub(in proxy) fn new(new_service: N) -> Self {
HttpBodyNewSvc { HttpBodyNewSvc {
new_service, new_service,
} }
@ -394,7 +394,7 @@ where
C: Connect, C: Connect,
C::Future: 'static, C::Future: 'static,
{ {
pub fn new(connect: C, absolute_form: bool) -> Self { pub(in proxy) fn new(connect: C, absolute_form: bool) -> Self {
HyperConnect { HyperConnect {
connect, connect,
absolute_form, absolute_form,

9
src/proxy/http/mod.rs Normal file
View File

@ -0,0 +1,9 @@
pub mod client;
pub(super) mod glue;
pub mod h1;
pub mod router;
pub mod upgrade;
pub mod orig_proto;
pub use self::client::{Client, Error as ClientError};
pub use self::glue::HttpBody as Body;

View File

@ -6,7 +6,7 @@ use std::{fmt, error};
use std::sync::Arc; use std::sync::Arc;
use ctx; use ctx;
use svc::{MakeClient, Service}; use svc;
extern crate linkerd2_proxy_router; extern crate linkerd2_proxy_router;
@ -22,7 +22,7 @@ where
router: Router<R>, router: Router<R>,
} }
pub struct H2Router<R> pub struct Service<R>
where where
R: Recognize, R: Recognize,
R::Error: error::Error, R::Error: error::Error,
@ -38,7 +38,7 @@ where
R::Error: error::Error, R::Error: error::Error,
R::RouteError: fmt::Display, R::RouteError: fmt::Display,
{ {
inner: <Router<R> as Service>::Future, inner: <Router<R> as svc::Service>::Future,
} }
// ===== impl Make ===== // ===== impl Make =====
@ -70,7 +70,7 @@ where
} }
} }
impl<R, A, B> MakeClient<Arc<ctx::transport::Server>> for Make<R> impl<R, A, B> svc::MakeClient<Arc<ctx::transport::Server>> for Make<R>
where where
R: Recognize<Request = http::Request<A>, Response = http::Response<B>>, R: Recognize<Request = http::Request<A>, Response = http::Response<B>>,
R: Send + Sync + 'static, R: Send + Sync + 'static,
@ -80,11 +80,11 @@ where
B: Default + Send + 'static, B: Default + Send + 'static,
{ {
type Error = (); type Error = ();
type Client = H2Router<R>; type Client = Service<R>;
fn make_client(&self, _: &Arc<ctx::transport::Server>) -> Result<Self::Client, Self::Error> { fn make_client(&self, _: &Arc<ctx::transport::Server>) -> Result<Self::Client, Self::Error> {
let inner = self.router.clone(); let inner = self.router.clone();
Ok(H2Router { inner }) Ok(Service { inner })
} }
} }
@ -115,17 +115,17 @@ where
} }
} }
// ===== impl Router ===== // ===== impl Service =====
impl<R, B> Service for H2Router<R> impl<R, B> svc::Service for Service<R>
where where
R: Recognize<Response = http::Response<B>>, R: Recognize<Response = http::Response<B>>,
R::Error: error::Error, R::Error: error::Error,
R::RouteError: fmt::Display, R::RouteError: fmt::Display,
B: Default, B: Default,
{ {
type Request = <Router<R> as Service>::Request; type Request = <Router<R> as svc::Service>::Request;
type Response = <Router<R> as Service>::Response; type Response = <Router<R> as svc::Service>::Response;
type Error = h2::Error; type Error = h2::Error;
type Future = ResponseFuture<R>; type Future = ResponseFuture<R>;

View File

@ -8,7 +8,7 @@ use hyper::upgrade::OnUpgrade;
use try_lock::TryLock; use try_lock::TryLock;
use drain; use drain;
use super::tcp; use proxy::tcp;
use task::{ErasedExecutor, Executor}; use task::{ErasedExecutor, Executor};
/// A type inserted into `http::Extensions` to bridge together HTTP Upgrades. /// A type inserted into `http::Extensions` to bridge together HTTP Upgrades.

View File

@ -13,16 +13,9 @@
//! This module is intended only to store the infrastructure for building a //! This module is intended only to store the infrastructure for building a
//! proxy. The specific logic implemented by a proxy should live elsewhere. //! proxy. The specific logic implemented by a proxy should live elsewhere.
mod client; pub mod http;
mod glue;
pub mod h1;
pub mod h2_router;
mod upgrade;
pub mod orig_proto;
mod protocol; mod protocol;
mod server; mod server;
mod tcp; mod tcp;
pub use self::client::{Client, Error as ClientError};
pub use self::glue::HttpBody;
pub use self::server::Server; pub use self::server::Server;

View File

@ -18,9 +18,9 @@ use ctx::transport::{Server as ServerCtx};
use drain; use drain;
use svc::{MakeClient, Service}; use svc::{MakeClient, Service};
use transport::{self, Connection, GetOriginalDst, Peek}; use transport::{self, Connection, GetOriginalDst, Peek};
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use proxy::http::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
use super::protocol::Protocol; use proxy::protocol::Protocol;
use super::tcp; use proxy::tcp;
/// A protocol-transparent Server! /// A protocol-transparent Server!
/// ///

View File

@ -6,7 +6,7 @@ use tower_h2::Body;
use ctx; use ctx;
use telemetry::{http::event, tap}; use telemetry::{http::event, tap};
use proxy::ClientError; use proxy::http::ClientError;
use super::record::Record; use super::record::Record;
use super::service::{Http, RequestBody}; use super::service::{Http, RequestBody};

View File

@ -10,7 +10,7 @@ use tower_service::Service;
use tower_h2::Body; use tower_h2::Body;
use ctx; use ctx;
use proxy::ClientError; use proxy::http::ClientError;
use super::event::{self, Event}; use super::event::{self, Event};
use super::sensors::Handle; use super::sensors::Handle;