Add initial tests for client and server connection handling w.r.t. TLS. (#28)
* Add initial tests for client and server connection handling w.r.t. TLS. Add a simple framework for TLS connection handling and some initial tests that use it. An explicit effort has been made to keep the test configuration as close to the production configuration as possible; e.g. we use regular TCP sockets instead of some mock TCP sockets. This matters less now, but will matter more later, if/when we implement more low-level TLS-over-TCP optimizations. Rename `ConnectionConfig::identity` to `ConnectionConfig::server_identity` to make it clearer that it is always the identity of the server, regardless of which role the `ConnectionConfig` is being used in. Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
04a8ae3edf
commit
9a19457ca1
|
@ -241,7 +241,7 @@ where
|
|||
let tls = ep.tls_identity().and_then(|identity| {
|
||||
tls_client_config.as_ref().map(|config| {
|
||||
tls::ConnectionConfig {
|
||||
identity: identity.clone(),
|
||||
server_identity: identity.clone(),
|
||||
config: config.clone(),
|
||||
}
|
||||
})
|
||||
|
|
|
@ -458,6 +458,12 @@ impl From<Addr> for SocketAddr {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<SocketAddr> for Addr {
|
||||
fn from(addr: SocketAddr) -> Self {
|
||||
Addr(addr)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Env =====
|
||||
|
||||
impl Strings for Env {
|
||||
|
|
|
@ -112,7 +112,7 @@ pub(super) fn task(
|
|||
let mut client = host_and_port.map(|host_and_port| {
|
||||
let (identity, watch) = match controller_tls {
|
||||
Conditional::Some(cfg) =>
|
||||
(Conditional::Some(cfg.identity), cfg.config),
|
||||
(Conditional::Some(cfg.server_identity), cfg.config),
|
||||
Conditional::None(reason) => {
|
||||
// If there's no connection config, then construct a new
|
||||
// `Watch` that never updates to construct the `WatchService`.
|
||||
|
@ -625,7 +625,7 @@ impl Rebind<tls::ConditionalClientConfig> for BindClient {
|
|||
let conn_cfg = match (&self.identity, client_cfg) {
|
||||
(Conditional::Some(ref id), Conditional::Some(ref cfg)) =>
|
||||
Conditional::Some(tls::ConnectionConfig {
|
||||
identity: id.clone(),
|
||||
server_identity: id.clone(),
|
||||
config: cfg.clone(),
|
||||
}),
|
||||
(Conditional::None(ref reason), _) |
|
||||
|
|
|
@ -155,7 +155,7 @@ where
|
|||
let tls = config.tls_settings.as_ref().and_then(|settings| {
|
||||
tls_config_watch.server.as_ref().map(|tls_server_config| {
|
||||
tls::ConnectionConfig {
|
||||
identity: settings.pod_identity.clone(),
|
||||
server_identity: settings.pod_identity.clone(),
|
||||
config: tls_server_config.clone(),
|
||||
}
|
||||
})
|
||||
|
@ -259,7 +259,7 @@ where
|
|||
let controller_tls = config.tls_settings.as_ref().and_then(|settings| {
|
||||
settings.controller_identity.as_ref().map(|controller_identity| {
|
||||
tls::ConnectionConfig {
|
||||
identity: controller_identity.clone(),
|
||||
server_identity: controller_identity.clone(),
|
||||
config: tls_client_config.clone(),
|
||||
}
|
||||
})
|
||||
|
|
|
@ -141,6 +141,37 @@ impl BoundPort {
|
|||
initial: T,
|
||||
f: F)
|
||||
-> impl Future<Item = (), Error = io::Error> + Send + 'static
|
||||
where
|
||||
F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static,
|
||||
T: Send + 'static,
|
||||
Fut: IntoFuture<Item = T, Error = std::io::Error> + Send + 'static,
|
||||
<Fut as IntoFuture>::Future: Send,
|
||||
{
|
||||
self.listen_and_fold_inner(std::u64::MAX, initial, f)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn listen_and_fold_n<T, F, Fut>(
|
||||
self,
|
||||
connection_limit: u64,
|
||||
initial: T,
|
||||
f: F)
|
||||
-> impl Future<Item = (), Error = io::Error> + Send + 'static
|
||||
where
|
||||
F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static,
|
||||
T: Send + 'static,
|
||||
Fut: IntoFuture<Item = T, Error = std::io::Error> + Send + 'static,
|
||||
<Fut as IntoFuture>::Future: Send,
|
||||
{
|
||||
self.listen_and_fold_inner(connection_limit, initial, f)
|
||||
}
|
||||
|
||||
fn listen_and_fold_inner<T, F, Fut>(
|
||||
self,
|
||||
connection_limit: u64,
|
||||
initial: T,
|
||||
f: F)
|
||||
-> impl Future<Item = (), Error = io::Error> + Send + 'static
|
||||
where
|
||||
F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static,
|
||||
T: Send + 'static,
|
||||
|
@ -156,8 +187,9 @@ impl BoundPort {
|
|||
// background reactor if `listen_and_fold` is called before we've
|
||||
// initialized the runtime.
|
||||
TcpListener::from_std(inner, &Handle::current())
|
||||
}).and_then(|listener|
|
||||
}).and_then(move |listener|
|
||||
listener.incoming()
|
||||
.take(connection_limit)
|
||||
.and_then(move |socket| {
|
||||
let remote_addr = socket.peer_addr()
|
||||
.expect("couldn't get remote addr!");
|
||||
|
@ -173,7 +205,7 @@ impl BoundPort {
|
|||
let conn = match &tls {
|
||||
Conditional::Some(tls) => {
|
||||
let tls = tls::ConnectionConfig {
|
||||
identity: tls.identity.clone(),
|
||||
server_identity: tls.server_identity.clone(),
|
||||
config: tls.config.borrow().clone(),
|
||||
};
|
||||
Either::A(ConditionallyUpgradeServerToTls::new(socket, tls))
|
||||
|
@ -265,7 +297,7 @@ impl ConditionallyUpgradeServerToTlsInner {
|
|||
}
|
||||
|
||||
let buf = self.peek_buf.as_ref();
|
||||
Ok(tls::conditional_accept::match_client_hello(buf, &self.tls.identity).into())
|
||||
Ok(tls::conditional_accept::match_client_hello(buf, &self.tls.server_identity).into())
|
||||
}
|
||||
|
||||
fn into_tls_upgrade(self) -> tls::UpgradeServerToTls {
|
||||
|
@ -298,7 +330,7 @@ impl Future for Connecting {
|
|||
Conditional::Some(config) => {
|
||||
trace!("plaintext connection established; trying to upgrade");
|
||||
let upgrade = tls::Connection::connect(
|
||||
plaintext_stream, &config.identity, config.config);
|
||||
plaintext_stream, &config.server_identity, config.config);
|
||||
ConnectingState::UpgradeToTls(upgrade)
|
||||
},
|
||||
Conditional::None(why) => {
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
// These are basically integration tests for the `connection` submodule, but
|
||||
// they cannot be "real" integration tests because `connection` isn't a public
|
||||
// interface and because `connection` exposes a `#[cfg(test)]`-only API for use
|
||||
// by these tests.
|
||||
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
sync::mpsc,
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
self,
|
||||
io,
|
||||
prelude::*,
|
||||
};
|
||||
|
||||
use conditional::Conditional;
|
||||
use config::Addr;
|
||||
use ctx::transport::TlsStatus;
|
||||
|
||||
use super::{
|
||||
connection::{self, Connection},
|
||||
tls,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn plaintext() {
|
||||
let (client_result, server_result) = run_test(
|
||||
Conditional::None(tls::ReasonForNoTls::Disabled),
|
||||
|conn| write_then_read(conn, PING),
|
||||
Conditional::None(tls::ReasonForNoTls::Disabled),
|
||||
|conn| read_then_write(conn, PING.len(), PONG));
|
||||
assert_eq!(client_result.is_tls(), false);
|
||||
assert_eq!(&client_result.result.unwrap()[..], PONG);
|
||||
assert_eq!(server_result.is_tls(), false);
|
||||
assert_eq!(&server_result.result.unwrap()[..], PING);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn proxy_to_proxy_tls_works() {
|
||||
let server_tls = tls::config_test_util::FOO_NS1.server();
|
||||
let client_tls = tls::config_test_util::BAR_NS1.client(server_tls.server_identity.clone());
|
||||
let (client_result, server_result) = run_test(
|
||||
Conditional::Some(client_tls), |conn| write_then_read(conn, PING),
|
||||
Conditional::Some(server_tls), |conn| read_then_write(conn, PING.len(), PONG));
|
||||
assert_eq!(client_result.is_tls(), true);
|
||||
assert_eq!(&client_result.result.unwrap()[..], PONG);
|
||||
assert_eq!(server_result.is_tls(), true);
|
||||
assert_eq!(&server_result.result.unwrap()[..], PING);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn proxy_to_proxy_tls_pass_through_when_identity_does_not_match() {
|
||||
let server_tls = tls::config_test_util::FOO_NS1.server();
|
||||
|
||||
// Misuse the client's identity instead of the server's identity. Any
|
||||
// identity other than `server_tls.server_identity` would work.
|
||||
let client_tls = tls::config_test_util::BAR_NS1.client(
|
||||
tls::config_test_util::BAR_NS1.to_settings().pod_identity.clone());
|
||||
|
||||
let (client_result, server_result) = run_test(
|
||||
Conditional::Some(client_tls), |conn| write_then_read(conn, PING),
|
||||
Conditional::Some(server_tls), |conn| read_then_write(conn, START_OF_TLS.len(), PONG));
|
||||
|
||||
// The server's connection will succeed with the TLS client hello passed
|
||||
// through, because the SNI doesn't match its identity.
|
||||
assert_eq!(client_result.is_tls(), false);
|
||||
assert!(client_result.result.is_err());
|
||||
assert_eq!(server_result.is_tls(), false);
|
||||
assert_eq!(&server_result.result.unwrap()[..], START_OF_TLS);
|
||||
}
|
||||
|
||||
struct Transported<R> {
|
||||
/// The value of `Connection::tls_status()` for the established connection.
|
||||
///
|
||||
/// This will be `None` if we never even get a `Connection`.
|
||||
tls_status: Option<TlsStatus>,
|
||||
|
||||
/// The connection's result.
|
||||
result: Result<R, io::Error>,
|
||||
}
|
||||
|
||||
impl<R> Transported<R> {
|
||||
fn is_tls(&self) -> bool {
|
||||
match &self.tls_status {
|
||||
Some(Conditional::Some(())) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a test for a single TCP connection. `client` processes the connection
|
||||
/// on the client side and `server` processes the connection on the server
|
||||
/// side.
|
||||
fn run_test<C, CF, CR, S, SF, SR>(
|
||||
client_tls: tls::ConditionalConnectionConfig<tls::ClientConfigWatch>,
|
||||
client: C,
|
||||
server_tls: tls::ConditionalConnectionConfig<tls::ServerConfigWatch>,
|
||||
server: S)
|
||||
-> (Transported<CR>, Transported<SR>)
|
||||
where
|
||||
// Client
|
||||
C: FnOnce(Connection) -> CF + Send + 'static,
|
||||
CF: Future<Item=CR, Error=io::Error> + Send + 'static,
|
||||
CR: Send + 'static,
|
||||
// Server
|
||||
S: Fn(Connection) -> SF + Send + 'static,
|
||||
SF: Future<Item=SR, Error=io::Error> + Send + 'static,
|
||||
SR: Send + 'static,
|
||||
{
|
||||
let _ = ::env_logger::try_init();
|
||||
|
||||
// A future that will receive a single connection.
|
||||
let (server, server_addr, server_result) = {
|
||||
// Saves the result of every connection.
|
||||
let (sender, receiver) = mpsc::channel::<Transported<SR>>();
|
||||
|
||||
// Let the OS decide the port number and then return the resulting
|
||||
// `SocketAddr` so the client can connect to it. This allows multiple
|
||||
// tests to run at once, which wouldn't work if they all were bound on
|
||||
// a fixed port.
|
||||
let addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
|
||||
let server_bound = connection::BoundPort::new(Addr::from(addr), server_tls)
|
||||
.unwrap();
|
||||
let server_addr = server_bound.local_addr();
|
||||
|
||||
let connection_limit = 1; // TODO: allow caller to set this.
|
||||
|
||||
let server = server_bound
|
||||
.listen_and_fold_n(connection_limit, sender, move |sender, (conn, _)| {
|
||||
let tls_status = Some(conn.tls_status());
|
||||
trace!("server tls_status: {:?}", tls_status);
|
||||
server(conn)
|
||||
.then(move |result| {
|
||||
sender.send(Transported { tls_status, result, }).unwrap();
|
||||
Ok(sender)
|
||||
})
|
||||
})
|
||||
.map_err(|e| panic!("Unexpected server error: {:?}", e));
|
||||
|
||||
(server, server_addr, receiver)
|
||||
};
|
||||
|
||||
// A future that will open a single connection to the server.
|
||||
let (client, client_result) = {
|
||||
let tls = client_tls.and_then(|conn_cfg| {
|
||||
let server_identity = conn_cfg.server_identity.clone();
|
||||
(*conn_cfg.config.borrow()).as_ref().map(|cfg| {
|
||||
tls::ConnectionConfig {
|
||||
server_identity,
|
||||
config: cfg.clone(),
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
// Saves the result of the single connection. This could be a simpler
|
||||
// type, e.g. `Arc<Mutex>`, but using a channel simplifies the code and
|
||||
// parallels the server side.
|
||||
let (sender, receiver) = mpsc::channel::<Transported<CR>>();
|
||||
let sender_clone = sender.clone();
|
||||
|
||||
let client = connection::connect(&server_addr, tls)
|
||||
.map_err(move |e| {
|
||||
sender_clone.send(Transported { tls_status: None, result: Err(e) }).unwrap();
|
||||
()
|
||||
})
|
||||
.and_then(|conn| {
|
||||
let tls_status = Some(conn.tls_status());
|
||||
trace!("client tls_status: {:?}", tls_status);
|
||||
client(conn)
|
||||
.then(move |result| {
|
||||
sender.send(Transported { tls_status, result }).unwrap();
|
||||
Ok(())
|
||||
})
|
||||
});
|
||||
|
||||
(client, receiver)
|
||||
};
|
||||
|
||||
tokio::run({
|
||||
server.join(client)
|
||||
.map(|_| ())
|
||||
});
|
||||
|
||||
let client_result = client_result.try_recv().unwrap();
|
||||
|
||||
// XXX: This assumes that only one connection is accepted. TODO: allow the
|
||||
// caller to observe the results for every connection, once we have tests
|
||||
// that allow accepting multiple connections.
|
||||
let server_result = server_result.try_recv().unwrap();
|
||||
|
||||
(client_result, server_result)
|
||||
}
|
||||
|
||||
/// Writes `to_write` and shuts down the write side, then reads until EOF,
|
||||
/// returning the bytes read.
|
||||
fn write_then_read(conn: Connection, to_write: &'static [u8])
|
||||
-> impl Future<Item=Vec<u8>, Error=io::Error>
|
||||
{
|
||||
write_and_shutdown(conn, to_write)
|
||||
.and_then(|conn| io::read_to_end(conn, Vec::new()))
|
||||
.map(|(_conn, r)| r)
|
||||
}
|
||||
|
||||
/// Reads until EOF then writes `to_write` and shuts down the write side,
|
||||
/// returning the bytes read.
|
||||
fn read_then_write(conn: Connection, read_prefix_len: usize, to_write: &'static [u8])
|
||||
-> impl Future<Item=Vec<u8>, Error=io::Error>
|
||||
{
|
||||
io::read_exact(conn, vec![0; read_prefix_len])
|
||||
.and_then(move |(conn, r)| {
|
||||
write_and_shutdown(conn, to_write)
|
||||
.map(|_conn| r)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
/// writes `to_write` to `conn` and then shuts down the write side of `conn`.
|
||||
fn write_and_shutdown(conn: connection::Connection, to_write: &'static [u8])
|
||||
-> impl Future<Item=Connection, Error=io::Error>
|
||||
{
|
||||
io::write_all(conn, to_write)
|
||||
.and_then(|(mut conn, _)| {
|
||||
conn.shutdown()?;
|
||||
Ok(conn)
|
||||
})
|
||||
}
|
||||
|
||||
const PING: &[u8] = b"ping";
|
||||
const PONG: &[u8] = b"pong";
|
||||
const START_OF_TLS: &[u8] = &[22, 3, 1]; // ContentType::handshake version 3.1
|
|
@ -5,6 +5,9 @@ mod io;
|
|||
mod prefixed;
|
||||
pub mod tls;
|
||||
|
||||
#[cfg(test)]
|
||||
mod connection_tests;
|
||||
|
||||
pub use self::{
|
||||
addr_info::{
|
||||
AddrInfo,
|
||||
|
|
|
@ -76,6 +76,14 @@ impl std::fmt::Debug for ClientConfig {
|
|||
#[derive(Clone)]
|
||||
pub struct ServerConfig(pub(super) Arc<rustls::ServerConfig>);
|
||||
|
||||
/// XXX: `rustls::ServerConfig` doesn't implement `Debug` yet.
|
||||
impl std::fmt::Debug for ServerConfig {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
|
||||
f.debug_struct("ServerConfig")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub type ClientConfigWatch = Watch<Conditional<ClientConfig, ReasonForNoTls>>;
|
||||
pub type ServerConfigWatch = Watch<ServerConfig>;
|
||||
|
||||
|
@ -83,7 +91,7 @@ pub type ServerConfigWatch = Watch<ServerConfig>;
|
|||
/// (`ServerConfig`) TLS connection.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ConnectionConfig<C> where C: Clone {
|
||||
pub identity: Identity,
|
||||
pub server_identity: Identity,
|
||||
pub config: C,
|
||||
}
|
||||
|
||||
|
@ -475,11 +483,11 @@ pub(super) const SIGNATURE_ALG_RUSTLS_ALGORITHM: rustls::internal::msgs::enums::
|
|||
rustls::internal::msgs::enums::SignatureAlgorithm::ECDSA;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test_util {
|
||||
pub mod test_util {
|
||||
use super::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use conditional::Conditional;
|
||||
use tls::{CommonSettings, Identity, ReasonForNoIdentity};
|
||||
use tls::Identity;
|
||||
|
||||
pub struct Strings {
|
||||
pub identity: &'static str,
|
||||
|
@ -495,6 +503,13 @@ mod test_util {
|
|||
private_key: "foo-ns1-ca1.p8",
|
||||
};
|
||||
|
||||
pub static BAR_NS1: Strings = Strings {
|
||||
identity: "bar.deployment.ns1.linkerd-managed.linkerd.svc.cluster.local",
|
||||
trust_anchors: "ca1.pem",
|
||||
end_entity_cert: "bar-ns1-ca1.crt",
|
||||
private_key: "bar-ns1-ca1.p8",
|
||||
};
|
||||
|
||||
impl Strings {
|
||||
pub fn to_settings(&self) -> CommonSettings {
|
||||
let dir = PathBuf::from("src/transport/tls/testdata");
|
||||
|
@ -506,6 +521,39 @@ mod test_util {
|
|||
private_key: dir.join(self.private_key),
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a `ConnectionConfig<ClientConfigWatch>` preloaded with a
|
||||
// valid client TLS configuration.
|
||||
pub fn client(&self, server_identity: Identity) -> ConnectionConfig<ClientConfigWatch>
|
||||
{
|
||||
let settings = self.to_settings();
|
||||
let mut config_watch = ConfigWatch::new(Conditional::Some(settings.clone()));
|
||||
let common = CommonConfig::load_from_disk(&settings).unwrap();
|
||||
config_watch.client_store.store(Conditional::Some(ClientConfig::from(&common)))
|
||||
.unwrap();
|
||||
ConnectionConfig {
|
||||
server_identity: server_identity,
|
||||
config: config_watch.client,
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a `ConnectionConfig<ServerConfigWatch>` preloaded with a
|
||||
// valid server TLS configuration.
|
||||
pub fn server(&self) -> ConnectionConfig<ServerConfigWatch>
|
||||
{
|
||||
let settings = self.to_settings();
|
||||
let mut config_watch = ConfigWatch::new(Conditional::Some(settings.clone()));
|
||||
let common = CommonConfig::load_from_disk(&settings).unwrap();
|
||||
config_watch.server_store.store(ServerConfig::from(&common)).unwrap();
|
||||
let config = match config_watch.server {
|
||||
Conditional::Some(watch) => watch,
|
||||
Conditional::None(_) => unreachable!(),
|
||||
};
|
||||
ConnectionConfig {
|
||||
server_identity: settings.pod_identity,
|
||||
config,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,3 +36,6 @@ pub use self::{
|
|||
identity::Identity,
|
||||
rustls::TLSError as Error,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
pub use self::config::test_util as config_test_util;
|
||||
|
|
Loading…
Reference in New Issue