From f37c9e51285ef585e600e2a2c7eea3263adedda0 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 16 Nov 2018 11:19:17 -0800 Subject: [PATCH] Update all tower pieces to use Service (#132) Signed-off-by: Sean McArthur --- Cargo.lock | 56 ++++++++----- Cargo.toml | 1 + lib/router/src/lib.rs | 26 +++--- lib/stack/src/either.rs | 9 +-- lib/stack/src/lib.rs | 4 +- lib/stack/src/stack_make_service.rs | 37 +++++++++ lib/stack/src/stack_new_service.rs | 38 --------- lib/stack/src/stack_per_request.rs | 13 ++- lib/stack/src/watch.rs | 21 +++-- lib/timeout/src/lib.rs | 7 +- src/app/control.rs | 54 +++++++------ src/app/inbound.rs | 58 ++++++++----- src/app/main.rs | 14 ++-- src/app/outbound.rs | 54 ++++++++----- src/app/profiles.rs | 28 ++++--- .../destination/background/destination_set.rs | 17 ++-- src/control/destination/background/mod.rs | 42 ++++++---- src/control/destination/mod.rs | 7 +- src/control/observe.rs | 6 +- src/control/remote_stream.rs | 32 +++++--- src/lib.rs | 1 + src/proxy/buffer.rs | 59 +++++++++----- src/proxy/canonicalize.rs | 19 ++--- src/proxy/http/balance.rs | 56 +++++++++---- src/proxy/http/client.rs | 31 ++++--- src/proxy/http/glue.rs | 30 +++---- src/proxy/http/header_from_target.rs | 13 ++- src/proxy/http/insert_target.rs | 13 ++- src/proxy/http/metrics/classify.rs | 13 ++- src/proxy/http/metrics/service.rs | 81 ++++++++++++------- src/proxy/http/normalize_uri.rs | 13 ++- src/proxy/http/orig_proto.rs | 34 ++++---- src/proxy/http/profiles.rs | 18 ++--- src/proxy/http/router.rs | 25 +++--- src/proxy/http/settings.rs | 73 ++++++++++------- src/proxy/limit.rs | 59 +++++++++----- src/proxy/reconnect.rs | 74 +++++++++-------- src/proxy/resolve.rs | 12 +-- src/proxy/server.rs | 14 ++-- src/svc.rs | 2 +- src/tap/service.rs | 59 +++++--------- tests/support/client.rs | 4 +- tests/support/controller.rs | 14 ++-- tests/support/mod.rs | 2 +- tests/support/server.rs | 32 ++++---- 45 files changed, 716 insertions(+), 559 deletions(-) create mode 100644 lib/stack/src/stack_make_service.rs delete mode 100644 lib/stack/src/stack_new_service.rs diff --git a/Cargo.lock b/Cargo.lock index 5cd58583e..76832d705 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,9 +113,9 @@ dependencies = [ [[package]] name = "codegen" version = "0.1.0" -source = "git+https://github.com/carllerche/codegen#9b2f81859e91931871456ad06437643585d35866" +source = "git+https://github.com/carllerche/codegen#2d4dcc96f530ba163674d5efa985c3b133b3022e" dependencies = [ - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -304,7 +304,7 @@ dependencies = [ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "string 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -381,7 +381,7 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.0.0" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -500,7 +500,7 @@ dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)", - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "quickcheck 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -523,7 +523,7 @@ dependencies = [ "http 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)", - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)", "ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -558,6 +558,7 @@ dependencies = [ "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", "tower-h2-balance 0.1.0 (git+https://github.com/tower-rs/tower-h2)", + "tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)", "tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -591,7 +592,7 @@ name = "linkerd2-router" version = "0.1.0" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "linkerd2-stack 0.1.0", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] @@ -1303,7 +1304,7 @@ dependencies = [ [[package]] name = "tower-add-origin" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-http#c9e13f641a681b3ef01e96910789586e39aee2e2" +source = "git+https://github.com/tower-rs/tower-http#3599ce02f063cf7db5aae3edcdaeb9073a7ebc33" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1313,10 +1314,10 @@ dependencies = [ [[package]] name = "tower-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", - "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1327,7 +1328,7 @@ dependencies = [ [[package]] name = "tower-buffer" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1336,7 +1337,7 @@ dependencies = [ [[package]] name = "tower-discover" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1345,7 +1346,7 @@ dependencies = [ [[package]] name = "tower-grpc" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-grpc#0afac3d409febe20db4432b5abe06dbd72bd4c95" +source = "git+https://github.com/tower-rs/tower-grpc#40b059abac9ca07edac252f4d0b69c55f6ecf88d" dependencies = [ "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1354,13 +1355,14 @@ dependencies = [ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", + "tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] [[package]] name = "tower-grpc-build" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-grpc#0afac3d409febe20db4432b5abe06dbd72bd4c95" +source = "git+https://github.com/tower-rs/tower-grpc#40b059abac9ca07edac252f4d0b69c55f6ecf88d" dependencies = [ "codegen 0.1.0 (git+https://github.com/carllerche/codegen)", "heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1370,7 +1372,7 @@ dependencies = [ [[package]] name = "tower-h2" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-h2#1299be66fee7be919699d7c1edfb30adac03d4c1" +source = "git+https://github.com/tower-rs/tower-h2#9b96d8d5eabe56a44a7d01228b14e96d875e84b3" dependencies = [ "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1385,7 +1387,7 @@ dependencies = [ [[package]] name = "tower-h2-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-h2#760f9fc1c83c3a96edb6fbddb6b0cd3cac73d8ac" +source = "git+https://github.com/tower-rs/tower-h2#9b96d8d5eabe56a44a7d01228b14e96d875e84b3" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "h2 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1395,10 +1397,21 @@ dependencies = [ "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] +[[package]] +name = "tower-http" +version = "0.1.0" +source = "git+https://github.com/tower-rs/tower-http#3599ce02f063cf7db5aae3edcdaeb9073a7ebc33" +dependencies = [ + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)", + "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", +] + [[package]] name = "tower-in-flight-limit" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1407,7 +1420,7 @@ dependencies = [ [[package]] name = "tower-reconnect" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1417,7 +1430,7 @@ dependencies = [ [[package]] name = "tower-service" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1425,7 +1438,7 @@ dependencies = [ [[package]] name = "tower-util" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85" +source = "git+https://github.com/tower-rs/tower#f21e3e4df07a3c474f6873b5e02a90e3e574ef46" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1683,7 +1696,7 @@ dependencies = [ "checksum httparse 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7b6288d7db100340ca12873fd4d08ad1b8f206a9457798dfb17c018a33fee540" "checksum hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)" = "081289d17dce471c8cbc0e69a3dd073b627e08338561d1167ab620b754d9fe90" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" -"checksum indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b9378f1f3923647a9aea6af4c6b5de68cc8a71415459ad25ef191191c48f5b7" +"checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d" "checksum inotify 0.5.2-dev (git+https://github.com/inotify-rs/inotify)" = "" "checksum inotify-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7dceb94c43f70baf4c4cd6afbc1e9037d4161dbe68df8a2cd4351a23319ee4fb" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" @@ -1780,6 +1793,7 @@ dependencies = [ "checksum tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "" "checksum tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "" "checksum tower-h2-balance 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "" +"checksum tower-http 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "" "checksum tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-service 0.1.0 (git+https://github.com/tower-rs/tower)" = "" diff --git a/Cargo.toml b/Cargo.toml index e66274ffc..597cc5b75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" } tower-reconnect = { git = "https://github.com/tower-rs/tower" } tower-service = { git = "https://github.com/tower-rs/tower" } tower-util = { git = "https://github.com/tower-rs/tower" } +tower-http = { git = "https://github.com/tower-rs/tower-http" } tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } tower-h2-balance = { git = "https://github.com/tower-rs/tower-h2" } tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } diff --git a/lib/router/src/lib.rs b/lib/router/src/lib.rs index 733c8091c..4fc75bf79 100644 --- a/lib/router/src/lib.rs +++ b/lib/router/src/lib.rs @@ -19,7 +19,7 @@ pub struct Router where Rec: Recognize, Stk: stack::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { inner: Arc>, } @@ -57,7 +57,7 @@ struct Inner where Rec: Recognize, Stk: stack::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { recognize: Rec, make: Stk, @@ -95,7 +95,7 @@ impl Router where Rec: Recognize, Stk: stack::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { pub fn new(recognize: Rec, make: Stk, capacity: usize, max_idle_age: Duration) -> Self { Router { @@ -108,16 +108,15 @@ where } } -impl svc::Service for Router +impl svc::Service for Router where Rec: Recognize, Stk: stack::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { - type Request = ::Request; - type Response = ::Response; - type Error = Error<::Error, Stk::Error>; - type Future = ResponseFuture<::Future, Stk::Error>; + type Response = >::Response; + type Error = Error<>::Error, Stk::Error>; + type Future = ResponseFuture<>::Future, Stk::Error>; /// Always ready to serve. /// @@ -133,7 +132,7 @@ where /// Routes the request through an underlying service. /// /// The response fails when the request cannot be routed. - fn call(&mut self, request: Self::Request) -> Self::Future { + fn call(&mut self, request: Req) -> Self::Future { let target = match self.inner.recognize.recognize(&request) { Some(target) => target, None => return ResponseFuture::not_recognized(), @@ -172,7 +171,7 @@ impl Clone for Router where Rec: Recognize, Stk: stack::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { fn clone(&self) -> Self { Router { inner: self.inner.clone() } @@ -311,8 +310,7 @@ mod test_util { } } - impl Service for MultiplyAndAssign { - type Request = Request; + impl Service for MultiplyAndAssign { type Response = usize; type Error = (); type Future = future::FutureResult; @@ -321,7 +319,7 @@ mod test_util { unimplemented!() } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { let n = match req { Request::NotRecognized => unreachable!(), Request::Recognized(n) => n, diff --git a/lib/stack/src/either.rs b/lib/stack/src/either.rs index cc13c7df4..705cac08b 100644 --- a/lib/stack/src/either.rs +++ b/lib/stack/src/either.rs @@ -44,12 +44,11 @@ where } } -impl svc::Service for Either +impl svc::Service for Either where - A: svc::Service, - B: svc::Service, + A: svc::Service, + B: svc::Service, { - type Request = A::Request; type Response = A::Response; type Error = Either; type Future = future::Either< @@ -64,7 +63,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: R) -> Self::Future { match self { Either::A(ref mut a) => future::Either::A(a.call(req).map_err(Either::A)), Either::B(ref mut b) => future::Either::B(b.call(req).map_err(Either::B)), diff --git a/lib/stack/src/lib.rs b/lib/stack/src/lib.rs index 8ab044c5f..9f7a0232b 100644 --- a/lib/stack/src/lib.rs +++ b/lib/stack/src/lib.rs @@ -9,13 +9,13 @@ pub mod layer; mod map_err; pub mod map_target; pub mod phantom_data; -pub mod stack_new_service; +pub mod stack_make_service; pub mod stack_per_request; pub mod watch; pub use self::either::Either; pub use self::layer::Layer; -pub use self::stack_new_service::StackNewService; +pub use self::stack_make_service::StackMakeService; /// A composable builder. /// diff --git a/lib/stack/src/stack_make_service.rs b/lib/stack/src/stack_make_service.rs new file mode 100644 index 000000000..597798971 --- /dev/null +++ b/lib/stack/src/stack_make_service.rs @@ -0,0 +1,37 @@ +use futures::{future, Poll}; +use svc; + +use super::Stack; + +/// Implements `MakeService` using a `Stack` of `Service`. +#[derive(Clone, Debug)] +pub struct StackMakeService> { + make: M, + target: T, +} + +impl StackMakeService +where + M: Stack, +{ + pub fn new(make: M, target: T) -> Self { + Self { make, target } + } +} + +impl svc::Service<()> for StackMakeService +where + M: Stack, +{ + type Response = M::Value; + type Error = M::Error; + type Future = future::FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _target: ()) -> Self::Future { + future::result(self.make.make(&self.target)) + } +} diff --git a/lib/stack/src/stack_new_service.rs b/lib/stack/src/stack_new_service.rs deleted file mode 100644 index 46c09de7e..000000000 --- a/lib/stack/src/stack_new_service.rs +++ /dev/null @@ -1,38 +0,0 @@ -use futures::future; -use svc; - -use super::Stack; - -/// Implements `NewService` using a `Stack` of `Service`. -#[derive(Clone, Debug)] -pub struct StackNewService> { - make: M, - target: T, -} - -impl StackNewService -where - M: Stack, - M::Value: svc::Service, -{ - pub fn new(make: M, target: T) -> StackNewService { - Self { make, target } - } -} - -impl svc::NewService for StackNewService -where - M: Stack, - M::Value: svc::Service, -{ - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Service = M::Value; - type InitError = M::Error; - type Future = future::FutureResult; - - fn new_service(&self) -> Self::Future { - future::result(self.make.make(&self.target)) - } -} diff --git a/lib/stack/src/stack_per_request.rs b/lib/stack/src/stack_per_request.rs index 50004ca51..46adb6655 100644 --- a/lib/stack/src/stack_per_request.rs +++ b/lib/stack/src/stack_per_request.rs @@ -90,17 +90,16 @@ where // === Service === -impl svc::Service for Service +impl svc::Service for Service where T: ShouldStackPerRequest + Clone, N: super::Stack + Clone, - N::Value: svc::Service, + N::Value: svc::Service, N::Error: fmt::Debug, { - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Future = ::Future; + type Response = >::Response; + type Error = >::Error; + type Future = >::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if let Some(ref mut svc) = self.next { @@ -114,7 +113,7 @@ where Ok(ready) } - fn call(&mut self, request: Self::Request) -> Self::Future { + fn call(&mut self, request: R) -> Self::Future { // If a service has already been bound in `poll_ready`, consume it. // Otherwise, bind a new service on-the-spot. self.next diff --git a/lib/stack/src/watch.rs b/lib/stack/src/watch.rs index 0614d84ad..c8222282f 100644 --- a/lib/stack/src/watch.rs +++ b/lib/stack/src/watch.rs @@ -120,18 +120,17 @@ where // === impl Service === -impl svc::Service for Service +impl svc::Service for Service where T: WithUpdate, M: super::Stack, - M::Value: svc::Service, + M::Value: svc::Service, { - type Request = ::Request; - type Response = ::Response; - type Error = Error<::Error, M::Error>; + type Response = >::Response; + type Error = Error<>::Error, M::Error>; type Future = MapErr< - ::Future, - fn(::Error) -> Self::Error, + >::Future, + fn(>::Error) -> Self::Error, >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { @@ -149,7 +148,7 @@ where self.inner.poll_ready().map_err(Error::Inner) } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: R) -> Self::Future { self.inner.call(req).map_err(Error::Inner) } } @@ -158,7 +157,6 @@ impl Service where U: Clone, M: super::Stack, - M::Value: svc::Service, { pub fn try(watch: Watch, stack: M) -> Result { let inner = stack.make(&*watch.borrow())?; @@ -175,7 +173,7 @@ impl Clone for Service where T: WithUpdate + Clone, M: super::Stack + Clone, - M::Value: svc::Service + Clone, + M::Value: Clone, { fn clone(&self) -> Self { Self { @@ -227,8 +225,7 @@ mod tests { #[test] fn rebind() { struct Svc(usize); - impl svc::Service for Svc { - type Request = (); + impl svc::Service<()> for Svc { type Response = usize; type Error = (); type Future = future::FutureResult; diff --git a/lib/timeout/src/lib.rs b/lib/timeout/src/lib.rs index eca7bbcec..3c0226555 100644 --- a/lib/timeout/src/lib.rs +++ b/lib/timeout/src/lib.rs @@ -69,11 +69,10 @@ impl Timeout { } } -impl svc::Service for Timeout +impl svc::Service for Timeout where - S: svc::Service, + S: svc::Service, { - type Request = S::Request; type Response = T; type Error = Error; type Future = Timeout>; @@ -82,7 +81,7 @@ where self.inner.poll_ready().map_err(|e| self.error(e)) } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { let inner = timer::Timeout::new(self.inner.call(req), self.duration); Timeout { inner, diff --git a/src/app/control.rs b/src/app/control.rs index e1ee99a14..f9bb4a5f8 100644 --- a/src/app/control.rs +++ b/src/app/control.rs @@ -162,7 +162,7 @@ pub mod resolve { pub struct Init where M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { state: State, } @@ -170,14 +170,14 @@ pub mod resolve { enum State where M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { Resolve { future: dns::IpAddrFuture, config: super::Config, stack: M, }, - Inner(::Future), + Inner(>::Future), Invalid(Option), } @@ -245,19 +245,20 @@ pub mod resolve { // === impl NewService === - impl svc::NewService for NewService + impl svc::Service<()> for NewService where M: svc::Stack + Clone, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Service = ::Service; - type InitError = as Future>::Error; + type Response = >::Response; + type Error = as Future>::Error; type Future = Init; - fn new_service(&self) -> Self::Future { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _target: ()) -> Self::Future { let state = match self.config.addr { Addr::Socket(sa) => State::make_inner(sa, &self.config, &self.stack), Addr::Name(ref na) => State::Resolve { @@ -276,10 +277,10 @@ pub mod resolve { impl Future for Init where M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { - type Item = ::Service; - type Error = Error::InitError>; + type Item = >::Response; + type Error = Error>::Error>; fn poll(&mut self) -> Poll { loop { @@ -307,7 +308,7 @@ pub mod resolve { impl State where M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { fn make_inner(addr: SocketAddr, config: &super::Config, stack: &M) -> Self { let tls = config.tls_server_identity.as_ref().and_then(|id| { @@ -327,7 +328,7 @@ pub mod resolve { }; match stack.make(&target) { - Ok(n) => State::Inner(svc::NewService::new_service(&n)), + Ok(mut n) => State::Inner(svc::Service::call(&mut n, ())), Err(e) => State::Invalid(Some(e)), } } @@ -353,7 +354,8 @@ pub mod resolve { pub mod client { use h2; use std::marker::PhantomData; - use tower_h2::{client, BoxBody}; + use tower_h2::client; + use tower_grpc::BoxBody; use svc; use transport::connect; @@ -442,10 +444,11 @@ pub mod client { } pub mod box_request_body { + use bytes::Bytes; use http; use futures::Poll; use std::marker::PhantomData; - use tower_h2::{Body, BoxBody}; + use tower_grpc::{Body, BoxBody}; use svc; @@ -481,9 +484,9 @@ pub mod box_request_body { impl svc::Layer for Layer where - B: Body + Send + 'static, + B: Body + Send + 'static, M: svc::Stack, - M::Value: svc::Service>>, + M::Value: svc::Service>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -507,9 +510,9 @@ pub mod box_request_body { impl svc::Stack for Stack where - B: Body + Send + 'static, + B: Body + Send + 'static, M: svc::Stack, - M::Value: svc::Service>>, + M::Value: svc::Service>, { type Value = Service; type Error = M::Error; @@ -522,12 +525,11 @@ pub mod box_request_body { // === impl Service === - impl svc::Service for Service + impl svc::Service> for Service where - B: Body + Send + 'static, - S: svc::Service>>, + B: Body + Send + 'static, + S: svc::Service>, { - type Request = http::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 43e8ff1b7..7ef081c60 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -103,57 +103,75 @@ impl router::Recognize> for RecognizeEndpoint { } pub mod orig_proto_downgrade { + use std::marker::PhantomData; use http; - use proxy::http::orig_proto; use proxy::server::Source; use svc; - #[derive(Debug, Clone)] - pub struct Layer; + #[derive(Debug)] + pub struct Layer(PhantomData B>); - #[derive(Clone, Debug)] - pub struct Stack - where - M: svc::Stack, - { + #[derive(Debug)] + pub struct Stack { inner: M, + _marker: PhantomData B>, } // === impl Layer === - pub fn layer() -> Layer { - Layer + pub fn layer() -> Layer { + Layer(PhantomData) } - impl svc::Layer for Layer + impl Clone for Layer { + fn clone(&self) -> Self { + Layer(PhantomData) + } + } + + impl svc::Layer for Layer where M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, + M::Value: svc::Service, Response = http::Response>, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { - Stack { inner } + Stack { + inner, + _marker: PhantomData, + } } } // === impl Stack === - impl svc::Stack for Stack + impl Clone for Stack { + fn clone(&self) -> Self { + Stack { + inner: self.inner.clone(), + _marker: PhantomData, + } + } + } + + impl svc::Stack for Stack where M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, + M::Value: svc::Service, Response = http::Response>, { type Value = orig_proto::Downgrade; type Error = M::Error; fn make(&self, target: &Source) -> Result { info!("downgrading requests; source={:?}", target); - let inner = self.inner.make(&target)?; - Ok(inner.into()) + self + .inner + .make(&target) + .map(orig_proto::Downgrade::new) } } } diff --git a/src/app/main.rs b/src/app/main.rs index 633aa6b8b..e7673fa42 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -326,7 +326,7 @@ where // request version and headers). let endpoint_stack = client_stack .push(buffer::layer()) - .push(settings::router::layer::()) + .push(settings::router::layer::()) .push(orig_proto_upgrade::layer()) .push(tap::layer(tap_next_id.clone(), taps.clone())) .push(metrics::layer::<_, classify::Response>( @@ -476,7 +476,7 @@ where // `default_fwd_addr` may be used. let endpoint_router = client_stack .push(buffer::layer()) - .push(settings::router::layer::()) + .push(settings::router::layer::()) .push(tap::layer(tap_next_id, taps)) .push(http_metrics::layer::<_, classify::Response>( endpoint_http_metrics, @@ -655,10 +655,10 @@ where ::Error: fmt::Debug + 'static, R: svc::Stack + Send + Clone + 'static, R::Value: - svc::Service, Response = http::Response>, + svc::Service, Response = http::Response>, R::Value: Send + 'static, - ::Error: error::Error + Send + Sync + 'static, - ::Future: Send + 'static, + >>::Error: error::Error + Send + Sync + 'static, + >>::Future: Send + 'static, B: tower_h2::Body + Default + Send + 'static, B::Data: Send, ::Buf: Send, @@ -734,7 +734,7 @@ fn serve_tap( where B: tower_h2::Body + Send + 'static, ::Buf: Send, - N: svc::NewService, Response = http::Response> + N: svc::MakeService<(), http::Request, Response = http::Response> + Send + 'static, tower_h2::server::Connection: @@ -748,7 +748,7 @@ where let log = log.clone(); // TODO: serve over TLS. bound_port - .listen_and_fold(server, move |server, (session, remote)| { + .listen_and_fold(server, move |mut server, (session, remote)| { let log = log.clone().with_remote(remote); let serve = server.serve(session).map_err(|_| ()); diff --git a/src/app/outbound.rs b/src/app/outbound.rs index e5c453f61..fb33fbdbb 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -159,54 +159,72 @@ pub mod discovery { } pub mod orig_proto_upgrade { + use std::marker::PhantomData; + use http; use super::Endpoint; use proxy::http::orig_proto; use svc; - #[derive(Debug, Clone)] - pub struct Layer; + #[derive(Debug)] + pub struct Layer(PhantomData B>); - #[derive(Clone, Debug)] - pub struct Stack - where - M: svc::Stack, - { + #[derive(Debug)] + pub struct Stack { inner: M, + _marker: PhantomData B>, } - pub fn layer() -> Layer { - Layer + pub fn layer() -> Layer { + Layer(PhantomData) } - impl svc::Layer for Layer + impl Clone for Layer { + fn clone(&self) -> Self { + Layer(PhantomData) + } + } + + impl svc::Layer for Layer where M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, + M::Value: svc::Service, Response = http::Response>, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { - Stack { inner } + Stack { + inner, + _marker: PhantomData, + } } } // === impl Stack === - impl svc::Stack for Stack + impl Clone for Stack { + fn clone(&self) -> Self { + Stack { + inner: self.inner.clone(), + _marker: PhantomData, + } + } + } + + impl svc::Stack for Stack where M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, + M::Value: svc::Service, Response = http::Response>, { type Value = svc::Either, M::Value>; type Error = M::Error; fn make(&self, endpoint: &Endpoint) -> Result { if endpoint.can_use_orig_proto() { - self.inner.make(&endpoint).map(|i| svc::Either::A(i.into())) + self.inner.make(&endpoint).map(|i| svc::Either::A(orig_proto::Upgrade::new(i))) } else { self.inner.make(&endpoint).map(svc::Either::B) } diff --git a/src/app/profiles.rs b/src/app/profiles.rs index 9f7f2460b..e6f43c08b 100644 --- a/src/app/profiles.rs +++ b/src/app/profiles.rs @@ -4,8 +4,8 @@ use regex::Regex; use std::fmt; use std::time::Duration; use tokio_timer::{clock, Delay}; -use tower_grpc as grpc; -use tower_h2::{Body, BoxBody, Data, HttpService}; +use tower_grpc::{self as grpc, Body, BoxBody}; +use tower_http::HttpService; use api::destination as api; @@ -18,14 +18,22 @@ pub struct Client { backoff: Duration, } -pub struct Rx { +pub struct Rx +where + T: HttpService, + T::ResponseBody: Body, +{ dst: String, backoff: Duration, service: Option, state: State, } -enum State { +enum State +where + T: HttpService, + T::ResponseBody: Body, +{ Disconnected, Backoff(Delay), Waiting(grpc::client::server_streaming::ResponseFuture), @@ -36,8 +44,8 @@ enum State { impl Client where - T: HttpService + Clone, - T::ResponseBody: Body, + T: HttpService + Clone, + T::ResponseBody: Body, T::Error: fmt::Debug, { pub fn new(service: Option, backoff: Duration) -> Self { @@ -50,8 +58,8 @@ where impl profiles::GetRoutes for Client where - T: HttpService + Clone, - T::ResponseBody: Body, + T: HttpService + Clone, + T::ResponseBody: Body, T::Error: fmt::Debug, { type Stream = Rx; @@ -70,8 +78,8 @@ where impl Stream for Rx where - T: HttpService + Clone, - T::ResponseBody: Body, + T: HttpService + Clone, + T::ResponseBody: Body, T::Error: fmt::Debug, { type Item = Vec<(profiles::RequestMatch, profiles::Route)>; diff --git a/src/control/destination/background/destination_set.rs b/src/control/destination/background/destination_set.rs index d4ba1ca2b..636aac399 100644 --- a/src/control/destination/background/destination_set.rs +++ b/src/control/destination/background/destination_set.rs @@ -8,7 +8,8 @@ use std::{ }; use futures::{Async, Future, Stream,}; -use tower_h2::{Body, Data, HttpService}; +use tower_http::HttpService; +use tower_grpc::{Body, BoxBody}; use api::{ destination::{ @@ -31,7 +32,11 @@ use {Conditional, NameAddr}; use super::{ActiveQuery, DestinationServiceQuery, UpdateRx}; /// Holds the state of a single resolution. -pub(super) struct DestinationSet { +pub(super) struct DestinationSet +where + T: HttpService, + T::ResponseBody: Body, +{ pub addrs: Exists>, pub query: DestinationServiceQuery, pub dns_query: Option, @@ -42,8 +47,8 @@ pub(super) struct DestinationSet { impl DestinationSet where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, T::Error: fmt::Debug, { pub(super) fn reset_dns_query( @@ -182,8 +187,8 @@ where impl DestinationSet where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, { /// Returns `true` if the authority that created this query _should_ query /// the Destination service, but was unable to due to insufficient capaacity. diff --git a/src/control/destination/background/mod.rs b/src/control/destination/background/mod.rs index 31fc7b0f7..be3b48dec 100644 --- a/src/control/destination/background/mod.rs +++ b/src/control/destination/background/mod.rs @@ -12,8 +12,8 @@ use futures::{ sync::mpsc, Async, Poll, Stream, }; -use tower_grpc as grpc; -use tower_h2::{Body, BoxBody, Data, HttpService}; +use tower_grpc::{self as grpc, Body, BoxBody}; +use tower_http::HttpService; use api::destination::client::Destination; use api::destination::{ @@ -43,7 +43,11 @@ type UpdateRx = Receiver; /// service is healthy, it reads requests from `request_rx`, determines how to resolve the /// provided authority to a set of addresses, and ensures that resolution updates are /// propagated to all requesters. -pub(super) struct Background { +pub(super) struct Background +where + T: HttpService, + T::ResponseBody: Body, +{ new_query: NewQuery, dns_resolver: dns::Resolver, dsts: DestinationCache, @@ -57,7 +61,11 @@ pub(super) struct Background { /// Holds the currently active `DestinationSet`s and a list of any destinations /// which require reconnects. #[derive(Default)] -struct DestinationCache { +struct DestinationCache +where + T: HttpService, + T::ResponseBody: Body, +{ destinations: HashMap>, /// A queue of authorities that need to be reconnected. reconnects: VecDeque, @@ -78,7 +86,11 @@ struct NewQuery { concurrency_limit: usize, } -enum DestinationServiceQuery { +enum DestinationServiceQuery +where + T: HttpService, + T::ResponseBody: Body, +{ Inactive, Active(ActiveQuery), NoCapacity, @@ -88,8 +100,8 @@ enum DestinationServiceQuery { impl Background where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, T::Error: fmt::Debug, { pub(super) fn new( @@ -344,8 +356,8 @@ impl NewQuery { connect_or_reconnect: &str, ) -> DestinationServiceQuery where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, T::Error: fmt::Debug, { trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, dst); @@ -396,8 +408,8 @@ impl NewQuery { impl DestinationCache where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, T::Error: fmt::Debug, { @@ -433,8 +445,8 @@ where impl DestinationServiceQuery where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, { pub fn is_active(&self) -> bool { @@ -461,8 +473,8 @@ where impl From> for DestinationServiceQuery where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, { fn from(active: ActiveQuery) -> Self { DestinationServiceQuery::Active(active) diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index d4071f820..dad8506f5 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -35,7 +35,8 @@ use futures::{ use indexmap::IndexMap; use std::fmt; use std::sync::{Arc, Weak}; -use tower_h2::{Body, BoxBody, Data, HttpService}; +use tower_http::HttpService; +use tower_grpc::{Body, BoxBody}; use dns; use transport::tls; @@ -118,8 +119,8 @@ pub fn new( concurrency_limit: usize, ) -> (Resolver, impl Future) where - T: HttpService, - T::ResponseBody: Body, + T: HttpService, + T::ResponseBody: Body, T::Error: fmt::Debug, { let (request_tx, rx) = mpsc::unbounded(); diff --git a/src/control/observe.rs b/src/control/observe.rs index d3521be92..e4d149978 100644 --- a/src/control/observe.rs +++ b/src/control/observe.rs @@ -46,7 +46,7 @@ impl server::Tap for Observe { fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { if self.next_id.load(Ordering::Acquire) == ::std::usize::MAX { return future::err(grpc::Error::Grpc( - grpc::Status::INTERNAL, + grpc::Status::with_code(grpc::Code::Internal), HeaderMap::new(), )); } @@ -58,7 +58,7 @@ impl server::Tap for Observe { Some(m) => m, None => { return future::err(grpc::Error::Grpc( - grpc::Status::INVALID_ARGUMENT, + grpc::Status::with_code(grpc::Code::InvalidArgument), HeaderMap::new(), )); } @@ -72,7 +72,7 @@ impl server::Tap for Observe { } Err(_) => { return future::err(grpc::Error::Grpc( - grpc::Status::INTERNAL, + grpc::Status::with_code(grpc::Code::Internal), HeaderMap::new(), )); } diff --git a/src/control/remote_stream.rs b/src/control/remote_stream.rs index 8b5bcfd0a..834acb75c 100644 --- a/src/control/remote_stream.rs +++ b/src/control/remote_stream.rs @@ -5,9 +5,11 @@ use std::{ fmt, sync::Weak, }; -use tower_h2::{HttpService, Body, Data}; +use tower_http::{HttpService}; use tower_grpc::{ self as grpc, + Body, + BoxBody, Streaming, client::server_streaming::ResponseFuture, }; @@ -16,7 +18,11 @@ use tower_grpc::{ /// /// A remote may hold a `Receiver` that can be used to read `M`-typed messages from the /// remote stream. -pub enum Remote { +pub enum Remote +where + S: HttpService, + S::ResponseBody: Body, +{ NeedsReconnect, ConnectedOrConnecting { rx: Receiver, @@ -28,7 +34,11 @@ pub enum Remote { /// Streaming gRPC endpoints return a `ResponseFuture` whose item is a `Response`. /// A `Receiver` holds the state of that RPC call, exposing a `Stream` that drives both /// the gRPC response and its streaming body. -pub struct Receiver { +pub struct Receiver +where + S: HttpService, + S::ResponseBody: Body, +{ rx: Rx, /// Used by `background::NewQuery` for counting the number of currently @@ -36,16 +46,20 @@ pub struct Receiver { _active: Weak<()>, } -enum Rx { +enum Rx +where + S: HttpService, + S::ResponseBody: Body, +{ Waiting(ResponseFuture), Streaming(Streaming), } // ===== impl Receiver ===== -impl Receiver +impl> Receiver where - S::ResponseBody: Body, + S::ResponseBody: Body, S::Error: fmt::Debug, { pub fn new(future: ResponseFuture, active: Weak<()>) -> Self { @@ -66,15 +80,15 @@ where // some reason does, we report this as an unknown error. warn!("unexpected gRPC stream error"); debug_assert!(false); - grpc::Error::Grpc(grpc::Status::UNKNOWN, HeaderMap::new()) + grpc::Error::Grpc(grpc::Status::with_code(grpc::Code::Unknown), HeaderMap::new()) } } } } -impl Stream for Receiver +impl> Stream for Receiver where - S::ResponseBody: Body, + S::ResponseBody: Body, S::Error: fmt::Debug, { type Item = M; diff --git a/src/lib.rs b/src/lib.rs index 756b5e35d..3f4cc5d90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ extern crate tokio; extern crate tokio_timer; extern crate tower_grpc; extern crate tower_h2; +extern crate tower_http; extern crate tower_util; extern crate trust_dns_resolver; extern crate try_lock; diff --git a/src/proxy/buffer.rs b/src/proxy/buffer.rs index a6d6fcdde..c1c2c735d 100644 --- a/src/proxy/buffer.rs +++ b/src/proxy/buffer.rs @@ -1,6 +1,6 @@ extern crate tower_buffer; -use std::{error, fmt}; +use std::{error, fmt, marker::PhantomData}; pub use self::tower_buffer::{Buffer, Error as ServiceError, SpawnError}; @@ -8,13 +8,14 @@ use logging; use svc; /// Wraps `Service` stacks with a `Buffer`. -#[derive(Debug, Clone)] -pub struct Layer(); +#[derive(Debug)] +pub struct Layer(PhantomData); /// Produces `Service`s wrapped with a `Buffer` -#[derive(Debug, Clone)] -pub struct Stack { +#[derive(Debug)] +pub struct Stack { inner: M, + _marker: PhantomData, } pub enum Error { @@ -24,38 +25,56 @@ pub enum Error { // === impl Layer === -pub fn layer() -> Layer { - Layer() +pub fn layer() -> Layer { + Layer(PhantomData) } -impl svc::Layer for Layer +impl Clone for Layer { + fn clone(&self) -> Self { + Layer(PhantomData) + } +} + +impl svc::Layer for Layer where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service + Send + 'static, - ::Request: Send, - ::Future: Send, + M::Value: svc::Service + Send + 'static, + >::Future: Send, + Req: Send + 'static, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { - Stack { inner } + Stack { + inner, + _marker: PhantomData, + } } } // === impl Stack === -impl svc::Stack for Stack +impl Clone for Stack { + fn clone(&self) -> Self { + Stack { + inner: self.inner.clone(), + _marker: PhantomData, + } + } +} + +impl svc::Stack for Stack where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service + Send + 'static, - ::Request: Send, - ::Future: Send, + M::Value: svc::Service + Send + 'static, + >::Future: Send, + Req: Send + 'static, { - type Value = Buffer; + type Value = Buffer; type Error = Error; fn make(&self, target: &T) -> Result { diff --git a/src/proxy/canonicalize.rs b/src/proxy/canonicalize.rs index 38b399e9f..92973a668 100644 --- a/src/proxy/canonicalize.rs +++ b/src/proxy/canonicalize.rs @@ -74,7 +74,6 @@ pub fn layer(resolver: dns::Resolver) -> Layer { impl svc::Layer for Layer where M: svc::Stack + Clone, - M::Value: svc::Service, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -94,7 +93,6 @@ where impl svc::Stack for Stack where M: svc::Stack + Clone, - M::Value: svc::Service, { type Value = svc::Either, M::Value>; type Error = M::Error; @@ -120,7 +118,7 @@ where impl Service where M: svc::Stack, - M::Value: svc::Service, + //M::Value: svc::Service, { fn new(original: NameAddr, stack: M, resolver: dns::Resolver, timeout: Duration) -> Self { trace!("refining name={}", original.name()); @@ -205,17 +203,16 @@ where } } -impl svc::Service for Service +impl svc::Service for Service where M: svc::Stack, - M::Value: svc::Service, + M::Value: svc::Service, { - type Request = ::Request; - type Response = ::Response; - type Error = Error::Error>; + type Response = >::Response; + type Error = Error>::Error>; type Future = future::MapErr< - ::Future, - fn(::Error) -> Self::Error, + >::Future, + fn(>::Error) -> Self::Error, >; fn poll_ready(&mut self) -> Poll<(), Self::Error> { @@ -232,7 +229,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { self.service .as_mut() .expect("poll_ready must be called first") diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index 53aa8a6bf..63d129e35 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -2,39 +2,43 @@ extern crate tower_balance; extern crate tower_discover; extern crate tower_h2_balance; -use http; +use std::marker::PhantomData; use std::time::Duration; use self::tower_discover::Discover; -use tower_h2::Body; pub use self::tower_balance::{choose::PowerOfTwoChoices, load::WithPeakEwma, Balance}; pub use self::tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; +use http; use svc; +use tower_h2::Body; /// Configures a stack to resolve `T` typed targets to balance requests over /// `M`-typed endpoint stacks. -#[derive(Clone, Debug)] -pub struct Layer { +#[derive(Debug)] +pub struct Layer { decay: Duration, + _marker: PhantomData B>, } /// Resolves `T` typed targets to balance requests over `M`-typed endpoint stacks. -#[derive(Clone, Debug)] -pub struct Stack { +#[derive(Debug)] +pub struct Stack { decay: Duration, inner: M, + _marker: PhantomData B>, } // === impl Layer === -pub fn layer() -> Layer { +pub fn layer() -> Layer { Layer { decay: Layer::DEFAULT_DECAY, + _marker: PhantomData, } } -impl Layer { +impl Layer<(), ()> { const DEFAULT_DECAY: Duration = Duration::from_secs(10); // pub fn with_decay(self, decay: Duration) -> Self { @@ -45,31 +49,53 @@ impl Layer { // } } -impl svc::Layer for Layer +impl Clone for Layer { + fn clone(&self) -> Self { + Layer { + decay: self.decay, + _marker: PhantomData, + } + } +} + +impl svc::Layer for Layer where M: svc::Stack + Clone, - M::Value: Discover, Response = http::Response>, + M::Value: Discover, + ::Service: svc::Service, Response = http::Response>, A: Body, B: Body, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { decay: self.decay, inner, + _marker: PhantomData, } } } // === impl Stack === -impl svc::Stack for Stack +impl Clone for Stack { + fn clone(&self) -> Self { + Stack { + decay: self.decay, + inner: self.inner.clone(), + _marker: PhantomData, + } + } +} + +impl svc::Stack for Stack where M: svc::Stack + Clone, - M::Value: Discover, Response = http::Response>, + M::Value: Discover, + ::Service: svc::Service, Response = http::Response>, A: Body, B: Body, { diff --git a/src/proxy/http/client.rs b/src/proxy/http/client.rs index 13b2392ba..c9bd7499e 100644 --- a/src/proxy/http/client.rs +++ b/src/proxy/http/client.rs @@ -304,7 +304,7 @@ where } } -impl svc::NewService for Client +impl svc::Service<()> for Client where C: connect::Connect + Clone + Send + Sync + 'static, C::Future: Send + 'static, @@ -315,20 +315,28 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type InitError = tower_h2::client::ConnectError; - type Service = ClientService; + type Response = ClientService; + type Error = tower_h2::client::ConnectError; type Future = ClientNewServiceFuture; - fn new_service(&self) -> Self::Future { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.inner { + ClientInner::Http1(_) => { + Ok(().into()) + }, + ClientInner::Http2(ref mut h2) => { + h2.poll_ready() + }, + } + } + + fn call(&mut self, _target: ()) -> Self::Future { let inner = match self.inner { ClientInner::Http1(ref h1) => { ClientNewServiceFutureInner::Http1(Some(h1.clone())) }, - ClientInner::Http2(ref h2) => { - ClientNewServiceFutureInner::Http2(h2.new_service()) + ClientInner::Http2(ref mut h2) => { + ClientNewServiceFutureInner::Http2(h2.call(())) }, }; ClientNewServiceFuture { @@ -370,7 +378,7 @@ where // === impl ClientService === -impl svc::Service for ClientService +impl svc::Service> for ClientService where C: connect::Connect + Send + Sync + 'static, C::Connected: Send, @@ -381,7 +389,6 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Request = http::Request; type Response = http::Response; type Error = Error; type Future = ClientServiceFuture; @@ -393,7 +400,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { debug!("client request: method={} uri={} version={:?} headers={:?}", req.method(), req.uri(), req.version(), req.headers()); match self.inner { diff --git a/src/proxy/http/glue.rs b/src/proxy/http/glue.rs index f36eed45f..8c154361f 100644 --- a/src/proxy/http/glue.rs +++ b/src/proxy/http/glue.rs @@ -232,7 +232,7 @@ impl HyperServerSvc { impl hyper::service::Service for HyperServerSvc where S: svc::Service< - Request=http::Request, + http::Request, Response=http::Response, >, S::Error: Error + Send + Sync + 'static, @@ -312,13 +312,12 @@ where // ==== impl HttpBodySvc ==== -impl svc::Service for HttpBodySvc +impl svc::Service> for HttpBodySvc where S: svc::Service< - Request=http::Request, + http::Request, >, { - type Request = http::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; @@ -327,14 +326,14 @@ where self.service.poll_ready() } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { self.service.call(req.map(|b| HttpBody::Http2(b))) } } impl HttpBodyNewSvc where - N: svc::NewService>, + N: svc::MakeService<(), http::Request>, { pub(in proxy) fn new(new_service: N) -> Self { HttpBodyNewSvc { @@ -343,20 +342,21 @@ where } } -impl svc::NewService for HttpBodyNewSvc +impl svc::Service<()> for HttpBodyNewSvc where - N: svc::NewService>, + N: svc::MakeService<(), http::Request>, { - type Request = http::Request; - type Response = N::Response; - type Error = N::Error; - type Service = HttpBodySvc; - type InitError = N::InitError; + type Response = HttpBodySvc; + type Error = N::MakeError; type Future = HttpBodyNewSvcFuture; - fn new_service(&self) -> Self::Future { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.new_service.poll_ready() + } + + fn call(&mut self, target: ()) -> Self::Future { HttpBodyNewSvcFuture { - inner: self.new_service.new_service(), + inner: self.new_service.make_service(target), } } } diff --git a/src/proxy/http/header_from_target.rs b/src/proxy/http/header_from_target.rs index b3e070a49..43ab74d9b 100644 --- a/src/proxy/http/header_from_target.rs +++ b/src/proxy/http/header_from_target.rs @@ -35,13 +35,12 @@ where Layer { header } } -impl svc::Layer for Layer +impl svc::Layer for Layer where H: IntoHeaderName + Clone, T: Clone + Send + Sync + 'static, HeaderValue: for<'t> From<&'t T>, M: svc::Stack, - M::Value: svc::Service>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -57,13 +56,12 @@ where // === impl Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where H: IntoHeaderName + Clone, T: Clone + Send + Sync + 'static, HeaderValue: for<'t> From<&'t T>, M: svc::Stack, - M::Value: svc::Service>, { type Value = Service; type Error = M::Error; @@ -78,12 +76,11 @@ where // === impl Service === -impl svc::Service for Service +impl svc::Service> for Service where H: IntoHeaderName + Clone, - S: svc::Service>, + S: svc::Service>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; @@ -92,7 +89,7 @@ where self.inner.poll_ready() } - fn call(&mut self, mut req: Self::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { req.headers_mut().insert(self.header.clone(), self.value.clone()); self.inner.call(req) } diff --git a/src/proxy/http/insert_target.rs b/src/proxy/http/insert_target.rs index 37f7ff1f4..e2d5a94ab 100644 --- a/src/proxy/http/insert_target.rs +++ b/src/proxy/http/insert_target.rs @@ -25,11 +25,10 @@ pub fn layer() -> Layer { Layer } -impl svc::Layer for Layer +impl svc::Layer for Layer where T: Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -42,11 +41,10 @@ where // === impl Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where T: Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service>, { type Value = Service; type Error = M::Error; @@ -60,12 +58,11 @@ where // === impl Service === -impl svc::Service for Service +impl svc::Service> for Service where T: Clone + Send + Sync + 'static, - S: svc::Service>, + S: svc::Service>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; @@ -74,7 +71,7 @@ where self.inner.poll_ready() } - fn call(&mut self, mut req: Self::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { req.extensions_mut().insert(self.target.clone()); self.inner.call(req) } diff --git a/src/proxy/http/metrics/classify.rs b/src/proxy/http/metrics/classify.rs index ae7dfbeaa..5ee5b9cc5 100644 --- a/src/proxy/http/metrics/classify.rs +++ b/src/proxy/http/metrics/classify.rs @@ -74,7 +74,7 @@ pub struct Stack { } #[derive(Clone, Debug)] -pub struct Service { +pub struct Service { classify: C, inner: S, } @@ -83,11 +83,10 @@ pub fn layer() -> Layer { Layer() } -impl svc::Layer for Layer +impl svc::Layer for Layer where T: CanClassify, M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -98,11 +97,10 @@ where } } -impl svc::Stack for Stack +impl svc::Stack for Stack where T: CanClassify, M: svc::Stack, - M::Value: svc::Service, Response = http::Response>, { type Value = Service; type Error = M::Error; @@ -114,12 +112,11 @@ where } } -impl svc::Service for Service +impl svc::Service> for Service where C: Classify, - S: svc::Service, Response = http::Response>, + S: svc::Service, Response = http::Response>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; diff --git a/src/proxy/http/metrics/service.rs b/src/proxy/http/metrics/service.rs index 8ebb69525..d4f06899c 100644 --- a/src/proxy/http/metrics/service.rs +++ b/src/proxy/http/metrics/service.rs @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use tokio_timer::clock; use tower_h2; +use tower_grpc; use super::classify::{ClassifyEos, ClassifyResponse}; use super::{ClassMetrics, Metrics, Registry, StatusMetrics}; @@ -42,7 +43,6 @@ where #[derive(Debug)] pub struct Service where - S: svc::Service, C: ClassifyResponse + Clone, C::Class: Hash + Eq, { @@ -51,16 +51,15 @@ where _p: PhantomData C>, } -pub struct ResponseFuture +pub struct ResponseFuture where - S: svc::Service, C: ClassifyResponse, C::Class: Hash + Eq, { classify: Option, metrics: Option>>>, stream_open_at: Instant, - inner: S::Future, + inner: F, } #[derive(Debug)] @@ -116,17 +115,11 @@ where } } -impl svc::Layer for Layer +impl svc::Layer for Layer where T: Clone + Debug, K: Clone + Hash + Eq + From, M: svc::Stack, - M::Value: svc::Service< - Request = http::Request>, - Response = http::Response, - >, - A: tower_h2::Body, - B: tower_h2::Body, C: ClassifyResponse + Clone + Default + Send + Sync + 'static, C::Class: Hash + Eq, { @@ -161,17 +154,11 @@ where } } -impl svc::Stack for Stack +impl svc::Stack for Stack where T: Clone + Debug, K: Clone + Hash + Eq + From, M: svc::Stack, - M::Value: svc::Service< - Request = http::Request>, - Response = http::Response, - >, - A: tower_h2::Body, - B: tower_h2::Body, C: ClassifyResponse + Clone + Default + Send + Sync + 'static, C::Class: Hash + Eq, { @@ -205,7 +192,7 @@ where impl Clone for Service where - S: svc::Service + Clone, + S: Clone, C: ClassifyResponse + Clone + Default + Send + Sync + 'static, C::Class: Hash + Eq, { @@ -218,10 +205,10 @@ where } } -impl svc::Service for Service +impl svc::Service> for Service where S: svc::Service< - Request = http::Request>, + http::Request>, Response = http::Response, >, A: tower_h2::Body, @@ -229,16 +216,15 @@ where C: ClassifyResponse + Clone + Default + Send + Sync + 'static, C::Class: Hash + Eq, { - type Request = http::Request; type Response = http::Response>; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let mut req_metrics = self.metrics.clone(); if req.body().is_end_stream() { @@ -271,15 +257,15 @@ where } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - S: svc::Service>, + F: Future>, B: tower_h2::Body, C: ClassifyResponse + Send + Sync + 'static, C::Class: Hash + Eq, { type Item = http::Response>; - type Error = S::Error; + type Error = F::Error; fn poll(&mut self) -> Poll { let rsp = try_ready!(self.inner.poll()); @@ -333,6 +319,26 @@ where } } +impl tower_grpc::Body for RequestBody +where + B: tower_h2::Body, + C: Hash + Eq, +{ + type Data = B::Data; + + fn is_end_stream(&self) -> bool { + ::tower_h2::Body::is_end_stream(self) + } + + fn poll_data(&mut self) -> Poll, tower_grpc::Error> { + ::tower_h2::Body::poll_data(self).map_err(From::from) + } + + fn poll_metadata(&mut self) -> Poll, tower_grpc::Error> { + ::tower_h2::Body::poll_trailers(self).map_err(From::from) + } +} + impl Default for ResponseBody where B: tower_h2::Body + Default, @@ -448,6 +454,27 @@ where } } +impl tower_grpc::Body for ResponseBody +where + B: tower_h2::Body, + C: ClassifyEos, + C::Class: Hash + Eq, +{ + type Data = B::Data; + + fn is_end_stream(&self) -> bool { + ::tower_h2::Body::is_end_stream(self) + } + + fn poll_data(&mut self) -> Poll, tower_grpc::Error> { + ::tower_h2::Body::poll_data(self).map_err(From::from) + } + + fn poll_metadata(&mut self) -> Poll, tower_grpc::Error> { + ::tower_h2::Body::poll_trailers(self).map_err(From::from) + } +} + impl Drop for ResponseBody where B: tower_h2::Body, diff --git a/src/proxy/http/normalize_uri.rs b/src/proxy/http/normalize_uri.rs index 9adf27361..78d5033a6 100644 --- a/src/proxy/http/normalize_uri.rs +++ b/src/proxy/http/normalize_uri.rs @@ -27,11 +27,10 @@ pub fn layer() -> Layer { Layer() } -impl svc::Layer for Layer +impl svc::Layer for Layer where T: ShouldNormalizeUri, M: svc::Stack, - M::Value: svc::Service>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -44,11 +43,10 @@ where // === impl Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where T: ShouldNormalizeUri, M: svc::Stack, - M::Value: svc::Service>, { type Value = svc::Either, M::Value>; type Error = M::Error; @@ -65,11 +63,10 @@ where // === impl Service === -impl svc::Service for Service +impl svc::Service> for Service where - S: svc::Service>, + S: svc::Service>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = S::Future; @@ -78,7 +75,7 @@ where self.inner.poll_ready() } - fn call(&mut self, mut request: S::Request) -> Self::Future { + fn call(&mut self, mut request: http::Request) -> Self::Future { debug_assert!( request.version() != http::Version::HTTP_2, "normalize_uri must only be applied to HTTP/1" diff --git a/src/proxy/http/orig_proto.rs b/src/proxy/http/orig_proto.rs index 53eee4c32..309053e60 100644 --- a/src/proxy/http/orig_proto.rs +++ b/src/proxy/http/orig_proto.rs @@ -22,20 +22,19 @@ pub struct Downgrade { // ==== impl Upgrade ===== -impl From for Upgrade -where - S: svc::Service, Response = http::Response>, -{ - fn from(inner: S) -> Self { +impl Upgrade { + pub fn new(inner: S) -> Self + where + S: svc::Service, Response = http::Response>, + { Self { inner } } } -impl svc::Service for Upgrade +impl svc::Service> for Upgrade where - S: svc::Service, Response = http::Response>, + S: svc::Service, Response = http::Response>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = future::Map< @@ -47,7 +46,7 @@ where self.inner.poll_ready() } - fn call(&mut self, mut req: Self::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { if req.version() == http::Version::HTTP_2 || h1::wants_upgrade(&req) { // Just passing through... return self.inner.call(req).map(|res| res) @@ -106,21 +105,20 @@ where // ===== impl Downgrade ===== -impl From for Downgrade -where - S: svc::Service, Response = http::Response>, -{ - fn from(inner: S) -> Self { +impl Downgrade { + pub fn new(inner: S) -> Self + where + S: svc::Service, Response = http::Response>, + { Self { inner } } } -impl svc::Service for Downgrade +impl svc::Service> for Downgrade where - S: svc::Service, Response = http::Response>, + S: svc::Service, Response = http::Response>, { - type Request = S::Request; type Response = S::Response; type Error = S::Error; type Future = future::Map< @@ -132,7 +130,7 @@ where self.inner.poll_ready() } - fn call(&mut self, mut req: Self::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { let mut upgrade_response = false; if req.version() == http::Version::HTTP_2 { diff --git a/src/proxy/http/profiles.rs b/src/proxy/http/profiles.rs index 60a691f8a..0bddc6e3c 100644 --- a/src/proxy/http/profiles.rs +++ b/src/proxy/http/profiles.rs @@ -189,7 +189,6 @@ pub mod router { ::Output, svc::shared::Stack, > + Clone, - R::Value: svc::Service, { Layer { suffixes, @@ -228,7 +227,6 @@ pub mod router { where T: WithRoute, R: svc::Stack, - R::Value: svc::Service, { target: T, stack: R, @@ -259,7 +257,6 @@ pub mod router { ::Output, svc::shared::Stack, > + Clone, - R::Value: svc::Service, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -287,7 +284,6 @@ pub mod router { ::Output, svc::shared::Stack, > + Clone, - R::Value: svc::Service, { type Value = Service; type Error = Error; @@ -332,7 +328,6 @@ pub mod router { G: Stream, T: WithRoute + Clone, R: svc::Stack + Clone, - R::Value: svc::Service, { fn update_routes(&mut self, mut routes: Routes) { self.routes = Vec::with_capacity(routes.len()); @@ -352,17 +347,16 @@ pub mod router { } } - impl svc::Service for Service + impl svc::Service> for Service where G: Stream, T: WithRoute + Clone, R: svc::Stack + Clone, - R::Value: svc::Service>, + R::Value: svc::Service>, { - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Future = ::Future; + type Response = >>::Response; + type Error = >>::Error; + type Future = >>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { while let Some(Async::Ready(Some(routes))) = self.poll_route_stream() { @@ -372,7 +366,7 @@ pub mod router { Ok(Async::Ready(())) } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { for (ref condition, ref mut service) in &mut self.routes { if condition.is_match(&req) { trace!("using configured route: {:?}", condition); diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index 7889b7cc7..4cab2f313 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -43,7 +43,7 @@ pub struct Service where Rec: Recognize, Stk: svc::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, { inner: Router, } @@ -88,8 +88,8 @@ impl svc::Layer for Layer where Rec: Recognize + Clone + Send + Sync + 'static, Stk: svc::Stack + Clone + Send + Sync + 'static, - Stk::Value: svc::Service>, - ::Error: error::Error, + Stk::Value: svc::Service>, + >::Error: error::Error, Stk::Error: fmt::Debug, B: Default + Send + 'static, { @@ -112,8 +112,8 @@ impl svc::Stack for Stack where Rec: Recognize + Clone + Send + Sync + 'static, Stk: svc::Stack + Clone + Send + Sync + 'static, - Stk::Value: svc::Service>, - ::Error: error::Error, + Stk::Value: svc::Service>, + >::Error: error::Error, Stk::Error: fmt::Debug, B: Default + Send + 'static, { @@ -160,19 +160,18 @@ where // === impl Service === -impl svc::Service for Service +impl svc::Service for Service where Rec: Recognize + Send + Sync + 'static, Stk: svc::Stack + Send + Sync + 'static, - Stk::Value: svc::Service>, - ::Error: error::Error, + Stk::Value: svc::Service>, + >::Error: error::Error, Stk::Error: fmt::Debug, B: Default + Send + 'static, { - type Request = as svc::Service>::Request; - type Response = as svc::Service>::Response; + type Response = as svc::Service>::Response; type Error = h2::Error; - type Future = ResponseFuture< as svc::Service>::Future>; + type Future = ResponseFuture< as svc::Service>::Future>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready().map_err(|e| { @@ -181,7 +180,7 @@ where }) } - fn call(&mut self, request: Self::Request) -> Self::Future { + fn call(&mut self, request: Req) -> Self::Future { trace!("routing..."); let inner = self.inner.call(request); ResponseFuture { inner } @@ -192,7 +191,7 @@ impl Clone for Service where Rec: Recognize, Stk: svc::Stack, - Stk::Value: svc::Service, + Stk::Value: svc::Service, Router: Clone, { fn clone(&self) -> Self { diff --git a/src/proxy/http/settings.rs b/src/proxy/http/settings.rs index 942679367..e3ac7775e 100644 --- a/src/proxy/http/settings.rs +++ b/src/proxy/http/settings.rs @@ -95,16 +95,16 @@ pub mod router { fn connect(&self) -> connect::Target; } - #[derive(Clone, Debug)] - pub struct Layer(PhantomData); + #[derive(Debug)] + pub struct Layer(PhantomData<(T, fn(B))>); - #[derive(Clone, Debug)] - pub struct Stack(M); + #[derive(Debug)] + pub struct Stack(M, PhantomData); pub struct Service where M: svc::Stack, - M::Value: svc::Service>, + M::Value: svc::Service>, { router: Router, } @@ -112,9 +112,9 @@ pub mod router { pub struct ResponseFuture where M: svc::Stack, - M::Value: svc::Service>, + M::Value: svc::Service>, { - inner: as svc::Service>::Future + inner: as svc::Service>>::Future } #[derive(Debug)] @@ -127,30 +127,42 @@ pub mod router { type Router = rt::Router, Recognize, M>; - pub fn layer() -> Layer { + pub fn layer() -> Layer { Layer(PhantomData) } - impl svc::Layer for Layer - where - T: HasConnect, - M: svc::Stack + Clone, - M::Value: svc::Service>, - { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; - - fn bind(&self, inner: M) -> Self::Stack { - Stack(inner) + impl Clone for Layer { + fn clone(&self) -> Self { + Layer(PhantomData) } } - impl svc::Stack for Stack + impl svc::Layer for Layer where T: HasConnect, M: svc::Stack + Clone, - M::Value: svc::Service>, + M::Value: svc::Service>, + { + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack(inner, PhantomData) + } + } + + impl Clone for Stack { + fn clone(&self) -> Self { + Stack(self.0.clone(), PhantomData) + } + } + + impl svc::Stack for Stack + where + T: HasConnect, + M: svc::Stack + Clone, + M::Value: svc::Service>, { type Value = Service; type Error = M::Error; @@ -179,14 +191,13 @@ pub mod router { } } - impl svc::Service for Service + impl svc::Service> for Service where M: svc::Stack, - M::Value: svc::Service>, + M::Value: svc::Service>, { - type Request = as svc::Service>::Request; - type Response = as svc::Service>::Response; - type Error = Error<::Error, M::Error>; + type Response = as svc::Service>>::Response; + type Error = Error<>>::Error, M::Error>; type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { @@ -200,7 +211,7 @@ pub mod router { } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { ResponseFuture { inner: self.router.call(req) } } } @@ -208,10 +219,10 @@ pub mod router { impl Future for ResponseFuture where M: svc::Stack, - M::Value: svc::Service>, + M::Value: svc::Service>, { - type Item = as svc::Service>::Response; - type Error = Error<::Error, M::Error>; + type Item = as svc::Service>>::Response; + type Error = Error<>>::Error, M::Error>; fn poll(&mut self) -> Poll { match self.inner.poll() { diff --git a/src/proxy/limit.rs b/src/proxy/limit.rs index 1a25dac2c..bff976a28 100644 --- a/src/proxy/limit.rs +++ b/src/proxy/limit.rs @@ -1,59 +1,80 @@ extern crate tower_in_flight_limit; -use std::fmt; +use std::{fmt, marker::PhantomData}; pub use self::tower_in_flight_limit::InFlightLimit; use svc; /// Wraps `Service` stacks with an `InFlightLimit`. -#[derive(Clone, Debug)] -pub struct Layer { +#[derive(Debug)] +pub struct Layer { max_in_flight: usize, + _marker: PhantomData, } /// Produces `Services` wrapped with an `InFlightLimit`. -#[derive(Clone, Debug)] -pub struct Stack { - max_in_flight: usize, +#[derive(Debug)] +pub struct Stack { inner: M, + max_in_flight: usize, + _marker: PhantomData, } // === impl Layer === -pub fn layer(max_in_flight: usize) -> Layer { - Layer { max_in_flight } +pub fn layer(max_in_flight: usize) -> Layer { + Layer { + max_in_flight, + _marker: PhantomData, + } } -impl svc::Layer for Layer +impl Clone for Layer { + fn clone(&self) -> Self { + Layer { + max_in_flight: self.max_in_flight, + _marker: PhantomData, + } + } +} + +impl svc::Layer for Layer where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service + Send + 'static, - ::Request: Send, - ::Future: Send, + M::Value: svc::Service, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { inner, max_in_flight: self.max_in_flight, + _marker: PhantomData, } } } // === impl Stack === -impl svc::Stack for Stack +impl Clone for Stack { + fn clone(&self) -> Self { + Stack { + inner: self.inner.clone(), + max_in_flight: self.max_in_flight, + _marker: PhantomData, + } + } +} + +impl svc::Stack for Stack where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, - M::Value: svc::Service + Send + 'static, - ::Request: Send, - ::Future: Send, + M::Value: svc::Service, { type Value = InFlightLimit; type Error = M::Error; diff --git a/src/proxy/reconnect.rs b/src/proxy/reconnect.rs index 5f05f9da3..2417a61a7 100644 --- a/src/proxy/reconnect.rs +++ b/src/proxy/reconnect.rs @@ -1,5 +1,6 @@ extern crate tower_reconnect; + use futures::{task, Async, Future, Poll}; use std::fmt; use std::time::Duration; @@ -26,9 +27,9 @@ pub struct Stack { pub struct Service where T: fmt::Debug, - N: svc::NewService, + N: svc::Service<()>, { - inner: Reconnect, + inner: Reconnect, /// The target, used for debug logging. target: T, @@ -48,8 +49,8 @@ enum Backoff { Fixed(Duration), } -pub struct ResponseFuture { - inner: as svc::Service>::Future, +pub struct ResponseFuture { + inner: F, } // === impl Layer === @@ -73,7 +74,7 @@ impl svc::Layer for Layer where T: Clone + fmt::Debug, M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -93,7 +94,7 @@ impl svc::Stack for Stack where T: Clone + fmt::Debug, M: svc::Stack, - M::Value: svc::NewService, + M::Value: svc::Service<()>, { type Value = Service; type Error = M::Error; @@ -101,7 +102,7 @@ where fn make(&self, target: &T) -> Result { let new_service = self.inner.make(target)?; Ok(Service { - inner: Reconnect::new(new_service), + inner: Reconnect::new(new_service, ()), target: target.clone(), backoff: self.backoff.clone(), active_backoff: None, @@ -115,12 +116,12 @@ where #[cfg(test)] impl Service<&'static str, N> where - N: svc::NewService, - N::InitError: fmt::Display, + N: svc::Service<()>, + N::Error: fmt::Display, { fn for_test(new_service: N) -> Self { Self { - inner: Reconnect::new(new_service), + inner: Reconnect::new(new_service, ()), target: "test", backoff: Backoff::None, active_backoff: None, @@ -136,16 +137,16 @@ where } } -impl svc::Service for Service +impl svc::Service for Service where T: fmt::Debug, - N: svc::NewService, - N::InitError: fmt::Display, + N: svc::Service<(), Response=S>, + N::Error: fmt::Display, + S: svc::Service, { - type Request = N::Request; - type Response = N::Response; - type Error = N::Error; - type Future = ResponseFuture; + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture< as svc::Service>::Future>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { match self.backoff { @@ -171,7 +172,7 @@ where Ok(ready) } - Err(Error::Inner(err)) => { + Err(Error::Service(err)) => { self.mute_connect_error_log = false; Err(err) } @@ -214,14 +215,18 @@ where } } - fn call(&mut self, request: Self::Request) -> Self::Future { + fn call(&mut self, request: Req) -> Self::Future { ResponseFuture { inner: self.inner.call(request), } } } -impl fmt::Debug for Service { +impl fmt::Debug for Service +where + T: fmt::Debug, + N: svc::Service<()>, +{ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Reconnect") .field("target", &self.target) @@ -229,13 +234,16 @@ impl fmt::Debug for Service { } } -impl Future for ResponseFuture { - type Item = N::Response; - type Error = N::Error; +impl Future for ResponseFuture +where + F: Future>, +{ + type Item = F::Item; + type Error = E; fn poll(&mut self) -> Poll { self.inner.poll().map_err(|e| match e { - Error::Inner(err) => err, + Error::Service(err) => err, _ => unreachable!("response future must fail with inner error"), }) } @@ -263,23 +271,23 @@ mod tests { #[derive(Debug)] struct InitErr {} - impl svc::NewService for NewService { - type Request = (); - type Response = (); - type Error = (); - type Service = Service; - type InitError = InitErr; + impl svc::Service<()> for NewService { + type Response = Service; + type Error = InitErr; type Future = InitFuture; - fn new_service(&self) -> Self::Future { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { InitFuture { should_fail: self.fails.fetch_sub(1, Relaxed) > 0, } } } - impl svc::Service for Service { - type Request = (); + impl svc::Service<()> for Service { type Response = (); type Error = (); type Future = future::FutureResult<(), ()>; diff --git a/src/proxy/resolve.rs b/src/proxy/resolve.rs index e1fcf65fe..4d17e1287 100644 --- a/src/proxy/resolve.rs +++ b/src/proxy/resolve.rs @@ -65,7 +65,6 @@ where R: Resolve + Clone, R::Endpoint: fmt::Debug, M: svc::Stack + Clone, - M::Value: svc::Service, { type Value = as svc::Stack>::Value; type Error = as svc::Stack>::Error; @@ -86,7 +85,6 @@ where R: Resolve, R::Endpoint: fmt::Debug, M: svc::Stack + Clone, - M::Value: svc::Service, { type Value = Discover; type Error = M::Error; @@ -102,21 +100,17 @@ where // === impl Discover === -impl tower_discover::Discover for Discover +impl tower_discover::Discover for Discover where R: Resolution, R::Endpoint: fmt::Debug, M: svc::Stack, - M::Value: svc::Service, { type Key = SocketAddr; - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; type Service = M::Value; - type DiscoverError = Error; + type Error = Error; - fn poll(&mut self) -> Poll, Self::DiscoverError> { + fn poll(&mut self) -> Poll, Self::Error> { loop { let up = try_ready!(self.resolution.poll().map_err(Error::Resolve)); trace!("watch: {:?}", up); diff --git a/src/proxy/server.rs b/src/proxy/server.rs index fb6168a15..f2ce3bede 100644 --- a/src/proxy/server.rs +++ b/src/proxy/server.rs @@ -10,7 +10,7 @@ use tower_h2; use Conditional; use drain; use never::Never; -use svc::{Stack, Service, stack::StackNewService}; +use svc::{Stack, Service, stack::StackMakeService}; use transport::{connect, tls, Connection, GetOriginalDst, Peek}; use proxy::http::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use proxy::protocol::Protocol; @@ -53,7 +53,7 @@ where // Prepares a route for each accepted HTTP connection. R: Stack + Clone, R::Value: Service< - Request = http::Request, + http::Request, Response = http::Response, >, B: tower_h2::Body, @@ -193,12 +193,12 @@ where ::Error: fmt::Debug + 'static, R: Stack + Clone, R::Value: Service< - Request = http::Request, + http::Request, Response = http::Response, >, R::Value: 'static, - ::Error: error::Error + Send + Sync + 'static, - ::Future: Send + 'static, + >>::Error: error::Error + Send + Sync + 'static, + >>::Future: Send + 'static, B: tower_h2::Body + Default + Send + 'static, B::Data: Send, ::Buf: Send, @@ -327,8 +327,8 @@ where }), Protocol::Http2 => Either::B({ trace!("detected HTTP/2"); - let new_service = StackNewService::new(route, source.clone()); - let h2 = tower_h2::Server::new( + let new_service = StackMakeService::new(route, source.clone()); + let mut h2 = tower_h2::Server::new( HttpBodyNewSvc::new(new_service), h2_settings, log_clone.executor(), diff --git a/src/svc.rs b/src/svc.rs index efb430346..cd67cdc6a 100644 --- a/src/svc.rs +++ b/src/svc.rs @@ -1,7 +1,7 @@ pub extern crate linkerd2_stack as stack; extern crate tower_service; -pub use self::tower_service::{NewService, Service}; +pub use self::tower_service::{MakeService, Service}; pub use self::stack::{ shared, diff --git a/src/tap/service.rs b/src/tap/service.rs index 8eea04277..9acecb546 100644 --- a/src/tap/service.rs +++ b/src/tap/service.rs @@ -37,10 +37,7 @@ where /// A middleware that records HTTP taps. #[derive(Clone, Debug)] -pub struct Service -where - S: svc::Service, -{ +pub struct Service { endpoint: event::Endpoint, next_id: NextId, taps: Arc>, @@ -48,11 +45,8 @@ where } #[derive(Debug, Clone)] -pub struct ResponseFuture -where - S: svc::Service, -{ - inner: S::Future, +pub struct ResponseFuture { + inner: F, meta: Option, taps: Option>>, request_open_at: Instant, @@ -86,8 +80,8 @@ pub fn layer(next_id: NextId, taps: Arc>) -> Layer where T: Clone + Into, M: svc::Stack, - M::Value: svc::Service>, Response = http::Response>, - ::Error: HasH2Reason, + M::Value: svc::Service>, Response = http::Response>, + >>>::Error: HasH2Reason, A: Body, B: Body, { @@ -98,17 +92,10 @@ where } } -impl svc::Layer for Layer +impl svc::Layer for Layer where T: Clone + Into, M: svc::Stack, - M::Value: svc::Service< - Request = http::Request>, - Response = http::Response, - >, - ::Error: HasH2Reason, - A: Body, - B: Body, { type Value = as svc::Stack>::Value; type Error = M::Error; @@ -126,17 +113,10 @@ where // === Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where T: Clone + Into, M: svc::Stack, - M::Value: svc::Service< - Request = http::Request>, - Response = http::Response, - >, - ::Error: HasH2Reason, - A: Body, - B: Body, { type Value = Service; type Error = M::Error; @@ -154,26 +134,25 @@ where // === Service === -impl svc::Service for Service +impl svc::Service> for Service where S: svc::Service< - Request = http::Request>, + http::Request>, Response = http::Response, >, S::Error: HasH2Reason, A: Body, B: Body, { - type Request = http::Request; type Response = http::Response>; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.inner.poll_ready() } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: http::Request) -> Self::Future { let request_open_at = clock::now(); // Only tap a request iff a `Source` is known. @@ -222,14 +201,14 @@ where } } -impl Future for ResponseFuture +impl Future for ResponseFuture where B: Body, - S: svc::Service>, - S::Error: HasH2Reason, + F: Future>, + F::Error: HasH2Reason, { type Item = http::Response>; - type Error = S::Error; + type Error = F::Error; fn poll(&mut self) -> Poll { let rsp = try_ready!(self.inner.poll().map_err(|e| self.tap_err(e))); @@ -263,13 +242,13 @@ where } } -impl ResponseFuture +impl ResponseFuture where B: Body, - S: svc::Service>, - S::Error: HasH2Reason, + F: Future>, + F::Error: HasH2Reason, { - fn tap_err(&mut self, e: S::Error) -> S::Error { + fn tap_err(&mut self, e: F::Error) -> F::Error { if let Some(request) = self.meta.take() { let meta = event::Response { request, diff --git a/tests/support/client.rs b/tests/support/client.rs index 5a6f8116e..6e0b40b47 100644 --- a/tests/support/client.rs +++ b/tests/support/client.rs @@ -171,13 +171,13 @@ fn run(addr: SocketAddr, version: Run) -> (Sender, Running) { .map_err(|e| println!("client error: {:?}", e))) }, Run::Http2 => { - let connect = tower_h2::client::Connect::new( + let mut connect = tower_h2::client::Connect::new( conn, Default::default(), LazyExecutor, ); - Box::new(connect.new_service() + Box::new(connect.call(()) .map_err(move |err| println!("connect error ({:?}): {:?}", addr, err)) .and_then(move |mut h2| { rx.for_each(move |(req, cb)| { diff --git a/tests/support/controller.rs b/tests/support/controller.rs index f6c7fdcfc..abd2fad10 100644 --- a/tests/support/controller.rs +++ b/tests/support/controller.rs @@ -99,11 +99,15 @@ impl Controller { } } +fn grpc_internal_code() -> grpc::Error { + grpc::Error::Grpc(grpc::Status::with_code(grpc::Code::Internal), HeaderMap::new()) +} + impl Stream for DstReceiver { type Item = pb::Update; type Error = grpc::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll().map_err(|_| grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new())) + self.0.poll().map_err(|_| grpc_internal_code()) } } @@ -129,7 +133,7 @@ impl Stream for ProfileReceiver { type Item = pb::DestinationProfile; type Error = grpc::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll().map_err(|_| grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new())) + self.0.poll().map_err(|_| grpc_internal_code()) } } @@ -154,7 +158,7 @@ impl pb::server::Destination for Controller { } } - future::err(grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new())) + future::err(grpc_internal_code()) } type GetProfileStream = ProfileReceiver; @@ -171,7 +175,7 @@ impl pb::server::Destination for Controller { } } - future::err(grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new())) + future::err(grpc_internal_code()) } } @@ -212,7 +216,7 @@ fn run(controller: Controller, delay: Option + Sen } let serve = bind.incoming() - .fold(h2, |h2, sock| { + .fold(h2, |mut h2, sock| { if let Err(e) = sock.set_nodelay(true) { return Err(e); } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 59ad273a6..a704aaa59 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -44,7 +44,7 @@ use self::tokio::{ use self::tokio_connect::Connect; use self::tower_h2::{Body, RecvBody}; use self::tower_grpc as grpc; -use self::tower_service::{NewService, Service}; +use self::tower_service::{Service}; /// Environment variable for overriding the test patience. pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS"; diff --git a/tests/support/server.rs b/tests/support/server.rs index 1e7428c30..ce0112f74 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -152,9 +152,9 @@ impl Server { let mut runtime = runtime::current_thread::Runtime::new() .expect("initialize support server runtime"); - let new_svc = NewSvc(Arc::new(self.routes)); + let mut new_svc = NewSvc(Arc::new(self.routes)); - let srv: Box Box>> = match self.version { + let srv: Box Box>> = match self.version { Run::Http1 => { let mut h1 = hyper::server::conn::Http::new(); h1.http1_only(true); @@ -162,7 +162,7 @@ impl Server { Box::new(move |sock| { let h1_clone = h1.clone(); let srv_conn_count = Arc::clone(&srv_conn_count); - let conn = new_svc.new_service() + let conn = new_svc.call(()) .inspect(move |_| { srv_conn_count.fetch_add(1, Ordering::Release); }) @@ -176,7 +176,7 @@ impl Server { }) }, Run::Http2 => { - let h2 = tower_h2::Server::new( + let mut h2 = tower_h2::Server::new( new_svc, Default::default(), LazyExecutor, @@ -203,7 +203,7 @@ impl Server { } let serve = bind.incoming() - .fold(srv, move |srv, sock| { + .fold(srv, move |mut srv, sock| { if let Err(e) = sock.set_nodelay(true) { return Err(e); } @@ -322,8 +322,7 @@ impl Svc { } } -impl Service for Svc { - type Request = Request; +impl Service> for Svc { type Response = Response; type Error = h2::Error; type Future = Box + Send>; @@ -332,7 +331,7 @@ impl Service for Svc { Ok(Async::Ready(())) } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { let req = req.map(|body| { assert!(body.is_end_stream(), "h2 test server doesn't support request bodies yet"); Box::new(futures::stream::empty()) as ReqBody @@ -373,15 +372,16 @@ impl hyper::service::Service for Svc { #[derive(Debug)] struct NewSvc(Arc>); -impl NewService for NewSvc { - type Request = Request; - type Response = Response; - type Error = h2::Error; - type InitError = ::std::io::Error; - type Service = Svc; - type Future = future::FutureResult; +impl Service<()> for NewSvc { + type Response = Svc; + type Error = ::std::io::Error; + type Future = future::FutureResult; - fn new_service(&self) -> Self::Future { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, _: ()) -> Self::Future { future::ok(Svc(Arc::clone(&self.0))) } }