Introduce outbound route metrics (#117)

The Destination Profile API---provided by linkerd2-proxy-api v0.1.3--
allows the proxy to discovery route information for an HTTP service. As
the proxy processes outbound requests, in addition to doing address
resolution through the Destination service, the proxy may also discover
profiles including route patterns and labels.

When the proxy has route information for a destination, it applies the
RequestMatch for each route to find the first-matching route. The
route's labels are used to expose `route_`-prefixed HTTP metrics (and
each label is prefixed with `rt_`).

Furthermore, if a route includes ResponseMatches, they are used to
perform classification (i.e. for the `response_total` and
`route_response_total` metrics).

A new `proxy::http::profiles` module implements a router that consumes
routes from an infinite stream of route lists.

The `app::profiles` module implements a client that continually and
repeatedly tries to establish a watch for the destination's routes (with
some backoff).

Route discovery does not _block_ routing; that is, the first request to
a destination will likely be processed before the route information is
retrieved from the controller (i.e. on the default route). Route
configuration is applied in a best-effort fashion.
This commit is contained in:
Oliver Gould 2018-11-05 16:30:39 -08:00 committed by GitHub
parent 0b6e35857b
commit 5e0a15b8a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1077 additions and 348 deletions

View File

@ -525,7 +525,7 @@ dependencies = [
"libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)",
"linkerd2-fs-watch 0.1.0", "linkerd2-fs-watch 0.1.0",
"linkerd2-metrics 0.1.0", "linkerd2-metrics 0.1.0",
"linkerd2-proxy-api 0.1.1 (git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.1)", "linkerd2-proxy-api 0.1.3 (git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.3)",
"linkerd2-router 0.1.0", "linkerd2-router 0.1.0",
"linkerd2-stack 0.1.0", "linkerd2-stack 0.1.0",
"linkerd2-task 0.1.0", "linkerd2-task 0.1.0",
@ -565,8 +565,8 @@ dependencies = [
[[package]] [[package]]
name = "linkerd2-proxy-api" name = "linkerd2-proxy-api"
version = "0.1.1" version = "0.1.3"
source = "git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.1#b0543809839fd0e6bc7cb8e4a644ce48df88b27d" source = "git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.3#4fe4a6294fc68d1ed83948ddbc5d4f86aec6c5ef"
dependencies = [ dependencies = [
"bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1690,7 +1690,7 @@ dependencies = [
"checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef"
"checksum libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)" = "6fd41f331ac7c5b8ac259b8bf82c75c0fb2e469bbf37d2becbba9a6a2221965b" "checksum libc 0.2.40 (registry+https://github.com/rust-lang/crates.io-index)" = "6fd41f331ac7c5b8ac259b8bf82c75c0fb2e469bbf37d2becbba9a6a2221965b"
"checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" "checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939"
"checksum linkerd2-proxy-api 0.1.1 (git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.1)" = "<none>" "checksum linkerd2-proxy-api 0.1.3 (git+https://github.com/linkerd/linkerd2-proxy-api?tag=v0.1.3)" = "<none>"
"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
"checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21"
"checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376" "checksum matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "100aabe6b8ff4e4a7e32c1c13523379802df0772b82466207ac25b013f193376"

View File

@ -30,7 +30,7 @@ linkerd2-stack = { path = "lib/stack" }
linkerd2-task = { path = "lib/task" } linkerd2-task = { path = "lib/task" }
linkerd2-timeout = { path = "lib/timeout" } linkerd2-timeout = { path = "lib/timeout" }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.1", version = "0.1.1" } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.3", version = "0.1.3" }
bytes = "0.4" bytes = "0.4"
env_logger = { version = "0.5", default-features = false } env_logger = { version = "0.5", default-features = false }
@ -88,7 +88,7 @@ net2 = "0.2"
quickcheck = { version = "0.6", default-features = false } quickcheck = { version = "0.6", default-features = false }
linkerd2-metrics = { path = "./lib/metrics", features = ["test_util"] } linkerd2-metrics = { path = "./lib/metrics", features = ["test_util"] }
linkerd2-task = { path = "lib/task", features = ["test_util"] } linkerd2-task = { path = "lib/task", features = ["test_util"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.1", version = "0.1.1", features = ["arbitrary"] } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.3", version = "0.1.3", features = ["arbitrary"] }
flate2 = { version = "1.0.1", default-features = false, features = ["rust_backend"] } flate2 = { version = "1.0.1", default-features = false, features = ["rust_backend"] }
# `tokio-io` is needed for TCP tests, because `tokio::io` doesn't re-export # `tokio-io` is needed for TCP tests, because `tokio::io` doesn't re-export
# the `read` function. # the `read` function.

View File

@ -2,26 +2,28 @@ use h2;
use http; use http;
pub use proxy::http::classify::{CanClassify, layer}; pub use proxy::http::classify::{CanClassify, layer};
use proxy::http::classify; use proxy::http::{classify, profiles};
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug)]
pub struct Request; pub enum Request {
Default,
Profile(profiles::ResponseClasses),
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Response { pub enum Response {
Grpc, Grpc,
Http, Http,
Profile(profiles::ResponseClasses),
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Eos { pub enum Eos {
Http(HttpEos), Http(http::StatusCode),
Grpc(GrpcEos), Grpc(GrpcEos),
Profile(Class),
} }
#[derive(Clone, Debug)]
pub struct HttpEos(http::StatusCode);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum GrpcEos { pub enum GrpcEos {
NoBody(Class), NoBody(Class),
@ -43,6 +45,22 @@ pub enum SuccessOrFailure {
// === impl Request === // === impl Request ===
impl From<profiles::ResponseClasses> for Request {
fn from(classes: profiles::ResponseClasses) -> Self {
if classes.is_empty() {
Request::Default
} else {
Request::Profile(classes)
}
}
}
impl Default for Request {
fn default() -> Self {
Request::Default
}
}
impl classify::Classify for Request { impl classify::Classify for Request {
type Class = Class; type Class = Class;
type Error = h2::Error; type Error = h2::Error;
@ -50,26 +68,45 @@ impl classify::Classify for Request {
type ClassifyEos = Eos; type ClassifyEos = Eos;
fn classify<B>(&self, req: &http::Request<B>) -> Self::ClassifyResponse { fn classify<B>(&self, req: &http::Request<B>) -> Self::ClassifyResponse {
// Determine if the request is a gRPC request by checking the content-type. match self {
if let Some(ref ct) = req Request::Profile(classes) => Response::Profile(classes.clone()),
.headers() Request::Default => {
.get("content-type") // Determine if the request is a gRPC request by checking the content-type.
.and_then(|v| v.to_str().ok()) let content_type = req
{ .headers()
if ct.starts_with("application/grpc+") { .get("content-type")
return Response::Grpc; .and_then(|v| v.to_str().ok());
if let Some(ref ct) = content_type {
if ct.starts_with("application/grpc+") {
return Response::Grpc;
}
}
Response::Http
} }
} }
Response::Http {}
} }
} }
// === impl Response === // === impl Response ===
impl Default for Response { impl Response {
fn default() -> Self { fn match_class<B>(
Response::Http rsp: &http::Response<B>,
classes: &Vec<profiles::ResponseClass>,
) -> Option<Class> {
for class in classes {
if class.is_match(rsp) {
let result = if class.is_failure() {
SuccessOrFailure::Failure
} else {
SuccessOrFailure::Success
};
return Some(Class::Http(result));
}
}
None
} }
} }
@ -80,11 +117,15 @@ impl classify::ClassifyResponse for Response {
fn start<B>(self, rsp: &http::Response<B>) -> Eos { fn start<B>(self, rsp: &http::Response<B>) -> Eos {
match self { match self {
Response::Http => Eos::Http(HttpEos(rsp.status())), Response::Http => Eos::Http(rsp.status()),
Response::Grpc => Eos::Grpc(match grpc_class(rsp.headers()) { Response::Grpc => match grpc_class(rsp.headers()) {
None => GrpcEos::Open, None => Eos::Grpc(GrpcEos::Open),
Some(class) => GrpcEos::NoBody(class.clone()), Some(class) => Eos::Grpc(GrpcEos::NoBody(class.clone())),
}), },
Response::Profile(ref classes) => match Self::match_class(rsp, classes.as_ref()) {
None => Eos::Http(rsp.status()),
Some(class) => Eos::Profile(class.clone()),
},
} }
} }
@ -93,6 +134,14 @@ impl classify::ClassifyResponse for Response {
} }
} }
impl Default for Response {
fn default() -> Self {
// By default, simply perform HTTP classification. This only applies
// when no `insert` layer is present.
Response::Http
}
}
// === impl Eos === // === impl Eos ===
impl classify::ClassifyEos for Eos { impl classify::ClassifyEos for Eos {
@ -101,50 +150,19 @@ impl classify::ClassifyEos for Eos {
fn eos(self, trailers: Option<&http::HeaderMap>) -> Self::Class { fn eos(self, trailers: Option<&http::HeaderMap>) -> Self::Class {
match self { match self {
Eos::Http(http) => http.eos(trailers), Eos::Http(status) if status.is_server_error() => Class::Http(SuccessOrFailure::Failure),
Eos::Grpc(grpc) => grpc.eos(trailers), Eos::Http(_) => trailers
} .and_then(grpc_class)
} .unwrap_or_else(|| Class::Http(SuccessOrFailure::Success)),
Eos::Grpc(GrpcEos::NoBody(class)) => class,
fn error(self, err: &h2::Error) -> Self::Class { Eos::Grpc(GrpcEos::Open) => trailers
match self {
Eos::Http(http) => http.error(err),
Eos::Grpc(grpc) => grpc.error(err),
}
}
}
impl classify::ClassifyEos for HttpEos {
type Class = Class;
type Error = h2::Error;
fn eos(self, _: Option<&http::HeaderMap>) -> Self::Class {
match self {
HttpEos(status) if status.is_server_error() => Class::Http(SuccessOrFailure::Failure),
HttpEos(_) => Class::Http(SuccessOrFailure::Success),
}
}
fn error(self, err: &h2::Error) -> Self::Class {
Class::Stream(SuccessOrFailure::Failure, format!("{}", err))
}
}
impl classify::ClassifyEos for GrpcEos {
type Class = Class;
type Error = h2::Error;
fn eos(self, trailers: Option<&http::HeaderMap>) -> Self::Class {
match self {
GrpcEos::NoBody(class) => class,
GrpcEos::Open => trailers
.and_then(grpc_class) .and_then(grpc_class)
.unwrap_or_else(|| Class::Grpc(SuccessOrFailure::Success, 0)), .unwrap_or_else(|| Class::Grpc(SuccessOrFailure::Success, 0)),
Eos::Profile(class) => class,
} }
} }
fn error(self, err: &h2::Error) -> Self::Class { fn error(self, err: &h2::Error) -> Self::Class {
// Ignore the original classification when an error is encountered.
Class::Stream(SuccessOrFailure::Failure, format!("{}", err)) Class::Stream(SuccessOrFailure::Failure, format!("{}", err))
} }
} }
@ -173,9 +191,7 @@ mod tests {
#[test] #[test]
fn http_response_status_ok() { fn http_response_status_ok() {
let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap();
let crsp = super::Response::default(); let class = super::Response::Http.start(&rsp).eos(None);
let ceos = crsp.start(&rsp);
let class = ceos.eos(None);
assert_eq!(class, Class::Http(SuccessOrFailure::Success)); assert_eq!(class, Class::Http(SuccessOrFailure::Success));
} }
@ -185,9 +201,7 @@ mod tests {
.status(StatusCode::BAD_REQUEST) .status(StatusCode::BAD_REQUEST)
.body(()) .body(())
.unwrap(); .unwrap();
let crsp = super::Response::default(); let class = super::Response::Http.start(&rsp).eos(None);
let ceos = crsp.start(&rsp);
let class = ceos.eos(None);
assert_eq!(class, Class::Http(SuccessOrFailure::Success)); assert_eq!(class, Class::Http(SuccessOrFailure::Success));
} }
@ -197,9 +211,7 @@ mod tests {
.status(StatusCode::INTERNAL_SERVER_ERROR) .status(StatusCode::INTERNAL_SERVER_ERROR)
.body(()) .body(())
.unwrap(); .unwrap();
let crsp = super::Response::default(); let class = super::Response::Http.start(&rsp).eos(None);
let ceos = crsp.start(&rsp);
let class = ceos.eos(None);
assert_eq!(class, Class::Http(SuccessOrFailure::Failure)); assert_eq!(class, Class::Http(SuccessOrFailure::Failure));
} }
@ -210,9 +222,7 @@ mod tests {
.status(StatusCode::OK) .status(StatusCode::OK)
.body(()) .body(())
.unwrap(); .unwrap();
let crsp = super::Response::Grpc; let class = super::Response::Grpc.start(&rsp).eos(None);
let ceos = crsp.start(&rsp);
let class = ceos.eos(None);
assert_eq!(class, Class::Grpc(SuccessOrFailure::Success, 0)); assert_eq!(class, Class::Grpc(SuccessOrFailure::Success, 0));
} }
@ -223,35 +233,27 @@ mod tests {
.status(StatusCode::OK) .status(StatusCode::OK)
.body(()) .body(())
.unwrap(); .unwrap();
let crsp = super::Response::Grpc; let class = super::Response::Grpc.start(&rsp).eos(None);
let ceos = crsp.start(&rsp);
let class = ceos.eos(None);
assert_eq!(class, Class::Grpc(SuccessOrFailure::Failure, 2)); assert_eq!(class, Class::Grpc(SuccessOrFailure::Failure, 2));
} }
#[test] #[test]
fn grpc_response_trailer_ok() { fn grpc_response_trailer_ok() {
let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap();
let crsp = super::Response::Grpc;
let ceos = crsp.start(&rsp);
let mut trailers = HeaderMap::new(); let mut trailers = HeaderMap::new();
trailers.insert("grpc-status", 0.into()); trailers.insert("grpc-status", 0.into());
let class = ceos.eos(Some(&trailers)); let class = super::Response::Grpc.start(&rsp).eos(Some(&trailers));
assert_eq!(class, Class::Grpc(SuccessOrFailure::Success, 0)); assert_eq!(class, Class::Grpc(SuccessOrFailure::Success, 0));
} }
#[test] #[test]
fn grpc_response_trailer_error() { fn grpc_response_trailer_error() {
let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap(); let rsp = Response::builder().status(StatusCode::OK).body(()).unwrap();
let crsp = super::Response::Grpc;
let ceos = crsp.start(&rsp);
let mut trailers = HeaderMap::new(); let mut trailers = HeaderMap::new();
trailers.insert("grpc-status", 3.into()); trailers.insert("grpc-status", 3.into());
let class = ceos.eos(Some(&trailers)); let class = super::Response::Grpc.start(&rsp).eos(Some(&trailers));
assert_eq!(class, Class::Grpc(SuccessOrFailure::Failure, 3)); assert_eq!(class, Class::Grpc(SuccessOrFailure::Failure, 3));
} }
} }

View File

@ -1,119 +0,0 @@
use http;
use std::fmt;
use std::net::SocketAddr;
use proxy::{
http::{h1, Settings},
Source,
};
use transport::{DnsNameAndPort, Host, HostAndPort};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Destination {
pub name_or_addr: NameOrAddr,
pub settings: Settings,
_p: (),
}
/// Describes a destination for HTTP requests.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum NameOrAddr {
/// A logical, lazily-bound endpoint.
Name(DnsNameAndPort),
/// A single, bound endpoint.
Addr(SocketAddr),
}
impl Destination {
pub fn new(name_or_addr: NameOrAddr, settings: Settings) -> Self {
Self {
name_or_addr,
settings,
_p: (),
}
}
pub fn from_request<A>(req: &http::Request<A>) -> Option<Self> {
let name_or_addr = NameOrAddr::from_request(req)?;
let settings = Settings::detect(req);
Some(Self::new(name_or_addr, settings))
}
}
impl fmt::Display for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.name_or_addr.fmt(f)
}
}
impl fmt::Display for NameOrAddr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
NameOrAddr::Name(ref name) => write!(f, "{}:{}", name.host, name.port),
NameOrAddr::Addr(ref addr) => addr.fmt(f),
}
}
}
impl NameOrAddr {
/// Determines the destination for a request.
///
/// Typically, a request's authority is used to produce a `NameOrAddr`. If the
/// authority addresses a DNS name, a `NameOrAddr::Name` is returned; and, otherwise,
/// it addresses a fixed IP address and a `NameOrAddr::Addr` is returned. The port is
/// inferred if not specified in the authority.
///
/// If no authority is available, the `SO_ORIGINAL_DST` socket option is checked. If
/// it's available, it is used to return a `NameOrAddr::Addr`. This socket option is
/// typically set by `iptables(8)` in containerized environments like Kubernetes (as
/// configured by the `proxy-init` program).
///
/// If none of this information is available, no `NameOrAddr` is returned.
pub fn from_request<B>(req: &http::Request<B>) -> Option<NameOrAddr> {
match Self::host_port(req) {
Some(HostAndPort {
host: Host::DnsName(host),
port,
}) => {
let name_or_addr = DnsNameAndPort { host, port };
Some(NameOrAddr::Name(name_or_addr))
}
Some(HostAndPort {
host: Host::Ip(ip),
port,
}) => {
let name_or_addr = SocketAddr::from((ip, port));
Some(NameOrAddr::Addr(name_or_addr))
}
None => req
.extensions()
.get::<Source>()
.and_then(|src| src.orig_dst_if_not_local())
.map(NameOrAddr::Addr),
}
}
/// Determines the logical host:port of the request.
///
/// If the parsed URI includes an authority, use that. Otherwise, try to load the
/// authority from the `Host` header.
///
/// The port is either parsed from the authority or a default of 80 is used.
fn host_port<B>(req: &http::Request<B>) -> Option<HostAndPort> {
// Note: Calls to `normalize` cannot be deduped without cloning `authority`.
req.uri()
.authority_part()
.and_then(Self::normalize)
.or_else(|| h1::authority_from_host(req).and_then(|h| Self::normalize(&h)))
}
/// TODO: Return error when `HostAndPort::normalize()` fails.
/// TODO: Use scheme-appropriate default port.
fn normalize(authority: &http::uri::Authority) -> Option<HostAndPort> {
const DEFAULT_PORT: Option<u16> = Some(80);
HostAndPort::normalize(authority, DEFAULT_PORT).ok()
}
}

View File

@ -45,7 +45,7 @@ impl classify::CanClassify for Endpoint {
type Classify = classify::Request; type Classify = classify::Request;
fn classify(&self) -> classify::Request { fn classify(&self) -> classify::Request {
classify::Request classify::Request::default()
} }
} }

View File

@ -5,14 +5,14 @@ use http;
use indexmap::IndexSet; use indexmap::IndexSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::thread; use std::thread;
use std::time::SystemTime; use std::time::{Duration, SystemTime};
use std::{error, fmt, io}; use std::{error, fmt, io};
use tokio::executor::{self, DefaultExecutor, Executor}; use tokio::executor::{self, DefaultExecutor, Executor};
use tokio::runtime::current_thread; use tokio::runtime::current_thread;
use tower_h2; use tower_h2;
use app::classify::{self, Class}; use app::classify::{self, Class};
use app::metric_labels::EndpointLabels; use app::metric_labels::{EndpointLabels, RouteLabels};
use control; use control;
use dns; use dns;
use drain; use drain;
@ -183,14 +183,21 @@ where
let tap_next_id = tap::NextId::default(); let tap_next_id = tap::NextId::default();
let (taps, observe) = control::Observe::new(100); let (taps, observe) = control::Observe::new(100);
let (http_metrics, http_report) = let (endpoint_http_metrics, endpoint_http_report) =
proxy::http::metrics::new::<EndpointLabels, Class>(config.metrics_retain_idle); proxy::http::metrics::new::<EndpointLabels, Class>(config.metrics_retain_idle);
let (route_http_metrics, route_http_report) = {
let (m, r) =
proxy::http::metrics::new::<RouteLabels, Class>(config.metrics_retain_idle);
(m, r.with_prefix("route"))
};
let (transport_metrics, transport_report) = transport::metrics::new(); let (transport_metrics, transport_report) = transport::metrics::new();
let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new(); let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();
let report = http_report let report = endpoint_http_report
.and_then(route_http_report)
.and_then(transport_report) .and_then(transport_report)
.and_then(tls_config_report) .and_then(tls_config_report)
.and_then(telemetry::process::Report::new(start_time)); .and_then(telemetry::process::Report::new(start_time));
@ -261,12 +268,13 @@ where
use super::outbound::{ use super::outbound::{
discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize, discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize,
}; };
use super::profiles::Client as ProfilesClient;
use proxy::{ use proxy::{
http::{balance, metrics}, http::{balance, metrics, profiles},
resolve, resolve,
}; };
let http_metrics = http_metrics.clone(); let endpoint_http_metrics = endpoint_http_metrics.clone();
// As the outbound proxy accepts connections, we don't do any // As the outbound proxy accepts connections, we don't do any
// special transport-level handling. // special transport-level handling.
@ -290,22 +298,34 @@ where
.push(normalize_uri::layer()) .push(normalize_uri::layer())
.push(orig_proto_upgrade::layer()) .push(orig_proto_upgrade::layer())
.push(tap::layer(tap_next_id.clone(), taps.clone())) .push(tap::layer(tap_next_id.clone(), taps.clone()))
.push(metrics::layer::<_, classify::Response>(http_metrics)) .push(metrics::layer::<_, classify::Response>(endpoint_http_metrics))
.push(classify::layer())
.push(svc::watch::layer(tls_client_config)) .push(svc::watch::layer(tls_client_config))
.push(buffer::layer()); .push(buffer::layer());
let dst_router_stack = endpoint_stack let profiles_client = ProfilesClient::new(
controller,
Duration::from_secs(3),
control::KubernetesNormalize::new(config.namespaces.pod.clone()),
);
let dst_route_stack = endpoint_stack
.push(resolve::layer(Resolve::new(resolver))) .push(resolve::layer(Resolve::new(resolver)))
.push(balance::layer()) .push(balance::layer())
.push(buffer::layer()) .push(buffer::layer())
.push(profiles::router::layer(
profiles_client,
svc::stack::phantom_data::layer()
.push(metrics::layer::<_, classify::Response>(route_http_metrics))
.push(classify::layer()),
))
.push(buffer::layer())
.push(timeout::layer(config.bind_timeout)) .push(timeout::layer(config.bind_timeout))
.push(limit::layer(MAX_IN_FLIGHT)) .push(limit::layer(MAX_IN_FLIGHT))
.push(router::layer(Recognize::new())); .push(router::layer(Recognize::new()));
let capacity = config.outbound_router_capacity; let capacity = config.outbound_router_capacity;
let max_idle_age = config.outbound_router_max_idle_age; let max_idle_age = config.outbound_router_max_idle_age;
let router = dst_router_stack let router = dst_route_stack
.make(&router::Config::new("out", capacity, max_idle_age)) .make(&router::Config::new("out", capacity, max_idle_age))
.expect("outbound router"); .expect("outbound router");
@ -361,7 +381,7 @@ where
.push(svc::stack_per_request::layer()) .push(svc::stack_per_request::layer())
.push(normalize_uri::layer()) .push(normalize_uri::layer())
.push(tap::layer(tap_next_id, taps)) .push(tap::layer(tap_next_id, taps))
.push(metrics::layer::<_, classify::Response>(http_metrics)) .push(metrics::layer::<_, classify::Response>(endpoint_http_metrics))
.push(classify::layer()) .push(classify::layer())
.push(buffer::layer()) .push(buffer::layer())
.push(limit::layer(MAX_IN_FLIGHT)) .push(limit::layer(MAX_IN_FLIGHT))
@ -419,9 +439,9 @@ where
metrics::Serve::new(report), metrics::Serve::new(report),
); );
// tap is already pushped in a logging Future. // tap is already wrapped in a logging Future.
rt.spawn(tap); rt.spawn(tap);
// metrics_server is already pushped in a logging Future. // metrics_server is already wrapped in a logging Future.
rt.spawn(metrics); rt.spawn(metrics);
rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg)); rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg));
rt.spawn( rt.spawn(

View File

@ -5,10 +5,11 @@ use std::{
}; };
use metrics::FmtLabels; use metrics::FmtLabels;
use transport::{tls, DnsNameAndPort};
use transport::tls;
use Conditional; use Conditional;
use super::{classify, inbound, outbound, NameOrAddr}; use super::{classify, inbound, outbound};
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EndpointLabels { pub struct EndpointLabels {
@ -19,6 +20,13 @@ pub struct EndpointLabels {
labels: Option<String>, labels: Option<String>,
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RouteLabels {
direction: Direction,
dst: Dst,
labels: Option<String>,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum Direction { enum Direction {
In, In,
@ -28,6 +36,35 @@ enum Direction {
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Authority(Option<uri::Authority>); struct Authority(Option<uri::Authority>);
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Dst(outbound::Destination);
// === impl RouteLabels ===
impl From<outbound::Route> for RouteLabels {
fn from(r: outbound::Route) -> Self {
RouteLabels {
dst: Dst(r.dst.clone()),
direction: Direction::Out,
labels: prefix_labels("rt", r.route.labels().as_ref().into_iter()),
}
}
}
impl FmtLabels for RouteLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
(&self.dst, &self.direction).fmt_labels(f)?;
if let Some(labels) = self.labels.as_ref() {
write!(f, ",{}", labels)?;
}
Ok(())
}
}
// === impl EndpointLabels ===
impl From<inbound::Endpoint> for EndpointLabels { impl From<inbound::Endpoint> for EndpointLabels {
fn from(ep: inbound::Endpoint) -> Self { fn from(ep: inbound::Endpoint) -> Self {
Self { Self {
@ -40,18 +77,23 @@ impl From<inbound::Endpoint> for EndpointLabels {
} }
} }
fn prefix_labels<'i, I>(prefix: &str, mut labels_iter: I) -> Option<String>
where
I: Iterator<Item = (&'i String, &'i String)>,
{
let (k0, v0) = labels_iter.next()?;
let mut out = format!("{}_{}=\"{}\"", prefix, k0, v0);
for (k, v) in labels_iter {
write!(out, ",{}_{}=\"{}\"", prefix, k, v).expect("label concat must succeed");
}
Some(out)
}
impl From<outbound::Endpoint> for EndpointLabels { impl From<outbound::Endpoint> for EndpointLabels {
fn from(ep: outbound::Endpoint) -> Self { fn from(ep: outbound::Endpoint) -> Self {
let mut label_iter = ep.metadata.labels().into_iter(); use self::outbound::NameOrAddr;
let labels = if let Some((k0, v0)) = label_iter.next() { use transport::DnsNameAndPort;
let mut s = format!("dst_{}=\"{}\"", k0, v0);
for (k, v) in label_iter {
write!(s, ",dst_{}=\"{}\"", k, v).expect("label concat must succeed");
}
Some(s)
} else {
None
};
let authority = { let authority = {
let a = match ep.dst.name_or_addr { let a = match ep.dst.name_or_addr {
@ -72,7 +114,7 @@ impl From<outbound::Endpoint> for EndpointLabels {
authority, authority,
direction: Direction::Out, direction: Direction::Out,
tls_status: ep.connect.tls_status(), tls_status: ep.connect.tls_status(),
labels, labels: prefix_labels("dst", ep.metadata.labels().into_iter()),
} }
} }
} }
@ -110,6 +152,23 @@ impl FmtLabels for Authority {
} }
} }
impl FmtLabels for Dst {
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
let proto = if self.0.settings.is_http2() {
"h2"
} else {
"h1"
};
write!(
f,
"dst=\"{}\",dst_protocol=\"{}\"",
self.0.name_or_addr, proto
)?;
Ok(())
}
}
impl FmtLabels for classify::Class { impl FmtLabels for classify::Class {
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::classify::Class; use self::classify::Class;
@ -119,9 +178,7 @@ impl FmtLabels for classify::Class {
"classification=\"{}\",grpc_status=\"{}\"", "classification=\"{}\",grpc_status=\"{}\"",
result, status result, status
), ),
Class::Http(result) => { Class::Http(result) => write!(f, "classification=\"{}\"", result),
write!(f, "classification=\"{}\"", result)
}
Class::Stream(result, status) => { Class::Stream(result, status) => {
write!(f, "classification=\"{}\",h2_err=\"{}\"", result, status) write!(f, "classification=\"{}\",h2_err=\"{}\"", result, status)
} }

View File

@ -6,14 +6,13 @@ use logging;
mod classify; mod classify;
pub mod config; pub mod config;
mod control; mod control;
mod destination;
mod inbound; mod inbound;
mod main; mod main;
mod metric_labels; mod metric_labels;
mod outbound; mod outbound;
mod profiles;
use self::config::{Config, Env}; use self::config::{Config, Env};
use self::destination::{Destination, NameOrAddr};
pub use self::main::Main; pub use self::main::Main;
pub fn init() -> Result<Config, config::Error> { pub fn init() -> Result<Config, config::Error> {

View File

@ -1,12 +1,22 @@
use http; use http;
use std::fmt; use std::fmt;
use std::net::SocketAddr;
use app::{classify, Destination}; use app::classify;
use control::destination::{Metadata, ProtocolHint}; use control::destination::{Metadata, ProtocolHint};
use proxy::http::{client, normalize_uri::ShouldNormalizeUri, router}; use proxy::{
http::{
classify::CanClassify,
client, h1,
normalize_uri::ShouldNormalizeUri,
profiles::{self, CanGetDestination},
router, Settings,
},
Source,
};
use svc::{self, stack_per_request::ShouldStackPerRequest}; use svc::{self, stack_per_request::ShouldStackPerRequest};
use tap; use tap;
use transport::{connect, tls}; use transport::{connect, tls, DnsNameAndPort, Host, HostAndPort};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Endpoint { pub struct Endpoint {
@ -16,6 +26,29 @@ pub struct Endpoint {
_p: (), _p: (),
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Destination {
pub name_or_addr: NameOrAddr,
pub settings: Settings,
_p: (),
}
/// Describes a destination for HTTP requests.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum NameOrAddr {
/// A logical, lazily-bound endpoint.
Name(DnsNameAndPort),
/// A single, bound endpoint.
Addr(SocketAddr),
}
#[derive(Clone, Debug)]
pub struct Route {
pub dst: Destination,
pub route: profiles::Route,
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct Recognize {} pub struct Recognize {}
@ -42,11 +75,9 @@ impl ShouldStackPerRequest for Endpoint {
} }
} }
impl classify::CanClassify for Endpoint { impl fmt::Display for Endpoint {
type Classify = classify::Request; fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.connect.addr.fmt(f)
fn classify(&self) -> classify::Request {
classify::Request
} }
} }
@ -65,12 +96,6 @@ impl svc::watch::WithUpdate<tls::ConditionalClientConfig> for Endpoint {
} }
} }
impl fmt::Display for Endpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.connect.addr.fmt(f)
}
}
// Makes it possible to build a client::Stack<Endpoint>. // Makes it possible to build a client::Stack<Endpoint>.
impl From<Endpoint> for client::Config { impl From<Endpoint> for client::Config {
fn from(ep: Endpoint) -> Self { fn from(ep: Endpoint) -> Self {
@ -80,6 +105,7 @@ impl From<Endpoint> for client::Config {
impl From<Endpoint> for tap::Endpoint { impl From<Endpoint> for tap::Endpoint {
fn from(ep: Endpoint) -> Self { fn from(ep: Endpoint) -> Self {
// TODO add route labels...
tap::Endpoint { tap::Endpoint {
direction: tap::Direction::Out, direction: tap::Direction::Out,
labels: ep.metadata.labels().clone(), labels: ep.metadata.labels().clone(),
@ -88,6 +114,16 @@ impl From<Endpoint> for tap::Endpoint {
} }
} }
// === impl Route ===
impl CanClassify for Route {
type Classify = classify::Request;
fn classify(&self) -> classify::Request {
self.route.response_classes().clone().into()
}
}
// === impl Recognize === // === impl Recognize ===
impl Recognize { impl Recognize {
@ -106,12 +142,123 @@ impl<B> router::Recognize<http::Request<B>> for Recognize {
} }
} }
// === impl Destination ===
impl Destination {
pub fn new(name_or_addr: NameOrAddr, settings: Settings) -> Self {
Self {
name_or_addr,
settings,
_p: (),
}
}
pub fn from_request<A>(req: &http::Request<A>) -> Option<Self> {
let name_or_addr = NameOrAddr::from_request(req)?;
let settings = Settings::detect(req);
Some(Self::new(name_or_addr, settings))
}
}
impl CanGetDestination for Destination {
fn get_destination(&self) -> Option<&DnsNameAndPort> {
match self.name_or_addr {
NameOrAddr::Name(ref dst) => Some(dst),
_ => None,
}
}
}
impl fmt::Display for Destination {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.name_or_addr.fmt(f)
}
}
impl fmt::Display for NameOrAddr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
NameOrAddr::Name(ref name) => write!(f, "{}:{}", name.host, name.port),
NameOrAddr::Addr(ref addr) => addr.fmt(f),
}
}
}
impl NameOrAddr {
/// Determines the destination for a request.
///
/// Typically, a request's authority is used to produce a `NameOrAddr`. If the
/// authority addresses a DNS name, a `NameOrAddr::Name` is returned; and, otherwise,
/// it addresses a fixed IP address and a `NameOrAddr::Addr` is returned. The port is
/// inferred if not specified in the authority.
///
/// If no authority is available, the `SO_ORIGINAL_DST` socket option is checked. If
/// it's available, it is used to return a `NameOrAddr::Addr`. This socket option is
/// typically set by `iptables(8)` in containerized environments like Kubernetes (as
/// configured by the `proxy-init` program).
///
/// If none of this information is available, no `NameOrAddr` is returned.
pub fn from_request<B>(req: &http::Request<B>) -> Option<NameOrAddr> {
match Self::host_port(req) {
Some(HostAndPort {
host: Host::DnsName(host),
port,
}) => {
let name_or_addr = DnsNameAndPort { host, port };
Some(NameOrAddr::Name(name_or_addr))
}
Some(HostAndPort {
host: Host::Ip(ip),
port,
}) => {
let name_or_addr = SocketAddr::from((ip, port));
Some(NameOrAddr::Addr(name_or_addr))
}
None => req
.extensions()
.get::<Source>()
.and_then(|src| src.orig_dst_if_not_local())
.map(NameOrAddr::Addr),
}
}
/// Determines the logical host:port of the request.
///
/// If the parsed URI includes an authority, use that. Otherwise, try to load the
/// authority from the `Host` header.
///
/// The port is either parsed from the authority or a default of 80 is used.
fn host_port<B>(req: &http::Request<B>) -> Option<HostAndPort> {
// Note: Calls to `normalize` cannot be deduped without cloning `authority`.
req.uri()
.authority_part()
.and_then(Self::normalize)
.or_else(|| h1::authority_from_host(req).and_then(|h| Self::normalize(&h)))
}
/// TODO: Return error when `HostAndPort::normalize()` fails.
/// TODO: Use scheme-appropriate default port.
fn normalize(authority: &http::uri::Authority) -> Option<HostAndPort> {
const DEFAULT_PORT: Option<u16> = Some(80);
HostAndPort::normalize(authority, DEFAULT_PORT).ok()
}
}
impl profiles::WithRoute for Destination {
type Output = Route;
fn with_route(self, route: profiles::Route) -> Self::Output {
Route { dst: self, route }
}
}
pub mod discovery { pub mod discovery {
use futures::{Async, Poll}; use futures::{Async, Poll};
use std::net::SocketAddr; use std::net::SocketAddr;
use super::Endpoint; use super::{Destination, Endpoint, NameOrAddr};
use app::{Destination, NameOrAddr};
use control::destination::Metadata; use control::destination::Metadata;
use proxy::resolve; use proxy::resolve;
use transport::{connect, tls, DnsNameAndPort}; use transport::{connect, tls, DnsNameAndPort};
@ -240,6 +387,8 @@ pub mod orig_proto_upgrade {
} }
} }
// === impl Stack ===
impl<M, A, B> svc::Stack<Endpoint> for Stack<M> impl<M, A, B> svc::Stack<Endpoint> for Stack<M>
where where
M: svc::Stack<Endpoint>, M: svc::Stack<Endpoint>,

219
src/app/profiles.rs Normal file
View File

@ -0,0 +1,219 @@
use futures::{Async, Future, Poll, Stream};
use http;
use regex::Regex;
use std::fmt;
use std::time::Duration;
use tokio_timer::{clock, Delay};
use tower_grpc as grpc;
use tower_h2::{Body, BoxBody, Data, HttpService};
use api::destination as api;
use control;
use proxy::http::profiles;
use transport::DnsNameAndPort;
#[derive(Clone, Debug)]
pub struct Client<T, N> {
service: Option<T>,
normalize_name: N,
backoff: Duration,
}
pub struct Rx<T: HttpService> {
dst: String,
backoff: Duration,
service: Option<T>,
state: State<T>,
}
enum State<T: HttpService> {
Disconnected,
Backoff(Delay),
Waiting(grpc::client::server_streaming::ResponseFuture<api::DestinationProfile, T::Future>),
Streaming(grpc::Streaming<api::DestinationProfile, T::ResponseBody>),
}
// === impl Client ===
impl<T, N> Client<T, N>
where
T: HttpService<RequestBody = BoxBody> + Clone,
T::ResponseBody: Body<Data = Data>,
T::Error: fmt::Debug,
N: control::Normalize,
{
pub fn new(service: Option<T>, backoff: Duration, normalize_name: N) -> Self {
Self {
service,
backoff,
normalize_name,
}
}
}
impl<T, N> profiles::GetRoutes for Client<T, N>
where
T: HttpService<RequestBody = BoxBody> + Clone,
T::ResponseBody: Body<Data = Data>,
T::Error: fmt::Debug,
N: control::Normalize,
{
type Stream = Rx<T>;
fn get_routes(&self, dst: &DnsNameAndPort) -> Option<Self::Stream> {
let fqa = self.normalize_name.normalize(dst)?;
Some(Rx {
dst: fqa.without_trailing_dot().to_owned(),
state: State::Disconnected,
service: self.service.clone(),
backoff: self.backoff,
})
}
}
// === impl Rx ===
impl<T> Stream for Rx<T>
where
T: HttpService<RequestBody = BoxBody> + Clone,
T::ResponseBody: Body<Data = Data>,
T::Error: fmt::Debug,
{
type Item = Vec<(profiles::RequestMatch, profiles::Route)>;
type Error = profiles::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let service = match self.service {
Some(ref s) => s,
None => return Ok(Async::Ready(Some(Vec::new()))),
};
loop {
self.state = match self.state {
State::Disconnected => {
let mut client = api::client::Destination::new(service.clone());
let req = api::GetDestination {
scheme: "k8s".to_owned(),
path: self.dst.clone(),
};
debug!("disconnected; getting profile: {:?}", req);
let rspf = client.get_profile(grpc::Request::new(req));
State::Waiting(rspf)
}
State::Waiting(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(rsp)) => {
debug!("response received");
State::Streaming(rsp.into_inner())
}
Err(e) => {
warn!("error fetching profile for {}: {:?}", self.dst, e);
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},
State::Streaming(ref mut s) => match s.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(Some(profile))) => {
debug!("profile received: {:?}", profile);
let rs = profile.routes.into_iter().filter_map(convert_route);
return Ok(Async::Ready(Some(rs.collect())));
}
Ok(Async::Ready(None)) => {
debug!("profile stream ended");
State::Backoff(Delay::new(clock::now() + self.backoff))
}
Err(e) => {
warn!("profile stream failed: {:?}", e);
State::Backoff(Delay::new(clock::now() + self.backoff))
}
},
State::Backoff(ref mut f) => match f.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) | Ok(Async::Ready(())) => State::Disconnected,
},
};
}
}
}
fn convert_route(orig: api::Route) -> Option<(profiles::RequestMatch, profiles::Route)> {
let req_match = orig.condition.and_then(convert_req_match)?;
let rsp_classes = orig
.response_classes
.into_iter()
.filter_map(convert_rsp_class)
.collect();
let route = profiles::Route::new(orig.metrics_labels.into_iter(), rsp_classes);
Some((req_match, route))
}
fn convert_req_match(orig: api::RequestMatch) -> Option<profiles::RequestMatch> {
let m = match orig.match_? {
api::request_match::Match::All(ms) => {
let ms = ms.matches.into_iter().filter_map(convert_req_match);
profiles::RequestMatch::All(ms.collect())
}
api::request_match::Match::Any(ms) => {
let ms = ms.matches.into_iter().filter_map(convert_req_match);
profiles::RequestMatch::Any(ms.collect())
}
api::request_match::Match::Not(m) => {
let m = convert_req_match(*m)?;
profiles::RequestMatch::Not(Box::new(m))
}
api::request_match::Match::Path(api::PathMatch { regex }) => {
let re = Regex::new(&regex).ok()?;
profiles::RequestMatch::Path(re)
}
api::request_match::Match::Method(mm) => {
let m = mm.type_.and_then(|m| m.try_as_http().ok())?;
profiles::RequestMatch::Method(m)
}
};
Some(m)
}
fn convert_rsp_class(orig: api::ResponseClass) -> Option<profiles::ResponseClass> {
let c = orig.condition.and_then(convert_rsp_match)?;
Some(profiles::ResponseClass::new(orig.is_failure, c))
}
fn convert_rsp_match(orig: api::ResponseMatch) -> Option<profiles::ResponseMatch> {
let m = match orig.match_? {
api::response_match::Match::All(ms) => {
let ms = ms
.matches
.into_iter()
.filter_map(convert_rsp_match)
.collect::<Vec<_>>();
if ms.is_empty() {
return None;
}
profiles::ResponseMatch::All(ms)
}
api::response_match::Match::Any(ms) => {
let ms = ms
.matches
.into_iter()
.filter_map(convert_rsp_match)
.collect::<Vec<_>>();
if ms.is_empty() {
return None;
}
profiles::ResponseMatch::Any(ms)
}
api::response_match::Match::Not(m) => {
let m = convert_rsp_match(*m)?;
profiles::ResponseMatch::Not(Box::new(m))
}
api::response_match::Match::Status(range) => {
let min = http::StatusCode::from_u16(range.min as u16).ok()?;
let max = http::StatusCode::from_u16(range.max as u16).ok()?;
profiles::ResponseMatch::Status { min, max }
}
};
Some(m)
}

View File

@ -25,7 +25,7 @@ use super::{ResolveRequest, Update};
use app::config::Namespaces; use app::config::Namespaces;
use control::{ use control::{
cache::Exists, cache::Exists,
fully_qualified_authority::FullyQualifiedAuthority, fully_qualified_authority::{KubernetesNormalize, Normalize as _Normalize},
remote_stream::{Receiver, Remote}, remote_stream::{Receiver, Remote},
}; };
use dns; use dns;
@ -351,9 +351,10 @@ impl NewQuery {
connect_or_reconnect, connect_or_reconnect,
auth auth
); );
let default_ns = &self.namespaces.pod; let default_ns = self.namespaces.pod.clone();
let client_and_authority = client.and_then(|client| { let client_and_authority = client.and_then(|client| {
FullyQualifiedAuthority::normalize(auth, default_ns) KubernetesNormalize::new(default_ns)
.normalize(auth)
.map(|auth| (auth, client)) .map(|auth| (auth, client))
}); });
match client_and_authority { match client_and_authority {

View File

@ -2,14 +2,29 @@ use bytes::{BytesMut};
use transport::DnsNameAndPort; use transport::DnsNameAndPort;
pub trait Normalize {
fn normalize(&self, authority: &DnsNameAndPort) -> Option<FullyQualifiedAuthority>;
}
#[derive(Clone, Debug)]
pub struct KubernetesNormalize {
default_namespace: String,
}
/// A normalized `Authority`. /// A normalized `Authority`.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub struct FullyQualifiedAuthority(String); pub struct FullyQualifiedAuthority(String);
impl FullyQualifiedAuthority { impl KubernetesNormalize {
pub fn new(default_namespace: String) -> Self {
Self { default_namespace }
}
}
impl Normalize for KubernetesNormalize {
/// Normalizes the name according to Kubernetes service naming conventions. /// Normalizes the name according to Kubernetes service naming conventions.
/// Case folding is not done; that is done internally inside `Authority`. /// Case folding is not done; that is done internally inside `Authority`.
pub fn normalize(authority: &DnsNameAndPort, default_namespace: &str) -> Option<Self> { fn normalize(&self, authority: &DnsNameAndPort) -> Option<FullyQualifiedAuthority> {
let name: &str = authority.host.as_ref(); let name: &str = authority.host.as_ref();
// parts should have a maximum 4 of pieces (name, namespace, svc, zone) // parts should have a maximum 4 of pieces (name, namespace, svc, zone)
@ -28,7 +43,7 @@ impl FullyQualifiedAuthority {
None => false, None => false,
}; };
let namespace_to_append = if !has_explicit_namespace { let namespace_to_append = if !has_explicit_namespace {
Some(default_namespace) Some(&self.default_namespace)
} else { } else {
None None
}; };
@ -117,7 +132,9 @@ impl FullyQualifiedAuthority {
Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap())) Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap()))
} }
}
impl FullyQualifiedAuthority {
pub fn without_trailing_dot(&self) -> &str { pub fn without_trailing_dot(&self) -> &str {
&self.0 &self.0
} }
@ -129,6 +146,8 @@ mod tests {
use http::uri::Authority; use http::uri::Authority;
use std::str::FromStr; use std::str::FromStr;
use super::Normalize;
#[test] #[test]
fn test_normalized_authority() { fn test_normalized_authority() {
fn dns_name_and_port_from_str(input: &str) -> DnsNameAndPort { fn dns_name_and_port_from_str(input: &str) -> DnsNameAndPort {
@ -145,14 +164,14 @@ mod tests {
fn local(input: &str, default_namespace: &str) -> String { fn local(input: &str, default_namespace: &str) -> String {
let name = dns_name_and_port_from_str(input); let name = dns_name_and_port_from_str(input);
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace); let output = super::KubernetesNormalize::new(default_namespace.to_owned()).normalize(&name);
assert!(output.is_some(), "input: {}", input); assert!(output.is_some(), "input: {}", input);
output.unwrap().without_trailing_dot().into() output.unwrap().without_trailing_dot().into()
} }
fn external(input: &str, default_namespace: &str) { fn external(input: &str, default_namespace: &str) {
let name = dns_name_and_port_from_str(input); let name = dns_name_and_port_from_str(input);
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace); let output = super::KubernetesNormalize::new(default_namespace.to_owned()).normalize(&name);
assert!(output.is_none(), "input: {}", input); assert!(output.is_none(), "input: {}", input);
} }

View File

@ -6,5 +6,6 @@ pub mod pb;
mod remote_stream; mod remote_stream;
mod serve_http; mod serve_http;
pub use self::fully_qualified_authority::{Normalize, KubernetesNormalize, FullyQualifiedAuthority};
pub use self::observe::Observe; pub use self::observe::Observe;
pub use self::serve_http::serve_http; pub use self::serve_http::serve_http;

View File

@ -119,7 +119,7 @@ impl<'a> TryFrom<&'a Event> for tap::TapEvent {
stream: ctx.id as u64, stream: ctx.id as u64,
}), }),
method: Some((&ctx.method).into()), method: Some((&ctx.method).into()),
scheme: ctx.scheme.as_ref().map(tap::Scheme::from), scheme: ctx.scheme.as_ref().map(http_types::Scheme::from),
authority: ctx.authority.as_ref() authority: ctx.authority.as_ref()
.map(|a| a.as_str()) .map(|a| a.as_str())
.unwrap_or_default() .unwrap_or_default()

View File

@ -469,12 +469,10 @@ impl std::error::Error for Error {
} }
} }
impl Error { impl super::HasH2Reason for Error {
pub fn reason(&self) -> Option<h2::Reason> { fn h2_reason(&self) -> Option<h2::Reason> {
match self { match self {
// TODO: it would be good to provide better error Error::Http1(_) => None,
// details in metrics for HTTP/1...
Error::Http1(_) => Some(h2::Reason::INTERNAL_ERROR),
Error::Http2(e) => e.reason(), Error::Http2(e) => e.reason(),
} }
} }

View File

@ -45,8 +45,6 @@ where
} }
} }
// FIXME This will be used for route_* metrics.
#[allow(dead_code)]
pub fn with_prefix(self, prefix: &'static str) -> Self { pub fn with_prefix(self, prefix: &'static str) -> Self {
if prefix.is_empty() { if prefix.is_empty() {
return self; return self;

View File

@ -7,6 +7,7 @@ pub mod insert_target;
pub mod metrics; pub mod metrics;
pub mod normalize_uri; pub mod normalize_uri;
pub mod orig_proto; pub mod orig_proto;
pub mod profiles;
pub mod router; pub mod router;
pub mod settings; pub mod settings;
pub mod upgrade; pub mod upgrade;
@ -15,3 +16,16 @@ pub use self::classify::{Classify, ClassifyResponse};
pub use self::client::{Client, Error as ClientError}; pub use self::client::{Client, Error as ClientError};
pub use self::glue::HttpBody as Body; pub use self::glue::HttpBody as Body;
pub use self::settings::Settings; pub use self::settings::Settings;
pub trait HasH2Reason {
fn h2_reason(&self) -> Option<::h2::Reason>;
}
impl<E: HasH2Reason> HasH2Reason for super::buffer::ServiceError<E> {
fn h2_reason(&self) -> Option<::h2::Reason> {
match self {
super::buffer::ServiceError::Inner(e) => e.h2_reason(),
super::buffer::ServiceError::Closed => None,
}
}
}

370
src/proxy/http/profiles.rs Normal file
View File

@ -0,0 +1,370 @@
#![allow(dead_code)]
extern crate tower_discover;
use futures::Stream;
use http;
use indexmap::IndexMap;
use regex::Regex;
use std::iter::FromIterator;
use std::sync::Arc;
use std::{error, fmt};
use transport::DnsNameAndPort;
pub type Routes = Vec<(RequestMatch, Route)>;
/// Watches a destination's Routes.
///
/// The stream updates with all routes for the given destination. The stream
/// never ends and cannot fail.
pub trait GetRoutes {
type Stream: Stream<Item = Routes, Error = Error>;
fn get_routes(&self, dst: &DnsNameAndPort) -> Option<Self::Stream>;
}
/// Implemented by target types that may be combined with a Route.
pub trait WithRoute {
type Output;
fn with_route(self, route: Route) -> Self::Output;
}
/// Implemented by target types that may have a `DnsNameAndPort` destination that
/// can be discovered via `GetRoutes`.
pub trait CanGetDestination {
fn get_destination(&self) -> Option<&DnsNameAndPort>;
}
#[derive(Debug)]
pub enum Error {}
#[derive(Clone, Debug, Default)]
pub struct Route {
labels: Arc<IndexMap<String, String>>,
response_classes: ResponseClasses,
}
#[derive(Clone, Debug)]
pub enum RequestMatch {
All(Vec<RequestMatch>),
Any(Vec<RequestMatch>),
Not(Box<RequestMatch>),
Path(Regex),
Method(http::Method),
}
#[derive(Clone, Debug)]
pub struct ResponseClass {
is_failure: bool,
match_: ResponseMatch,
}
pub type ResponseClasses = Arc<Vec<ResponseClass>>;
#[derive(Clone, Debug)]
pub enum ResponseMatch {
All(Vec<ResponseMatch>),
Any(Vec<ResponseMatch>),
Not(Box<ResponseMatch>),
Status {
min: http::StatusCode,
max: http::StatusCode,
},
}
// === impl Route ===
impl Route {
pub fn new<I>(label_iter: I, response_classes: Vec<ResponseClass>) -> Self
where
I: Iterator<Item = (String, String)>,
{
let labels = {
let mut pairs = label_iter.collect::<Vec<_>>();
pairs.sort_by(|(k0, _), (k1, _)| k0.cmp(k1));
Arc::new(IndexMap::from_iter(pairs))
};
Self {
labels,
response_classes: response_classes.into(),
}
}
pub fn labels(&self) -> &Arc<IndexMap<String, String>> {
&self.labels
}
pub fn response_classes(&self) -> &ResponseClasses {
&self.response_classes
}
}
// === impl RequestMatch ===
impl RequestMatch {
fn is_match<B>(&self, req: &http::Request<B>) -> bool {
match self {
RequestMatch::Method(ref method) => req.method() == *method,
RequestMatch::Path(ref re) => re.is_match(req.uri().path()),
RequestMatch::Not(ref m) => !m.is_match(req),
RequestMatch::All(ref ms) => ms.iter().all(|m| m.is_match(req)),
RequestMatch::Any(ref ms) => ms.iter().any(|m| m.is_match(req)),
}
}
}
// === impl ResponseClass ===
impl ResponseClass {
pub fn new(is_failure: bool, match_: ResponseMatch) -> Self {
Self { is_failure, match_ }
}
pub fn is_failure(&self) -> bool {
self.is_failure
}
pub fn is_match<B>(&self, req: &http::Response<B>) -> bool {
self.match_.is_match(req)
}
}
// === impl ResponseMatch ===
impl ResponseMatch {
fn is_match<B>(&self, req: &http::Response<B>) -> bool {
match self {
ResponseMatch::Status { ref min, ref max } => {
*min <= req.status() && req.status() <= *max
}
ResponseMatch::Not(ref m) => !m.is_match(req),
ResponseMatch::All(ref ms) => ms.iter().all(|m| m.is_match(req)),
ResponseMatch::Any(ref ms) => ms.iter().any(|m| m.is_match(req)),
}
}
}
// === impl Error ===
impl fmt::Display for Error {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
unreachable!()
}
}
impl error::Error for Error {}
/// A stack module that produces a Service that routes requests through alternate
/// middleware configurations
///
/// As the router's Stack is built, a destination is extracted from the stack's
/// target and it is used to get route profiles from ` GetRoutes` implemetnation.
///
/// Each route uses a shared underlying stack. As such, it assumed that the
/// underlying stack is buffered, and so `poll_ready` is NOT called on the routes
/// before requests are dispatched. If an individual route wishes to apply
/// backpressure, it must implement its own buffer/limit strategy.
pub mod router {
use futures::{Async, Poll, Stream};
use http;
use std::{error, fmt};
use svc;
use super::*;
pub fn layer<T, G, M, R>(get_routes: G, route_layer: R) -> Layer<G, M, R>
where
T: CanGetDestination + WithRoute + Clone,
M: svc::Stack<T>,
M::Value: Clone,
G: GetRoutes + Clone,
R: svc::Layer<
<T as WithRoute>::Output,
<T as WithRoute>::Output,
svc::shared::Stack<M::Value>,
>
+ Clone,
R::Value: svc::Service,
{
Layer {
get_routes,
route_layer,
default_route: Route::default(),
_p: ::std::marker::PhantomData,
}
}
#[derive(Clone, Debug)]
pub struct Layer<G, M, R = ()> {
get_routes: G,
route_layer: R,
default_route: Route,
_p: ::std::marker::PhantomData<fn() -> M>,
}
#[derive(Clone, Debug)]
pub struct Stack<G, M, R = ()> {
inner: M,
get_routes: G,
route_layer: R,
default_route: Route,
}
#[derive(Debug)]
pub enum Error<D, R> {
Inner(D),
Route(R),
}
pub struct Service<G, T, R>
where
T: WithRoute,
R: svc::Stack<T::Output>,
R::Value: svc::Service,
{
target: T,
stack: R,
route_stream: Option<G>,
routes: Vec<(RequestMatch, R::Value)>,
default_route: R::Value,
}
impl<D: fmt::Display, R: fmt::Display> fmt::Display for Error<D, R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Inner(e) => fmt::Display::fmt(&e, f),
Error::Route(e) => fmt::Display::fmt(&e, f),
}
}
}
impl<D: error::Error, R: error::Error> error::Error for Error<D, R> {}
impl<T, G, M, R> svc::Layer<T, T, M> for Layer<G, M, R>
where
T: CanGetDestination + WithRoute + Clone,
G: GetRoutes + Clone,
M: svc::Stack<T>,
M::Value: Clone,
R: svc::Layer<
<T as WithRoute>::Output,
<T as WithRoute>::Output,
svc::shared::Stack<M::Value>,
>
+ Clone,
R::Value: svc::Service,
{
type Value = <Stack<G, M, R> as svc::Stack<T>>::Value;
type Error = <Stack<G, M, R> as svc::Stack<T>>::Error;
type Stack = Stack<G, M, R>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
get_routes: self.get_routes.clone(),
route_layer: self.route_layer.clone(),
default_route: self.default_route.clone(),
}
}
}
impl<T, G, M, R> svc::Stack<T> for Stack<G, M, R>
where
T: CanGetDestination + WithRoute + Clone,
M: svc::Stack<T>,
M::Value: Clone,
G: GetRoutes,
R: svc::Layer<
<T as WithRoute>::Output,
<T as WithRoute>::Output,
svc::shared::Stack<M::Value>,
>
+ Clone,
R::Value: svc::Service,
{
type Value = Service<G::Stream, T, R::Stack>;
type Error = Error<M::Error, R::Error>;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
let inner = self.inner.make(&target).map_err(Error::Inner)?;
let stack = self.route_layer.bind(svc::shared::stack(inner));
let default_route = {
let t = target.clone().with_route(self.default_route.clone());
stack.make(&t).map_err(Error::Route)?
};
let route_stream = target
.get_destination()
.and_then(|d| self.get_routes.get_routes(&d));
Ok(Service {
target: target.clone(),
stack,
route_stream,
default_route,
routes: Vec::new(),
})
}
}
impl<G, T, R> Service<G, T, R>
where
G: Stream<Item = Routes, Error = super::Error>,
T: WithRoute + Clone,
R: svc::Stack<T::Output> + Clone,
R::Value: svc::Service,
{
fn update_routes(&mut self, mut routes: Routes) {
self.routes = Vec::with_capacity(routes.len());
for (req_match, route) in routes.drain(..) {
let target = self.target.clone().with_route(route.clone());
match self.stack.make(&target) {
Ok(svc) => self.routes.push((req_match, svc)),
Err(_) => error!("failed to build service for route: route={:?}", route),
}
}
}
fn poll_route_stream(&mut self) -> Option<Async<Option<Routes>>> {
self.route_stream
.as_mut()
.and_then(|ref mut s| s.poll().ok())
}
}
impl<G, T, R, B> svc::Service for Service<G, T, R>
where
G: Stream<Item = Routes, Error = super::Error>,
T: WithRoute + Clone,
R: svc::Stack<T::Output> + Clone,
R::Value: svc::Service<Request = http::Request<B>>,
{
type Request = <R::Value as svc::Service>::Request;
type Response = <R::Value as svc::Service>::Response;
type Error = <R::Value as svc::Service>::Error;
type Future = <R::Value as svc::Service>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
while let Some(Async::Ready(Some(routes))) = self.poll_route_stream() {
self.update_routes(routes);
}
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
for (ref condition, ref mut service) in &mut self.routes {
if condition.is_match(&req) {
return service.call(req);
}
}
self.default_route.call(req)
}
}
}

View File

@ -383,7 +383,7 @@ mod tests {
use quickcheck::*; use quickcheck::*;
use super::*; use super::*;
use api::tap; use api::http_types;
impl Arbitrary for LabelMatch { impl Arbitrary for LabelMatch {
fn arbitrary<G: Gen>(g: &mut G) -> Self { fn arbitrary<G: Gen>(g: &mut G) -> Self {
@ -488,7 +488,7 @@ mod tests {
Some(&http::Match::Method(ref m)) => { Some(&http::Match::Method(ref m)) => {
match m.type_.as_ref() { match m.type_.as_ref() {
None => Some(InvalidMatch::Empty), None => Some(InvalidMatch::Empty),
Some(&tap::http_method::Type::Unregistered(ref m)) => if m.len() <= 15 { Some(&http_types::http_method::Type::Unregistered(ref m)) => if m.len() <= 15 {
let mut err = None; let mut err = None;
if let Err(_) = ::http::Method::from_bytes(m.as_bytes()) { if let Err(_) = ::http::Method::from_bytes(m.as_bytes()) {
err = Some(InvalidMatch::InvalidHttpMethod); err = Some(InvalidMatch::InvalidHttpMethod);
@ -497,7 +497,7 @@ mod tests {
} else { } else {
Some(InvalidMatch::InvalidHttpMethod) Some(InvalidMatch::InvalidHttpMethod)
} }
Some(&tap::http_method::Type::Registered(m)) => if m < 9 { Some(&http_types::http_method::Type::Registered(m)) => if m < 9 {
None None
} else { } else {
Some(InvalidMatch::InvalidHttpMethod) Some(InvalidMatch::InvalidHttpMethod)
@ -507,8 +507,8 @@ mod tests {
Some(&http::Match::Scheme(ref m)) => { Some(&http::Match::Scheme(ref m)) => {
match m.type_.as_ref() { match m.type_.as_ref() {
None => Some(InvalidMatch::Empty), None => Some(InvalidMatch::Empty),
Some(&tap::scheme::Type::Unregistered(_)) => None, Some(&http_types::scheme::Type::Unregistered(_)) => None,
Some(&tap::scheme::Type::Registered(m)) => { Some(&http_types::scheme::Type::Registered(m)) => {
if m < 2 { if m < 2 {
None None
} else { } else {

View File

@ -11,7 +11,7 @@ use tower_h2::Body;
use super::{event, NextId, Taps}; use super::{event, NextId, Taps};
use proxy::{ use proxy::{
self, self,
http::{client::Error, h1}, http::{h1, HasH2Reason},
}; };
use svc; use svc;
@ -86,11 +86,8 @@ pub fn layer<T, M, A, B>(next_id: NextId, taps: Arc<Mutex<Taps>>) -> Layer<T, M>
where where
T: Clone + Into<event::Endpoint>, T: Clone + Into<event::Endpoint>,
M: svc::Stack<T>, M: svc::Stack<T>,
M::Value: svc::Service< M::Value: svc::Service<Request = http::Request<RequestBody<A>>, Response = http::Response<B>>,
Request = http::Request<RequestBody<A>>, <M::Value as svc::Service>::Error: HasH2Reason,
Response = http::Response<B>,
Error = Error,
>,
A: Body, A: Body,
B: Body, B: Body,
{ {
@ -108,8 +105,8 @@ where
M::Value: svc::Service< M::Value: svc::Service<
Request = http::Request<RequestBody<A>>, Request = http::Request<RequestBody<A>>,
Response = http::Response<B>, Response = http::Response<B>,
Error = Error,
>, >,
<M::Value as svc::Service>::Error: HasH2Reason,
A: Body, A: Body,
B: Body, B: Body,
{ {
@ -136,8 +133,8 @@ where
M::Value: svc::Service< M::Value: svc::Service<
Request = http::Request<RequestBody<A>>, Request = http::Request<RequestBody<A>>,
Response = http::Response<B>, Response = http::Response<B>,
Error = Error,
>, >,
<M::Value as svc::Service>::Error: HasH2Reason,
A: Body, A: Body,
B: Body, B: Body,
{ {
@ -162,8 +159,8 @@ where
S: svc::Service< S: svc::Service<
Request = http::Request<RequestBody<A>>, Request = http::Request<RequestBody<A>>,
Response = http::Response<B>, Response = http::Response<B>,
Error = Error,
>, >,
S::Error: HasH2Reason,
A: Body, A: Body,
B: Body, B: Body,
{ {
@ -182,7 +179,9 @@ where
// Only tap a request iff a `Source` is known. // Only tap a request iff a `Source` is known.
let meta = req.extensions().get::<proxy::Source>().map(|source| { let meta = req.extensions().get::<proxy::Source>().map(|source| {
let scheme = req.uri().scheme_part().cloned(); let scheme = req.uri().scheme_part().cloned();
let authority = req.uri().authority_part() let authority = req
.uri()
.authority_part()
.cloned() .cloned()
.or_else(|| h1::authority_from_host(&req)); .or_else(|| h1::authority_from_host(&req));
let path = req.uri().path().into(); let path = req.uri().path().into();
@ -226,10 +225,11 @@ where
impl<S, B> Future for ResponseFuture<S> impl<S, B> Future for ResponseFuture<S>
where where
B: Body, B: Body,
S: svc::Service<Response = http::Response<B>, Error = Error>, S: svc::Service<Response = http::Response<B>>,
S::Error: HasH2Reason,
{ {
type Item = http::Response<ResponseBody<B>>; type Item = http::Response<ResponseBody<B>>;
type Error = Error; type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let rsp = try_ready!(self.inner.poll().map_err(|e| self.tap_err(e))); let rsp = try_ready!(self.inner.poll().map_err(|e| self.tap_err(e)));
@ -266,9 +266,10 @@ where
impl<S, B> ResponseFuture<S> impl<S, B> ResponseFuture<S>
where where
B: Body, B: Body,
S: svc::Service<Response = http::Response<B>, Error = Error>, S: svc::Service<Response = http::Response<B>>,
S::Error: HasH2Reason,
{ {
fn tap_err(&mut self, e: Error) -> Error { fn tap_err(&mut self, e: S::Error) -> S::Error {
if let Some(request) = self.meta.take() { if let Some(request) = self.meta.take() {
let meta = event::Response { let meta = event::Response {
request, request,
@ -285,7 +286,7 @@ where
response_open_at: now, response_open_at: now,
response_first_frame_at: None, response_first_frame_at: None,
response_fail_at: now, response_fail_at: now,
error: e.reason().unwrap_or(h2::Reason::INTERNAL_ERROR), error: e.h2_reason().unwrap_or(h2::Reason::INTERNAL_ERROR),
bytes_sent: 0, bytes_sent: 0,
}, },
)); ));

View File

@ -1,4 +1,4 @@
use metrics as metrics; use metrics;
mod errno; mod errno;
pub mod process; pub mod process;

View File

@ -2,14 +2,11 @@ extern crate tokio_connect;
pub use self::tokio_connect::Connect; pub use self::tokio_connect::Connect;
use futures::Future; use http;
use std::{error, fmt, io}; use std::{error, fmt, io};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::str::FromStr; use std::str::FromStr;
use http;
use convert::TryFrom; use convert::TryFrom;
use dns; use dns;
use svc; use svc;
@ -57,13 +54,6 @@ pub enum HostAndPortError {
MissingPort, MissingPort,
} }
#[derive(Debug, Clone)]
pub struct LookupAddressAndConnect {
host_and_port: HostAndPort,
dns_resolver: dns::Resolver,
tls: tls::ConditionalConnectionConfig<tls::ClientConfig>,
}
// ===== impl HostAndPort ===== // ===== impl HostAndPort =====
impl HostAndPort { impl HostAndPort {
@ -170,46 +160,6 @@ impl fmt::Display for InvalidTarget {
impl error::Error for InvalidTarget {} impl error::Error for InvalidTarget {}
// ===== impl LookupAddressAndConnect =====
impl LookupAddressAndConnect {
pub fn new(
host_and_port: HostAndPort,
dns_resolver: dns::Resolver,
tls: tls::ConditionalConnectionConfig<tls::ClientConfig>,
) -> Self {
Self {
host_and_port,
dns_resolver,
tls,
}
}
}
impl tokio_connect::Connect for LookupAddressAndConnect {
type Connected = connection::Connection;
type Error = io::Error;
type Future = Box<Future<Item = connection::Connection, Error = io::Error> + Send>;
fn connect(&self) -> Self::Future {
let port = self.host_and_port.port;
let host = self.host_and_port.host.clone();
let tls = self.tls.clone();
let c = self.dns_resolver
.resolve_one_ip(&self.host_and_port.host)
.map_err(|_| {
io::Error::new(io::ErrorKind::NotFound, "DNS resolution failed")
})
.and_then(move |ip_addr: IpAddr| {
info!("DNS resolved {:?} to {}", host, ip_addr);
let addr = SocketAddr::from((ip_addr, port));
trace!("connect {}", addr);
connection::connect(&addr, tls)
});
Box::new(c)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use http::uri::Authority; use http::uri::Authority;

View File

@ -18,7 +18,6 @@ pub use self::{
connect::{ connect::{
Connect, Connect,
DnsNameAndPort, Host, HostAndPort, HostAndPortError, DnsNameAndPort, Host, HostAndPort, HostAndPortError,
LookupAddressAndConnect,
}, },
connection::{ connection::{
BoundPort, BoundPort,

View File

@ -24,9 +24,16 @@ pub struct DstReceiver(sync::mpsc::UnboundedReceiver<pb::Update>);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct DstSender(sync::mpsc::UnboundedSender<pb::Update>); pub struct DstSender(sync::mpsc::UnboundedSender<pb::Update>);
#[derive(Debug)]
pub struct ProfileReceiver(sync::mpsc::UnboundedReceiver<pb::DestinationProfile>);
#[derive(Clone, Debug)]
pub struct ProfileSender(sync::mpsc::UnboundedSender<pb::DestinationProfile>);
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct Controller { pub struct Controller {
expect_dst_calls: Arc<Mutex<VecDeque<(pb::GetDestination, DstReceiver)>>>, expect_dst_calls: Arc<Mutex<VecDeque<(pb::GetDestination, DstReceiver)>>>,
expect_profile_calls: Arc<Mutex<VecDeque<(pb::GetDestination, ProfileReceiver)>>>,
} }
pub struct Listening { pub struct Listening {
@ -69,6 +76,19 @@ impl Controller {
run(self, Some(Box::new(f.then(|_| Ok(()))))) run(self, Some(Box::new(f.then(|_| Ok(())))))
} }
pub fn profile_tx(&self, dest: &str) -> ProfileSender {
let (tx, rx) = sync::mpsc::unbounded();
let dst = pb::GetDestination {
scheme: "k8s".into(),
path: dest.into(),
};
self.expect_profile_calls
.lock()
.unwrap()
.push_back((dst, ProfileReceiver(rx)));
ProfileSender(tx)
}
pub fn run(self) -> Listening { pub fn run(self) -> Listening {
run(self, None) run(self, None)
} }
@ -100,6 +120,20 @@ impl DstSender {
} }
} }
impl Stream for ProfileReceiver {
type Item = pb::DestinationProfile;
type Error = grpc::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll().map_err(|_| grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new()))
}
}
impl ProfileSender {
pub fn send(&self, up: pb::DestinationProfile) {
self.0.unbounded_send(up).expect("send profile update")
}
}
impl pb::server::Destination for Controller { impl pb::server::Destination for Controller {
type GetStream = DstReceiver; type GetStream = DstReceiver;
type GetFuture = future::FutureResult<grpc::Response<Self::GetStream>, grpc::Error>; type GetFuture = future::FutureResult<grpc::Response<Self::GetStream>, grpc::Error>;
@ -117,6 +151,23 @@ impl pb::server::Destination for Controller {
future::err(grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new())) future::err(grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new()))
} }
type GetProfileStream = ProfileReceiver;
type GetProfileFuture = future::FutureResult<grpc::Response<Self::GetProfileStream>, grpc::Error>;
fn get_profile(&mut self, req: grpc::Request<pb::GetDestination>) -> Self::GetProfileFuture {
if let Ok(mut calls) = self.expect_profile_calls.lock() {
if let Some((dst, profile)) = calls.pop_front() {
if &dst == req.get_ref() {
return future::ok(grpc::Response::new(profile));
}
calls.push_front((dst, profile));
}
}
future::err(grpc::Error::Grpc(grpc::Status::INTERNAL, HeaderMap::new()))
}
} }
fn run(controller: Controller, delay: Option<Box<Future<Item=(), Error=()> + Send>>) -> Listening { fn run(controller: Controller, delay: Option<Box<Future<Item=(), Error=()> + Send>>) -> Listening {