refactor: Use a stack-based controller client (#115)

Previously, the controller client was instantiated in the
`control::destination::background` module and was tightly coupled to its
use for address resolution.

In order to share this client across modules, and to bring it into line
with the rest of the proxy's modular layout, the controller client is now
configured and instantiated in `app::main`. The new `app::control` module
provides the additional stack modules needed to configure this client.
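
As assembled in `app::main` (see the diff below), the client stack composes
these layers:

    connect::Stack::new()
        .push(control::client::Layer::new())
        .push(control::resolve::Layer::new(dns_resolver.clone()))
        .push(reconnect::layer().with_fixed_backoff(config.control_backoff_delay))
        .push(proxy::timeout::layer(config.control_connect_timeout))
        .push(svc::watch::layer(tls_client_config.clone()))
        .push(svc::stack::phantom_data::layer())
        .push(control::add_origin::Layer::new())
        .push(buffer::layer())
        .push(limit::layer(config.destination_concurrency_limit));

Later layers wrap earlier ones, so callers see a buffered, concurrency-limited
client; beneath that, requests are given an absolute-form origin, the client
is rebuilt whenever the TLS client configuration changes, and connections are
established with a connect timeout, a fixed reconnect backoff, a one-shot DNS
resolution of the controller address, and an HTTP/2 client.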

Our dependency on tower-buffer has been updated so that buffered services
may be cloned; this allows the single buffered controller client to be
shared (for example, cloned into the destination resolver).

The `proxy::reconnect` module has been extended to support a fixed
reconnect backoff, and the backoff delay is now configurable via the
environment.
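
The relevant environment variables and defaults (from `config.rs` in this
change) are:

    LINKERD2_PROXY_CONTROL_BACKOFF_DELAY     (default: 5s)
    LINKERD2_PROXY_CONTROL_CONNECT_TIMEOUT   (default: 3s; new in this change)
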
Oliver Gould 2018-11-01 11:21:38 -07:00 committed by GitHub
parent f97239baf0
commit 19606bd528
13 changed files with 717 additions and 410 deletions

View File

@@ -1307,7 +1307,7 @@ dependencies = [
 [[package]]
 name = "tower-balance"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1321,7 +1321,7 @@ dependencies = [
 [[package]]
 name = "tower-buffer"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@@ -1330,7 +1330,7 @@ dependencies = [
 [[package]]
 name = "tower-discover"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@@ -1392,7 +1392,7 @@ dependencies = [
 [[package]]
 name = "tower-in-flight-limit"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
@@ -1401,7 +1401,7 @@ dependencies = [
 [[package]]
 name = "tower-reconnect"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1411,7 +1411,7 @@ dependencies = [
 [[package]]
 name = "tower-service"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#7b6460dff2e9969c9c998dd77558f803c770c6ea"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
@@ -1419,7 +1419,7 @@ dependencies = [
 [[package]]
 name = "tower-util"
 version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower#679dcbe3272785277081acd4b6671e2bf69fcc52"
+source = "git+https://github.com/tower-rs/tower#b95c8d103056d9876b80b856b5f76754bf0f7b85"
 dependencies = [
 "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)",
 "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",

View File

@@ -36,10 +36,10 @@ pub struct Config {
 /// Where to forward externally received connections.
 pub inbound_forward: Option<Addr>,
-/// The maximum amount of time to wait for a connection to the public peer.
+/// The maximum amount of time to wait for a connection to a local peer.
 pub inbound_connect_timeout: Duration,
-/// The maximum amount of time to wait for a connection to the private peer.
+/// The maximum amount of time to wait for a connection to a remote peer.
 pub outbound_connect_timeout: Duration,
 pub inbound_ports_disable_protocol_detection: IndexSet<u16>,
@@ -77,6 +77,9 @@ pub struct Config {
 /// a new connection.
 pub control_backoff_delay: Duration,
+/// The maximum amount of time to wait for a connection to the controller.
+pub control_connect_timeout: Duration,
 /// Age after which metrics may be dropped.
 pub metrics_retain_idle: Duration,
@@ -215,6 +218,7 @@ pub const VAR_POD_NAMESPACE: &str = "$LINKERD2_PROXY_POD_NAMESPACE";
 pub const ENV_CONTROL_URL: &str = "LINKERD2_PROXY_CONTROL_URL";
 pub const ENV_CONTROL_BACKOFF_DELAY: &str = "LINKERD2_PROXY_CONTROL_BACKOFF_DELAY";
+const ENV_CONTROL_CONNECT_TIMEOUT: &str = "LINKERD2_PROXY_CONTROL_CONNECT_TIMEOUT";
 const ENV_RESOLV_CONF: &str = "LINKERD2_PROXY_RESOLV_CONF";
 /// Configures a minimum value for the TTL of DNS lookups.
@@ -236,6 +240,7 @@ const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(20);
 const DEFAULT_OUTBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(300);
 const DEFAULT_BIND_TIMEOUT: Duration = Duration::from_secs(10); // same as in Linkerd
 const DEFAULT_CONTROL_BACKOFF_DELAY: Duration = Duration::from_secs(5);
+const DEFAULT_CONTROL_CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
 const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf";
 /// It's assumed that a typical proxy can serve inbound traffic for up to 100 pod-local
@@ -324,6 +329,8 @@ impl<'a> TryFrom<&'a Strings> for Config {
 let control_backoff_delay = parse(strings, ENV_CONTROL_BACKOFF_DELAY, parse_duration)?
 .unwrap_or(DEFAULT_CONTROL_BACKOFF_DELAY);
+let control_connect_timeout = parse(strings, ENV_CONTROL_CONNECT_TIMEOUT, parse_duration)?
+.unwrap_or(DEFAULT_CONTROL_CONNECT_TIMEOUT);
 let namespaces = Namespaces {
 pod: pod_namespace?,
@@ -444,6 +451,7 @@ impl<'a> TryFrom<&'a Strings> for Config {
 .into(),
 control_host_and_port,
 control_backoff_delay,
+control_connect_timeout,
 metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),

src/app/control.rs (new file)
View File

@@ -0,0 +1,420 @@
use h2;
use std::fmt;
use std::time::Duration;
use svc;
use transport::{tls, HostAndPort};
use Conditional;
#[derive(Clone, Debug)]
pub struct Config {
host_and_port: HostAndPort,
tls_server_identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
tls_config: tls::ConditionalClientConfig,
backoff: Duration,
connect_timeout: Duration,
builder: h2::client::Builder,
}
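// `Config` is the target type for the controller-client stack built in
// `app::main`. The `svc::watch` layer uses `WithUpdate` (below) to re-make
// the client whenever the watched TLS client configuration changes.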
impl Config {
pub fn new(
host_and_port: HostAndPort,
tls_server_identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
backoff: Duration,
connect_timeout: Duration,
) -> Self {
Self {
host_and_port,
tls_server_identity,
tls_config: Conditional::None(tls::ReasonForNoTls::Disabled),
backoff,
connect_timeout,
builder: h2::client::Builder::default(),
}
}
}
impl svc::watch::WithUpdate<tls::ConditionalClientConfig> for Config {
type Updated = Self;
fn with_update(&self, tls_config: &tls::ConditionalClientConfig) -> Self::Updated {
let mut c = self.clone();
c.tls_config = tls_config.clone();
c
}
}
impl fmt::Display for Config {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.host_and_port, f)
}
}
/// Sets the request's URI from `Config`.
pub mod add_origin {
extern crate tower_add_origin;
use self::tower_add_origin::AddOrigin;
use bytes::Bytes;
use http::uri;
use std::marker::PhantomData;
use svc;
#[derive(Debug)]
pub struct Layer<M> {
_p: PhantomData<fn() -> M>,
}
#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
}
// === impl Layer ===
impl<M> Layer<M>
where
M: svc::Stack<super::Config>,
{
pub fn new() -> Self {
Self { _p: PhantomData }
}
}
impl<M> Clone for Layer<M>
where
M: svc::Stack<super::Config>,
{
fn clone(&self) -> Self {
Self::new()
}
}
impl<M> svc::Layer<super::Config, super::Config, M> for Layer<M>
where
M: svc::Stack<super::Config>,
{
type Value = <Stack<M> as svc::Stack<super::Config>>::Value;
type Error = <Stack<M> as svc::Stack<super::Config>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack { inner }
}
}
// === impl Stack ===
impl<M> svc::Stack<super::Config> for Stack<M>
where
M: svc::Stack<super::Config>,
{
type Value = AddOrigin<M::Value>;
type Error = M::Error;
fn make(&self, config: &super::Config) -> Result<Self::Value, Self::Error> {
let inner = self.inner.make(config)?;
let scheme = uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
let authority = uri::Authority::from(&config.host_and_port);
Ok(AddOrigin::new(inner, scheme, authority))
}
}
}
/// Resolves the controller's `host_and_port` once before building a client.
pub mod resolve {
use futures::{Future, Poll};
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::{error, fmt};
use super::client;
use dns;
use svc;
use transport::{connect, tls};
#[derive(Debug)]
pub struct Layer<M> {
dns: dns::Resolver,
_p: PhantomData<fn() -> M>,
}
#[derive(Clone, Debug)]
pub struct Stack<M> {
dns: dns::Resolver,
inner: M,
}
pub struct NewService<M> {
config: super::Config,
dns: dns::Resolver,
stack: M,
}
pub struct Init<M>
where
M: svc::Stack<client::Target>,
M::Value: svc::NewService,
{
state: State<M>,
}
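// `Init` drives a two-step construction: `State::Resolve` waits on the
// one-shot DNS lookup of the controller host, then the resolved address is
// used to build a `client::Target` and `State::Inner` waits on the inner
// `NewService` future.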
enum State<M>
where
M: svc::Stack<client::Target>,
M::Value: svc::NewService,
{
Resolve {
future: dns::IpAddrFuture,
config: super::Config,
stack: M,
},
Inner(<M::Value as svc::NewService>::Future),
}
#[derive(Debug)]
pub enum Error<S, I> {
Dns(dns::Error),
Invalid(S),
Inner(I),
}
// === impl Layer ===
impl<M> Layer<M>
where
M: svc::Stack<client::Target> + Clone,
{
pub fn new(dns: dns::Resolver) -> Self {
Self {
dns,
_p: PhantomData,
}
}
}
impl<M> Clone for Layer<M>
where
M: svc::Stack<client::Target> + Clone,
{
fn clone(&self) -> Self {
Self::new(self.dns.clone())
}
}
impl<M> svc::Layer<super::Config, client::Target, M> for Layer<M>
where
M: svc::Stack<client::Target> + Clone,
{
type Value = <Stack<M> as svc::Stack<super::Config>>::Value;
type Error = <Stack<M> as svc::Stack<super::Config>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
dns: self.dns.clone(),
}
}
}
// === impl Stack ===
impl<M> svc::Stack<super::Config> for Stack<M>
where
M: svc::Stack<client::Target> + Clone,
{
type Value = NewService<M>;
type Error = M::Error;
fn make(&self, config: &super::Config) -> Result<Self::Value, Self::Error> {
Ok(NewService {
dns: self.dns.clone(),
config: config.clone(),
stack: self.inner.clone(),
})
}
}
// === impl NewService ===
impl<M> svc::NewService for NewService<M>
where
M: svc::Stack<client::Target> + Clone,
M::Value: svc::NewService,
{
type Request = <M::Value as svc::NewService>::Request;
type Response = <M::Value as svc::NewService>::Response;
type Error = <M::Value as svc::NewService>::Error;
type Service = <M::Value as svc::NewService>::Service;
type InitError = <Init<M> as Future>::Error;
type Future = Init<M>;
fn new_service(&self) -> Self::Future {
Init {
state: State::Resolve {
future: self.dns.resolve_one_ip(&self.config.host_and_port.host),
stack: self.stack.clone(),
config: self.config.clone(),
},
}
}
}
// === impl Init ===
impl<M> Future for Init<M>
where
M: svc::Stack<client::Target>,
M::Value: svc::NewService,
{
type Item = <M::Value as svc::NewService>::Service;
type Error = Error<M::Error, <M::Value as svc::NewService>::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
self.state = match self.state {
State::Inner(ref mut fut) => {
return fut.poll().map_err(Error::Inner);
}
State::Resolve {
ref mut future,
ref config,
ref stack,
} => {
let ip = try_ready!(future.poll().map_err(Error::Dns));
let sa = SocketAddr::from((ip, config.host_and_port.port));
let tls = config.tls_server_identity.as_ref().and_then(|id| {
config
.tls_config
.as_ref()
.map(|config| tls::ConnectionConfig {
server_identity: id.clone(),
config: config.clone(),
})
});
let target = client::Target {
connect: connect::Target::new(sa, tls),
builder: config.builder.clone(),
log_ctx: ::logging::admin()
.client("control", config.host_and_port.clone()),
};
let inner = stack.make(&target).map_err(Error::Invalid)?;
State::Inner(svc::NewService::new_service(&inner))
}
};
}
}
}
// === impl Error ===
impl<S: fmt::Display, I: fmt::Display> fmt::Display for Error<S, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Dns(dns::Error::NoAddressesFound) => write!(f, "no addresses found"),
Error::Dns(dns::Error::ResolutionFailed(e)) => fmt::Display::fmt(&e, f),
Error::Invalid(ref e) => fmt::Display::fmt(&e, f),
Error::Inner(ref e) => fmt::Display::fmt(&e, f),
}
}
}
impl<S: error::Error, I: error::Error> error::Error for Error<S, I> {}
}
/// Creates a client suitable for gRPC.
pub mod client {
use h2;
use std::marker::PhantomData;
use tower_h2::{client, BoxBody};
use svc;
use transport::{connect, HostAndPort};
#[derive(Clone, Debug)]
pub struct Target {
pub(super) connect: connect::Target,
pub(super) builder: h2::client::Builder,
pub(super) log_ctx: ::logging::Client<&'static str, HostAndPort>,
}
#[derive(Debug)]
pub struct Layer<C> {
_p: PhantomData<fn() -> C>,
}
#[derive(Clone, Debug)]
pub struct Stack<C> {
connect: C,
}
// === impl Layer ===
impl<C> Layer<C>
where
C: svc::Stack<connect::Target> + Clone,
C::Value: connect::Connect,
<C::Value as connect::Connect>::Connected: Send + 'static,
{
pub fn new() -> Self {
Self { _p: PhantomData }
}
}
impl<C> Clone for Layer<C>
where
C: svc::Stack<connect::Target> + Clone,
C::Value: connect::Connect,
<C::Value as connect::Connect>::Connected: Send + 'static,
{
fn clone(&self) -> Self {
Self::new()
}
}
impl<C> svc::Layer<Target, connect::Target, C> for Layer<C>
where
C: svc::Stack<connect::Target> + Clone,
C::Value: connect::Connect,
<C::Value as connect::Connect>::Connected: Send + 'static,
{
type Value = <Stack<C> as svc::Stack<Target>>::Value;
type Error = <Stack<C> as svc::Stack<Target>>::Error;
type Stack = Stack<C>;
fn bind(&self, connect: C) -> Self::Stack {
Stack { connect }
}
}
// === impl Stack ===
impl<C> svc::Stack<Target> for Stack<C>
where
C: svc::Stack<connect::Target> + Clone,
C::Value: connect::Connect,
<C::Value as connect::Connect>::Connected: Send + 'static,
{
type Value = client::Connect<
C::Value,
::logging::ClientExecutor<&'static str, HostAndPort>,
BoxBody,
>;
type Error = C::Error;
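// Builds the HTTP/2 `client::Connect` over the inner transport stack, using
// this client's logging context (with the remote address attached) as the
// executor for connection-level tasks.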
fn make(&self, target: &Target) -> Result<Self::Value, Self::Error> {
let c = self.connect.make(&target.connect)?;
let h2 = target.builder.clone();
let e = target
.log_ctx
.clone()
.with_remote(target.connect.addr)
.executor();
Ok(client::Connect::new(c, h2, e))
}
}
}

View File

@@ -1,5 +1,5 @@
 use bytes;
-use futures::*;
+use futures::{self, future, Future, Poll};
 use h2;
 use http;
 use indexmap::IndexSet;
@@ -12,11 +12,10 @@ use tokio::runtime::current_thread;
 use tower_h2;
 use app::classify::{Class, ClassifyResponse};
-use app::{metric_labels::EndpointLabels};
+use app::metric_labels::EndpointLabels;
 use control;
 use dns;
 use drain;
-use futures;
 use logging;
 use metrics::{self, FmtMetrics};
 use proxy::{
@@ -150,6 +149,7 @@ where
 mut runtime,
 } = self;
+const MAX_IN_FLIGHT: usize = 10_000;
 let control_host_and_port = config.control_host_and_port.clone();
 info!("using controller at {:?}", control_host_and_port);
@@ -172,12 +172,20 @@
 config.outbound_ports_disable_protocol_detection,
 );
+let (drain_tx, drain_rx) = drain::channel();
+let (dns_resolver, dns_bg) = dns::Resolver::from_system_config_and_env(&config)
+.unwrap_or_else(|e| {
+// FIXME: DNS configuration should be infallible.
+panic!("invalid DNS configuration: {:?}", e);
+});
 let tap_next_id = tap::NextId::default();
 let (taps, observe) = control::Observe::new(100);
-let (http_metrics, http_report) = proxy::http::metrics::new::<
-EndpointLabels,
-Class,
->(config.metrics_retain_idle);
+let (http_metrics, http_report) =
+proxy::http::metrics::new::<EndpointLabels, Class>(config.metrics_retain_idle);
 let (transport_metrics, transport_report) = transport::metrics::new();
 let (tls_config_sensor, tls_config_report) = telemetry::tls_config_reload::new();
@@ -190,172 +198,207 @@ where
 let tls_client_config = tls_config_watch.client.clone();
 let tls_cfg_bg = tls_config_watch.start(tls_config_sensor);
-let controller_tls = config.tls_settings.as_ref().and_then(|settings| {
-settings
-.controller_identity
-.as_ref()
-.map(|controller_identity| tls::ConnectionConfig {
-server_identity: controller_identity.clone(),
-config: tls_client_config.clone(),
-})
-});
-let (dns_resolver, dns_bg) = dns::Resolver::from_system_config_and_env(&config)
-.unwrap_or_else(|e| {
-// TODO: DNS configuration should be infallible.
-panic!("invalid DNS configuration: {:?}", e);
+let controller_fut = {
+use super::control;
+let tls_server_identity = config
+.tls_settings
+.as_ref()
+.and_then(|s| s.controller_identity.clone().map(|id| id));
+let control_config = control_host_and_port.map(|host_and_port| {
+control::Config::new(
+host_and_port,
+tls_server_identity,
+config.control_backoff_delay,
+config.control_connect_timeout,
+)
 });
-let (resolver, resolver_bg) = control::destination::new(
-dns_resolver.clone(),
-config.namespaces.clone(),
-control_host_and_port,
-controller_tls,
-config.control_backoff_delay,
-config.destination_concurrency_limit,
-);
-const MAX_IN_FLIGHT: usize = 10_000;
-let (drain_tx, drain_rx) = drain::channel();
-let outbound = {
-use super::outbound::{discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize};
-use proxy::{
-http::{balance, metrics},
-resolve,
+// TODO metrics
+let stack = connect::Stack::new()
+.push(control::client::Layer::new())
+.push(control::resolve::Layer::new(dns_resolver.clone()))
+.push(reconnect::layer().with_fixed_backoff(config.control_backoff_delay))
+.push(proxy::timeout::layer(config.control_connect_timeout))
+.push(svc::watch::layer(tls_client_config.clone()))
+.push(svc::stack::phantom_data::layer())
+.push(control::add_origin::Layer::new())
+.push(buffer::layer())
+.push(limit::layer(config.destination_concurrency_limit));
+// Because the control client is buffered, we need to be able to
+// spawn a task on an executor when `make` is called. This is done
+// lazily so that a default executor is available to spawn the
+// background buffering task.
+future::lazy(move || match control_config {
+None => Ok(None),
+Some(config) => stack
+.make(&config)
+.map(Some)
+.map_err(|e| error!("failed to build controller: {}", e)),
+})
+};
+// The resolver is created in the proxy core but runs on the admin core.
+// This channel is used to move the task.
+let (resolver_bg_tx, resolver_bg_rx) = futures::sync::oneshot::channel();
+// Build the outbound and inbound proxies using the controller client.
+let main_fut = controller_fut.and_then(move |controller| {
+let (resolver, resolver_bg) = control::destination::new(
+controller.clone(),
+dns_resolver.clone(),
+config.namespaces.clone(),
+config.destination_concurrency_limit,
+);
+resolver_bg_tx
+.send(resolver_bg)
+.ok()
+.expect("admin thread must receive resolver task");
+let outbound = {
+use super::outbound::{
+discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize,
+};
+use proxy::{
+http::{balance, metrics},
+resolve,
+};
+let http_metrics = http_metrics.clone();
+// As the outbound proxy accepts connections, we don't do any
+// special transport-level handling.
+let accept = transport_metrics.accept("outbound").bind(());
+// Establishes connections to remote peers.
+let connect = connect::Stack::new()
+.push(proxy::timeout::layer(config.outbound_connect_timeout))
+.push(transport_metrics.connect("outbound"));
+let client_stack = connect
+.clone()
+.push(client::layer("out"))
+.push(svc::stack::map_target::layer(|ep: &Endpoint| {
+client::Config::from(ep.clone())
+}))
+.push(reconnect::layer());
+let endpoint_stack = client_stack
+.push(svc::stack_per_request::layer())
+.push(normalize_uri::layer())
+.push(orig_proto_upgrade::layer())
+.push(tap::layer(tap_next_id.clone(), taps.clone()))
+.push(metrics::layer::<_, ClassifyResponse>(http_metrics))
+.push(svc::watch::layer(tls_client_config))
+.push(buffer::layer());
+let dst_router_stack = endpoint_stack
+.push(resolve::layer(Resolve::new(resolver)))
+.push(balance::layer())
+.push(buffer::layer())
+.push(timeout::layer(config.bind_timeout))
+.push(limit::layer(MAX_IN_FLIGHT))
+.push(router::layer(Recognize::new()));
+let capacity = config.outbound_router_capacity;
+let max_idle_age = config.outbound_router_max_idle_age;
+let router = dst_router_stack
+.make(&router::Config::new("out", capacity, max_idle_age))
+.expect("outbound router");
+// As HTTP requests are accepted, we add some request extensions
+// including metadata about the request's origin.
+let server_stack = svc::stack::phantom_data::layer()
+.push(insert_target::layer())
+.push(timestamp_request_open::layer())
+.bind(svc::shared::stack(router));
+serve(
+"out",
+outbound_listener,
+accept,
+connect,
+server_stack.map_err(|_| {}),
+config.outbound_ports_disable_protocol_detection,
+get_original_dst.clone(),
+drain_rx.clone(),
+).map_err(|e| error!("outbound proxy background task failed: {}", e))
 };
-let http_metrics = http_metrics.clone();
-// As the outbound proxy accepts connections, we don't do any
-// special transport-level handling.
-let accept = transport_metrics.accept("outbound").bind(());
-// Establishes connections to remote peers.
-let connect = connect::Stack::new()
-.push(proxy::timeout::layer(config.outbound_connect_timeout))
-.push(transport_metrics.connect("outbound"));
-let client_stack = connect
-.clone()
-.push(client::layer("out"))
-.push(svc::stack::map_target::layer(|ep: &Endpoint| {
-client::Config::from(ep.clone())
-}))
-.push(reconnect::layer());
-let endpoint_stack = client_stack
-.push(svc::stack_per_request::layer())
-.push(normalize_uri::layer())
-.push(orig_proto_upgrade::layer())
-.push(tap::layer(tap_next_id.clone(), taps.clone()))
-.push(metrics::layer::<_, ClassifyResponse>(http_metrics))
-.push(svc::watch::layer(tls_client_config));
-let dst_router_stack = endpoint_stack
-.push(resolve::layer(Resolve::new(resolver)))
-.push(balance::layer())
-.push(buffer::layer())
-.push(timeout::layer(config.bind_timeout))
-.push(limit::layer(MAX_IN_FLIGHT))
-.push(router::layer(Recognize::new()));
-let capacity = config.outbound_router_capacity;
-let max_idle_age = config.outbound_router_max_idle_age;
-let router = dst_router_stack
-.make(&router::Config::new("out", capacity, max_idle_age))
-.expect("outbound router");
-// As HTTP requests are accepted, we add some request extensions
-// including metadata about the request's origin.
-let server_stack = svc::stack::phantom_data::layer()
-.push(insert_target::layer())
-.push(timestamp_request_open::layer())
-.bind(svc::shared::stack(router));
-serve(
-"out",
-outbound_listener,
-accept,
-connect,
-server_stack.map_err(|_| {}),
-config.outbound_ports_disable_protocol_detection,
-get_original_dst.clone(),
-drain_rx.clone(),
-)
-};
-let inbound = {
-use super::inbound::{self, Endpoint};
-use proxy::http::metrics;
-// As the inbound proxy accepts connections, we don't do any
-// special transport-level handling.
-let accept = transport_metrics.accept("inbound").bind(());
-// Establishes connections to the local application.
-let connect = connect::Stack::new()
-.push(proxy::timeout::layer(config.inbound_connect_timeout))
-.push(transport_metrics.connect("inbound"));
-// A stack configured by `router::Config`, responsible for building
-// a router made of route stacks configured by `inbound::Endpoint`.
-//
-// If there is no `SO_ORIGINAL_DST` for an inbound socket,
-// `default_fwd_addr` may be used.
-//
-// `normalize_uri` and `stack_per_request` are applied on the stack
-// selectively. For HTTP/2 stacks, for instance, neither service will be
-// employed.
-let default_fwd_addr = config.inbound_forward.map(|a| a.into());
-let stack = connect
-.clone()
-.push(client::layer("in"))
-.push(svc::stack::map_target::layer(|ep: &Endpoint| {
-client::Config::from(ep.clone())
-}))
-.push(reconnect::layer())
-.push(svc::stack_per_request::layer())
-.push(normalize_uri::layer())
-.push(tap::layer(tap_next_id, taps))
-.push(metrics::layer::<_, ClassifyResponse>(http_metrics))
-.push(buffer::layer())
-.push(limit::layer(MAX_IN_FLIGHT))
-.push(router::layer(inbound::Recognize::new(default_fwd_addr)));
-// Build a router using the above policy
-let capacity = config.inbound_router_capacity;
-let max_idle_age = config.inbound_router_max_idle_age;
-let router = stack
-.make(&router::Config::new("in", capacity, max_idle_age))
-.expect("inbound router");
-// As HTTP requests are accepted, we add some request extensions
-// including metadata about the request's origin.
-//
-// Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per
-// `orig-proto` headers. This happens in the source stack so that
-// the router need not detect whether a request _will be_ downgraded.
-let source_stack = svc::stack::phantom_data::layer()
-.push(inbound::orig_proto_downgrade::layer())
-.push(insert_target::layer())
-.push(timestamp_request_open::layer())
-.bind(svc::shared::stack(router));
-serve(
-"in",
-inbound_listener,
-accept,
-connect,
-source_stack.map_err(|_| {}),
-config.inbound_ports_disable_protocol_detection,
-get_original_dst.clone(),
-drain_rx.clone(),
-)
-};
-trace!("running");
+let inbound = {
+use super::inbound::{self, Endpoint};
+// As the inbound proxy accepts connections, we don't do any
+// special transport-level handling.
+let accept = transport_metrics.accept("inbound").bind(());
+// Establishes connections to the local application.
+let connect = connect::Stack::new()
+.push(proxy::timeout::layer(config.inbound_connect_timeout))
+.push(transport_metrics.connect("inbound"));
+// A stack configured by `router::Config`, responsible for building
+// a router made of route stacks configured by `inbound::Endpoint`.
+//
+// If there is no `SO_ORIGINAL_DST` for an inbound socket,
+// `default_fwd_addr` may be used.
+//
+// `normalize_uri` and `stack_per_request` are applied on the stack
+// selectively. For HTTP/2 stacks, for instance, neither service will be
+// employed.
+let default_fwd_addr = config.inbound_forward.map(|a| a.into());
+let stack = connect
+.clone()
+.push(client::layer("in"))
+.push(svc::stack::map_target::layer(|ep: &Endpoint| {
+client::Config::from(ep.clone())
+}))
+.push(reconnect::layer())
+.push(svc::stack_per_request::layer())
+.push(normalize_uri::layer())
+.push(tap::layer(tap_next_id, taps))
+.push(proxy::http::metrics::layer::<_, ClassifyResponse>(
+http_metrics,
+))
+.push(buffer::layer())
+.push(limit::layer(MAX_IN_FLIGHT))
+.push(router::layer(inbound::Recognize::new(default_fwd_addr)));
+// Build a router using the above policy
+let capacity = config.inbound_router_capacity;
+let max_idle_age = config.inbound_router_max_idle_age;
+let router = stack
+.make(&router::Config::new("in", capacity, max_idle_age))
+.expect("inbound router");
+// As HTTP requests are accepted, we add some request extensions
+// including metadata about the request's origin.
+//
+// Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per
+// `orig-proto` headers. This happens in the source stack so that
+// the router need not detect whether a request _will be_ downgraded.
+let source_stack = svc::stack::phantom_data::layer()
+.push(inbound::orig_proto_downgrade::layer())
+.push(insert_target::layer())
+.push(timestamp_request_open::layer())
+.bind(svc::shared::stack(router));
+serve(
+"in",
+inbound_listener,
+accept,
+connect,
+source_stack.map_err(|_| {}),
+config.inbound_ports_disable_protocol_detection,
+get_original_dst.clone(),
+drain_rx.clone(),
+).map_err(|e| error!("inbound proxy background task failed: {}", e))
+};
+inbound.join(outbound).map(|_| {})
+});
 let (_tx, admin_shutdown_signal) = futures::sync::oneshot::channel::<()>();
 {
@@ -375,12 +418,16 @@
 metrics::Serve::new(report),
 );
-rt.spawn(::logging::admin().bg("resolver").future(resolver_bg));
 // tap is already pushped in a logging Future.
 rt.spawn(tap);
 // metrics_server is already pushped in a logging Future.
 rt.spawn(metrics);
 rt.spawn(::logging::admin().bg("dns-resolver").future(dns_bg));
+rt.spawn(
+::logging::admin()
+.bg("resolver")
+.future(resolver_bg_rx.map_err(|_| {}).flatten()),
+);
 rt.spawn(::logging::admin().bg("tls-config").future(tls_cfg_bg));
@@ -392,12 +439,8 @@
 trace!("controller client thread spawned");
 }
-let fut = inbound
-.join(outbound)
-.map(|_| ())
-.map_err(|err| error!("main error: {:?}", err));
-runtime.spawn(Box::new(fut));
+trace!("running");
+runtime.spawn(Box::new(main_fut));
 trace!("main task spawned");
 let shutdown_signal = shutdown_signal.and_then(move |()| {

View File

@@ -5,6 +5,7 @@ use logging;
 mod classify;
 pub mod config;
+mod control;
 mod destination;
 mod inbound;
 mod main;

View File

@@ -1,124 +0,0 @@
use std::time::Duration;
use bytes::Bytes;
use futures::Poll;
use h2;
use http;
use tower_h2::{self, BoxBody, RecvBody};
use tower_add_origin::AddOrigin;
use Conditional;
use dns;
use proxy;
use svc;
use timeout::Timeout;
use transport::{tls, HostAndPort, LookupAddressAndConnect};
/// Type of the client service stack used to make destination requests.
pub(super) struct ClientService(Service);
type Service = AddOrigin<
proxy::reconnect::Service<
HostAndPort,
tower_h2::client::Connect<
Timeout<LookupAddressAndConnect>,
::logging::ContextualExecutor<
::logging::Client<
&'static str,
HostAndPort
>
>,
BoxBody,
>
>
>;
/// The state needed to bind a new controller client stack.
pub(super) struct BindClient {
backoff_delay: Duration,
identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
host_and_port: HostAndPort,
dns_resolver: dns::Resolver,
log_ctx: ::logging::Client<&'static str, HostAndPort>,
}
// ===== impl ClientService =====
impl svc::Service for ClientService {
type Request = http::Request<BoxBody>;
type Response = http::Response<RecvBody>;
type Error = <Service as svc::Service>::Error;
type Future = <Service as svc::Service>::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
// If an error occurs on established connection, log it; otherwise the `reconnect` module will handle re-establishing a client.
loop {
match self.0.poll_ready() {
Ok(v) => return Ok(v),
Err(e) => {
info!("Controller client error: {}", e)
}
}
}
}
fn call(&mut self, request: Self::Request) -> Self::Future {
self.0.call(request)
}
}
// ===== impl BindClient =====
impl BindClient {
pub(super) fn new(
identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
dns_resolver: &dns::Resolver,
host_and_port: HostAndPort,
backoff_delay: Duration,
) -> Self {
let log_ctx = ::logging::admin().client("control", host_and_port.clone());
Self {
backoff_delay,
identity,
dns_resolver: dns_resolver.clone(),
host_and_port,
log_ctx,
}
}
}
impl svc::Stack<tls::ConditionalClientConfig> for BindClient {
type Value = ClientService;
type Error = ();
fn make(&self, cfg: &tls::ConditionalClientConfig) -> Result<Self::Value, Self::Error> {
let conn_cfg = match (&self.identity, cfg) {
(Conditional::Some(ref id), Conditional::Some(ref cfg)) =>
Conditional::Some(tls::ConnectionConfig {
server_identity: id.clone(),
config: cfg.clone(),
}),
(Conditional::None(ref reason), _) |
(_, Conditional::None(ref reason)) =>
Conditional::None(reason.clone()),
};
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
let authority = http::uri::Authority::from(&self.host_and_port);
let connect = Timeout::new(
LookupAddressAndConnect::new(self.host_and_port.clone(),
self.dns_resolver.clone(),
conn_cfg),
Duration::from_secs(3),
);
let h2_client = tower_h2::client::Connect::new(
connect,
h2::client::Builder::default(),
self.log_ctx.clone().executor()
);
let reconnect = proxy::reconnect::Service::new(self.host_and_port.clone(), h2_client)
.with_fixed_backoff(self.backoff_delay);
Ok(ClientService(AddOrigin::new(reconnect, scheme, authority)))
}
}

View File

@@ -8,7 +8,7 @@ use std::{
 };
 use futures::{Async, Future, Stream,};
-use tower_h2::{BoxBody, HttpService, RecvBody};
+use tower_h2::{Body, Data, HttpService};
 use api::{
 destination::{
@@ -21,7 +21,7 @@ use api::{
 use control::{
 cache::{Cache, CacheChange, Exists},
-destination::{Metadata, Responder, ProtocolHint, Update},
+destination::{Metadata, ProtocolHint, Responder, Update},
 remote_stream::Remote,
 };
 use dns::{self, IpAddrListFuture};
@@ -31,7 +31,7 @@ use Conditional;
 use super::{ActiveQuery, DestinationServiceQuery, UpdateRx};
 /// Holds the state of a single resolution.
-pub(super) struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
+pub(super) struct DestinationSet<T: HttpService> {
 pub addrs: Exists<Cache<SocketAddr, Metadata>>,
 pub query: DestinationServiceQuery<T>,
 pub dns_query: Option<IpAddrListFuture>,
@@ -42,7 +42,8 @@ pub(super) struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
 impl<T> DestinationSet<T>
 where
-T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
+T: HttpService,
+T::ResponseBody: Body<Data = Data>,
 T::Error: fmt::Debug,
 {
 pub(super) fn reset_dns_query(
@@ -179,8 +180,11 @@ where
 }
 }
-impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
+impl<T> DestinationSet<T>
+where
+T: HttpService,
+T::ResponseBody: Body<Data = Data>,
+{
 /// Returns `true` if the authority that created this query _should_ query
 /// the Destination service, but was unable to due to insufficient capaacity.
 pub(super) fn needs_query_capacity(&self) -> bool {

View File

@@ -5,17 +5,15 @@
 },
 fmt,
 mem,
-time::{Instant, Duration},
+time::Instant,
 sync::Arc,
 };
 use futures::{
-future,
 sync::mpsc,
-Async, Future, Poll, Stream,
+Async, Poll, Stream,
 };
-use futures_watch;
 use tower_grpc as grpc;
-use tower_h2::{BoxBody, HttpService, RecvBody};
+use tower_h2::{Body, BoxBody, Data, HttpService};
 use api::destination::client::Destination;
 use api::destination::{
@@ -31,17 +29,11 @@ use control::{
 remote_stream::{Receiver, Remote},
 };
 use dns;
-use transport::{tls, DnsNameAndPort, HostAndPort};
-use Conditional;
-use svc::stack::watch;
-mod client;
+use transport::DnsNameAndPort;
 mod destination_set;
-use self::{
-client::BindClient,
-destination_set::DestinationSet,
-};
+use self::destination_set::DestinationSet;
 type ActiveQuery<T> = Remote<PbUpdate, T>;
 type UpdateRx<T> = Receiver<PbUpdate, T>;
@@ -52,7 +44,7 @@ type UpdateRx<T> = Receiver<PbUpdate, T>;
 /// service is healthy, it reads requests from `request_rx`, determines how to resolve the
 /// provided authority to a set of addresses, and ensures that resolution updates are
 /// propagated to all requesters.
-struct Background<T: HttpService<ResponseBody = RecvBody>> {
+pub(super) struct Background<T: HttpService> {
 new_query: NewQuery,
 dns_resolver: dns::Resolver,
 dsts: DestinationCache<T>,
@@ -66,7 +58,7 @@ struct Background<T: HttpService<ResponseBody = RecvBody>> {
 /// Holds the currently active `DestinationSet`s and a list of any destinations
 /// which require reconnects.
 #[derive(Default)]
-struct DestinationCache<T: HttpService<ResponseBody = RecvBody>> {
+struct DestinationCache<T: HttpService> {
 destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
 /// A queue of authorities that need to be reconnected.
 reconnects: VecDeque<DnsNameAndPort>,
@@ -86,67 +78,21 @@ struct NewQuery {
 concurrency_limit: usize,
 }
-enum DestinationServiceQuery<T: HttpService<ResponseBody = RecvBody>> {
+enum DestinationServiceQuery<T: HttpService> {
 Inactive,
 Active(ActiveQuery<T>),
 NoCapacity,
 }
-/// Returns a new discovery background task.
-pub(super) fn task(
-request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
-dns_resolver: dns::Resolver,
-namespaces: Namespaces,
-host_and_port: Option<HostAndPort>,
-controller_tls: tls::ConditionalConnectionConfig<tls::ClientConfigWatch>,
-control_backoff_delay: Duration,
-concurrency_limit: usize,
-) -> impl Future<Item = (), Error = ()>
-{
-// Build up the Controller Client Stack
-let mut client = host_and_port.map(|host_and_port| {
-let (identity, watch) = match controller_tls {
-Conditional::Some(config) =>
-(Conditional::Some(config.server_identity), config.config),
-Conditional::None(reason) => {
-// If there's no connection config, then construct a new
-// `Watch` that never updates to construct the `WatchService`.
-// We do this here rather than calling `ClientConfig::no_tls`
-// in order to propagate the reason for no TLS to the watch.
-let (watch, _) = futures_watch::Watch::new(Conditional::None(reason));
-(Conditional::None(reason), watch)
-},
-};
-let bind_client = BindClient::new(
-identity,
-&dns_resolver,
-host_and_port,
-control_backoff_delay,
-);
-watch::Service::try(watch, bind_client)
-.expect("client construction should be infallible")
-});
-let mut disco = Background::new(
-request_rx,
-dns_resolver,
-namespaces,
-concurrency_limit,
-);
-future::poll_fn(move || {
-disco.poll_rpc(&mut client)
-})
-}
 // ==== impl Background =====
 impl<T> Background<T>
 where
-T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
+T: HttpService<RequestBody = BoxBody>,
+T::ResponseBody: Body<Data = Data>,
 T::Error: fmt::Debug,
 {
-fn new(
+pub(super) fn new(
 request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
 dns_resolver: dns::Resolver,
 namespaces: Namespaces,
@@ -161,7 +107,7 @@ where
 }
 }
-fn poll_rpc(&mut self, client: &mut Option<T>) -> Poll<(), ()> {
+pub(super) fn poll_rpc(&mut self, client: &mut Option<T>) -> Poll<(), ()> {
 // This loop is make sure any streams that were found disconnected
 // in `poll_destinations` while the `rpc` service is ready should
 // be reconnected now, otherwise the task would just sleep...
@@ -396,7 +342,8 @@ impl NewQuery {
 connect_or_reconnect: &str,
 ) -> DestinationServiceQuery<T>
 where
-T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
+T: HttpService<RequestBody = BoxBody>,
+T::ResponseBody: Body<Data = Data>,
 T::Error: fmt::Debug,
 {
 trace!(
@@ -452,7 +399,8 @@ impl NewQuery {
 impl<T> DestinationCache<T>
 where
-T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
+T: HttpService,
+T::ResponseBody: Body<Data = Data>,
 T::Error: fmt::Debug,
 {
@@ -486,7 +434,11 @@ where
 // ===== impl DestinationServiceQuery =====
-impl<T: HttpService<ResponseBody = RecvBody>> DestinationServiceQuery<T> {
+impl<T> DestinationServiceQuery<T>
+where
+T: HttpService,
+T::ResponseBody: Body<Data = Data>,
+{
 pub fn is_active(&self) -> bool {
 match self {
@@ -512,7 +464,8 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationServiceQuery<T> {
 impl<T> From<ActiveQuery<T>> for DestinationServiceQuery<T>
 where
-T: HttpService<ResponseBody = RecvBody>,
+T: HttpService,
+T::ResponseBody: Body<Data = Data>,
 {
 fn from(active: ActiveQuery<T>) -> Self {
 DestinationServiceQuery::Active(active)

View File

@@ -24,30 +24,32 @@
 //! - We need some means to limit the number of endpoints that can be returned for a
 //! single resolution so that `control::Cache` is not effectively unbounded.
-use indexmap::IndexMap;
-use std::sync::{Arc, Weak};
-use std::time::Duration;
 use futures::{
+future,
 sync::mpsc,
 Async,
 Future,
 Poll,
 Stream
 };
+use indexmap::IndexMap;
+use std::fmt;
+use std::sync::{Arc, Weak};
+use tower_h2::{Body, BoxBody, Data, HttpService};
 use dns;
-use transport::tls;
-use proxy::resolve::{self, Resolve, Update};
-use transport::{DnsNameAndPort, HostAndPort};
+use proxy::resolve::{self, Resolve, Update};
+use transport::DnsNameAndPort;
 pub mod background;
 use app::config::Namespaces;
+use self::background::Background;
 use Conditional;
 /// A handle to request resolutions from the background discovery task.
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct Resolver {
 request_tx: mpsc::UnboundedSender<ResolveRequest>,
 }
@@ -109,26 +111,27 @@ pub enum ProtocolHint {
 /// The `Resolver` is used by a listener to request resolutions, while
 /// the background future is executed on the controller thread's executor
 /// to drive the background task.
-pub fn new(
+pub fn new<T>(
+mut client: Option<T>,
 dns_resolver: dns::Resolver,
 namespaces: Namespaces,
-host_and_port: Option<HostAndPort>,
-controller_tls: tls::ConditionalConnectionConfig<tls::ClientConfigWatch>,
-control_backoff_delay: Duration,
 concurrency_limit: usize,
-) -> (Resolver, impl Future<Item = (), Error = ()>) {
+) -> (Resolver, impl Future<Item = (), Error = ()>)
+where
+T: HttpService<RequestBody = BoxBody>,
+T::ResponseBody: Body<Data = Data>,
+T::Error: fmt::Debug,
+{
 let (request_tx, rx) = mpsc::unbounded();
 let disco = Resolver { request_tx };
-let bg = background::task(
+let mut bg = Background::new(
 rx,
 dns_resolver,
 namespaces,
-host_and_port,
-controller_tls,
-control_backoff_delay,
 concurrency_limit,
 );
-(disco, bg)
+let task = future::poll_fn(move || bg.poll_rpc(&mut client));
+(disco, task)
 }
 // ==== impl Resolver =====

View File

@@ -24,6 +24,7 @@ pub enum IpAddrFuture {
 Fixed(IpAddr),
 }
+#[derive(Debug)]
 pub enum Error {
 NoAddressesFound,
 ResolutionFailed(ResolveError),

View File

@@ -34,7 +34,6 @@ extern crate regex;
 extern crate ring;
 extern crate tokio;
 extern crate tokio_timer;
-extern crate tower_add_origin;
 extern crate tower_grpc;
 extern crate tower_h2;
 extern crate tower_util;

View File

@@ -197,14 +197,14 @@ pub fn admin() -> Section {
 Section::Admin
 }
-#[derive(Copy, Clone)]
+#[derive(Copy, Clone, Debug)]
 pub enum Section {
 Proxy,
 Admin,
 }
 /// A utility for logging actions taken on behalf of a server task.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Server {
 section: Section,
 name: &'static str,
@@ -213,7 +213,7 @@ pub struct Server {
 }
 /// A utility for logging actions taken on behalf of a client task.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Client<C: fmt::Display, D: fmt::Display> {
 section: Section,
 client: C,

View File

@@ -60,15 +60,14 @@ pub fn layer() -> Layer {
 }
 }
-// TODO: once a stacked clients needs backoff...
-// impl Layer {
-// pub fn with_fixed_backoff(self, wait: Duration) -> Self {
-// Self {
-// backoff: Backoff::Fixed(wait),
-// .. self
-// }
-// }
-// }
+impl Layer {
+pub fn with_fixed_backoff(self, wait: Duration) -> Self {
+Self {
+backoff: Backoff::Fixed(wait),
+.. self
+}
+}
+}
 impl<T, M> svc::Layer<T, T, M> for Layer
 where
@@ -113,23 +112,23 @@
 // === impl Service ===
-impl<T, N> Service<T, N>
+#[cfg(test)]
+impl<N> Service<&'static str, N>
 where
-T: fmt::Debug,
 N: svc::NewService,
 N::InitError: fmt::Display,
 {
-pub fn new(target: T, new_service: N) -> Self {
+fn for_test(new_service: N) -> Self {
 Self {
 inner: Reconnect::new(new_service),
-target,
+target: "test",
 backoff: Backoff::None,
 active_backoff: None,
 mute_connect_error_log: false,
 }
 }
-pub fn with_fixed_backoff(self, wait: Duration) -> Self {
+fn with_fixed_backoff(self, wait: Duration) -> Self {
 Self {
 backoff: Backoff::Fixed(wait),
 .. self
@@ -195,7 +194,7 @@
 // task is notified below.
 self.active_backoff = match self.backoff {
 Backoff::None => None,
-Backoff::Fixed(wait) => Some(Delay::new(clock::now() + wait)),
+Backoff::Fixed(ref wait) => Some(Delay::new(clock::now() + *wait)),
 };
 // The inner service is now idle and will renew its internal
@@ -317,7 +316,7 @@ mod tests {
 #[test]
 fn reconnects_with_backoff() {
 let mock = NewService { fails: 2.into() };
-let mut backoff = super::Service::new("test", mock)
+let mut backoff = super::Service::for_test(mock)
 .with_fixed_backoff(Duration::from_millis(100));
 let mut rt = Runtime::new().unwrap();