mirror of https://github.com/istio/ztunnel.git
Compare commits
7 Commits
master
...
1.27.0-rc.
Author | SHA1 | Date |
---|---|---|
|
7537647876 | |
|
c9cd46a609 | |
|
700791a158 | |
|
8ee3492325 | |
|
193d425d57 | |
|
31ddeb929b | |
|
39333d0566 |
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "istio build-tools",
|
||||
"image": "gcr.io/istio-testing/build-tools:master-9a50b34135f1aa16b04d56dcb89c5b91101a28aa",
|
||||
"image": "gcr.io/istio-testing/build-tools:release-1.27-56a42ed008b9070d88166dbec82e798d9eca43de",
|
||||
"privileged": true,
|
||||
"remoteEnv": {
|
||||
"USE_GKE_GCLOUD_AUTH_PLUGIN": "True",
|
||||
|
|
|
@ -1,6 +1 @@
|
|||
* @istio/wg-networking-maintainers-ztunnel
|
||||
/Makefile* @istio/wg-test-and-release-maintainers
|
||||
/*.md @istio/wg-test-and-release-maintainers
|
||||
/common/ @istio/wg-test-and-release-maintainers
|
||||
/common-protos/ @istio/wg-test-and-release-maintainers
|
||||
/scripts/ @istio/wg-test-and-release-maintainers
|
||||
* @istio/release-managers-1-27
|
||||
|
|
|
@ -71,7 +71,7 @@ itertools = "0.14"
|
|||
keyed_priority_queue = "0.4"
|
||||
libc = "0.2"
|
||||
log = "0.4"
|
||||
nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount"] }
|
||||
nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount", "resource" ] }
|
||||
once_cell = "1.21"
|
||||
num_cpus = "1.16"
|
||||
ppp = "2.3"
|
||||
|
|
|
@ -1 +1 @@
|
|||
73315b91eb0bb247476b5b03911ec7f2cb6b5176
|
||||
e00505c43a460ae40f8a26e1a1d62ffebb68d918
|
||||
|
|
|
@ -92,7 +92,7 @@ mirror-licenses: mod-download-go
|
|||
@license-lint --mirror
|
||||
|
||||
TMP := $(shell mktemp -d -u)
|
||||
UPDATE_BRANCH ?= "master"
|
||||
UPDATE_BRANCH ?= "release-1.27"
|
||||
|
||||
BUILD_TOOLS_ORG ?= "istio"
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ fi
|
|||
TOOLS_REGISTRY_PROVIDER=${TOOLS_REGISTRY_PROVIDER:-gcr.io}
|
||||
PROJECT_ID=${PROJECT_ID:-istio-testing}
|
||||
if [[ "${IMAGE_VERSION:-}" == "" ]]; then
|
||||
IMAGE_VERSION=master-9a50b34135f1aa16b04d56dcb89c5b91101a28aa
|
||||
IMAGE_VERSION=release-1.27-56a42ed008b9070d88166dbec82e798d9eca43de
|
||||
fi
|
||||
if [[ "${IMAGE_NAME:-}" == "" ]]; then
|
||||
IMAGE_NAME=build-tools
|
||||
|
|
|
@ -309,6 +309,8 @@ pub struct Config {
|
|||
pub ztunnel_identity: Option<identity::Identity>,
|
||||
|
||||
pub ztunnel_workload: Option<state::WorkloadInfo>,
|
||||
|
||||
pub ipv6_enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug)]
|
||||
|
@ -862,6 +864,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
|
|||
localhost_app_tunnel: parse_default(LOCALHOST_APP_TUNNEL, true)?,
|
||||
ztunnel_identity,
|
||||
ztunnel_workload,
|
||||
ipv6_enabled,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ impl Server {
|
|||
socket_factory: &(dyn SocketFactory + Send + Sync),
|
||||
local_workload_information: Arc<LocalWorkloadFetcher>,
|
||||
prefered_service_namespace: Option<String>,
|
||||
ipv6_enabled: bool,
|
||||
) -> Result<Self, Error> {
|
||||
// if the address we got from config is supposed to be v6-enabled,
|
||||
// actually check if the local pod context our socketfactory operates in supports V6.
|
||||
|
@ -104,6 +105,7 @@ impl Server {
|
|||
metrics,
|
||||
local_workload_information,
|
||||
prefered_service_namespace,
|
||||
ipv6_enabled,
|
||||
);
|
||||
let store = Arc::new(store);
|
||||
let handler = dns::handler::Handler::new(store.clone());
|
||||
|
@ -194,6 +196,7 @@ struct Store {
|
|||
metrics: Arc<Metrics>,
|
||||
local_workload: Arc<LocalWorkloadFetcher>,
|
||||
prefered_service_namespace: Option<String>,
|
||||
ipv6_enabled: bool,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
|
@ -204,6 +207,7 @@ impl Store {
|
|||
metrics: Arc<Metrics>,
|
||||
local_workload_information: Arc<LocalWorkloadFetcher>,
|
||||
prefered_service_namespace: Option<String>,
|
||||
ipv6_enabled: bool,
|
||||
) -> Self {
|
||||
let domain = as_name(domain);
|
||||
let svc_domain = append_name(as_name("svc"), &domain);
|
||||
|
@ -216,6 +220,7 @@ impl Store {
|
|||
metrics,
|
||||
local_workload: local_workload_information,
|
||||
prefered_service_namespace,
|
||||
ipv6_enabled,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -422,6 +427,13 @@ impl Store {
|
|||
None
|
||||
}
|
||||
|
||||
fn record_type_enabled(&self, addr: &IpAddr) -> bool {
|
||||
match addr {
|
||||
IpAddr::V4(_) => true, // IPv4 always
|
||||
IpAddr::V6(_) => self.ipv6_enabled, // IPv6 must be not be disabled in config
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the list of addresses of the requested record type from the server.
|
||||
fn get_addresses(
|
||||
&self,
|
||||
|
@ -434,7 +446,7 @@ impl Store {
|
|||
.workload_ips
|
||||
.iter()
|
||||
.filter_map(|addr| {
|
||||
if is_record_type(addr, record_type) {
|
||||
if is_record_type(addr, record_type) && self.record_type_enabled(addr) {
|
||||
Some(*addr)
|
||||
} else {
|
||||
None
|
||||
|
@ -453,10 +465,9 @@ impl Store {
|
|||
debug!("failed to fetch workload for {}", ep.workload_uid);
|
||||
return None;
|
||||
};
|
||||
wl.workload_ips
|
||||
.iter()
|
||||
.copied()
|
||||
.find(|addr| is_record_type(addr, record_type))
|
||||
wl.workload_ips.iter().copied().find(|addr| {
|
||||
is_record_type(addr, record_type) && self.record_type_enabled(addr)
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
|
@ -468,6 +479,7 @@ impl Store {
|
|||
.filter_map(|vip| {
|
||||
if is_record_type(&vip.address, record_type)
|
||||
&& client.network == vip.network
|
||||
&& self.record_type_enabled(&vip.address)
|
||||
{
|
||||
Some(vip.address)
|
||||
} else {
|
||||
|
@ -637,7 +649,7 @@ impl Resolver for Store {
|
|||
// From this point on, we are the authority for the response.
|
||||
let is_authoritative = true;
|
||||
|
||||
if !service_family_allowed(&service_match.server, record_type) {
|
||||
if !service_family_allowed(&service_match.server, record_type, self.ipv6_enabled) {
|
||||
access_log(
|
||||
request,
|
||||
Some(&client),
|
||||
|
@ -706,7 +718,13 @@ impl Resolver for Store {
|
|||
/// anyway, so would naturally work.
|
||||
/// Headless services, however, do not have VIPs, and the Pods behind them can have dual stack IPs even with
|
||||
/// the Service being single-stack. In this case, we are NOT supposed to return both IPs.
|
||||
fn service_family_allowed(server: &Address, record_type: RecordType) -> bool {
|
||||
/// If IPv6 is globally disabled, AAAA records are not allowed.
|
||||
fn service_family_allowed(server: &Address, record_type: RecordType, ipv6_enabled: bool) -> bool {
|
||||
// If IPv6 is globally disabled, don't allow AAAA records
|
||||
if !ipv6_enabled && record_type == RecordType::AAAA {
|
||||
return false;
|
||||
}
|
||||
|
||||
match server {
|
||||
Address::Service(service) => match service.ip_families {
|
||||
Some(IpFamily::IPv4) if record_type == RecordType::AAAA => false,
|
||||
|
@ -1087,6 +1105,7 @@ mod tests {
|
|||
metrics: test_metrics(),
|
||||
local_workload,
|
||||
prefered_service_namespace: None,
|
||||
ipv6_enabled: true,
|
||||
};
|
||||
|
||||
let namespaced_domain = n(format!("{}.svc.cluster.local", c.client_namespace));
|
||||
|
@ -1432,6 +1451,7 @@ mod tests {
|
|||
&factory,
|
||||
local_workload,
|
||||
Some(PREFERRED.to_string()),
|
||||
true, // ipv6_enabled for tests
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1519,6 +1539,7 @@ mod tests {
|
|||
&factory,
|
||||
local_workload,
|
||||
None,
|
||||
true, // ipv6_enabled for tests
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1569,6 +1590,7 @@ mod tests {
|
|||
state.clone(),
|
||||
),
|
||||
prefered_service_namespace: None,
|
||||
ipv6_enabled: true,
|
||||
};
|
||||
|
||||
let ip4n6_client_ip = ip("::ffff:202:202");
|
||||
|
@ -1603,6 +1625,7 @@ mod tests {
|
|||
&factory,
|
||||
local_workload,
|
||||
None,
|
||||
true, // ipv6_enabled for tests
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
24
src/main.rs
24
src/main.rs
|
@ -14,8 +14,9 @@
|
|||
|
||||
extern crate core;
|
||||
|
||||
use nix::sys::resource::{Resource, getrlimit, setrlimit};
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
use tracing::{info, warn};
|
||||
use ztunnel::*;
|
||||
|
||||
#[cfg(feature = "jemalloc")]
|
||||
|
@ -28,6 +29,26 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
|
|||
#[unsafe(export_name = "malloc_conf")]
|
||||
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
|
||||
|
||||
// We use this on Unix systems to increase the number of open file descriptors
|
||||
// if possible. This is useful for high-load scenarios where the default limit
|
||||
// is too low, which can lead to droopped connections and other issues:
|
||||
// see: https://github.com/istio/ztunnel/issues/1585
|
||||
fn increase_open_files_limit() {
|
||||
#[cfg(unix)]
|
||||
if let Ok((soft_limit, hard_limit)) = getrlimit(Resource::RLIMIT_NOFILE) {
|
||||
if let Err(e) = setrlimit(Resource::RLIMIT_NOFILE, hard_limit, hard_limit) {
|
||||
warn!("failed to set file descriptor limits: {e}");
|
||||
} else {
|
||||
info!(
|
||||
"set file descriptor limits from {} to {}",
|
||||
soft_limit, hard_limit
|
||||
);
|
||||
}
|
||||
} else {
|
||||
warn!("failed to get file descriptor limits");
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let _log_flush = telemetry::setup_logging();
|
||||
|
||||
|
@ -74,6 +95,7 @@ fn version() -> anyhow::Result<()> {
|
|||
|
||||
async fn proxy(cfg: Arc<config::Config>) -> anyhow::Result<()> {
|
||||
info!("version: {}", version::BuildInfo::new());
|
||||
increase_open_files_limit();
|
||||
info!("running with config: {}", serde_yaml::to_string(&cfg)?);
|
||||
app::build(cfg).await?.wait_termination().await
|
||||
}
|
||||
|
|
|
@ -484,6 +484,9 @@ pub enum Error {
|
|||
#[error("requested service {0} found, but has no IP addresses")]
|
||||
NoIPForService(String),
|
||||
|
||||
#[error("no service for target address: {0}")]
|
||||
NoService(SocketAddr),
|
||||
|
||||
#[error(
|
||||
"ip addresses were resolved for workload {0}, but valid dns response had no A/AAAA records"
|
||||
)]
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
// limitations under the License.
|
||||
|
||||
use crate::copy;
|
||||
use bytes::{BufMut, Bytes};
|
||||
use bytes::Bytes;
|
||||
use futures_core::ready;
|
||||
use h2::Reason;
|
||||
use std::io::Error;
|
||||
|
@ -85,7 +85,10 @@ pub struct H2StreamWriteHalf {
|
|||
_dropped: Option<DropCounter>,
|
||||
}
|
||||
|
||||
pub struct TokioH2Stream(H2Stream);
|
||||
pub struct TokioH2Stream {
|
||||
stream: H2Stream,
|
||||
buf: Bytes,
|
||||
}
|
||||
|
||||
struct DropCounter {
|
||||
// Whether the other end of this shared counter has already dropped.
|
||||
|
@ -144,7 +147,10 @@ impl Drop for DropCounter {
|
|||
// then the specific implementation will conflict with the generic one.
|
||||
impl TokioH2Stream {
|
||||
pub fn new(stream: H2Stream) -> Self {
|
||||
Self(stream)
|
||||
Self {
|
||||
stream,
|
||||
buf: Bytes::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,21 +160,21 @@ impl tokio::io::AsyncRead for TokioH2Stream {
|
|||
cx: &mut Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
let pinned = std::pin::Pin::new(&mut self.0.read);
|
||||
copy::ResizeBufRead::poll_bytes(pinned, cx).map(|r| match r {
|
||||
Ok(bytes) => {
|
||||
if buf.remaining() < bytes.len() {
|
||||
Err(Error::other(format!(
|
||||
"kould overflow buffer of with {} remaining",
|
||||
buf.remaining()
|
||||
)))
|
||||
} else {
|
||||
buf.put(bytes);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
})
|
||||
// Just return the bytes we have left over and don't poll the stream because
|
||||
// its unclear what to do if there are bytes left over from the previous read, and when we
|
||||
// poll, we get an error.
|
||||
if self.buf.is_empty() {
|
||||
// If we have no unread bytes, we can poll the stream
|
||||
// and fill self.buf with the bytes we read.
|
||||
let pinned = std::pin::Pin::new(&mut self.stream.read);
|
||||
let res = ready!(copy::ResizeBufRead::poll_bytes(pinned, cx))?;
|
||||
self.buf = res;
|
||||
}
|
||||
// Copy as many bytes as we can from self.buf.
|
||||
let cnt = Ord::min(buf.remaining(), self.buf.len());
|
||||
buf.put_slice(&self.buf[..cnt]);
|
||||
self.buf = self.buf.split_off(cnt);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -178,7 +184,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
|
|||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, tokio::io::Error>> {
|
||||
let pinned = std::pin::Pin::new(&mut self.0.write);
|
||||
let pinned = std::pin::Pin::new(&mut self.stream.write);
|
||||
let buf = Bytes::copy_from_slice(buf);
|
||||
copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf)
|
||||
}
|
||||
|
@ -187,7 +193,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
|
|||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
let pinned = std::pin::Pin::new(&mut self.0.write);
|
||||
let pinned = std::pin::Pin::new(&mut self.stream.write);
|
||||
copy::AsyncWriteBuf::poll_flush(pinned, cx)
|
||||
}
|
||||
|
||||
|
@ -195,7 +201,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
|
|||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
let pinned = std::pin::Pin::new(&mut self.0.write);
|
||||
let pinned = std::pin::Pin::new(&mut self.stream.write);
|
||||
copy::AsyncWriteBuf::poll_shutdown(pinned, cx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,10 +35,10 @@ use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics};
|
|||
use crate::drain::DrainWatcher;
|
||||
use crate::drain::run_with_drain;
|
||||
use crate::proxy::h2::{H2Stream, client::WorkloadKey};
|
||||
use crate::state::ServiceResolutionMode;
|
||||
use crate::state::service::ServiceDescription;
|
||||
use crate::state::service::{Service, ServiceDescription};
|
||||
use crate::state::workload::OutboundProtocol;
|
||||
use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address};
|
||||
use crate::state::{ServiceResolutionMode, Upstream};
|
||||
use crate::{assertions, copy, proxy, socket};
|
||||
|
||||
use super::h2::TokioH2Stream;
|
||||
|
@ -369,6 +369,83 @@ impl OutboundConnection {
|
|||
}
|
||||
}
|
||||
|
||||
// This function is called when the select next hop is on a different network,
|
||||
// so we expect the upstream workload to have a network gatewy configured.
|
||||
//
|
||||
// When we use a gateway to reach to a workload on a remote network we have to
|
||||
// use double HBONE (HBONE incapsulated inside HBONE). The gateway will
|
||||
// terminate the outer HBONE tunnel and forward the inner HBONE to the actual
|
||||
// destination as a opaque stream of bytes and the actual destination will
|
||||
// interpret it as an HBONE connection.
|
||||
//
|
||||
// If the upstream workload does not have an E/W gateway this function returns
|
||||
// an error indicating that it could not find a valid destination.
|
||||
//
|
||||
// A note about double HBONE, in double HBONE both inner and outer HBONE use
|
||||
// destination service name as HBONE target URI.
|
||||
//
|
||||
// Having target URI in the outer HBONE tunnel allows E/W gateway to figure out
|
||||
// where to route the data next witout the need to terminate inner HBONE tunnel.
|
||||
// In other words, it could forward inner HBONE as if it's an opaque stream of
|
||||
// bytes without trying to interpret it.
|
||||
//
|
||||
// NOTE: when connecting through an E/W gateway, regardless of whether there is
|
||||
// a waypoint or not, we always use service hostname and the service port. It's
|
||||
// somewhat different from how regular HBONE works, so I'm calling it out here.
|
||||
async fn build_request_through_gateway(
|
||||
&self,
|
||||
source: Arc<Workload>,
|
||||
// next hop on the remote network that we picked as our destination.
|
||||
// It may be a local view of a Waypoint workload on remote network or
|
||||
// a local view of the service workload (when waypoint is not
|
||||
// configured).
|
||||
upstream: Upstream,
|
||||
// This is a target service we wanted to reach in the first place.
|
||||
//
|
||||
// NOTE: Crossing network boundaries is only supported for services
|
||||
// at the moment, so we should always have a service we could use.
|
||||
service: &Service,
|
||||
target: SocketAddr,
|
||||
) -> Result<Request, Error> {
|
||||
if let Some(gateway) = &upstream.workload.network_gateway {
|
||||
let gateway_upstream = self
|
||||
.pi
|
||||
.state
|
||||
.fetch_network_gateway(gateway, &source, target)
|
||||
.await?;
|
||||
let hbone_target_destination = Some(HboneAddress::SvcHostname(
|
||||
service.hostname.clone(),
|
||||
target.port(),
|
||||
));
|
||||
|
||||
debug!("built request to a destination on another network through an E/W gateway");
|
||||
Ok(Request {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
source,
|
||||
hbone_target_destination,
|
||||
actual_destination_workload: Some(gateway_upstream.workload.clone()),
|
||||
intended_destination_service: Some(ServiceDescription::from(service)),
|
||||
actual_destination: gateway_upstream.workload_socket_addr().ok_or(
|
||||
Error::NoValidDestination(Box::new((*gateway_upstream.workload).clone())),
|
||||
)?,
|
||||
// The outer tunnel of double HBONE is terminated by the E/W
|
||||
// gateway and so for the credentials of the next hop
|
||||
// (upstream_sans) we use gateway credentials.
|
||||
upstream_sans: gateway_upstream.workload_and_services_san(),
|
||||
// The inner HBONE tunnel is terminated by either the server
|
||||
// we want to reach or a Waypoint in front of it, depending on
|
||||
// the configuration. So for the final destination credentials
|
||||
// (final_sans) we use the upstream workload credentials.
|
||||
final_sans: upstream.service_sans(),
|
||||
})
|
||||
} else {
|
||||
// Do not try to send cross-network traffic without network gateway.
|
||||
Err(Error::NoValidDestination(Box::new(
|
||||
(*upstream.workload).clone(),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
// build_request computes all information about the request we should send
|
||||
// TODO: Do we want a single lock for source and upstream...?
|
||||
async fn build_request(
|
||||
|
@ -381,7 +458,7 @@ impl OutboundConnection {
|
|||
|
||||
// If this is to-service traffic check for a service waypoint
|
||||
// Capture result of whether this is svc addressed
|
||||
let svc_addressed = if let Some(Address::Service(target_service)) = state
|
||||
let service = if let Some(Address::Service(target_service)) = state
|
||||
.fetch_address(&NetworkAddress {
|
||||
network: self.pi.cfg.network.clone(),
|
||||
address: target.ip(),
|
||||
|
@ -393,6 +470,18 @@ impl OutboundConnection {
|
|||
.fetch_service_waypoint(&target_service, &source_workload, target)
|
||||
.await?
|
||||
{
|
||||
if waypoint.workload.network != source_workload.network {
|
||||
debug!("picked a waypoint on remote network");
|
||||
return self
|
||||
.build_request_through_gateway(
|
||||
source_workload.clone(),
|
||||
waypoint,
|
||||
&target_service,
|
||||
target,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let upstream_sans = waypoint.workload_and_services_san();
|
||||
let actual_destination =
|
||||
waypoint
|
||||
|
@ -413,10 +502,10 @@ impl OutboundConnection {
|
|||
});
|
||||
}
|
||||
// this was service addressed but we did not find a waypoint
|
||||
true
|
||||
Some(target_service)
|
||||
} else {
|
||||
// this wasn't service addressed
|
||||
false
|
||||
None
|
||||
};
|
||||
|
||||
let Some(us) = state
|
||||
|
@ -428,7 +517,7 @@ impl OutboundConnection {
|
|||
)
|
||||
.await?
|
||||
else {
|
||||
if svc_addressed {
|
||||
if service.is_some() {
|
||||
return Err(Error::NoHealthyUpstream(target));
|
||||
}
|
||||
debug!("built request as passthrough; no upstream found");
|
||||
|
@ -446,37 +535,26 @@ impl OutboundConnection {
|
|||
|
||||
// Check whether we are using an E/W gateway and sending cross network traffic
|
||||
if us.workload.network != source_workload.network {
|
||||
if let Some(ew_gtw) = &us.workload.network_gateway {
|
||||
let gtw_us = {
|
||||
self.pi
|
||||
.state
|
||||
.fetch_network_gateway(ew_gtw, &source_workload, target)
|
||||
.await?
|
||||
};
|
||||
|
||||
let svc = us
|
||||
.destination_service
|
||||
.as_ref()
|
||||
.expect("Workloads with network gateways must be service addressed.");
|
||||
let hbone_target_destination =
|
||||
Some(HboneAddress::SvcHostname(svc.hostname.clone(), us.port));
|
||||
|
||||
return Ok(Request {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
source: source_workload,
|
||||
hbone_target_destination,
|
||||
actual_destination_workload: Some(gtw_us.workload.clone()),
|
||||
intended_destination_service: us.destination_service.clone(),
|
||||
actual_destination: gtw_us.workload_socket_addr().ok_or(
|
||||
Error::NoValidDestination(Box::new((*gtw_us.workload).clone())),
|
||||
)?,
|
||||
upstream_sans: gtw_us.workload_and_services_san(),
|
||||
final_sans: us.service_sans(),
|
||||
});
|
||||
} else {
|
||||
// Do not try to send cross-network traffic without network gateway.
|
||||
return Err(Error::NoValidDestination(Box::new((*us.workload).clone())));
|
||||
}
|
||||
// Workloads on remote network must be service addressed, so if we got here
|
||||
// and we don't have a service for the original target address then it's a
|
||||
// bug either in ztunnel itself or in istiod.
|
||||
//
|
||||
// For a double HBONE protocol implementation we have to know the
|
||||
// destination service and if there is no service for the target it's a bug.
|
||||
//
|
||||
// This situation "should never happen" because for workloads fetch_upstream
|
||||
// above only checks the workloads on the same network as this ztunnel
|
||||
// instance and therefore it should not be able to find a workload on a
|
||||
// different network.
|
||||
debug_assert!(
|
||||
service.is_some(),
|
||||
"workload on remote network is not service addressed"
|
||||
);
|
||||
debug!("picked a workload on remote network");
|
||||
let service = service.as_ref().ok_or(Error::NoService(target))?;
|
||||
return self
|
||||
.build_request_through_gateway(source_workload.clone(), us, service, target)
|
||||
.await;
|
||||
}
|
||||
|
||||
// We are not using a network gateway and there is no workload address.
|
||||
|
@ -491,7 +569,7 @@ impl OutboundConnection {
|
|||
// Check if we need to go through a workload addressed waypoint.
|
||||
// Don't traverse waypoint twice if the source is sandwich-outbound.
|
||||
// Don't traverse waypoint if traffic was addressed to a service (handled before)
|
||||
if !from_waypoint && !svc_addressed {
|
||||
if !from_waypoint && service.is_none() {
|
||||
// For case upstream server has enabled waypoint
|
||||
let waypoint = state
|
||||
.fetch_workload_waypoint(&us.workload, &source_workload, target)
|
||||
|
@ -816,6 +894,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn build_request_double_hbone() {
|
||||
// example.com service has a workload on remote network.
|
||||
// E/W gateway is addressed by an IP.
|
||||
run_build_request_multi(
|
||||
"127.0.0.1",
|
||||
"127.0.0.3:80",
|
||||
|
@ -867,11 +947,13 @@ mod tests {
|
|||
],
|
||||
Some(ExpectedRequest {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
hbone_destination: "example.com:8080",
|
||||
hbone_destination: "example.com:80",
|
||||
destination: "10.22.1.1:15009",
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
// example.com service has a workload on remote network.
|
||||
// E/W gateway is addressed by a hostname.
|
||||
run_build_request_multi(
|
||||
"127.0.0.1",
|
||||
"127.0.0.3:80",
|
||||
|
@ -944,11 +1026,218 @@ mod tests {
|
|||
],
|
||||
Some(ExpectedRequest {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
hbone_destination: "example.com:8080",
|
||||
hbone_destination: "example.com:80",
|
||||
destination: "127.0.0.5:15008",
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
// example.com service has a waypoint and waypoint workload is on remote network.
|
||||
// E/W gateway is addressed by an IP.
|
||||
run_build_request_multi(
|
||||
"127.0.0.1",
|
||||
"127.0.0.3:80",
|
||||
vec![
|
||||
XdsAddressType::Service(XdsService {
|
||||
hostname: "example.com".to_string(),
|
||||
addresses: vec![XdsNetworkAddress {
|
||||
network: "".to_string(),
|
||||
address: vec![127, 0, 0, 3],
|
||||
}],
|
||||
ports: vec![Port {
|
||||
service_port: 80,
|
||||
target_port: 8080,
|
||||
}],
|
||||
waypoint: Some(xds::istio::workload::GatewayAddress {
|
||||
destination: Some(
|
||||
xds::istio::workload::gateway_address::Destination::Hostname(
|
||||
XdsNamespacedHostname {
|
||||
namespace: Default::default(),
|
||||
hostname: "waypoint.com".into(),
|
||||
},
|
||||
),
|
||||
),
|
||||
hbone_mtls_port: 15008,
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
XdsAddressType::Service(XdsService {
|
||||
hostname: "waypoint.com".to_string(),
|
||||
addresses: vec![XdsNetworkAddress {
|
||||
network: "".to_string(),
|
||||
address: vec![127, 0, 0, 4],
|
||||
}],
|
||||
ports: vec![Port {
|
||||
service_port: 15008,
|
||||
target_port: 15008,
|
||||
}],
|
||||
..Default::default()
|
||||
}),
|
||||
XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/remote-waypoint-pod".to_string(),
|
||||
addresses: vec![],
|
||||
network: "remote".to_string(),
|
||||
network_gateway: Some(xds::istio::workload::GatewayAddress {
|
||||
destination: Some(
|
||||
xds::istio::workload::gateway_address::Destination::Address(
|
||||
XdsNetworkAddress {
|
||||
network: "remote".to_string(),
|
||||
address: vec![10, 22, 1, 1],
|
||||
},
|
||||
),
|
||||
),
|
||||
hbone_mtls_port: 15009,
|
||||
}),
|
||||
services: std::collections::HashMap::from([(
|
||||
"/waypoint.com".to_string(),
|
||||
PortList {
|
||||
ports: vec![Port {
|
||||
service_port: 15008,
|
||||
target_port: 15008,
|
||||
}],
|
||||
},
|
||||
)]),
|
||||
..Default::default()
|
||||
}),
|
||||
XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
|
||||
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
|
||||
network: "remote".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
],
|
||||
Some(ExpectedRequest {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
hbone_destination: "example.com:80",
|
||||
destination: "10.22.1.1:15009",
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_request_failover_to_remote() {
|
||||
// Similar to the double HBONE test that we already have, but it sets up a scenario when
|
||||
// load balancing logic will pick a workload on a remote cluster when local workloads are
|
||||
// unhealthy, thus showing the expected failover behavior.
|
||||
let service = XdsAddressType::Service(XdsService {
|
||||
hostname: "example.com".to_string(),
|
||||
addresses: vec![XdsNetworkAddress {
|
||||
network: "".to_string(),
|
||||
address: vec![127, 0, 0, 3],
|
||||
}],
|
||||
ports: vec![Port {
|
||||
service_port: 80,
|
||||
target_port: 8080,
|
||||
}],
|
||||
// Prefer routing to workloads on the same network, but when nothing is healthy locally
|
||||
// allow failing over to remote networks.
|
||||
load_balancing: Some(xds::istio::workload::LoadBalancing {
|
||||
routing_preference: vec![
|
||||
xds::istio::workload::load_balancing::Scope::Network.into(),
|
||||
],
|
||||
mode: xds::istio::workload::load_balancing::Mode::Failover.into(),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
let ew_gateway = XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
|
||||
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
|
||||
network: "remote".to_string(),
|
||||
..Default::default()
|
||||
});
|
||||
let remote_workload = XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/remote-example.com-pod".to_string(),
|
||||
addresses: vec![],
|
||||
network: "remote".to_string(),
|
||||
network_gateway: Some(xds::istio::workload::GatewayAddress {
|
||||
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
|
||||
XdsNetworkAddress {
|
||||
network: "remote".to_string(),
|
||||
address: vec![10, 22, 1, 1],
|
||||
},
|
||||
)),
|
||||
hbone_mtls_port: 15009,
|
||||
}),
|
||||
services: std::collections::HashMap::from([(
|
||||
"/example.com".to_string(),
|
||||
PortList {
|
||||
ports: vec![Port {
|
||||
service_port: 80,
|
||||
target_port: 8080,
|
||||
}],
|
||||
},
|
||||
)]),
|
||||
..Default::default()
|
||||
});
|
||||
let healthy_local_workload = XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
|
||||
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
|
||||
network: "".to_string(),
|
||||
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
|
||||
services: std::collections::HashMap::from([(
|
||||
"/example.com".to_string(),
|
||||
PortList {
|
||||
ports: vec![Port {
|
||||
service_port: 80,
|
||||
target_port: 8080,
|
||||
}],
|
||||
},
|
||||
)]),
|
||||
status: xds::istio::workload::WorkloadStatus::Healthy.into(),
|
||||
..Default::default()
|
||||
});
|
||||
let unhealthy_local_workload = XdsAddressType::Workload(XdsWorkload {
|
||||
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
|
||||
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
|
||||
network: "".to_string(),
|
||||
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
|
||||
services: std::collections::HashMap::from([(
|
||||
"/example.com".to_string(),
|
||||
PortList {
|
||||
ports: vec![Port {
|
||||
service_port: 80,
|
||||
target_port: 8080,
|
||||
}],
|
||||
},
|
||||
)]),
|
||||
status: xds::istio::workload::WorkloadStatus::Unhealthy.into(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
run_build_request_multi(
|
||||
"127.0.0.1",
|
||||
"127.0.0.3:80",
|
||||
vec![
|
||||
service.clone(),
|
||||
ew_gateway.clone(),
|
||||
remote_workload.clone(),
|
||||
healthy_local_workload.clone(),
|
||||
],
|
||||
Some(ExpectedRequest {
|
||||
protocol: OutboundProtocol::HBONE,
|
||||
hbone_destination: "127.0.0.2:8080",
|
||||
destination: "127.0.0.2:15008",
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
run_build_request_multi(
|
||||
"127.0.0.1",
|
||||
"127.0.0.3:80",
|
||||
vec![
|
||||
service.clone(),
|
||||
ew_gateway.clone(),
|
||||
remote_workload.clone(),
|
||||
unhealthy_local_workload.clone(),
|
||||
],
|
||||
Some(ExpectedRequest {
|
||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||
hbone_destination: "example.com:80",
|
||||
destination: "10.22.1.1:15009",
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -594,9 +594,10 @@ mod test {
|
|||
}
|
||||
|
||||
/// This is really a test for TokioH2Stream, but its nicer here because we have access to
|
||||
/// streams
|
||||
/// streams.
|
||||
/// Most important, we make sure there are no panics.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn small_reads() {
|
||||
async fn read_buffering() {
|
||||
let (mut pool, srv) = setup_test(3).await;
|
||||
|
||||
let key = key(&srv, 2);
|
||||
|
@ -612,13 +613,28 @@ mod test {
|
|||
let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
|
||||
let mut c = TokioH2Stream::new(c);
|
||||
c.write_all(b"abcde").await.unwrap();
|
||||
let mut b = [0u8; 0];
|
||||
// Crucially, this should error rather than panic.
|
||||
if let Err(e) = c.read(&mut b).await {
|
||||
assert_eq!(e.kind(), io::ErrorKind::Other);
|
||||
} else {
|
||||
panic!("Should have errored");
|
||||
}
|
||||
let mut b = [0u8; 100];
|
||||
// Properly buffer reads and don't error
|
||||
assert_eq!(c.read(&mut b).await.unwrap(), 8);
|
||||
assert_eq!(&b[..8], b"poolsrv\n"); // this is added by itself
|
||||
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||
assert_eq!(&b[..1], b"a");
|
||||
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||
assert_eq!(&b[..1], b"b");
|
||||
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||
assert_eq!(&b[..1], b"c");
|
||||
assert_eq!(c.read(&mut b).await.unwrap(), 2); // there are only two bytes left
|
||||
assert_eq!(&b[..2], b"de");
|
||||
|
||||
// Once we drop the pool, we should still retained the buffered data,
|
||||
// but then we should error.
|
||||
c.write_all(b"abcde").await.unwrap();
|
||||
assert_eq!(c.read(&mut b[..3]).await.unwrap(), 3);
|
||||
assert_eq!(&b[..3], b"abc");
|
||||
drop(pool);
|
||||
assert_eq!(c.read(&mut b[..2]).await.unwrap(), 2);
|
||||
assert_eq!(&b[..2], b"de");
|
||||
assert!(c.read(&mut b).await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
|
|
|
@ -113,6 +113,7 @@ impl ProxyFactory {
|
|||
socket_factory.as_ref(),
|
||||
local_workload_information.as_fetcher(),
|
||||
self.config.prefered_service_namespace.clone(),
|
||||
self.config.ipv6_enabled,
|
||||
)
|
||||
.await?;
|
||||
resolver = Some(server.resolver());
|
||||
|
|
|
@ -299,6 +299,7 @@ pub async fn run_dns(responses: HashMap<Name, Vec<IpAddr>>) -> anyhow::Result<Te
|
|||
state.clone(),
|
||||
),
|
||||
Some("prefered-namespace".to_string()),
|
||||
true, // ipv6_enabled for tests
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
Loading…
Reference in New Issue