transport: introduce Bind trait and move orig dst to the type level (#957)

This branch introduces a new `transport::listen::Bind` trait that
abstracts over binding and listening on a socket. Now, rather than
owning an instance of  the `BindTcp` type, `ServerConfig`s are passed as
a parameter to a type implementing the `Bind` trait. This will
eventually allow end-to-end testing without creating actual TCP sockets,
as `Bind` produces a `Stream` of an IO type and an `Addrs` type.

PR #955 changed the proxy stacks to be a target type which implements
the `Param` trait for various types of address, rather than a fixed
type. This PR also changes the inbound and outbound proxies to take
targets which implement `Param<OrigDstAddr>`, rather than requiring
`Param<Option<OrigDstAddr>>` and punting fallibility to every point
where original dst addresses are used.

The `BindTcp` type is now generic over a more general `GetAddrs` trait,
rather than a `SO_ORIGINAL_DST`-specific `OrigDstAddr` trait. Rather
than always producing a `listen::Addrs` with an `Option<OrigDstAddr>`,
we now produce a variable addresses type, which may or may not incldue
the original destination address (the admin and tap servers don't
require SO_ORIGINAL_DST). This allows whether or not original
destination addresses are included to be determined at compile time
rather than at runtime.

Finally, the incorrect feature flagging of mock original destination
addresses has been fixed, The "mock-orig-dst" feature has been removed,
and the integration tests now simply construct their own mock `GetAddrs`
function that's used to configure the test proxy. In the future, we
should be able to refactor the tests to avoid this by simply passing in
their own mock `Bind` types which produce streams of in-memory mock
connections, rather than actual TCP connections.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Co-authored-by: Oliver Gould <ver@buoyant.io>
This commit is contained in:
Eliza Weisman 2021-03-31 14:30:36 -07:00 committed by GitHub
parent 110e9bea59
commit 07e9249769
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 633 additions and 637 deletions

View File

@ -31,9 +31,7 @@ FROM $RUST_IMAGE as build
# When set, causes the proxy to be compiled in development mode.
ARG PROXY_UNOPTIMIZED
# Controls what features are enabled in the proxy. This is typically empty but
# may be set to `mock-orig-dst` for profiling builds, or to `multithreaded` to
# enable the multithreaded Tokio runtime.
# Controls what features are enabled in the proxy.
ARG PROXY_FEATURES
RUN --mount=type=cache,target=/var/lib/apt/lists \

View File

@ -13,7 +13,6 @@ This is used by tests and the executable.
[features]
allow-loopback = ["linkerd-app-outbound/allow-loopback"]
mock-orig-dst = ["linkerd-app-core/mock-orig-dst"]
[dependencies]
futures = "0.3.9"

View File

@ -12,9 +12,6 @@ This crate conglomerates proxy configuration, runtime administration, etc,
independently of the inbound and outbound proxy logic.
"""
[features]
mock-orig-dst = ["linkerd-proxy-transport/mock-orig-dst"]
[dependencies]
bytes = "1"
http = "0.2"

View File

@ -1,11 +1,15 @@
pub use crate::exp_backoff::ExponentialBackoff;
pub use crate::proxy::http::{h1, h2};
pub use crate::transport::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, Keepalive, NoOrigDstAddr};
use crate::{
proxy::http::{h1, h2},
svc::Param,
transport::{Keepalive, ListenAddr},
};
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct ServerConfig<A = NoOrigDstAddr> {
pub bind: BindTcp<A>,
pub struct ServerConfig {
pub addr: ListenAddr,
pub keepalive: Keepalive,
pub h2_settings: h2::Settings,
}
@ -20,7 +24,7 @@ pub struct ConnectConfig {
#[derive(Clone, Debug)]
pub struct ProxyConfig {
pub server: ServerConfig<DefaultOrigDstAddr>,
pub server: ServerConfig,
pub connect: ConnectConfig,
pub buffer_capacity: usize,
pub cache_max_idle_age: Duration,
@ -31,11 +35,14 @@ pub struct ProxyConfig {
// === impl ServerConfig ===
impl<A: GetOrigDstAddr> ServerConfig<A> {
pub fn with_orig_dst_addr<B: GetOrigDstAddr>(self, orig_dst_addrs: B) -> ServerConfig<B> {
ServerConfig {
bind: self.bind.with_orig_dst_addr(orig_dst_addrs),
h2_settings: self.h2_settings,
impl Param<ListenAddr> for ServerConfig {
fn param(&self) -> ListenAddr {
self.addr
}
}
impl Param<Keepalive> for ServerConfig {
fn param(&self) -> Keepalive {
self.keepalive
}
}

View File

@ -1,26 +1,28 @@
use crate::io;
use crate::svc;
use crate::{
io,
svc::{self, Param},
transport::{ClientAddr, Remote},
};
use futures::prelude::*;
use linkerd_error::Error;
use linkerd_proxy_transport::listen::Addrs;
use tower::util::ServiceExt;
use tracing::instrument::Instrument;
use tracing::{debug, debug_span, info, warn};
use tracing::{debug, debug_span, info, instrument::Instrument, warn};
/// Spawns a task that binds an `L`-typed listener with an `A`-typed
/// connection-accepting service.
///
/// The task is driven until shutdown is signaled.
pub async fn serve<M, A, I>(
listen: impl Stream<Item = std::io::Result<(Addrs, I)>>,
pub async fn serve<M, S, I, A>(
listen: impl Stream<Item = std::io::Result<(A, I)>>,
mut new_accept: M,
shutdown: impl Future,
) where
I: Send + 'static,
M: svc::NewService<Addrs, Service = A>,
A: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
A::Error: Into<Error>,
A::Future: Send + 'static,
A: Param<Remote<ClientAddr>>,
M: svc::NewService<A, Service = S>,
S: tower::Service<io::ScopedIo<I>, Response = ()> + Send + 'static,
S::Error: Into<Error>,
S::Future: Send + 'static,
{
let accept = async move {
futures::pin_mut!(listen);
@ -38,7 +40,7 @@ pub async fn serve<M, A, I>(
};
// The local addr should be instrumented from the listener's context.
let span = debug_span!("accept", client.addr = %addrs.client());
let span = debug_span!("accept", client.addr = %addrs.param());
let accept = span.in_scope(|| new_accept.new_service(addrs));

View File

@ -71,7 +71,7 @@ impl<N> Inbound<N> {
> + Clone,
>
where
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>> + Clone + Send + 'static,
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr> + Clone + Send + 'static,
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
I: Debug + Send + Sync + Unpin + 'static,
N: svc::NewService<TcpEndpoint, Service = NSvc> + Clone + Send + Sync + Unpin + 'static,
@ -173,7 +173,7 @@ impl<N> Inbound<N> {
impl<T> TryFrom<(tls::ConditionalServerTls, T)> for ClientInfo
where
T: Param<Option<OrigDstAddr>>,
T: Param<OrigDstAddr>,
T: Param<Remote<ClientAddr>>,
{
type Error = Error;
@ -184,14 +184,7 @@ where
client_id: Some(client_id),
negotiated_protocol,
}) => {
let local: Option<OrigDstAddr> = addrs.param();
let OrigDstAddr(local_addr) = local.ok_or_else(|| {
tracing::warn!("No SO_ORIGINAL_DST address found!");
std::io::Error::new(
std::io::ErrorKind::NotFound,
"No SO_ORIGINAL_DST address found",
)
})?;
let OrigDstAddr(local_addr) = addrs.param();
Ok(Self {
client_id,
alpn: negotiated_protocol,

View File

@ -71,7 +71,7 @@ async fn unmeshed_http1_hello_world() {
profile_tx.send(profile::Profile::default()).unwrap();
// Build the outbound server
let cfg = default_config(accept.tcp.target_addr);
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(accept);
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;
@ -118,7 +118,7 @@ async fn downgrade_origin_form() {
profile_tx.send(profile::Profile::default()).unwrap();
// Build the outbound server
let cfg = default_config(accept.tcp.target_addr);
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(accept);
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;
@ -166,7 +166,7 @@ async fn downgrade_absolute_form() {
profile_tx.send(profile::Profile::default()).unwrap();
// Build the outbound server
let cfg = default_config(accept.tcp.target_addr);
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(accept);
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;

View File

@ -21,16 +21,14 @@ use self::{
target::{HttpAccept, TcpAccept},
};
use linkerd_app_core::{
config::{ConnectConfig, ProxyConfig},
config::{ConnectConfig, ProxyConfig, ServerConfig},
detect, drain, io, metrics, profiles,
proxy::tcp,
serve,
svc::{self, Param},
tls,
transport::{self, ClientAddr, OrigDstAddr, Remote, ServerAddr},
serve, svc, tls,
transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
Error, NameMatch, ProxyRuntime,
};
use std::{convert::TryFrom, fmt::Debug, future::Future, net::SocketAddr, time::Duration};
use std::{convert::TryFrom, fmt::Debug, future::Future, time::Duration};
use tracing::{debug_span, info_span};
#[derive(Clone, Debug)]
@ -52,7 +50,7 @@ pub struct Inbound<S> {
stack: svc::Stack<S>,
}
// === impl Config ===
// === impl Inbound ===
impl<S> Inbound<S> {
pub fn config(&self) -> &Config {
@ -89,7 +87,7 @@ impl Inbound<()> {
}
}
pub fn to_tcp_connect<T: Param<u16>>(
pub fn to_tcp_connect<T: svc::Param<u16>>(
&self,
) -> Inbound<
impl svc::Service<
@ -122,35 +120,34 @@ impl Inbound<()> {
}
}
pub fn serve<G, GSvc, P>(
pub fn serve<B, G, GSvc, P>(
self,
bind: B,
profiles: P,
gateway: G,
) -> (SocketAddr, impl Future<Output = ()> + Send)
) -> (Local<ServerAddr>, impl Future<Output = ()> + Send)
where
B: Bind<ServerConfig>,
B::Addrs: svc::Param<Remote<ClientAddr>>
+ svc::Param<Local<ServerAddr>>
+ svc::Param<OrigDstAddr>,
G: svc::NewService<direct::GatewayConnection, Service = GSvc>,
G: Clone + Send + Sync + Unpin + 'static,
GSvc: svc::Service<direct::GatewayIo<io::ScopedIo<tokio::net::TcpStream>>, Response = ()>
+ Send
+ 'static,
GSvc: svc::Service<direct::GatewayIo<io::ScopedIo<B::Io>>, Response = ()> + Send + 'static,
GSvc::Error: Into<Error>,
GSvc::Future: Send,
P: profiles::GetProfile<profiles::LookupAddr> + Clone + Send + Sync + 'static,
P::Error: Send,
P::Future: Send,
{
let (listen_addr, listen) = self
.config
.proxy
.server
.bind
.bind()
let (listen_addr, listen) = bind
.bind(&self.config.proxy.server)
.expect("Failed to bind inbound listener");
let serve = async move {
let stack = self
.to_tcp_connect()
.into_server(listen_addr.port(), profiles, gateway);
let stack =
self.to_tcp_connect()
.into_server(listen_addr.as_ref().port(), profiles, gateway);
let shutdown = self.runtime.drain.signaled();
serve::serve(listen, stack, shutdown).await
};
@ -218,7 +215,7 @@ where
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
> + Clone
where
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>> + Clone + Send + 'static,
T: svc::Param<Remote<ClientAddr>> + svc::Param<OrigDstAddr> + Clone + Send + 'static,
I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr,
I: Debug + Send + Sync + Unpin + 'static,
G: svc::NewService<direct::GatewayConnection, Service = GSvc>,
@ -267,7 +264,7 @@ where
.stack
.push_map_target(TcpEndpoint::from)
.push(self.runtime.metrics.transport.layer_accept())
.push_request_filter(TcpAccept::port_skipped)
.push_map_target(TcpAccept::port_skipped)
.check_new_service::<T, _>()
.instrument(|_: &T| debug_span!("forward"))
.into_inner(),
@ -282,13 +279,9 @@ where
.into_inner(),
)
.instrument(|a: &T| {
if let Some(OrigDstAddr(target_addr)) = a.param() {
let OrigDstAddr(target_addr) = a.param();
info_span!("server", port = target_addr.port())
} else {
info_span!("server", port = %"<no original dst>")
}
})
.check_new_service::<T, I>()
.into_inner()
}
}
@ -303,18 +296,12 @@ impl From<indexmap::IndexSet<u16>> for SkipByPort {
impl<T> svc::Predicate<T> for SkipByPort
where
T: Param<Option<OrigDstAddr>>,
T: svc::Param<OrigDstAddr>,
{
type Request = svc::Either<T, T>;
fn check(&mut self, t: T) -> Result<Self::Request, Error> {
let OrigDstAddr(addr) = t.param().ok_or_else(|| {
tracing::warn!("No SO_ORIGINAL_DST address found!");
std::io::Error::new(
std::io::ErrorKind::NotFound,
"No SO_ORIGINAL_DST address found",
)
})?;
let OrigDstAddr(addr) = t.param();
if !self.0.contains(&addr.port()) {
Ok(svc::Either::A(t))
} else {

View File

@ -44,17 +44,11 @@ impl PreventLoop {
}
}
impl<T: Param<Option<OrigDstAddr>>> Predicate<T> for SwitchLoop {
impl<T: Param<OrigDstAddr>> Predicate<T> for SwitchLoop {
type Request = Either<T, T>;
fn check(&mut self, addrs: T) -> Result<Either<T, T>, Error> {
let OrigDstAddr(addr) = addrs.param().ok_or_else(|| {
tracing::warn!("No SO_ORIGINAL_DST address found!");
std::io::Error::new(
std::io::ErrorKind::NotFound,
"No SO_ORIGINAL_DST address found",
)
})?;
let OrigDstAddr(addr) = addrs.param();
tracing::debug!(%addr, self.port);
if addr.port() != self.port {
Ok(Either::A(addrs))

View File

@ -10,13 +10,7 @@ use linkerd_app_core::{
transport_header::TransportHeader,
Addr, Conditional, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER,
};
use std::{
convert::{TryFrom, TryInto},
io,
net::SocketAddr,
str::FromStr,
sync::Arc,
};
use std::{convert::TryInto, net::SocketAddr, str::FromStr, sync::Arc};
use tracing::debug;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -66,30 +60,25 @@ pub struct RequestTarget {
// === impl TcpAccept ===
impl TcpAccept {
pub fn port_skipped<T>(tcp: T) -> Result<Self, io::Error>
pub fn port_skipped<T>(tcp: T) -> Self
where
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>>,
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
{
let orig_dst: Option<OrigDstAddr> = tcp.param();
let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| {
tracing::warn!("No SO_ORIGINAL_DST address found!");
io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found")
})?;
Ok(Self {
let OrigDstAddr(target_addr) = tcp.param();
Self {
target_addr,
client_addr: tcp.param(),
tls: Conditional::None(tls::NoServerTls::PortSkipped),
})
}
}
}
/// Returns a `TcpAccept` for the provided TLS metadata and addresses,
/// determining the target address from the server's local listener address
/// rather than a `SO_ORIGINAL_DST` address.
pub fn from_local_addr<T>((tls, addrs): tls::server::Meta<T>) -> Self
impl<T> From<tls::server::Meta<T>> for TcpAccept
where
T: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
{
let Local(ServerAddr(target_addr)) = addrs.param();
fn from((tls, addrs): tls::server::Meta<T>) -> Self {
let OrigDstAddr(target_addr) = addrs.param();
Self {
target_addr,
client_addr: addrs.param(),
@ -98,25 +87,6 @@ impl TcpAccept {
}
}
impl<T> TryFrom<tls::server::Meta<T>> for TcpAccept
where
T: Param<Remote<ClientAddr>> + Param<Option<OrigDstAddr>>,
{
type Error = io::Error;
fn try_from((tls, addrs): tls::server::Meta<T>) -> Result<Self, Self::Error> {
let orig_dst: Option<OrigDstAddr> = addrs.param();
let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| {
tracing::warn!("No SO_ORIGINAL_DST address found!");
io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found")
})?;
Ok(Self {
target_addr,
client_addr: addrs.param(),
tls,
})
}
}
impl Param<SocketAddr> for TcpAccept {
fn param(&self) -> SocketAddr {
self.target_addr

View File

@ -8,15 +8,13 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{BindTcp, Keepalive, ListenAddr},
transport::{Keepalive, ListenAddr},
NameMatch, ProxyRuntime,
};
pub use linkerd_app_test as support;
use std::{net::SocketAddr, time::Duration};
use std::time::Duration;
const LOCALHOST: [u8; 4] = [127, 0, 0, 1];
pub fn default_config(orig_dst: SocketAddr) -> Config {
pub fn default_config() -> Config {
let cluster_local = "svc.cluster.local."
.parse::<Suffix>()
.expect("`svc.cluster.local.` suffix is definitely valid");
@ -24,11 +22,8 @@ pub fn default_config(orig_dst: SocketAddr) -> Config {
allow_discovery: NameMatch::new(Some(cluster_local)),
proxy: config::ProxyConfig {
server: config::ServerConfig {
bind: BindTcp::new(
ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)),
Keepalive(None),
)
.with_orig_dst_addr(orig_dst.into()),
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
connect: config::ConnectConfig {

View File

@ -24,8 +24,8 @@ http = "0.2"
http-body = "0.4"
hyper = { version = "0.14.2", features = ["http1", "http2", "stream", "client", "server"] }
linkerd-channel = { path = "../../channel" }
linkerd-app = { path = "..", features = ["allow-loopback", "mock-orig-dst"] }
linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] }
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.18", features = ["arbitrary"] }
linkerd-app-test = { path = "../test" }

View File

@ -1,5 +1,11 @@
use super::*;
use std::{future::Future, pin::Pin, task::Poll, thread};
use app_core::transport::OrigDstAddr;
use linkerd_app_core::{
svc::Param,
transport::{listen, orig_dst, Keepalive, ListenAddr},
};
use std::{future::Future, net::SocketAddr, pin::Pin, task::Poll, thread};
use tokio::net::TcpStream;
use tracing::instrument::Instrument;
pub fn new() -> Proxy {
@ -26,6 +32,9 @@ pub struct Proxy {
shutdown_signal: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
#[derive(Copy, Clone, Debug)]
struct MockOrigDst(Option<SocketAddr>);
pub struct Listening {
pub tap: Option<SocketAddr>,
pub inbound: SocketAddr,
@ -44,6 +53,35 @@ pub struct Listening {
thread: thread::JoinHandle<()>,
}
// === impl MockOrigDst ===
impl<T> listen::Bind<T> for MockOrigDst
where
T: Param<Keepalive> + Param<ListenAddr>,
{
type Addrs = orig_dst::Addrs;
type Io = tokio::net::TcpStream;
type Incoming = Pin<
Box<dyn Stream<Item = io::Result<(orig_dst::Addrs, TcpStream)>> + Send + Sync + 'static>,
>;
fn bind(self, params: &T) -> io::Result<listen::Bound<Self::Incoming>> {
let (bound, incoming) = listen::BindTcp::default().bind(params)?;
let addr = self.0;
let incoming = Box::pin(incoming.map(move |res| {
let (inner, tcp) = res?;
let orig_dst = addr
.map(OrigDstAddr)
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "No mocked SO_ORIG_DST"))?;
let addrs = orig_dst::Addrs { inner, orig_dst };
Ok((addrs, tcp))
}));
Ok((bound, incoming))
}
}
// === impl Proxy ===
impl Proxy {
/// Pass a customized support `Controller` for this proxy to use.
///
@ -200,16 +238,6 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
env.put(app::env::ENV_OUTBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned());
}
// If a given server is missing, use the admin server as a substitute.
env.put(
app::env::ENV_INBOUND_ORIG_DST_ADDR,
inbound.unwrap_or(controller.addr).to_string(),
);
env.put(
app::env::ENV_OUTBOUND_ORIG_DST_ADDR,
outbound.unwrap_or(controller.addr).to_string(),
);
if random_ports {
env.put(app::env::ENV_INBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned());
env.put(app::env::ENV_CONTROL_LISTEN_ADDR, "127.0.0.1:0".to_owned());
@ -264,6 +292,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
}
let config = app::env::parse_config(&env).unwrap();
let dispatch = tracing::Dispatch::default();
let (trace, trace_handle) = if dispatch
.downcast_ref::<tracing_subscriber::fmt::TestWriter>()
@ -301,9 +330,12 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
.build()
.expect("proxy")
.block_on(async move {
let bind_in = MockOrigDst(inbound);
let bind_out = MockOrigDst(outbound);
let bind_adm = listen::BindTcp::default();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
let main = config
.build(shutdown_tx, trace_handle)
.build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle)
.await
.expect("config");
@ -358,14 +390,13 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
outbound.addr = ?outbound_addr,
outbound.orig_dst = ?outbound.as_ref(),
metrics.addr = ?metrics_addr,
"proxy running",
);
Listening {
tap: tap_addr,
inbound: inbound_addr,
outbound: outbound_addr,
metrics: metrics_addr,
tap: tap_addr.map(Into::into),
inbound: inbound_addr.into(),
outbound: outbound_addr.into(),
metrics: metrics_addr.into(),
outbound_server: proxy.outbound_server,
inbound_server: proxy.inbound_server,

View File

@ -22,7 +22,7 @@ impl<N> Outbound<N> {
>,
>
where
T: Param<Option<OrigDstAddr>>,
T: Param<OrigDstAddr>,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
N: svc::NewService<tcp::Logical, Service = NSvc> + Clone + Send + 'static,
NSvc: svc::Service<SensorIo<I>, Response = (), Error = Error> + Send + 'static,

View File

@ -13,7 +13,7 @@ use linkerd_app_core::{
io,
svc::{self, NewService},
tls,
transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::orig_dst,
Error, NameAddr, ProxyRuntime,
};
use std::{
@ -34,7 +34,7 @@ fn build_server<I>(
resolver: resolver::Dst<resolver::Metadata>,
connect: Connect<Endpoint>,
) -> impl svc::NewService<
listen::Addrs,
orig_dst::Addrs,
Service = impl tower::Service<
I,
Response = (),
@ -97,6 +97,7 @@ impl<I> svc::Service<I> for NoTcpBalancer {
type Response = ();
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unreachable!("no TCP load balancer should be created in this test!");
}
@ -114,7 +115,7 @@ async fn profile_endpoint_propagates_conn_errors() {
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let cfg = default_config(ep1);
let cfg = default_config();
let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is invalid");
let meta = support::resolver::Metadata::new(
@ -217,7 +218,7 @@ async fn meshed_hello_world() {
let _trace = support::trace_init();
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let cfg = default_config(ep1);
let cfg = default_config();
let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is invalid");
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
@ -267,7 +268,7 @@ async fn stacks_idle_out() {
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let idle_timeout = Duration::from_millis(500);
let mut cfg = default_config(ep1);
let mut cfg = default_config();
cfg.proxy.cache_max_idle_age = idle_timeout;
let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
@ -332,7 +333,7 @@ async fn active_stacks_dont_idle_out() {
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let idle_timeout = Duration::from_millis(500);
let mut cfg = default_config(ep1);
let mut cfg = default_config();
cfg.proxy.cache_max_idle_age = idle_timeout;
let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
@ -422,14 +423,6 @@ async fn active_stacks_dont_idle_out() {
proxy_bg.await.unwrap();
}
pub fn addrs(od: SocketAddr) -> listen::Addrs {
listen::Addrs::new(
Local(ServerAddr(([127, 0, 0, 1], 4140).into())),
Remote(ClientAddr(([127, 0, 0, 1], 666).into())),
Some(OrigDstAddr(od)),
)
}
async fn unmeshed_hello_world(
server_settings: hyper::server::conn::Http,
mut client_settings: ClientBuilder,
@ -437,7 +430,7 @@ async fn unmeshed_hello_world(
let _trace = support::trace_init();
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let cfg = default_config(ep1);
let cfg = default_config();
// Build a mock "connector" that returns the upstream "server" IO.
let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings));

View File

@ -5,10 +5,9 @@ use linkerd_app_core::{
io, profiles,
svc::{self, stack::Param},
tls,
transport::{self, OrigDstAddr},
transport::{self, ClientAddr, OrigDstAddr, Remote},
Addr, AddrMatch, Error,
};
use std::convert::TryFrom;
use tracing::{debug_span, info_span};
impl Outbound<()> {
@ -28,7 +27,7 @@ impl Outbound<()> {
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
>
where
T: Param<Option<OrigDstAddr>>,
T: Param<OrigDstAddr> + Param<Remote<ClientAddr>> + Clone + Send + Sync + 'static,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static,
N: svc::NewService<tcp::Endpoint, Service = NSvc> + Clone + Send + Sync + 'static,
NSvc: svc::Service<io::PrefixedIo<transport::metrics::SensorIo<I>>, Response = ()>
@ -136,7 +135,10 @@ impl Outbound<()> {
))
.push(self.runtime.metrics.transport.layer_accept())
.instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst))
.push_request_filter(|a: T| tcp::Accept::try_from(a.param()))
.push_map_target(|a: T| {
let orig_dst = Param::<OrigDstAddr>::param(&a);
tcp::Accept::from(orig_dst)
})
// Boxing is necessary purely to limit the link-time overhead of
// having enormous types.
.push(svc::BoxNewService::layer())

View File

@ -15,7 +15,7 @@ pub mod tcp;
pub(crate) mod test_util;
use linkerd_app_core::{
config::ProxyConfig,
config::{ProxyConfig, ServerConfig},
io, metrics, profiles,
proxy::{
api_resolve::{ConcreteAddr, Metadata},
@ -24,10 +24,10 @@ use linkerd_app_core::{
serve,
svc::{self, stack::Param},
tls,
transport::OrigDstAddr,
transport::{addrs::*, listen::Bind},
AddrMatch, Error, ProxyRuntime,
};
use std::{collections::HashMap, future::Future, net::SocketAddr, time::Duration};
use std::{collections::HashMap, future::Future, time::Duration};
use tracing::info;
const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30);
@ -103,7 +103,8 @@ impl<S> Outbound<S> {
Service = impl svc::Service<I, Response = (), Error = Error, Future = impl Send>,
>
where
T: Param<Option<OrigDstAddr>>,
Self: Clone + 'static,
T: Param<OrigDstAddr> + Param<Remote<ClientAddr>> + Clone + Send + Sync + 'static,
S: Clone + Send + Sync + Unpin + 'static,
S: svc::Service<tcp::Connect, Error = io::Error>,
S::Response: tls::HasNegotiatedProtocol,
@ -135,8 +136,15 @@ impl<S> Outbound<S> {
}
impl Outbound<()> {
pub fn serve<P, R>(self, profiles: P, resolve: R) -> (SocketAddr, impl Future<Output = ()>)
pub fn serve<B, P, R>(
self,
bind: B,
profiles: P,
resolve: R,
) -> (Local<ServerAddr>, impl Future<Output = ()>)
where
B: Bind<ServerConfig>,
B::Addrs: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
R: Clone + Send + Sync + Unpin + 'static,
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
R::Resolution: Send,
@ -145,12 +153,8 @@ impl Outbound<()> {
P::Future: Send,
P::Error: Send,
{
let (listen_addr, listen) = self
.config
.proxy
.server
.bind
.bind()
let (listen_addr, listen) = bind
.bind(&self.config.proxy.server)
.expect("Failed to bind outbound listener");
let serve = async move {

View File

@ -23,23 +23,6 @@ impl From<OrigDstAddr> for Accept {
}
}
impl std::convert::TryFrom<Option<OrigDstAddr>> for Accept {
type Error = std::io::Error;
fn try_from(orig_dst: Option<OrigDstAddr>) -> Result<Self, Self::Error> {
match orig_dst {
Some(addr) => Ok(Self::from(addr)),
None => {
tracing::warn!("No SO_ORIGINAL_DST address found!");
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"No SO_ORIGINAL_DST address found",
))
}
}
}
}
impl<P> From<(P, Accept)> for target::Accept<P> {
fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self {
Self { orig_dst, protocol }

View File

@ -13,7 +13,7 @@ use linkerd_app_core::{
io, svc,
svc::NewService,
tls,
transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
transport::{addrs::*, listen, orig_dst},
Addr, Conditional, Error, IpMatch, NameAddr,
};
use std::{
@ -62,7 +62,7 @@ async fn plaintext_tcp() {
support::resolver().endpoint_exists(target_addr, target_addr, Default::default());
// Build the outbound TCP balancer stack.
let cfg = default_config(target_addr);
let cfg = default_config();
let (rt, _) = runtime();
Outbound::new(cfg, rt)
.with_stack(connect)
@ -141,7 +141,7 @@ async fn tls_when_hinted() {
// Build the outbound TCP balancer stack.
let (rt, _) = runtime();
let mut stack = Outbound::new(default_config(([0, 0, 0, 0], 0).into()), rt)
let mut stack = Outbound::new(default_config(), rt)
.with_stack(connect)
.push_tcp_logical(resolver)
.into_inner();
@ -164,7 +164,7 @@ async fn resolutions_are_reused() {
let _trace = support::trace_init();
let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550);
let cfg = default_config(addr);
let cfg = default_config();
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
// Build a mock "connector" that returns the upstream "server" IO.
@ -242,7 +242,7 @@ async fn load_balances() {
),
];
let cfg = default_config(addr);
let cfg = default_config();
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is valid");
@ -336,7 +336,7 @@ async fn load_balancer_add_endpoints() {
),
];
let cfg = default_config(addr);
let cfg = default_config();
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is valid");
@ -449,7 +449,7 @@ async fn load_balancer_remove_endpoints() {
),
];
let cfg = default_config(addr);
let cfg = default_config();
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is valid");
@ -542,7 +542,7 @@ async fn no_profiles_when_outside_search_nets() {
let no_profile_addr = SocketAddr::new([126, 32, 5, 18].into(), 5550);
let cfg = Config {
allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(),
..default_config(profile_addr)
..default_config()
};
let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap();
let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
@ -616,7 +616,7 @@ async fn no_discovery_when_profile_has_an_endpoint() {
let _trace = support::trace_init();
let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let cfg = default_config(ep);
let cfg = default_config();
let meta = support::resolver::Metadata::new(
Default::default(),
support::resolver::ProtocolHint::Unknown,
@ -661,7 +661,7 @@ async fn profile_endpoint_propagates_conn_errors() {
let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550);
let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550);
let cfg = default_config(ep1);
let cfg = default_config();
let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local")
.expect("hostname is invalid");
let meta = support::resolver::Metadata::new(
@ -697,12 +697,7 @@ async fn profile_endpoint_propagates_conn_errors() {
// Build the outbound server
let mut server = build_server(cfg, profiles, resolver, connect);
let svc = server.new_service(listen::Addrs::new(
Local(ServerAddr(([127, 0, 0, 1], 4140).into())),
Remote(ClientAddr(([127, 0, 0, 1], 666).into())),
Some(OrigDstAddr(ep1)),
));
let svc = server.new_service(addrs(ep1));
let res = svc
.oneshot(
support::io()
@ -765,7 +760,7 @@ fn build_server<I>(
resolver: resolver::Dst<resolver::Metadata>,
connect: Connect<Endpoint>,
) -> impl svc::NewService<
listen::Addrs,
orig_dst::Addrs,
Service = impl tower::Service<
I,
Response = (),
@ -792,7 +787,7 @@ fn hello_world_client<N, S>(
new_svc: &mut N,
) -> impl Future<Output = ()> + Send
where
N: svc::NewService<listen::Addrs, Service = S> + Send + 'static,
N: svc::NewService<orig_dst::Addrs, Service = S> + Send + 'static,
S: svc::Service<support::io::Mock, Response = ()> + Send + 'static,
S::Error: Into<Error>,
S::Future: Send + 'static,
@ -800,11 +795,13 @@ where
let span = tracing::info_span!("hello_world_client", %orig_dst);
let svc = {
let _e = span.enter();
let addrs = listen::Addrs::new(
Local(ServerAddr(([127, 0, 0, 1], 4140).into())),
Remote(ClientAddr(([127, 0, 0, 1], 666).into())),
Some(OrigDstAddr(orig_dst)),
);
let addrs = orig_dst::Addrs {
orig_dst: OrigDstAddr(orig_dst),
inner: listen::Addrs {
server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())),
client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())),
},
};
let svc = new_svc.new_service(addrs);
tracing::trace!("new service");
svc

View File

@ -7,25 +7,20 @@ use linkerd_app_core::{
http::{h1, h2},
tap,
},
transport::{BindTcp, Keepalive, ListenAddr},
transport::{self, orig_dst, Keepalive, ListenAddr},
IpMatch, ProxyRuntime,
};
pub use linkerd_app_test as support;
use std::{net::SocketAddr, str::FromStr, time::Duration};
const LOCALHOST: [u8; 4] = [127, 0, 0, 1];
pub fn default_config(orig_dst: SocketAddr) -> Config {
pub fn default_config() -> Config {
Config {
ingress_mode: false,
allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(),
proxy: config::ProxyConfig {
server: config::ServerConfig {
bind: BindTcp::new(
ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)),
Keepalive(None),
)
.with_orig_dst_addr(orig_dst.into()),
addr: ListenAddr(([0, 0, 0, 0], 0).into()),
keepalive: Keepalive(None),
h2_settings: h2::Settings::default(),
},
connect: config::ConnectConfig {
@ -65,3 +60,14 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) {
};
(runtime, drain_tx)
}
pub fn addrs(od: SocketAddr) -> orig_dst::Addrs {
use transport::{addrs::*, listen};
orig_dst::Addrs {
orig_dst: OrigDstAddr(od),
inner: listen::Addrs {
server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())),
client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())),
},
}
}

View File

@ -3,17 +3,18 @@ use crate::core::{
config::ServerConfig,
detect, drain, errors,
metrics::{self, FmtMetrics},
serve, tls, trace,
transport::listen,
serve,
svc::{self, Param},
tls, trace,
transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr},
Error,
};
use crate::{
http,
identity::LocalCrtKey,
inbound::target::{HttpAccept, Target, TcpAccept},
svc,
};
use std::{net::SocketAddr, pin::Pin, time::Duration};
use std::{pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tracing::debug;
@ -24,7 +25,7 @@ pub struct Config {
}
pub struct Admin {
pub listen_addr: SocketAddr,
pub listen_addr: Local<ServerAddr>,
pub latch: admin::Latch,
pub serve: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
}
@ -32,8 +33,10 @@ pub struct Admin {
// === impl Config ===
impl Config {
pub fn build<R>(
#[allow(clippy::clippy::too_many_arguments)]
pub fn build<B, R>(
self,
bind: B,
identity: Option<LocalCrtKey>,
report: R,
metrics: metrics::Proxy,
@ -43,10 +46,12 @@ impl Config {
) -> Result<Admin, Error>
where
R: FmtMetrics + Clone + Send + 'static + Unpin,
B: Bind<ServerConfig>,
B::Addrs: svc::Param<Remote<ClientAddr>> + svc::Param<Local<ServerAddr>>,
{
const DETECT_TIMEOUT: Duration = Duration::from_secs(1);
let (listen_addr, listen) = self.server.bind.bind()?;
let (listen_addr, listen) = bind.bind(&self.server)?;
let (ready, latch) = admin::Readiness::new();
let admin = admin::Admin::new(report, ready, shutdown, trace);
@ -76,8 +81,15 @@ impl Config {
http::DetectHttp::default(),
))
.push(metrics.transport.layer_accept())
.push_map_target(TcpAccept::from_local_addr)
.check_new_clone::<tls::server::Meta<listen::Addrs>>()
.push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
// TODO this should use an admin-specific target type.
let Local(ServerAddr(target_addr)) = addrs.param();
TcpAccept {
tls,
client_addr: addrs.param(),
target_addr,
}
})
.push(tls::NewDetectTls::layer(identity, DETECT_TIMEOUT))
.into_inner();

View File

@ -4,7 +4,7 @@ use crate::core::{
control::{Config as ControlConfig, ControlAddr},
proxy::http::{h1, h2},
tls,
transport::{BindTcp, Keepalive, ListenAddr},
transport::{Keepalive, ListenAddr},
Addr, AddrMatch, Conditional, NameMatch,
};
use crate::{dns, gateway, identity, inbound, oc_collector, outbound};
@ -52,15 +52,6 @@ pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR";
pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR";
pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR";
// When compiled with the `mock-orig-dst` flag, these environment variables are required to
// configure the proxy's behavior.
#[cfg(feature = "mock-orig-dst")]
pub const ENV_INBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_INBOUND_ORIG_DST_ADDR";
#[cfg(feature = "mock-orig-dst")]
pub const ENV_OUTBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_ORIG_DST_ADDR";
pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE";
const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE";
@ -262,12 +253,6 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration);
let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration);
#[cfg(feature = "mock-orig-dst")]
let (inbound_mock_orig_dst, outbound_mock_orig_dst) = (
parse(strings, ENV_INBOUND_ORIG_DST_ADDR, parse_socket_addr),
parse(strings, ENV_OUTBOUND_ORIG_DST_ADDR, parse_socket_addr),
);
let inbound_disable_ports = parse(
strings,
ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION,
@ -360,26 +345,6 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY);
#[cfg(feature = "mock-orig-dst")]
let (inbound_orig_dst, outbound_orig_dst) = (
inbound_mock_orig_dst?
.map(DefaultOrigDstAddr::from)
.ok_or_else(|| {
error!("{} must be specified", ENV_INBOUND_ORIG_DST_ADDR);
EnvError::NoDestinationAddress
})?,
outbound_mock_orig_dst?
.map(DefaultOrigDstAddr::from)
.ok_or_else(|| {
error!("{} must be specified", ENV_OUTBOUND_ORIG_DST_ADDR);
EnvError::NoDestinationAddress
})?,
);
#[cfg(not(feature = "mock-orig-dst"))]
let (inbound_orig_dst, outbound_orig_dst): (DefaultOrigDstAddr, DefaultOrigDstAddr) =
Default::default();
let dst_profile_suffixes = dst_profile_suffixes?
.unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap());
let dst_profile_networks = dst_profile_networks?.unwrap_or_default();
@ -387,16 +352,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let outbound = {
let ingress_mode = parse(strings, ENV_INGRESS_MODE, parse_bool)?.unwrap_or(false);
let keepalive = Keepalive(outbound_accept_keepalive?);
let bind = BindTcp::new(
ListenAddr(
let addr = ListenAddr(
outbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()),
),
keepalive,
);
let keepalive = Keepalive(outbound_accept_keepalive?);
let server = ServerConfig {
bind: bind.with_orig_dst_addr(outbound_orig_dst),
addr,
keepalive,
h2_settings: h2::Settings {
keepalive_timeout: keepalive.into(),
..h2_settings
@ -451,16 +414,14 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
};
let inbound = {
let keepalive = Keepalive(inbound_accept_keepalive?);
let bind = BindTcp::new(
ListenAddr(
let addr = ListenAddr(
inbound_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()),
),
keepalive,
);
let keepalive = Keepalive(inbound_accept_keepalive?);
let server = ServerConfig {
bind: bind.with_orig_dst_addr(inbound_orig_dst),
addr,
keepalive,
h2_settings: h2::Settings {
keepalive_timeout: keepalive.into(),
..h2_settings
@ -507,7 +468,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
// Ensure that connections thaat directly target the inbound port are
// secured (unless identity is disabled).
let inbound_port = server.bind.addr().as_ref().port();
let inbound_port = server.addr.as_ref().port();
if !id_disabled && !require_identity_for_inbound_ports.contains(&inbound_port) {
debug!(
"Adding {} to {}",
@ -566,13 +527,11 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
let admin = super::admin::Config {
metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
server: ServerConfig {
bind: BindTcp::new(
ListenAddr(
addr: ListenAddr(
admin_listener_addr?
.unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()),
),
inbound.proxy.server.bind.keepalive(),
),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
};
@ -617,7 +576,8 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError>
.map(|(addr, ids)| super::tap::Config::Enabled {
permitted_client_ids: ids,
config: ServerConfig {
bind: BindTcp::new(ListenAddr(addr), inbound.proxy.server.bind.keepalive()),
addr: ListenAddr(addr),
keepalive: inbound.proxy.server.keepalive,
h2_settings,
},
})

View File

@ -12,12 +12,20 @@ pub mod tap;
pub use self::metrics::Metrics;
use futures::{future, FutureExt, TryFutureExt};
pub use linkerd_app_core::{self as core, metrics, trace};
use linkerd_app_core::{control::ControlAddr, dns, drain, proxy::http, svc, Error, ProxyRuntime};
use linkerd_app_core::{
config::ServerConfig,
control::ControlAddr,
dns, drain,
proxy::http,
svc::Param,
transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
Error, ProxyRuntime,
};
use linkerd_app_gateway as gateway;
use linkerd_app_inbound::{self as inbound, Inbound};
use linkerd_app_outbound::{self as outbound, Outbound};
use linkerd_channel::into_stream::IntoStream;
use std::{net::SocketAddr, pin::Pin};
use std::pin::Pin;
use tokio::{sync::mpsc, time::Duration};
use tracing::instrument::Instrument;
use tracing::{debug, info, info_span};
@ -53,9 +61,9 @@ pub struct App {
drain: drain::Signal,
dst: ControlAddr,
identity: identity::Identity,
inbound_addr: SocketAddr,
inbound_addr: Local<ServerAddr>,
oc_collector: oc_collector::OcCollector,
outbound_addr: SocketAddr,
outbound_addr: Local<ServerAddr>,
start_proxy: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
tap: tap::Tap,
}
@ -64,16 +72,29 @@ impl Config {
pub fn try_from_env() -> Result<Self, env::EnvError> {
env::Env.try_config()
}
}
impl Config {
/// Build an application.
///
/// It is currently required that this be run on a Tokio runtime, since some
/// services are created eagerly and must spawn tasks to do so.
pub async fn build(
pub async fn build<BIn, BOut, BAdmin>(
self,
bind_in: BIn,
bind_out: BOut,
bind_admin: BAdmin,
shutdown_tx: mpsc::UnboundedSender<()>,
log_level: trace::Handle,
) -> Result<App, Error> {
) -> Result<App, Error>
where
BIn: Bind<ServerConfig> + 'static,
BIn::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
BOut: Bind<ServerConfig> + 'static,
BOut::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
BAdmin: Bind<ServerConfig> + Clone + 'static,
BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
{
use metrics::FmtMetrics;
let Config {
@ -98,7 +119,11 @@ impl Config {
let (drain_tx, drain_rx) = drain::channel();
let tap = info_span!("tap").in_scope(|| tap.build(identity.local(), drain_rx.clone()))?;
let tap = {
let bind = bind_admin.clone();
info_span!("tap").in_scope(|| tap.build(bind, identity.local(), drain_rx.clone()))?
};
let dst = {
let metrics = metrics.control.clone();
let dns = dns.resolver.clone();
@ -119,7 +144,15 @@ impl Config {
let drain = drain_rx.clone();
let metrics = metrics.inbound.clone();
info_span!("admin").in_scope(move || {
admin.build(identity, report, metrics, log_level, drain, shutdown_tx)
admin.build(
bind_admin,
identity,
report,
metrics,
log_level,
drain,
shutdown_tx,
)
})?
};
@ -155,8 +188,9 @@ impl Config {
dst.resolve.clone(),
);
let (inbound_addr, inbound_serve) = inbound.serve(dst.profiles.clone(), gateway_stack);
let (outbound_addr, outbound_serve) = outbound.serve(dst.profiles, dst.resolve);
let (inbound_addr, inbound_serve) =
inbound.serve(bind_in, dst.profiles.clone(), gateway_stack);
let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve);
let start_proxy = Box::pin(async move {
tokio::spawn(outbound_serve.instrument(info_span!("outbound")));
@ -178,19 +212,19 @@ impl Config {
}
impl App {
pub fn admin_addr(&self) -> SocketAddr {
pub fn admin_addr(&self) -> Local<ServerAddr> {
self.admin.listen_addr
}
pub fn inbound_addr(&self) -> SocketAddr {
pub fn inbound_addr(&self) -> Local<ServerAddr> {
self.inbound_addr
}
pub fn outbound_addr(&self) -> SocketAddr {
pub fn outbound_addr(&self) -> Local<ServerAddr> {
self.outbound_addr
}
pub fn tap_addr(&self) -> Option<SocketAddr> {
pub fn tap_addr(&self) -> Option<Local<ServerAddr>> {
match self.tap {
tap::Tap::Disabled { .. } => None,
tap::Tap::Enabled { listen_addr, .. } => Some(listen_addr),

View File

@ -1,10 +1,17 @@
use futures::prelude::*;
use indexmap::IndexSet;
use linkerd_app_core::{
config::ServerConfig, drain, proxy::identity::LocalCrtKey, proxy::tap, serve, tls,
transport::listen::Addrs, Error,
config::ServerConfig,
drain,
proxy::identity::LocalCrtKey,
proxy::tap,
serve,
svc::{self, Param},
tls,
transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr},
Error,
};
use std::{net::SocketAddr, pin::Pin};
use std::pin::Pin;
use tower::util::{service_fn, ServiceExt};
#[derive(Clone, Debug)]
@ -21,14 +28,23 @@ pub enum Tap {
registry: tap::Registry,
},
Enabled {
listen_addr: SocketAddr,
listen_addr: Local<ServerAddr>,
registry: tap::Registry,
serve: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
},
}
impl Config {
pub fn build(self, identity: Option<LocalCrtKey>, drain: drain::Watch) -> Result<Tap, Error> {
pub fn build<B>(
self,
bind: B,
identity: Option<LocalCrtKey>,
drain: drain::Watch,
) -> Result<Tap, Error>
where
B: Bind<ServerConfig>,
B::Addrs: Param<Remote<ClientAddr>>,
{
let (registry, server) = tap::new();
match self {
Config::Disabled => {
@ -39,12 +55,16 @@ impl Config {
config,
permitted_client_ids,
} => {
let (listen_addr, listen) = config.bind.bind()?;
let service = tap::AcceptPermittedClients::new(permitted_client_ids.into(), server);
let accept = tls::NewDetectTls::new(
identity,
move |meta: tls::server::Meta<Addrs>| {
let (listen_addr, listen) = bind.bind(&config)?;
let accept = svc::stack(server)
.push(svc::layer::mk(move |service| {
tap::AcceptPermittedClients::new(
permitted_client_ids.clone().into(),
service,
)
}))
.push(svc::layer::mk(|service: tap::AcceptPermittedClients| {
move |meta: tls::server::Meta<B::Addrs>| {
let service = service.clone();
service_fn(move |io| {
let fut = service.clone().oneshot((meta.clone(), io));
@ -52,9 +72,15 @@ impl Config {
fut.err_into::<Error>().await?.err_into::<Error>().await
})
})
},
}
}))
.check_new_service::<tls::server::Meta<B::Addrs>, _>()
.push(tls::NewDetectTls::layer(
identity,
std::time::Duration::from_secs(1),
);
))
.check_new_service::<B::Addrs, _>()
.into_inner();
let serve = Box::pin(serve::serve(listen, accept, drain.signaled()));

View File

@ -23,7 +23,7 @@ http = "0.2"
http-body = "0.4"
hyper = { version = "0.14.2", features = ["http1", "http2"] }
linkerd-channel = { path = "../../channel" }
linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] }
linkerd-app-core = { path = "../core" }
linkerd-identity = { path = "../../identity" }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
regex = "1"

View File

@ -13,7 +13,6 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::net::TcpStream;
use tower::Service;
#[derive(Clone, Debug)]
@ -64,7 +63,10 @@ impl AcceptPermittedClients {
}
}
impl<T> Service<Connection<T, io::ScopedIo<TcpStream>>> for AcceptPermittedClients {
impl<T, I> Service<Connection<T, I>> for AcceptPermittedClients
where
I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static,
{
type Response = ServeFuture;
type Error = Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
@ -73,7 +75,7 @@ impl<T> Service<Connection<T, io::ScopedIo<TcpStream>>> for AcceptPermittedClien
Poll::Ready(Ok(()))
}
fn call(&mut self, conn: Connection<T, io::ScopedIo<TcpStream>>) -> Self::Future {
fn call(&mut self, conn: Connection<T, I>) -> Self::Future {
match conn {
((Conditional::Some(tls), _), io) => {
if let tls::ServerTls::Established {

View File

@ -9,11 +9,6 @@ description = """
Transport-level implementations that rely on core proxy infrastructure
"""
# This should probably be decomposed into smaller, decoupled crates.
[features]
mock-orig-dst = []
[dependencies]
bytes = "1"
futures = "0.3.9"

View File

@ -4,11 +4,13 @@ pub mod addrs;
mod connect;
pub mod listen;
pub mod metrics;
pub mod orig_dst;
pub use self::{
addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr},
connect::ConnectTcp,
listen::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, NoOrigDstAddr},
listen::{Bind, BindTcp},
orig_dst::BindWithOrigDst,
};
use std::time::Duration;
use tokio::net::TcpStream;

View File

@ -1,307 +1,92 @@
use crate::{addrs::*, Keepalive};
use futures::prelude::*;
use linkerd_io as io;
use linkerd_stack::Param;
use std::{io, net::SocketAddr};
use std::{fmt, pin::Pin};
use tokio::net::TcpStream;
use tokio_stream::wrappers::TcpListenerStream;
use tracing::trace;
/// A mockable source for address info, i.e., for tests.
pub trait GetOrigDstAddr: Clone {
fn orig_dst_addr(&self, socket: &TcpStream) -> Option<OrigDstAddr>;
/// Binds a listener, producing a stream of incoming connections.
///
/// Typically, this represents binding a TCP socket. However, it may also be an
/// stream of in-memory mock connections, for testing purposes.
pub trait Bind<T> {
type Io: io::AsyncRead
+ io::AsyncWrite
+ io::Peek
+ io::PeerAddr
+ fmt::Debug
+ Unpin
+ Send
+ Sync
+ 'static;
type Addrs: Clone + Send + Sync + 'static;
type Incoming: Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync + 'static;
fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>>;
}
#[derive(Clone, Debug)]
pub struct BindTcp<O = NoOrigDstAddr> {
addr: ListenAddr,
keepalive: Keepalive,
orig_dst_addr: O,
}
pub type Bound<I> = (Local<ServerAddr>, I);
pub type Connection = (Addrs, TcpStream);
#[derive(Copy, Clone, Debug, Default)]
pub struct BindTcp(());
#[derive(Clone, Debug)]
pub struct Addrs {
server: Local<ServerAddr>,
client: Remote<ClientAddr>,
orig_dst: Option<OrigDstAddr>,
pub server: Local<ServerAddr>,
pub client: Remote<ClientAddr>,
}
#[derive(Copy, Clone, Debug)]
pub struct NoOrigDstAddr(());
// The mock-orig-dst feature disables use of the syscall-based OrigDstAddr implementation and
// replaces it with one that must be configured.
#[cfg(not(feature = "mock-orig-dst"))]
pub use self::sys::SysOrigDstAddr as DefaultOrigDstAddr;
#[cfg(feature = "mock-orig-dst")]
pub use self::mock::MockOrigDstAddr as DefaultOrigDstAddr;
// === impl BindTcp ===
impl BindTcp {
pub fn new(addr: ListenAddr, keepalive: Keepalive) -> Self {
Self {
addr,
keepalive,
orig_dst_addr: NoOrigDstAddr(()),
}
}
}
impl<A: GetOrigDstAddr> BindTcp<A> {
pub fn with_orig_dst_addr<B: GetOrigDstAddr>(self, orig_dst_addr: B) -> BindTcp<B> {
BindTcp {
orig_dst_addr,
addr: self.addr,
keepalive: self.keepalive,
pub fn with_orig_dst() -> super::BindWithOrigDst<Self> {
super::BindWithOrigDst::from(Self::default())
}
}
pub fn addr(&self) -> ListenAddr {
self.addr
}
impl<T> Bind<T> for BindTcp
where
T: Param<ListenAddr> + Param<Keepalive>,
{
type Addrs = Addrs;
type Incoming = Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, Self::Io)>> + Send + Sync>>;
type Io = TcpStream;
pub fn keepalive(&self) -> Keepalive {
self.keepalive
}
pub fn bind(&self) -> io::Result<(SocketAddr, impl Stream<Item = io::Result<Connection>>)> {
fn bind(self, params: &T) -> io::Result<Bound<Self::Incoming>> {
let listen = {
let l = std::net::TcpListener::bind(self.addr)?;
let ListenAddr(addr) = params.param();
let l = std::net::TcpListener::bind(addr)?;
// Ensure that O_NONBLOCK is set on the socket before using it with Tokio.
l.set_nonblocking(true)?;
tokio::net::TcpListener::from_std(l).expect("listener must be valid")
};
let addr = listen.local_addr()?;
let keepalive = self.keepalive;
let get_orig = self.orig_dst_addr.clone();
let accept = TcpListenerStream::new(listen)
.and_then(move |tcp| future::ready(Self::accept(tcp, keepalive, get_orig.clone())));
Ok((addr, accept))
}
fn accept(tcp: TcpStream, keepalive: Keepalive, get_orig: A) -> io::Result<Connection> {
let addrs = {
let server = Local(ServerAddr(tcp.local_addr()?));
let client = Remote(ClientAddr(tcp.peer_addr()?));
let orig_dst = get_orig.orig_dst_addr(&tcp);
trace!(
server.addr = %server,
client.addr = %client,
orig.addr = ?orig_dst,
"Accepted",
);
Addrs::new(server, client, orig_dst)
};
let Keepalive(keepalive) = keepalive;
let server = Local(ServerAddr(listen.local_addr()?));
let Keepalive(keepalive) = params.param();
let accept = TcpListenerStream::new(listen).map(move |res| {
let tcp = res?;
super::set_nodelay_or_warn(&tcp);
super::set_keepalive_or_warn(&tcp, keepalive);
let client = Remote(ClientAddr(tcp.peer_addr()?));
Ok((Addrs { server, client }, tcp))
});
Ok((addrs, tcp))
Ok((server, Box::pin(accept)))
}
}
impl Addrs {
pub fn new(
server: Local<ServerAddr>,
client: Remote<ClientAddr>,
orig_dst: Option<OrigDstAddr>,
) -> Self {
Self {
server,
client,
orig_dst,
}
}
pub fn server(&self) -> Local<ServerAddr> {
self.server
}
pub fn client(&self) -> Remote<ClientAddr> {
self.client
}
pub fn orig_dst(&self) -> Option<OrigDstAddr> {
self.orig_dst
}
pub fn target_addr(&self) -> SocketAddr {
self.orig_dst
.map(Into::into)
.unwrap_or_else(|| self.server.into())
}
}
impl GetOrigDstAddr for NoOrigDstAddr {
fn orig_dst_addr(&self, _: &TcpStream) -> Option<OrigDstAddr> {
None
}
}
impl Param<Option<OrigDstAddr>> for Addrs {
#[inline]
fn param(&self) -> Option<OrigDstAddr> {
self.orig_dst()
}
}
// === impl Addrs ===
impl Param<Remote<ClientAddr>> for Addrs {
#[inline]
fn param(&self) -> Remote<ClientAddr> {
self.client()
self.client
}
}
impl Param<Local<ServerAddr>> for Addrs {
#[inline]
fn param(&self) -> Local<ServerAddr> {
self.server()
}
}
#[cfg(not(feature = "mock-orig-dst"))]
mod sys {
use super::{GetOrigDstAddr, OrigDstAddr, TcpStream};
#[derive(Copy, Clone, Debug, Default)]
pub struct SysOrigDstAddr(());
impl GetOrigDstAddr for SysOrigDstAddr {
#[cfg(target_os = "linux")]
fn orig_dst_addr(&self, sock: &TcpStream) -> Option<OrigDstAddr> {
use std::os::unix::io::AsRawFd;
let fd = sock.as_raw_fd();
let r = unsafe { linux::so_original_dst(fd) };
r.map(OrigDstAddr).ok()
}
#[cfg(not(target_os = "linux"))]
fn orig_dst_addr(&self, _sock: &TcpStream) -> Option<OrigDstAddr> {
None
}
}
#[cfg(target_os = "linux")]
mod linux {
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::unix::io::RawFd;
use std::{io, mem};
use tracing::warn;
pub unsafe fn so_original_dst(fd: RawFd) -> io::Result<SocketAddr> {
let mut sockaddr: libc::sockaddr_storage = mem::zeroed();
let mut socklen: libc::socklen_t = mem::size_of::<libc::sockaddr_storage>() as u32;
let ret = libc::getsockopt(
fd,
libc::SOL_IP,
libc::SO_ORIGINAL_DST,
&mut sockaddr as *mut _ as *mut _,
&mut socklen as *mut _ as *mut _,
);
if ret != 0 {
let e = io::Error::last_os_error();
warn!("failed to read SO_ORIGINAL_DST: {:?}", e);
return Err(e);
}
mk_addr(&sockaddr, socklen)
}
// Borrowed with love from net2-rs
// https://github.com/rust-lang-nursery/net2-rs/blob/1b4cb4fb05fbad750b271f38221eab583b666e5e/src/socket.rs#L103
fn mk_addr(
storage: &libc::sockaddr_storage,
len: libc::socklen_t,
) -> io::Result<SocketAddr> {
match storage.ss_family as libc::c_int {
libc::AF_INET => {
assert!(len as usize >= mem::size_of::<libc::sockaddr_in>());
let sa = {
let sa = storage as *const _ as *const libc::sockaddr_in;
unsafe { *sa }
};
let bits = ntoh32(sa.sin_addr.s_addr);
let ip = Ipv4Addr::new(
(bits >> 24) as u8,
(bits >> 16) as u8,
(bits >> 8) as u8,
bits as u8,
);
let port = sa.sin_port;
Ok(SocketAddr::V4(SocketAddrV4::new(ip, ntoh16(port))))
}
libc::AF_INET6 => {
assert!(len as usize >= mem::size_of::<libc::sockaddr_in6>());
let sa = {
let sa = storage as *const _ as *const libc::sockaddr_in6;
unsafe { *sa }
};
let arr = sa.sin6_addr.s6_addr;
let ip = Ipv6Addr::new(
(arr[0] as u16) << 8 | (arr[1] as u16),
(arr[2] as u16) << 8 | (arr[3] as u16),
(arr[4] as u16) << 8 | (arr[5] as u16),
(arr[6] as u16) << 8 | (arr[7] as u16),
(arr[8] as u16) << 8 | (arr[9] as u16),
(arr[10] as u16) << 8 | (arr[11] as u16),
(arr[12] as u16) << 8 | (arr[13] as u16),
(arr[14] as u16) << 8 | (arr[15] as u16),
);
let port = sa.sin6_port;
let flowinfo = sa.sin6_flowinfo;
let scope_id = sa.sin6_scope_id;
Ok(SocketAddr::V6(SocketAddrV6::new(
ip,
ntoh16(port),
flowinfo,
scope_id,
)))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument",
)),
}
}
fn ntoh16(i: u16) -> u16 {
<u16>::from_be(i)
}
fn ntoh32(i: u32) -> u32 {
<u32>::from_be(i)
}
}
}
#[cfg(feature = "mock-orig-dst")]
mod mock {
use super::{GetOrigDstAddr, OrigDstAddr, SocketAddr, TcpStream};
#[derive(Copy, Clone, Debug)]
pub struct MockOrigDstAddr(SocketAddr);
impl From<SocketAddr> for MockOrigDstAddr {
fn from(addr: SocketAddr) -> Self {
MockOrigDstAddr(addr)
}
}
impl GetOrigDstAddr for MockOrigDstAddr {
fn orig_dst_addr(&self, _: &TcpStream) -> Option<OrigDstAddr> {
Some(OrigDstAddr(self.0))
}
self.server
}
}

View File

@ -0,0 +1,191 @@
use crate::{
addrs::*,
listen::{self, Bind, Bound},
};
use futures::prelude::*;
use linkerd_io as io;
use linkerd_stack::Param;
use std::pin::Pin;
use tokio::net::TcpStream;
#[derive(Copy, Clone, Debug, Default)]
pub struct BindWithOrigDst<B = listen::BindTcp> {
inner: B,
}
#[derive(Clone, Debug)]
pub struct Addrs<A = listen::Addrs> {
pub inner: A,
pub orig_dst: OrigDstAddr,
}
// === impl Addrs ===
impl<A> Param<OrigDstAddr> for Addrs<A> {
fn param(&self) -> OrigDstAddr {
self.orig_dst
}
}
impl<A> Param<Remote<ClientAddr>> for Addrs<A>
where
A: Param<Remote<ClientAddr>>,
{
fn param(&self) -> Remote<ClientAddr> {
self.inner.param()
}
}
impl<A> Param<Local<ServerAddr>> for Addrs<A>
where
A: Param<Local<ServerAddr>>,
{
fn param(&self) -> Local<ServerAddr> {
self.inner.param()
}
}
// === impl WithOrigDst ===
impl<B> From<B> for BindWithOrigDst<B> {
fn from(inner: B) -> Self {
Self { inner }
}
}
impl<T, B> Bind<T> for BindWithOrigDst<B>
where
B: Bind<T, Io = TcpStream> + 'static,
{
type Addrs = Addrs<B::Addrs>;
type Io = TcpStream;
type Incoming =
Pin<Box<dyn Stream<Item = io::Result<(Self::Addrs, TcpStream)>> + Send + Sync + 'static>>;
fn bind(self, t: &T) -> io::Result<Bound<Self::Incoming>> {
let (addr, incoming) = self.inner.bind(t)?;
let incoming = incoming.map(|res| {
let (inner, tcp) = res?;
let orig_dst = orig_dst_addr(&tcp)?;
let addrs = Addrs { inner, orig_dst };
Ok((addrs, tcp))
});
Ok((addr, Box::pin(incoming)))
}
}
#[cfg(target_os = "linux")]
fn orig_dst_addr(sock: &TcpStream) -> io::Result<OrigDstAddr> {
use std::os::unix::io::AsRawFd;
let fd = sock.as_raw_fd();
let r = unsafe { linux::so_original_dst(fd) };
r.map(OrigDstAddr)
}
#[cfg(not(target_os = "linux"))]
fn orig_dst_addr(&self, _: &TcpStream) -> io::Result<OrigDstAddr> {
io::Error::new(
io::ErrorKind::Other,
"SO_ORIGINAL_DST not supported on this operating system",
)
}
#[cfg(target_os = "linux")]
mod linux {
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::unix::io::RawFd;
use std::{io, mem};
use tracing::warn;
pub unsafe fn so_original_dst(fd: RawFd) -> io::Result<SocketAddr> {
let mut sockaddr: libc::sockaddr_storage = mem::zeroed();
let mut socklen: libc::socklen_t = mem::size_of::<libc::sockaddr_storage>() as u32;
let ret = libc::getsockopt(
fd,
libc::SOL_IP,
libc::SO_ORIGINAL_DST,
&mut sockaddr as *mut _ as *mut _,
&mut socklen as *mut _ as *mut _,
);
if ret != 0 {
let e = io::Error::last_os_error();
warn!("failed to read SO_ORIGINAL_DST: {:?}", e);
return Err(e);
}
mk_addr(&sockaddr, socklen)
}
// Borrowed with love from net2-rs
// https://github.com/rust-lang-nursery/net2-rs/blob/1b4cb4fb05fbad750b271f38221eab583b666e5e/src/socket.rs#L103
//
// Copyright (c) 2014 The Rust Project Developers
fn mk_addr(storage: &libc::sockaddr_storage, len: libc::socklen_t) -> io::Result<SocketAddr> {
match storage.ss_family as libc::c_int {
libc::AF_INET => {
assert!(len as usize >= mem::size_of::<libc::sockaddr_in>());
let sa = {
let sa = storage as *const _ as *const libc::sockaddr_in;
unsafe { *sa }
};
let bits = ntoh32(sa.sin_addr.s_addr);
let ip = Ipv4Addr::new(
(bits >> 24) as u8,
(bits >> 16) as u8,
(bits >> 8) as u8,
bits as u8,
);
let port = sa.sin_port;
Ok(SocketAddr::V4(SocketAddrV4::new(ip, ntoh16(port))))
}
libc::AF_INET6 => {
assert!(len as usize >= mem::size_of::<libc::sockaddr_in6>());
let sa = {
let sa = storage as *const _ as *const libc::sockaddr_in6;
unsafe { *sa }
};
let arr = sa.sin6_addr.s6_addr;
let ip = Ipv6Addr::new(
(arr[0] as u16) << 8 | (arr[1] as u16),
(arr[2] as u16) << 8 | (arr[3] as u16),
(arr[4] as u16) << 8 | (arr[5] as u16),
(arr[6] as u16) << 8 | (arr[7] as u16),
(arr[8] as u16) << 8 | (arr[9] as u16),
(arr[10] as u16) << 8 | (arr[11] as u16),
(arr[12] as u16) << 8 | (arr[13] as u16),
(arr[14] as u16) << 8 | (arr[15] as u16),
);
let port = sa.sin6_port;
let flowinfo = sa.sin6_flowinfo;
let scope_id = sa.sin6_scope_id;
Ok(SocketAddr::V6(SocketAddrV6::new(
ip,
ntoh16(port),
flowinfo,
scope_id,
)))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid argument",
)),
}
}
fn ntoh16(i: u16) -> u16 {
<u16>::from_be(i)
}
fn ntoh32(i: u32) -> u32 {
<u32>::from_be(i)
}
}

View File

@ -11,7 +11,9 @@ use linkerd_error::Never;
use linkerd_identity as id;
use linkerd_io::{self as io, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use linkerd_proxy_transport::{
listen::Addrs, BindTcp, ConnectTcp, Keepalive, ListenAddr, Remote, ServerAddr,
addrs::*,
listen::{Addrs, Bind, BindTcp},
ConnectTcp, Keepalive, ListenAddr,
};
use linkerd_stack::{NewService, Param};
use linkerd_tls as tls;
@ -156,12 +158,6 @@ where
// Saves the result of every connection.
let (sender, receiver) = mpsc::channel::<Transported<tls::ConditionalServerTls, SR>>();
// Let the OS decide the port number and then return the resulting
// `SocketAddr` so the client can connect to it. This allows multiple
// tests to run at once, which wouldn't work if they all were bound on
// a fixed port.
let addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let mut detect = tls::NewDetectTls::new(
server_tls.map(Tls),
move |meta: tls::server::Meta<Addrs>| {
@ -188,18 +184,16 @@ where
std::time::Duration::from_secs(10),
);
let (listen_addr, listen) = BindTcp::new(ListenAddr(addr), Keepalive(None))
.bind()
.expect("must bind");
let (listen_addr, listen) = BindTcp::default().bind(&Server).expect("must bind");
let server = async move {
futures::pin_mut!(listen);
let (meta, io) = listen
let (addrs, io) = listen
.next()
.await
.expect("listen failed")
.expect("listener closed");
tracing::debug!("incoming connection");
let accept = detect.new_service(meta);
let accept = detect.new_service(addrs);
accept.oneshot(io).await.expect("connection failed");
tracing::debug!("done");
}
@ -220,7 +214,7 @@ where
let client = async move {
let conn = tls::Client::layer(client_tls)
.layer(ConnectTcp::new(Keepalive(None)))
.oneshot(Target(server_addr, client_server_id.map(Into::into)))
.oneshot(Target(server_addr.into(), client_server_id.map(Into::into)))
.await;
match conn {
Err(e) => {
@ -306,12 +300,17 @@ const PING: &[u8] = b"ping";
const PONG: &[u8] = b"pong";
const START_OF_TLS: &[u8] = &[22, 3, 1]; // ContentType::handshake version 3.1
#[derive(Copy, Clone, Debug)]
struct Server;
#[derive(Clone)]
struct Target(SocketAddr, tls::ConditionalClientTls);
#[derive(Clone)]
struct Tls(id::CrtKey);
// === impl Target ===
impl Param<Remote<ServerAddr>> for Target {
fn param(&self) -> Remote<ServerAddr> {
Remote(ServerAddr(self.0))
@ -324,6 +323,8 @@ impl Param<tls::ConditionalClientTls> for Target {
}
}
// === impl Tls ===
impl Param<tls::client::Config> for Tls {
fn param(&self) -> tls::client::Config {
self.0.client_config()
@ -341,3 +342,20 @@ impl Param<tls::LocalId> for Tls {
self.0.id().clone()
}
}
// === impl Server ===
impl Param<ListenAddr> for Server {
fn param(&self) -> ListenAddr {
// Let the OS decide the port number and then return the resulting
// `SocketAddr` so the client can connect to it. This allows multiple
// tests to run at once, which wouldn't work if they all were bound on
// a fixed port.
ListenAddr(([127, 0, 0, 1], 0).into())
}
}
impl Param<Keepalive> for Server {
fn param(&self) -> Keepalive {
Keepalive(None)
}
}

View File

@ -9,7 +9,6 @@ description = "The main proxy executable"
[features]
default = ["multicore"]
mock-orig-dst = ["linkerd-app/mock-orig-dst"]
multicore = ["tokio/rt-multi-thread", "num_cpus"]
[dependencies]

View File

@ -3,7 +3,7 @@
#![deny(warnings, rust_2018_idioms)]
#![type_length_limit = "16289823"]
use linkerd_app::{trace, Config};
use linkerd_app::{core::transport::BindTcp, trace, Config};
use linkerd_signal as signal;
use tokio::sync::mpsc;
pub use tracing::{debug, error, info, warn};
@ -14,22 +14,36 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
mod rt;
const EX_USAGE: i32 = 64;
fn main() {
let trace = trace::init();
let trace = match trace::init() {
Ok(t) => t,
Err(e) => {
eprintln!("Invalid logging configuration: {}", e);
std::process::exit(EX_USAGE);
}
};
// Load configuration from the environment without binding ports.
let config = match Config::try_from_env() {
Ok(config) => config,
Err(e) => {
eprintln!("Invalid configuration: {}", e);
const EX_USAGE: i32 = 64;
std::process::exit(EX_USAGE);
}
};
// Builds a runtime with the appropriate number of cores:
// `LINKERD2_PROXY_CORES` env or the number of available CPUs (as provided
// by cgroups, when possible).
rt::build().block_on(async move {
let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
let app = match async move { config.build(shutdown_tx, trace?).await }.await {
let bind = BindTcp::with_orig_dst();
let app = match config
.build(bind, bind, BindTcp::default(), shutdown_tx, trace)
.await
{
Ok(app) => app,
Err(e) => {
eprintln!("Initialization failure: {}", e);