From 07e9249769474f19e11904f9beed29a01dda7fe5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 31 Mar 2021 14:30:36 -0700 Subject: [PATCH] transport: introduce Bind trait and move orig dst to the type level (#957) This branch introduces a new `transport::listen::Bind` trait that abstracts over binding and listening on a socket. Now, rather than owning an instance of the `BindTcp` type, `ServerConfig`s are passed as a parameter to a type implementing the `Bind` trait. This will eventually allow end-to-end testing without creating actual TCP sockets, as `Bind` produces a `Stream` of an IO type and an `Addrs` type. PR #955 changed the proxy stacks to be a target type which implements the `Param` trait for various types of address, rather than a fixed type. This PR also changes the inbound and outbound proxies to take targets which implement `Param`, rather than requiring `Param>` and punting fallibility to every point where original dst addresses are used. The `BindTcp` type is now generic over a more general `GetAddrs` trait, rather than a `SO_ORIGINAL_DST`-specific `OrigDstAddr` trait. Rather than always producing a `listen::Addrs` with an `Option`, we now produce a variable addresses type, which may or may not incldue the original destination address (the admin and tap servers don't require SO_ORIGINAL_DST). This allows whether or not original destination addresses are included to be determined at compile time rather than at runtime. Finally, the incorrect feature flagging of mock original destination addresses has been fixed, The "mock-orig-dst" feature has been removed, and the integration tests now simply construct their own mock `GetAddrs` function that's used to configure the test proxy. In the future, we should be able to refactor the tests to avoid this by simply passing in their own mock `Bind` types which produce streams of in-memory mock connections, rather than actual TCP connections. Signed-off-by: Eliza Weisman Co-authored-by: Oliver Gould --- Dockerfile | 4 +- linkerd/app/Cargo.toml | 1 - linkerd/app/core/Cargo.toml | 3 - linkerd/app/core/src/config.rs | 29 ++- linkerd/app/core/src/serve.rs | 26 +- linkerd/app/inbound/src/direct.rs | 13 +- linkerd/app/inbound/src/http/tests.rs | 6 +- linkerd/app/inbound/src/lib.rs | 63 ++--- linkerd/app/inbound/src/prevent_loop.rs | 10 +- linkerd/app/inbound/src/target.rs | 56 +---- linkerd/app/inbound/src/test_util.rs | 15 +- linkerd/app/integration/Cargo.toml | 4 +- linkerd/app/integration/src/proxy.rs | 65 +++-- linkerd/app/outbound/src/discover.rs | 2 +- linkerd/app/outbound/src/http/tests.rs | 23 +- linkerd/app/outbound/src/ingress.rs | 10 +- linkerd/app/outbound/src/lib.rs | 26 +- linkerd/app/outbound/src/tcp/mod.rs | 17 -- linkerd/app/outbound/src/tcp/tests.rs | 43 ++-- linkerd/app/outbound/src/test_util.rs | 24 +- linkerd/app/src/admin.rs | 30 ++- linkerd/app/src/env.rs | 80 ++---- linkerd/app/src/lib.rs | 62 +++-- linkerd/app/src/tap.rs | 66 +++-- linkerd/app/test/Cargo.toml | 2 +- linkerd/proxy/tap/src/accept.rs | 8 +- linkerd/proxy/transport/Cargo.toml | 5 - linkerd/proxy/transport/src/lib.rs | 4 +- linkerd/proxy/transport/src/listen.rs | 315 ++++-------------------- linkerd/proxy/transport/src/orig_dst.rs | 191 ++++++++++++++ linkerd/tls/tests/tls_accept.rs | 44 +++- linkerd2-proxy/Cargo.toml | 1 - linkerd2-proxy/src/main.rs | 22 +- 33 files changed, 633 insertions(+), 637 deletions(-) create mode 100644 linkerd/proxy/transport/src/orig_dst.rs diff --git a/Dockerfile b/Dockerfile index 236940473..bfb4ee0ab 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,9 +31,7 @@ FROM $RUST_IMAGE as build # When set, causes the proxy to be compiled in development mode. ARG PROXY_UNOPTIMIZED -# Controls what features are enabled in the proxy. This is typically empty but -# may be set to `mock-orig-dst` for profiling builds, or to `multithreaded` to -# enable the multithreaded Tokio runtime. +# Controls what features are enabled in the proxy. ARG PROXY_FEATURES RUN --mount=type=cache,target=/var/lib/apt/lists \ diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml index c8871063a..d2ca68f37 100644 --- a/linkerd/app/Cargo.toml +++ b/linkerd/app/Cargo.toml @@ -13,7 +13,6 @@ This is used by tests and the executable. [features] allow-loopback = ["linkerd-app-outbound/allow-loopback"] -mock-orig-dst = ["linkerd-app-core/mock-orig-dst"] [dependencies] futures = "0.3.9" diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index af2669719..bb0bd6b10 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -12,9 +12,6 @@ This crate conglomerates proxy configuration, runtime administration, etc, independently of the inbound and outbound proxy logic. """ -[features] -mock-orig-dst = ["linkerd-proxy-transport/mock-orig-dst"] - [dependencies] bytes = "1" http = "0.2" diff --git a/linkerd/app/core/src/config.rs b/linkerd/app/core/src/config.rs index 3df0e7524..b8f3f381c 100644 --- a/linkerd/app/core/src/config.rs +++ b/linkerd/app/core/src/config.rs @@ -1,11 +1,15 @@ pub use crate::exp_backoff::ExponentialBackoff; -pub use crate::proxy::http::{h1, h2}; -pub use crate::transport::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, Keepalive, NoOrigDstAddr}; +use crate::{ + proxy::http::{h1, h2}, + svc::Param, + transport::{Keepalive, ListenAddr}, +}; use std::time::Duration; #[derive(Clone, Debug)] -pub struct ServerConfig { - pub bind: BindTcp, +pub struct ServerConfig { + pub addr: ListenAddr, + pub keepalive: Keepalive, pub h2_settings: h2::Settings, } @@ -20,7 +24,7 @@ pub struct ConnectConfig { #[derive(Clone, Debug)] pub struct ProxyConfig { - pub server: ServerConfig, + pub server: ServerConfig, pub connect: ConnectConfig, pub buffer_capacity: usize, pub cache_max_idle_age: Duration, @@ -31,11 +35,14 @@ pub struct ProxyConfig { // === impl ServerConfig === -impl ServerConfig { - pub fn with_orig_dst_addr(self, orig_dst_addrs: B) -> ServerConfig { - ServerConfig { - bind: self.bind.with_orig_dst_addr(orig_dst_addrs), - h2_settings: self.h2_settings, - } +impl Param for ServerConfig { + fn param(&self) -> ListenAddr { + self.addr + } +} + +impl Param for ServerConfig { + fn param(&self) -> Keepalive { + self.keepalive } } diff --git a/linkerd/app/core/src/serve.rs b/linkerd/app/core/src/serve.rs index a76e9cce5..6eddcb7a0 100644 --- a/linkerd/app/core/src/serve.rs +++ b/linkerd/app/core/src/serve.rs @@ -1,26 +1,28 @@ -use crate::io; -use crate::svc; +use crate::{ + io, + svc::{self, Param}, + transport::{ClientAddr, Remote}, +}; use futures::prelude::*; use linkerd_error::Error; -use linkerd_proxy_transport::listen::Addrs; use tower::util::ServiceExt; -use tracing::instrument::Instrument; -use tracing::{debug, debug_span, info, warn}; +use tracing::{debug, debug_span, info, instrument::Instrument, warn}; /// Spawns a task that binds an `L`-typed listener with an `A`-typed /// connection-accepting service. /// /// The task is driven until shutdown is signaled. -pub async fn serve( - listen: impl Stream>, +pub async fn serve( + listen: impl Stream>, mut new_accept: M, shutdown: impl Future, ) where I: Send + 'static, - M: svc::NewService, - A: tower::Service, Response = ()> + Send + 'static, - A::Error: Into, - A::Future: Send + 'static, + A: Param>, + M: svc::NewService, + S: tower::Service, Response = ()> + Send + 'static, + S::Error: Into, + S::Future: Send + 'static, { let accept = async move { futures::pin_mut!(listen); @@ -38,7 +40,7 @@ pub async fn serve( }; // The local addr should be instrumented from the listener's context. - let span = debug_span!("accept", client.addr = %addrs.client()); + let span = debug_span!("accept", client.addr = %addrs.param()); let accept = span.in_scope(|| new_accept.new_service(addrs)); diff --git a/linkerd/app/inbound/src/direct.rs b/linkerd/app/inbound/src/direct.rs index f597e799f..90c72171e 100644 --- a/linkerd/app/inbound/src/direct.rs +++ b/linkerd/app/inbound/src/direct.rs @@ -71,7 +71,7 @@ impl Inbound { > + Clone, > where - T: Param> + Param> + Clone + Send + 'static, + T: Param> + Param + Clone + Send + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Send + Sync + Unpin + 'static, N: svc::NewService + Clone + Send + Sync + Unpin + 'static, @@ -173,7 +173,7 @@ impl Inbound { impl TryFrom<(tls::ConditionalServerTls, T)> for ClientInfo where - T: Param>, + T: Param, T: Param>, { type Error = Error; @@ -184,14 +184,7 @@ where client_id: Some(client_id), negotiated_protocol, }) => { - let local: Option = addrs.param(); - let OrigDstAddr(local_addr) = local.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(local_addr) = addrs.param(); Ok(Self { client_id, alpn: negotiated_protocol, diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 5b4a269af..bf08e46af 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -71,7 +71,7 @@ async fn unmeshed_http1_hello_world() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; @@ -118,7 +118,7 @@ async fn downgrade_origin_form() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; @@ -166,7 +166,7 @@ async fn downgrade_absolute_form() { profile_tx.send(profile::Profile::default()).unwrap(); // Build the outbound server - let cfg = default_config(accept.tcp.target_addr); + let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(accept); let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 11df30f63..a0f087e72 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -21,16 +21,14 @@ use self::{ target::{HttpAccept, TcpAccept}, }; use linkerd_app_core::{ - config::{ConnectConfig, ProxyConfig}, + config::{ConnectConfig, ProxyConfig, ServerConfig}, detect, drain, io, metrics, profiles, proxy::tcp, - serve, - svc::{self, Param}, - tls, - transport::{self, ClientAddr, OrigDstAddr, Remote, ServerAddr}, + serve, svc, tls, + transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, Error, NameMatch, ProxyRuntime, }; -use std::{convert::TryFrom, fmt::Debug, future::Future, net::SocketAddr, time::Duration}; +use std::{convert::TryFrom, fmt::Debug, future::Future, time::Duration}; use tracing::{debug_span, info_span}; #[derive(Clone, Debug)] @@ -52,7 +50,7 @@ pub struct Inbound { stack: svc::Stack, } -// === impl Config === +// === impl Inbound === impl Inbound { pub fn config(&self) -> &Config { @@ -89,7 +87,7 @@ impl Inbound<()> { } } - pub fn to_tcp_connect>( + pub fn to_tcp_connect>( &self, ) -> Inbound< impl svc::Service< @@ -122,35 +120,34 @@ impl Inbound<()> { } } - pub fn serve( + pub fn serve( self, + bind: B, profiles: P, gateway: G, - ) -> (SocketAddr, impl Future + Send) + ) -> (Local, impl Future + Send) where + B: Bind, + B::Addrs: svc::Param> + + svc::Param> + + svc::Param, G: svc::NewService, G: Clone + Send + Sync + Unpin + 'static, - GSvc: svc::Service>, Response = ()> - + Send - + 'static, + GSvc: svc::Service>, Response = ()> + Send + 'static, GSvc::Error: Into, GSvc::Future: Send, P: profiles::GetProfile + Clone + Send + Sync + 'static, P::Error: Send, P::Future: Send, { - let (listen_addr, listen) = self - .config - .proxy - .server - .bind - .bind() + let (listen_addr, listen) = bind + .bind(&self.config.proxy.server) .expect("Failed to bind inbound listener"); let serve = async move { - let stack = self - .to_tcp_connect() - .into_server(listen_addr.port(), profiles, gateway); + let stack = + self.to_tcp_connect() + .into_server(listen_addr.as_ref().port(), profiles, gateway); let shutdown = self.runtime.drain.signaled(); serve::serve(listen, stack, shutdown).await }; @@ -218,7 +215,7 @@ where Service = impl svc::Service, > + Clone where - T: Param> + Param> + Clone + Send + 'static, + T: svc::Param> + svc::Param + Clone + Send + 'static, I: io::AsyncRead + io::AsyncWrite + io::Peek + io::PeerAddr, I: Debug + Send + Sync + Unpin + 'static, G: svc::NewService, @@ -267,7 +264,7 @@ where .stack .push_map_target(TcpEndpoint::from) .push(self.runtime.metrics.transport.layer_accept()) - .push_request_filter(TcpAccept::port_skipped) + .push_map_target(TcpAccept::port_skipped) .check_new_service::() .instrument(|_: &T| debug_span!("forward")) .into_inner(), @@ -282,13 +279,9 @@ where .into_inner(), ) .instrument(|a: &T| { - if let Some(OrigDstAddr(target_addr)) = a.param() { - info_span!("server", port = target_addr.port()) - } else { - info_span!("server", port = %"") - } + let OrigDstAddr(target_addr) = a.param(); + info_span!("server", port = target_addr.port()) }) - .check_new_service::() .into_inner() } } @@ -303,18 +296,12 @@ impl From> for SkipByPort { impl svc::Predicate for SkipByPort where - T: Param>, + T: svc::Param, { type Request = svc::Either; fn check(&mut self, t: T) -> Result { - let OrigDstAddr(addr) = t.param().ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(addr) = t.param(); if !self.0.contains(&addr.port()) { Ok(svc::Either::A(t)) } else { diff --git a/linkerd/app/inbound/src/prevent_loop.rs b/linkerd/app/inbound/src/prevent_loop.rs index 85c7e1c8a..ce8108441 100644 --- a/linkerd/app/inbound/src/prevent_loop.rs +++ b/linkerd/app/inbound/src/prevent_loop.rs @@ -44,17 +44,11 @@ impl PreventLoop { } } -impl>> Predicate for SwitchLoop { +impl> Predicate for SwitchLoop { type Request = Either; fn check(&mut self, addrs: T) -> Result, Error> { - let OrigDstAddr(addr) = addrs.param().ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - ) - })?; + let OrigDstAddr(addr) = addrs.param(); tracing::debug!(%addr, self.port); if addr.port() != self.port { Ok(Either::A(addrs)) diff --git a/linkerd/app/inbound/src/target.rs b/linkerd/app/inbound/src/target.rs index 0553270dd..b39021b02 100644 --- a/linkerd/app/inbound/src/target.rs +++ b/linkerd/app/inbound/src/target.rs @@ -10,13 +10,7 @@ use linkerd_app_core::{ transport_header::TransportHeader, Addr, Conditional, Error, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, }; -use std::{ - convert::{TryFrom, TryInto}, - io, - net::SocketAddr, - str::FromStr, - sync::Arc, -}; +use std::{convert::TryInto, net::SocketAddr, str::FromStr, sync::Arc}; use tracing::debug; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -66,30 +60,25 @@ pub struct RequestTarget { // === impl TcpAccept === impl TcpAccept { - pub fn port_skipped(tcp: T) -> Result + pub fn port_skipped(tcp: T) -> Self where - T: Param> + Param>, + T: Param> + Param, { - let orig_dst: Option = tcp.param(); - let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found") - })?; - Ok(Self { + let OrigDstAddr(target_addr) = tcp.param(); + Self { target_addr, client_addr: tcp.param(), tls: Conditional::None(tls::NoServerTls::PortSkipped), - }) + } } +} - /// Returns a `TcpAccept` for the provided TLS metadata and addresses, - /// determining the target address from the server's local listener address - /// rather than a `SO_ORIGINAL_DST` address. - pub fn from_local_addr((tls, addrs): tls::server::Meta) -> Self - where - T: Param> + Param>, - { - let Local(ServerAddr(target_addr)) = addrs.param(); +impl From> for TcpAccept +where + T: Param> + Param, +{ + fn from((tls, addrs): tls::server::Meta) -> Self { + let OrigDstAddr(target_addr) = addrs.param(); Self { target_addr, client_addr: addrs.param(), @@ -98,25 +87,6 @@ impl TcpAccept { } } -impl TryFrom> for TcpAccept -where - T: Param> + Param>, -{ - type Error = io::Error; - fn try_from((tls, addrs): tls::server::Meta) -> Result { - let orig_dst: Option = addrs.param(); - let OrigDstAddr(target_addr) = orig_dst.ok_or_else(|| { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - io::Error::new(io::ErrorKind::NotFound, "No SO_ORIGINAL_DST address found") - })?; - Ok(Self { - target_addr, - client_addr: addrs.param(), - tls, - }) - } -} - impl Param for TcpAccept { fn param(&self) -> SocketAddr { self.target_addr diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index 1d5bce408..f6c766fc8 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -8,15 +8,13 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, NameMatch, ProxyRuntime, }; pub use linkerd_app_test as support; -use std::{net::SocketAddr, time::Duration}; +use std::time::Duration; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { let cluster_local = "svc.cluster.local." .parse::() .expect("`svc.cluster.local.` suffix is definitely valid"); @@ -24,11 +22,8 @@ pub fn default_config(orig_dst: SocketAddr) -> Config { allow_discovery: NameMatch::new(Some(cluster_local)), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index ed3abbfc2..417cf7abe 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -24,8 +24,8 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2", "stream", "client", "server"] } linkerd-channel = { path = "../../channel" } -linkerd-app = { path = "..", features = ["allow-loopback", "mock-orig-dst"] } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app = { path = "..", features = ["allow-loopback"] } +linkerd-app-core = { path = "../core" } linkerd-metrics = { path = "../../metrics", features = ["test_util"] } linkerd2-proxy-api = { git = "https://github.com/linkerd/linkerd2-proxy-api", tag = "v0.1.18", features = ["arbitrary"] } linkerd-app-test = { path = "../test" } diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index 14910b2c1..78f7ecf3f 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -1,5 +1,11 @@ use super::*; -use std::{future::Future, pin::Pin, task::Poll, thread}; +use app_core::transport::OrigDstAddr; +use linkerd_app_core::{ + svc::Param, + transport::{listen, orig_dst, Keepalive, ListenAddr}, +}; +use std::{future::Future, net::SocketAddr, pin::Pin, task::Poll, thread}; +use tokio::net::TcpStream; use tracing::instrument::Instrument; pub fn new() -> Proxy { @@ -26,6 +32,9 @@ pub struct Proxy { shutdown_signal: Option + Send>>>, } +#[derive(Copy, Clone, Debug)] +struct MockOrigDst(Option); + pub struct Listening { pub tap: Option, pub inbound: SocketAddr, @@ -44,6 +53,35 @@ pub struct Listening { thread: thread::JoinHandle<()>, } +// === impl MockOrigDst === + +impl listen::Bind for MockOrigDst +where + T: Param + Param, +{ + type Addrs = orig_dst::Addrs; + type Io = tokio::net::TcpStream; + type Incoming = Pin< + Box> + Send + Sync + 'static>, + >; + + fn bind(self, params: &T) -> io::Result> { + let (bound, incoming) = listen::BindTcp::default().bind(params)?; + let addr = self.0; + let incoming = Box::pin(incoming.map(move |res| { + let (inner, tcp) = res?; + let orig_dst = addr + .map(OrigDstAddr) + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "No mocked SO_ORIG_DST"))?; + let addrs = orig_dst::Addrs { inner, orig_dst }; + Ok((addrs, tcp)) + })); + Ok((bound, incoming)) + } +} + +// === impl Proxy === + impl Proxy { /// Pass a customized support `Controller` for this proxy to use. /// @@ -200,16 +238,6 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { env.put(app::env::ENV_OUTBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned()); } - // If a given server is missing, use the admin server as a substitute. - env.put( - app::env::ENV_INBOUND_ORIG_DST_ADDR, - inbound.unwrap_or(controller.addr).to_string(), - ); - env.put( - app::env::ENV_OUTBOUND_ORIG_DST_ADDR, - outbound.unwrap_or(controller.addr).to_string(), - ); - if random_ports { env.put(app::env::ENV_INBOUND_LISTEN_ADDR, "127.0.0.1:0".to_owned()); env.put(app::env::ENV_CONTROL_LISTEN_ADDR, "127.0.0.1:0".to_owned()); @@ -264,6 +292,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { } let config = app::env::parse_config(&env).unwrap(); + let dispatch = tracing::Dispatch::default(); let (trace, trace_handle) = if dispatch .downcast_ref::() @@ -301,9 +330,12 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { .build() .expect("proxy") .block_on(async move { + let bind_in = MockOrigDst(inbound); + let bind_out = MockOrigDst(outbound); + let bind_adm = listen::BindTcp::default(); let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); let main = config - .build(shutdown_tx, trace_handle) + .build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle) .await .expect("config"); @@ -358,14 +390,13 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { outbound.addr = ?outbound_addr, outbound.orig_dst = ?outbound.as_ref(), metrics.addr = ?metrics_addr, - "proxy running", ); Listening { - tap: tap_addr, - inbound: inbound_addr, - outbound: outbound_addr, - metrics: metrics_addr, + tap: tap_addr.map(Into::into), + inbound: inbound_addr.into(), + outbound: outbound_addr.into(), + metrics: metrics_addr.into(), outbound_server: proxy.outbound_server, inbound_server: proxy.inbound_server, diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index a69a5fec2..7ce444205 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -22,7 +22,7 @@ impl Outbound { >, > where - T: Param>, + T: Param, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, N: svc::NewService + Clone + Send + 'static, NSvc: svc::Service, Response = (), Error = Error> + Send + 'static, diff --git a/linkerd/app/outbound/src/http/tests.rs b/linkerd/app/outbound/src/http/tests.rs index ef47aec55..a77d1b424 100644 --- a/linkerd/app/outbound/src/http/tests.rs +++ b/linkerd/app/outbound/src/http/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc::{self, NewService}, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::orig_dst, Error, NameAddr, ProxyRuntime, }; use std::{ @@ -34,7 +34,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -97,6 +97,7 @@ impl svc::Service for NoTcpBalancer { type Response = (); type Error = Error; type Future = Pin> + Send + 'static>>; + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { unreachable!("no TCP load balancer should be created in this test!"); } @@ -114,7 +115,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -217,7 +218,7 @@ async fn meshed_hello_world() { let _trace = support::trace_init(); let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); @@ -267,7 +268,7 @@ async fn stacks_idle_out() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let idle_timeout = Duration::from_millis(500); - let mut cfg = default_config(ep1); + let mut cfg = default_config(); cfg.proxy.cache_max_idle_age = idle_timeout; let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -332,7 +333,7 @@ async fn active_stacks_dont_idle_out() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let idle_timeout = Duration::from_millis(500); - let mut cfg = default_config(ep1); + let mut cfg = default_config(); cfg.proxy.cache_max_idle_age = idle_timeout; let id = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -422,14 +423,6 @@ async fn active_stacks_dont_idle_out() { proxy_bg.await.unwrap(); } -pub fn addrs(od: SocketAddr) -> listen::Addrs { - listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(od)), - ) -} - async fn unmeshed_hello_world( server_settings: hyper::server::conn::Http, mut client_settings: ClientBuilder, @@ -437,7 +430,7 @@ async fn unmeshed_hello_world( let _trace = support::trace_init(); let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); // Build a mock "connector" that returns the upstream "server" IO. let connect = support::connect().endpoint_fn_boxed(ep1, hello_server(server_settings)); diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 73ca34791..7fc0424c0 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -5,10 +5,9 @@ use linkerd_app_core::{ io, profiles, svc::{self, stack::Param}, tls, - transport::{self, OrigDstAddr}, + transport::{self, ClientAddr, OrigDstAddr, Remote}, Addr, AddrMatch, Error, }; -use std::convert::TryFrom; use tracing::{debug_span, info_span}; impl Outbound<()> { @@ -28,7 +27,7 @@ impl Outbound<()> { Service = impl svc::Service, > where - T: Param>, + T: Param + Param> + Clone + Send + Sync + 'static, I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + std::fmt::Debug + Send + Unpin + 'static, N: svc::NewService + Clone + Send + Sync + 'static, NSvc: svc::Service>, Response = ()> @@ -136,7 +135,10 @@ impl Outbound<()> { )) .push(self.runtime.metrics.transport.layer_accept()) .instrument(|a: &tcp::Accept| info_span!("ingress", orig_dst = %a.orig_dst)) - .push_request_filter(|a: T| tcp::Accept::try_from(a.param())) + .push_map_target(|a: T| { + let orig_dst = Param::::param(&a); + tcp::Accept::from(orig_dst) + }) // Boxing is necessary purely to limit the link-time overhead of // having enormous types. .push(svc::BoxNewService::layer()) diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index a419c6e5b..ec4bab0bf 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -15,7 +15,7 @@ pub mod tcp; pub(crate) mod test_util; use linkerd_app_core::{ - config::ProxyConfig, + config::{ProxyConfig, ServerConfig}, io, metrics, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, @@ -24,10 +24,10 @@ use linkerd_app_core::{ serve, svc::{self, stack::Param}, tls, - transport::OrigDstAddr, + transport::{addrs::*, listen::Bind}, AddrMatch, Error, ProxyRuntime, }; -use std::{collections::HashMap, future::Future, net::SocketAddr, time::Duration}; +use std::{collections::HashMap, future::Future, time::Duration}; use tracing::info; const EWMA_DEFAULT_RTT: Duration = Duration::from_millis(30); @@ -103,7 +103,8 @@ impl Outbound { Service = impl svc::Service, > where - T: Param>, + Self: Clone + 'static, + T: Param + Param> + Clone + Send + Sync + 'static, S: Clone + Send + Sync + Unpin + 'static, S: svc::Service, S::Response: tls::HasNegotiatedProtocol, @@ -135,8 +136,15 @@ impl Outbound { } impl Outbound<()> { - pub fn serve(self, profiles: P, resolve: R) -> (SocketAddr, impl Future) + pub fn serve( + self, + bind: B, + profiles: P, + resolve: R, + ) -> (Local, impl Future) where + B: Bind, + B::Addrs: Param> + Param, R: Clone + Send + Sync + Unpin + 'static, R: Resolve, R::Resolution: Send, @@ -145,12 +153,8 @@ impl Outbound<()> { P::Future: Send, P::Error: Send, { - let (listen_addr, listen) = self - .config - .proxy - .server - .bind - .bind() + let (listen_addr, listen) = bind + .bind(&self.config.proxy.server) .expect("Failed to bind outbound listener"); let serve = async move { diff --git a/linkerd/app/outbound/src/tcp/mod.rs b/linkerd/app/outbound/src/tcp/mod.rs index 684b093ee..19da0d33e 100644 --- a/linkerd/app/outbound/src/tcp/mod.rs +++ b/linkerd/app/outbound/src/tcp/mod.rs @@ -23,23 +23,6 @@ impl From for Accept { } } -impl std::convert::TryFrom> for Accept { - type Error = std::io::Error; - - fn try_from(orig_dst: Option) -> Result { - match orig_dst { - Some(addr) => Ok(Self::from(addr)), - None => { - tracing::warn!("No SO_ORIGINAL_DST address found!"); - Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "No SO_ORIGINAL_DST address found", - )) - } - } - } -} - impl

From<(P, Accept)> for target::Accept

{ fn from((protocol, Accept { orig_dst, .. }): (P, Accept)) -> Self { Self { orig_dst, protocol } diff --git a/linkerd/app/outbound/src/tcp/tests.rs b/linkerd/app/outbound/src/tcp/tests.rs index 892de2d11..1b7e992ec 100644 --- a/linkerd/app/outbound/src/tcp/tests.rs +++ b/linkerd/app/outbound/src/tcp/tests.rs @@ -13,7 +13,7 @@ use linkerd_app_core::{ io, svc, svc::NewService, tls, - transport::{listen, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + transport::{addrs::*, listen, orig_dst}, Addr, Conditional, Error, IpMatch, NameAddr, }; use std::{ @@ -62,7 +62,7 @@ async fn plaintext_tcp() { support::resolver().endpoint_exists(target_addr, target_addr, Default::default()); // Build the outbound TCP balancer stack. - let cfg = default_config(target_addr); + let cfg = default_config(); let (rt, _) = runtime(); Outbound::new(cfg, rt) .with_stack(connect) @@ -141,7 +141,7 @@ async fn tls_when_hinted() { // Build the outbound TCP balancer stack. let (rt, _) = runtime(); - let mut stack = Outbound::new(default_config(([0, 0, 0, 0], 0).into()), rt) + let mut stack = Outbound::new(default_config(), rt) .with_stack(connect) .push_tcp_logical(resolver) .into_inner(); @@ -164,7 +164,7 @@ async fn resolutions_are_reused() { let _trace = support::trace_init(); let addr = SocketAddr::new([0, 0, 0, 0].into(), 5550); - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); // Build a mock "connector" that returns the upstream "server" IO. @@ -242,7 +242,7 @@ async fn load_balances() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -336,7 +336,7 @@ async fn load_balancer_add_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -449,7 +449,7 @@ async fn load_balancer_remove_endpoints() { ), ]; - let cfg = default_config(addr); + let cfg = default_config(); let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is valid"); @@ -542,7 +542,7 @@ async fn no_profiles_when_outside_search_nets() { let no_profile_addr = SocketAddr::new([126, 32, 5, 18].into(), 5550); let cfg = Config { allow_discovery: IpMatch::new(Some(IpNet::from_str("10.0.0.0/8").unwrap())).into(), - ..default_config(profile_addr) + ..default_config() }; let svc_addr = NameAddr::from_str("foo.ns1.svc.example.com:5550").unwrap(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") @@ -616,7 +616,7 @@ async fn no_discovery_when_profile_has_an_endpoint() { let _trace = support::trace_init(); let ep = SocketAddr::new([10, 0, 0, 41].into(), 5550); - let cfg = default_config(ep); + let cfg = default_config(); let meta = support::resolver::Metadata::new( Default::default(), support::resolver::ProtocolHint::Unknown, @@ -661,7 +661,7 @@ async fn profile_endpoint_propagates_conn_errors() { let ep1 = SocketAddr::new([10, 0, 0, 41].into(), 5550); let ep2 = SocketAddr::new([10, 0, 0, 42].into(), 5550); - let cfg = default_config(ep1); + let cfg = default_config(); let id_name = tls::ServerId::from_str("foo.ns1.serviceaccount.identity.linkerd.cluster.local") .expect("hostname is invalid"); let meta = support::resolver::Metadata::new( @@ -697,12 +697,7 @@ async fn profile_endpoint_propagates_conn_errors() { // Build the outbound server let mut server = build_server(cfg, profiles, resolver, connect); - let svc = server.new_service(listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(ep1)), - )); - + let svc = server.new_service(addrs(ep1)); let res = svc .oneshot( support::io() @@ -765,7 +760,7 @@ fn build_server( resolver: resolver::Dst, connect: Connect, ) -> impl svc::NewService< - listen::Addrs, + orig_dst::Addrs, Service = impl tower::Service< I, Response = (), @@ -792,7 +787,7 @@ fn hello_world_client( new_svc: &mut N, ) -> impl Future + Send where - N: svc::NewService + Send + 'static, + N: svc::NewService + Send + 'static, S: svc::Service + Send + 'static, S::Error: Into, S::Future: Send + 'static, @@ -800,11 +795,13 @@ where let span = tracing::info_span!("hello_world_client", %orig_dst); let svc = { let _e = span.enter(); - let addrs = listen::Addrs::new( - Local(ServerAddr(([127, 0, 0, 1], 4140).into())), - Remote(ClientAddr(([127, 0, 0, 1], 666).into())), - Some(OrigDstAddr(orig_dst)), - ); + let addrs = orig_dst::Addrs { + orig_dst: OrigDstAddr(orig_dst), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + }; let svc = new_svc.new_service(addrs); tracing::trace!("new service"); svc diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 5b36533e6..7c38becef 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,25 +7,20 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{self, orig_dst, Keepalive, ListenAddr}, IpMatch, ProxyRuntime, }; pub use linkerd_app_test as support; use std::{net::SocketAddr, str::FromStr, time::Duration}; -const LOCALHOST: [u8; 4] = [127, 0, 0, 1]; - -pub fn default_config(orig_dst: SocketAddr) -> Config { +pub fn default_config() -> Config { Config { ingress_mode: false, allow_discovery: IpMatch::new(Some(IpNet::from_str("0.0.0.0/0").unwrap())).into(), proxy: config::ProxyConfig { server: config::ServerConfig { - bind: BindTcp::new( - ListenAddr(SocketAddr::new(LOCALHOST.into(), 0)), - Keepalive(None), - ) - .with_orig_dst_addr(orig_dst.into()), + addr: ListenAddr(([0, 0, 0, 0], 0).into()), + keepalive: Keepalive(None), h2_settings: h2::Settings::default(), }, connect: config::ConnectConfig { @@ -65,3 +60,14 @@ pub fn runtime() -> (ProxyRuntime, drain::Signal) { }; (runtime, drain_tx) } + +pub fn addrs(od: SocketAddr) -> orig_dst::Addrs { + use transport::{addrs::*, listen}; + orig_dst::Addrs { + orig_dst: OrigDstAddr(od), + inner: listen::Addrs { + server: Local(ServerAddr(([127, 0, 0, 1], 4140).into())), + client: Remote(ClientAddr(([127, 0, 0, 1], 666).into())), + }, + } +} diff --git a/linkerd/app/src/admin.rs b/linkerd/app/src/admin.rs index 91d362d9a..7f9c88f04 100644 --- a/linkerd/app/src/admin.rs +++ b/linkerd/app/src/admin.rs @@ -3,17 +3,18 @@ use crate::core::{ config::ServerConfig, detect, drain, errors, metrics::{self, FmtMetrics}, - serve, tls, trace, - transport::listen, + serve, + svc::{self, Param}, + tls, trace, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, Error, }; use crate::{ http, identity::LocalCrtKey, inbound::target::{HttpAccept, Target, TcpAccept}, - svc, }; -use std::{net::SocketAddr, pin::Pin, time::Duration}; +use std::{pin::Pin, time::Duration}; use tokio::sync::mpsc; use tracing::debug; @@ -24,7 +25,7 @@ pub struct Config { } pub struct Admin { - pub listen_addr: SocketAddr, + pub listen_addr: Local, pub latch: admin::Latch, pub serve: Pin + Send + 'static>>, } @@ -32,8 +33,10 @@ pub struct Admin { // === impl Config === impl Config { - pub fn build( + #[allow(clippy::clippy::too_many_arguments)] + pub fn build( self, + bind: B, identity: Option, report: R, metrics: metrics::Proxy, @@ -43,10 +46,12 @@ impl Config { ) -> Result where R: FmtMetrics + Clone + Send + 'static + Unpin, + B: Bind, + B::Addrs: svc::Param> + svc::Param>, { const DETECT_TIMEOUT: Duration = Duration::from_secs(1); - let (listen_addr, listen) = self.server.bind.bind()?; + let (listen_addr, listen) = bind.bind(&self.server)?; let (ready, latch) = admin::Readiness::new(); let admin = admin::Admin::new(report, ready, shutdown, trace); @@ -76,8 +81,15 @@ impl Config { http::DetectHttp::default(), )) .push(metrics.transport.layer_accept()) - .push_map_target(TcpAccept::from_local_addr) - .check_new_clone::>() + .push_map_target(|(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| { + // TODO this should use an admin-specific target type. + let Local(ServerAddr(target_addr)) = addrs.param(); + TcpAccept { + tls, + client_addr: addrs.param(), + target_addr, + } + }) .push(tls::NewDetectTls::layer(identity, DETECT_TIMEOUT)) .into_inner(); diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 24e2464fd..5d7ffb21b 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -4,7 +4,7 @@ use crate::core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{BindTcp, Keepalive, ListenAddr}, + transport::{Keepalive, ListenAddr}, Addr, AddrMatch, Conditional, NameMatch, }; use crate::{dns, gateway, identity, inbound, oc_collector, outbound}; @@ -52,15 +52,6 @@ pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"; pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"; pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"; -// When compiled with the `mock-orig-dst` flag, these environment variables are required to -// configure the proxy's behavior. - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_INBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_INBOUND_ORIG_DST_ADDR"; - -#[cfg(feature = "mock-orig-dst")] -pub const ENV_OUTBOUND_ORIG_DST_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_ORIG_DST_ADDR"; - pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE"; const ENV_INGRESS_MODE: &str = "LINKERD2_PROXY_INGRESS_MODE"; @@ -262,12 +253,6 @@ pub fn parse_config(strings: &S) -> Result let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); - #[cfg(feature = "mock-orig-dst")] - let (inbound_mock_orig_dst, outbound_mock_orig_dst) = ( - parse(strings, ENV_INBOUND_ORIG_DST_ADDR, parse_socket_addr), - parse(strings, ENV_OUTBOUND_ORIG_DST_ADDR, parse_socket_addr), - ); - let inbound_disable_ports = parse( strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, @@ -360,26 +345,6 @@ pub fn parse_config(strings: &S) -> Result let buffer_capacity = buffer_capacity?.unwrap_or(DEFAULT_BUFFER_CAPACITY); - #[cfg(feature = "mock-orig-dst")] - let (inbound_orig_dst, outbound_orig_dst) = ( - inbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_INBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - outbound_mock_orig_dst? - .map(DefaultOrigDstAddr::from) - .ok_or_else(|| { - error!("{} must be specified", ENV_OUTBOUND_ORIG_DST_ADDR); - EnvError::NoDestinationAddress - })?, - ); - - #[cfg(not(feature = "mock-orig-dst"))] - let (inbound_orig_dst, outbound_orig_dst): (DefaultOrigDstAddr, DefaultOrigDstAddr) = - Default::default(); - let dst_profile_suffixes = dst_profile_suffixes? .unwrap_or_else(|| parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()); let dst_profile_networks = dst_profile_networks?.unwrap_or_default(); @@ -387,16 +352,14 @@ pub fn parse_config(strings: &S) -> Result let outbound = { let ingress_mode = parse(strings, ENV_INGRESS_MODE, parse_bool)?.unwrap_or(false); - let keepalive = Keepalive(outbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - outbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + outbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_OUTBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(outbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(outbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -451,16 +414,14 @@ pub fn parse_config(strings: &S) -> Result }; let inbound = { - let keepalive = Keepalive(inbound_accept_keepalive?); - let bind = BindTcp::new( - ListenAddr( - inbound_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), - ), - keepalive, + let addr = ListenAddr( + inbound_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_INBOUND_LISTEN_ADDR).unwrap()), ); + let keepalive = Keepalive(inbound_accept_keepalive?); let server = ServerConfig { - bind: bind.with_orig_dst_addr(inbound_orig_dst), + addr, + keepalive, h2_settings: h2::Settings { keepalive_timeout: keepalive.into(), ..h2_settings @@ -507,7 +468,7 @@ pub fn parse_config(strings: &S) -> Result // Ensure that connections thaat directly target the inbound port are // secured (unless identity is disabled). - let inbound_port = server.bind.addr().as_ref().port(); + let inbound_port = server.addr.as_ref().port(); if !id_disabled && !require_identity_for_inbound_ports.contains(&inbound_port) { debug!( "Adding {} to {}", @@ -566,13 +527,11 @@ pub fn parse_config(strings: &S) -> Result let admin = super::admin::Config { metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE), server: ServerConfig { - bind: BindTcp::new( - ListenAddr( - admin_listener_addr? - .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), - ), - inbound.proxy.server.bind.keepalive(), + addr: ListenAddr( + admin_listener_addr? + .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap()), ), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }; @@ -617,7 +576,8 @@ pub fn parse_config(strings: &S) -> Result .map(|(addr, ids)| super::tap::Config::Enabled { permitted_client_ids: ids, config: ServerConfig { - bind: BindTcp::new(ListenAddr(addr), inbound.proxy.server.bind.keepalive()), + addr: ListenAddr(addr), + keepalive: inbound.proxy.server.keepalive, h2_settings, }, }) diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index 829a3c9b3..e9f21f138 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -12,12 +12,20 @@ pub mod tap; pub use self::metrics::Metrics; use futures::{future, FutureExt, TryFutureExt}; pub use linkerd_app_core::{self as core, metrics, trace}; -use linkerd_app_core::{control::ControlAddr, dns, drain, proxy::http, svc, Error, ProxyRuntime}; +use linkerd_app_core::{ + config::ServerConfig, + control::ControlAddr, + dns, drain, + proxy::http, + svc::Param, + transport::{listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr}, + Error, ProxyRuntime, +}; use linkerd_app_gateway as gateway; use linkerd_app_inbound::{self as inbound, Inbound}; use linkerd_app_outbound::{self as outbound, Outbound}; use linkerd_channel::into_stream::IntoStream; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tokio::{sync::mpsc, time::Duration}; use tracing::instrument::Instrument; use tracing::{debug, info, info_span}; @@ -53,9 +61,9 @@ pub struct App { drain: drain::Signal, dst: ControlAddr, identity: identity::Identity, - inbound_addr: SocketAddr, + inbound_addr: Local, oc_collector: oc_collector::OcCollector, - outbound_addr: SocketAddr, + outbound_addr: Local, start_proxy: Pin + Send + 'static>>, tap: tap::Tap, } @@ -64,16 +72,29 @@ impl Config { pub fn try_from_env() -> Result { env::Env.try_config() } +} +impl Config { /// Build an application. /// /// It is currently required that this be run on a Tokio runtime, since some /// services are created eagerly and must spawn tasks to do so. - pub async fn build( + pub async fn build( self, + bind_in: BIn, + bind_out: BOut, + bind_admin: BAdmin, shutdown_tx: mpsc::UnboundedSender<()>, log_level: trace::Handle, - ) -> Result { + ) -> Result + where + BIn: Bind + 'static, + BIn::Addrs: Param> + Param> + Param, + BOut: Bind + 'static, + BOut::Addrs: Param> + Param> + Param, + BAdmin: Bind + Clone + 'static, + BAdmin::Addrs: Param> + Param>, + { use metrics::FmtMetrics; let Config { @@ -98,7 +119,11 @@ impl Config { let (drain_tx, drain_rx) = drain::channel(); - let tap = info_span!("tap").in_scope(|| tap.build(identity.local(), drain_rx.clone()))?; + let tap = { + let bind = bind_admin.clone(); + info_span!("tap").in_scope(|| tap.build(bind, identity.local(), drain_rx.clone()))? + }; + let dst = { let metrics = metrics.control.clone(); let dns = dns.resolver.clone(); @@ -119,7 +144,15 @@ impl Config { let drain = drain_rx.clone(); let metrics = metrics.inbound.clone(); info_span!("admin").in_scope(move || { - admin.build(identity, report, metrics, log_level, drain, shutdown_tx) + admin.build( + bind_admin, + identity, + report, + metrics, + log_level, + drain, + shutdown_tx, + ) })? }; @@ -155,8 +188,9 @@ impl Config { dst.resolve.clone(), ); - let (inbound_addr, inbound_serve) = inbound.serve(dst.profiles.clone(), gateway_stack); - let (outbound_addr, outbound_serve) = outbound.serve(dst.profiles, dst.resolve); + let (inbound_addr, inbound_serve) = + inbound.serve(bind_in, dst.profiles.clone(), gateway_stack); + let (outbound_addr, outbound_serve) = outbound.serve(bind_out, dst.profiles, dst.resolve); let start_proxy = Box::pin(async move { tokio::spawn(outbound_serve.instrument(info_span!("outbound"))); @@ -178,19 +212,19 @@ impl Config { } impl App { - pub fn admin_addr(&self) -> SocketAddr { + pub fn admin_addr(&self) -> Local { self.admin.listen_addr } - pub fn inbound_addr(&self) -> SocketAddr { + pub fn inbound_addr(&self) -> Local { self.inbound_addr } - pub fn outbound_addr(&self) -> SocketAddr { + pub fn outbound_addr(&self) -> Local { self.outbound_addr } - pub fn tap_addr(&self) -> Option { + pub fn tap_addr(&self) -> Option> { match self.tap { tap::Tap::Disabled { .. } => None, tap::Tap::Enabled { listen_addr, .. } => Some(listen_addr), diff --git a/linkerd/app/src/tap.rs b/linkerd/app/src/tap.rs index 5f3ff8d3d..e78c7c44f 100644 --- a/linkerd/app/src/tap.rs +++ b/linkerd/app/src/tap.rs @@ -1,10 +1,17 @@ use futures::prelude::*; use indexmap::IndexSet; use linkerd_app_core::{ - config::ServerConfig, drain, proxy::identity::LocalCrtKey, proxy::tap, serve, tls, - transport::listen::Addrs, Error, + config::ServerConfig, + drain, + proxy::identity::LocalCrtKey, + proxy::tap, + serve, + svc::{self, Param}, + tls, + transport::{listen::Bind, ClientAddr, Local, Remote, ServerAddr}, + Error, }; -use std::{net::SocketAddr, pin::Pin}; +use std::pin::Pin; use tower::util::{service_fn, ServiceExt}; #[derive(Clone, Debug)] @@ -21,14 +28,23 @@ pub enum Tap { registry: tap::Registry, }, Enabled { - listen_addr: SocketAddr, + listen_addr: Local, registry: tap::Registry, serve: Pin + Send + 'static>>, }, } impl Config { - pub fn build(self, identity: Option, drain: drain::Watch) -> Result { + pub fn build( + self, + bind: B, + identity: Option, + drain: drain::Watch, + ) -> Result + where + B: Bind, + B::Addrs: Param>, + { let (registry, server) = tap::new(); match self { Config::Disabled => { @@ -39,22 +55,32 @@ impl Config { config, permitted_client_ids, } => { - let (listen_addr, listen) = config.bind.bind()?; - - let service = tap::AcceptPermittedClients::new(permitted_client_ids.into(), server); - let accept = tls::NewDetectTls::new( - identity, - move |meta: tls::server::Meta| { - let service = service.clone(); - service_fn(move |io| { - let fut = service.clone().oneshot((meta.clone(), io)); - Box::pin(async move { - fut.err_into::().await?.err_into::().await + let (listen_addr, listen) = bind.bind(&config)?; + let accept = svc::stack(server) + .push(svc::layer::mk(move |service| { + tap::AcceptPermittedClients::new( + permitted_client_ids.clone().into(), + service, + ) + })) + .push(svc::layer::mk(|service: tap::AcceptPermittedClients| { + move |meta: tls::server::Meta| { + let service = service.clone(); + service_fn(move |io| { + let fut = service.clone().oneshot((meta.clone(), io)); + Box::pin(async move { + fut.err_into::().await?.err_into::().await + }) }) - }) - }, - std::time::Duration::from_secs(1), - ); + } + })) + .check_new_service::, _>() + .push(tls::NewDetectTls::layer( + identity, + std::time::Duration::from_secs(1), + )) + .check_new_service::() + .into_inner(); let serve = Box::pin(serve::serve(listen, accept, drain.signaled())); diff --git a/linkerd/app/test/Cargo.toml b/linkerd/app/test/Cargo.toml index 5044a7e41..c1a56efbd 100644 --- a/linkerd/app/test/Cargo.toml +++ b/linkerd/app/test/Cargo.toml @@ -23,7 +23,7 @@ http = "0.2" http-body = "0.4" hyper = { version = "0.14.2", features = ["http1", "http2"] } linkerd-channel = { path = "../../channel" } -linkerd-app-core = { path = "../core", features = ["mock-orig-dst"] } +linkerd-app-core = { path = "../core" } linkerd-identity = { path = "../../identity" } linkerd-io = { path = "../../io", features = ["tokio-test"] } regex = "1" diff --git a/linkerd/proxy/tap/src/accept.rs b/linkerd/proxy/tap/src/accept.rs index 6dbf8134c..8c7523580 100644 --- a/linkerd/proxy/tap/src/accept.rs +++ b/linkerd/proxy/tap/src/accept.rs @@ -13,7 +13,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::net::TcpStream; use tower::Service; #[derive(Clone, Debug)] @@ -64,7 +63,10 @@ impl AcceptPermittedClients { } } -impl Service>> for AcceptPermittedClients { +impl Service> for AcceptPermittedClients +where + I: io::AsyncRead + io::AsyncWrite + Send + Unpin + 'static, +{ type Response = ServeFuture; type Error = Error; type Future = future::Ready>; @@ -73,7 +75,7 @@ impl Service>> for AcceptPermittedClien Poll::Ready(Ok(())) } - fn call(&mut self, conn: Connection>) -> Self::Future { + fn call(&mut self, conn: Connection) -> Self::Future { match conn { ((Conditional::Some(tls), _), io) => { if let tls::ServerTls::Established { diff --git a/linkerd/proxy/transport/Cargo.toml b/linkerd/proxy/transport/Cargo.toml index cf5b1cd67..ef24ca9ed 100644 --- a/linkerd/proxy/transport/Cargo.toml +++ b/linkerd/proxy/transport/Cargo.toml @@ -9,11 +9,6 @@ description = """ Transport-level implementations that rely on core proxy infrastructure """ -# This should probably be decomposed into smaller, decoupled crates. - -[features] -mock-orig-dst = [] - [dependencies] bytes = "1" futures = "0.3.9" diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index ee59fef6b..bd2cccdd6 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -4,11 +4,13 @@ pub mod addrs; mod connect; pub mod listen; pub mod metrics; +pub mod orig_dst; pub use self::{ addrs::{ClientAddr, ListenAddr, Local, OrigDstAddr, Remote, ServerAddr}, connect::ConnectTcp, - listen::{BindTcp, DefaultOrigDstAddr, GetOrigDstAddr, NoOrigDstAddr}, + listen::{Bind, BindTcp}, + orig_dst::BindWithOrigDst, }; use std::time::Duration; use tokio::net::TcpStream; diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index 7b348d9c3..22afd49d4 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,307 +1,92 @@ use crate::{addrs::*, Keepalive}; use futures::prelude::*; +use linkerd_io as io; use linkerd_stack::Param; -use std::{io, net::SocketAddr}; +use std::{fmt, pin::Pin}; use tokio::net::TcpStream; use tokio_stream::wrappers::TcpListenerStream; -use tracing::trace; -/// A mockable source for address info, i.e., for tests. -pub trait GetOrigDstAddr: Clone { - fn orig_dst_addr(&self, socket: &TcpStream) -> Option; +/// Binds a listener, producing a stream of incoming connections. +/// +/// Typically, this represents binding a TCP socket. However, it may also be an +/// stream of in-memory mock connections, for testing purposes. +pub trait Bind { + type Io: io::AsyncRead + + io::AsyncWrite + + io::Peek + + io::PeerAddr + + fmt::Debug + + Unpin + + Send + + Sync + + 'static; + type Addrs: Clone + Send + Sync + 'static; + type Incoming: Stream> + Send + Sync + 'static; + + fn bind(self, params: &T) -> io::Result>; } -#[derive(Clone, Debug)] -pub struct BindTcp { - addr: ListenAddr, - keepalive: Keepalive, - orig_dst_addr: O, -} +pub type Bound = (Local, I); -pub type Connection = (Addrs, TcpStream); +#[derive(Copy, Clone, Debug, Default)] +pub struct BindTcp(()); #[derive(Clone, Debug)] pub struct Addrs { - server: Local, - client: Remote, - orig_dst: Option, + pub server: Local, + pub client: Remote, } -#[derive(Copy, Clone, Debug)] -pub struct NoOrigDstAddr(()); - -// The mock-orig-dst feature disables use of the syscall-based OrigDstAddr implementation and -// replaces it with one that must be configured. - -#[cfg(not(feature = "mock-orig-dst"))] -pub use self::sys::SysOrigDstAddr as DefaultOrigDstAddr; - -#[cfg(feature = "mock-orig-dst")] -pub use self::mock::MockOrigDstAddr as DefaultOrigDstAddr; +// === impl BindTcp === impl BindTcp { - pub fn new(addr: ListenAddr, keepalive: Keepalive) -> Self { - Self { - addr, - keepalive, - orig_dst_addr: NoOrigDstAddr(()), - } + pub fn with_orig_dst() -> super::BindWithOrigDst { + super::BindWithOrigDst::from(Self::default()) } } -impl BindTcp { - pub fn with_orig_dst_addr(self, orig_dst_addr: B) -> BindTcp { - BindTcp { - orig_dst_addr, - addr: self.addr, - keepalive: self.keepalive, - } - } +impl Bind for BindTcp +where + T: Param + Param, +{ + type Addrs = Addrs; + type Incoming = Pin> + Send + Sync>>; + type Io = TcpStream; - pub fn addr(&self) -> ListenAddr { - self.addr - } - - pub fn keepalive(&self) -> Keepalive { - self.keepalive - } - - pub fn bind(&self) -> io::Result<(SocketAddr, impl Stream>)> { + fn bind(self, params: &T) -> io::Result> { let listen = { - let l = std::net::TcpListener::bind(self.addr)?; + let ListenAddr(addr) = params.param(); + let l = std::net::TcpListener::bind(addr)?; // Ensure that O_NONBLOCK is set on the socket before using it with Tokio. l.set_nonblocking(true)?; tokio::net::TcpListener::from_std(l).expect("listener must be valid") }; - let addr = listen.local_addr()?; - let keepalive = self.keepalive; - let get_orig = self.orig_dst_addr.clone(); - - let accept = TcpListenerStream::new(listen) - .and_then(move |tcp| future::ready(Self::accept(tcp, keepalive, get_orig.clone()))); - - Ok((addr, accept)) - } - - fn accept(tcp: TcpStream, keepalive: Keepalive, get_orig: A) -> io::Result { - let addrs = { - let server = Local(ServerAddr(tcp.local_addr()?)); + let server = Local(ServerAddr(listen.local_addr()?)); + let Keepalive(keepalive) = params.param(); + let accept = TcpListenerStream::new(listen).map(move |res| { + let tcp = res?; + super::set_nodelay_or_warn(&tcp); + super::set_keepalive_or_warn(&tcp, keepalive); let client = Remote(ClientAddr(tcp.peer_addr()?)); - let orig_dst = get_orig.orig_dst_addr(&tcp); - trace!( - server.addr = %server, - client.addr = %client, - orig.addr = ?orig_dst, - "Accepted", - ); - Addrs::new(server, client, orig_dst) - }; + Ok((Addrs { server, client }, tcp)) + }); - let Keepalive(keepalive) = keepalive; - super::set_nodelay_or_warn(&tcp); - super::set_keepalive_or_warn(&tcp, keepalive); - - Ok((addrs, tcp)) + Ok((server, Box::pin(accept))) } } -impl Addrs { - pub fn new( - server: Local, - client: Remote, - orig_dst: Option, - ) -> Self { - Self { - server, - client, - orig_dst, - } - } - - pub fn server(&self) -> Local { - self.server - } - - pub fn client(&self) -> Remote { - self.client - } - - pub fn orig_dst(&self) -> Option { - self.orig_dst - } - - pub fn target_addr(&self) -> SocketAddr { - self.orig_dst - .map(Into::into) - .unwrap_or_else(|| self.server.into()) - } -} - -impl GetOrigDstAddr for NoOrigDstAddr { - fn orig_dst_addr(&self, _: &TcpStream) -> Option { - None - } -} - -impl Param> for Addrs { - #[inline] - fn param(&self) -> Option { - self.orig_dst() - } -} +// === impl Addrs === impl Param> for Addrs { #[inline] fn param(&self) -> Remote { - self.client() + self.client } } impl Param> for Addrs { #[inline] fn param(&self) -> Local { - self.server() - } -} - -#[cfg(not(feature = "mock-orig-dst"))] -mod sys { - use super::{GetOrigDstAddr, OrigDstAddr, TcpStream}; - - #[derive(Copy, Clone, Debug, Default)] - pub struct SysOrigDstAddr(()); - - impl GetOrigDstAddr for SysOrigDstAddr { - #[cfg(target_os = "linux")] - fn orig_dst_addr(&self, sock: &TcpStream) -> Option { - use std::os::unix::io::AsRawFd; - - let fd = sock.as_raw_fd(); - let r = unsafe { linux::so_original_dst(fd) }; - r.map(OrigDstAddr).ok() - } - - #[cfg(not(target_os = "linux"))] - fn orig_dst_addr(&self, _sock: &TcpStream) -> Option { - None - } - } - - #[cfg(target_os = "linux")] - mod linux { - use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; - use std::os::unix::io::RawFd; - use std::{io, mem}; - use tracing::warn; - - pub unsafe fn so_original_dst(fd: RawFd) -> io::Result { - let mut sockaddr: libc::sockaddr_storage = mem::zeroed(); - let mut socklen: libc::socklen_t = mem::size_of::() as u32; - - let ret = libc::getsockopt( - fd, - libc::SOL_IP, - libc::SO_ORIGINAL_DST, - &mut sockaddr as *mut _ as *mut _, - &mut socklen as *mut _ as *mut _, - ); - if ret != 0 { - let e = io::Error::last_os_error(); - warn!("failed to read SO_ORIGINAL_DST: {:?}", e); - return Err(e); - } - - mk_addr(&sockaddr, socklen) - } - - // Borrowed with love from net2-rs - // https://github.com/rust-lang-nursery/net2-rs/blob/1b4cb4fb05fbad750b271f38221eab583b666e5e/src/socket.rs#L103 - fn mk_addr( - storage: &libc::sockaddr_storage, - len: libc::socklen_t, - ) -> io::Result { - match storage.ss_family as libc::c_int { - libc::AF_INET => { - assert!(len as usize >= mem::size_of::()); - - let sa = { - let sa = storage as *const _ as *const libc::sockaddr_in; - unsafe { *sa } - }; - - let bits = ntoh32(sa.sin_addr.s_addr); - let ip = Ipv4Addr::new( - (bits >> 24) as u8, - (bits >> 16) as u8, - (bits >> 8) as u8, - bits as u8, - ); - let port = sa.sin_port; - Ok(SocketAddr::V4(SocketAddrV4::new(ip, ntoh16(port)))) - } - libc::AF_INET6 => { - assert!(len as usize >= mem::size_of::()); - - let sa = { - let sa = storage as *const _ as *const libc::sockaddr_in6; - unsafe { *sa } - }; - - let arr = sa.sin6_addr.s6_addr; - let ip = Ipv6Addr::new( - (arr[0] as u16) << 8 | (arr[1] as u16), - (arr[2] as u16) << 8 | (arr[3] as u16), - (arr[4] as u16) << 8 | (arr[5] as u16), - (arr[6] as u16) << 8 | (arr[7] as u16), - (arr[8] as u16) << 8 | (arr[9] as u16), - (arr[10] as u16) << 8 | (arr[11] as u16), - (arr[12] as u16) << 8 | (arr[13] as u16), - (arr[14] as u16) << 8 | (arr[15] as u16), - ); - - let port = sa.sin6_port; - let flowinfo = sa.sin6_flowinfo; - let scope_id = sa.sin6_scope_id; - Ok(SocketAddr::V6(SocketAddrV6::new( - ip, - ntoh16(port), - flowinfo, - scope_id, - ))) - } - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid argument", - )), - } - } - - fn ntoh16(i: u16) -> u16 { - ::from_be(i) - } - - fn ntoh32(i: u32) -> u32 { - ::from_be(i) - } - } -} - -#[cfg(feature = "mock-orig-dst")] -mod mock { - use super::{GetOrigDstAddr, OrigDstAddr, SocketAddr, TcpStream}; - - #[derive(Copy, Clone, Debug)] - pub struct MockOrigDstAddr(SocketAddr); - - impl From for MockOrigDstAddr { - fn from(addr: SocketAddr) -> Self { - MockOrigDstAddr(addr) - } - } - - impl GetOrigDstAddr for MockOrigDstAddr { - fn orig_dst_addr(&self, _: &TcpStream) -> Option { - Some(OrigDstAddr(self.0)) - } + self.server } } diff --git a/linkerd/proxy/transport/src/orig_dst.rs b/linkerd/proxy/transport/src/orig_dst.rs new file mode 100644 index 000000000..8772b03cd --- /dev/null +++ b/linkerd/proxy/transport/src/orig_dst.rs @@ -0,0 +1,191 @@ +use crate::{ + addrs::*, + listen::{self, Bind, Bound}, +}; +use futures::prelude::*; +use linkerd_io as io; +use linkerd_stack::Param; +use std::pin::Pin; +use tokio::net::TcpStream; + +#[derive(Copy, Clone, Debug, Default)] +pub struct BindWithOrigDst { + inner: B, +} + +#[derive(Clone, Debug)] +pub struct Addrs { + pub inner: A, + pub orig_dst: OrigDstAddr, +} + +// === impl Addrs === + +impl Param for Addrs { + fn param(&self) -> OrigDstAddr { + self.orig_dst + } +} + +impl Param> for Addrs +where + A: Param>, +{ + fn param(&self) -> Remote { + self.inner.param() + } +} + +impl Param> for Addrs +where + A: Param>, +{ + fn param(&self) -> Local { + self.inner.param() + } +} + +// === impl WithOrigDst === + +impl From for BindWithOrigDst { + fn from(inner: B) -> Self { + Self { inner } + } +} + +impl Bind for BindWithOrigDst +where + B: Bind + 'static, +{ + type Addrs = Addrs; + type Io = TcpStream; + type Incoming = + Pin> + Send + Sync + 'static>>; + + fn bind(self, t: &T) -> io::Result> { + let (addr, incoming) = self.inner.bind(t)?; + + let incoming = incoming.map(|res| { + let (inner, tcp) = res?; + let orig_dst = orig_dst_addr(&tcp)?; + let addrs = Addrs { inner, orig_dst }; + Ok((addrs, tcp)) + }); + + Ok((addr, Box::pin(incoming))) + } +} + +#[cfg(target_os = "linux")] +fn orig_dst_addr(sock: &TcpStream) -> io::Result { + use std::os::unix::io::AsRawFd; + + let fd = sock.as_raw_fd(); + let r = unsafe { linux::so_original_dst(fd) }; + r.map(OrigDstAddr) +} + +#[cfg(not(target_os = "linux"))] +fn orig_dst_addr(&self, _: &TcpStream) -> io::Result { + io::Error::new( + io::ErrorKind::Other, + "SO_ORIGINAL_DST not supported on this operating system", + ) +} + +#[cfg(target_os = "linux")] +mod linux { + use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; + use std::os::unix::io::RawFd; + use std::{io, mem}; + use tracing::warn; + + pub unsafe fn so_original_dst(fd: RawFd) -> io::Result { + let mut sockaddr: libc::sockaddr_storage = mem::zeroed(); + let mut socklen: libc::socklen_t = mem::size_of::() as u32; + + let ret = libc::getsockopt( + fd, + libc::SOL_IP, + libc::SO_ORIGINAL_DST, + &mut sockaddr as *mut _ as *mut _, + &mut socklen as *mut _ as *mut _, + ); + if ret != 0 { + let e = io::Error::last_os_error(); + warn!("failed to read SO_ORIGINAL_DST: {:?}", e); + return Err(e); + } + + mk_addr(&sockaddr, socklen) + } + + // Borrowed with love from net2-rs + // https://github.com/rust-lang-nursery/net2-rs/blob/1b4cb4fb05fbad750b271f38221eab583b666e5e/src/socket.rs#L103 + // + // Copyright (c) 2014 The Rust Project Developers + fn mk_addr(storage: &libc::sockaddr_storage, len: libc::socklen_t) -> io::Result { + match storage.ss_family as libc::c_int { + libc::AF_INET => { + assert!(len as usize >= mem::size_of::()); + + let sa = { + let sa = storage as *const _ as *const libc::sockaddr_in; + unsafe { *sa } + }; + + let bits = ntoh32(sa.sin_addr.s_addr); + let ip = Ipv4Addr::new( + (bits >> 24) as u8, + (bits >> 16) as u8, + (bits >> 8) as u8, + bits as u8, + ); + let port = sa.sin_port; + Ok(SocketAddr::V4(SocketAddrV4::new(ip, ntoh16(port)))) + } + libc::AF_INET6 => { + assert!(len as usize >= mem::size_of::()); + + let sa = { + let sa = storage as *const _ as *const libc::sockaddr_in6; + unsafe { *sa } + }; + + let arr = sa.sin6_addr.s6_addr; + let ip = Ipv6Addr::new( + (arr[0] as u16) << 8 | (arr[1] as u16), + (arr[2] as u16) << 8 | (arr[3] as u16), + (arr[4] as u16) << 8 | (arr[5] as u16), + (arr[6] as u16) << 8 | (arr[7] as u16), + (arr[8] as u16) << 8 | (arr[9] as u16), + (arr[10] as u16) << 8 | (arr[11] as u16), + (arr[12] as u16) << 8 | (arr[13] as u16), + (arr[14] as u16) << 8 | (arr[15] as u16), + ); + + let port = sa.sin6_port; + let flowinfo = sa.sin6_flowinfo; + let scope_id = sa.sin6_scope_id; + Ok(SocketAddr::V6(SocketAddrV6::new( + ip, + ntoh16(port), + flowinfo, + scope_id, + ))) + } + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid argument", + )), + } + } + + fn ntoh16(i: u16) -> u16 { + ::from_be(i) + } + + fn ntoh32(i: u32) -> u32 { + ::from_be(i) + } +} diff --git a/linkerd/tls/tests/tls_accept.rs b/linkerd/tls/tests/tls_accept.rs index fd099c27c..faa82500f 100644 --- a/linkerd/tls/tests/tls_accept.rs +++ b/linkerd/tls/tests/tls_accept.rs @@ -11,7 +11,9 @@ use linkerd_error::Never; use linkerd_identity as id; use linkerd_io::{self as io, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use linkerd_proxy_transport::{ - listen::Addrs, BindTcp, ConnectTcp, Keepalive, ListenAddr, Remote, ServerAddr, + addrs::*, + listen::{Addrs, Bind, BindTcp}, + ConnectTcp, Keepalive, ListenAddr, }; use linkerd_stack::{NewService, Param}; use linkerd_tls as tls; @@ -156,12 +158,6 @@ where // Saves the result of every connection. let (sender, receiver) = mpsc::channel::>(); - // Let the OS decide the port number and then return the resulting - // `SocketAddr` so the client can connect to it. This allows multiple - // tests to run at once, which wouldn't work if they all were bound on - // a fixed port. - let addr = "127.0.0.1:0".parse::().unwrap(); - let mut detect = tls::NewDetectTls::new( server_tls.map(Tls), move |meta: tls::server::Meta| { @@ -188,18 +184,16 @@ where std::time::Duration::from_secs(10), ); - let (listen_addr, listen) = BindTcp::new(ListenAddr(addr), Keepalive(None)) - .bind() - .expect("must bind"); + let (listen_addr, listen) = BindTcp::default().bind(&Server).expect("must bind"); let server = async move { futures::pin_mut!(listen); - let (meta, io) = listen + let (addrs, io) = listen .next() .await .expect("listen failed") .expect("listener closed"); tracing::debug!("incoming connection"); - let accept = detect.new_service(meta); + let accept = detect.new_service(addrs); accept.oneshot(io).await.expect("connection failed"); tracing::debug!("done"); } @@ -220,7 +214,7 @@ where let client = async move { let conn = tls::Client::layer(client_tls) .layer(ConnectTcp::new(Keepalive(None))) - .oneshot(Target(server_addr, client_server_id.map(Into::into))) + .oneshot(Target(server_addr.into(), client_server_id.map(Into::into))) .await; match conn { Err(e) => { @@ -306,12 +300,17 @@ const PING: &[u8] = b"ping"; const PONG: &[u8] = b"pong"; const START_OF_TLS: &[u8] = &[22, 3, 1]; // ContentType::handshake version 3.1 +#[derive(Copy, Clone, Debug)] +struct Server; + #[derive(Clone)] struct Target(SocketAddr, tls::ConditionalClientTls); #[derive(Clone)] struct Tls(id::CrtKey); +// === impl Target === + impl Param> for Target { fn param(&self) -> Remote { Remote(ServerAddr(self.0)) @@ -324,6 +323,8 @@ impl Param for Target { } } +// === impl Tls === + impl Param for Tls { fn param(&self) -> tls::client::Config { self.0.client_config() @@ -341,3 +342,20 @@ impl Param for Tls { self.0.id().clone() } } + +// === impl Server === + +impl Param for Server { + fn param(&self) -> ListenAddr { + // Let the OS decide the port number and then return the resulting + // `SocketAddr` so the client can connect to it. This allows multiple + // tests to run at once, which wouldn't work if they all were bound on + // a fixed port. + ListenAddr(([127, 0, 0, 1], 0).into()) + } +} +impl Param for Server { + fn param(&self) -> Keepalive { + Keepalive(None) + } +} diff --git a/linkerd2-proxy/Cargo.toml b/linkerd2-proxy/Cargo.toml index 27e04bd74..1b5451c8d 100644 --- a/linkerd2-proxy/Cargo.toml +++ b/linkerd2-proxy/Cargo.toml @@ -9,7 +9,6 @@ description = "The main proxy executable" [features] default = ["multicore"] -mock-orig-dst = ["linkerd-app/mock-orig-dst"] multicore = ["tokio/rt-multi-thread", "num_cpus"] [dependencies] diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index c6123cbed..a726c2825 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -3,7 +3,7 @@ #![deny(warnings, rust_2018_idioms)] #![type_length_limit = "16289823"] -use linkerd_app::{trace, Config}; +use linkerd_app::{core::transport::BindTcp, trace, Config}; use linkerd_signal as signal; use tokio::sync::mpsc; pub use tracing::{debug, error, info, warn}; @@ -14,22 +14,36 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; mod rt; +const EX_USAGE: i32 = 64; + fn main() { - let trace = trace::init(); + let trace = match trace::init() { + Ok(t) => t, + Err(e) => { + eprintln!("Invalid logging configuration: {}", e); + std::process::exit(EX_USAGE); + } + }; // Load configuration from the environment without binding ports. let config = match Config::try_from_env() { Ok(config) => config, Err(e) => { eprintln!("Invalid configuration: {}", e); - const EX_USAGE: i32 = 64; std::process::exit(EX_USAGE); } }; + // Builds a runtime with the appropriate number of cores: + // `LINKERD2_PROXY_CORES` env or the number of available CPUs (as provided + // by cgroups, when possible). rt::build().block_on(async move { let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel(); - let app = match async move { config.build(shutdown_tx, trace?).await }.await { + let bind = BindTcp::with_orig_dst(); + let app = match config + .build(bind, bind, BindTcp::default(), shutdown_tx, trace) + .await + { Ok(app) => app, Err(e) => { eprintln!("Initialization failure: {}", e);