Compare commits

...

5 Commits

Author SHA1 Message Date
Istio Automation 700791a158
[release-1.27] Multinetwork/Support remote networks for services with waypoints (#1592)
* Multinetwork/Support remote networks for services with waypoints

Currently `build_request` when it sees a service with a waypoint
resolves the waypoint backend and routes request there using regular
HBONE.

In multi network scenario though the waypoint may have workload on a
remote network and to reach it we have to go through E/W gateway and use
double HBONE.

This change enables handling of services with waypoint on a remote
network.

Some of the assumptions that were used when I prepared this change:

1. We assume uniformity of configuration (e.g., if service X in local
   cluster has a waypoint, then service X in remote network also has a
   waypoint, if waypoint is service addressable, then it's using service
   to address waypoint both locally and on remote network)
2  Split-horizon representation of waypoint workloads, just like with
   any regular workloads and services (e.g., in the local cluster
   instead of an actual waypoint workload pointing to a pod on another
   network we will have a "proxy" representation that just has network
   gateway).

Both of those can be in hanled by the controlplane (e.g., controlplane
can generate split-horizon workloads and when configuration is
non-uniform, just filter out remote configs for remote networks), though
we don't yet have a complete implementation.

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

* Return an error instead of panicking

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

* Update comments in src/proxy/outbound.rs

Co-authored-by: Ian Rudie <ilrudie@gmail.com>

* Update comments in src/proxy/outbound.rs

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

* Add a debug assert to provide a bit more context to the error in tests

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

* Fix formatting

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

* Added a few debug logs to be able to trace when a workload on a remote network is picked

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>

---------

Signed-off-by: Mikhail Krinkin <mkrinkin@microsoft.com>
Co-authored-by: Mikhail Krinkin <mkrinkin@microsoft.com>
Co-authored-by: Krinkin, Mike <krinkin.m.u@gmail.com>
Co-authored-by: Ian Rudie <ilrudie@gmail.com>
2025-07-09 07:38:31 -04:00
Istio Automation 8ee3492325
[release-1.27] increasing limit for open files (#1590)
* increasing limit for open files

* suggestion from PR

* adding comment

* Update src/main.rs

Co-authored-by: Daniel Hawton <daniel@hawton.org>

---------

Co-authored-by: Gustavo <grnmeira@gmail.com>
Co-authored-by: Gustavo Meira <grnmeira@users.noreply.github.com>
Co-authored-by: Daniel Hawton <daniel@hawton.org>
2025-07-09 04:31:31 -04:00
Istio Automation 193d425d57
[release-1.27] Buffer inner h2 streams (#1591)
* Buffer h2 streams

* Tests

* naming

* Review

simplify code

---------

Co-authored-by: Steven Jin Xuan <sjinxuan@microsoft.com>
2025-07-08 19:05:30 -04:00
Istio Automation 31ddeb929b
Automator: update common-files@release-1.27 in istio/ztunnel@release-1.27 (#1588) 2025-07-07 22:32:29 -04:00
zirain 39333d0566
[release-1.27] Automated branching step 4 (#1587) 2025-07-07 13:13:29 -04:00
10 changed files with 412 additions and 81 deletions

View File

@ -1,6 +1 @@
* @istio/wg-networking-maintainers-ztunnel * @istio/release-managers-1-27
/Makefile* @istio/wg-test-and-release-maintainers
/*.md @istio/wg-test-and-release-maintainers
/common/ @istio/wg-test-and-release-maintainers
/common-protos/ @istio/wg-test-and-release-maintainers
/scripts/ @istio/wg-test-and-release-maintainers

View File

@ -71,7 +71,7 @@ itertools = "0.14"
keyed_priority_queue = "0.4" keyed_priority_queue = "0.4"
libc = "0.2" libc = "0.2"
log = "0.4" log = "0.4"
nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount"] } nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount", "resource" ] }
once_cell = "1.21" once_cell = "1.21"
num_cpus = "1.16" num_cpus = "1.16"
ppp = "2.3" ppp = "2.3"

View File

@ -1 +1 @@
73315b91eb0bb247476b5b03911ec7f2cb6b5176 c86fa2ea9bd8350fe4fb040b78201c05a6023ced

View File

@ -92,7 +92,7 @@ mirror-licenses: mod-download-go
@license-lint --mirror @license-lint --mirror
TMP := $(shell mktemp -d -u) TMP := $(shell mktemp -d -u)
UPDATE_BRANCH ?= "master" UPDATE_BRANCH ?= "release-1.27"
BUILD_TOOLS_ORG ?= "istio" BUILD_TOOLS_ORG ?= "istio"

View File

@ -75,7 +75,7 @@ fi
TOOLS_REGISTRY_PROVIDER=${TOOLS_REGISTRY_PROVIDER:-gcr.io} TOOLS_REGISTRY_PROVIDER=${TOOLS_REGISTRY_PROVIDER:-gcr.io}
PROJECT_ID=${PROJECT_ID:-istio-testing} PROJECT_ID=${PROJECT_ID:-istio-testing}
if [[ "${IMAGE_VERSION:-}" == "" ]]; then if [[ "${IMAGE_VERSION:-}" == "" ]]; then
IMAGE_VERSION=master-9a50b34135f1aa16b04d56dcb89c5b91101a28aa IMAGE_VERSION=release-1.27-9a50b34135f1aa16b04d56dcb89c5b91101a28aa
fi fi
if [[ "${IMAGE_NAME:-}" == "" ]]; then if [[ "${IMAGE_NAME:-}" == "" ]]; then
IMAGE_NAME=build-tools IMAGE_NAME=build-tools

View File

@ -14,8 +14,9 @@
extern crate core; extern crate core;
use nix::sys::resource::{Resource, getrlimit, setrlimit};
use std::sync::Arc; use std::sync::Arc;
use tracing::info; use tracing::{info, warn};
use ztunnel::*; use ztunnel::*;
#[cfg(feature = "jemalloc")] #[cfg(feature = "jemalloc")]
@ -28,6 +29,26 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[unsafe(export_name = "malloc_conf")] #[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0"; pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
// We use this on Unix systems to increase the number of open file descriptors
// if possible. This is useful for high-load scenarios where the default limit
// is too low, which can lead to droopped connections and other issues:
// see: https://github.com/istio/ztunnel/issues/1585
fn increase_open_files_limit() {
#[cfg(unix)]
if let Ok((soft_limit, hard_limit)) = getrlimit(Resource::RLIMIT_NOFILE) {
if let Err(e) = setrlimit(Resource::RLIMIT_NOFILE, hard_limit, hard_limit) {
warn!("failed to set file descriptor limits: {e}");
} else {
info!(
"set file descriptor limits from {} to {}",
soft_limit, hard_limit
);
}
} else {
warn!("failed to get file descriptor limits");
}
}
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
let _log_flush = telemetry::setup_logging(); let _log_flush = telemetry::setup_logging();
@ -74,6 +95,7 @@ fn version() -> anyhow::Result<()> {
async fn proxy(cfg: Arc<config::Config>) -> anyhow::Result<()> { async fn proxy(cfg: Arc<config::Config>) -> anyhow::Result<()> {
info!("version: {}", version::BuildInfo::new()); info!("version: {}", version::BuildInfo::new());
increase_open_files_limit();
info!("running with config: {}", serde_yaml::to_string(&cfg)?); info!("running with config: {}", serde_yaml::to_string(&cfg)?);
app::build(cfg).await?.wait_termination().await app::build(cfg).await?.wait_termination().await
} }

View File

@ -484,6 +484,9 @@ pub enum Error {
#[error("requested service {0} found, but has no IP addresses")] #[error("requested service {0} found, but has no IP addresses")]
NoIPForService(String), NoIPForService(String),
#[error("no service for target address: {0}")]
NoService(SocketAddr),
#[error( #[error(
"ip addresses were resolved for workload {0}, but valid dns response had no A/AAAA records" "ip addresses were resolved for workload {0}, but valid dns response had no A/AAAA records"
)] )]

View File

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
use crate::copy; use crate::copy;
use bytes::{BufMut, Bytes}; use bytes::Bytes;
use futures_core::ready; use futures_core::ready;
use h2::Reason; use h2::Reason;
use std::io::Error; use std::io::Error;
@ -85,7 +85,10 @@ pub struct H2StreamWriteHalf {
_dropped: Option<DropCounter>, _dropped: Option<DropCounter>,
} }
pub struct TokioH2Stream(H2Stream); pub struct TokioH2Stream {
stream: H2Stream,
buf: Bytes,
}
struct DropCounter { struct DropCounter {
// Whether the other end of this shared counter has already dropped. // Whether the other end of this shared counter has already dropped.
@ -144,7 +147,10 @@ impl Drop for DropCounter {
// then the specific implementation will conflict with the generic one. // then the specific implementation will conflict with the generic one.
impl TokioH2Stream { impl TokioH2Stream {
pub fn new(stream: H2Stream) -> Self { pub fn new(stream: H2Stream) -> Self {
Self(stream) Self {
stream,
buf: Bytes::new(),
}
} }
} }
@ -154,21 +160,21 @@ impl tokio::io::AsyncRead for TokioH2Stream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
let pinned = std::pin::Pin::new(&mut self.0.read); // Just return the bytes we have left over and don't poll the stream because
copy::ResizeBufRead::poll_bytes(pinned, cx).map(|r| match r { // its unclear what to do if there are bytes left over from the previous read, and when we
Ok(bytes) => { // poll, we get an error.
if buf.remaining() < bytes.len() { if self.buf.is_empty() {
Err(Error::other(format!( // If we have no unread bytes, we can poll the stream
"kould overflow buffer of with {} remaining", // and fill self.buf with the bytes we read.
buf.remaining() let pinned = std::pin::Pin::new(&mut self.stream.read);
))) let res = ready!(copy::ResizeBufRead::poll_bytes(pinned, cx))?;
} else { self.buf = res;
buf.put(bytes); }
Ok(()) // Copy as many bytes as we can from self.buf.
} let cnt = Ord::min(buf.remaining(), self.buf.len());
} buf.put_slice(&self.buf[..cnt]);
Err(e) => Err(e), self.buf = self.buf.split_off(cnt);
}) Poll::Ready(Ok(()))
} }
} }
@ -178,7 +184,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, tokio::io::Error>> { ) -> Poll<Result<usize, tokio::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write); let pinned = std::pin::Pin::new(&mut self.stream.write);
let buf = Bytes::copy_from_slice(buf); let buf = Bytes::copy_from_slice(buf);
copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf) copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf)
} }
@ -187,7 +193,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> { ) -> Poll<Result<(), std::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write); let pinned = std::pin::Pin::new(&mut self.stream.write);
copy::AsyncWriteBuf::poll_flush(pinned, cx) copy::AsyncWriteBuf::poll_flush(pinned, cx)
} }
@ -195,7 +201,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> { ) -> Poll<Result<(), std::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write); let pinned = std::pin::Pin::new(&mut self.stream.write);
copy::AsyncWriteBuf::poll_shutdown(pinned, cx) copy::AsyncWriteBuf::poll_shutdown(pinned, cx)
} }
} }

View File

@ -35,10 +35,10 @@ use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics};
use crate::drain::DrainWatcher; use crate::drain::DrainWatcher;
use crate::drain::run_with_drain; use crate::drain::run_with_drain;
use crate::proxy::h2::{H2Stream, client::WorkloadKey}; use crate::proxy::h2::{H2Stream, client::WorkloadKey};
use crate::state::ServiceResolutionMode; use crate::state::service::{Service, ServiceDescription};
use crate::state::service::ServiceDescription;
use crate::state::workload::OutboundProtocol; use crate::state::workload::OutboundProtocol;
use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address}; use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address};
use crate::state::{ServiceResolutionMode, Upstream};
use crate::{assertions, copy, proxy, socket}; use crate::{assertions, copy, proxy, socket};
use super::h2::TokioH2Stream; use super::h2::TokioH2Stream;
@ -369,6 +369,83 @@ impl OutboundConnection {
} }
} }
// This function is called when the select next hop is on a different network,
// so we expect the upstream workload to have a network gatewy configured.
//
// When we use a gateway to reach to a workload on a remote network we have to
// use double HBONE (HBONE incapsulated inside HBONE). The gateway will
// terminate the outer HBONE tunnel and forward the inner HBONE to the actual
// destination as a opaque stream of bytes and the actual destination will
// interpret it as an HBONE connection.
//
// If the upstream workload does not have an E/W gateway this function returns
// an error indicating that it could not find a valid destination.
//
// A note about double HBONE, in double HBONE both inner and outer HBONE use
// destination service name as HBONE target URI.
//
// Having target URI in the outer HBONE tunnel allows E/W gateway to figure out
// where to route the data next witout the need to terminate inner HBONE tunnel.
// In other words, it could forward inner HBONE as if it's an opaque stream of
// bytes without trying to interpret it.
//
// NOTE: when connecting through an E/W gateway, regardless of whether there is
// a waypoint or not, we always use service hostname and the service port. It's
// somewhat different from how regular HBONE works, so I'm calling it out here.
async fn build_request_through_gateway(
&self,
source: Arc<Workload>,
// next hop on the remote network that we picked as our destination.
// It may be a local view of a Waypoint workload on remote network or
// a local view of the service workload (when waypoint is not
// configured).
upstream: Upstream,
// This is a target service we wanted to reach in the first place.
//
// NOTE: Crossing network boundaries is only supported for services
// at the moment, so we should always have a service we could use.
service: &Service,
target: SocketAddr,
) -> Result<Request, Error> {
if let Some(gateway) = &upstream.workload.network_gateway {
let gateway_upstream = self
.pi
.state
.fetch_network_gateway(gateway, &source, target)
.await?;
let hbone_target_destination = Some(HboneAddress::SvcHostname(
service.hostname.clone(),
target.port(),
));
debug!("built request to a destination on another network through an E/W gateway");
Ok(Request {
protocol: OutboundProtocol::DOUBLEHBONE,
source,
hbone_target_destination,
actual_destination_workload: Some(gateway_upstream.workload.clone()),
intended_destination_service: Some(ServiceDescription::from(service)),
actual_destination: gateway_upstream.workload_socket_addr().ok_or(
Error::NoValidDestination(Box::new((*gateway_upstream.workload).clone())),
)?,
// The outer tunnel of double HBONE is terminated by the E/W
// gateway and so for the credentials of the next hop
// (upstream_sans) we use gateway credentials.
upstream_sans: gateway_upstream.workload_and_services_san(),
// The inner HBONE tunnel is terminated by either the server
// we want to reach or a Waypoint in front of it, depending on
// the configuration. So for the final destination credentials
// (final_sans) we use the upstream workload credentials.
final_sans: upstream.service_sans(),
})
} else {
// Do not try to send cross-network traffic without network gateway.
Err(Error::NoValidDestination(Box::new(
(*upstream.workload).clone(),
)))
}
}
// build_request computes all information about the request we should send // build_request computes all information about the request we should send
// TODO: Do we want a single lock for source and upstream...? // TODO: Do we want a single lock for source and upstream...?
async fn build_request( async fn build_request(
@ -381,7 +458,7 @@ impl OutboundConnection {
// If this is to-service traffic check for a service waypoint // If this is to-service traffic check for a service waypoint
// Capture result of whether this is svc addressed // Capture result of whether this is svc addressed
let svc_addressed = if let Some(Address::Service(target_service)) = state let service = if let Some(Address::Service(target_service)) = state
.fetch_address(&NetworkAddress { .fetch_address(&NetworkAddress {
network: self.pi.cfg.network.clone(), network: self.pi.cfg.network.clone(),
address: target.ip(), address: target.ip(),
@ -393,6 +470,18 @@ impl OutboundConnection {
.fetch_service_waypoint(&target_service, &source_workload, target) .fetch_service_waypoint(&target_service, &source_workload, target)
.await? .await?
{ {
if waypoint.workload.network != source_workload.network {
debug!("picked a waypoint on remote network");
return self
.build_request_through_gateway(
source_workload.clone(),
waypoint,
&target_service,
target,
)
.await;
}
let upstream_sans = waypoint.workload_and_services_san(); let upstream_sans = waypoint.workload_and_services_san();
let actual_destination = let actual_destination =
waypoint waypoint
@ -413,10 +502,10 @@ impl OutboundConnection {
}); });
} }
// this was service addressed but we did not find a waypoint // this was service addressed but we did not find a waypoint
true Some(target_service)
} else { } else {
// this wasn't service addressed // this wasn't service addressed
false None
}; };
let Some(us) = state let Some(us) = state
@ -428,7 +517,7 @@ impl OutboundConnection {
) )
.await? .await?
else { else {
if svc_addressed { if service.is_some() {
return Err(Error::NoHealthyUpstream(target)); return Err(Error::NoHealthyUpstream(target));
} }
debug!("built request as passthrough; no upstream found"); debug!("built request as passthrough; no upstream found");
@ -446,37 +535,26 @@ impl OutboundConnection {
// Check whether we are using an E/W gateway and sending cross network traffic // Check whether we are using an E/W gateway and sending cross network traffic
if us.workload.network != source_workload.network { if us.workload.network != source_workload.network {
if let Some(ew_gtw) = &us.workload.network_gateway { // Workloads on remote network must be service addressed, so if we got here
let gtw_us = { // and we don't have a service for the original target address then it's a
self.pi // bug either in ztunnel itself or in istiod.
.state //
.fetch_network_gateway(ew_gtw, &source_workload, target) // For a double HBONE protocol implementation we have to know the
.await? // destination service and if there is no service for the target it's a bug.
}; //
// This situation "should never happen" because for workloads fetch_upstream
let svc = us // above only checks the workloads on the same network as this ztunnel
.destination_service // instance and therefore it should not be able to find a workload on a
.as_ref() // different network.
.expect("Workloads with network gateways must be service addressed."); debug_assert!(
let hbone_target_destination = service.is_some(),
Some(HboneAddress::SvcHostname(svc.hostname.clone(), us.port)); "workload on remote network is not service addressed"
);
return Ok(Request { debug!("picked a workload on remote network");
protocol: OutboundProtocol::DOUBLEHBONE, let service = service.as_ref().ok_or(Error::NoService(target))?;
source: source_workload, return self
hbone_target_destination, .build_request_through_gateway(source_workload.clone(), us, service, target)
actual_destination_workload: Some(gtw_us.workload.clone()), .await;
intended_destination_service: us.destination_service.clone(),
actual_destination: gtw_us.workload_socket_addr().ok_or(
Error::NoValidDestination(Box::new((*gtw_us.workload).clone())),
)?,
upstream_sans: gtw_us.workload_and_services_san(),
final_sans: us.service_sans(),
});
} else {
// Do not try to send cross-network traffic without network gateway.
return Err(Error::NoValidDestination(Box::new((*us.workload).clone())));
}
} }
// We are not using a network gateway and there is no workload address. // We are not using a network gateway and there is no workload address.
@ -491,7 +569,7 @@ impl OutboundConnection {
// Check if we need to go through a workload addressed waypoint. // Check if we need to go through a workload addressed waypoint.
// Don't traverse waypoint twice if the source is sandwich-outbound. // Don't traverse waypoint twice if the source is sandwich-outbound.
// Don't traverse waypoint if traffic was addressed to a service (handled before) // Don't traverse waypoint if traffic was addressed to a service (handled before)
if !from_waypoint && !svc_addressed { if !from_waypoint && service.is_none() {
// For case upstream server has enabled waypoint // For case upstream server has enabled waypoint
let waypoint = state let waypoint = state
.fetch_workload_waypoint(&us.workload, &source_workload, target) .fetch_workload_waypoint(&us.workload, &source_workload, target)
@ -816,6 +894,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn build_request_double_hbone() { async fn build_request_double_hbone() {
// example.com service has a workload on remote network.
// E/W gateway is addressed by an IP.
run_build_request_multi( run_build_request_multi(
"127.0.0.1", "127.0.0.1",
"127.0.0.3:80", "127.0.0.3:80",
@ -867,11 +947,13 @@ mod tests {
], ],
Some(ExpectedRequest { Some(ExpectedRequest {
protocol: OutboundProtocol::DOUBLEHBONE, protocol: OutboundProtocol::DOUBLEHBONE,
hbone_destination: "example.com:8080", hbone_destination: "example.com:80",
destination: "10.22.1.1:15009", destination: "10.22.1.1:15009",
}), }),
) )
.await; .await;
// example.com service has a workload on remote network.
// E/W gateway is addressed by a hostname.
run_build_request_multi( run_build_request_multi(
"127.0.0.1", "127.0.0.1",
"127.0.0.3:80", "127.0.0.3:80",
@ -944,11 +1026,218 @@ mod tests {
], ],
Some(ExpectedRequest { Some(ExpectedRequest {
protocol: OutboundProtocol::DOUBLEHBONE, protocol: OutboundProtocol::DOUBLEHBONE,
hbone_destination: "example.com:8080", hbone_destination: "example.com:80",
destination: "127.0.0.5:15008", destination: "127.0.0.5:15008",
}), }),
) )
.await; .await;
// example.com service has a waypoint and waypoint workload is on remote network.
// E/W gateway is addressed by an IP.
run_build_request_multi(
"127.0.0.1",
"127.0.0.3:80",
vec![
XdsAddressType::Service(XdsService {
hostname: "example.com".to_string(),
addresses: vec![XdsNetworkAddress {
network: "".to_string(),
address: vec![127, 0, 0, 3],
}],
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
waypoint: Some(xds::istio::workload::GatewayAddress {
destination: Some(
xds::istio::workload::gateway_address::Destination::Hostname(
XdsNamespacedHostname {
namespace: Default::default(),
hostname: "waypoint.com".into(),
},
),
),
hbone_mtls_port: 15008,
}),
..Default::default()
}),
XdsAddressType::Service(XdsService {
hostname: "waypoint.com".to_string(),
addresses: vec![XdsNetworkAddress {
network: "".to_string(),
address: vec![127, 0, 0, 4],
}],
ports: vec![Port {
service_port: 15008,
target_port: 15008,
}],
..Default::default()
}),
XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/remote-waypoint-pod".to_string(),
addresses: vec![],
network: "remote".to_string(),
network_gateway: Some(xds::istio::workload::GatewayAddress {
destination: Some(
xds::istio::workload::gateway_address::Destination::Address(
XdsNetworkAddress {
network: "remote".to_string(),
address: vec![10, 22, 1, 1],
},
),
),
hbone_mtls_port: 15009,
}),
services: std::collections::HashMap::from([(
"/waypoint.com".to_string(),
PortList {
ports: vec![Port {
service_port: 15008,
target_port: 15008,
}],
},
)]),
..Default::default()
}),
XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
network: "remote".to_string(),
..Default::default()
}),
],
Some(ExpectedRequest {
protocol: OutboundProtocol::DOUBLEHBONE,
hbone_destination: "example.com:80",
destination: "10.22.1.1:15009",
}),
)
.await;
}
#[tokio::test]
async fn build_request_failover_to_remote() {
// Similar to the double HBONE test that we already have, but it sets up a scenario when
// load balancing logic will pick a workload on a remote cluster when local workloads are
// unhealthy, thus showing the expected failover behavior.
let service = XdsAddressType::Service(XdsService {
hostname: "example.com".to_string(),
addresses: vec![XdsNetworkAddress {
network: "".to_string(),
address: vec![127, 0, 0, 3],
}],
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
// Prefer routing to workloads on the same network, but when nothing is healthy locally
// allow failing over to remote networks.
load_balancing: Some(xds::istio::workload::LoadBalancing {
routing_preference: vec![
xds::istio::workload::load_balancing::Scope::Network.into(),
],
mode: xds::istio::workload::load_balancing::Mode::Failover.into(),
..Default::default()
}),
..Default::default()
});
let ew_gateway = XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
network: "remote".to_string(),
..Default::default()
});
let remote_workload = XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/remote-example.com-pod".to_string(),
addresses: vec![],
network: "remote".to_string(),
network_gateway: Some(xds::istio::workload::GatewayAddress {
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
XdsNetworkAddress {
network: "remote".to_string(),
address: vec![10, 22, 1, 1],
},
)),
hbone_mtls_port: 15009,
}),
services: std::collections::HashMap::from([(
"/example.com".to_string(),
PortList {
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
},
)]),
..Default::default()
});
let healthy_local_workload = XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
network: "".to_string(),
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
services: std::collections::HashMap::from([(
"/example.com".to_string(),
PortList {
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
},
)]),
status: xds::istio::workload::WorkloadStatus::Healthy.into(),
..Default::default()
});
let unhealthy_local_workload = XdsAddressType::Workload(XdsWorkload {
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
network: "".to_string(),
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
services: std::collections::HashMap::from([(
"/example.com".to_string(),
PortList {
ports: vec![Port {
service_port: 80,
target_port: 8080,
}],
},
)]),
status: xds::istio::workload::WorkloadStatus::Unhealthy.into(),
..Default::default()
});
run_build_request_multi(
"127.0.0.1",
"127.0.0.3:80",
vec![
service.clone(),
ew_gateway.clone(),
remote_workload.clone(),
healthy_local_workload.clone(),
],
Some(ExpectedRequest {
protocol: OutboundProtocol::HBONE,
hbone_destination: "127.0.0.2:8080",
destination: "127.0.0.2:15008",
}),
)
.await;
run_build_request_multi(
"127.0.0.1",
"127.0.0.3:80",
vec![
service.clone(),
ew_gateway.clone(),
remote_workload.clone(),
unhealthy_local_workload.clone(),
],
Some(ExpectedRequest {
protocol: OutboundProtocol::DOUBLEHBONE,
hbone_destination: "example.com:80",
destination: "10.22.1.1:15009",
}),
)
.await;
} }
#[tokio::test] #[tokio::test]

View File

@ -594,9 +594,10 @@ mod test {
} }
/// This is really a test for TokioH2Stream, but its nicer here because we have access to /// This is really a test for TokioH2Stream, but its nicer here because we have access to
/// streams /// streams.
/// Most important, we make sure there are no panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn small_reads() { async fn read_buffering() {
let (mut pool, srv) = setup_test(3).await; let (mut pool, srv) = setup_test(3).await;
let key = key(&srv, 2); let key = key(&srv, 2);
@ -612,13 +613,28 @@ mod test {
let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap(); let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
let mut c = TokioH2Stream::new(c); let mut c = TokioH2Stream::new(c);
c.write_all(b"abcde").await.unwrap(); c.write_all(b"abcde").await.unwrap();
let mut b = [0u8; 0]; let mut b = [0u8; 100];
// Crucially, this should error rather than panic. // Properly buffer reads and don't error
if let Err(e) = c.read(&mut b).await { assert_eq!(c.read(&mut b).await.unwrap(), 8);
assert_eq!(e.kind(), io::ErrorKind::Other); assert_eq!(&b[..8], b"poolsrv\n"); // this is added by itself
} else { assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
panic!("Should have errored"); assert_eq!(&b[..1], b"a");
} assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
assert_eq!(&b[..1], b"b");
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
assert_eq!(&b[..1], b"c");
assert_eq!(c.read(&mut b).await.unwrap(), 2); // there are only two bytes left
assert_eq!(&b[..2], b"de");
// Once we drop the pool, we should still retained the buffered data,
// but then we should error.
c.write_all(b"abcde").await.unwrap();
assert_eq!(c.read(&mut b[..3]).await.unwrap(), 3);
assert_eq!(&b[..3], b"abc");
drop(pool);
assert_eq!(c.read(&mut b[..2]).await.unwrap(), 2);
assert_eq!(&b[..2], b"de");
assert!(c.read(&mut b).await.is_err());
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]