proxy: add transparent protocol detection and handling

The proxy will now try to detect what protocol new connections are
using, and route them accordingly. Specifically:

- HTTP/2 stays the same.
- HTTP/1 is now accepted, and will try to send an HTTP/1 request
  to the target.
- If neither HTTP/1 nor 2, assume a TCP stream and simply forward
  between the source and destination.

* tower-h2: fix Server Clone bounds
* proxy: implement Async{Read,Write} extra methods for Connection

Closes #130 
Closes #131
This commit is contained in:
Sean McArthur 2018-01-23 16:14:07 -08:00 committed by GitHub
parent cb6c2eab16
commit 1e9ff8be03
31 changed files with 2338 additions and 354 deletions

250
Cargo.lock generated
View File

@ -40,11 +40,25 @@ dependencies = [
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "base64"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bitflags"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bitflags"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "byteorder"
version = "1.1.0"
@ -115,10 +129,12 @@ dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-mpsc-lossy 0.1.3",
"h2 0.1.0 (git+https://github.com/carllerche/h2)",
"http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.15 (registry+https://github.com/rust-lang/crates.io-index)",
"ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"ns-dns-tokio 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -126,7 +142,7 @@ dependencies = [
"prost-types 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quickcheck 0.4.2 (git+https://github.com/BurntSushi/quickcheck?rev=a1658ce)",
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -185,7 +201,7 @@ dependencies = [
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -203,7 +219,7 @@ name = "env_logger"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -242,6 +258,15 @@ dependencies = [
"fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.2.0"
@ -250,6 +275,11 @@ dependencies = [
"bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "futures"
version = "0.1.17"
@ -263,6 +293,15 @@ dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-cpupool"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures-mpsc-lossy"
version = "0.1.3"
@ -279,8 +318,8 @@ dependencies = [
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"string 0.1.0 (git+https://github.com/carllerche/string)",
@ -297,13 +336,42 @@ dependencies = [
[[package]]
name = "http"
version = "0.1.1"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "httparse"
version = "1.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "hyper"
version = "0.11.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"relay 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "idna"
version = "0.1.4"
@ -358,6 +426,11 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "language-tags"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lazy_static"
version = "0.2.10"
@ -365,7 +438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lazycell"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -386,8 +459,19 @@ dependencies = [
[[package]]
name = "log"
version = "0.3.8"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "log"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "matches"
@ -402,6 +486,14 @@ dependencies = [
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "mime"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz-sys"
version = "0.1.10"
@ -413,16 +505,16 @@ dependencies = [
[[package]]
name = "mio"
version = "0.6.11"
version = "0.6.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -475,7 +567,7 @@ dependencies = [
"abstract-ns 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"domain 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -510,6 +602,14 @@ name = "num-traits"
version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "num_cpus"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "openssl-probe"
version = "0.1.1"
@ -579,7 +679,7 @@ dependencies = [
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"petgraph 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -620,7 +720,7 @@ version = "0.4.2"
source = "git+https://github.com/BurntSushi/quickcheck?rev=a1658ce#a1658ce9fc9ab41fd3aa1faeaa326fcf28dfcd45"
dependencies = [
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -660,11 +760,24 @@ name = "regex-syntax"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "relay"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rustc-demangle"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "safemem"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "scoped-tls"
version = "0.1.0"
@ -715,6 +828,11 @@ name = "slab"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "smallvec"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "socket2"
version = "0.2.4"
@ -750,6 +868,11 @@ dependencies = [
"unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "take"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tempdir"
version = "0.3.5"
@ -789,14 +912,14 @@ dependencies = [
[[package]]
name = "tokio-core"
version = "0.1.10"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"mio 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -809,7 +932,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-proto"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tokio-service"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -857,12 +1005,12 @@ dependencies = [
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"h2 0.1.0 (git+https://github.com/carllerche/h2)",
"http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-derive 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-h2 0.1.3",
"tower-router 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -883,13 +1031,13 @@ dependencies = [
"bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-derive 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-grpc 0.1.3",
"tower-grpc-build 0.1.3",
@ -904,11 +1052,11 @@ dependencies = [
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
"h2 0.1.0 (git+https://github.com/carllerche/h2)",
"http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"string 0.1.0 (git+https://github.com/carllerche/string)",
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
"tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
]
@ -950,6 +1098,14 @@ dependencies = [
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "unicase"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
@ -1001,6 +1157,11 @@ name = "vcpkg"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "version_check"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "void"
version = "1.0.2"
@ -1042,7 +1203,9 @@ dependencies = [
"checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699"
"checksum backtrace 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8709cc7ec06f6f0ae6c2c7e12f6ed41540781f72b488d83734978295ceae182e"
"checksum backtrace-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "44585761d6161b0f57afc49482ab6bd067e4edef48c12a152c237eb0203f7661"
"checksum base64 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "229d032f1a99302697f10b27167ae6d03d49d032e6a8e2550e8d3fc13356d2b4"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c30d3802dfb7281680d6285f2ccdaa8c2d8fee41f93805dba5c4cf50dc23cf"
"checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d"
"checksum bytes 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d828f97b58cc5de3e40c421d0cf2132d6b2da4ee0e11b8632fa838f0f9333ad6"
"checksum bzip2 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3eafc42c44e0d827de6b1c131175098fe7fb53b8ce8a47e65cb3ea94688be24"
@ -1062,12 +1225,17 @@ dependencies = [
"checksum flate2 0.2.20 (registry+https://github.com/rust-lang/crates.io-index)" = "e6234dd4468ae5d1e2dbb06fe2b058696fdc50a339c68a393aefbf00bc81e423"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43f3795b4bae048dc6123a6b972cadde2e676f9ded08aef6bb77f5f157684a82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "118b49cac82e04121117cbd3121ede3147e885627d82c4546b87c702debb90c1"
"checksum futures-borrow 0.1.0 (git+https://github.com/carllerche/better-future)" = "<none>"
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
"checksum h2 0.1.0 (git+https://github.com/carllerche/h2)" = "<none>"
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
"checksum http 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7fa2bbed743b54e56a0f1afa2d6d6eeb195383a60fb733eec4f8107c47bd4576"
"checksum http 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bf8217d8829cc05dedadc08b4bc0684e5e3fbba1126c5edc680af49053fa230c"
"checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37"
"checksum hyper 0.11.15 (registry+https://github.com/rust-lang/crates.io-index)" = "4d6105c5eeb03068b10ff34475a0d166964f98e7b9777cc34b342a225af9b87c"
"checksum idna 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "014b298351066f1512874135335d62a789ffe78a9974f94b43ed5621951eaf7d"
"checksum iovec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6e8b9c2247fcf6c6a1151f1156932be5606c9fd6f55a2d7f9fc1cb29386b2f7"
"checksum ipnet 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51268c3a27ad46afd1cca0bbf423a5be2e9fd3e6a7534736c195f0f834b763ef"
@ -1075,15 +1243,18 @@ dependencies = [
"checksum itertools 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2c52051d3fd3b505796a0ee90f2e5ec43213808585e8adc4d0182492cf62751a"
"checksum itoa 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8324a32baf01e2ae060e9de58ed0bc2320c9a2833491ee36cd3b4c414de4db8c"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a"
"checksum lazy_static 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "236eb37a62591d4a41a89b7763d7de3e06ca02d5ab2815446a8bae5d2f8c2d57"
"checksum lazycell 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3b585b7a6811fb03aa10e74b278a0f00f8dd9b45dc681f148bb29fa5cb61859b"
"checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef"
"checksum libc 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "5ba3df4dcb460b9dfbd070d41c94c19209620c191b0340b929ce748a2bcd42d2"
"checksum libz-sys 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)" = "87f737ad6cc6fd6eefe3d9dc5412f1573865bded441300904d2f42269e140f16"
"checksum log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "880f77541efa6e5cc74e76910c9884d9859683118839d6a1dc3b11e63512565b"
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
"checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376"
"checksum memchr 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "148fab2e51b4f1cfc66da2a7c32981d1d3c083a803978268bb11fe4b86925e7a"
"checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd"
"checksum miniz-sys 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "609ce024854aeb19a0ef7567d348aaa5a746b32fb72e336df7fcc16869d7e2b4"
"checksum mio 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "0e8411968194c7b139e9105bc4ae7db0bae232af087147e72f0616ebf5fdb9cb"
"checksum mio 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "75f72a93f046f1517e3cfddc0a096eb756a2ba727d36edc8227dee769a50a9b0"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum msdos_time 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "65ba9d75bcea84e07812618fedf284a64776c2f2ea0cad6bca7f69739695a958"
"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151"
@ -1093,6 +1264,7 @@ dependencies = [
"checksum num-integer 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "d1452e8b06e448a07f0e6ebb0bb1d92b8890eea63288c0b627331d53514d0fba"
"checksum num-iter 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)" = "7485fcc84f85b4ecd0ea527b14189281cf27d60e583ae65ebc9c088b13dffe01"
"checksum num-traits 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "99843c856d68d8b4313b03a17e33c4bb42ae8f6610ea81b28abe076ac721b9b0"
"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30"
"checksum openssl-probe 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d98df0270d404ccd3c050a41d579c52d1db15375168bb3471e04ec0f5f378daf"
"checksum openssl-sys 0.9.20 (registry+https://github.com/rust-lang/crates.io-index)" = "0ad395f1cee51b64a8d07cc8063498dc7554db62d5f3ca87a67f4eed2791d0c8"
"checksum ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "b81cf3b8cb96aa0e73bbedfcdc9708d09fec2854ba8d474be4e6f666d7379e8b"
@ -1112,7 +1284,9 @@ dependencies = [
"checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509"
"checksum regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1731164734096285ec2a5ec7fea5248ae2f5485b3feeb0115af4fda2183b2d1b"
"checksum regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad890a5eef7953f55427c50575c680c42841653abd2b028b68cd223d157f62db"
"checksum relay 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f301bafeb60867c85170031bdb2fcf24c8041f33aee09e7b116a58d4e9f781c5"
"checksum rustc-demangle 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "aee45432acc62f7b9a108cc054142dac51f979e69e71ddce7d6fc7adf29e817e"
"checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f"
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
"checksum serde 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)" = "6eda663e865517ee783b0891a3f6eb3a253e0b0dabb46418969ee9635beadd9e"
"checksum serde_derive 1.0.21 (registry+https://github.com/rust-lang/crates.io-index)" = "652bc323d694dc925829725ec6c890156d8e70ae5202919869cb00fe2eff3788"
@ -1120,16 +1294,20 @@ dependencies = [
"checksum serde_json 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e4586746d1974a030c48919731ecffd0ed28d0c40749d0d18d43b3a7d6c9b20e"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d"
"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013"
"checksum socket2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "36b4896961171cd3317c7e9603d88f379f8c6e45342212235d356496680c68fd"
"checksum string 0.1.0 (git+https://github.com/carllerche/string)" = "<none>"
"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5"
"checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6"
"checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14"
"checksum time 0.1.38 (registry+https://github.com/rust-lang/crates.io-index)" = "d5d788d3aa77bc0ef3e9621256885555368b47bd495c13dd2e7413c89f845520"
"checksum tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)" = "<none>"
"checksum tokio-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c843a027f7c1df5f81e7734a0df3f67bf329411781ebf36393ce67beef6071e3"
"checksum tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "52b4e32d8edbf29501aabb3570f027c6ceb00ccef6538f4bddba0200503e74e8"
"checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743"
"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389"
"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162"
"checksum tower 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
@ -1137,6 +1315,7 @@ dependencies = [
"checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-router 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a"
"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
"checksum unicode-normalization 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "51ccda9ef9efa3f7ef5d91e8f9b83bbe6955f9bf86aec89d5cce2c874625920f"
"checksum unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8083c594e02b8ae1654ae26f0ade5158b119bd88ad0e8227a5d8fcd72407946"
@ -1145,6 +1324,7 @@ dependencies = [
"checksum url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fa35e768d4daf1d85733418a49fb42e10d7f633e394fccab4ab7aba897053fe2"
"checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
"checksum vcpkg 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9e0a7d8bed3178a8fb112199d466eeca9ed09a14ba8ad67718179b4fd5487d0b"
"checksum version_check 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6b772017e347561807c1aa192438c5fd74242a670a6cffacc40f2defd1dc069d"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"

View File

@ -11,6 +11,8 @@ domain = "0.2.2"
env_logger = "0.4"
futures = "0.1"
http = "0.1"
httparse = "1.2"
hyper = { version = "0.11.15", features = ["compat"] }
ipnet = "1.0"
log = "0.3"
ordermap = "0.2"

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;
use h2;
use http;
use tokio_core::reactor::Handle;
use tower_h2;
@ -14,6 +13,7 @@ use tower_reconnect::{self, Reconnect};
use control;
use ctx;
use telemetry;
use transparency;
use transport;
use ::timeout::Timeout;
@ -28,7 +28,6 @@ const DEFAULT_TIMEOUT_MS: u64 = 300;
/// Buffering is not bounded and no timeouts are applied.
pub struct Bind<C, B> {
ctx: C,
h2_builder: h2::client::Builder,
sensors: telemetry::Sensors,
executor: Handle,
req_ids: Arc<AtomicUsize>,
@ -36,32 +35,37 @@ pub struct Bind<C, B> {
_p: PhantomData<B>,
}
/// Binds a `Service` from a `SocketAddr` for a pre-determined protocol.
pub struct BindProtocol<C, B> {
bind: Bind<C, B>,
protocol: Protocol,
}
/// Mark whether to use HTTP/1 or HTTP/2
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Protocol {
Http1,
Http2
}
type Service<B> = Reconnect<
telemetry::sensor::NewHttp<
tower_h2::client::Client<
transparency::Client<
telemetry::sensor::Connect<transport::TimeoutConnect<transport::Connect>>,
CtxtExec,
B,
>,
B,
tower_h2::RecvBody,
transparency::HttpBody,
>,
>;
type CtxtExec = ::logging::ContextualExecutor<(&'static str, SocketAddr), Handle>;
impl<B> Bind<(), B> {
pub fn new(executor: Handle) -> Self {
let mut h2_builder = h2::client::Builder::default();
// h2 currently doesn't handle PUSH_PROMISE that well, so we just
// disable it for now.
h2_builder.enable_push(false);
Self {
executor,
ctx: (),
sensors: telemetry::Sensors::null(),
req_ids: Default::default(),
h2_builder,
connect_timeout: Duration::from_millis(DEFAULT_TIMEOUT_MS),
_p: PhantomData,
}
@ -84,7 +88,6 @@ impl<B> Bind<(), B> {
pub fn with_ctx<C>(self, ctx: C) -> Bind<C, B> {
Bind {
ctx,
h2_builder: self.h2_builder,
sensors: self.sensors,
executor: self.executor,
req_ids: self.req_ids,
@ -98,7 +101,6 @@ impl<C: Clone, B> Clone for Bind<C, B> {
fn clone(&self) -> Self {
Self {
ctx: self.ctx.clone(),
h2_builder: self.h2_builder.clone(),
sensors: self.sensors.clone(),
executor: self.executor.clone(),
req_ids: self.req_ids.clone(),
@ -110,6 +112,10 @@ impl<C: Clone, B> Clone for Bind<C, B> {
impl<C, B> Bind<C, B> {
pub fn connect_timeout(&self) -> Duration {
self.connect_timeout
}
// pub fn ctx(&self) -> &C {
// &self.ctx
// }
@ -132,15 +138,15 @@ impl<B> Bind<Arc<ctx::Proxy>, B>
where
B: tower_h2::Body + 'static,
{
pub fn bind_service(&self, addr: &SocketAddr) -> Service<B> {
trace!("bind_service {}", addr);
pub fn bind_service(&self, addr: &SocketAddr, protocol: Protocol) -> Service<B> {
trace!("bind_service addr={}, protocol={:?}", addr, protocol);
let client_ctx = ctx::transport::Client::new(
&self.ctx,
addr,
control::pb::proxy::common::Protocol::Http,
);
// Map a socket address to an HTTP/2.0 connection.
// Map a socket address to a connection.
let connect = {
let c = Timeout::new(
transport::Connect::new(*addr, &self.executor),
@ -151,30 +157,39 @@ where
self.sensors.connect(c, &client_ctx)
};
// Establishes an HTTP/2.0 connection
let client = tower_h2::client::Client::new(
let client = transparency::Client::new(
protocol,
connect,
self.h2_builder.clone(),
::logging::context_executor(("client", *addr), self.executor.clone()),
self.executor.clone(),
);
let h2_proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx);
let proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx);
// Automatically perform reconnects if the connection fails.
//
// TODO: Add some sort of backoff logic.
Reconnect::new(h2_proxy)
Reconnect::new(proxy)
}
}
// ===== impl Bind =====
// ===== impl BindProtocol =====
impl<B> control::discovery::Bind for Bind<Arc<ctx::Proxy>, B>
impl<C, B> Bind<C, B> {
pub fn with_protocol(self, protocol: Protocol) -> BindProtocol<C, B> {
BindProtocol {
bind: self,
protocol,
}
}
}
impl<B> control::discovery::Bind for BindProtocol<Arc<ctx::Proxy>, B>
where
B: tower_h2::Body + 'static,
{
type Request = http::Request<B>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<tower_h2::RecvBody>>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<transparency::HttpBody>>;
type Error = tower_reconnect::Error<
tower_h2::client::Error,
tower_h2::client::ConnectError<transport::TimeoutError<io::Error>>,
@ -183,6 +198,7 @@ where
type BindError = ();
fn bind(&self, addr: &SocketAddr) -> Result<Self::Service, Self::BindError> {
Ok::<_, ()>(self.bind_service(addr))
Ok(self.bind.bind_service(addr, self.protocol))
}
}

View File

@ -1,3 +1,4 @@
use bytes::Buf;
use futures::*;
use std;
use std::io;
@ -8,7 +9,7 @@ use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use config::Addr;
use transport;
use transport::GetOriginalDst;
pub type PlaintextSocket = tokio_core::net::TcpStream;
@ -97,14 +98,20 @@ impl Future for Connecting {
// ===== impl Connection =====
impl Connection {
pub fn original_dst_addr(&self) -> Option<SocketAddr> {
transport::get_original_dst(self.socket())
pub fn original_dst_addr<T: GetOriginalDst>(&self, get: &T) -> Option<SocketAddr> {
get.get_original_dst(self.socket())
}
pub fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
self.socket().local_addr()
}
pub fn peek_future<T: AsMut<[u8]>>(self, buf: T) -> Peek<T> {
Peek {
inner: Some((self, buf))
}
}
// This must never be made public so that in the future `Connection` can
// control access to the plaintext socket for TLS, to ensure no private
// data is accidentally writen to the socket and to ensure no unprotected
@ -128,8 +135,15 @@ impl io::Read for Connection {
}
}
// TODO: impl specialty functions
impl AsyncRead for Connection {}
impl AsyncRead for Connection {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
use self::Connection::*;
match *self {
Plain(ref t) => t.prepare_uninitialized_buffer(buf),
}
}
}
impl io::Write for Connection {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@ -149,7 +163,6 @@ impl io::Write for Connection {
}
}
// TODO: impl specialty functions
impl AsyncWrite for Connection {
fn shutdown(&mut self) -> Poll<(), io::Error> {
use self::Connection::*;
@ -158,6 +171,39 @@ impl AsyncWrite for Connection {
Plain(ref mut t) => t.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
use self::Connection::*;
match *self {
Plain(ref mut t) => t.write_buf(buf),
}
}
}
// impl Peek
pub struct Peek<T> {
inner: Option<(Connection, T)>,
}
impl<T: AsMut<[u8]>> Future for Peek<T> {
type Item = (Connection, T, usize);
type Error = std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (conn, mut buf) = self.inner.take().expect("polled after completed");
match conn.socket().peek(buf.as_mut()) {
Ok(n) => Ok(Async::Ready((conn, buf, n))),
Err(e) => match e.kind() {
std::io::ErrorKind::WouldBlock => {
self.inner = Some((conn, buf));
Ok(Async::NotReady)
},
_ => Err(e)
},
}
}
}
// Misc.

View File

@ -67,7 +67,7 @@ struct DestinationSet<R> {
addrs: HashSet<SocketAddr>,
needs_reconnect: bool,
rx: R,
tx: mpsc::UnboundedSender<Update>,
txs: Vec<mpsc::UnboundedSender<Update>>,
}
#[derive(Debug)]
@ -247,7 +247,13 @@ where
trace!("Destination.Get {:?}", auth);
match self.destinations.entry(auth) {
Entry::Occupied(mut occ) => {
occ.get_mut().tx = tx;
let set = occ.get_mut();
// we may already know of some addresses here, so push
// them onto the new watch first
for &addr in &set.addrs {
let _ = tx.unbounded_send(Update::Insert(addr));
}
set.txs.push(tx);
}
Entry::Vacant(vac) => {
let req = Destination {
@ -259,7 +265,7 @@ where
addrs: HashSet::new(),
needs_reconnect: false,
rx: stream,
tx,
txs: vec![tx],
});
}
}
@ -317,7 +323,10 @@ where
if let Some(addr) = addr.addr.and_then(pb_to_sock_addr) {
if set.addrs.insert(addr) {
trace!("update {:?} for {:?}", addr, auth);
let _ = set.tx.unbounded_send(Update::Insert(addr));
// retain is used to drop any senders that are dead
set.txs.retain(|tx| {
tx.unbounded_send(Update::Insert(addr)).is_ok()
});
}
}
},
@ -325,7 +334,10 @@ where
if let Some(addr) = pb_to_sock_addr(addr) {
if set.addrs.remove(&addr) {
trace!("remove {:?} for {:?}", addr, auth);
let _ = set.tx.unbounded_send(Update::Remove(addr));
// retain is used to drop any senders that are dead
set.txs.retain(|tx| {
tx.unbounded_send(Update::Remove(addr)).is_ok()
});
}
}
},

View File

@ -126,7 +126,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
stream: ctx.id as u64,
}),
method: Some((&ctx.method).into()),
scheme: ctx.uri.scheme().map(|s| s.into()),
scheme: ctx.uri.scheme_part().map(|s| s.as_str().into()),
authority: ctx.uri
.authority_part()
.map(|a| a.as_str())

View File

@ -96,7 +96,7 @@ where
return;
}
Ok(Async::Ready(None)) => {
error!("report stream complete");
debug!("report stream complete");
return;
}
Err(err) => {

View File

@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use control::pb::common::Protocol;
@ -56,6 +56,29 @@ impl Server {
Arc::new(s)
}
pub fn orig_dst_if_not_local(&self) -> Option<SocketAddr> {
match self.orig_dst {
None => None,
Some(orig_dst) => {
// If the original destination is actually the listening socket,
// we don't want to create a loop.
if same_addr(&orig_dst, &self.local) {
None
} else {
Some(orig_dst)
}
}
}
}
}
fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool {
(a0.port() == a1.port()) && match (a0.ip(), a1.ip()) {
(IpAddr::V6(a0), IpAddr::V4(a1)) => a0.to_ipv4() == Some(a1),
(IpAddr::V4(a0), IpAddr::V6(a1)) => Some(a0) == a1.to_ipv4(),
(a0, a1) => (a0 == a1),
}
}
impl Client {
@ -85,3 +108,58 @@ impl From<Arc<Server>> for Ctx {
Ctx::Server(s)
}
}
#[cfg(test)]
mod tests {
use std::net;
use quickcheck::TestResult;
use super::same_addr;
quickcheck! {
fn same_addr_ipv4(ip0: net::Ipv4Addr, ip1: net::Ipv4Addr, port0: u16, port1: u16) -> TestResult {
if port0 == 0 || port0 == ::std::u16::MAX {
return TestResult::discard();
} else if port1 == 0 || port1 == ::std::u16::MAX {
return TestResult::discard();
}
let addr0 = net::SocketAddr::new(net::IpAddr::V4(ip0), port0);
let addr1 = net::SocketAddr::new(net::IpAddr::V4(ip1), port1);
TestResult::from_bool(same_addr(&addr0, &addr1) == (addr0 == addr1))
}
fn same_addr_ipv6(ip0: net::Ipv6Addr, ip1: net::Ipv6Addr, port0: u16, port1: u16) -> TestResult {
if port0 == 0 || port0 == ::std::u16::MAX {
return TestResult::discard();
} else if port1 == 0 || port1 == ::std::u16::MAX {
return TestResult::discard();
}
let addr0 = net::SocketAddr::new(net::IpAddr::V6(ip0), port0);
let addr1 = net::SocketAddr::new(net::IpAddr::V6(ip1), port1);
TestResult::from_bool(same_addr(&addr0, &addr1) == (addr0 == addr1))
}
fn same_addr_ip6_mapped_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult {
if port == 0 || port == ::std::u16::MAX {
return TestResult::discard();
}
let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port);
let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_mapped()), port);
TestResult::from_bool(same_addr(&addr4, &addr6))
}
fn same_addr_ip6_compat_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult {
if port == 0 || port == ::std::u16::MAX {
return TestResult::discard();
}
let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port);
let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_compatible()), port);
TestResult::from_bool(same_addr(&addr4, &addr6))
}
}
}

View File

@ -1,9 +1,8 @@
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::net::{SocketAddr};
use std::sync::Arc;
use http;
use tokio_core::reactor::Handle;
use tower_buffer::{self, Buffer};
use tower_h2;
use tower_reconnect::{self, Reconnect};
@ -12,6 +11,7 @@ use tower_router::Recognize;
use bind;
use ctx;
use telemetry;
use transparency;
use transport;
type Bind<B> = bind::Bind<Arc<ctx::Proxy>, B>;
@ -21,14 +21,11 @@ pub struct Inbound<B> {
bind: Bind<B>,
}
type Client<B> = tower_h2::client::Client<
type Client<B> = transparency::Client<
telemetry::sensor::Connect<transport::TimeoutConnect<transport::Connect>>,
CtxtExec,
B,
>;
type CtxtExec = ::logging::ContextualExecutor<(&'static str, SocketAddr), Handle>;
// ===== impl Inbound =====
impl<B> Inbound<B> {
@ -38,14 +35,6 @@ impl<B> Inbound<B> {
bind,
}
}
fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool {
(a0.port() == a1.port()) && match (a0.ip(), a1.ip()) {
(IpAddr::V6(a0), IpAddr::V4(a1)) => a0.to_ipv4() == Some(a1),
(IpAddr::V4(a0), IpAddr::V6(a1)) => Some(a0) == a1.to_ipv4(),
(a0, a1) => (a0 == a1),
}
}
}
impl<B> Recognize for Inbound<B>
@ -53,37 +42,33 @@ where
B: tower_h2::Body + 'static,
{
type Request = http::Request<B>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<tower_h2::RecvBody>>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<transparency::HttpBody>>;
type Error = tower_buffer::Error<
tower_reconnect::Error<
tower_h2::client::Error,
tower_h2::client::ConnectError<transport::TimeoutError<io::Error>>,
>,
>;
type Key = SocketAddr;
type Key = (SocketAddr, bind::Protocol);
type RouteError = ();
type Service = Buffer<Reconnect<telemetry::sensor::NewHttp<Client<B>, B, tower_h2::RecvBody>>>;
type Service = Buffer<Reconnect<telemetry::sensor::NewHttp<Client<B>, B, transparency::HttpBody>>>;
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let key = req.extensions()
.get::<Arc<ctx::transport::Server>>()
.and_then(|ctx| {
trace!("recognize local={} orig={:?}", ctx.local, ctx.orig_dst);
match ctx.orig_dst {
None => None,
Some(orig_dst) => {
// If the original destination is actually the listening socket,
// we don't want to create a loop.
if Self::same_addr(&orig_dst, &ctx.local) {
None
} else {
Some(orig_dst)
}
}
}
ctx.orig_dst_if_not_local()
})
.or_else(|| self.default_addr);
let proto = match req.version() {
http::Version::HTTP_2 => bind::Protocol::Http2,
_ => bind::Protocol::Http1,
};
let key = key.map(|addr| (addr, proto));
trace!("recognize key={:?}", key);
key
@ -95,14 +80,15 @@ where
///
/// Buffering is currently unbounded and does not apply timeouts. This must be
/// changed.
fn bind_service(&mut self, addr: &SocketAddr) -> Result<Self::Service, Self::RouteError> {
debug!("building inbound client to {}", addr);
fn bind_service(&mut self, key: &Self::Key) -> Result<Self::Service, Self::RouteError> {
let &(ref addr, proto) = key;
debug!("building inbound {:?} client to {}", proto, addr);
// Wrap with buffering. This currently is an unbounded buffer, which
// is not ideal.
//
// TODO: Don't use unbounded buffering.
Buffer::new(self.bind.bind_service(addr), self.bind.executor()).map_err(|_| {})
Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()).map_err(|_| {})
}
}
@ -112,13 +98,12 @@ mod tests {
use std::sync::Arc;
use http;
use quickcheck::TestResult;
use tokio_core::reactor::Core;
use tower_router::Recognize;
use super::Inbound;
use control::pb::common::Protocol;
use bind::Bind;
use bind::{self, Bind};
use ctx;
fn new_inbound(default: Option<net::SocketAddr>, ctx: &Arc<ctx::Proxy>) -> Inbound<()> {
@ -128,50 +113,6 @@ mod tests {
}
quickcheck! {
fn same_addr_ipv4(ip0: net::Ipv4Addr, ip1: net::Ipv4Addr, port0: u16, port1: u16) -> TestResult {
if port0 == 0 || port0 == ::std::u16::MAX {
return TestResult::discard();
} else if port1 == 0 || port1 == ::std::u16::MAX {
return TestResult::discard();
}
let addr0 = net::SocketAddr::new(net::IpAddr::V4(ip0), port0);
let addr1 = net::SocketAddr::new(net::IpAddr::V4(ip1), port1);
TestResult::from_bool(Inbound::<()>::same_addr(&addr0, &addr1) == (addr0 == addr1))
}
fn same_addr_ipv6(ip0: net::Ipv6Addr, ip1: net::Ipv6Addr, port0: u16, port1: u16) -> TestResult {
if port0 == 0 || port0 == ::std::u16::MAX {
return TestResult::discard();
} else if port1 == 0 || port1 == ::std::u16::MAX {
return TestResult::discard();
}
let addr0 = net::SocketAddr::new(net::IpAddr::V6(ip0), port0);
let addr1 = net::SocketAddr::new(net::IpAddr::V6(ip1), port1);
TestResult::from_bool(Inbound::<()>::same_addr(&addr0, &addr1) == (addr0 == addr1))
}
fn same_addr_ip6_mapped_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult {
if port == 0 || port == ::std::u16::MAX {
return TestResult::discard();
}
let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port);
let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_mapped()), port);
TestResult::from_bool(Inbound::<()>::same_addr(&addr4, &addr6))
}
fn same_addr_ip6_compat_ipv4(ip: net::Ipv4Addr, port: u16) -> TestResult {
if port == 0 || port == ::std::u16::MAX {
return TestResult::discard();
}
let addr4 = net::SocketAddr::new(net::IpAddr::V4(ip), port);
let addr6 = net::SocketAddr::new(net::IpAddr::V6(ip.to_ipv6_compatible()), port);
TestResult::from_bool(Inbound::<()>::same_addr(&addr4, &addr6))
}
fn recognize_orig_dst(
orig_dst: net::SocketAddr,
local: net::SocketAddr,
@ -181,21 +122,13 @@ mod tests {
let inbound = new_inbound(None, &ctx);
let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst), Protocol::Http);
let rec = srv_ctx.orig_dst_if_not_local().map(|addr| (addr, bind::Protocol::Http1));
let mut req = http::Request::new(());
req.extensions_mut()
.insert(ctx::transport::Server::new(
&ctx,
&local,
&remote,
&Some(orig_dst),
Protocol::Http,
));
let rec = if Inbound::<()>::same_addr(&orig_dst, &local) {
None
} else {
Some(orig_dst)
};
.insert(srv_ctx);
inbound.recognize(&req) == rec
}
@ -219,7 +152,7 @@ mod tests {
Protocol::Http,
));
inbound.recognize(&req) == default
inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1))
}
fn recognize_default_no_ctx(default: Option<net::SocketAddr>) -> bool {
@ -229,7 +162,7 @@ mod tests {
let req = http::Request::new(());
inbound.recognize(&req) == default
inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1))
}
fn recognize_default_no_loop(
@ -251,7 +184,7 @@ mod tests {
Protocol::Http,
));
inbound.recognize(&req) == default
inbound.recognize(&req) == default.map(|addr| (addr, bind::Protocol::Http1))
}
}
}

View File

@ -12,6 +12,8 @@ extern crate futures;
extern crate futures_mpsc_lossy;
extern crate h2;
extern crate http;
extern crate httparse;
extern crate hyper;
extern crate ipnet;
#[cfg(target_os = "linux")]
extern crate libc;
@ -46,12 +48,11 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use std::time::Duration;
use tokio_core::reactor::{Core, Handle};
use tower::NewService;
use tower_fn::*;
use tower_h2::*;
use tower_router::{Recognize, Router};
pub mod app;
@ -68,6 +69,7 @@ mod logging;
mod map_err;
mod outbound;
mod telemetry;
mod transparency;
mod transport;
pub mod timeout;
mod tower_fn; // TODO: move to tower-fn
@ -77,6 +79,8 @@ use connection::BoundPort;
use control::pb::proxy::tap;
use inbound::Inbound;
use map_err::MapErr;
use transparency::{HttpBody, Server};
pub use transport::{GetOriginalDst, SoOriginalDst};
use outbound::Outbound;
/// Runs a sidecar proxy.
@ -92,16 +96,21 @@ use outbound::Outbound;
/// The private listener routes requests to service-discovery-aware load-balancer.
///
pub struct Main {
pub struct Main<G> {
config: config::Config,
control_listener: BoundPort,
inbound_listener: BoundPort,
outbound_listener: BoundPort,
get_original_dst: G,
}
impl Main {
pub fn new(config: config::Config) -> Self {
impl<G> Main<G>
where
G: GetOriginalDst + Clone + 'static,
{
pub fn new(config: config::Config, get_original_dst: G) -> Self {
let control_listener = BoundPort::new(config.control_listener.addr)
.expect("controller listener bind");
@ -114,6 +123,7 @@ impl Main {
control_listener,
inbound_listener,
outbound_listener,
get_original_dst,
}
}
@ -145,6 +155,7 @@ impl Main {
control_listener,
inbound_listener,
outbound_listener,
get_original_dst,
} = self;
let control_host_and_port = config.control_host_and_port.clone();
@ -186,10 +197,11 @@ impl Main {
let fut = serve(
inbound_listener,
h2::server::Builder::default(),
Inbound::new(default_addr, bind),
config.private_connect_timeout,
ctx,
sensors.clone(),
get_original_dst.clone(),
&executor,
);
::logging::context_future("inbound", fut)
@ -206,6 +218,8 @@ impl Main {
.map_or_else(|| bind.clone(), |t| bind.clone().with_connect_timeout(t))
.with_ctx(ctx.clone());
let tcp_connect_timeout = bind.connect_timeout();
let outgoing = Outbound::new(
bind,
control,
@ -214,10 +228,11 @@ impl Main {
let fut = serve(
outbound_listener,
h2::server::Builder::default(),
outgoing,
tcp_connect_timeout,
ctx,
sensors,
get_original_dst,
&executor,
);
::logging::context_future("outbound", fut)
@ -239,7 +254,6 @@ impl Main {
let server = serve_control(
control_listener,
h2::server::Builder::default(),
new_service,
&executor,
);
@ -275,85 +289,69 @@ impl Main {
}
}
fn serve<R, B, E, F>(
fn serve<R, B, E, F, G>(
bound_port: BoundPort,
h2_builder: h2::server::Builder,
recognize: R,
tcp_connect_timeout: Duration,
proxy_ctx: Arc<ctx::Proxy>,
sensors: telemetry::Sensors,
get_orig_dst: G,
executor: &Handle,
) -> Box<Future<Item = (), Error = io::Error> + 'static>
where
B: Body + Default + 'static,
B: tower_h2::Body + Default + 'static,
E: ::std::fmt::Debug + 'static,
F: ::std::fmt::Debug + 'static,
R: Recognize<
Request = http::Request<RecvBody>,
Request = http::Request<HttpBody>,
Response = http::Response<telemetry::sensor::http::ResponseBody<B>>,
Error = E,
RouteError = F,
>
+ 'static,
G: GetOriginalDst + 'static,
{
let router = Router::new(recognize);
let stack = NewServiceFn::new(move || {
let stack = Arc::new(NewServiceFn::new(move || {
// Clone the router handle
let router = router.clone();
// Map errors to 500 responses
MapErr::new(router)
});
}));
let listen_addr = bound_port.local_addr();
let server = Server::new(
listen_addr,
proxy_ctx,
sensors,
get_orig_dst,
stack,
h2_builder,
::logging::context_executor(("serve", listen_addr), executor.clone()),
tcp_connect_timeout,
executor.clone(),
);
bound_port.listen_and_fold(
executor,
(server, proxy_ctx, sensors, executor.clone()),
move |(server, proxy_ctx, sensors, executor), (connection, remote_addr)| {
let opened_at = Instant::now();
let orig_dst = connection.original_dst_addr();
let local_addr = connection.local_addr().unwrap_or(listen_addr);
// TODO: detect protocol.
let protocol = control::pb::common::Protocol::Http;
let srv_ctx =
ctx::transport::Server::new(
&proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
protocol,
);
let io = sensors.accept(connection, opened_at, &srv_ctx);
// TODO session context
let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(Arc::clone(&srv_ctx));
};
let s = server.serve_modified(io, set_ctx).map_err(|_| ());
executor.spawn(::logging::context_future(("serve", local_addr), s));
future::ok((server, proxy_ctx, sensors, executor))
(),
move |(), (connection, remote_addr)| {
server.serve(connection, remote_addr);
Ok(())
},
)
}
fn serve_control<N, B>(
bound_port: BoundPort,
h2_builder: h2::server::Builder,
new_service: N,
executor: &Handle,
) -> Box<Future<Item = (), Error = io::Error> + 'static>
where
B: Body + 'static,
N: NewService<Request = http::Request<RecvBody>, Response = http::Response<B>> + 'static,
B: tower_h2::Body + 'static,
N: NewService<Request = http::Request<tower_h2::RecvBody>, Response = http::Response<B>> + 'static,
{
let server = Server::new(new_service, h2_builder, executor.clone());
let h2_builder = h2::server::Builder::default();
let server = tower_h2::Server::new(new_service, h2_builder, executor.clone());
bound_port.listen_and_fold(
executor,
(server, executor.clone()),

View File

@ -13,5 +13,5 @@ fn main() {
process::exit(64)
}
};
conduit_proxy::Main::new(config).run();
conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst).run();
}

View File

@ -4,6 +4,7 @@ use std::marker::PhantomData;
use futures::{Future, Poll};
use h2;
use http;
use http::header::CONTENT_LENGTH;
use tower::Service;
/// Map an HTTP service's error to an appropriate 500 response.
@ -73,9 +74,10 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().or_else(|e| {
error!("turning h2 error into 500: {:?}", e);
error!("turning service error into 500: {:?}", e);
let response = http::Response::builder()
.status(500)
.header(CONTENT_LENGTH, "0")
.body(Default::default())
.unwrap();

View File

@ -8,14 +8,15 @@ use tower_h2;
use tower_reconnect;
use tower_router::Recognize;
use bind::Bind;
use bind::{Bind, BindProtocol, Protocol};
use control;
use ctx;
use fully_qualified_authority::FullyQualifiedAuthority;
use telemetry;
use transparency;
use transport;
type Discovery<B> = control::discovery::Watch<Bind<Arc<ctx::Proxy>, B>>;
type Discovery<B> = control::discovery::Watch<BindProtocol<Arc<ctx::Proxy>, B>>;
type Error = tower_buffer::Error<
tower_balance::Error<
@ -54,19 +55,25 @@ where
B: tower_h2::Body + 'static,
{
type Request = http::Request<B>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<tower_h2::RecvBody>>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<transparency::HttpBody>>;
type Error = Error;
type Key = FullyQualifiedAuthority;
type Key = (FullyQualifiedAuthority, Protocol);
type RouteError = ();
type Service = Buffer<Balance<Discovery<B>>>;
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
req.uri().authority_part().map(|authority|
FullyQualifiedAuthority::new(
req.uri().authority_part().map(|authority| {
let auth = FullyQualifiedAuthority::new(
authority,
self.default_namespace.as_ref().map(|s| s.as_ref()),
self.default_zone.as_ref().map(|s| s.as_ref()))
)
self.default_zone.as_ref().map(|s| s.as_ref()));
let proto = match req.version() {
http::Version::HTTP_2 => Protocol::Http2,
_ => Protocol::Http1,
};
(auth, proto)
})
}
/// Builds a dynamic, load balancing service.
@ -80,11 +87,15 @@ where
/// changed.
fn bind_service(
&mut self,
authority: &FullyQualifiedAuthority,
key: &Self::Key,
) -> Result<Self::Service, Self::RouteError> {
debug!("building outbound client to {:?}", authority);
let &(ref authority, protocol) = key;
debug!("building outbound {:?} client to {:?}", protocol, authority);
let resolve = self.discovery.resolve(authority, self.bind.clone());
let resolve = self.discovery.resolve(
authority,
self.bind.clone().with_protocol(protocol),
);
let balance = Balance::new(resolve);

View File

@ -260,7 +260,7 @@ impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch {
impl HttpMatch {
fn matches(&self, req: &Arc<ctx::http::Request>) -> bool {
match *self {
HttpMatch::Scheme(ref m) => req.uri.scheme().map(|s| *m == s).unwrap_or(false),
HttpMatch::Scheme(ref m) => req.uri.scheme_part().map(|s| &**m == s).unwrap_or(false),
HttpMatch::Method(ref m) => *m == req.method,

View File

@ -0,0 +1,215 @@
use futures::{Async, Future, Poll};
use h2;
use http;
use hyper;
use tokio_connect::Connect;
use tokio_core::reactor::Handle;
use tower::{Service, NewService};
use tower_h2;
use bind;
use super::glue::{BodyStream, HttpBody, HyperConnect};
/// A `NewService` that can speak either HTTP/1 or HTTP/2.
pub struct Client<C, B>
where
B: tower_h2::Body,
{
inner: ClientInner<C, B>,
}
enum ClientInner<C, B>
where
B: tower_h2::Body,
{
Http1(hyper::Client<HyperConnect<C>, BodyStream<B>>),
Http2(tower_h2::client::Client<C, Handle, B>),
}
/// A `Future` returned from `Client::new_service()`.
pub struct ClientNewServiceFuture<C, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
{
inner: ClientNewServiceFutureInner<C, B>,
}
enum ClientNewServiceFutureInner<C, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
{
Http1(Option<hyper::Client<HyperConnect<C>, BodyStream<B>>>),
Http2(tower_h2::client::ConnectFuture<C, Handle, B>),
}
/// The `Service` yielded by `Client::new_service()`.
pub struct ClientService<C, B>
where
B: tower_h2::Body,
{
inner: ClientServiceInner<C, B>,
}
enum ClientServiceInner<C, B>
where
B: tower_h2::Body,
{
Http1(hyper::Client<HyperConnect<C>, BodyStream<B>>),
Http2(tower_h2::client::Service<C, Handle, B>),
}
impl<C, B> Client<C, B>
where
C: Connect + Clone + 'static,
C::Future: 'static,
B: tower_h2::Body + 'static,
{
/// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2).
pub fn new(protocol: bind::Protocol, connect: C, executor: Handle) -> Self {
match protocol {
bind::Protocol::Http1 => {
let h1 = hyper::Client::configure()
.connector(HyperConnect::new(connect))
.body()
.build(&executor);
Client {
inner: ClientInner::Http1(h1),
}
},
bind::Protocol::Http2 => {
let mut h2_builder = h2::client::Builder::default();
// h2 currently doesn't handle PUSH_PROMISE that well, so we just
// disable it for now.
h2_builder.enable_push(false);
let h2 = tower_h2::client::Client::new(connect, h2_builder, executor);
Client {
inner: ClientInner::Http2(h2),
}
}
}
}
}
impl<C, B> NewService for Client<C, B>
where
C: Connect + Clone + 'static,
C::Future: 'static,
B: tower_h2::Body + 'static,
{
type Request = http::Request<B>;
type Response = http::Response<HttpBody>;
type Error = tower_h2::client::Error;
type InitError = tower_h2::client::ConnectError<C::Error>;
type Service = ClientService<C, B>;
type Future = ClientNewServiceFuture<C, B>;
fn new_service(&self) -> Self::Future {
let inner = match self.inner {
ClientInner::Http1(ref h1) => {
ClientNewServiceFutureInner::Http1(Some(h1.clone()))
},
ClientInner::Http2(ref h2) => {
ClientNewServiceFutureInner::Http2(h2.new_service()) },
};
ClientNewServiceFuture {
inner,
}
}
}
impl<C, B> Future for ClientNewServiceFuture<C, B>
where
C: Connect + 'static,
B: tower_h2::Body + 'static,
{
type Item = ClientService<C, B>;
type Error = tower_h2::client::ConnectError<C::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = match self.inner {
ClientNewServiceFutureInner::Http1(ref mut h1) => {
ClientServiceInner::Http1(h1.take().expect("poll more than once"))
},
ClientNewServiceFutureInner::Http2(ref mut h2) => {
let s = try_ready!(h2.poll());
ClientServiceInner::Http2(s)
},
};
Ok(Async::Ready(ClientService {
inner,
}))
}
}
impl<C, B> Service for ClientService<C, B>
where
C: Connect + 'static,
C::Future: 'static,
B: tower_h2::Body + 'static,
{
type Request = http::Request<B>;
type Response = http::Response<HttpBody>;
type Error = tower_h2::client::Error;
type Future = ClientServiceFuture;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match self.inner {
ClientServiceInner::Http1(_) => Ok(Async::Ready(())),
ClientServiceInner::Http2(ref mut h2) => h2.poll_ready(),
}
}
fn call(&mut self, req: Self::Request) -> Self::Future {
match self.inner {
ClientServiceInner::Http1(ref h1) => {
let is_body_empty = req.body().is_end_stream();
let mut req = hyper::Request::from(req.map(BodyStream));
if is_body_empty {
req.headers_mut().set(hyper::header::ContentLength(0));
}
ClientServiceFuture::Http1(h1.request(req))
},
ClientServiceInner::Http2(ref mut h2) => {
ClientServiceFuture::Http2(h2.call(req))
},
}
}
}
pub enum ClientServiceFuture {
Http1(hyper::client::FutureResponse),
Http2(tower_h2::client::ResponseFuture),
}
impl Future for ClientServiceFuture {
type Item = http::Response<HttpBody>;
type Error = tower_h2::client::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
ClientServiceFuture::Http1(ref mut f) => {
match f.poll() {
Ok(Async::Ready(res)) => {
let res = http::Response::from(res);
let res = res.map(HttpBody::Http1);
Ok(Async::Ready(res))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
debug!("http/1 client error: {}", e);
Err(h2::Reason::INTERNAL_ERROR.into())
}
}
},
ClientServiceFuture::Http2(ref mut f) => {
let res = try_ready!(f.poll());
let res = res.map(HttpBody::Http2);
Ok(Async::Ready(res))
}
}
}
}

View File

@ -0,0 +1,333 @@
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::sync::Arc;
use bytes::Bytes;
use futures::{future, Async, Future, Poll, Stream};
use futures::future::Either;
use h2;
use http;
use hyper;
use tokio_connect::Connect;
use tower::{Service, NewService};
use tower_h2;
use ctx::transport::{Server as ServerCtx};
use super::h1;
/// Glue between `hyper::Body` and `tower_h2::RecvBody`.
#[derive(Debug)]
pub enum HttpBody {
Http1(hyper::Body),
Http2(tower_h2::RecvBody),
}
/// Glue for `tower_h2::Body`s to be used in hyper.
#[derive(Debug)]
pub(super) struct BodyStream<B>(pub(super) B);
/// Glue for the `Data` part of a `tower_h2::Body` to be used as an `AsRef` in `BodyStream`.
#[derive(Debug)]
pub(super) struct BufAsRef<B>(B);
/// Glue for a `tower::Service` to used as a `hyper::server::Service`.
#[derive(Debug)]
pub(super) struct HyperServerSvc<S> {
service: RefCell<S>,
srv_ctx: Arc<ServerCtx>,
}
/// Future returned by `HyperServerSvc`.
pub(super) struct HyperServerSvcFuture<F> {
inner: F,
}
/// Glue for any `Service` taking an h2 body to receive an `HttpBody`.
#[derive(Debug)]
pub(super) struct HttpBodySvc<S> {
service: S,
}
/// Glue for any `NewService` taking an h2 body to receive an `HttpBody`.
#[derive(Clone)]
pub(super) struct HttpBodyNewSvc<N> {
new_service: N,
}
/// Future returned by `HttpBodyNewSvc`.
pub(super) struct HttpBodyNewSvcFuture<F> {
inner: F,
}
/// Glue for any `tokio_connect::Connect` to implement `hyper::client::Connect`.
#[derive(Debug, Clone)]
pub(super) struct HyperConnect<C> {
connect: C,
}
/// Future returned by `HyperConnect`.
pub(super) struct HyperConnectFuture<F> {
inner: F,
}
// ===== impl HttpBody =====
impl tower_h2::Body for HttpBody {
type Data = Bytes;
fn is_end_stream(&self) -> bool {
match *self {
HttpBody::Http1(_) => false,
HttpBody::Http2(ref b) => b.is_end_stream(),
}
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, h2::Error> {
match *self {
HttpBody::Http1(ref mut b) => {
match b.poll() {
Ok(Async::Ready(Some(chunk))) => Ok(Async::Ready(Some(chunk.into()))),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
debug!("http/1 body error: {}", e);
Err(h2::Reason::INTERNAL_ERROR.into())
}
}
},
HttpBody::Http2(ref mut b) => b.poll_data().map(|async| async.map(|opt| opt.map(|data| data.into()))),
}
}
fn poll_trailers(&mut self) -> Poll<Option<http::HeaderMap>, h2::Error> {
match *self {
HttpBody::Http1(_) => Ok(Async::Ready(None)),
HttpBody::Http2(ref mut b) => b.poll_trailers(),
}
}
}
impl Default for HttpBody {
fn default() -> HttpBody {
HttpBody::Http2(Default::default())
}
}
// ===== impl BodyStream =====
impl<B> Stream for BodyStream<B>
where
B: tower_h2::Body,
{
type Item = BufAsRef<<B::Data as ::bytes::IntoBuf>::Buf>;
type Error = hyper::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll_data()
.map(|async| async.map(|opt| opt.map(|buf| BufAsRef(::bytes::IntoBuf::into_buf(buf)))))
.map_err(|e| {
trace!("h2 body error: {:?}", e);
hyper::Error::Io(io::ErrorKind::Other.into())
})
}
}
// ===== impl BufAsRef =====
impl<B: ::bytes::Buf> AsRef<[u8]> for BufAsRef<B> {
fn as_ref(&self) -> &[u8] {
::bytes::Buf::bytes(&self.0)
}
}
// ===== impl HyperServerSvc =====
impl<S> HyperServerSvc<S> {
pub fn new(svc: S, ctx: Arc<ServerCtx>) -> Self {
HyperServerSvc {
service: RefCell::new(svc),
srv_ctx: ctx,
}
}
}
impl<S, B> hyper::server::Service for HyperServerSvc<S>
where
S: Service<
Request=http::Request<HttpBody>,
Response=http::Response<B>,
>,
S::Error: fmt::Debug,
B: tower_h2::Body + 'static,
{
type Request = hyper::server::Request;
type Response = hyper::server::Response<BodyStream<B>>;
type Error = hyper::Error;
type Future = Either<
HyperServerSvcFuture<S::Future>,
future::FutureResult<Self::Response, Self::Error>,
>;
fn call(&self, req: Self::Request) -> Self::Future {
if let &hyper::Method::Connect = req.method() {
debug!("HTTP/1.1 CONNECT not supported");
let res = hyper::Response::new()
.with_status(hyper::StatusCode::BadGateway);
return Either::B(future::ok(res));
}
let mut req: http::Request<hyper::Body> = req.into();
req.extensions_mut().insert(self.srv_ctx.clone());
if let Err(()) = h1::reconstruct_uri(&mut req) {
let res = hyper::Response::new()
.with_status(hyper::BadRequest);
return Either::B(future::ok(res));
}
h1::strip_connection_headers(req.headers_mut());
let req = req.map(|b| HttpBody::Http1(b));
let f = HyperServerSvcFuture {
inner: self.service.borrow_mut().call(req),
};
Either::A(f)
}
}
impl<F, B> Future for HyperServerSvcFuture<F>
where
F: Future<Item=http::Response<B>>,
F::Error: fmt::Debug,
{
type Item = hyper::server::Response<BodyStream<B>>;
type Error = hyper::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut res = try_ready!(self.inner.poll().map_err(|e| {
debug!("h2 error: {:?}", e);
hyper::Error::Io(io::ErrorKind::Other.into())
}));
if res.status() == http::StatusCode::SWITCHING_PROTOCOLS {
debug!("HTTP/1.1 101 upgrade not supported");
let res = hyper::Response::new()
.with_status(hyper::StatusCode::BadGateway);
return Ok(Async::Ready(res));
}
h1::strip_connection_headers(res.headers_mut());
Ok(Async::Ready(res.map(BodyStream).into()))
}
}
// ==== impl HttpBodySvc ====
impl<S> Service for HttpBodySvc<S>
where
S: Service<
Request=http::Request<HttpBody>,
>,
{
type Request = http::Request<tower_h2::RecvBody>;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.service.call(req.map(|b| HttpBody::Http2(b)))
}
}
impl<N> HttpBodyNewSvc<N>
where
N: NewService<Request=http::Request<HttpBody>>,
{
pub fn new(new_service: N) -> Self {
HttpBodyNewSvc {
new_service,
}
}
}
impl<N> NewService for HttpBodyNewSvc<N>
where
N: NewService<Request=http::Request<HttpBody>>,
{
type Request = http::Request<tower_h2::RecvBody>;
type Response = N::Response;
type Error = N::Error;
type Service = HttpBodySvc<N::Service>;
type InitError = N::InitError;
type Future = HttpBodyNewSvcFuture<N::Future>;
fn new_service(&self) -> Self::Future {
HttpBodyNewSvcFuture {
inner: self.new_service.new_service(),
}
}
}
impl<F> Future for HttpBodyNewSvcFuture<F>
where
F: Future,
{
type Item = HttpBodySvc<F::Item>;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let s = try_ready!(self.inner.poll());
Ok(Async::Ready(HttpBodySvc {
service: s,
}))
}
}
// ===== impl HyperConnect =====
impl<C> HyperConnect<C>
where
C: Connect,
C::Future: 'static,
{
pub fn new(connect: C) -> Self {
HyperConnect {
connect,
}
}
}
impl<C> hyper::client::Service for HyperConnect<C>
where
C: Connect,
C::Future: 'static,
{
type Request = hyper::Uri;
type Response = C::Connected;
type Error = io::Error;
type Future = HyperConnectFuture<C::Future>;
fn call(&self, _uri: Self::Request) -> Self::Future {
HyperConnectFuture {
inner: self.connect.connect(),
}
}
}
impl<F> Future for HyperConnectFuture<F>
where
F: Future,
{
type Item = F::Item;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map_err(|_| io::ErrorKind::Other.into())
}
}

View File

@ -0,0 +1,99 @@
use std::fmt::Write;
use std::mem;
use std::sync::Arc;
use bytes::BytesMut;
use http;
use http::header::{HeaderValue, HOST};
use http::uri::{Authority, Parts, Scheme, Uri};
use ctx::transport::{Server as ServerCtx};
pub fn reconstruct_uri<B>(req: &mut http::Request<B>) -> Result<(), ()> {
// RFC7230#section-5.4
// If an absolute-form uri is received, it must replace
// the host header
if let Some(auth) = req.uri().authority_part().cloned() {
if let Some(host) = req.headers().get(HOST) {
if auth.as_str().as_bytes() == host.as_bytes() {
// host and absolute-form agree, nothing more to do
return Ok(());
}
}
let host = HeaderValue::from_shared(auth.into_bytes())
.expect("a valid authority is valid header value");
req.headers_mut().insert(HOST, host);
return Ok(());
}
// try to parse the Host header
if let Some(host) = req.headers().get(HOST).cloned() {
let auth = host.to_str()
.ok()
.and_then(|s| {
if s.is_empty() {
None
} else {
s.parse::<Authority>().ok()
}
});
if let Some(auth) = auth {
set_authority(req.uri_mut(), auth);
return Ok(());
}
}
// last resort is to use the so_original_dst
let orig_dst = req.extensions()
.get::<Arc<ServerCtx>>()
.and_then(|ctx| ctx.orig_dst_if_not_local());
if let Some(orig_dst) = orig_dst {
let mut bytes = BytesMut::with_capacity(31);
write!(&mut bytes, "{}", orig_dst)
.expect("socket address display is under 31 bytes");
let bytes = bytes.freeze();
let auth = Authority::from_shared(bytes)
.expect("socket address is valid authority");
set_authority(req.uri_mut(), auth);
return Ok(());
}
Err(())
}
fn set_authority(uri: &mut http::Uri, auth: Authority) {
let mut parts = Parts::from(mem::replace(uri, Uri::default()));
parts.scheme = Some(Scheme::HTTP);
parts.authority = Some(auth);
let new = Uri::from_parts(parts)
.expect("absolute uri");
*uri = new;
}
pub fn strip_connection_headers(headers: &mut http::HeaderMap) {
let conn_val = if let Some(val) = headers.remove(http::header::CONNECTION) {
val
} else {
return
};
let conn_header = if let Ok(s) = conn_val.to_str() {
s
} else {
return
};
// A `Connection` header may have a comma-separated list of
// names of other headers that are meant for only this specific connection.
//
// Iterate these names and remove them as headers.
for name in conn_header.split(',') {
let name = name.trim();
headers.remove(name);
}
}

View File

@ -0,0 +1,10 @@
mod client;
mod glue;
mod h1;
mod protocol;
mod server;
mod tcp;
pub use self::client::Client;
pub use self::glue::HttpBody;
pub use self::server::Server;

View File

@ -0,0 +1,45 @@
use httparse;
/// Known protocols that we proxy transparently.
#[derive(Debug)]
pub enum Protocol {
Http1,
Http2,
}
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
impl Protocol {
/// Tries to detect a known protocol in the peeked bytes.
///
/// If no protocol can be determined, returns `None`.
pub fn detect(bytes: &[u8]) -> Option<Protocol> {
// http2 is easiest to detect
if bytes.len() >= H2_PREFACE.len() {
if &bytes[..H2_PREFACE.len()] == H2_PREFACE {
return Some(Protocol::Http2);
}
}
// http1 can have a really long first line, but if the bytes so far
// look like http1, we'll assume it is. a different protocol
// should look different in the first few bytes
let mut headers = [httparse::EMPTY_HEADER; 0];
let mut req = httparse::Request::new(&mut headers);
match req.parse(bytes) {
// Ok(Compelete) or Ok(Partial) both mean it looks like HTTP1!
//
// If we got past the first line, we'll see TooManyHeaders,
// because we passed an array of 0 headers to parse into. That's fine!
// We didn't want to keep parsing headers, just validate that
// the first line is HTTP1.
Ok(_) | Err(httparse::Error::TooManyHeaders) => {
return Some(Protocol::Http1);
},
_ => {}
}
None
}
}

View File

@ -0,0 +1,160 @@
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::Future;
use http;
use hyper;
use tokio_core::reactor::Handle;
use tower::NewService;
use tower_h2;
use control;
use connection::Connection;
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
use telemetry::Sensors;
use transport::GetOriginalDst;
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
use super::protocol::Protocol;
use super::tcp;
/// A protocol-transparent Server!
///
/// This type can `serve` new connections, determine what protocol
/// the connection is speaking, and route it to the corresponding
/// service.
pub struct Server<S: NewService, B: tower_h2::Body, G>
where
S: NewService<Request=http::Request<HttpBody>>,
S::Future: 'static,
{
executor: Handle,
get_orig_dst: G,
h1: hyper::server::Http,
h2: tower_h2::Server<HttpBodyNewSvc<S>, Handle, B>,
listen_addr: SocketAddr,
new_service: S,
proxy_ctx: Arc<ProxyCtx>,
sensors: Sensors,
tcp: tcp::Proxy,
}
impl<S, B, G> Server<S, B, G>
where
S: NewService<
Request = http::Request<HttpBody>,
Response = http::Response<B>
> + Clone + 'static,
S::Future: 'static,
S::Error: fmt::Debug,
B: tower_h2::Body + 'static,
G: GetOriginalDst,
{
/// Creates a new `Server`.
pub fn new(
listen_addr: SocketAddr,
proxy_ctx: Arc<ProxyCtx>,
sensors: Sensors,
get_orig_dst: G,
stack: S,
tcp_connect_timeout: Duration,
executor: Handle,
) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone(), &executor);
Server {
executor: executor.clone(),
get_orig_dst,
h1: hyper::server::Http::new(),
h2: tower_h2::Server::new(recv_body_svc, Default::default(), executor),
listen_addr,
new_service: stack,
proxy_ctx,
sensors,
tcp,
}
}
/// Handle a new connection.
///
/// This will peek on the connection for the first bytes to determine
/// what protocol the connection is speaking. From there, the connection
/// will be mapped into respective services, and spawned into an
/// executor.
pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) {
let opened_at = Instant::now();
// create Server context
let orig_dst = connection.original_dst_addr(&self.get_orig_dst);
let local_addr = connection.local_addr().unwrap_or(self.listen_addr);
let proxy_ctx = self.proxy_ctx.clone();
// try to sniff protocol
let sniff = [0u8; 32];
let sensors = self.sensors.clone();
let h1 = self.h1.clone();
let h2 = self.h2.clone();
let tcp = self.tcp.clone();
let new_service = self.new_service.clone();
let fut = connection
.peek_future(sniff)
.map_err(|_| ())
.and_then(move |(connection, sniff, n)| -> Box<Future<Item=(), Error=()>> {
if let Some(proto) = Protocol::detect(&sniff[..n]) {
let srv_ctx = ServerCtx::new(
&proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
control::pb::proxy::common::Protocol::Http,
);
// record telemetry
let io = sensors.accept(connection, opened_at, &srv_ctx);
match proto {
Protocol::Http1 => {
trace!("transparency detected HTTP/1");
Box::new(new_service.new_service()
.map_err(|_| ())
.and_then(move |s| {
let svc = HyperServerSvc::new(s, srv_ctx);
h1.serve_connection(io, svc)
.map(|_| ())
.map_err(|_| ())
}))
},
Protocol::Http2 => {
trace!("transparency detected HTTP/2");
let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(srv_ctx.clone());
};
Box::new(h2.serve_modified(io, set_ctx).map_err(|_| ()))
}
}
} else {
trace!("transparency did not detect protocol, treating as TCP");
let srv_ctx = ServerCtx::new(
&proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
control::pb::proxy::common::Protocol::Tcp,
);
// record telemetry
let tcp_in = sensors.accept(connection, opened_at, &srv_ctx);
tcp.serve(tcp_in, srv_ctx)
}
});
self.executor.spawn(fut);
}
}

View File

@ -0,0 +1,84 @@
use std::sync::Arc;
use std::time::Duration;
use futures::{future, Future};
use tokio_connect::Connect;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::copy;
use control;
use ctx::transport::{Client as ClientCtx, Server as ServerCtx};
use telemetry::Sensors;
use timeout::Timeout;
use transport;
/// TCP Server Proxy
#[derive(Debug, Clone)]
pub struct Proxy {
connect_timeout: Duration,
executor: Handle,
sensors: Sensors,
}
impl Proxy {
/// Create a new TCP `Proxy`.
pub fn new(connect_timeout: Duration, sensors: Sensors, executor: &Handle) -> Self {
Self {
connect_timeout,
executor: executor.clone(),
sensors,
}
}
/// Serve a TCP connection, trying to forward it to its destination.
pub fn serve<T>(&self, tcp_in: T, srv_ctx: Arc<ServerCtx>) -> Box<Future<Item=(), Error=()>>
where
T: AsyncRead + AsyncWrite + 'static,
{
let orig_dst = srv_ctx.orig_dst_if_not_local();
// For TCP, we really have no extra information other than the
// SO_ORIGINAL_DST socket option. If that isn't set, the only thing
// to do is to drop this connection.
let orig_dst = if let Some(orig_dst) = orig_dst {
debug!(
"tcp accepted, forwarding ({}) to {}",
srv_ctx.remote,
orig_dst,
);
orig_dst
} else {
debug!(
"tcp accepted, no SO_ORIGINAL_DST to forward: remote={}",
srv_ctx.remote,
);
return Box::new(future::ok(()));
};
let client_ctx = ClientCtx::new(
&srv_ctx.proxy,
&orig_dst,
control::pb::proxy::common::Protocol::Tcp,
);
let c = Timeout::new(
transport::Connect::new(orig_dst, &self.executor),
self.connect_timeout,
&self.executor,
);
let connect = self.sensors.connect(c, &client_ctx);
let fut = connect.connect()
.map_err(|e| debug!("tcp connect error: {:?}", e))
.and_then(move |tcp_out| {
let (in_r, in_w) = tcp_in.split();
let (out_r, out_w) = tcp_out.split();
copy(in_r, out_w)
.join(copy(out_r, in_w))
.map(|_| ())
.map_err(|e| debug!("tcp error: {}", e))
});
Box::new(fut)
}
}

View File

@ -2,4 +2,4 @@ mod connect;
mod so_original_dst;
pub use self::connect::{Connect, LookupAddressAndConnect, TimeoutConnect, TimeoutError};
pub use self::so_original_dst::get_original_dst;
pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst};

View File

@ -1,22 +1,34 @@
use std::net::SocketAddr;
use tokio_core::net::TcpStream;
#[cfg(not(target_os = "linux"))]
pub fn get_original_dst(_: &TcpStream) -> Option<SocketAddr> {
debug!("no support for SO_ORIGINAL_DST");
None
/// A generic way to get the original destination address of a socket.
///
/// This is especially useful to allow tests to provide a mock implementation.
pub trait GetOriginalDst {
fn get_original_dst(&self, socket: &TcpStream) -> Option<SocketAddr>;
}
// TODO change/remove once https://github.com/tokio-rs/tokio/issues/25 is addressed
#[cfg(target_os = "linux")]
pub fn get_original_dst(sock: &TcpStream) -> Option<SocketAddr> {
use self::linux;
use std::os::unix::io::AsRawFd;
#[derive(Copy, Clone, Debug)]
pub struct SoOriginalDst;
debug!("get_original_dst {:?}", sock);
impl GetOriginalDst for SoOriginalDst {
#[cfg(not(target_os = "linux"))]
fn get_original_dst(&self, _: &TcpStream) -> Option<SocketAddr> {
debug!("no support for SO_ORIGINAL_DST");
None
}
let res = unsafe { linux::so_original_dst(sock.as_raw_fd()) };
res.ok()
// TODO change/remove once https://github.com/tokio-rs/tokio/issues/25 is addressed
#[cfg(target_os = "linux")]
fn get_original_dst(&self, sock: &TcpStream) -> Option<SocketAddr> {
use self::linux;
use std::os::unix::io::AsRawFd;
debug!("get_original_dst {:?}", sock);
let res = unsafe { linux::so_original_dst(sock.as_raw_fd()) };
res.ok()
}
}
#[cfg(target_os = "linux")]

View File

@ -1,6 +1,3 @@
#[macro_use]
extern crate log;
mod support;
use self::support::*;
@ -34,6 +31,29 @@ fn outbound_reconnects_if_controller_stream_ends() {
assert_eq!(client.get("/recon"), "nect");
}
#[test]
fn outbound_updates_newer_services() {
let _ = env_logger::init();
//TODO: when the support server can listen on both http1 and http2
//at the same time, do that here
let srv = server::http1().route("/h1", "hello h1").run();
let ctrl = controller::new()
.destination("test.conduit.local", srv.addr)
.run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
// the HTTP2 service starts watching first, receiving an addr
// from the controller
let client1 = client::http2(proxy.outbound, "test.conduit.local");
client1.get("/h2"); // 500, ignore
// a new HTTP1 service needs to be build now, while the HTTP2
// service already exists, so make sure previously sent addrs
// get into the newer service
let client2 = client::http1(proxy.outbound, "test.conduit.local");
assert_eq!(client2.get("/h1"), "hello h1");
}
#[test]
#[ignore]
fn outbound_times_out() {

View File

@ -2,84 +2,160 @@ use support::*;
use self::futures::sync::{mpsc, oneshot};
use self::tokio_core::net::TcpStream;
use self::tower_h2::client::Error;
type Request = http::Request<()>;
type Response = http::Response<RecvBody>;
type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Response, Error>>)>;
type Response = http::Response<BodyStream>;
type BodyStream = Box<Stream<Item=Bytes, Error=String> + Send>;
type Sender = mpsc::UnboundedSender<(Request, oneshot::Sender<Result<Response, String>>)>;
pub fn new<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
Client::new(addr, auth.into())
http2(addr, auth.into())
}
pub fn http1<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
Client::new(addr, auth.into(), Run::Http1 {
absolute_uris: false,
})
}
// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`.
pub fn http1_absolute_uris<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
Client::new(addr, auth.into(), Run::Http1 {
absolute_uris: true,
})
}
pub fn http2<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
Client::new(addr, auth.into(), Run::Http2)
}
pub fn tcp(addr: SocketAddr) -> tcp::TcpClient {
tcp::client(addr)
}
#[derive(Debug)]
pub struct Client {
authority: String,
tx: Sender,
version: http::Version,
}
impl Client {
pub fn new(addr: SocketAddr, authority: String) -> Client {
fn new(addr: SocketAddr, authority: String, r: Run) -> Client {
let v = match r {
Run::Http1 { .. } => http::Version::HTTP_11,
Run::Http2 => http::Version::HTTP_2,
};
Client {
authority,
tx: run(addr),
tx: run(addr, r),
version: v,
}
}
pub fn get(&self, path: &str) -> String {
let (tx, rx) = oneshot::channel();
let req = Request::builder()
.method("GET")
.uri(format!("http://{}{}", self.authority, path).as_str())
.version(http::Version::HTTP_2)
.body(())
.unwrap();
let _ = self.tx.unbounded_send((req, tx));
rx.map_err(|_| panic!("client request dropped"))
.and_then(|res| {
let stream = RecvBodyStream(res.unwrap().into_parts().1);
stream.concat2()
})
let mut req = self.request_builder(path);
let res = self.request(req.method("GET"));
let stream = res.into_parts().1;
stream.concat2()
.map(|body| ::std::str::from_utf8(&body).unwrap().to_string())
.wait()
.unwrap()
}
pub fn request(&self, builder: &mut http::request::Builder) -> Response {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx));
rx.map_err(|_| panic!("client request dropped"))
.wait()
.map(|result| result.unwrap())
.unwrap()
}
pub fn request_builder(&self, path: &str) -> http::request::Builder {
let mut b = Request::builder();
b.uri(format!("http://{}{}", self.authority, path).as_str())
.version(self.version);
b
}
}
fn run(addr: SocketAddr) -> Sender {
let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, Error>>)>();
enum Run {
Http1 {
absolute_uris: bool,
},
Http2,
}
::std::thread::Builder::new()
.name("support client".into())
.spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
fn run(addr: SocketAddr, version: Run) -> Sender {
let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
let conn = Conn(addr, reactor.clone());
let h2 = tower_h2::Client::<Conn, Handle, ()>::new(
conn,
Default::default(),
reactor.clone(),
);
::std::thread::Builder::new().name("support client".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
let done = h2.new_service()
.map_err(move |err| println!("connect error ({:?}): {:?}", addr, err))
.and_then(move |mut h2| {
rx.for_each(move |(req, cb)| {
let fut = h2.call(req).then(|result| {
let _ = cb.send(result);
Ok(())
});
reactor.spawn(fut);
let conn = Conn(addr, reactor.clone());
let work: Box<Future<Item=(), Error=()>> = match version {
Run::Http1 { absolute_uris } => {
let client = hyper::Client::configure()
.connector(conn)
.build(&reactor);
Box::new(rx.for_each(move |(req, cb)| {
let mut req = hyper::Request::from(req.map(|()| hyper::Body::empty()));
if absolute_uris {
req.set_proxy(true);
}
let fut = client.request(req).then(move |result| {
let result = result
.map(|res| {
let res = http::Response::from(res);
res.map(|body| -> BodyStream {
Box::new(body.map(|chunk| chunk.into())
.map_err(|e| e.to_string()))
})
})
.map_err(|e| e.to_string());
let _ = cb.send(result);
Ok(())
})
});
reactor.spawn(fut);
Ok(())
})
.map(|_| ())
.map_err(|e| println!("client error: {:?}", e));
.map_err(|e| println!("client error: {:?}", e)))
},
Run::Http2 => {
let h2 = tower_h2::Client::<Conn, Handle, ()>::new(
conn,
Default::default(),
reactor.clone(),
);
core.run(done).unwrap();
})
.unwrap();
Box::new(h2.new_service()
.map_err(move |err| println!("connect error ({:?}): {:?}", addr, err))
.and_then(move |mut h2| {
rx.for_each(move |(req, cb)| {
let fut = h2.call(req).then(|result| {
let result = result
.map(|res| {
res.map(|body| -> BodyStream {
Box::new(RecvBodyStream(body).map_err(|e| format!("{:?}", e)))
})
})
.map_err(|e| format!("{:?}", e));
let _ = cb.send(result);
Ok(())
});
reactor.spawn(fut);
Ok(())
})
})
.map(|_| ())
.map_err(|e| println!("client error: {:?}", e)))
}
};
core.run(work).unwrap();
}).unwrap();
tx
}
@ -96,3 +172,16 @@ impl Connect for Conn {
Box::new(c)
}
}
impl hyper::client::Service for Conn {
type Request = hyper::Uri;
type Response = TcpStream;
type Future = Box<Future<Item = TcpStream, Error = ::std::io::Error>>;
type Error = ::std::io::Error;
fn call(&self, _: hyper::Uri) -> <Self as hyper::client::Service>::Future {
let c = TcpStream::connect(&self.0, &self.1)
.and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp));
Box::new(c)
}
}

View File

@ -4,10 +4,12 @@ extern crate bytes;
extern crate conduit_proxy;
extern crate futures;
extern crate h2;
extern crate http;
pub extern crate http;
extern crate hyper;
extern crate prost;
extern crate tokio_connect;
extern crate tokio_core;
pub extern crate tokio_io;
extern crate tower;
extern crate tower_h2;
extern crate url;
@ -16,10 +18,10 @@ pub extern crate env_logger;
use self::bytes::{BigEndian, Bytes, BytesMut};
pub use self::futures::*;
use self::futures::sync::oneshot;
use self::http::{HeaderMap, Request};
pub use self::http::{HeaderMap, Request, Response};
use self::http::header::HeaderValue;
use self::tokio_connect::Connect;
use self::tokio_core::net::TcpListener;
use self::tokio_core::net::{TcpListener, TcpStream};
use self::tokio_core::reactor::{Core, Handle};
use self::tower::{NewService, Service};
use self::tower_h2::{Body, RecvBody};
@ -30,6 +32,7 @@ pub mod client;
pub mod controller;
pub mod proxy;
pub mod server;
mod tcp;
pub type Shutdown = oneshot::Sender<()>;
pub type ShutdownRx = future::Then<
@ -55,3 +58,7 @@ impl Stream for RecvBodyStream {
Ok(Async::Ready(data.map(From::from)))
}
}
pub fn s(bytes: &[u8]) -> &str {
::std::str::from_utf8(bytes.as_ref()).unwrap()
}

View File

@ -1,5 +1,7 @@
use support::*;
use std::sync::{Arc, Mutex};
use support::conduit_proxy::convert::TryFrom;
pub fn new() -> Proxy {
@ -60,18 +62,51 @@ impl Proxy {
}
}
#[derive(Clone, Debug)]
struct MockOriginalDst(Arc<Mutex<DstInner>>);
#[derive(Debug, Default)]
struct DstInner {
inbound_orig_addr: Option<SocketAddr>,
inbound_local_addr: Option<SocketAddr>,
outbound_orig_addr: Option<SocketAddr>,
outbound_local_addr: Option<SocketAddr>,
}
impl conduit_proxy::GetOriginalDst for MockOriginalDst {
fn get_original_dst(&self, sock: &TcpStream) -> Option<SocketAddr> {
sock.local_addr()
.ok()
.and_then(|local| {
let inner = self.0.lock().unwrap();
if inner.inbound_local_addr == Some(local) {
inner.inbound_orig_addr
} else if inner.outbound_local_addr == Some(local) {
inner.outbound_orig_addr
} else {
None
}
})
}
}
fn run(proxy: Proxy) -> Listening {
use self::conduit_proxy::config;
let controller = proxy.controller.expect("proxy controller missing");
let inbound = proxy.inbound;
let outbound = proxy.outbound;
let mut mock_orig_dst = DstInner::default();
let mut env = config::TestEnv::new();
env.put(config::ENV_CONTROL_URL, format!("tcp://{}", controller.addr));
env.put(config::ENV_PRIVATE_LISTENER, "tcp://127.0.0.1:0".to_owned());
if let Some(ref inbound) = inbound {
env.put(config::ENV_PRIVATE_FORWARD, format!("tcp://{}", inbound.addr));
mock_orig_dst.inbound_orig_addr = Some(inbound.addr);
}
if let Some(ref outbound) = outbound {
mock_orig_dst.outbound_orig_addr = Some(outbound.addr);
}
env.put(config::ENV_PUBLIC_LISTENER, "tcp://127.0.0.1:0".to_owned());
env.put(config::ENV_CONTROL_LISTENER, "tcp://127.0.0.1:0".to_owned());
@ -86,12 +121,20 @@ fn run(proxy: Proxy) -> Listening {
config.metrics_flush_interval = dur;
}
let main = conduit_proxy::Main::new(config);
let mock_orig_dst = MockOriginalDst(Arc::new(Mutex::new(mock_orig_dst)));
let main = conduit_proxy::Main::new(config, mock_orig_dst.clone());
let control_addr = main.control_addr();
let inbound_addr = main.inbound_addr();
let outbound_addr = main.outbound_addr();
{
let mut inner = mock_orig_dst.0.lock().unwrap();
inner.inbound_local_addr = Some(inbound_addr);
inner.outbound_local_addr = Some(outbound_addr);
}
let (running_tx, running_rx) = shutdown_signal();
let (tx, rx) = shutdown_signal();

View File

@ -4,78 +4,123 @@ use std::sync::Arc;
use support::*;
pub fn new() -> Server {
Server::new()
http2()
}
pub fn http1() -> Server {
Server::http1()
}
pub fn http2() -> Server {
Server::http2()
}
pub fn tcp() -> tcp::TcpServer {
tcp::server()
}
#[derive(Debug)]
pub struct Server {
routes: HashMap<String, String>,
routes: HashMap<String, Route>,
version: Run,
}
#[derive(Debug)]
pub struct Listening {
pub addr: SocketAddr,
shutdown: Shutdown,
pub(super) shutdown: Shutdown,
}
impl Server {
pub fn new() -> Self {
fn new(run: Run) -> Self {
Server {
routes: HashMap::new(),
version: run,
}
}
fn http1() -> Self {
Server::new(Run::Http1)
}
fn http2() -> Self {
Server::new(Run::Http2)
}
pub fn route(mut self, path: &str, resp: &str) -> Self {
self.routes.insert(path.into(), resp.into());
self.routes.insert(path.into(), Route::string(resp));
self
}
pub fn route_fn<F>(mut self, path: &str, cb: F) -> Self
where
F: Fn(Request<()>) -> Response<String> + Send + 'static,
{
self.routes.insert(path.into(), Route(Box::new(cb)));
self
}
pub fn run(self) -> Listening {
let (tx, rx) = shutdown_signal();
let (addr_tx, addr_rx) = oneshot::channel();
::std::thread::Builder::new()
.name("support server".into())
.spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
::std::thread::Builder::new().name("support server".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
let h2 = tower_h2::Server::new(
NewSvc(Arc::new(self.routes)),
Default::default(),
reactor.clone(),
);
let new_svc = NewSvc(Arc::new(self.routes));
let addr = ([127, 0, 0, 1], 0).into();
let bind = TcpListener::bind(&addr, &reactor).expect("bind");
let srv: Box<Fn(TcpStream) -> Box<Future<Item=(), Error=()>>> = match self.version {
Run::Http1 => {
let h1 = hyper::server::Http::<hyper::Chunk>::new();
let local_addr = bind.local_addr().expect("local_addr");
info!("bound listener, sending addr: {}", local_addr);
let _ = addr_tx.send(local_addr);
Box::new(move |sock| {
let h1_clone = h1.clone();
let conn = new_svc.new_service()
.from_err()
.and_then(move |svc| h1_clone.serve_connection(sock, svc))
.map(|_| ())
.map_err(|e| println!("server h1 error: {}", e));
Box::new(conn)
})
},
Run::Http2 => {
let h2 = tower_h2::Server::new(
new_svc,
Default::default(),
reactor.clone(),
);
Box::new(move |sock| {
let conn = h2.serve(sock)
.map_err(|e| println!("server h2 error: {:?}", e));
Box::new(conn)
})
},
};
let serve = bind.incoming()
.fold((h2, reactor), |(h2, reactor), (sock, _)| {
if let Err(e) = sock.set_nodelay(true) {
return Err(e);
}
let addr = ([127, 0, 0, 1], 0).into();
let bind = TcpListener::bind(&addr, &reactor).expect("bind");
let serve = h2.serve(sock);
reactor.spawn(serve.map_err(|e| println!("server error: {:?}", e)));
let local_addr = bind.local_addr().expect("local_addr");
let _ = addr_tx.send(local_addr);
Ok((h2, reactor))
});
let serve = bind.incoming()
.fold((srv, reactor), |(srv, reactor), (sock, _)| {
if let Err(e) = sock.set_nodelay(true) {
return Err(e);
}
reactor.spawn(srv(sock));
core.handle().spawn(
serve
.map(|_| ())
.map_err(|e| println!("server error: {}", e)),
);
Ok((srv, reactor))
});
info!("running");
core.run(rx).unwrap();
})
.unwrap();
core.handle().spawn(
serve
.map(|_| ())
.map_err(|e| println!("server error: {}", e)),
);
core.run(rx).unwrap();
}).unwrap();
info!("awaiting listening addr");
let addr = addr_rx.wait().expect("addr");
Listening {
@ -85,7 +130,11 @@ impl Server {
}
}
type Response = http::Response<RspBody>;
#[derive(Debug)]
enum Run {
Http1,
Http2,
}
struct RspBody(Option<Bytes>);
@ -115,31 +164,49 @@ impl Body for RspBody {
}
}
struct Route(Box<Fn(Request<()>) -> Response<String> + Send>);
impl Route {
fn string(body: &str) -> Route {
let body = body.to_owned();
Route(Box::new(move |_| {
http::Response::builder()
.status(200)
.body(body.clone())
.unwrap()
}))
}
}
impl ::std::fmt::Debug for Route {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
f.write_str("Route")
}
}
#[derive(Debug)]
struct Svc(Arc<HashMap<String, String>>);
struct Svc(Arc<HashMap<String, Route>>);
impl Service for Svc {
type Request = Request<RecvBody>;
type Response = Response;
type Response = Response<RspBody>;
type Error = h2::Error;
type Future = future::FutureResult<Response, Self::Error>;
type Future = future::FutureResult<Self::Response, Self::Error>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let mut rsp = http::Response::builder();
rsp.version(http::Version::HTTP_2);
let path = req.uri().path();
let rsp = match self.0.get(path) {
Some(body) => {
let body = RspBody::new(body.as_bytes().into());
rsp.status(200).body(body).unwrap()
let rsp = match self.0.get(req.uri().path()) {
Some(route) => {
(route.0)(req.map(|_| ()))
.map(|s| RspBody::new(s.as_bytes().into()))
}
None => {
println!("server 404: {:?}", path);
println!("server 404: {:?}", req.uri().path());
let mut rsp = http::Response::builder();
rsp.version(http::Version::HTTP_2);
let body = RspBody::empty();
rsp.status(404).body(body).unwrap()
}
@ -148,11 +215,37 @@ impl Service for Svc {
}
}
impl hyper::server::Service for Svc {
type Request = hyper::server::Request;
type Response = hyper::server::Response<hyper::Body>;
type Error = hyper::Error;
type Future = future::FutureResult<hyper::server::Response<hyper::Body>, hyper::Error>;
fn call(&self, req: Self::Request) -> Self::Future {
let rsp = match self.0.get(req.uri().path()) {
Some(route) => {
(route.0)(Request::from(req).map(|_| ()))
.map(|s| hyper::Body::from(s))
.into()
}
None => {
println!("server 404: {:?}", req.uri().path());
let rsp = hyper::server::Response::new();
let body = hyper::Body::empty();
rsp.with_status(hyper::NotFound)
.with_body(body)
}
};
future::ok(rsp)
}
}
#[derive(Debug)]
struct NewSvc(Arc<HashMap<String, String>>);
struct NewSvc(Arc<HashMap<String, Route>>);
impl NewService for NewSvc {
type Request = Request<RecvBody>;
type Response = Response;
type Response = Response<RspBody>;
type Error = h2::Error;
type InitError = ::std::io::Error;
type Service = Svc;

190
proxy/tests/support/tcp.rs Normal file
View File

@ -0,0 +1,190 @@
use support::*;
use std::collections::VecDeque;
use std::io;
use self::futures::sync::{mpsc, oneshot};
use self::tokio_core::net::TcpStream;
type TcpSender = mpsc::UnboundedSender<oneshot::Sender<TcpConnSender>>;
type TcpConnSender = mpsc::UnboundedSender<(Option<Vec<u8>>, oneshot::Sender<io::Result<Option<Vec<u8>>>>)>;
pub fn client(addr: SocketAddr) -> TcpClient {
let tx = run_client(addr);
TcpClient {
tx,
}
}
pub fn server() -> TcpServer {
TcpServer {
accepts: VecDeque::new(),
}
}
pub struct TcpClient {
tx: TcpSender,
}
pub struct TcpServer {
accepts: VecDeque<Box<Fn(Vec<u8>) -> Vec<u8> + Send>>,
}
pub struct TcpConn {
tx: TcpConnSender,
}
impl TcpClient {
pub fn connect(&self) -> TcpConn {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(tx);
let tx = rx.map_err(|_| panic!("tcp connect dropped"))
.wait()
.unwrap();
TcpConn {
tx,
}
}
}
impl TcpServer {
pub fn accept<F, U>(mut self, cb: F) -> Self
where
F: Fn(Vec<u8>) -> U + Send + 'static,
U: Into<Vec<u8>>,
{
self.accepts.push_back(Box::new(move |v| cb(v).into()));
self
}
pub fn run(self) -> server::Listening {
run_server(self)
}
}
impl TcpConn {
pub fn read(&self) -> Vec<u8> {
self.try_read().expect("read")
}
pub fn try_read(&self) -> io::Result<Vec<u8>> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send((None, tx));
rx.map_err(|_| panic!("tcp read dropped"))
.map(|res| res.map(|opt| opt.unwrap()))
.wait()
.unwrap()
}
pub fn write<T: Into<Vec<u8>>>(&self, buf: T) {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send((Some(buf.into()), tx));
rx.map_err(|_| panic!("tcp write dropped"))
.map(|rsp| assert!(rsp.unwrap().is_none()))
.wait()
.unwrap()
}
}
fn run_client(addr: SocketAddr) -> TcpSender {
let (tx, rx) = mpsc::unbounded();
::std::thread::Builder::new().name("support client".into()).spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let work = rx.for_each(|cb: oneshot::Sender<_>| {
let fut = TcpStream::connect(&addr, &handle)
.map_err(|e| panic!("connect error: {}", e))
.and_then(move |tcp| {
let (tx, rx) = mpsc::unbounded();
cb.send(tx).unwrap();
rx.fold(tcp, |tcp, (action, cb): (Option<Vec<u8>>, oneshot::Sender<io::Result<Option<Vec<u8>>>>)| {
let f: Box<Future<Item=TcpStream, Error=()>> = match action {
None => {
Box::new(tokio_io::io::read(tcp, vec![0; 1024])
.then(move |res| {
match res {
Ok((tcp, mut vec, n)) => {
vec.truncate(n);
cb.send(Ok(Some(vec))).unwrap();
Ok(tcp)
}
Err(e) => {
cb.send(Err(e)).unwrap();
Err(())
}
}
}))
},
Some(vec) => {
Box::new(tokio_io::io::write_all(tcp, vec)
.then(move |res| {
match res {
Ok((tcp, _)) => {
cb.send(Ok(None)).unwrap();
Ok(tcp)
},
Err(e) => {
cb.send(Err(e)).unwrap();
Err(())
}
}
}))
}
};
f
})
.map(|_| ())
.map_err(|_| ())
});
handle.spawn(fut);
Ok(())
}).map_err(|e| println!("client error: {:?}", e));
core.run(work).unwrap();
}).unwrap();
tx
}
fn run_server(tcp: TcpServer) -> server::Listening {
let (tx, rx) = shutdown_signal();
let (addr_tx, addr_rx) = oneshot::channel();
::std::thread::Builder::new().name("support server".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
let addr = ([127, 0, 0, 1], 0).into();
let bind = TcpListener::bind(&addr, &reactor).expect("bind");
let local_addr = bind.local_addr().expect("local_addr");
let _ = addr_tx.send(local_addr);
let mut accepts = tcp.accepts;
let work = bind.incoming().for_each(move |(sock, _)| {
let cb = accepts.pop_front().expect("no more accepts");
let fut = tokio_io::io::read(sock, vec![0; 1024])
.and_then(move |(sock, mut vec, n)| {
vec.truncate(n);
let write = cb(vec);
tokio_io::io::write_all(sock, write)
})
.map(|_| ())
.map_err(|e| panic!("tcp server error: {}", e));
reactor.spawn(fut);
Ok(())
});
core.run(work).unwrap();
}).unwrap();
let addr = addr_rx.wait().expect("addr");
server::Listening {
addr,
shutdown: tx,
}
}

290
proxy/tests/transparency.rs Normal file
View File

@ -0,0 +1,290 @@
mod support;
use self::support::*;
#[test]
fn outbound_http1() {
let _ = env_logger::init();
let srv = server::http1().route("/", "hello h1").run();
let ctrl = controller::new()
.destination("test.conduit.local", srv.addr)
.run();
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
let client = client::http1(proxy.outbound, "test.conduit.local");
assert_eq!(client.get("/"), "hello h1");
}
#[test]
fn inbound_http1() {
let _ = env_logger::init();
let srv = server::http1().route("/", "hello h1").run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::http1(proxy.inbound, "test.conduit.local");
assert_eq!(client.get("/"), "hello h1");
}
#[test]
fn http1_connect_not_supported() {
let _ = env_logger::init();
let srv = server::tcp()
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::tcp(proxy.inbound);
let tcp_client = client.connect();
tcp_client.write("CONNECT foo.bar:443 HTTP/1.1\r\nHost: foo.bar:443\r\n\r\n");
let expected = "HTTP/1.1 502 Bad Gateway\r\n";
assert_eq!(s(&tcp_client.read()[..expected.len()]), expected);
}
#[test]
fn http1_removes_connection_headers() {
let _ = env_logger::init();
let srv = server::http1()
.route_fn("/", |req| {
assert!(!req.headers().contains_key("x-foo-bar"));
Response::builder()
.header("x-server-quux", "lorem ipsum")
.header("connection", "close, x-server-quux")
.body("".into())
.unwrap()
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::http1(proxy.inbound, "test.conduit.local");
let res = client.request(client.request_builder("/")
.header("x-foo-bar", "baz")
.header("connection", "x-foo-bar, close"));
assert_eq!(res.status(), http::StatusCode::OK);
assert!(!res.headers().contains_key("x-server-quux"));
}
#[test]
fn http10_with_host() {
let _ = env_logger::init();
let host = "test.conduit.local";
let srv = server::http1()
.route_fn("/", move |req| {
assert_eq!(req.version(), http::Version::HTTP_10);
assert_eq!(req.headers().get("host").unwrap(), host);
Response::builder()
.version(http::Version::HTTP_10)
.body("".into())
.unwrap()
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::http1(proxy.inbound, host);
let res = client.request(client.request_builder("/")
.version(http::Version::HTTP_10)
.header("host", host));
assert_eq!(res.status(), http::StatusCode::OK);
assert_eq!(res.version(), http::Version::HTTP_10);
}
#[test]
fn http10_without_host() {
let _ = env_logger::init();
let srv = server::http1()
.route_fn("/", move |req| {
assert_eq!(req.version(), http::Version::HTTP_10);
Response::builder()
.version(http::Version::HTTP_10)
.body("".into())
.unwrap()
})
.run();
let ctrl = controller::new()
.destination(&srv.addr.to_string(), srv.addr)
.run();
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.run();
let client = client::http1(proxy.outbound, "foo.bar");
let res = client.request(client.request_builder("/")
.version(http::Version::HTTP_10)
.header("host", ""));
assert_eq!(res.status(), http::StatusCode::OK);
assert_eq!(res.version(), http::Version::HTTP_10);
}
#[test]
fn http11_absolute_uri_differs_from_host() {
let _ = env_logger::init();
let host = "test.conduit.local";
let srv = server::http1()
.route_fn("/", move |req| {
assert_eq!(req.version(), http::Version::HTTP_11);
assert_eq!(req.headers().get("host").unwrap(), host);
Response::builder()
.body("".into())
.unwrap()
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::http1_absolute_uris(proxy.inbound, host);
let res = client.request(client.request_builder("/")
.version(http::Version::HTTP_11)
.header("host", "foo.bar"));
assert_eq!(res.status(), http::StatusCode::OK);
assert_eq!(res.version(), http::Version::HTTP_11);
}
#[test]
fn outbound_tcp() {
let _ = env_logger::init();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let srv = server::tcp()
.accept(move |read| {
assert_eq!(read, msg1.as_bytes());
msg2
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.run();
let client = client::tcp(proxy.outbound);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
}
#[test]
fn inbound_tcp() {
let _ = env_logger::init();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let srv = server::tcp()
.accept(move |read| {
assert_eq!(read, msg1.as_bytes());
msg2
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::tcp(proxy.inbound);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
}
#[test]
fn tcp_with_no_orig_dst() {
let _ = env_logger::init();
let srv = server::tcp().run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
// no outbound configured for proxy
let client = client::tcp(proxy.outbound);
let tcp_client = client.connect();
tcp_client.write("custom tcp hello");
assert!(tcp_client.try_read().is_err());
}
#[test]
fn http11_upgrade_not_supported() {
let _ = env_logger::init();
// our h1 proxy will strip the Connection header
// and headers it mentions
let msg1 = "\
GET /chat HTTP/1.1\r\n\
Host: foo.bar\r\n\
Connection: Upgrade\r\n\
Upgrade: websocket\r\n\
\r\n\
";
// but let's pretend the server tries to upgrade
// anyways
let msg2 = "\
HTTP/1.1 101 Switching Protocols\r\n\
Upgrade: websocket\r\n\
Connection: Upgrade\r\n\
\r\n\
";
let srv = server::tcp()
.accept(move |read| {
let head = s(&read);
assert!(!head.contains("Upgrade: websocket"));
msg2
})
.run();
let ctrl = controller::new().run();
let proxy = proxy::new()
.controller(ctrl)
.inbound(srv)
.run();
let client = client::tcp(proxy.inbound);
let tcp_client = client.connect();
tcp_client.write(msg1);
let expected = "HTTP/1.1 502 Bad Gateway\r\n";
assert_eq!(s(&tcp_client.read()[..expected.len()]), expected);
}

View File

@ -11,7 +11,6 @@ use tower::{NewService, Service};
use std::marker::PhantomData;
/// Attaches service implementations to h2 connections.
#[derive(Clone)]
pub struct Server<S, E, B>
where S: NewService,
B: Body,
@ -153,6 +152,23 @@ where S: NewService<Request = http::Request<RecvBody>, Response = Response<B>>,
}
}
// B doesn't need to be Clone, it's just a marker type.
impl<S, E, B> Clone for Server<S, E, B>
where
S: NewService + Clone,
E: Clone,
B: Body,
{
fn clone(&self) -> Self {
Server {
new_service: self.new_service.clone(),
executor: self.executor.clone(),
builder: self.builder.clone(),
_p: PhantomData,
}
}
}
// ===== impl Connection =====
impl<T, S, E, B, F> Connection<T, S, E, B, F>