mirror of https://github.com/linkerd/linkerd2.git
proxy: Rebind controller client on TLS configuration changes (#1192)
This branch adds the rebinding logic added to outbound clients in #1185 to the controller client used in the proxy's `control::destination::background` module. Now, if we are communicating with the control plane over TLS, we will rebind the controller client stack if the TLS client configuration changes, using the `WatchService` added in #1177. Signed-off-by: Eliza Weisman <eliza@buoyant.io> Signed-off-by: Brian Smith <brian@briansmith.org> Co-authored-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
8b0b681ee6
commit
8a513af862
|
@ -18,7 +18,6 @@ use transparency::{self, HttpBody, h1};
|
||||||
use transport;
|
use transport;
|
||||||
use tls;
|
use tls;
|
||||||
use ctx::transport::TlsStatus;
|
use ctx::transport::TlsStatus;
|
||||||
use conditional::Conditional;
|
|
||||||
use watch_service::{WatchService, Rebind};
|
use watch_service::{WatchService, Rebind};
|
||||||
|
|
||||||
/// Binds a `Service` from a `SocketAddr`.
|
/// Binds a `Service` from a `SocketAddr`.
|
||||||
|
@ -63,10 +62,6 @@ where
|
||||||
protocol: Protocol,
|
protocol: Protocol,
|
||||||
}
|
}
|
||||||
|
|
||||||
// `Bind` cannot use `ConditionalConnectionConfig` since it uses a
|
|
||||||
// `tls::Identity` and a `tls::ClientConfig` obtained from different sources.
|
|
||||||
pub type ConditionalTlsClientConfig = Conditional<tls::ClientConfig, tls::ReasonForNoTls>;
|
|
||||||
|
|
||||||
/// A type of service binding.
|
/// A type of service binding.
|
||||||
///
|
///
|
||||||
/// Some services, for various reasons, may not be able to be used to serve multiple
|
/// Some services, for various reasons, may not be able to be used to serve multiple
|
||||||
|
@ -79,7 +74,7 @@ where
|
||||||
B: tower_h2::Body + Send + 'static,
|
B: tower_h2::Body + Send + 'static,
|
||||||
<B::Data as ::bytes::IntoBuf>::Buf: Send,
|
<B::Data as ::bytes::IntoBuf>::Buf: Send,
|
||||||
{
|
{
|
||||||
Bound(WatchService<ConditionalTlsClientConfig, RebindTls<B>>),
|
Bound(WatchService<tls::ConditionalClientConfig, RebindTls<B>>),
|
||||||
BindsPerRequest {
|
BindsPerRequest {
|
||||||
// When `poll_ready` is called, the _next_ service to be used may be bound
|
// When `poll_ready` is called, the _next_ service to be used may be bound
|
||||||
// ahead-of-time. This stack is used only to serve the next request to this
|
// ahead-of-time. This stack is used only to serve the next request to this
|
||||||
|
@ -136,7 +131,7 @@ pub struct RebindTls<B> {
|
||||||
|
|
||||||
pub type Service<B> = BoundService<B>;
|
pub type Service<B> = BoundService<B>;
|
||||||
|
|
||||||
pub type Stack<B> = WatchService<ConditionalTlsClientConfig, RebindTls<B>>;
|
pub type Stack<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>;
|
||||||
|
|
||||||
type StackInner<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
|
type StackInner<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
|
||||||
|
|
||||||
|
@ -234,12 +229,11 @@ where
|
||||||
&self,
|
&self,
|
||||||
ep: &Endpoint,
|
ep: &Endpoint,
|
||||||
protocol: &Protocol,
|
protocol: &Protocol,
|
||||||
tls_client_config: &ConditionalTlsClientConfig,
|
tls_client_config: &tls::ConditionalClientConfig,
|
||||||
)-> StackInner<B> {
|
)-> StackInner<B> {
|
||||||
debug!("bind_inner_stack endpoint={:?}, protocol={:?}", ep, protocol);
|
debug!("bind_inner_stack endpoint={:?}, protocol={:?}", ep, protocol);
|
||||||
let addr = ep.address();
|
let addr = ep.address();
|
||||||
|
|
||||||
// Like `tls::current_connection_config()`.
|
|
||||||
let tls = ep.tls_identity().and_then(|identity| {
|
let tls = ep.tls_identity().and_then(|identity| {
|
||||||
tls_client_config.as_ref().map(|config| {
|
tls_client_config.as_ref().map(|config| {
|
||||||
tls::ConnectionConfig {
|
tls::ConnectionConfig {
|
||||||
|
@ -580,13 +574,13 @@ impl Protocol {
|
||||||
|
|
||||||
// ===== impl RebindTls =====
|
// ===== impl RebindTls =====
|
||||||
|
|
||||||
impl<B> Rebind<ConditionalTlsClientConfig> for RebindTls<B>
|
impl<B> Rebind<tls::ConditionalClientConfig> for RebindTls<B>
|
||||||
where
|
where
|
||||||
B: tower_h2::Body + Send + 'static,
|
B: tower_h2::Body + Send + 'static,
|
||||||
<B::Data as ::bytes::IntoBuf>::Buf: Send,
|
<B::Data as ::bytes::IntoBuf>::Buf: Send,
|
||||||
{
|
{
|
||||||
type Service = StackInner<B>;
|
type Service = StackInner<B>;
|
||||||
fn rebind(&mut self, tls: &ConditionalTlsClientConfig) -> Self::Service {
|
fn rebind(&mut self, tls: &tls::ConditionalClientConfig) -> Self::Service {
|
||||||
debug!(
|
debug!(
|
||||||
"rebinding endpoint stack for {:?}:{:?} on TLS config change",
|
"rebinding endpoint stack for {:?}:{:?} on TLS config change",
|
||||||
self.endpoint, self.protocol,
|
self.endpoint, self.protocol,
|
||||||
|
|
|
@ -142,7 +142,18 @@ impl BoundPort {
|
||||||
// do it here.
|
// do it here.
|
||||||
set_nodelay_or_warn(&socket);
|
set_nodelay_or_warn(&socket);
|
||||||
|
|
||||||
let conn = match tls::current_connection_config(&tls) {
|
let tls = match &tls {
|
||||||
|
Conditional::Some(tls) => match &*tls.config.borrow() {
|
||||||
|
Conditional::Some(config) =>
|
||||||
|
Conditional::Some(tls::ConnectionConfig {
|
||||||
|
identity: tls.identity.clone(),
|
||||||
|
config: config.clone(),
|
||||||
|
}),
|
||||||
|
Conditional::None(r) => Conditional::None(*r),
|
||||||
|
},
|
||||||
|
Conditional::None(r) => Conditional::None(*r),
|
||||||
|
};
|
||||||
|
let conn = match tls {
|
||||||
Conditional::Some(config) => {
|
Conditional::Some(config) => {
|
||||||
// TODO: use `config.identity` to differentiate
|
// TODO: use `config.identity` to differentiate
|
||||||
// between TLS that the proxy should terminate vs.
|
// between TLS that the proxy should terminate vs.
|
||||||
|
|
|
@ -41,10 +41,26 @@ use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
|
||||||
use timeout::Timeout;
|
use timeout::Timeout;
|
||||||
use transport::tls;
|
use transport::tls;
|
||||||
use conditional::Conditional;
|
use conditional::Conditional;
|
||||||
|
use watch_service::{Rebind, WatchService};
|
||||||
|
use futures_watch::Watch;
|
||||||
|
|
||||||
type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
|
type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
|
||||||
type UpdateRx<T> = Receiver<PbUpdate, T>;
|
type UpdateRx<T> = Receiver<PbUpdate, T>;
|
||||||
|
|
||||||
|
/// Type of the client service stack used to make destination requests.
|
||||||
|
type ClientService = AddOrigin<Backoff<LogErrors<Reconnect<
|
||||||
|
tower_h2::client::Connect<
|
||||||
|
Timeout<LookupAddressAndConnect>,
|
||||||
|
::logging::ContextualExecutor<
|
||||||
|
::logging::Client<
|
||||||
|
&'static str,
|
||||||
|
HostAndPort
|
||||||
|
>
|
||||||
|
>,
|
||||||
|
BoxBody,
|
||||||
|
>
|
||||||
|
>>>>;
|
||||||
|
|
||||||
/// Satisfies resolutions as requested via `request_rx`.
|
/// Satisfies resolutions as requested via `request_rx`.
|
||||||
///
|
///
|
||||||
/// As the `Background` is polled with a client to Destination service, if the client to the
|
/// As the `Background` is polled with a client to Destination service, if the client to the
|
||||||
|
@ -72,6 +88,13 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
||||||
responders: Vec<Responder>,
|
responders: Vec<Responder>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The state needed to bind a new controller client stack.
|
||||||
|
struct BindClient {
|
||||||
|
identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
|
||||||
|
host_and_port: HostAndPort,
|
||||||
|
dns_resolver: dns::Resolver,
|
||||||
|
log_ctx: ::logging::Client<&'static str, HostAndPort>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a new discovery background task.
|
/// Returns a new discovery background task.
|
||||||
pub(super) fn task(
|
pub(super) fn task(
|
||||||
|
@ -84,26 +107,24 @@ pub(super) fn task(
|
||||||
{
|
{
|
||||||
// Build up the Controller Client Stack
|
// Build up the Controller Client Stack
|
||||||
let mut client = host_and_port.map(|host_and_port| {
|
let mut client = host_and_port.map(|host_and_port| {
|
||||||
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
|
let (identity, watch) = match controller_tls {
|
||||||
let authority = http::uri::Authority::from(&host_and_port);
|
Conditional::Some(cfg) =>
|
||||||
let connect = Timeout::new(
|
(Conditional::Some(cfg.identity), cfg.config),
|
||||||
LookupAddressAndConnect::new(host_and_port.clone(), dns_resolver.clone(),
|
Conditional::None(reason) => {
|
||||||
controller_tls),
|
// If there's no connection config, then construct a new
|
||||||
Duration::from_secs(3),
|
// `Watch` that never updates to construct the `WatchService`.
|
||||||
|
// We do this here rather than calling `ClientConfig::no_tls`
|
||||||
|
// in order to propagate the reason for no TLS to the watch.
|
||||||
|
let (watch, _) = Watch::new(Conditional::None(reason));
|
||||||
|
(Conditional::None(reason), watch)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let bind_client = BindClient::new(
|
||||||
|
identity,
|
||||||
|
&dns_resolver,
|
||||||
|
host_and_port,
|
||||||
);
|
);
|
||||||
|
WatchService::new(watch, bind_client)
|
||||||
let log = ::logging::admin().client("control", host_and_port.clone());
|
|
||||||
let h2_client = tower_h2::client::Connect::new(
|
|
||||||
connect,
|
|
||||||
h2::client::Builder::default(),
|
|
||||||
log.executor()
|
|
||||||
);
|
|
||||||
|
|
||||||
let reconnect = Reconnect::new(h2_client);
|
|
||||||
let log_errors = LogErrors::new(reconnect);
|
|
||||||
let backoff = Backoff::new(log_errors, Duration::from_secs(5));
|
|
||||||
// TODO: Use AddOrigin in tower-http
|
|
||||||
AddOrigin::new(scheme, authority, backoff)
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut disco = Background::new(
|
let mut disco = Background::new(
|
||||||
|
@ -571,6 +592,62 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===== impl BindClient =====
|
||||||
|
|
||||||
|
impl BindClient {
|
||||||
|
fn new(
|
||||||
|
identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
|
||||||
|
dns_resolver: &dns::Resolver,
|
||||||
|
host_and_port: HostAndPort,
|
||||||
|
) -> Self {
|
||||||
|
let log_ctx = ::logging::admin().client("control", host_and_port.clone());
|
||||||
|
Self {
|
||||||
|
identity,
|
||||||
|
dns_resolver: dns_resolver.clone(),
|
||||||
|
host_and_port,
|
||||||
|
log_ctx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Rebind<tls::ConditionalClientConfig> for BindClient {
|
||||||
|
type Service = ClientService;
|
||||||
|
fn rebind(
|
||||||
|
&mut self,
|
||||||
|
client_cfg: &tls::ConditionalClientConfig,
|
||||||
|
) -> Self::Service {
|
||||||
|
let conn_cfg = match (&self.identity, client_cfg) {
|
||||||
|
(Conditional::Some(ref id), Conditional::Some(ref cfg)) =>
|
||||||
|
Conditional::Some(tls::ConnectionConfig {
|
||||||
|
identity: id.clone(),
|
||||||
|
config: cfg.clone(),
|
||||||
|
}),
|
||||||
|
(Conditional::None(ref reason), _) |
|
||||||
|
(_, Conditional::None(ref reason)) =>
|
||||||
|
Conditional::None(reason.clone()),
|
||||||
|
};
|
||||||
|
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
|
||||||
|
let authority = http::uri::Authority::from(&self.host_and_port);
|
||||||
|
let connect = Timeout::new(
|
||||||
|
LookupAddressAndConnect::new(self.host_and_port.clone(),
|
||||||
|
self.dns_resolver.clone(),
|
||||||
|
conn_cfg),
|
||||||
|
Duration::from_secs(3),
|
||||||
|
);
|
||||||
|
let h2_client = tower_h2::client::Connect::new(
|
||||||
|
connect,
|
||||||
|
h2::client::Builder::default(),
|
||||||
|
self.log_ctx.clone().executor()
|
||||||
|
);
|
||||||
|
|
||||||
|
let reconnect = Reconnect::new(h2_client);
|
||||||
|
let log_errors = LogErrors::new(reconnect);
|
||||||
|
let backoff = Backoff::new(log_errors, Duration::from_secs(5));
|
||||||
|
// TODO: Use AddOrigin in tower-http
|
||||||
|
AddOrigin::new(scheme, authority, backoff)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
|
/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
|
||||||
fn pb_to_addr_meta(
|
fn pb_to_addr_meta(
|
||||||
pb: WeightedAddr,
|
pb: WeightedAddr,
|
||||||
|
|
|
@ -50,7 +50,7 @@ pub enum HostAndPortError {
|
||||||
pub struct LookupAddressAndConnect {
|
pub struct LookupAddressAndConnect {
|
||||||
host_and_port: HostAndPort,
|
host_and_port: HostAndPort,
|
||||||
dns_resolver: dns::Resolver,
|
dns_resolver: dns::Resolver,
|
||||||
tls: tls::ConditionalConnectionConfig<tls::ClientConfigWatch>,
|
tls: tls::ConditionalConnectionConfig<tls::ClientConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl HostAndPort =====
|
// ===== impl HostAndPort =====
|
||||||
|
@ -129,7 +129,7 @@ impl LookupAddressAndConnect {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
host_and_port: HostAndPort,
|
host_and_port: HostAndPort,
|
||||||
dns_resolver: dns::Resolver,
|
dns_resolver: dns::Resolver,
|
||||||
tls: tls::ConditionalConnectionConfig<tls::ClientConfigWatch>,
|
tls: tls::ConditionalConnectionConfig<tls::ClientConfig>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
host_and_port,
|
host_and_port,
|
||||||
|
@ -147,7 +147,7 @@ impl tokio_connect::Connect for LookupAddressAndConnect {
|
||||||
fn connect(&self) -> Self::Future {
|
fn connect(&self) -> Self::Future {
|
||||||
let port = self.host_and_port.port;
|
let port = self.host_and_port.port;
|
||||||
let host = self.host_and_port.host.clone();
|
let host = self.host_and_port.host.clone();
|
||||||
let tls = tls::current_connection_config(&self.tls);
|
let tls = self.tls.clone();
|
||||||
let c = self.dns_resolver
|
let c = self.dns_resolver
|
||||||
.resolve_one_ip(&self.host_and_port.host)
|
.resolve_one_ip(&self.host_and_port.host)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
|
|
|
@ -55,7 +55,6 @@ struct CommonConfig {
|
||||||
cert_resolver: Arc<CertResolver>,
|
cert_resolver: Arc<CertResolver>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Validated configuration for TLS servers.
|
/// Validated configuration for TLS servers.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ClientConfig(pub(super) Arc<rustls::ClientConfig>);
|
pub struct ClientConfig(pub(super) Arc<rustls::ClientConfig>);
|
||||||
|
@ -133,6 +132,7 @@ impl From<ReasonForNoIdentity> for ReasonForNoTls {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ConditionalConnectionConfig<C> = Conditional<ConnectionConfig<C>, ReasonForNoTls>;
|
pub type ConditionalConnectionConfig<C> = Conditional<ConnectionConfig<C>, ReasonForNoTls>;
|
||||||
|
pub type ConditionalClientConfig = Conditional<ClientConfig, ReasonForNoTls>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -362,20 +362,6 @@ impl ServerConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn current_connection_config<C>(
|
|
||||||
watch: &ConditionalConnectionConfig<Watch<Conditional<C, ReasonForNoTls>>>)
|
|
||||||
-> ConditionalConnectionConfig<C> where C: Clone
|
|
||||||
{
|
|
||||||
watch.as_ref().and_then(|c| {
|
|
||||||
c.config.borrow().as_ref().map(|config| {
|
|
||||||
ConnectionConfig {
|
|
||||||
identity: c.identity.clone(),
|
|
||||||
config: config.clone()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_file_contents(path: &PathBuf) -> Result<Vec<u8>, Error> {
|
fn load_file_contents(path: &PathBuf) -> Result<Vec<u8>, Error> {
|
||||||
fn load_file(path: &PathBuf) -> Result<Vec<u8>, io::Error> {
|
fn load_file(path: &PathBuf) -> Result<Vec<u8>, io::Error> {
|
||||||
let mut result = Vec::new();
|
let mut result = Vec::new();
|
||||||
|
|
|
@ -16,12 +16,12 @@ pub use self::{
|
||||||
ClientConfigWatch,
|
ClientConfigWatch,
|
||||||
CommonSettings,
|
CommonSettings,
|
||||||
ConditionalConnectionConfig,
|
ConditionalConnectionConfig,
|
||||||
|
ConditionalClientConfig,
|
||||||
ConnectionConfig,
|
ConnectionConfig,
|
||||||
ReasonForNoTls,
|
ReasonForNoTls,
|
||||||
ReasonForNoIdentity,
|
ReasonForNoIdentity,
|
||||||
ServerConfig,
|
ServerConfig,
|
||||||
ServerConfigWatch,
|
ServerConfigWatch,
|
||||||
current_connection_config,
|
|
||||||
watch_for_config_changes,
|
watch_for_config_changes,
|
||||||
},
|
},
|
||||||
connection::{Connection, Session, UpgradeClientToTls},
|
connection::{Connection, Session, UpgradeClientToTls},
|
||||||
|
|
Loading…
Reference in New Issue