From 1e9ff8be0326a67baa130f3df19c449b2cf3822d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 23 Jan 2018 16:14:07 -0800 Subject: [PATCH] proxy: add transparent protocol detection and handling The proxy will now try to detect what protocol new connections are using, and route them accordingly. Specifically: - HTTP/2 stays the same. - HTTP/1 is now accepted, and will try to send an HTTP/1 request to the target. - If neither HTTP/1 nor 2, assume a TCP stream and simply forward between the source and destination. * tower-h2: fix Server Clone bounds * proxy: implement Async{Read,Write} extra methods for Connection Closes #130 Closes #131 --- Cargo.lock | 250 ++++++++++++++++--- proxy/Cargo.toml | 2 + proxy/src/bind.rs | 70 ++++-- proxy/src/connection.rs | 58 ++++- proxy/src/control/discovery.rs | 22 +- proxy/src/control/pb.rs | 2 +- proxy/src/control/telemetry.rs | 2 +- proxy/src/ctx/transport.rs | 80 +++++- proxy/src/inbound.rs | 121 ++------- proxy/src/lib.rs | 92 ++++--- proxy/src/main.rs | 2 +- proxy/src/map_err.rs | 4 +- proxy/src/outbound.rs | 33 ++- proxy/src/telemetry/tap/match_.rs | 2 +- proxy/src/transparency/client.rs | 215 ++++++++++++++++ proxy/src/transparency/glue.rs | 333 +++++++++++++++++++++++++ proxy/src/transparency/h1.rs | 99 ++++++++ proxy/src/transparency/mod.rs | 10 + proxy/src/transparency/protocol.rs | 45 ++++ proxy/src/transparency/server.rs | 160 ++++++++++++ proxy/src/transparency/tcp.rs | 84 +++++++ proxy/src/transport/mod.rs | 2 +- proxy/src/transport/so_original_dst.rs | 36 ++- proxy/tests/discovery.rs | 26 +- proxy/tests/support/client.rs | 185 ++++++++++---- proxy/tests/support/mod.rs | 13 +- proxy/tests/support/proxy.rs | 45 +++- proxy/tests/support/server.rs | 201 +++++++++++---- proxy/tests/support/tcp.rs | 190 ++++++++++++++ proxy/tests/transparency.rs | 290 +++++++++++++++++++++ tower-h2/src/server/mod.rs | 18 +- 31 files changed, 2338 insertions(+), 354 deletions(-) create mode 100644 proxy/src/transparency/client.rs create mode 100644 proxy/src/transparency/glue.rs create mode 100644 proxy/src/transparency/h1.rs create mode 100644 proxy/src/transparency/mod.rs create mode 100644 proxy/src/transparency/protocol.rs create mode 100644 proxy/src/transparency/server.rs create mode 100644 proxy/src/transparency/tcp.rs create mode 100644 proxy/tests/support/tcp.rs create mode 100644 proxy/tests/transparency.rs diff --git a/Cargo.lock b/Cargo.lock index b0e42f866..49d764b67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,11 +40,25 @@ dependencies = [ "libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "base64" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bitflags" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "bitflags" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "byteorder" version = "1.1.0" @@ -115,10 +129,12 @@ dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "futures-mpsc-lossy 0.1.3", "h2 0.1.0 (git+https://github.com/carllerche/h2)", - "http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.11.15 (registry+https://github.com/rust-lang/crates.io-index)", "ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "ns-dns-tokio 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -126,7 +142,7 @@ dependencies = [ "prost-types 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "quickcheck 0.4.2 (git+https://github.com/BurntSushi/quickcheck?rev=a1658ce)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -185,7 +201,7 @@ dependencies = [ "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -203,7 +219,7 @@ name = "env_logger" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -242,6 +258,15 @@ dependencies = [ "fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fuchsia-zircon-sys" version = "0.2.0" @@ -250,6 +275,11 @@ dependencies = [ "bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures" version = "0.1.17" @@ -263,6 +293,15 @@ dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-cpupool" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-mpsc-lossy" version = "0.1.3" @@ -279,8 +318,8 @@ dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "string 0.1.0 (git+https://github.com/carllerche/string)", @@ -297,13 +336,42 @@ dependencies = [ [[package]] name = "http" -version = "0.1.1" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "httparse" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "hyper" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "relay 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "idna" version = "0.1.4" @@ -358,6 +426,11 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "language-tags" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "lazy_static" version = "0.2.10" @@ -365,7 +438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "lazycell" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -386,8 +459,19 @@ dependencies = [ [[package]] name = "log" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "log" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] [[package]] name = "matches" @@ -402,6 +486,14 @@ dependencies = [ "libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mime" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miniz-sys" version = "0.1.10" @@ -413,16 +505,16 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -475,7 +567,7 @@ dependencies = [ "abstract-ns 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "domain 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -510,6 +602,14 @@ name = "num-traits" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "num_cpus" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "openssl-probe" version = "0.1.1" @@ -579,7 +679,7 @@ dependencies = [ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "petgraph 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -620,7 +720,7 @@ version = "0.4.2" source = "git+https://github.com/BurntSushi/quickcheck?rev=a1658ce#a1658ce9fc9ab41fd3aa1faeaa326fcf28dfcd45" dependencies = [ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -660,11 +760,24 @@ name = "regex-syntax" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "relay" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc-demangle" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "safemem" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "scoped-tls" version = "0.1.0" @@ -715,6 +828,11 @@ name = "slab" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "smallvec" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "socket2" version = "0.2.4" @@ -750,6 +868,11 @@ dependencies = [ "unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "take" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "tempdir" version = "0.3.5" @@ -789,14 +912,14 @@ dependencies = [ [[package]] name = "tokio-core" -version = "0.1.10" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -809,7 +932,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-proto" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-service" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -857,12 +1005,12 @@ dependencies = [ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "h2 0.1.0 (git+https://github.com/carllerche/h2)", - "http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "prost-derive 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-h2 0.1.3", "tower-router 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -883,13 +1031,13 @@ dependencies = [ "bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "prost-derive 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-grpc 0.1.3", "tower-grpc-build 0.1.3", @@ -904,11 +1052,11 @@ dependencies = [ "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "h2 0.1.0 (git+https://github.com/carllerche/h2)", - "http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "string 0.1.0 (git+https://github.com/carllerche/string)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", - "tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", ] @@ -950,6 +1098,14 @@ dependencies = [ "tower 0.1.0 (git+https://github.com/tower-rs/tower)", ] +[[package]] +name = "unicase" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -1001,6 +1157,11 @@ name = "vcpkg" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "version_check" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "void" version = "1.0.2" @@ -1042,7 +1203,9 @@ dependencies = [ "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" "checksum backtrace 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8709cc7ec06f6f0ae6c2c7e12f6ed41540781f72b488d83734978295ceae182e" "checksum backtrace-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "44585761d6161b0f57afc49482ab6bd067e4edef48c12a152c237eb0203f7661" +"checksum base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "229d032f1a99302697f10b27167ae6d03d49d032e6a8e2550e8d3fc13356d2b4" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" +"checksum bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c30d3802dfb7281680d6285f2ccdaa8c2d8fee41f93805dba5c4cf50dc23cf" "checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d" "checksum bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d828f97b58cc5de3e40c421d0cf2132d6b2da4ee0e11b8632fa838f0f9333ad6" "checksum bzip2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3eafc42c44e0d827de6b1c131175098fe7fb53b8ce8a47e65cb3ea94688be24" @@ -1062,12 +1225,17 @@ dependencies = [ "checksum flate2 0.2.20 (registry+https://github.com/rust-lang/crates.io-index)" = "e6234dd4468ae5d1e2dbb06fe2b058696fdc50a339c68a393aefbf00bc81e423" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159" +"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43f3795b4bae048dc6123a6b972cadde2e676f9ded08aef6bb77f5f157684a82" +"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "118b49cac82e04121117cbd3121ede3147e885627d82c4546b87c702debb90c1" "checksum futures-borrow 0.1.0 (git+https://github.com/carllerche/better-future)" = "" +"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum h2 0.1.0 (git+https://github.com/carllerche/h2)" = "" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" -"checksum http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7fa2bbed743b54e56a0f1afa2d6d6eeb195383a60fb733eec4f8107c47bd4576" +"checksum http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bf8217d8829cc05dedadc08b4bc0684e5e3fbba1126c5edc680af49053fa230c" +"checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37" +"checksum hyper 0.11.15 (registry+https://github.com/rust-lang/crates.io-index)" = "4d6105c5eeb03068b10ff34475a0d166964f98e7b9777cc34b342a225af9b87c" "checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d" "checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7" "checksum ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51268c3a27ad46afd1cca0bbf423a5be2e9fd3e6a7534736c195f0f834b763ef" @@ -1075,15 +1243,18 @@ dependencies = [ "checksum itertools 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2c52051d3fd3b505796a0ee90f2e5ec43213808585e8adc4d0182492cf62751a" "checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" "checksum lazy_static 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "236eb37a62591d4a41a89b7763d7de3e06ca02d5ab2815446a8bae5d2f8c2d57" -"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b" +"checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "5ba3df4dcb460b9dfbd070d41c94c19209620c191b0340b929ce748a2bcd42d2" "checksum libz-sys 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)" = "87f737ad6cc6fd6eefe3d9dc5412f1573865bded441300904d2f42269e140f16" -"checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b" +"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" +"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376" "checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a" +"checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" "checksum miniz-sys 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "609ce024854aeb19a0ef7567d348aaa5a746b32fb72e336df7fcc16869d7e2b4" -"checksum mio 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0e8411968194c7b139e9105bc4ae7db0bae232af087147e72f0616ebf5fdb9cb" +"checksum mio 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "75f72a93f046f1517e3cfddc0a096eb756a2ba727d36edc8227dee769a50a9b0" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum msdos_time 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "65ba9d75bcea84e07812618fedf284a64776c2f2ea0cad6bca7f69739695a958" "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" @@ -1093,6 +1264,7 @@ dependencies = [ "checksum num-integer 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "d1452e8b06e448a07f0e6ebb0bb1d92b8890eea63288c0b627331d53514d0fba" "checksum num-iter 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)" = "7485fcc84f85b4ecd0ea527b14189281cf27d60e583ae65ebc9c088b13dffe01" "checksum num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "99843c856d68d8b4313b03a17e33c4bb42ae8f6610ea81b28abe076ac721b9b0" +"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum openssl-probe 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d98df0270d404ccd3c050a41d579c52d1db15375168bb3471e04ec0f5f378daf" "checksum openssl-sys 0.9.20 (registry+https://github.com/rust-lang/crates.io-index)" = "0ad395f1cee51b64a8d07cc8063498dc7554db62d5f3ca87a67f4eed2791d0c8" "checksum ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "b81cf3b8cb96aa0e73bbedfcdc9708d09fec2854ba8d474be4e6f666d7379e8b" @@ -1112,7 +1284,9 @@ dependencies = [ "checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509" "checksum regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1731164734096285ec2a5ec7fea5248ae2f5485b3feeb0115af4fda2183b2d1b" "checksum regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad890a5eef7953f55427c50575c680c42841653abd2b028b68cd223d157f62db" +"checksum relay 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f301bafeb60867c85170031bdb2fcf24c8041f33aee09e7b116a58d4e9f781c5" "checksum rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "aee45432acc62f7b9a108cc054142dac51f979e69e71ddce7d6fc7adf29e817e" +"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" "checksum serde 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)" = "6eda663e865517ee783b0891a3f6eb3a253e0b0dabb46418969ee9635beadd9e" "checksum serde_derive 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)" = "652bc323d694dc925829725ec6c890156d8e70ae5202919869cb00fe2eff3788" @@ -1120,16 +1294,20 @@ dependencies = [ "checksum serde_json 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e4586746d1974a030c48919731ecffd0ed28d0c40749d0d18d43b3a7d6c9b20e" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" +"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum socket2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "36b4896961171cd3317c7e9603d88f379f8c6e45342212235d356496680c68fd" "checksum string 0.1.0 (git+https://github.com/carllerche/string)" = "" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" +"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" "checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520" "checksum tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)" = "" -"checksum tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c843a027f7c1df5f81e7734a0df3f67bf329411781ebf36393ce67beef6071e3" +"checksum tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "52b4e32d8edbf29501aabb3570f027c6ceb00ccef6538f4bddba0200503e74e8" "checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743" +"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" +"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" "checksum tower 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "" @@ -1137,6 +1315,7 @@ dependencies = [ "checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-router 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "" +"checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" "checksum unicode-normalization 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "51ccda9ef9efa3f7ef5d91e8f9b83bbe6955f9bf86aec89d5cce2c874625920f" "checksum unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8083c594e02b8ae1654ae26f0ade5158b119bd88ad0e8227a5d8fcd72407946" @@ -1145,6 +1324,7 @@ dependencies = [ "checksum url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fa35e768d4daf1d85733418a49fb42e10d7f633e394fccab4ab7aba897053fe2" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122" "checksum vcpkg 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9e0a7d8bed3178a8fb112199d466eeca9ed09a14ba8ad67718179b4fd5487d0b" +"checksum version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6b772017e347561807c1aa192438c5fd74242a670a6cffacc40f2defd1dc069d" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 7b6d8fb5d..f5c43212c 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -11,6 +11,8 @@ domain = "0.2.2" env_logger = "0.4" futures = "0.1" http = "0.1" +httparse = "1.2" +hyper = { version = "0.11.15", features = ["compat"] } ipnet = "1.0" log = "0.3" ordermap = "0.2" diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 05d539bfb..e2a0ec74a 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::time::Duration; -use h2; use http; use tokio_core::reactor::Handle; use tower_h2; @@ -14,6 +13,7 @@ use tower_reconnect::{self, Reconnect}; use control; use ctx; use telemetry; +use transparency; use transport; use ::timeout::Timeout; @@ -28,7 +28,6 @@ const DEFAULT_TIMEOUT_MS: u64 = 300; /// Buffering is not bounded and no timeouts are applied. pub struct Bind { ctx: C, - h2_builder: h2::client::Builder, sensors: telemetry::Sensors, executor: Handle, req_ids: Arc, @@ -36,32 +35,37 @@ pub struct Bind { _p: PhantomData, } +/// Binds a `Service` from a `SocketAddr` for a pre-determined protocol. +pub struct BindProtocol { + bind: Bind, + protocol: Protocol, +} + +/// Mark whether to use HTTP/1 or HTTP/2 +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Protocol { + Http1, + Http2 +} + type Service = Reconnect< telemetry::sensor::NewHttp< - tower_h2::client::Client< + transparency::Client< telemetry::sensor::Connect>, - CtxtExec, B, >, B, - tower_h2::RecvBody, + transparency::HttpBody, >, >; -type CtxtExec = ::logging::ContextualExecutor<(&'static str, SocketAddr), Handle>; - impl Bind<(), B> { pub fn new(executor: Handle) -> Self { - let mut h2_builder = h2::client::Builder::default(); - // h2 currently doesn't handle PUSH_PROMISE that well, so we just - // disable it for now. - h2_builder.enable_push(false); Self { executor, ctx: (), sensors: telemetry::Sensors::null(), req_ids: Default::default(), - h2_builder, connect_timeout: Duration::from_millis(DEFAULT_TIMEOUT_MS), _p: PhantomData, } @@ -84,7 +88,6 @@ impl Bind<(), B> { pub fn with_ctx(self, ctx: C) -> Bind { Bind { ctx, - h2_builder: self.h2_builder, sensors: self.sensors, executor: self.executor, req_ids: self.req_ids, @@ -98,7 +101,6 @@ impl Clone for Bind { fn clone(&self) -> Self { Self { ctx: self.ctx.clone(), - h2_builder: self.h2_builder.clone(), sensors: self.sensors.clone(), executor: self.executor.clone(), req_ids: self.req_ids.clone(), @@ -110,6 +112,10 @@ impl Clone for Bind { impl Bind { + pub fn connect_timeout(&self) -> Duration { + self.connect_timeout + } + // pub fn ctx(&self) -> &C { // &self.ctx // } @@ -132,15 +138,15 @@ impl Bind, B> where B: tower_h2::Body + 'static, { - pub fn bind_service(&self, addr: &SocketAddr) -> Service { - trace!("bind_service {}", addr); + pub fn bind_service(&self, addr: &SocketAddr, protocol: Protocol) -> Service { + trace!("bind_service addr={}, protocol={:?}", addr, protocol); let client_ctx = ctx::transport::Client::new( &self.ctx, addr, control::pb::proxy::common::Protocol::Http, ); - // Map a socket address to an HTTP/2.0 connection. + // Map a socket address to a connection. let connect = { let c = Timeout::new( transport::Connect::new(*addr, &self.executor), @@ -151,30 +157,39 @@ where self.sensors.connect(c, &client_ctx) }; - // Establishes an HTTP/2.0 connection - let client = tower_h2::client::Client::new( + let client = transparency::Client::new( + protocol, connect, - self.h2_builder.clone(), - ::logging::context_executor(("client", *addr), self.executor.clone()), + self.executor.clone(), ); - let h2_proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx); + let proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx); // Automatically perform reconnects if the connection fails. // // TODO: Add some sort of backoff logic. - Reconnect::new(h2_proxy) + Reconnect::new(proxy) } } -// ===== impl Bind ===== +// ===== impl BindProtocol ===== -impl control::discovery::Bind for Bind, B> + +impl Bind { + pub fn with_protocol(self, protocol: Protocol) -> BindProtocol { + BindProtocol { + bind: self, + protocol, + } + } +} + +impl control::discovery::Bind for BindProtocol, B> where B: tower_h2::Body + 'static, { type Request = http::Request; - type Response = http::Response>; + type Response = http::Response>; type Error = tower_reconnect::Error< tower_h2::client::Error, tower_h2::client::ConnectError>, @@ -183,6 +198,7 @@ where type BindError = (); fn bind(&self, addr: &SocketAddr) -> Result { - Ok::<_, ()>(self.bind_service(addr)) + Ok(self.bind.bind_service(addr, self.protocol)) } } + diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index 5723e2bbb..01d461b23 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -1,3 +1,4 @@ +use bytes::Buf; use futures::*; use std; use std::io; @@ -8,7 +9,7 @@ use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use config::Addr; -use transport; +use transport::GetOriginalDst; pub type PlaintextSocket = tokio_core::net::TcpStream; @@ -97,14 +98,20 @@ impl Future for Connecting { // ===== impl Connection ===== impl Connection { - pub fn original_dst_addr(&self) -> Option { - transport::get_original_dst(self.socket()) + pub fn original_dst_addr(&self, get: &T) -> Option { + get.get_original_dst(self.socket()) } pub fn local_addr(&self) -> Result { self.socket().local_addr() } + pub fn peek_future>(self, buf: T) -> Peek { + Peek { + inner: Some((self, buf)) + } + } + // This must never be made public so that in the future `Connection` can // control access to the plaintext socket for TLS, to ensure no private // data is accidentally writen to the socket and to ensure no unprotected @@ -128,8 +135,15 @@ impl io::Read for Connection { } } -// TODO: impl specialty functions -impl AsyncRead for Connection {} +impl AsyncRead for Connection { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + use self::Connection::*; + + match *self { + Plain(ref t) => t.prepare_uninitialized_buffer(buf), + } + } +} impl io::Write for Connection { fn write(&mut self, buf: &[u8]) -> io::Result { @@ -149,7 +163,6 @@ impl io::Write for Connection { } } -// TODO: impl specialty functions impl AsyncWrite for Connection { fn shutdown(&mut self) -> Poll<(), io::Error> { use self::Connection::*; @@ -158,6 +171,39 @@ impl AsyncWrite for Connection { Plain(ref mut t) => t.shutdown(), } } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + use self::Connection::*; + + match *self { + Plain(ref mut t) => t.write_buf(buf), + } + } +} + +// impl Peek + +pub struct Peek { + inner: Option<(Connection, T)>, +} + +impl> Future for Peek { + type Item = (Connection, T, usize); + type Error = std::io::Error; + + fn poll(&mut self) -> Poll { + let (conn, mut buf) = self.inner.take().expect("polled after completed"); + match conn.socket().peek(buf.as_mut()) { + Ok(n) => Ok(Async::Ready((conn, buf, n))), + Err(e) => match e.kind() { + std::io::ErrorKind::WouldBlock => { + self.inner = Some((conn, buf)); + Ok(Async::NotReady) + }, + _ => Err(e) + }, + } + } } // Misc. diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 89ff86662..e3adda5ae 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -67,7 +67,7 @@ struct DestinationSet { addrs: HashSet, needs_reconnect: bool, rx: R, - tx: mpsc::UnboundedSender, + txs: Vec>, } #[derive(Debug)] @@ -247,7 +247,13 @@ where trace!("Destination.Get {:?}", auth); match self.destinations.entry(auth) { Entry::Occupied(mut occ) => { - occ.get_mut().tx = tx; + let set = occ.get_mut(); + // we may already know of some addresses here, so push + // them onto the new watch first + for &addr in &set.addrs { + let _ = tx.unbounded_send(Update::Insert(addr)); + } + set.txs.push(tx); } Entry::Vacant(vac) => { let req = Destination { @@ -259,7 +265,7 @@ where addrs: HashSet::new(), needs_reconnect: false, rx: stream, - tx, + txs: vec![tx], }); } } @@ -317,7 +323,10 @@ where if let Some(addr) = addr.addr.and_then(pb_to_sock_addr) { if set.addrs.insert(addr) { trace!("update {:?} for {:?}", addr, auth); - let _ = set.tx.unbounded_send(Update::Insert(addr)); + // retain is used to drop any senders that are dead + set.txs.retain(|tx| { + tx.unbounded_send(Update::Insert(addr)).is_ok() + }); } } }, @@ -325,7 +334,10 @@ where if let Some(addr) = pb_to_sock_addr(addr) { if set.addrs.remove(&addr) { trace!("remove {:?} for {:?}", addr, auth); - let _ = set.tx.unbounded_send(Update::Remove(addr)); + // retain is used to drop any senders that are dead + set.txs.retain(|tx| { + tx.unbounded_send(Update::Remove(addr)).is_ok() + }); } } }, diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index feffd8743..60b4216a2 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -126,7 +126,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { stream: ctx.id as u64, }), method: Some((&ctx.method).into()), - scheme: ctx.uri.scheme().map(|s| s.into()), + scheme: ctx.uri.scheme_part().map(|s| s.as_str().into()), authority: ctx.uri .authority_part() .map(|a| a.as_str()) diff --git a/proxy/src/control/telemetry.rs b/proxy/src/control/telemetry.rs index bff954b8a..b547e58c2 100644 --- a/proxy/src/control/telemetry.rs +++ b/proxy/src/control/telemetry.rs @@ -96,7 +96,7 @@ where return; } Ok(Async::Ready(None)) => { - error!("report stream complete"); + debug!("report stream complete"); return; } Err(err) => { diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index a9b542eb4..2aaa35a56 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use control::pb::common::Protocol; @@ -56,6 +56,29 @@ impl Server { Arc::new(s) } + + pub fn orig_dst_if_not_local(&self) -> Option { + match self.orig_dst { + None => None, + Some(orig_dst) => { + // If the original destination is actually the listening socket, + // we don't want to create a loop. + if same_addr(&orig_dst, &self.local) { + None + } else { + Some(orig_dst) + } + } + } + } +} + +fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool { + (a0.port() == a1.port()) && match (a0.ip(), a1.ip()) { + (IpAddr::V6(a0), IpAddr::V4(a1)) => a0.to_ipv4() == Some(a1), + (IpAddr::V4(a0), IpAddr::V6(a1)) => Some(a0) == a1.to_ipv4(), + (a0, a1) => (a0 == a1), + } } impl Client { @@ -85,3 +108,58 @@ impl From> for Ctx { Ctx::Server(s) } } + +#[cfg(test)] +mod tests { + use std::net; + + use quickcheck::TestResult; + + use super::same_addr; + + quickcheck! { + fn same_addr_ipv4(ip0: net::Ipv4Addr, ip1: net::Ipv4Addr, port0: u16, port1: u16) -> TestResult { + if port0 == 0 || port0 == ::std::u16::MAX { + return TestResult::discard(); + } else if port1 == 0 || port1 == ::std::u16::MAX { + return TestResult::discard(); + } + + let addr0 = net::SocketAddr::new(net::IpAddr::V4(ip0), port0); + let addr1 = net::SocketAddr::new(net::IpAddr::V4(ip1), port1); + TestResult::from_bool(same_addr(&addr0, &addr1) == (addr0 == addr1)) + } + + fn same_addr_ipv6(ip0: net::Ipv6Addr, ip1: net::Ipv6Addr, port0: u16, port1: u16) -> TestResult { + if port0 == 0 || port0 == ::std::u16::MAX { + return TestResult::discard(); + } else if port1 == 0 || port1 == ::std::u16::MAX { + return TestResult::discard(); + } + + let addr0 = net::SocketAddr::new(net::IpAddr::V6(ip0), port0); + let addr1 = net::SocketAddr::new(net::IpAddr::V6(ip1), port1); + TestResult::from_bool(same_addr(&addr0, &addr1) == (addr0 == addr1)) + } + + fn same_addr_ip6_mapped_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult { + if port == 0 || port == ::std::u16::MAX { + return TestResult::discard(); + } + + let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port); + let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_mapped()), port); + TestResult::from_bool(same_addr(&addr4, &addr6)) + } + + fn same_addr_ip6_compat_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult { + if port == 0 || port == ::std::u16::MAX { + return TestResult::discard(); + } + + let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port); + let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_compatible()), port); + TestResult::from_bool(same_addr(&addr4, &addr6)) + } + } +} diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 2c2f2b6db..1aa577a83 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -1,9 +1,8 @@ use std::io; -use std::net::{IpAddr, SocketAddr}; +use std::net::{SocketAddr}; use std::sync::Arc; use http; -use tokio_core::reactor::Handle; use tower_buffer::{self, Buffer}; use tower_h2; use tower_reconnect::{self, Reconnect}; @@ -12,6 +11,7 @@ use tower_router::Recognize; use bind; use ctx; use telemetry; +use transparency; use transport; type Bind = bind::Bind, B>; @@ -21,14 +21,11 @@ pub struct Inbound { bind: Bind, } -type Client = tower_h2::client::Client< +type Client = transparency::Client< telemetry::sensor::Connect>, - CtxtExec, B, >; -type CtxtExec = ::logging::ContextualExecutor<(&'static str, SocketAddr), Handle>; - // ===== impl Inbound ===== impl Inbound { @@ -38,14 +35,6 @@ impl Inbound { bind, } } - - fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool { - (a0.port() == a1.port()) && match (a0.ip(), a1.ip()) { - (IpAddr::V6(a0), IpAddr::V4(a1)) => a0.to_ipv4() == Some(a1), - (IpAddr::V4(a0), IpAddr::V6(a1)) => Some(a0) == a1.to_ipv4(), - (a0, a1) => (a0 == a1), - } - } } impl Recognize for Inbound @@ -53,37 +42,33 @@ where B: tower_h2::Body + 'static, { type Request = http::Request; - type Response = http::Response>; + type Response = http::Response>; type Error = tower_buffer::Error< tower_reconnect::Error< tower_h2::client::Error, tower_h2::client::ConnectError>, >, >; - type Key = SocketAddr; + type Key = (SocketAddr, bind::Protocol); type RouteError = (); - type Service = Buffer, B, tower_h2::RecvBody>>>; + type Service = Buffer, B, transparency::HttpBody>>>; fn recognize(&self, req: &Self::Request) -> Option { let key = req.extensions() .get::>() .and_then(|ctx| { trace!("recognize local={} orig={:?}", ctx.local, ctx.orig_dst); - match ctx.orig_dst { - None => None, - Some(orig_dst) => { - // If the original destination is actually the listening socket, - // we don't want to create a loop. - if Self::same_addr(&orig_dst, &ctx.local) { - None - } else { - Some(orig_dst) - } - } - } + ctx.orig_dst_if_not_local() }) .or_else(|| self.default_addr); + let proto = match req.version() { + http::Version::HTTP_2 => bind::Protocol::Http2, + _ => bind::Protocol::Http1, + }; + + let key = key.map(|addr| (addr, proto)); + trace!("recognize key={:?}", key); key @@ -95,14 +80,15 @@ where /// /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. - fn bind_service(&mut self, addr: &SocketAddr) -> Result { - debug!("building inbound client to {}", addr); + fn bind_service(&mut self, key: &Self::Key) -> Result { + let &(ref addr, proto) = key; + debug!("building inbound {:?} client to {}", proto, addr); // Wrap with buffering. This currently is an unbounded buffer, which // is not ideal. // // TODO: Don't use unbounded buffering. - Buffer::new(self.bind.bind_service(addr), self.bind.executor()).map_err(|_| {}) + Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()).map_err(|_| {}) } } @@ -112,13 +98,12 @@ mod tests { use std::sync::Arc; use http; - use quickcheck::TestResult; use tokio_core::reactor::Core; use tower_router::Recognize; use super::Inbound; use control::pb::common::Protocol; - use bind::Bind; + use bind::{self, Bind}; use ctx; fn new_inbound(default: Option, ctx: &Arc) -> Inbound<()> { @@ -128,50 +113,6 @@ mod tests { } quickcheck! { - fn same_addr_ipv4(ip0: net::Ipv4Addr, ip1: net::Ipv4Addr, port0: u16, port1: u16) -> TestResult { - if port0 == 0 || port0 == ::std::u16::MAX { - return TestResult::discard(); - } else if port1 == 0 || port1 == ::std::u16::MAX { - return TestResult::discard(); - } - - let addr0 = net::SocketAddr::new(net::IpAddr::V4(ip0), port0); - let addr1 = net::SocketAddr::new(net::IpAddr::V4(ip1), port1); - TestResult::from_bool(Inbound::<()>::same_addr(&addr0, &addr1) == (addr0 == addr1)) - } - - fn same_addr_ipv6(ip0: net::Ipv6Addr, ip1: net::Ipv6Addr, port0: u16, port1: u16) -> TestResult { - if port0 == 0 || port0 == ::std::u16::MAX { - return TestResult::discard(); - } else if port1 == 0 || port1 == ::std::u16::MAX { - return TestResult::discard(); - } - - let addr0 = net::SocketAddr::new(net::IpAddr::V6(ip0), port0); - let addr1 = net::SocketAddr::new(net::IpAddr::V6(ip1), port1); - TestResult::from_bool(Inbound::<()>::same_addr(&addr0, &addr1) == (addr0 == addr1)) - } - - fn same_addr_ip6_mapped_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult { - if port == 0 || port == ::std::u16::MAX { - return TestResult::discard(); - } - - let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port); - let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_mapped()), port); - TestResult::from_bool(Inbound::<()>::same_addr(&addr4, &addr6)) - } - - fn same_addr_ip6_compat_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult { - if port == 0 || port == ::std::u16::MAX { - return TestResult::discard(); - } - - let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port); - let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_compatible()), port); - TestResult::from_bool(Inbound::<()>::same_addr(&addr4, &addr6)) - } - fn recognize_orig_dst( orig_dst: net::SocketAddr, local: net::SocketAddr, @@ -181,21 +122,13 @@ mod tests { let inbound = new_inbound(None, &ctx); + let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst), Protocol::Http); + + let rec = srv_ctx.orig_dst_if_not_local().map(|addr| (addr, bind::Protocol::Http1)); + let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new( - &ctx, - &local, - &remote, - &Some(orig_dst), - Protocol::Http, - )); - - let rec = if Inbound::<()>::same_addr(&orig_dst, &local) { - None - } else { - Some(orig_dst) - }; + .insert(srv_ctx); inbound.recognize(&req) == rec } @@ -219,7 +152,7 @@ mod tests { Protocol::Http, )); - inbound.recognize(&req) == default + inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1)) } fn recognize_default_no_ctx(default: Option) -> bool { @@ -229,7 +162,7 @@ mod tests { let req = http::Request::new(()); - inbound.recognize(&req) == default + inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1)) } fn recognize_default_no_loop( @@ -251,7 +184,7 @@ mod tests { Protocol::Http, )); - inbound.recognize(&req) == default + inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1)) } } } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index f5be3265b..da81cbc64 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -12,6 +12,8 @@ extern crate futures; extern crate futures_mpsc_lossy; extern crate h2; extern crate http; +extern crate httparse; +extern crate hyper; extern crate ipnet; #[cfg(target_os = "linux")] extern crate libc; @@ -46,12 +48,11 @@ use std::io; use std::net::SocketAddr; use std::sync::Arc; use std::thread; -use std::time::Instant; +use std::time::Duration; use tokio_core::reactor::{Core, Handle}; use tower::NewService; use tower_fn::*; -use tower_h2::*; use tower_router::{Recognize, Router}; pub mod app; @@ -68,6 +69,7 @@ mod logging; mod map_err; mod outbound; mod telemetry; +mod transparency; mod transport; pub mod timeout; mod tower_fn; // TODO: move to tower-fn @@ -77,6 +79,8 @@ use connection::BoundPort; use control::pb::proxy::tap; use inbound::Inbound; use map_err::MapErr; +use transparency::{HttpBody, Server}; +pub use transport::{GetOriginalDst, SoOriginalDst}; use outbound::Outbound; /// Runs a sidecar proxy. @@ -92,16 +96,21 @@ use outbound::Outbound; /// The private listener routes requests to service-discovery-aware load-balancer. /// -pub struct Main { +pub struct Main { config: config::Config, control_listener: BoundPort, inbound_listener: BoundPort, outbound_listener: BoundPort, + + get_original_dst: G, } -impl Main { - pub fn new(config: config::Config) -> Self { +impl Main +where + G: GetOriginalDst + Clone + 'static, +{ + pub fn new(config: config::Config, get_original_dst: G) -> Self { let control_listener = BoundPort::new(config.control_listener.addr) .expect("controller listener bind"); @@ -114,6 +123,7 @@ impl Main { control_listener, inbound_listener, outbound_listener, + get_original_dst, } } @@ -145,6 +155,7 @@ impl Main { control_listener, inbound_listener, outbound_listener, + get_original_dst, } = self; let control_host_and_port = config.control_host_and_port.clone(); @@ -186,10 +197,11 @@ impl Main { let fut = serve( inbound_listener, - h2::server::Builder::default(), Inbound::new(default_addr, bind), + config.private_connect_timeout, ctx, sensors.clone(), + get_original_dst.clone(), &executor, ); ::logging::context_future("inbound", fut) @@ -206,6 +218,8 @@ impl Main { .map_or_else(|| bind.clone(), |t| bind.clone().with_connect_timeout(t)) .with_ctx(ctx.clone()); + let tcp_connect_timeout = bind.connect_timeout(); + let outgoing = Outbound::new( bind, control, @@ -214,10 +228,11 @@ impl Main { let fut = serve( outbound_listener, - h2::server::Builder::default(), outgoing, + tcp_connect_timeout, ctx, sensors, + get_original_dst, &executor, ); ::logging::context_future("outbound", fut) @@ -239,7 +254,6 @@ impl Main { let server = serve_control( control_listener, - h2::server::Builder::default(), new_service, &executor, ); @@ -275,85 +289,69 @@ impl Main { } } -fn serve( +fn serve( bound_port: BoundPort, - h2_builder: h2::server::Builder, recognize: R, + tcp_connect_timeout: Duration, proxy_ctx: Arc, sensors: telemetry::Sensors, + get_orig_dst: G, executor: &Handle, ) -> Box + 'static> where - B: Body + Default + 'static, + B: tower_h2::Body + Default + 'static, E: ::std::fmt::Debug + 'static, F: ::std::fmt::Debug + 'static, R: Recognize< - Request = http::Request, + Request = http::Request, Response = http::Response>, Error = E, RouteError = F, > + 'static, + G: GetOriginalDst + 'static, { let router = Router::new(recognize); - let stack = NewServiceFn::new(move || { + let stack = Arc::new(NewServiceFn::new(move || { // Clone the router handle let router = router.clone(); // Map errors to 500 responses MapErr::new(router) - }); + })); let listen_addr = bound_port.local_addr(); let server = Server::new( + listen_addr, + proxy_ctx, + sensors, + get_orig_dst, stack, - h2_builder, - ::logging::context_executor(("serve", listen_addr), executor.clone()), + tcp_connect_timeout, + executor.clone(), ); + bound_port.listen_and_fold( executor, - (server, proxy_ctx, sensors, executor.clone()), - move |(server, proxy_ctx, sensors, executor), (connection, remote_addr)| { - let opened_at = Instant::now(); - let orig_dst = connection.original_dst_addr(); - let local_addr = connection.local_addr().unwrap_or(listen_addr); - // TODO: detect protocol. - let protocol = control::pb::common::Protocol::Http; - let srv_ctx = - ctx::transport::Server::new( - &proxy_ctx, - &local_addr, - &remote_addr, - &orig_dst, - protocol, - ); - - let io = sensors.accept(connection, opened_at, &srv_ctx); - - // TODO session context - let set_ctx = move |request: &mut http::Request<()>| { - request.extensions_mut().insert(Arc::clone(&srv_ctx)); - }; - - let s = server.serve_modified(io, set_ctx).map_err(|_| ()); - executor.spawn(::logging::context_future(("serve", local_addr), s)); - - future::ok((server, proxy_ctx, sensors, executor)) + (), + move |(), (connection, remote_addr)| { + server.serve(connection, remote_addr); + Ok(()) }, ) } fn serve_control( bound_port: BoundPort, - h2_builder: h2::server::Builder, new_service: N, executor: &Handle, ) -> Box + 'static> where - B: Body + 'static, - N: NewService, Response = http::Response> + 'static, + B: tower_h2::Body + 'static, + N: NewService, Response = http::Response> + 'static, { - let server = Server::new(new_service, h2_builder, executor.clone()); + let h2_builder = h2::server::Builder::default(); + let server = tower_h2::Server::new(new_service, h2_builder, executor.clone()); bound_port.listen_and_fold( executor, (server, executor.clone()), diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 8b4514c15..0c07eb88c 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -13,5 +13,5 @@ fn main() { process::exit(64) } }; - conduit_proxy::Main::new(config).run(); + conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst).run(); } diff --git a/proxy/src/map_err.rs b/proxy/src/map_err.rs index 1d002aa73..8d4e879f2 100644 --- a/proxy/src/map_err.rs +++ b/proxy/src/map_err.rs @@ -4,6 +4,7 @@ use std::marker::PhantomData; use futures::{Future, Poll}; use h2; use http; +use http::header::CONTENT_LENGTH; use tower::Service; /// Map an HTTP service's error to an appropriate 500 response. @@ -73,9 +74,10 @@ where fn poll(&mut self) -> Poll { self.inner.poll().or_else(|e| { - error!("turning h2 error into 500: {:?}", e); + error!("turning service error into 500: {:?}", e); let response = http::Response::builder() .status(500) + .header(CONTENT_LENGTH, "0") .body(Default::default()) .unwrap(); diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 6233fd3a4..1352ad09c 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -8,14 +8,15 @@ use tower_h2; use tower_reconnect; use tower_router::Recognize; -use bind::Bind; +use bind::{Bind, BindProtocol, Protocol}; use control; use ctx; use fully_qualified_authority::FullyQualifiedAuthority; use telemetry; +use transparency; use transport; -type Discovery = control::discovery::Watch, B>>; +type Discovery = control::discovery::Watch, B>>; type Error = tower_buffer::Error< tower_balance::Error< @@ -54,19 +55,25 @@ where B: tower_h2::Body + 'static, { type Request = http::Request; - type Response = http::Response>; + type Response = http::Response>; type Error = Error; - type Key = FullyQualifiedAuthority; + type Key = (FullyQualifiedAuthority, Protocol); type RouteError = (); type Service = Buffer>>; fn recognize(&self, req: &Self::Request) -> Option { - req.uri().authority_part().map(|authority| - FullyQualifiedAuthority::new( + req.uri().authority_part().map(|authority| { + let auth = FullyQualifiedAuthority::new( authority, self.default_namespace.as_ref().map(|s| s.as_ref()), - self.default_zone.as_ref().map(|s| s.as_ref())) - ) + self.default_zone.as_ref().map(|s| s.as_ref())); + + let proto = match req.version() { + http::Version::HTTP_2 => Protocol::Http2, + _ => Protocol::Http1, + }; + (auth, proto) + }) } /// Builds a dynamic, load balancing service. @@ -80,11 +87,15 @@ where /// changed. fn bind_service( &mut self, - authority: &FullyQualifiedAuthority, + key: &Self::Key, ) -> Result { - debug!("building outbound client to {:?}", authority); + let &(ref authority, protocol) = key; + debug!("building outbound {:?} client to {:?}", protocol, authority); - let resolve = self.discovery.resolve(authority, self.bind.clone()); + let resolve = self.discovery.resolve( + authority, + self.bind.clone().with_protocol(protocol), + ); let balance = Balance::new(resolve); diff --git a/proxy/src/telemetry/tap/match_.rs b/proxy/src/telemetry/tap/match_.rs index 598e3f858..0fb8e3b4b 100644 --- a/proxy/src/telemetry/tap/match_.rs +++ b/proxy/src/telemetry/tap/match_.rs @@ -260,7 +260,7 @@ impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { impl HttpMatch { fn matches(&self, req: &Arc) -> bool { match *self { - HttpMatch::Scheme(ref m) => req.uri.scheme().map(|s| *m == s).unwrap_or(false), + HttpMatch::Scheme(ref m) => req.uri.scheme_part().map(|s| &**m == s).unwrap_or(false), HttpMatch::Method(ref m) => *m == req.method, diff --git a/proxy/src/transparency/client.rs b/proxy/src/transparency/client.rs new file mode 100644 index 000000000..7bc5f7507 --- /dev/null +++ b/proxy/src/transparency/client.rs @@ -0,0 +1,215 @@ +use futures::{Async, Future, Poll}; +use h2; +use http; +use hyper; +use tokio_connect::Connect; +use tokio_core::reactor::Handle; +use tower::{Service, NewService}; +use tower_h2; + +use bind; +use super::glue::{BodyStream, HttpBody, HyperConnect}; + +/// A `NewService` that can speak either HTTP/1 or HTTP/2. +pub struct Client +where + B: tower_h2::Body, +{ + inner: ClientInner, +} + +enum ClientInner +where + B: tower_h2::Body, +{ + Http1(hyper::Client, BodyStream>), + Http2(tower_h2::client::Client), +} + +/// A `Future` returned from `Client::new_service()`. +pub struct ClientNewServiceFuture +where + B: tower_h2::Body + 'static, + C: Connect + 'static, +{ + inner: ClientNewServiceFutureInner, +} + +enum ClientNewServiceFutureInner +where + B: tower_h2::Body + 'static, + C: Connect + 'static, +{ + Http1(Option, BodyStream>>), + Http2(tower_h2::client::ConnectFuture), +} + +/// The `Service` yielded by `Client::new_service()`. +pub struct ClientService +where + B: tower_h2::Body, +{ + inner: ClientServiceInner, +} + +enum ClientServiceInner +where + B: tower_h2::Body, +{ + Http1(hyper::Client, BodyStream>), + Http2(tower_h2::client::Service), +} + +impl Client +where + C: Connect + Clone + 'static, + C::Future: 'static, + B: tower_h2::Body + 'static, +{ + /// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2). + pub fn new(protocol: bind::Protocol, connect: C, executor: Handle) -> Self { + match protocol { + bind::Protocol::Http1 => { + let h1 = hyper::Client::configure() + .connector(HyperConnect::new(connect)) + .body() + .build(&executor); + Client { + inner: ClientInner::Http1(h1), + } + }, + bind::Protocol::Http2 => { + let mut h2_builder = h2::client::Builder::default(); + // h2 currently doesn't handle PUSH_PROMISE that well, so we just + // disable it for now. + h2_builder.enable_push(false); + let h2 = tower_h2::client::Client::new(connect, h2_builder, executor); + + Client { + inner: ClientInner::Http2(h2), + } + } + } + } +} + +impl NewService for Client +where + C: Connect + Clone + 'static, + C::Future: 'static, + B: tower_h2::Body + 'static, +{ + type Request = http::Request; + type Response = http::Response; + type Error = tower_h2::client::Error; + type InitError = tower_h2::client::ConnectError; + type Service = ClientService; + type Future = ClientNewServiceFuture; + + fn new_service(&self) -> Self::Future { + let inner = match self.inner { + ClientInner::Http1(ref h1) => { + ClientNewServiceFutureInner::Http1(Some(h1.clone())) + }, + ClientInner::Http2(ref h2) => { + ClientNewServiceFutureInner::Http2(h2.new_service()) }, + }; + ClientNewServiceFuture { + inner, + } + } +} + +impl Future for ClientNewServiceFuture +where + C: Connect + 'static, + B: tower_h2::Body + 'static, +{ + type Item = ClientService; + type Error = tower_h2::client::ConnectError; + + fn poll(&mut self) -> Poll { + let inner = match self.inner { + ClientNewServiceFutureInner::Http1(ref mut h1) => { + ClientServiceInner::Http1(h1.take().expect("poll more than once")) + }, + ClientNewServiceFutureInner::Http2(ref mut h2) => { + let s = try_ready!(h2.poll()); + ClientServiceInner::Http2(s) + }, + }; + Ok(Async::Ready(ClientService { + inner, + })) + } +} + +impl Service for ClientService +where + C: Connect + 'static, + C::Future: 'static, + B: tower_h2::Body + 'static, +{ + type Request = http::Request; + type Response = http::Response; + type Error = tower_h2::client::Error; + type Future = ClientServiceFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.inner { + ClientServiceInner::Http1(_) => Ok(Async::Ready(())), + ClientServiceInner::Http2(ref mut h2) => h2.poll_ready(), + } + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match self.inner { + ClientServiceInner::Http1(ref h1) => { + let is_body_empty = req.body().is_end_stream(); + let mut req = hyper::Request::from(req.map(BodyStream)); + if is_body_empty { + req.headers_mut().set(hyper::header::ContentLength(0)); + } + ClientServiceFuture::Http1(h1.request(req)) + }, + ClientServiceInner::Http2(ref mut h2) => { + ClientServiceFuture::Http2(h2.call(req)) + }, + } + } +} + +pub enum ClientServiceFuture { + Http1(hyper::client::FutureResponse), + Http2(tower_h2::client::ResponseFuture), +} + +impl Future for ClientServiceFuture { + type Item = http::Response; + type Error = tower_h2::client::Error; + + fn poll(&mut self) -> Poll { + match *self { + ClientServiceFuture::Http1(ref mut f) => { + match f.poll() { + Ok(Async::Ready(res)) => { + let res = http::Response::from(res); + let res = res.map(HttpBody::Http1); + Ok(Async::Ready(res)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + debug!("http/1 client error: {}", e); + Err(h2::Reason::INTERNAL_ERROR.into()) + } + } + }, + ClientServiceFuture::Http2(ref mut f) => { + let res = try_ready!(f.poll()); + let res = res.map(HttpBody::Http2); + Ok(Async::Ready(res)) + } + } + } +} + diff --git a/proxy/src/transparency/glue.rs b/proxy/src/transparency/glue.rs new file mode 100644 index 000000000..714cfcaf5 --- /dev/null +++ b/proxy/src/transparency/glue.rs @@ -0,0 +1,333 @@ +use std::cell::RefCell; +use std::fmt; +use std::io; +use std::sync::Arc; + +use bytes::Bytes; +use futures::{future, Async, Future, Poll, Stream}; +use futures::future::Either; +use h2; +use http; +use hyper; +use tokio_connect::Connect; +use tower::{Service, NewService}; +use tower_h2; + +use ctx::transport::{Server as ServerCtx}; +use super::h1; + +/// Glue between `hyper::Body` and `tower_h2::RecvBody`. +#[derive(Debug)] +pub enum HttpBody { + Http1(hyper::Body), + Http2(tower_h2::RecvBody), +} + +/// Glue for `tower_h2::Body`s to be used in hyper. +#[derive(Debug)] +pub(super) struct BodyStream(pub(super) B); + +/// Glue for the `Data` part of a `tower_h2::Body` to be used as an `AsRef` in `BodyStream`. +#[derive(Debug)] +pub(super) struct BufAsRef(B); + +/// Glue for a `tower::Service` to used as a `hyper::server::Service`. +#[derive(Debug)] +pub(super) struct HyperServerSvc { + service: RefCell, + srv_ctx: Arc, +} + +/// Future returned by `HyperServerSvc`. +pub(super) struct HyperServerSvcFuture { + inner: F, +} + +/// Glue for any `Service` taking an h2 body to receive an `HttpBody`. +#[derive(Debug)] +pub(super) struct HttpBodySvc { + service: S, +} + +/// Glue for any `NewService` taking an h2 body to receive an `HttpBody`. +#[derive(Clone)] +pub(super) struct HttpBodyNewSvc { + new_service: N, +} + +/// Future returned by `HttpBodyNewSvc`. +pub(super) struct HttpBodyNewSvcFuture { + inner: F, +} + +/// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`. +#[derive(Debug, Clone)] +pub(super) struct HyperConnect { + connect: C, +} + +/// Future returned by `HyperConnect`. +pub(super) struct HyperConnectFuture { + inner: F, +} + +// ===== impl HttpBody ===== + +impl tower_h2::Body for HttpBody { + type Data = Bytes; + + fn is_end_stream(&self) -> bool { + match *self { + HttpBody::Http1(_) => false, + HttpBody::Http2(ref b) => b.is_end_stream(), + } + } + + fn poll_data(&mut self) -> Poll, h2::Error> { + match *self { + HttpBody::Http1(ref mut b) => { + match b.poll() { + Ok(Async::Ready(Some(chunk))) => Ok(Async::Ready(Some(chunk.into()))), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + debug!("http/1 body error: {}", e); + Err(h2::Reason::INTERNAL_ERROR.into()) + } + } + }, + HttpBody::Http2(ref mut b) => b.poll_data().map(|async| async.map(|opt| opt.map(|data| data.into()))), + } + } + + fn poll_trailers(&mut self) -> Poll, h2::Error> { + match *self { + HttpBody::Http1(_) => Ok(Async::Ready(None)), + HttpBody::Http2(ref mut b) => b.poll_trailers(), + } + } +} + +impl Default for HttpBody { + fn default() -> HttpBody { + HttpBody::Http2(Default::default()) + } +} + +// ===== impl BodyStream ===== + +impl Stream for BodyStream +where + B: tower_h2::Body, +{ + type Item = BufAsRef<::Buf>; + type Error = hyper::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll_data() + .map(|async| async.map(|opt| opt.map(|buf| BufAsRef(::bytes::IntoBuf::into_buf(buf))))) + .map_err(|e| { + trace!("h2 body error: {:?}", e); + hyper::Error::Io(io::ErrorKind::Other.into()) + }) + } +} + +// ===== impl BufAsRef ===== + +impl AsRef<[u8]> for BufAsRef { + fn as_ref(&self) -> &[u8] { + ::bytes::Buf::bytes(&self.0) + } +} + +// ===== impl HyperServerSvc ===== + +impl HyperServerSvc { + pub fn new(svc: S, ctx: Arc) -> Self { + HyperServerSvc { + service: RefCell::new(svc), + srv_ctx: ctx, + } + } +} + +impl hyper::server::Service for HyperServerSvc +where + S: Service< + Request=http::Request, + Response=http::Response, + >, + S::Error: fmt::Debug, + B: tower_h2::Body + 'static, +{ + type Request = hyper::server::Request; + type Response = hyper::server::Response>; + type Error = hyper::Error; + type Future = Either< + HyperServerSvcFuture, + future::FutureResult, + >; + + fn call(&self, req: Self::Request) -> Self::Future { + if let &hyper::Method::Connect = req.method() { + debug!("HTTP/1.1 CONNECT not supported"); + let res = hyper::Response::new() + .with_status(hyper::StatusCode::BadGateway); + return Either::B(future::ok(res)); + + } + + let mut req: http::Request = req.into(); + req.extensions_mut().insert(self.srv_ctx.clone()); + + if let Err(()) = h1::reconstruct_uri(&mut req) { + let res = hyper::Response::new() + .with_status(hyper::BadRequest); + return Either::B(future::ok(res)); + } + h1::strip_connection_headers(req.headers_mut()); + + let req = req.map(|b| HttpBody::Http1(b)); + let f = HyperServerSvcFuture { + inner: self.service.borrow_mut().call(req), + }; + Either::A(f) + } +} + +impl Future for HyperServerSvcFuture +where + F: Future>, + F::Error: fmt::Debug, +{ + type Item = hyper::server::Response>; + type Error = hyper::Error; + + fn poll(&mut self) -> Poll { + let mut res = try_ready!(self.inner.poll().map_err(|e| { + debug!("h2 error: {:?}", e); + hyper::Error::Io(io::ErrorKind::Other.into()) + })); + + if res.status() == http::StatusCode::SWITCHING_PROTOCOLS { + debug!("HTTP/1.1 101 upgrade not supported"); + let res = hyper::Response::new() + .with_status(hyper::StatusCode::BadGateway); + return Ok(Async::Ready(res)); + } + h1::strip_connection_headers(res.headers_mut()); + Ok(Async::Ready(res.map(BodyStream).into())) + } +} + +// ==== impl HttpBodySvc ==== + + +impl Service for HttpBodySvc +where + S: Service< + Request=http::Request, + >, +{ + type Request = http::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + self.service.call(req.map(|b| HttpBody::Http2(b))) + } +} + +impl HttpBodyNewSvc +where + N: NewService>, +{ + pub fn new(new_service: N) -> Self { + HttpBodyNewSvc { + new_service, + } + } +} + +impl NewService for HttpBodyNewSvc +where + N: NewService>, +{ + type Request = http::Request; + type Response = N::Response; + type Error = N::Error; + type Service = HttpBodySvc; + type InitError = N::InitError; + type Future = HttpBodyNewSvcFuture; + + fn new_service(&self) -> Self::Future { + HttpBodyNewSvcFuture { + inner: self.new_service.new_service(), + } + } +} + +impl Future for HttpBodyNewSvcFuture +where + F: Future, +{ + type Item = HttpBodySvc; + type Error = F::Error; + + fn poll(&mut self) -> Poll { + let s = try_ready!(self.inner.poll()); + Ok(Async::Ready(HttpBodySvc { + service: s, + })) + } +} + +// ===== impl HyperConnect ===== + +impl HyperConnect +where + C: Connect, + C::Future: 'static, +{ + pub fn new(connect: C) -> Self { + HyperConnect { + connect, + } + } +} + +impl hyper::client::Service for HyperConnect +where + C: Connect, + C::Future: 'static, +{ + type Request = hyper::Uri; + type Response = C::Connected; + type Error = io::Error; + type Future = HyperConnectFuture; + + fn call(&self, _uri: Self::Request) -> Self::Future { + HyperConnectFuture { + inner: self.connect.connect(), + } + } +} + +impl Future for HyperConnectFuture +where + F: Future, +{ + type Item = F::Item; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + .map_err(|_| io::ErrorKind::Other.into()) + } +} diff --git a/proxy/src/transparency/h1.rs b/proxy/src/transparency/h1.rs new file mode 100644 index 000000000..5219a1e52 --- /dev/null +++ b/proxy/src/transparency/h1.rs @@ -0,0 +1,99 @@ +use std::fmt::Write; +use std::mem; +use std::sync::Arc; + +use bytes::BytesMut; +use http; +use http::header::{HeaderValue, HOST}; +use http::uri::{Authority, Parts, Scheme, Uri}; + +use ctx::transport::{Server as ServerCtx}; + +pub fn reconstruct_uri(req: &mut http::Request) -> Result<(), ()> { + // RFC7230#section-5.4 + // If an absolute-form uri is received, it must replace + // the host header + if let Some(auth) = req.uri().authority_part().cloned() { + if let Some(host) = req.headers().get(HOST) { + if auth.as_str().as_bytes() == host.as_bytes() { + // host and absolute-form agree, nothing more to do + return Ok(()); + } + } + let host = HeaderValue::from_shared(auth.into_bytes()) + .expect("a valid authority is valid header value"); + req.headers_mut().insert(HOST, host); + return Ok(()); + } + + // try to parse the Host header + if let Some(host) = req.headers().get(HOST).cloned() { + let auth = host.to_str() + .ok() + .and_then(|s| { + if s.is_empty() { + None + } else { + s.parse::().ok() + } + }); + if let Some(auth) = auth { + set_authority(req.uri_mut(), auth); + return Ok(()); + } + } + + // last resort is to use the so_original_dst + let orig_dst = req.extensions() + .get::>() + .and_then(|ctx| ctx.orig_dst_if_not_local()); + if let Some(orig_dst) = orig_dst { + let mut bytes = BytesMut::with_capacity(31); + write!(&mut bytes, "{}", orig_dst) + .expect("socket address display is under 31 bytes"); + let bytes = bytes.freeze(); + let auth = Authority::from_shared(bytes) + .expect("socket address is valid authority"); + set_authority(req.uri_mut(), auth); + + return Ok(()); + } + + Err(()) +} + +fn set_authority(uri: &mut http::Uri, auth: Authority) { + let mut parts = Parts::from(mem::replace(uri, Uri::default())); + parts.scheme = Some(Scheme::HTTP); + parts.authority = Some(auth); + + let new = Uri::from_parts(parts) + .expect("absolute uri"); + + *uri = new; +} + +pub fn strip_connection_headers(headers: &mut http::HeaderMap) { + let conn_val = if let Some(val) = headers.remove(http::header::CONNECTION) { + val + } else { + return + }; + + let conn_header = if let Ok(s) = conn_val.to_str() { + s + } else { + return + }; + + // A `Connection` header may have a comma-separated list of + // names of other headers that are meant for only this specific connection. + // + // Iterate these names and remove them as headers. + for name in conn_header.split(',') { + let name = name.trim(); + headers.remove(name); + } +} + + diff --git a/proxy/src/transparency/mod.rs b/proxy/src/transparency/mod.rs new file mode 100644 index 000000000..3fc0d48d5 --- /dev/null +++ b/proxy/src/transparency/mod.rs @@ -0,0 +1,10 @@ +mod client; +mod glue; +mod h1; +mod protocol; +mod server; +mod tcp; + +pub use self::client::Client; +pub use self::glue::HttpBody; +pub use self::server::Server; diff --git a/proxy/src/transparency/protocol.rs b/proxy/src/transparency/protocol.rs new file mode 100644 index 000000000..77c4940de --- /dev/null +++ b/proxy/src/transparency/protocol.rs @@ -0,0 +1,45 @@ +use httparse; + +/// Known protocols that we proxy transparently. +#[derive(Debug)] +pub enum Protocol { + Http1, + Http2, +} + +const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +impl Protocol { + /// Tries to detect a known protocol in the peeked bytes. + /// + /// If no protocol can be determined, returns `None`. + pub fn detect(bytes: &[u8]) -> Option { + // http2 is easiest to detect + if bytes.len() >= H2_PREFACE.len() { + if &bytes[..H2_PREFACE.len()] == H2_PREFACE { + return Some(Protocol::Http2); + } + } + + // http1 can have a really long first line, but if the bytes so far + // look like http1, we'll assume it is. a different protocol + // should look different in the first few bytes + + let mut headers = [httparse::EMPTY_HEADER; 0]; + let mut req = httparse::Request::new(&mut headers); + match req.parse(bytes) { + // Ok(Compelete) or Ok(Partial) both mean it looks like HTTP1! + // + // If we got past the first line, we'll see TooManyHeaders, + // because we passed an array of 0 headers to parse into. That's fine! + // We didn't want to keep parsing headers, just validate that + // the first line is HTTP1. + Ok(_) | Err(httparse::Error::TooManyHeaders) => { + return Some(Protocol::Http1); + }, + _ => {} + } + + None + } +} diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs new file mode 100644 index 000000000..aa7f2442f --- /dev/null +++ b/proxy/src/transparency/server.rs @@ -0,0 +1,160 @@ +use std::fmt; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use futures::Future; +use http; +use hyper; +use tokio_core::reactor::Handle; +use tower::NewService; +use tower_h2; + +use control; +use connection::Connection; +use ctx::Proxy as ProxyCtx; +use ctx::transport::{Server as ServerCtx}; +use telemetry::Sensors; +use transport::GetOriginalDst; +use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; +use super::protocol::Protocol; +use super::tcp; + +/// A protocol-transparent Server! +/// +/// This type can `serve` new connections, determine what protocol +/// the connection is speaking, and route it to the corresponding +/// service. +pub struct Server +where + S: NewService>, + S::Future: 'static, +{ + executor: Handle, + get_orig_dst: G, + h1: hyper::server::Http, + h2: tower_h2::Server, Handle, B>, + listen_addr: SocketAddr, + new_service: S, + proxy_ctx: Arc, + sensors: Sensors, + tcp: tcp::Proxy, +} + +impl Server +where + S: NewService< + Request = http::Request, + Response = http::Response + > + Clone + 'static, + S::Future: 'static, + S::Error: fmt::Debug, + B: tower_h2::Body + 'static, + G: GetOriginalDst, +{ + /// Creates a new `Server`. + pub fn new( + listen_addr: SocketAddr, + proxy_ctx: Arc, + sensors: Sensors, + get_orig_dst: G, + stack: S, + tcp_connect_timeout: Duration, + executor: Handle, + ) -> Self { + let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); + let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone(), &executor); + Server { + executor: executor.clone(), + get_orig_dst, + h1: hyper::server::Http::new(), + h2: tower_h2::Server::new(recv_body_svc, Default::default(), executor), + listen_addr, + new_service: stack, + proxy_ctx, + sensors, + tcp, + } + } + + /// Handle a new connection. + /// + /// This will peek on the connection for the first bytes to determine + /// what protocol the connection is speaking. From there, the connection + /// will be mapped into respective services, and spawned into an + /// executor. + pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) { + let opened_at = Instant::now(); + + // create Server context + let orig_dst = connection.original_dst_addr(&self.get_orig_dst); + let local_addr = connection.local_addr().unwrap_or(self.listen_addr); + let proxy_ctx = self.proxy_ctx.clone(); + + // try to sniff protocol + let sniff = [0u8; 32]; + let sensors = self.sensors.clone(); + let h1 = self.h1.clone(); + let h2 = self.h2.clone(); + let tcp = self.tcp.clone(); + let new_service = self.new_service.clone(); + let fut = connection + .peek_future(sniff) + .map_err(|_| ()) + .and_then(move |(connection, sniff, n)| -> Box> { + if let Some(proto) = Protocol::detect(&sniff[..n]) { + let srv_ctx = ServerCtx::new( + &proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + control::pb::proxy::common::Protocol::Http, + ); + + // record telemetry + let io = sensors.accept(connection, opened_at, &srv_ctx); + + match proto { + Protocol::Http1 => { + trace!("transparency detected HTTP/1"); + + Box::new(new_service.new_service() + .map_err(|_| ()) + .and_then(move |s| { + let svc = HyperServerSvc::new(s, srv_ctx); + h1.serve_connection(io, svc) + .map(|_| ()) + .map_err(|_| ()) + })) + }, + Protocol::Http2 => { + trace!("transparency detected HTTP/2"); + + let set_ctx = move |request: &mut http::Request<()>| { + request.extensions_mut().insert(srv_ctx.clone()); + }; + Box::new(h2.serve_modified(io, set_ctx).map_err(|_| ())) + } + } + } else { + trace!("transparency did not detect protocol, treating as TCP"); + + let srv_ctx = ServerCtx::new( + &proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + control::pb::proxy::common::Protocol::Tcp, + ); + + // record telemetry + let tcp_in = sensors.accept(connection, opened_at, &srv_ctx); + + tcp.serve(tcp_in, srv_ctx) + } + }); + + self.executor.spawn(fut); + } +} + diff --git a/proxy/src/transparency/tcp.rs b/proxy/src/transparency/tcp.rs new file mode 100644 index 000000000..f20d76179 --- /dev/null +++ b/proxy/src/transparency/tcp.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures::{future, Future}; +use tokio_connect::Connect; +use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::io::copy; + +use control; +use ctx::transport::{Client as ClientCtx, Server as ServerCtx}; +use telemetry::Sensors; +use timeout::Timeout; +use transport; + +/// TCP Server Proxy +#[derive(Debug, Clone)] +pub struct Proxy { + connect_timeout: Duration, + executor: Handle, + sensors: Sensors, +} + +impl Proxy { + /// Create a new TCP `Proxy`. + pub fn new(connect_timeout: Duration, sensors: Sensors, executor: &Handle) -> Self { + Self { + connect_timeout, + executor: executor.clone(), + sensors, + } + } + + /// Serve a TCP connection, trying to forward it to its destination. + pub fn serve(&self, tcp_in: T, srv_ctx: Arc) -> Box> + where + T: AsyncRead + AsyncWrite + 'static, + { + let orig_dst = srv_ctx.orig_dst_if_not_local(); + + // For TCP, we really have no extra information other than the + // SO_ORIGINAL_DST socket option. If that isn't set, the only thing + // to do is to drop this connection. + let orig_dst = if let Some(orig_dst) = orig_dst { + debug!( + "tcp accepted, forwarding ({}) to {}", + srv_ctx.remote, + orig_dst, + ); + orig_dst + } else { + debug!( + "tcp accepted, no SO_ORIGINAL_DST to forward: remote={}", + srv_ctx.remote, + ); + return Box::new(future::ok(())); + }; + + let client_ctx = ClientCtx::new( + &srv_ctx.proxy, + &orig_dst, + control::pb::proxy::common::Protocol::Tcp, + ); + let c = Timeout::new( + transport::Connect::new(orig_dst, &self.executor), + self.connect_timeout, + &self.executor, + ); + let connect = self.sensors.connect(c, &client_ctx); + + let fut = connect.connect() + .map_err(|e| debug!("tcp connect error: {:?}", e)) + .and_then(move |tcp_out| { + let (in_r, in_w) = tcp_in.split(); + let (out_r, out_w) = tcp_out.split(); + + copy(in_r, out_w) + .join(copy(out_r, in_w)) + .map(|_| ()) + .map_err(|e| debug!("tcp error: {}", e)) + }); + Box::new(fut) + } +} diff --git a/proxy/src/transport/mod.rs b/proxy/src/transport/mod.rs index ebd2acfb7..93137d21b 100644 --- a/proxy/src/transport/mod.rs +++ b/proxy/src/transport/mod.rs @@ -2,4 +2,4 @@ mod connect; mod so_original_dst; pub use self::connect::{Connect, LookupAddressAndConnect, TimeoutConnect, TimeoutError}; -pub use self::so_original_dst::get_original_dst; +pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst}; diff --git a/proxy/src/transport/so_original_dst.rs b/proxy/src/transport/so_original_dst.rs index 493632f2f..d88a8b517 100644 --- a/proxy/src/transport/so_original_dst.rs +++ b/proxy/src/transport/so_original_dst.rs @@ -1,22 +1,34 @@ use std::net::SocketAddr; use tokio_core::net::TcpStream; -#[cfg(not(target_os = "linux"))] -pub fn get_original_dst(_: &TcpStream) -> Option { - debug!("no support for SO_ORIGINAL_DST"); - None +/// A generic way to get the original destination address of a socket. +/// +/// This is especially useful to allow tests to provide a mock implementation. +pub trait GetOriginalDst { + fn get_original_dst(&self, socket: &TcpStream) -> Option; } -// TODO change/remove once https://github.com/tokio-rs/tokio/issues/25 is addressed -#[cfg(target_os = "linux")] -pub fn get_original_dst(sock: &TcpStream) -> Option { - use self::linux; - use std::os::unix::io::AsRawFd; +#[derive(Copy, Clone, Debug)] +pub struct SoOriginalDst; - debug!("get_original_dst {:?}", sock); +impl GetOriginalDst for SoOriginalDst { + #[cfg(not(target_os = "linux"))] + fn get_original_dst(&self, _: &TcpStream) -> Option { + debug!("no support for SO_ORIGINAL_DST"); + None + } - let res = unsafe { linux::so_original_dst(sock.as_raw_fd()) }; - res.ok() + // TODO change/remove once https://github.com/tokio-rs/tokio/issues/25 is addressed + #[cfg(target_os = "linux")] + fn get_original_dst(&self, sock: &TcpStream) -> Option { + use self::linux; + use std::os::unix::io::AsRawFd; + + debug!("get_original_dst {:?}", sock); + + let res = unsafe { linux::so_original_dst(sock.as_raw_fd()) }; + res.ok() + } } #[cfg(target_os = "linux")] diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index 8486893fb..6e91faab3 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -1,6 +1,3 @@ -#[macro_use] -extern crate log; - mod support; use self::support::*; @@ -34,6 +31,29 @@ fn outbound_reconnects_if_controller_stream_ends() { assert_eq!(client.get("/recon"), "nect"); } +#[test] +fn outbound_updates_newer_services() { + let _ = env_logger::init(); + + //TODO: when the support server can listen on both http1 and http2 + //at the same time, do that here + let srv = server::http1().route("/h1", "hello h1").run(); + let ctrl = controller::new() + .destination("test.conduit.local", srv.addr) + .run(); + let proxy = proxy::new().controller(ctrl).outbound(srv).run(); + // the HTTP2 service starts watching first, receiving an addr + // from the controller + let client1 = client::http2(proxy.outbound, "test.conduit.local"); + client1.get("/h2"); // 500, ignore + + // a new HTTP1 service needs to be build now, while the HTTP2 + // service already exists, so make sure previously sent addrs + // get into the newer service + let client2 = client::http1(proxy.outbound, "test.conduit.local"); + assert_eq!(client2.get("/h1"), "hello h1"); +} + #[test] #[ignore] fn outbound_times_out() { diff --git a/proxy/tests/support/client.rs b/proxy/tests/support/client.rs index d49627ba9..4dde60b25 100644 --- a/proxy/tests/support/client.rs +++ b/proxy/tests/support/client.rs @@ -2,84 +2,160 @@ use support::*; use self::futures::sync::{mpsc, oneshot}; use self::tokio_core::net::TcpStream; -use self::tower_h2::client::Error; type Request = http::Request<()>; -type Response = http::Response; -type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender>)>; +type Response = http::Response; +type BodyStream = Box + Send>; +type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender>)>; pub fn new>(addr: SocketAddr, auth: T) -> Client { - Client::new(addr, auth.into()) + http2(addr, auth.into()) +} + +pub fn http1>(addr: SocketAddr, auth: T) -> Client { + Client::new(addr, auth.into(), Run::Http1 { + absolute_uris: false, + }) +} + +// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`. +pub fn http1_absolute_uris>(addr: SocketAddr, auth: T) -> Client { + Client::new(addr, auth.into(), Run::Http1 { + absolute_uris: true, + }) +} + +pub fn http2>(addr: SocketAddr, auth: T) -> Client { + Client::new(addr, auth.into(), Run::Http2) +} + +pub fn tcp(addr: SocketAddr) -> tcp::TcpClient { + tcp::client(addr) } -#[derive(Debug)] pub struct Client { authority: String, tx: Sender, + version: http::Version, } impl Client { - pub fn new(addr: SocketAddr, authority: String) -> Client { + fn new(addr: SocketAddr, authority: String, r: Run) -> Client { + let v = match r { + Run::Http1 { .. } => http::Version::HTTP_11, + Run::Http2 => http::Version::HTTP_2, + }; Client { authority, - tx: run(addr), + tx: run(addr, r), + version: v, } } pub fn get(&self, path: &str) -> String { - let (tx, rx) = oneshot::channel(); - let req = Request::builder() - .method("GET") - .uri(format!("http://{}{}", self.authority, path).as_str()) - .version(http::Version::HTTP_2) - .body(()) - .unwrap(); - let _ = self.tx.unbounded_send((req, tx)); - rx.map_err(|_| panic!("client request dropped")) - .and_then(|res| { - let stream = RecvBodyStream(res.unwrap().into_parts().1); - stream.concat2() - }) + let mut req = self.request_builder(path); + let res = self.request(req.method("GET")); + let stream = res.into_parts().1; + stream.concat2() .map(|body| ::std::str::from_utf8(&body).unwrap().to_string()) .wait() .unwrap() } + + pub fn request(&self, builder: &mut http::request::Builder) -> Response { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx)); + rx.map_err(|_| panic!("client request dropped")) + .wait() + .map(|result| result.unwrap()) + .unwrap() + } + + pub fn request_builder(&self, path: &str) -> http::request::Builder { + let mut b = Request::builder(); + b.uri(format!("http://{}{}", self.authority, path).as_str()) + .version(self.version); + b + } } -fn run(addr: SocketAddr) -> Sender { - let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender>)>(); +enum Run { + Http1 { + absolute_uris: bool, + }, + Http2, +} - ::std::thread::Builder::new() - .name("support client".into()) - .spawn(move || { - let mut core = Core::new().unwrap(); - let reactor = core.handle(); +fn run(addr: SocketAddr, version: Run) -> Sender { + let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender>)>(); - let conn = Conn(addr, reactor.clone()); - let h2 = tower_h2::Client::::new( - conn, - Default::default(), - reactor.clone(), - ); + ::std::thread::Builder::new().name("support client".into()).spawn(move || { + let mut core = Core::new().unwrap(); + let reactor = core.handle(); - let done = h2.new_service() - .map_err(move |err| println!("connect error ({:?}): {:?}", addr, err)) - .and_then(move |mut h2| { - rx.for_each(move |(req, cb)| { - let fut = h2.call(req).then(|result| { - let _ = cb.send(result); - Ok(()) - }); - reactor.spawn(fut); + let conn = Conn(addr, reactor.clone()); + + let work: Box> = match version { + Run::Http1 { absolute_uris } => { + let client = hyper::Client::configure() + .connector(conn) + .build(&reactor); + Box::new(rx.for_each(move |(req, cb)| { + let mut req = hyper::Request::from(req.map(|()| hyper::Body::empty())); + if absolute_uris { + req.set_proxy(true); + } + let fut = client.request(req).then(move |result| { + let result = result + .map(|res| { + let res = http::Response::from(res); + res.map(|body| -> BodyStream { + Box::new(body.map(|chunk| chunk.into()) + .map_err(|e| e.to_string())) + }) + }) + .map_err(|e| e.to_string()); + let _ = cb.send(result); Ok(()) - }) + }); + reactor.spawn(fut); + Ok(()) }) - .map(|_| ()) - .map_err(|e| println!("client error: {:?}", e)); + .map_err(|e| println!("client error: {:?}", e))) + }, + Run::Http2 => { + let h2 = tower_h2::Client::::new( + conn, + Default::default(), + reactor.clone(), + ); - core.run(done).unwrap(); - }) - .unwrap(); + Box::new(h2.new_service() + .map_err(move |err| println!("connect error ({:?}): {:?}", addr, err)) + .and_then(move |mut h2| { + rx.for_each(move |(req, cb)| { + let fut = h2.call(req).then(|result| { + let result = result + .map(|res| { + res.map(|body| -> BodyStream { + Box::new(RecvBodyStream(body).map_err(|e| format!("{:?}", e))) + }) + }) + .map_err(|e| format!("{:?}", e)); + let _ = cb.send(result); + Ok(()) + }); + reactor.spawn(fut); + Ok(()) + }) + }) + .map(|_| ()) + .map_err(|e| println!("client error: {:?}", e))) + } + }; + + core.run(work).unwrap(); + }).unwrap(); tx } @@ -96,3 +172,16 @@ impl Connect for Conn { Box::new(c) } } + + +impl hyper::client::Service for Conn { + type Request = hyper::Uri; + type Response = TcpStream; + type Future = Box>; + type Error = ::std::io::Error; + fn call(&self, _: hyper::Uri) -> ::Future { + let c = TcpStream::connect(&self.0, &self.1) + .and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp)); + Box::new(c) + } +} diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 5a179d8a0..a43957263 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -4,10 +4,12 @@ extern crate bytes; extern crate conduit_proxy; extern crate futures; extern crate h2; -extern crate http; +pub extern crate http; +extern crate hyper; extern crate prost; extern crate tokio_connect; extern crate tokio_core; +pub extern crate tokio_io; extern crate tower; extern crate tower_h2; extern crate url; @@ -16,10 +18,10 @@ pub extern crate env_logger; use self::bytes::{BigEndian, Bytes, BytesMut}; pub use self::futures::*; use self::futures::sync::oneshot; -use self::http::{HeaderMap, Request}; +pub use self::http::{HeaderMap, Request, Response}; use self::http::header::HeaderValue; use self::tokio_connect::Connect; -use self::tokio_core::net::TcpListener; +use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::reactor::{Core, Handle}; use self::tower::{NewService, Service}; use self::tower_h2::{Body, RecvBody}; @@ -30,6 +32,7 @@ pub mod client; pub mod controller; pub mod proxy; pub mod server; +mod tcp; pub type Shutdown = oneshot::Sender<()>; pub type ShutdownRx = future::Then< @@ -55,3 +58,7 @@ impl Stream for RecvBodyStream { Ok(Async::Ready(data.map(From::from))) } } + +pub fn s(bytes: &[u8]) -> &str { + ::std::str::from_utf8(bytes.as_ref()).unwrap() +} diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index 5c8552135..dd13f8075 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -1,5 +1,7 @@ use support::*; +use std::sync::{Arc, Mutex}; + use support::conduit_proxy::convert::TryFrom; pub fn new() -> Proxy { @@ -60,18 +62,51 @@ impl Proxy { } } +#[derive(Clone, Debug)] +struct MockOriginalDst(Arc>); + +#[derive(Debug, Default)] +struct DstInner { + inbound_orig_addr: Option, + inbound_local_addr: Option, + outbound_orig_addr: Option, + outbound_local_addr: Option, +} + +impl conduit_proxy::GetOriginalDst for MockOriginalDst { + fn get_original_dst(&self, sock: &TcpStream) -> Option { + sock.local_addr() + .ok() + .and_then(|local| { + let inner = self.0.lock().unwrap(); + if inner.inbound_local_addr == Some(local) { + inner.inbound_orig_addr + } else if inner.outbound_local_addr == Some(local) { + inner.outbound_orig_addr + } else { + None + } + }) + } +} + fn run(proxy: Proxy) -> Listening { use self::conduit_proxy::config; let controller = proxy.controller.expect("proxy controller missing"); let inbound = proxy.inbound; let outbound = proxy.outbound; + let mut mock_orig_dst = DstInner::default(); let mut env = config::TestEnv::new(); env.put(config::ENV_CONTROL_URL, format!("tcp://{}", controller.addr)); env.put(config::ENV_PRIVATE_LISTENER, "tcp://127.0.0.1:0".to_owned()); if let Some(ref inbound) = inbound { env.put(config::ENV_PRIVATE_FORWARD, format!("tcp://{}", inbound.addr)); + mock_orig_dst.inbound_orig_addr = Some(inbound.addr); + } + if let Some(ref outbound) = outbound { + mock_orig_dst.outbound_orig_addr = Some(outbound.addr); } env.put(config::ENV_PUBLIC_LISTENER, "tcp://127.0.0.1:0".to_owned()); env.put(config::ENV_CONTROL_LISTENER, "tcp://127.0.0.1:0".to_owned()); @@ -86,12 +121,20 @@ fn run(proxy: Proxy) -> Listening { config.metrics_flush_interval = dur; } - let main = conduit_proxy::Main::new(config); + let mock_orig_dst = MockOriginalDst(Arc::new(Mutex::new(mock_orig_dst))); + + let main = conduit_proxy::Main::new(config, mock_orig_dst.clone()); let control_addr = main.control_addr(); let inbound_addr = main.inbound_addr(); let outbound_addr = main.outbound_addr(); + { + let mut inner = mock_orig_dst.0.lock().unwrap(); + inner.inbound_local_addr = Some(inbound_addr); + inner.outbound_local_addr = Some(outbound_addr); + } + let (running_tx, running_rx) = shutdown_signal(); let (tx, rx) = shutdown_signal(); diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index ce76ab2c1..66890f36d 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -4,78 +4,123 @@ use std::sync::Arc; use support::*; pub fn new() -> Server { - Server::new() + http2() +} + +pub fn http1() -> Server { + Server::http1() +} + +pub fn http2() -> Server { + Server::http2() +} + +pub fn tcp() -> tcp::TcpServer { + tcp::server() } #[derive(Debug)] pub struct Server { - routes: HashMap, + routes: HashMap, + version: Run, } #[derive(Debug)] pub struct Listening { pub addr: SocketAddr, - shutdown: Shutdown, + pub(super) shutdown: Shutdown, } impl Server { - pub fn new() -> Self { + fn new(run: Run) -> Self { Server { routes: HashMap::new(), + version: run, } } + fn http1() -> Self { + Server::new(Run::Http1) + } + + fn http2() -> Self { + Server::new(Run::Http2) + } pub fn route(mut self, path: &str, resp: &str) -> Self { - self.routes.insert(path.into(), resp.into()); + self.routes.insert(path.into(), Route::string(resp)); + self + } + + pub fn route_fn(mut self, path: &str, cb: F) -> Self + where + F: Fn(Request<()>) -> Response + Send + 'static, + { + self.routes.insert(path.into(), Route(Box::new(cb))); self } pub fn run(self) -> Listening { let (tx, rx) = shutdown_signal(); let (addr_tx, addr_rx) = oneshot::channel(); - ::std::thread::Builder::new() - .name("support server".into()) - .spawn(move || { - let mut core = Core::new().unwrap(); - let reactor = core.handle(); + ::std::thread::Builder::new().name("support server".into()).spawn(move || { + let mut core = Core::new().unwrap(); + let reactor = core.handle(); - let h2 = tower_h2::Server::new( - NewSvc(Arc::new(self.routes)), - Default::default(), - reactor.clone(), - ); + let new_svc = NewSvc(Arc::new(self.routes)); - let addr = ([127, 0, 0, 1], 0).into(); - let bind = TcpListener::bind(&addr, &reactor).expect("bind"); + let srv: Box Box>> = match self.version { + Run::Http1 => { + let h1 = hyper::server::Http::::new(); - let local_addr = bind.local_addr().expect("local_addr"); - info!("bound listener, sending addr: {}", local_addr); - let _ = addr_tx.send(local_addr); + Box::new(move |sock| { + let h1_clone = h1.clone(); + let conn = new_svc.new_service() + .from_err() + .and_then(move |svc| h1_clone.serve_connection(sock, svc)) + .map(|_| ()) + .map_err(|e| println!("server h1 error: {}", e)); + Box::new(conn) + }) + }, + Run::Http2 => { + let h2 = tower_h2::Server::new( + new_svc, + Default::default(), + reactor.clone(), + ); + Box::new(move |sock| { + let conn = h2.serve(sock) + .map_err(|e| println!("server h2 error: {:?}", e)); + Box::new(conn) + }) + }, + }; - let serve = bind.incoming() - .fold((h2, reactor), |(h2, reactor), (sock, _)| { - if let Err(e) = sock.set_nodelay(true) { - return Err(e); - } + let addr = ([127, 0, 0, 1], 0).into(); + let bind = TcpListener::bind(&addr, &reactor).expect("bind"); - let serve = h2.serve(sock); - reactor.spawn(serve.map_err(|e| println!("server error: {:?}", e))); + let local_addr = bind.local_addr().expect("local_addr"); + let _ = addr_tx.send(local_addr); - Ok((h2, reactor)) - }); + let serve = bind.incoming() + .fold((srv, reactor), |(srv, reactor), (sock, _)| { + if let Err(e) = sock.set_nodelay(true) { + return Err(e); + } + reactor.spawn(srv(sock)); - core.handle().spawn( - serve - .map(|_| ()) - .map_err(|e| println!("server error: {}", e)), - ); + Ok((srv, reactor)) + }); - info!("running"); - core.run(rx).unwrap(); - }) - .unwrap(); + core.handle().spawn( + serve + .map(|_| ()) + .map_err(|e| println!("server error: {}", e)), + ); + + core.run(rx).unwrap(); + }).unwrap(); - info!("awaiting listening addr"); let addr = addr_rx.wait().expect("addr"); Listening { @@ -85,7 +130,11 @@ impl Server { } } -type Response = http::Response; +#[derive(Debug)] +enum Run { + Http1, + Http2, +} struct RspBody(Option); @@ -115,31 +164,49 @@ impl Body for RspBody { } } +struct Route(Box) -> Response + Send>); + +impl Route { + fn string(body: &str) -> Route { + let body = body.to_owned(); + Route(Box::new(move |_| { + http::Response::builder() + .status(200) + .body(body.clone()) + .unwrap() + })) + } +} + +impl ::std::fmt::Debug for Route { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + f.write_str("Route") + } +} + #[derive(Debug)] -struct Svc(Arc>); +struct Svc(Arc>); impl Service for Svc { type Request = Request; - type Response = Response; + type Response = Response; type Error = h2::Error; - type Future = future::FutureResult; + type Future = future::FutureResult; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) } fn call(&mut self, req: Self::Request) -> Self::Future { - let mut rsp = http::Response::builder(); - rsp.version(http::Version::HTTP_2); - - let path = req.uri().path(); - let rsp = match self.0.get(path) { - Some(body) => { - let body = RspBody::new(body.as_bytes().into()); - rsp.status(200).body(body).unwrap() + let rsp = match self.0.get(req.uri().path()) { + Some(route) => { + (route.0)(req.map(|_| ())) + .map(|s| RspBody::new(s.as_bytes().into())) } None => { - println!("server 404: {:?}", path); + println!("server 404: {:?}", req.uri().path()); + let mut rsp = http::Response::builder(); + rsp.version(http::Version::HTTP_2); let body = RspBody::empty(); rsp.status(404).body(body).unwrap() } @@ -148,11 +215,37 @@ impl Service for Svc { } } +impl hyper::server::Service for Svc { + type Request = hyper::server::Request; + type Response = hyper::server::Response; + type Error = hyper::Error; + type Future = future::FutureResult, hyper::Error>; + + fn call(&self, req: Self::Request) -> Self::Future { + + let rsp = match self.0.get(req.uri().path()) { + Some(route) => { + (route.0)(Request::from(req).map(|_| ())) + .map(|s| hyper::Body::from(s)) + .into() + } + None => { + println!("server 404: {:?}", req.uri().path()); + let rsp = hyper::server::Response::new(); + let body = hyper::Body::empty(); + rsp.with_status(hyper::NotFound) + .with_body(body) + } + }; + future::ok(rsp) + } +} + #[derive(Debug)] -struct NewSvc(Arc>); +struct NewSvc(Arc>); impl NewService for NewSvc { type Request = Request; - type Response = Response; + type Response = Response; type Error = h2::Error; type InitError = ::std::io::Error; type Service = Svc; diff --git a/proxy/tests/support/tcp.rs b/proxy/tests/support/tcp.rs new file mode 100644 index 000000000..87256b4b0 --- /dev/null +++ b/proxy/tests/support/tcp.rs @@ -0,0 +1,190 @@ +use support::*; + +use std::collections::VecDeque; +use std::io; + +use self::futures::sync::{mpsc, oneshot}; +use self::tokio_core::net::TcpStream; + +type TcpSender = mpsc::UnboundedSender>; +type TcpConnSender = mpsc::UnboundedSender<(Option>, oneshot::Sender>>>)>; + +pub fn client(addr: SocketAddr) -> TcpClient { + let tx = run_client(addr); + TcpClient { + tx, + } +} + +pub fn server() -> TcpServer { + TcpServer { + accepts: VecDeque::new(), + } +} + +pub struct TcpClient { + tx: TcpSender, +} + +pub struct TcpServer { + accepts: VecDeque) -> Vec + Send>>, +} + +pub struct TcpConn { + tx: TcpConnSender, +} + +impl TcpClient { + pub fn connect(&self) -> TcpConn { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(tx); + let tx = rx.map_err(|_| panic!("tcp connect dropped")) + .wait() + .unwrap(); + TcpConn { + tx, + } + } +} + +impl TcpServer { + pub fn accept(mut self, cb: F) -> Self + where + F: Fn(Vec) -> U + Send + 'static, + U: Into>, + { + self.accepts.push_back(Box::new(move |v| cb(v).into())); + self + } + + pub fn run(self) -> server::Listening { + run_server(self) + } +} + +impl TcpConn { + pub fn read(&self) -> Vec { + self.try_read().expect("read") + } + + pub fn try_read(&self) -> io::Result> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send((None, tx)); + rx.map_err(|_| panic!("tcp read dropped")) + .map(|res| res.map(|opt| opt.unwrap())) + .wait() + .unwrap() + } + + pub fn write>>(&self, buf: T) { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send((Some(buf.into()), tx)); + rx.map_err(|_| panic!("tcp write dropped")) + .map(|rsp| assert!(rsp.unwrap().is_none())) + .wait() + .unwrap() + } +} + + +fn run_client(addr: SocketAddr) -> TcpSender { + let (tx, rx) = mpsc::unbounded(); + ::std::thread::Builder::new().name("support client".into()).spawn(move || { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let work = rx.for_each(|cb: oneshot::Sender<_>| { + let fut = TcpStream::connect(&addr, &handle) + .map_err(|e| panic!("connect error: {}", e)) + .and_then(move |tcp| { + let (tx, rx) = mpsc::unbounded(); + cb.send(tx).unwrap(); + rx.fold(tcp, |tcp, (action, cb): (Option>, oneshot::Sender>>>)| { + let f: Box> = match action { + None => { + Box::new(tokio_io::io::read(tcp, vec![0; 1024]) + .then(move |res| { + match res { + Ok((tcp, mut vec, n)) => { + vec.truncate(n); + cb.send(Ok(Some(vec))).unwrap(); + Ok(tcp) + } + Err(e) => { + cb.send(Err(e)).unwrap(); + Err(()) + } + } + })) + }, + Some(vec) => { + Box::new(tokio_io::io::write_all(tcp, vec) + .then(move |res| { + match res { + Ok((tcp, _)) => { + cb.send(Ok(None)).unwrap(); + Ok(tcp) + }, + Err(e) => { + cb.send(Err(e)).unwrap(); + Err(()) + } + } + })) + } + }; + f + }) + .map(|_| ()) + .map_err(|_| ()) + }); + + handle.spawn(fut); + Ok(()) + + }).map_err(|e| println!("client error: {:?}", e)); + core.run(work).unwrap(); + }).unwrap(); + tx +} + +fn run_server(tcp: TcpServer) -> server::Listening { + let (tx, rx) = shutdown_signal(); + let (addr_tx, addr_rx) = oneshot::channel(); + ::std::thread::Builder::new().name("support server".into()).spawn(move || { + let mut core = Core::new().unwrap(); + let reactor = core.handle(); + + let addr = ([127, 0, 0, 1], 0).into(); + let bind = TcpListener::bind(&addr, &reactor).expect("bind"); + + let local_addr = bind.local_addr().expect("local_addr"); + let _ = addr_tx.send(local_addr); + + let mut accepts = tcp.accepts; + + let work = bind.incoming().for_each(move |(sock, _)| { + let cb = accepts.pop_front().expect("no more accepts"); + + let fut = tokio_io::io::read(sock, vec![0; 1024]) + .and_then(move |(sock, mut vec, n)| { + vec.truncate(n); + let write = cb(vec); + tokio_io::io::write_all(sock, write) + }) + .map(|_| ()) + .map_err(|e| panic!("tcp server error: {}", e)); + + reactor.spawn(fut); + Ok(()) + }); + + core.run(work).unwrap(); + }).unwrap(); + + let addr = addr_rx.wait().expect("addr"); + server::Listening { + addr, + shutdown: tx, + } +} diff --git a/proxy/tests/transparency.rs b/proxy/tests/transparency.rs new file mode 100644 index 000000000..8d0cd503a --- /dev/null +++ b/proxy/tests/transparency.rs @@ -0,0 +1,290 @@ +mod support; +use self::support::*; + +#[test] +fn outbound_http1() { + let _ = env_logger::init(); + + let srv = server::http1().route("/", "hello h1").run(); + let ctrl = controller::new() + .destination("test.conduit.local", srv.addr) + .run(); + let proxy = proxy::new().controller(ctrl).outbound(srv).run(); + let client = client::http1(proxy.outbound, "test.conduit.local"); + + assert_eq!(client.get("/"), "hello h1"); +} + +#[test] +fn inbound_http1() { + let _ = env_logger::init(); + + let srv = server::http1().route("/", "hello h1").run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + let client = client::http1(proxy.inbound, "test.conduit.local"); + + assert_eq!(client.get("/"), "hello h1"); +} + +#[test] +fn http1_connect_not_supported() { + let _ = env_logger::init(); + + let srv = server::tcp() + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + + let client = client::tcp(proxy.inbound); + + let tcp_client = client.connect(); + tcp_client.write("CONNECT foo.bar:443 HTTP/1.1\r\nHost: foo.bar:443\r\n\r\n"); + + let expected = "HTTP/1.1 502 Bad Gateway\r\n"; + assert_eq!(s(&tcp_client.read()[..expected.len()]), expected); +} + +#[test] +fn http1_removes_connection_headers() { + let _ = env_logger::init(); + + let srv = server::http1() + .route_fn("/", |req| { + assert!(!req.headers().contains_key("x-foo-bar")); + Response::builder() + .header("x-server-quux", "lorem ipsum") + .header("connection", "close, x-server-quux") + .body("".into()) + .unwrap() + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + let client = client::http1(proxy.inbound, "test.conduit.local"); + + let res = client.request(client.request_builder("/") + .header("x-foo-bar", "baz") + .header("connection", "x-foo-bar, close")); + + assert_eq!(res.status(), http::StatusCode::OK); + assert!(!res.headers().contains_key("x-server-quux")); +} + +#[test] +fn http10_with_host() { + let _ = env_logger::init(); + + let host = "test.conduit.local"; + let srv = server::http1() + .route_fn("/", move |req| { + assert_eq!(req.version(), http::Version::HTTP_10); + assert_eq!(req.headers().get("host").unwrap(), host); + Response::builder() + .version(http::Version::HTTP_10) + .body("".into()) + .unwrap() + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + let client = client::http1(proxy.inbound, host); + + let res = client.request(client.request_builder("/") + .version(http::Version::HTTP_10) + .header("host", host)); + + assert_eq!(res.status(), http::StatusCode::OK); + assert_eq!(res.version(), http::Version::HTTP_10); +} + +#[test] +fn http10_without_host() { + let _ = env_logger::init(); + + let srv = server::http1() + .route_fn("/", move |req| { + assert_eq!(req.version(), http::Version::HTTP_10); + Response::builder() + .version(http::Version::HTTP_10) + .body("".into()) + .unwrap() + }) + .run(); + let ctrl = controller::new() + .destination(&srv.addr.to_string(), srv.addr) + .run(); + let proxy = proxy::new() + .controller(ctrl) + .outbound(srv) + .run(); + let client = client::http1(proxy.outbound, "foo.bar"); + + let res = client.request(client.request_builder("/") + .version(http::Version::HTTP_10) + .header("host", "")); + + assert_eq!(res.status(), http::StatusCode::OK); + assert_eq!(res.version(), http::Version::HTTP_10); +} + +#[test] +fn http11_absolute_uri_differs_from_host() { + let _ = env_logger::init(); + + let host = "test.conduit.local"; + let srv = server::http1() + .route_fn("/", move |req| { + assert_eq!(req.version(), http::Version::HTTP_11); + assert_eq!(req.headers().get("host").unwrap(), host); + Response::builder() + .body("".into()) + .unwrap() + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + let client = client::http1_absolute_uris(proxy.inbound, host); + + let res = client.request(client.request_builder("/") + .version(http::Version::HTTP_11) + .header("host", "foo.bar")); + + assert_eq!(res.status(), http::StatusCode::OK); + assert_eq!(res.version(), http::Version::HTTP_11); +} + +#[test] +fn outbound_tcp() { + let _ = env_logger::init(); + + let msg1 = "custom tcp hello"; + let msg2 = "custom tcp bye"; + + let srv = server::tcp() + .accept(move |read| { + assert_eq!(read, msg1.as_bytes()); + msg2 + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .outbound(srv) + .run(); + + let client = client::tcp(proxy.outbound); + + let tcp_client = client.connect(); + + tcp_client.write(msg1); + assert_eq!(tcp_client.read(), msg2.as_bytes()); +} + +#[test] +fn inbound_tcp() { + let _ = env_logger::init(); + + let msg1 = "custom tcp hello"; + let msg2 = "custom tcp bye"; + + let srv = server::tcp() + .accept(move |read| { + assert_eq!(read, msg1.as_bytes()); + msg2 + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + + let client = client::tcp(proxy.inbound); + + let tcp_client = client.connect(); + + tcp_client.write(msg1); + assert_eq!(tcp_client.read(), msg2.as_bytes()); +} + +#[test] +fn tcp_with_no_orig_dst() { + let _ = env_logger::init(); + + let srv = server::tcp().run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + + // no outbound configured for proxy + let client = client::tcp(proxy.outbound); + + let tcp_client = client.connect(); + tcp_client.write("custom tcp hello"); + + assert!(tcp_client.try_read().is_err()); +} + +#[test] +fn http11_upgrade_not_supported() { + let _ = env_logger::init(); + + // our h1 proxy will strip the Connection header + // and headers it mentions + let msg1 = "\ + GET /chat HTTP/1.1\r\n\ + Host: foo.bar\r\n\ + Connection: Upgrade\r\n\ + Upgrade: websocket\r\n\ + \r\n\ + "; + + // but let's pretend the server tries to upgrade + // anyways + let msg2 = "\ + HTTP/1.1 101 Switching Protocols\r\n\ + Upgrade: websocket\r\n\ + Connection: Upgrade\r\n\ + \r\n\ + "; + + let srv = server::tcp() + .accept(move |read| { + let head = s(&read); + assert!(!head.contains("Upgrade: websocket")); + msg2 + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .run(); + + let client = client::tcp(proxy.inbound); + + let tcp_client = client.connect(); + + tcp_client.write(msg1); + + let expected = "HTTP/1.1 502 Bad Gateway\r\n"; + assert_eq!(s(&tcp_client.read()[..expected.len()]), expected); +} diff --git a/tower-h2/src/server/mod.rs b/tower-h2/src/server/mod.rs index 6a2b97af2..e9ec50e30 100644 --- a/tower-h2/src/server/mod.rs +++ b/tower-h2/src/server/mod.rs @@ -11,7 +11,6 @@ use tower::{NewService, Service}; use std::marker::PhantomData; /// Attaches service implementations to h2 connections. -#[derive(Clone)] pub struct Server where S: NewService, B: Body, @@ -153,6 +152,23 @@ where S: NewService, Response = Response>, } } +// B doesn't need to be Clone, it's just a marker type. +impl Clone for Server +where + S: NewService + Clone, + E: Clone, + B: Body, +{ + fn clone(&self) -> Self { + Server { + new_service: self.new_service.clone(), + executor: self.executor.clone(), + builder: self.builder.clone(), + _p: PhantomData, + } + } +} + // ===== impl Connection ===== impl Connection