From c7918cfb1fa1e78cb986ef9a10c96afd8c13d8de Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 13 Apr 2023 14:09:01 -0700 Subject: [PATCH] outbound: handle `Opaque` protocol hints on endpoints (#2237) Currently, when the outbound proxy makes a direct connection prefixed with a `TransportHeader` in order to send HTTP traffic, it will always send a `SessionProtocol` hint with the HTTP version as part of the header. This instructs the inbound proxy to use that protocol, even if the target port has a ServerPolicy that marks that port as opaque, which can result in incorrect handling of that connection. See linkerd/linkerd2#9888 for details. In order to prevent this, linkerd/linkerd2-proxy-api#197 adds a new `ProtocolHint` value to the protobuf endpoint metadata message. This will allow the Destination controller to explicitly indicate to the outbound proxy that a given endpoint is known to handle all connections to a port as an opaque TCP stream, and that the proxy should not perform a protocol upgrade or send a `SessionProtocol` in the transport header. This branch updates the proxy to handle this new hint value, and adds tests that the outbound proxy behaves as expected. Along with linkerd/linkerd2#10301, this will fix linkerd/linkerd2#9888. I opened a new PR for this change rather than attempting to rebase my previous PR #2209, as it felt a bit easier to start with a new branch and just make the changes that were still relevant. Therefore, this closes #2209. --- linkerd/app/integration/src/tests.rs | 1 + linkerd/app/integration/src/tests/direct.rs | 56 +++++- linkerd/app/outbound/src/http/concrete.rs | 11 +- linkerd/app/outbound/src/http/endpoint.rs | 14 +- .../app/outbound/src/http/endpoint/tests.rs | 8 +- .../app/outbound/src/tcp/tagged_transport.rs | 174 ++++++++++++------ linkerd/proxy/api-resolve/src/metadata.rs | 3 + linkerd/proxy/api-resolve/src/pb.rs | 13 +- 8 files changed, 202 insertions(+), 78 deletions(-) diff --git a/linkerd/app/integration/src/tests.rs b/linkerd/app/integration/src/tests.rs index 774f08c9f..a01378b75 100644 --- a/linkerd/app/integration/src/tests.rs +++ b/linkerd/app/integration/src/tests.rs @@ -1,4 +1,5 @@ mod client_policy; +mod direct; mod discovery; mod identity; mod orig_proto; diff --git a/linkerd/app/integration/src/tests/direct.rs b/linkerd/app/integration/src/tests/direct.rs index 400360cf7..2d52bac08 100644 --- a/linkerd/app/integration/src/tests/direct.rs +++ b/linkerd/app/integration/src/tests/direct.rs @@ -1,7 +1,7 @@ use crate::*; #[tokio::test] -async fn tagged_transport_http() { +async fn h2_hinted() { let _trace = trace_init(); // identity is always required for direct connections @@ -39,6 +39,56 @@ async fn tagged_transport_http() { assert_eq!(client.get("/").await, "hello"); } +/// Reproduces linkerd/linkerd2#9888. A proxy receives HTTP traffic direct +/// traffic with a transport header, and the port is also in the +/// `INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION` env var. +/// TODO(eliza): add a similar test where the policy on the opaque port is +/// discovered from the policy controller. +#[tokio::test] +async fn opaque_hinted() { + let _trace = trace_init(); + + // identity is always required for direct connections + let in_svc_acct = "foo.ns1.serviceaccount.identity.linkerd.cluster.local"; + let in_identity = identity::Identity::new("foo-ns1", in_svc_acct.to_string()); + + let out_svc_acct = "bar.ns1.serviceaccount.identity.linkerd.cluster.local"; + let out_identity = identity::Identity::new("bar-ns1", out_svc_acct.to_string()); + + let srv = server::http1().route("/", "hello").run().await; + let srv_addr = srv.addr; + let dst = format!("opaque.test.svc.cluster.local:{}", srv_addr.port()); + + let (inbound, _profile_in) = { + let id_svc = in_identity.service(); + let mut env = in_identity.env; + env.put( + app::env::ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, + srv_addr.port().to_string(), + ); + let (proxy, profile) = mk_inbound(srv, id_svc, &dst).await; + let proxy = proxy.run_with_test_env(env).await; + (proxy, profile) + }; + + let (outbound, _profile_out, _dst) = { + let ctrl = controller::new(); + let dst = ctrl.destination_tx(dst); + dst.send( + controller::destination_add(srv_addr) + .hint(controller::Hint::Opaque) + .opaque_port(inbound.inbound.port()) + .identity(in_svc_acct), + ); + let (proxy, profile) = mk_outbound(srv_addr, ctrl, out_identity).await; + (proxy, profile, dst) + }; + + let client = client::http1(outbound.outbound, "opaque.test.svc.cluster.local"); + + assert_eq!(client.get("/").await, "hello"); +} + async fn mk_inbound( srv: server::Listening, id: identity::Controller, @@ -54,8 +104,7 @@ async fn mk_inbound( .controller(ctrl) .identity(id.run().await) .inbound(srv) - .inbound_direct() - .named("inbound"); + .inbound_direct(); (proxy, profile) } @@ -73,7 +122,6 @@ async fn mk_outbound( .controller(ctrl) .identity(out_identity.service().run().await) .outbound_ip(srv_addr) - .named("outbound") .run_with_test_env(out_identity.env) .await; (proxy, profile) diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 659d2140c..0fd700d21 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -408,13 +408,22 @@ where match self.param() { http::Version::H2 => client::Settings::H2, http::Version::Http1 => match self.metadata.protocol_hint() { - ProtocolHint::Unknown => client::Settings::Http1, + // If the protocol hint is unknown or indicates that the + // endpoint's proxy will treat connections as opaque, do not + // perform a protocol upgrade to HTTP/2. + ProtocolHint::Unknown | ProtocolHint::Opaque => client::Settings::Http1, ProtocolHint::Http2 => client::Settings::OrigProtoUpgrade, }, } } } +impl svc::Param for Endpoint { + fn param(&self) -> ProtocolHint { + self.metadata.protocol_hint() + } +} + // TODO(ver) move this into the endpoint stack? impl tap::Inspect for Endpoint { fn src_addr(&self, req: &http::Request) -> Option { diff --git a/linkerd/app/outbound/src/http/endpoint.rs b/linkerd/app/outbound/src/http/endpoint.rs index f698a086e..1e50dd067 100644 --- a/linkerd/app/outbound/src/http/endpoint.rs +++ b/linkerd/app/outbound/src/http/endpoint.rs @@ -7,7 +7,7 @@ use super::{ use crate::{tcp::tagged_transport, Outbound}; use linkerd_app_core::{ classify, config, errors, http_tracing, metrics, - proxy::{http, tap}, + proxy::{api_resolve::ProtocolHint, http, tap}, svc::{self, ExtractParam}, tls, transport::{self, Remote, ServerAddr}, @@ -206,9 +206,19 @@ impl errors::HttpRescue for ClientRescue { // === impl Connect === -impl svc::Param> for Connect { +impl svc::Param> for Connect +where + T: svc::Param, +{ #[inline] fn param(&self) -> Option { + // The discovered protocol hint indicates that this endpoint will treat + // all connections as opaque TCP streams. Don't send our detected + // session protocol as part of a transport header. + if self.inner.param() == ProtocolHint::Opaque { + return None; + } + match self.version { http::Version::Http1 => Some(SessionProtocol::Http1), http::Version::H2 => Some(SessionProtocol::Http2), diff --git a/linkerd/app/outbound/src/http/endpoint/tests.rs b/linkerd/app/outbound/src/http/endpoint/tests.rs index 4765fe563..461ea2e91 100644 --- a/linkerd/app/outbound/src/http/endpoint/tests.rs +++ b/linkerd/app/outbound/src/http/endpoint/tests.rs @@ -314,13 +314,19 @@ impl svc::Param for Endpoint { match self.version { http::Version::H2 => http::client::Settings::H2, http::Version::Http1 => match self.hint { - ProtocolHint::Unknown => http::client::Settings::Http1, + ProtocolHint::Unknown | ProtocolHint::Opaque => http::client::Settings::Http1, ProtocolHint::Http2 => http::client::Settings::OrigProtoUpgrade, }, } } } +impl svc::Param for Endpoint { + fn param(&self) -> ProtocolHint { + self.hint + } +} + impl tap::Inspect for Endpoint { fn src_addr(&self, req: &http::Request) -> Option { req.extensions().get::().map(|c| c.addr) diff --git a/linkerd/app/outbound/src/tcp/tagged_transport.rs b/linkerd/app/outbound/src/tcp/tagged_transport.rs index 5c6a8fe61..fc053592a 100644 --- a/linkerd/app/outbound/src/tcp/tagged_transport.rs +++ b/linkerd/app/outbound/src/tcp/tagged_transport.rs @@ -155,6 +155,7 @@ mod test { port_override: Option, authority: Option, server_id: Option, + proto: Option, } impl svc::Param for Endpoint { @@ -197,7 +198,28 @@ mod test { impl svc::Param> for Endpoint { fn param(&self) -> Option { - None + self.proto.clone() + } + } + + fn expect_header( + header: TransportHeader, + ) -> impl Fn(Connect) -> futures::future::Ready> + { + move |ep| { + let Remote(ServerAddr(sa)) = ep.addr; + assert_eq!(sa.port(), 4143); + assert!(ep.tls.is_some()); + let buf = header.encode_prefaced_buf().expect("Must encode"); + let io = tokio_test::io::Builder::new() + .write(&buf[..]) + .write(b"hello") + .build(); + let meta = tls::ConnectMeta { + socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())), + tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())), + }; + future::ready(Ok::<_, io::Error>((io, meta))) } } @@ -230,26 +252,11 @@ mod test { let _trace = linkerd_tracing::test::trace_init(); let svc = TaggedTransport { - inner: service_fn(|ep: Connect| { - let Remote(ServerAddr(sa)) = ep.addr; - assert_eq!(sa.port(), 4143); - assert!(ep.tls.is_some()); - let hdr = TransportHeader { - port: 4321, - name: None, - protocol: None, - }; - let buf = hdr.encode_prefaced_buf().expect("Must encode"); - let io = tokio_test::io::Builder::new() - .write(&buf[..]) - .write(b"hello") - .build(); - let meta = tls::ConnectMeta { - socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())), - tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())), - }; - future::ready(Ok::<_, io::Error>((io, meta))) - }), + inner: service_fn(expect_header(TransportHeader { + port: 4321, + name: None, + protocol: None, + })), }; let e = Endpoint { @@ -258,6 +265,7 @@ mod test { identity::Name::from_str("server.id").unwrap(), )), authority: None, + proto: None, }; let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); io.write_all(b"hello").await.expect("Write must succeed"); @@ -268,26 +276,11 @@ mod test { let _trace = linkerd_tracing::test::trace_init(); let svc = TaggedTransport { - inner: service_fn(|ep: Connect| { - let Remote(ServerAddr(sa)) = ep.addr; - assert_eq!(sa.port(), 4143); - assert!(ep.tls.is_some()); - let hdr = TransportHeader { - port: 5555, - name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()), - protocol: None, - }; - let buf = hdr.encode_prefaced_buf().expect("Must encode"); - let io = tokio_test::io::Builder::new() - .write(&buf[..]) - .write(b"hello") - .build(); - let meta = tls::ConnectMeta { - socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())), - tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())), - }; - future::ready(Ok::<_, io::Error>((io, meta))) - }), + inner: service_fn(expect_header(TransportHeader { + port: 5555, + name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()), + protocol: None, + })), }; let e = Endpoint { @@ -296,6 +289,7 @@ mod test { identity::Name::from_str("server.id").unwrap(), )), authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()), + proto: None, }; let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); io.write_all(b"hello").await.expect("Write must succeed"); @@ -306,26 +300,11 @@ mod test { let _trace = linkerd_tracing::test::trace_init(); let svc = TaggedTransport { - inner: service_fn(|ep: Connect| { - let Remote(ServerAddr(sa)) = ep.addr; - assert_eq!(sa.port(), 4143); - assert!(ep.tls.is_some()); - let hdr = TransportHeader { - port: 4321, - name: None, - protocol: None, - }; - let buf = hdr.encode_prefaced_buf().expect("Must encode"); - let io = tokio_test::io::Builder::new() - .write(&buf[..]) - .write(b"hello") - .build(); - let meta = tls::ConnectMeta { - socket: Local(ClientAddr(([0, 0, 0, 0], 0).into())), - tls: Conditional::Some(Some(tls::NegotiatedProtocolRef(PROTOCOL).into())), - }; - future::ready(Ok::<_, io::Error>((io, meta))) - }), + inner: service_fn(expect_header(TransportHeader { + port: 4321, + name: None, + protocol: None, + })), }; let e = Endpoint { @@ -334,6 +313,79 @@ mod test { identity::Name::from_str("server.id").unwrap(), )), authority: None, + proto: None, + }; + let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); + io.write_all(b"hello").await.expect("Write must succeed"); + } + + #[tokio::test(flavor = "current_thread")] + async fn http_no_name() { + let _trace = linkerd_tracing::test::trace_init(); + + let svc = TaggedTransport { + inner: service_fn(expect_header(TransportHeader { + port: 4321, + name: None, + protocol: Some(SessionProtocol::Http1), + })), + }; + + let e = Endpoint { + port_override: Some(4143), + server_id: Some(tls::ServerId( + identity::Name::from_str("server.id").unwrap(), + )), + authority: None, + proto: Some(SessionProtocol::Http1), + }; + let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); + io.write_all(b"hello").await.expect("Write must succeed"); + } + + #[tokio::test(flavor = "current_thread")] + async fn http_named_with_port() { + let _trace = linkerd_tracing::test::trace_init(); + + let svc = TaggedTransport { + inner: service_fn(expect_header(TransportHeader { + port: 5555, + name: Some(dns::Name::from_str("foo.bar.example.com").unwrap()), + protocol: Some(SessionProtocol::Http1), + })), + }; + + let e = Endpoint { + port_override: Some(4143), + server_id: Some(tls::ServerId( + identity::Name::from_str("server.id").unwrap(), + )), + authority: Some(http::uri::Authority::from_str("foo.bar.example.com:5555").unwrap()), + proto: Some(SessionProtocol::Http1), + }; + let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); + io.write_all(b"hello").await.expect("Write must succeed"); + } + + #[tokio::test(flavor = "current_thread")] + async fn http_named_no_port() { + let _trace = linkerd_tracing::test::trace_init(); + + let svc = TaggedTransport { + inner: service_fn(expect_header(TransportHeader { + port: 4321, + name: None, + protocol: Some(SessionProtocol::Http1), + })), + }; + + let e = Endpoint { + port_override: Some(4143), + server_id: Some(tls::ServerId( + identity::Name::from_str("server.id").unwrap(), + )), + authority: None, + proto: Some(SessionProtocol::Http1), }; let (mut io, _meta) = svc.oneshot(e).await.expect("Connect must not fail"); io.write_all(b"hello").await.expect("Write must succeed"); diff --git a/linkerd/proxy/api-resolve/src/metadata.rs b/linkerd/proxy/api-resolve/src/metadata.rs index 09639d6a5..dd1623a95 100644 --- a/linkerd/proxy/api-resolve/src/metadata.rs +++ b/linkerd/proxy/api-resolve/src/metadata.rs @@ -31,6 +31,9 @@ pub enum ProtocolHint { Unknown, /// The destination can receive HTTP2 messages. Http2, + /// The destination will handle traffic as opaque, regardless of + /// the local proxy's handling of the traffic. + Opaque, } // === impl Metadata === diff --git a/linkerd/proxy/api-resolve/src/pb.rs b/linkerd/proxy/api-resolve/src/pb.rs index 383b75f59..889dc6860 100644 --- a/linkerd/proxy/api-resolve/src/pb.rs +++ b/linkerd/proxy/api-resolve/src/pb.rs @@ -26,15 +26,10 @@ pub fn to_addr_meta( let mut proto_hint = ProtocolHint::Unknown; let mut tagged_transport_port = None; if let Some(hint) = pb.protocol_hint { - // the match will make sense later... - #[allow(clippy::collapsible_match, clippy::single_match)] - if let Some(proto) = hint.protocol { - match proto { - Protocol::H2(..) => { - proto_hint = ProtocolHint::Http2; - } - _ => {} // TODO(eliza): handle opaque protocol hints - } + match hint.protocol { + Some(Protocol::H2(..)) => proto_hint = ProtocolHint::Http2, + Some(Protocol::Opaque(..)) => proto_hint = ProtocolHint::Opaque, + None => {} } if let Some(OpaqueTransport { inbound_port }) = hint.opaque_transport {