From 792c04b7d1e87538493b1ebc4a8f722b56f99fb8 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 18 Dec 2018 11:48:30 -0800 Subject: [PATCH] Replace tower-h2 tap service with hyper This untangles some of the HTTP/gRPC glue, providing services/stacks that have more specific focuses. The `HyperServerSvc` now *only* converts to a `tower::Service`, and the HTTP/1.1 and Upgrade pieces were moved to a specific `proxy::http::upgrade::Service`. Several stack modules were added to `proxy::grpc`, which can map request and response bodies into `Payload`, or into `grpc::Body`, as needed. Signed-off-by: Sean McArthur --- Cargo.lock | 1 - Cargo.toml | 3 +- src/app/control.rs | 101 ------------------------- src/app/main.rs | 41 ++++++---- src/lib.rs | 1 - src/proxy/grpc/body.rs | 58 +++++++++++++++ src/proxy/grpc/mod.rs | 9 +++ src/proxy/grpc/service.rs | 152 ++++++++++++++++++++++++++++++++++++++ src/proxy/http/glue.rs | 151 +++++++++---------------------------- src/proxy/http/mod.rs | 2 +- src/proxy/http/upgrade.rs | 87 +++++++++++++++++++++- src/proxy/mod.rs | 1 + src/proxy/server.rs | 18 ++--- 13 files changed, 378 insertions(+), 247 deletions(-) create mode 100644 src/proxy/grpc/body.rs create mode 100644 src/proxy/grpc/mod.rs create mode 100644 src/proxy/grpc/service.rs diff --git a/Cargo.lock b/Cargo.lock index 5c5ee5016..7516ae8da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,7 +568,6 @@ dependencies = [ "tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", - "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", "tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)", "tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)", diff --git a/Cargo.toml b/Cargo.toml index d5574df14..52848ad40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,8 +68,7 @@ tower-reconnect = { git = "https://github.com/tower-rs/tower" } tower-service = { git = "https://github.com/tower-rs/tower" } tower-util = { git = "https://github.com/tower-rs/tower" } tower-http = { git = "https://github.com/tower-rs/tower-http" } -tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } -tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } +tower-grpc = { git = "https://github.com/tower-rs/tower-grpc", default-features = false, features = ["protobuf"] } # FIXME update to a release when available (>0.10) trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", rev = "c017c114", default-features = false } diff --git a/src/app/control.rs b/src/app/control.rs index 9425992ea..c23822b7d 100644 --- a/src/app/control.rs +++ b/src/app/control.rs @@ -455,104 +455,3 @@ pub mod client { } } -pub mod grpc_request_payload { - use bytes::Bytes; - use http; - use futures::Poll; - use std::marker::PhantomData; - use tower_grpc::{Body}; - - use proxy::http::GrpcBody as GlueBody; - use svc; - - #[derive(Debug)] - pub struct Layer(PhantomData B>); - - #[derive(Debug)] - pub struct Stack { - inner: M, - _p: PhantomData B>, - } - - #[derive(Debug)] - pub struct Service { - inner: S, - _p: PhantomData B>, - } - - // === impl Layer === - - pub fn layer() -> Layer - where - B: Body + Send + 'static, - { - Layer(PhantomData) - } - - impl Clone for Layer { - fn clone(&self) -> Self { - Layer(PhantomData) - } - } - - impl svc::Layer for Layer - where - B: Body + Send + 'static, - M: svc::Stack, - M::Value: svc::Service>>, - { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; - - fn bind(&self, inner: M) -> Self::Stack { - Stack { inner, _p: PhantomData } - } - } - - // === impl Stack === - - impl Clone for Stack { - fn clone(&self) -> Self { - Stack { - inner: self.inner.clone(), - _p: PhantomData, - } - } - } - - impl svc::Stack for Stack - where - B: Body + Send + 'static, - M: svc::Stack, - M::Value: svc::Service>>, - { - type Value = Service; - type Error = M::Error; - - fn make(&self, target: &T) -> Result { - let inner = self.inner.make(target)?; - Ok(Service { inner, _p: PhantomData }) - } - } - - // === impl Service === - - impl svc::Service> for Service - where - B: Body + Send + 'static, - S: svc::Service>>, - { - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, req: http::Request) -> Self::Future { - self.inner.call(req.map(GlueBody::new)) - } - } -} diff --git a/src/app/main.rs b/src/app/main.rs index 8d87107fe..f5a2bc4d5 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -1,6 +1,4 @@ -use bytes; use futures::{self, future, Future, Poll}; -use h2; use http; use hyper; use indexmap::IndexSet; @@ -10,7 +8,7 @@ use std::time::{Duration, SystemTime}; use std::{error, fmt, io}; use tokio::executor::{self, DefaultExecutor, Executor}; use tokio::runtime::current_thread; -use tower_h2; +use tower_grpc as grpc; use app::classify::{self, Class}; use app::metric_labels::{ControlLabels, EndpointLabels, RouteLabels}; @@ -247,7 +245,7 @@ where .push(http_metrics::layer::<_, classify::Response>( ctl_http_metrics, )) - .push(control::grpc_request_payload::layer()) + .push(proxy::grpc::req_body_as_payload::layer()) .push(svc::watch::layer(tls_client_config.clone())) .push(phantom_data::layer()) .push(control::add_origin::layer()) @@ -733,29 +731,42 @@ fn serve_tap( new_service: N, ) -> impl Future + 'static where - B: tower_h2::Body + Send + 'static, - ::Buf: Send, - N: svc::MakeService<(), http::Request, Response = http::Response> + B: tower_grpc::Body + Send + 'static, + B::Data: Send + 'static, + ::Buf: Send + 'static, + N: svc::MakeService<(), http::Request, Response = http::Response> + Send + 'static, - tower_h2::server::Connection: - Future, + N::Error: error::Error + Send + Sync, + N::MakeError: error::Error, + >>::Future: Send + 'static, { let log = logging::admin().server("tap", bound_port.local_addr()); - let h2_builder = h2::server::Builder::default(); - let server = tower_h2::Server::new(new_service, h2_builder, log.clone().executor()); let fut = { let log = log.clone(); // TODO: serve over TLS. bound_port - .listen_and_fold(server, move |mut server, (session, remote)| { + .listen_and_fold(new_service, move |mut new_service, (session, remote)| { let log = log.clone().with_remote(remote); - let serve = server.serve(session).map_err(|_| ()); + let log_clone = log.clone(); + let serve = new_service + .make_service(()) + .map_err(|err| error!("tap MakeService error: {}", err)) + .and_then(move |svc| { + let svc = proxy::grpc::req_box_body::Service::new(svc); + let svc = proxy::grpc::res_body_as_payload::Service::new(svc); + let svc = proxy::http::HyperServerSvc::new(svc); + hyper::server::conn::Http::new() + .with_executor(log_clone.executor()) + .http2_only(true) + .serve_connection(session, svc) + .map_err(|err| debug!("tap connection error: {}", err)) + }); let r = executor::current_thread::TaskExecutor::current() .spawn_local(Box::new(log.future(serve))) - .map(move |_| server) + .map(|()| new_service) .map_err(task::Error::into_io); future::result(r) }) @@ -764,3 +775,5 @@ where log.future(fut) } + + diff --git a/src/lib.rs b/src/lib.rs index 204683664..285dcb236 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,7 +37,6 @@ extern crate ring; extern crate tokio; extern crate tokio_timer; extern crate tower_grpc; -extern crate tower_h2; extern crate tower_http; extern crate tower_util; extern crate trust_dns_resolver; diff --git a/src/proxy/grpc/body.rs b/src/proxy/grpc/body.rs new file mode 100644 index 000000000..9bb312cf0 --- /dev/null +++ b/src/proxy/grpc/body.rs @@ -0,0 +1,58 @@ +use bytes::IntoBuf; +use futures::{Poll}; +use http; +use hyper::body::Payload; +use tower_grpc as grpc; + +#[derive(Debug)] +pub struct GrpcBody(B); + +// ===== impl GrpcBody ===== + +impl GrpcBody { + pub fn new(inner: B) -> Self { + GrpcBody(inner) + } +} + +impl Payload for GrpcBody +where + B: grpc::Body + Send + 'static, + B::Data: Send + 'static, + ::Buf: Send + 'static, +{ + type Data = ::Buf; + type Error = h2::Error; + + fn is_end_stream(&self) -> bool { + grpc::Body::is_end_stream(&self.0) + } + + fn poll_data(&mut self) -> Poll, Self::Error> { + let data = try_ready!(grpc::Body::poll_data(&mut self.0)); + Ok(data.map(IntoBuf::into_buf).into()) + } + + fn poll_trailers(&mut self) -> Poll, Self::Error> { + grpc::Body::poll_metadata(&mut self.0).map_err(From::from) + } +} + +impl grpc::Body for GrpcBody +where + B: grpc::Body, +{ + type Data = B::Data; + + fn is_end_stream(&self) -> bool { + grpc::Body::is_end_stream(&self.0) + } + + fn poll_data(&mut self) -> Poll, grpc::Error> { + grpc::Body::poll_data(&mut self.0) + } + + fn poll_metadata(&mut self) -> Poll, grpc::Error> { + grpc::Body::poll_metadata(&mut self.0) + } +} diff --git a/src/proxy/grpc/mod.rs b/src/proxy/grpc/mod.rs new file mode 100644 index 000000000..edbdcb97d --- /dev/null +++ b/src/proxy/grpc/mod.rs @@ -0,0 +1,9 @@ +mod body; +mod service; + +pub use self::body::GrpcBody; +pub use self::service::{ + req_body_as_payload, + req_box_body, + res_body_as_payload, +}; diff --git a/src/proxy/grpc/service.rs b/src/proxy/grpc/service.rs new file mode 100644 index 000000000..015c266a4 --- /dev/null +++ b/src/proxy/grpc/service.rs @@ -0,0 +1,152 @@ +pub mod req_body_as_payload { + use bytes::Bytes; + use http; + use futures::Poll; + use tower_grpc::Body; + + use super::super::GrpcBody; + use svc; + + + #[derive(Clone, Debug)] + pub struct Layer; + + #[derive(Clone, Debug)] + pub struct Stack { + inner: M, + } + + #[derive(Debug)] + pub struct Service(S); + + // === impl Layer === + + pub fn layer() -> Layer { + Layer + } + + impl svc::Layer for Layer + where + M: svc::Stack, + { + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { inner } + } + } + + // === impl Stack === + + impl svc::Stack for Stack + where + M: svc::Stack, + { + type Value = Service; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + let inner = self.inner.make(target)?; + Ok(Service(inner)) + } + } + + // === impl Service === + + impl svc::Service> for Service + where + B: Body + Send + 'static, + S: svc::Service>>, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.0.call(req.map(GrpcBody::new)) + } + } +} + +pub mod req_box_body { + use bytes::Bytes; + use http; + use futures::Poll; + use tower_grpc::{Body, BoxBody}; + + use svc; + + pub struct Service(S); + + impl Service { + pub fn new(service: S) -> Self { + Service(service) + } + } + + impl svc::Service> for Service + where + B: Body + Send + 'static, + S: svc::Service>, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.0.call(req.map(|b| BoxBody::new(Box::new(b)))) + } + } +} + +pub mod res_body_as_payload { + use http; + use futures::{future, Future, Poll}; + use tower_grpc::Body; + + use super::super::GrpcBody; + use svc; + + pub struct Service(S); + + impl Service { + pub fn new(service: S) -> Self { + Service(service) + } + } + + impl svc::Service> for Service + where + B2: Body, + S: svc::Service< + http::Request, + Response = http::Response, + >, + { + type Response = http::Response>; + type Error = S::Error; + type Future = future::Map) -> http::Response>>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.0.call(req) + .map(|res| { + res.map(GrpcBody::new) + }) + } + } +} + diff --git a/src/proxy/http/glue.rs b/src/proxy/http/glue.rs index 56ad799c0..bbf80be26 100644 --- a/src/proxy/http/glue.rs +++ b/src/proxy/http/glue.rs @@ -1,6 +1,5 @@ -use bytes::IntoBuf; -use futures::{future, Async, Future, Poll}; -use futures::future::Either; +use bytes::{Bytes}; +use futures::{Async, Future, Poll}; use http; use h2; use hyper::{self, body::Payload}; @@ -8,10 +7,8 @@ use hyper::client::connect as hyper_connect; use std::{error::Error as StdError, fmt}; use tower_grpc as grpc; -use drain; -use proxy::http::{HasH2Reason, h1, upgrade::Http11Upgrade}; +use proxy::http::{HasH2Reason, upgrade::Http11Upgrade}; use svc; -use task::{BoxSendFuture, ErasedExecutor, Executor}; use transport::Connect; /// Provides optional HTTP/1.1 upgrade support on the body. @@ -23,18 +20,10 @@ pub struct HttpBody { pub(super) upgrade: Option } -#[derive(Debug)] -pub struct GrpcBody(B); - /// Glue for a `tower::Service` to used as a `hyper::server::Service`. #[derive(Debug)] -pub(in proxy) struct HyperServerSvc { +pub struct HyperServerSvc { service: S, - /// Watch any spawned HTTP/1.1 upgrade tasks. - upgrade_drain_signal: drain::Watch, - /// Executor used to spawn HTTP/1.1 upgrade tasks, and TCP proxies - /// after they succeed. - upgrade_executor: E, } /// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`. @@ -98,6 +87,28 @@ impl Payload for HttpBody { } } +impl grpc::Body for HttpBody { + type Data = Bytes; + + fn is_end_stream(&self) -> bool { + Payload::is_end_stream(self) + } + + fn poll_data(&mut self) -> Poll, grpc::Error> { + match Payload::poll_data(self) { + Ok(Async::Ready(Some(chunk))) => Ok(Async::Ready(Some(chunk.into()))), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } + + fn poll_metadata(&mut self) -> Poll, grpc::Error> { + Payload::poll_trailers(self) + .map_err(From::from) + } +} + impl Default for HttpBody { fn default() -> HttpBody { HttpBody { @@ -121,127 +132,35 @@ impl Drop for HttpBody { } } -// ===== impl GrpcBody ===== - -impl GrpcBody { - pub fn new(inner: B) -> Self { - GrpcBody(inner) - } -} - -impl Payload for GrpcBody -where - B: grpc::Body + Send + 'static, - B::Data: Send + 'static, - ::Buf: Send + 'static, -{ - type Data = ::Buf; - type Error = h2::Error; - - fn is_end_stream(&self) -> bool { - grpc::Body::is_end_stream(&self.0) - } - - fn poll_data(&mut self) -> Poll, Self::Error> { - let data = try_ready!(grpc::Body::poll_data(&mut self.0)); - Ok(data.map(IntoBuf::into_buf).into()) - } - - fn poll_trailers(&mut self) -> Poll, Self::Error> { - grpc::Body::poll_metadata(&mut self.0).map_err(From::from) - } -} - -impl grpc::Body for GrpcBody -where - B: grpc::Body, -{ - type Data = B::Data; - - fn is_end_stream(&self) -> bool { - grpc::Body::is_end_stream(&self.0) - } - - fn poll_data(&mut self) -> Poll, grpc::Error> { - grpc::Body::poll_data(&mut self.0) - } - - fn poll_metadata(&mut self) -> Poll, grpc::Error> { - grpc::Body::poll_metadata(&mut self.0) - } -} - // ===== impl HyperServerSvc ===== -impl HyperServerSvc { - pub(in proxy) fn new( - service: S, - upgrade_drain_signal: drain::Watch, - upgrade_executor: E, - ) -> Self { +impl HyperServerSvc { + pub fn new(service: S) -> Self { HyperServerSvc { service, - upgrade_drain_signal, - upgrade_executor, } } } -impl hyper::service::Service for HyperServerSvc +impl hyper::service::Service for HyperServerSvc where S: svc::Service< http::Request, Response=http::Response, >, S::Error: StdError + Send + Sync + 'static, - B: Payload + Default + Send + 'static, - E: Executor + Clone + Send + Sync + 'static, + B: Payload, { type ReqBody = hyper::Body; type ResBody = B; type Error = S::Error; - type Future = Either< - S::Future, - future::FutureResult, Self::Error>, - >; + type Future = S::Future; - fn call(&mut self, mut req: http::Request) -> Self::Future { - // Should this rejection happen later in the Service stack? - // - // Rejecting here means telemetry doesn't record anything about it... - // - // At the same time, this stuff is specifically HTTP1, so it feels - // proper to not have the HTTP2 requests going through it... - if h1::is_bad_request(&req) { - let mut res = http::Response::default(); - *res.status_mut() = http::StatusCode::BAD_REQUEST; - return Either::B(future::ok(res)); - } - - let upgrade = if h1::wants_upgrade(&req) { - trace!("server request wants HTTP/1.1 upgrade"); - // Upgrade requests include several "connection" headers that - // cannot be removed. - - // Setup HTTP Upgrade machinery. - let halves = Http11Upgrade::new( - self.upgrade_drain_signal.clone(), - ErasedExecutor::erase(self.upgrade_executor.clone()), - ); - req.extensions_mut().insert(halves.client); - - Some(halves.server) - } else { - h1::strip_connection_headers(req.headers_mut()); - None - }; - - - let req = req.map(move |b| HttpBody { + fn call(&mut self, req: http::Request) -> Self::Future { + self.service.call(req.map(|b| HttpBody { body: Some(b), - upgrade, - }); - Either::A(self.service.call(req)) + upgrade: None, + })) } } diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 8d9704ee0..c1bc7539f 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -14,7 +14,7 @@ pub mod settings; pub mod upgrade; pub use self::client::Client; -pub use self::glue::{Error, HttpBody as Body, GrpcBody}; +pub use self::glue::{Error, HttpBody as Body, HyperServerSvc}; pub use self::settings::Settings; use svc::Either; diff --git a/src/proxy/http/upgrade.rs b/src/proxy/http/upgrade.rs index 0e0ffeac2..b634a3d12 100644 --- a/src/proxy/http/upgrade.rs +++ b/src/proxy/http/upgrade.rs @@ -3,13 +3,15 @@ use std::fmt; use std::mem; use std::sync::Arc; -use futures::Future; +use futures::{Future, Poll, future::{self, Either}}; use hyper::upgrade::OnUpgrade; use try_lock::TryLock; use drain; use proxy::tcp; -use task::{ErasedExecutor, Executor}; +use super::{h1, glue::HttpBody}; +use svc; +use task::{BoxSendFuture, ErasedExecutor, Executor}; /// A type inserted into `http::Extensions` to bridge together HTTP Upgrades. /// @@ -60,6 +62,16 @@ enum Half { Client, } +#[derive(Debug)] +pub struct Service { + service: S, + /// Watch any spawned HTTP/1.1 upgrade tasks. + upgrade_drain_signal: drain::Watch, + /// Executor used to spawn HTTP/1.1 upgrade tasks, and TCP proxies + /// after they succeed. + upgrade_executor: E, +} + // ===== impl Http11Upgrade ===== @@ -169,3 +181,74 @@ impl Drop for Inner { } } +// ===== impl Service ===== +impl Service { + pub(in proxy) fn new( + service: S, + upgrade_drain_signal: drain::Watch, + upgrade_executor: E, + ) -> Self { + Service { + service, + upgrade_drain_signal, + upgrade_executor, + } + } +} + +impl svc::Service> for Service +where + S: svc::Service< + http::Request, + Response = http::Response, + >, + E: Executor + Clone + Send + Sync + 'static, + B: Default, +{ + type Response = S::Response; + type Error = S::Error; + type Future = Either< + S::Future, + future::FutureResult, Self::Error>, + >; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready() + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + // Should this rejection happen later in the Service stack? + // + // Rejecting here means telemetry doesn't record anything about it... + // + // At the same time, this stuff is specifically HTTP1, so it feels + // proper to not have the HTTP2 requests going through it... + if h1::is_bad_request(&req) { + let mut res = http::Response::default(); + *res.status_mut() = http::StatusCode::BAD_REQUEST; + return Either::B(future::ok(res)); + } + + let upgrade = if h1::wants_upgrade(&req) { + trace!("server request wants HTTP/1.1 upgrade"); + // Upgrade requests include several "connection" headers that + // cannot be removed. + + // Setup HTTP Upgrade machinery. + let halves = Http11Upgrade::new( + self.upgrade_drain_signal.clone(), + ErasedExecutor::erase(self.upgrade_executor.clone()), + ); + req.extensions_mut().insert(halves.client); + + Some(halves.server) + } else { + h1::strip_connection_headers(req.headers_mut()); + None + }; + + req.body_mut().upgrade = upgrade; + + Either::A(self.service.call(req)) + } +} diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index c8fa9ca43..ed168528f 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -5,6 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; pub mod buffer; pub mod canonicalize; pub mod http; +pub mod grpc; pub mod limit; mod protocol; pub mod reconnect; diff --git a/src/proxy/server.rs b/src/proxy/server.rs index ccec8f058..dfc41c8a4 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -9,7 +9,7 @@ use drain; use never::Never; use svc::{Stack, Service}; use transport::{connect, tls, Connection, Peek}; -use proxy::http::glue::{HttpBody, HyperServerSvc}; +use proxy::http::{glue::{HttpBody, HyperServerSvc}, upgrade}; use proxy::protocol::Protocol; use proxy::tcp; use super::Accept; @@ -262,7 +262,7 @@ where (p, io) }); - let http = self.http.clone(); + let mut http = self.http.clone(); let route = self.route.clone(); let connect = self.connect.clone(); let drain_signal = self.drain_signal.clone(); @@ -281,13 +281,15 @@ where match route.make(&source) { Err(never) => match never {}, Ok(s) => { - let svc = HyperServerSvc::new( + // Enable support for HTTP upgrades (CONNECT and websockets). + let svc = upgrade::Service::new( s, drain_signal.clone(), log_clone.executor(), ); - // Enable support for HTTP upgrades (CONNECT and websockets). + let svc = HyperServerSvc::new(svc); let conn = http + .http1_only(true) .serve_connection(io, svc) .with_upgrades(); drain_signal @@ -304,12 +306,10 @@ where match route.make(&source) { Err(never) => match never {}, Ok(s) => { - let svc = HyperServerSvc::new( - s, - drain_signal.clone(), - log_clone.executor(), - ); + let svc = HyperServerSvc::new(s); let conn = http + .with_executor(log_clone.executor()) + .http2_only(true) .serve_connection(io, svc); drain_signal .watch(conn, |conn| {