outbound: handle `Opaque` protocol hints on endpoints (#2237)

Currently, when the outbound proxy makes a direct connection prefixed
with a `TransportHeader` in order to send HTTP traffic, it will always
send a `SessionProtocol` hint with the HTTP version as part of the
header. This instructs the inbound proxy to use that protocol, even if
the target port has a ServerPolicy that marks that port as opaque, which
can result in incorrect handling of that connection. See
linkerd/linkerd2#9888 for details.

In order to prevent this, linkerd/linkerd2-proxy-api#197 adds a new
`ProtocolHint` value to the protobuf endpoint metadata message. This
will allow the Destination controller to explicitly indicate to the
outbound proxy that a given endpoint is known to handle all connections
to a port as an opaque TCP stream, and that the proxy should not perform
a protocol upgrade or send a `SessionProtocol` in the transport header.
This branch updates the proxy to handle this new hint value, and adds
tests that the outbound proxy behaves as expected.

Along with linkerd/linkerd2#10301, this will fix linkerd/linkerd2#9888.

I opened a new PR for this change rather than attempting to rebase my
previous PR #2209, as it felt a bit easier to start with a new branch
and just make the changes that were still relevant. Therefore, this
closes #2209.
This commit is contained in:
Eliza Weisman 2023-04-13 14:09:01 -07:00 committed by GitHub
parent abab7fd783
commit c7918cfb1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 202 additions and 78 deletions

View File

@ -1,4 +1,5 @@
mod client_policy;
mod direct;
mod discovery;
mod identity;
mod orig_proto;

View File

@ -1,7 +1,7 @@
use crate::*;
#[tokio::test]
async fn tagged_transport_http() {
async fn h2_hinted() {
let _trace = trace_init();
// identity is always required for direct connections
@ -39,6 +39,56 @@ async fn tagged_transport_http() {
assert_eq!(client.get("/").await, "hello");
}
/// Reproduces linkerd/linkerd2#9888. A proxy receives HTTP traffic direct
/// traffic with a transport header, and the port is also in the
/// `INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` env var.
/// TODO(eliza): add a similar test where the policy on the opaque port is
/// discovered from the policy controller.
#[tokio::test]
async fn opaque_hinted() {
let _trace = trace_init();
// identity is always required for direct connections
let in_svc_acct = "foo.ns1.serviceaccount.identity.linkerd.cluster.local";
let in_identity = identity::Identity::new("foo-ns1", in_svc_acct.to_string());
let out_svc_acct = "bar.ns1.serviceaccount.identity.linkerd.cluster.local";
let out_identity = identity::Identity::new("bar-ns1", out_svc_acct.to_string());
let srv = server::http1().route("/", "hello").run().await;
let srv_addr = srv.addr;
let dst = format!("opaque.test.svc.cluster.local:{}", srv_addr.port());
let (inbound, _profile_in) = {
let id_svc = in_identity.service();
let mut env = in_identity.env;
env.put(
app::env::ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION,
srv_addr.port().to_string(),
);
let (proxy, profile) = mk_inbound(srv, id_svc, &dst).await;
let proxy = proxy.run_with_test_env(env).await;
(proxy, profile)
};
let (outbound, _profile_out, _dst) = {
let ctrl = controller::new();
let dst = ctrl.destination_tx(dst);
dst.send(
controller::destination_add(srv_addr)
.hint(controller::Hint::Opaque)
.opaque_port(inbound.inbound.port())
.identity(in_svc_acct),
);
let (proxy, profile) = mk_outbound(srv_addr, ctrl, out_identity).await;
(proxy, profile, dst)
};
let client = client::http1(outbound.outbound, "opaque.test.svc.cluster.local");
assert_eq!(client.get("/").await, "hello");
}
async fn mk_inbound(
srv: server::Listening,
id: identity::Controller,
@ -54,8 +104,7 @@ async fn mk_inbound(
.controller(ctrl)
.identity(id.run().await)
.inbound(srv)
.inbound_direct()
.named("inbound");
.inbound_direct();
(proxy, profile)
}
@ -73,7 +122,6 @@ async fn mk_outbound(
.controller(ctrl)
.identity(out_identity.service().run().await)
.outbound_ip(srv_addr)
.named("outbound")
.run_with_test_env(out_identity.env)
.await;
(proxy, profile)

View File

@ -408,13 +408,22 @@ where
match self.param() {
http::Version::H2 => client::Settings::H2,
http::Version::Http1 => match self.metadata.protocol_hint() {
ProtocolHint::Unknown => client::Settings::Http1,
// If the protocol hint is unknown or indicates that the
// endpoint's proxy will treat connections as opaque, do not
// perform a protocol upgrade to HTTP/2.
ProtocolHint::Unknown | ProtocolHint::Opaque => client::Settings::Http1,
ProtocolHint::Http2 => client::Settings::OrigProtoUpgrade,
},
}
}
}
impl<T> svc::Param<ProtocolHint> for Endpoint<T> {
fn param(&self) -> ProtocolHint {
self.metadata.protocol_hint()
}
}
// TODO(ver) move this into the endpoint stack?
impl<T> tap::Inspect for Endpoint<T> {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {

View File

@ -7,7 +7,7 @@ use super::{
use crate::{tcp::tagged_transport, Outbound};
use linkerd_app_core::{
classify, config, errors, http_tracing, metrics,
proxy::{http, tap},
proxy::{api_resolve::ProtocolHint, http, tap},
svc::{self, ExtractParam},
tls,
transport::{self, Remote, ServerAddr},
@ -206,9 +206,19 @@ impl errors::HttpRescue<Error> for ClientRescue {
// === impl Connect ===
impl<T> svc::Param<Option<SessionProtocol>> for Connect<T> {
impl<T> svc::Param<Option<SessionProtocol>> for Connect<T>
where
T: svc::Param<ProtocolHint>,
{
#[inline]
fn param(&self) -> Option<SessionProtocol> {
// The discovered protocol hint indicates that this endpoint will treat
// all connections as opaque TCP streams. Don't send our detected
// session protocol as part of a transport header.
if self.inner.param() == ProtocolHint::Opaque {
return None;
}
match self.version {
http::Version::Http1 => Some(SessionProtocol::Http1),
http::Version::H2 => Some(SessionProtocol::Http2),

View File

@ -314,13 +314,19 @@ impl svc::Param<http::client::Settings> for Endpoint {
match self.version {
http::Version::H2 => http::client::Settings::H2,
http::Version::Http1 => match self.hint {
ProtocolHint::Unknown => http::client::Settings::Http1,
ProtocolHint::Unknown | ProtocolHint::Opaque => http::client::Settings::Http1,
ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade,
},
}
}
}
impl svc::Param<ProtocolHint> for Endpoint {
fn param(&self) -> ProtocolHint {
self.hint
}
}
impl tap::Inspect for Endpoint {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions().get::<http::ClientHandle>().map(|c| c.addr)

View File

@ -155,6 +155,7 @@ mod test {
port_override: Option<u16>,
authority: Option<http::uri::Authority>,
server_id: Option<tls::ServerId>,
proto: Option<SessionProtocol>,
}
impl svc::Param<tls::ConditionalClientTls> for Endpoint {
@ -197,7 +198,28 @@ mod test {
impl svc::Param<Option<SessionProtocol>> for Endpoint {
fn param(&self) -> Option<SessionProtocol> {
None
self.proto.clone()
}
}
fn expect_header(
header: TransportHeader,
) -> impl Fn(Connect) -> futures::future::Ready<Result<(tokio_test::io::Mock, ConnectMeta), io::Error>>
{
move |ep| {
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4143);
assert!(ep.tls.is_some());
let buf = header.encode_prefaced_buf().expect("Must encode");
let io = tokio_test::io::Builder::new()
.write(&buf[..])
.write(b"hello")
.build();
let meta = tls::ConnectMeta {
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
};
future::ready(Ok::<_, io::Error>((io, meta)))
}
}
@ -230,26 +252,11 @@ mod test {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(|ep: Connect| {
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4143);
assert!(ep.tls.is_some());
let hdr = TransportHeader {
port: 4321,
name: None,
protocol: None,
};
let buf = hdr.encode_prefaced_buf().expect("Must encode");
let io = tokio_test::io::Builder::new()
.write(&buf[..])
.write(b"hello")
.build();
let meta = tls::ConnectMeta {
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
};
future::ready(Ok::<_, io::Error>((io, meta)))
}),
inner: service_fn(expect_header(TransportHeader {
port: 4321,
name: None,
protocol: None,
})),
};
let e = Endpoint {
@ -258,6 +265,7 @@ mod test {
identity::Name::from_str("server.id").unwrap(),
)),
authority: None,
proto: None,
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");
@ -268,26 +276,11 @@ mod test {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(|ep: Connect| {
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4143);
assert!(ep.tls.is_some());
let hdr = TransportHeader {
port: 5555,
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
protocol: None,
};
let buf = hdr.encode_prefaced_buf().expect("Must encode");
let io = tokio_test::io::Builder::new()
.write(&buf[..])
.write(b"hello")
.build();
let meta = tls::ConnectMeta {
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
};
future::ready(Ok::<_, io::Error>((io, meta)))
}),
inner: service_fn(expect_header(TransportHeader {
port: 5555,
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
protocol: None,
})),
};
let e = Endpoint {
@ -296,6 +289,7 @@ mod test {
identity::Name::from_str("server.id").unwrap(),
)),
authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()),
proto: None,
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");
@ -306,26 +300,11 @@ mod test {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(|ep: Connect| {
let Remote(ServerAddr(sa)) = ep.addr;
assert_eq!(sa.port(), 4143);
assert!(ep.tls.is_some());
let hdr = TransportHeader {
port: 4321,
name: None,
protocol: None,
};
let buf = hdr.encode_prefaced_buf().expect("Must encode");
let io = tokio_test::io::Builder::new()
.write(&buf[..])
.write(b"hello")
.build();
let meta = tls::ConnectMeta {
socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())),
tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())),
};
future::ready(Ok::<_, io::Error>((io, meta)))
}),
inner: service_fn(expect_header(TransportHeader {
port: 4321,
name: None,
protocol: None,
})),
};
let e = Endpoint {
@ -334,6 +313,79 @@ mod test {
identity::Name::from_str("server.id").unwrap(),
)),
authority: None,
proto: None,
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");
}
#[tokio::test(flavor = "current_thread")]
async fn http_no_name() {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(expect_header(TransportHeader {
port: 4321,
name: None,
protocol: Some(SessionProtocol::Http1),
})),
};
let e = Endpoint {
port_override: Some(4143),
server_id: Some(tls::ServerId(
identity::Name::from_str("server.id").unwrap(),
)),
authority: None,
proto: Some(SessionProtocol::Http1),
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");
}
#[tokio::test(flavor = "current_thread")]
async fn http_named_with_port() {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(expect_header(TransportHeader {
port: 5555,
name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()),
protocol: Some(SessionProtocol::Http1),
})),
};
let e = Endpoint {
port_override: Some(4143),
server_id: Some(tls::ServerId(
identity::Name::from_str("server.id").unwrap(),
)),
authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()),
proto: Some(SessionProtocol::Http1),
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");
}
#[tokio::test(flavor = "current_thread")]
async fn http_named_no_port() {
let _trace = linkerd_tracing::test::trace_init();
let svc = TaggedTransport {
inner: service_fn(expect_header(TransportHeader {
port: 4321,
name: None,
protocol: Some(SessionProtocol::Http1),
})),
};
let e = Endpoint {
port_override: Some(4143),
server_id: Some(tls::ServerId(
identity::Name::from_str("server.id").unwrap(),
)),
authority: None,
proto: Some(SessionProtocol::Http1),
};
let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail");
io.write_all(b"hello").await.expect("Write must succeed");

View File

@ -31,6 +31,9 @@ pub enum ProtocolHint {
Unknown,
/// The destination can receive HTTP2 messages.
Http2,
/// The destination will handle traffic as opaque, regardless of
/// the local proxy's handling of the traffic.
Opaque,
}
// === impl Metadata ===

View File

@ -26,15 +26,10 @@ pub fn to_addr_meta(
let mut proto_hint = ProtocolHint::Unknown;
let mut tagged_transport_port = None;
if let Some(hint) = pb.protocol_hint {
// the match will make sense later...
#[allow(clippy::collapsible_match, clippy::single_match)]
if let Some(proto) = hint.protocol {
match proto {
Protocol::H2(..) => {
proto_hint = ProtocolHint::Http2;
}
_ => {} // TODO(eliza): handle opaque protocol hints
}
match hint.protocol {
Some(Protocol::H2(..)) => proto_hint = ProtocolHint::Http2,
Some(Protocol::Opaque(..)) => proto_hint = ProtocolHint::Opaque,
None => {}
}
if let Some(OpaqueTransport { inbound_port }) = hint.opaque_transport {