wip: app/core, io, meshtls, proxy/transport, tls

This commit is contained in:
katelyn martin 2025-02-19 18:15:34 -05:00
parent d12de6d964
commit 56dd9f55d8
18 changed files with 329 additions and 43 deletions

View File

@ -1408,6 +1408,7 @@ dependencies = [
"http 1.2.0",
"http-body",
"hyper",
"hyper-util",
"ipnet",
"linkerd-addr",
"linkerd-conditional",
@ -1995,6 +1996,7 @@ dependencies = [
"async-trait",
"bytes",
"futures",
"hyper",
"hyper-util",
"linkerd-errno",
"pin-project",
@ -2008,6 +2010,8 @@ name = "linkerd-meshtls"
version = "0.1.0"
dependencies = [
"futures",
"hyper",
"hyper-util",
"linkerd-conditional",
"linkerd-dns-name",
"linkerd-error",
@ -2033,6 +2037,8 @@ dependencies = [
"boring",
"futures",
"hex",
"hyper",
"hyper-util",
"linkerd-dns-name",
"linkerd-error",
"linkerd-identity",
@ -2052,6 +2058,8 @@ name = "linkerd-meshtls-rustls"
version = "0.1.0"
dependencies = [
"futures",
"hyper",
"hyper-util",
"linkerd-dns-name",
"linkerd-error",
"linkerd-identity",
@ -2486,6 +2494,7 @@ name = "linkerd-proxy-transport"
version = "0.1.0"
dependencies = [
"futures",
"hyper-util",
"libc",
"linkerd-error",
"linkerd-io",
@ -2633,6 +2642,7 @@ dependencies = [
"async-trait",
"bytes",
"futures",
"hyper",
"linkerd-conditional",
"linkerd-dns-name",
"linkerd-error",

View File

@ -18,6 +18,7 @@ drain = { version = "0.1", features = ["retain"] }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, features = ["http1", "http2"] }
hyper-util = { workspace = true }
futures = { version = "0.3", default-features = false }
ipnet = "2.11"
prometheus-client = "0.22"

View File

@ -69,8 +69,10 @@ impl fmt::Display for ControlAddr {
}
}
pub type RspBody =
linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;
pub type RspBody = linkerd_http_metrics::requests::ResponseBody<
http::balance::Body<hyper::body::Incoming>,
classify::Eos,
>;
#[derive(Clone, Debug, Default)]
pub struct Metrics {
@ -129,9 +131,9 @@ impl Config {
self.connect.user_timeout,
))
.push(tls::Client::layer(identity))
.push_connect_timeout(self.connect.timeout)
.push_connect_timeout(self.connect.timeout) // Client<NewClient, ConnectTcp>
.push_map_target(|(_version, target)| target)
.push(self::client::layer(self.connect.http2))
.push(self::client::layer::<_, _>(self.connect.http2))
.push_on_service(svc::MapErr::layer_boxed())
.into_new_service();
@ -147,6 +149,8 @@ impl Config {
.push_new_reconnect(self.connect.backoff)
.instrument(|t: &self::client::Target| info_span!("endpoint", addr = %t.addr));
todo!();
/*
let balance = endpoint
.lift_new()
.push(self::balance::layer(metrics.balance, dns, resolve_backoff))
@ -161,6 +165,7 @@ impl Config {
.push_on_service(svc::BoxCloneSyncService::layer())
.push(svc::ArcNewService::layer())
.into_inner()
*/
}
}

View File

@ -3,6 +3,7 @@ use super::{
respond::{HttpRescue, SyntheticHttpResponse},
};
use http::{header::HeaderValue, HeaderMap};
use http_body::Frame;
use linkerd_error::{Error, Result};
use pin_project::pin_project;
use std::{
@ -66,19 +67,18 @@ where
type Data = B::Data;
type Error = B::Error;
fn poll_data(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<std::result::Result<http_body::Frame<Self::Data>, Self::Error>>> {
let ResponseBodyProj(inner) = self.as_mut().project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_data(cx),
InnerProj::Rescued { trailers: _ } => Poll::Ready(None),
InnerProj::Passthru(inner) => inner.poll_frame(cx),
InnerProj::GrpcRescue {
inner,
rescue,
emit_headers,
} => match inner.poll_data(cx) {
} => match inner.poll_frame(cx) {
Poll::Ready(Some(Err(error))) => {
// The inner body has yielded an error, which we will try to rescue. If so,
// store our synthetic trailers reporting the error.
@ -88,19 +88,10 @@ where
}
data => data,
},
}
}
#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let ResponseBodyProj(inner) = self.project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_trailers(cx),
InnerProj::GrpcRescue { inner, .. } => inner.poll_trailers(cx),
InnerProj::Rescued { trailers } => Poll::Ready(Ok(trailers.take())),
InnerProj::Rescued { trailers } => {
let trailers = trailers.take().map(Frame::trailers).map(Ok);
Poll::Ready(trailers)
}
}
}

View File

@ -16,6 +16,7 @@ default = []
async-trait = "0.1"
futures = { version = "0.3", default-features = false }
bytes = { workspace = true }
hyper = { workspace = true, default-features = false }
hyper-util = { workspace = true, features = ["tokio"] }
linkerd-errno = { path = "../errno" }
tokio = { version = "1", features = ["io-util", "net"] }

View File

@ -47,6 +47,19 @@ impl<L: io::AsyncRead, R: io::AsyncRead> io::AsyncRead for EitherIo<L, R> {
}
}
impl<L: hyper::rt::Read, R: hyper::rt::Read> hyper::rt::Read for EitherIo<L, R> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> io::Poll<()> {
match self.project() {
EitherIoProj::Left(l) => l.poll_read(cx, buf),
EitherIoProj::Right(r) => r.poll_read(cx, buf),
}
}
}
impl<L: io::AsyncWrite, R: io::AsyncWrite> io::AsyncWrite for EitherIo<L, R> {
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
@ -92,3 +105,44 @@ impl<L: io::AsyncWrite, R: io::AsyncWrite> io::AsyncWrite for EitherIo<L, R> {
}
}
}
impl<L: hyper::rt::Write, R: hyper::rt::Write> hyper::rt::Write for EitherIo<L, R> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
match self.project() {
EitherIoProj::Left(l) => l.poll_write(cx, buf),
EitherIoProj::Right(r) => r.poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
match self.project() {
EitherIoProj::Left(l) => l.poll_flush(cx),
EitherIoProj::Right(r) => r.poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
match self.project() {
EitherIoProj::Left(l) => l.poll_shutdown(cx),
EitherIoProj::Right(r) => r.poll_shutdown(cx),
}
}
fn is_write_vectored(&self) -> bool {
match self {
EitherIo::Left(l) => l.is_write_vectored(),
EitherIo::Right(r) => r.is_write_vectored(),
}
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> io::Poll<usize> {
match self.project() {
EitherIoProj::Left(l) => l.poll_write_vectored(cx, bufs),
EitherIoProj::Right(r) => r.poll_write_vectored(cx, bufs),
}
}
}

View File

@ -78,6 +78,35 @@ impl<I: io::AsyncRead> io::AsyncRead for PrefixedIo<I> {
}
}
impl<I: hyper::rt::Read> hyper::rt::Read for PrefixedIo<I> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> io::Poll<()> {
// XXX(kate): this is copy-pasted from `io::AsyncRead`, above.
let this = self.project();
// Check the length only once, since looking as the length
// of a Bytes isn't as cheap as the length of a &[u8].
let peeked_len = this.prefix.len();
if peeked_len == 0 {
this.io.poll_read(cx, buf)
} else {
let len = cmp::min(buf.remaining(), peeked_len);
buf.put_slice(&this.prefix.as_ref()[..len]);
this.prefix.advance(len);
// If we've finally emptied the prefix, drop it so we don't
// hold onto the allocated memory any longer. We won't peek
// again.
if peeked_len == len {
*this.prefix = Bytes::new();
}
io::Poll::Ready(Ok(()))
}
}
}
impl<I: io::Write> io::Write for PrefixedIo<I> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@ -120,3 +149,29 @@ impl<I: io::AsyncWrite> io::AsyncWrite for PrefixedIo<I> {
self.io.is_write_vectored()
}
}
impl<I: hyper::rt::Write> hyper::rt::Write for PrefixedIo<I> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
self.project().io.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
self.project().io.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
self.project().io.poll_shutdown(cx)
}
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> io::Poll<usize> {
self.project().io.poll_write_vectored(cx, bufs)
}
}

View File

@ -89,6 +89,17 @@ impl<I: io::AsyncRead> io::AsyncRead for ScopedIo<I> {
}
}
impl<I: hyper::rt::Read> hyper::rt::Read for ScopedIo<I> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> io::Poll<()> {
let this = self.project();
this.io.poll_read(cx, buf).map_err(this.scope.err())
}
}
impl<I: io::Write> io::Write for ScopedIo<I> {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@ -138,3 +149,38 @@ impl<I: io::AsyncWrite> io::AsyncWrite for ScopedIo<I> {
self.io.is_write_vectored()
}
}
impl<I: hyper::rt::Write> hyper::rt::Write for ScopedIo<I> {
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
let this = self.project();
this.io.poll_write(cx, buf).map_err(this.scope.err())
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_flush(cx).map_err(this.scope.err())
}
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
let this = self.project();
this.io.poll_shutdown(cx).map_err(this.scope.err())
}
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> io::Poll<usize> {
let this = self.project();
this.io
.poll_write_vectored(cx, bufs)
.map_err(this.scope.err())
}
}

View File

@ -77,6 +77,42 @@ impl<T: AsyncRead + AsyncWrite, S: Sensor> AsyncWrite for SensorIo<T, S> {
}
}
impl<T: hyper::rt::Write, S: Sensor> hyper::rt::Write for SensorIo<T, S> {
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();
this.sensor.record_error(this.io.poll_shutdown(cx))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();
this.sensor.record_error(this.io.poll_flush(cx))
}
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<usize> {
let this = self.project();
let bytes = ready!(this.sensor.record_error(this.io.poll_write(cx, buf)))?;
this.sensor.record_write(bytes);
Poll::Ready(Ok(bytes))
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<usize> {
let this = self.project();
let bytes = ready!(this
.sensor
.record_error(this.io.poll_write_vectored(cx, bufs)))?;
this.sensor.record_write(bytes);
Poll::Ready(Ok(bytes))
}
fn is_write_vectored(&self) -> bool {
self.io.is_write_vectored()
}
}
impl<T: PeerAddr, S> PeerAddr for SensorIo<T, S> {
fn peer_addr(&self) -> Result<std::net::SocketAddr> {
self.io.peer_addr()

View File

@ -15,6 +15,8 @@ __has_any_tls_impls = []
[dependencies]
futures = { version = "0.3", default-features = false }
hyper = { workspace = true }
hyper-util = { workspace = true }
pin-project = "1"
linkerd-dns-name = { path = "../dns/name" }

View File

@ -10,6 +10,8 @@ publish = false
boring = "4"
futures = { version = "0.3", default-features = false }
hex = "0.4" # used for debug logging
hyper = { workspace = true }
hyper-util = { workspace = true }
linkerd-error = { path = "../../error" }
linkerd-dns-name = { path = "../../dns/name" }
linkerd-identity = { path = "../../identity" }

View File

@ -21,7 +21,7 @@ pub struct Connect {
pub type ConnectFuture<I> = Pin<Box<dyn Future<Output = io::Result<ClientIo<I>>> + Send>>;
#[derive(Debug)]
pub struct ClientIo<I>(tokio_boring::SslStream<I>);
pub struct ClientIo<I>(hyper_util::rt::TokioIo<tokio_boring::SslStream<I>>);
// === impl NewClient ===
@ -117,7 +117,7 @@ where
"Initiated TLS connection"
);
trace!(peer.id = %server_id, peer.name = %server_name);
Ok(ClientIo(io))
Ok(ClientIo(hyper_util::rt::TokioIo::new(io)))
})
}
}
@ -131,6 +131,16 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
cx: &mut Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> io::Poll<()> {
Pin::new(self.0.inner_mut()).poll_read(cx, buf)
}
}
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
@ -138,17 +148,17 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
Pin::new(&mut self.0).poll_flush(cx)
Pin::new(self.0.inner_mut()).poll_flush(cx)
}
#[inline]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
Pin::new(&mut self.0).poll_shutdown(cx)
Pin::new(self.0.inner_mut()).poll_shutdown(cx)
}
#[inline]
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
Pin::new(&mut self.0).poll_write(cx, buf)
Pin::new(self.0.inner_mut()).poll_write(cx, buf)
}
#[inline]
@ -157,12 +167,12 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> io::Poll<usize> {
Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
Pin::new(self.0.inner_mut()).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.0.is_write_vectored()
self.0.inner().is_write_vectored()
}
}
@ -170,6 +180,7 @@ impl<I> ClientIo<I> {
#[inline]
pub fn negotiated_protocol(&self) -> Option<NegotiatedProtocolRef<'_>> {
self.0
.inner()
.ssl()
.selected_alpn_protocol()
.map(NegotiatedProtocolRef)
@ -179,6 +190,6 @@ impl<I> ClientIo<I> {
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
#[inline]
fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
self.0.get_ref().peer_addr()
self.0.inner().get_ref().peer_addr()
}
}

View File

@ -11,6 +11,8 @@ test-util = ["linkerd-tls-test-util"]
[dependencies]
futures = { version = "0.3", default-features = false }
hyper = { workspace = true }
hyper-util = { workspace = true }
ring = { version = "0.17", features = ["std"] }
rustls-pemfile = "2.2"
rustls-webpki = { version = "0.102.8", features = ["std"] }

View File

@ -3,7 +3,10 @@ use linkerd_identity as id;
use linkerd_io as io;
use linkerd_meshtls_verifier as verifier;
use linkerd_stack::{NewService, Service};
use linkerd_tls::{client::AlpnProtocols, ClientTls, NegotiatedProtocolRef};
use linkerd_tls::{
client::{self, AlpnProtocols},
ClientTls, NegotiatedProtocolRef,
};
use std::{convert::TryFrom, pin::Pin, sync::Arc, task::Context};
use tokio::sync::watch;
use tokio_rustls::rustls::{self, pki_types::CertificateDer, ClientConfig};
@ -25,7 +28,7 @@ pub struct Connect {
pub type ConnectFuture<I> = Pin<Box<dyn Future<Output = io::Result<ClientIo<I>>> + Send>>;
#[derive(Debug)]
pub struct ClientIo<I>(tokio_rustls::client::TlsStream<I>);
pub struct ClientIo<I>(hyper_util::rt::TokioIo<tokio_rustls::client::TlsStream<I>>);
// === impl NewClient ===
@ -115,7 +118,7 @@ where
let (_, conn) = s.get_ref();
let end_cert = extract_cert(conn)?;
verifier::verify_id(end_cert, &server_id)?;
Ok(ClientIo(s))
Ok(ClientIo(hyper_util::rt::TokioIo::new(s)))
}),
)
}
@ -130,6 +133,16 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
cx: &mut Context<'_>,
buf: &mut io::ReadBuf<'_>,
) -> io::Poll<()> {
Pin::new(self.0.inner_mut()).poll_read(cx, buf)
}
}
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
@ -137,17 +150,17 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
Pin::new(&mut self.0).poll_flush(cx)
Pin::new(self.0.inner_mut()).poll_flush(cx)
}
#[inline]
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
Pin::new(&mut self.0).poll_shutdown(cx)
Pin::new(self.0.inner_mut()).poll_shutdown(cx)
}
#[inline]
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> io::Poll<usize> {
Pin::new(&mut self.0).poll_write(cx, buf)
Pin::new(self.0.inner_mut()).poll_write(cx, buf)
}
#[inline]
@ -156,12 +169,12 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> io::Poll<usize> {
Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
Pin::new(self.0.inner_mut()).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.0.is_write_vectored()
self.0.inner().is_write_vectored()
}
}
@ -169,6 +182,7 @@ impl<I> ClientIo<I> {
#[inline]
pub fn negotiated_protocol(&self) -> Option<NegotiatedProtocolRef<'_>> {
self.0
.inner()
.get_ref()
.1
.alpn_protocol()
@ -179,6 +193,6 @@ impl<I> ClientIo<I> {
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
#[inline]
fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {
self.0.get_ref().0.peer_addr()
self.0.inner().get_ref().0.peer_addr()
}
}

View File

@ -180,6 +180,23 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncRead for ClientIo<I> {
}
}
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> hyper::rt::Read for ClientIo<I> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: hyper::rt::ReadBufCursor<'_>,
) -> io::Poll<()> {
match self.project() {
#[cfg(feature = "boring")]
ClientIoProj::Boring(io) => io.poll_read(cx, buf),
#[cfg(feature = "rustls")]
ClientIoProj::Rustls(io) => io.poll_read(cx, buf),
#[cfg(not(feature = "__has_any_tls_impls"))]
_ => crate::no_tls!(cx, buf),
}
}
}
impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Poll<()> {
@ -251,6 +268,39 @@ impl<I: io::AsyncRead + io::AsyncWrite + Unpin> io::AsyncWrite for ClientIo<I> {
}
}
impl<I> hyper::rt::Write for ClientIo<I> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
todo!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
todo!()
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
todo!()
}
fn is_write_vectored(&self) -> bool {
todo!()
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
todo!()
}
}
impl<I: io::PeerAddr> io::PeerAddr for ClientIo<I> {
#[inline]
fn peer_addr(&self) -> io::Result<std::net::SocketAddr> {

View File

@ -11,6 +11,7 @@ Transport-level implementations that rely on core proxy infrastructure
[dependencies]
futures = { version = "0.3", default-features = false }
hyper-util = { workspace = true }
linkerd-error = { path = "../../error" }
linkerd-io = { path = "../../io" }
linkerd-stack = { path = "../../stack" }

View File

@ -6,7 +6,6 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::net::TcpStream;
use tracing::debug;
#[derive(Copy, Clone, Debug)]
@ -15,6 +14,8 @@ pub struct ConnectTcp {
user_timeout: UserTimeout,
}
type TcpStream = io::ScopedIo<hyper_util::rt::TokioIo<tokio::net::TcpStream>>;
impl ConnectTcp {
pub fn new(keepalive: Keepalive, user_timeout: UserTimeout) -> Self {
Self {
@ -25,7 +26,7 @@ impl ConnectTcp {
}
impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
type Response = (io::ScopedIo<TcpStream>, Local<ClientAddr>);
type Response = (TcpStream, Local<ClientAddr>);
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = io::Result<Self::Response>> + Send + Sync + 'static>>;
@ -39,7 +40,7 @@ impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
let Remote(ServerAddr(addr)) = t.param();
debug!(server.addr = %addr, "Connecting");
Box::pin(async move {
let io = TcpStream::connect(&addr).await?;
let io = tokio::net::TcpStream::connect(&addr).await?;
super::set_nodelay_or_warn(&io);
let io = super::set_keepalive_or_warn(io, keepalive)?;
let io = super::set_user_timeout_or_warn(io, user_timeout)?;
@ -49,7 +50,10 @@ impl<T: Param<Remote<ServerAddr>>> Service<T> for ConnectTcp {
?keepalive,
"Connected",
);
Ok((io::ScopedIo::client(io), Local(ClientAddr(local_addr))))
Ok((
io::ScopedIo::client(hyper_util::rt::TokioIo::new(io)),
Local(ClientAddr(local_addr)),
))
})
}
}

View File

@ -10,6 +10,7 @@ publish = false
async-trait = "0.1"
bytes = { workspace = true }
futures = { version = "0.3", default-features = false }
hyper = { workspace = true }
linkerd-conditional = { path = "../conditional" }
linkerd-dns-name = { path = "../dns/name" }
linkerd-error = { path = "../error" }