mirror of https://github.com/istio/ztunnel.git

Compare commits
13 Commits: 1.27.0-bet ... master

Author | SHA1 | Date
---|---|---
 | ac67367eb9 |
 | ec120597bd |
 | 2837b0f410 |
 | 6640774d9f |
 | 4f50f8403b |
 | b102502cbd |
 | d58e82441f |
 | c555eaa812 |
 | 85a94b6cc4 |
 | 3fa6335035 |
 | dfa3b58bbc |
 | c2d2534edb |
 | 84f0e52e64 |
@@ -1,6 +1,6 @@
 {
     "name": "istio build-tools",
-    "image": "gcr.io/istio-testing/build-tools:master-9a50b34135f1aa16b04d56dcb89c5b91101a28aa",
+    "image": "gcr.io/istio-testing/build-tools:master-672e6089ff843019a2b28cf9e87754c7b74358ea",
     "privileged": true,
     "remoteEnv": {
         "USE_GKE_GCLOUD_AUTH_PLUGIN": "True",

@@ -1802,7 +1802,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]

 [[package]]

@@ -2777,6 +2777,21 @@ dependencies = [
  "yasna",
 ]

+[[package]]
+name = "rcgen"
+version = "0.14.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49bc8ffa8a832eb1d7c8000337f8b0d2f4f2f5ec3cf4ddc26f125e3ad2451824"
+dependencies = [
+ "aws-lc-rs",
+ "pem",
+ "ring",
+ "rustls-pki-types",
+ "time",
+ "x509-parser",
+ "yasna",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.5.11"

@@ -4455,7 +4470,8 @@ dependencies = [
  "prost-build",
  "prost-types",
  "rand 0.9.0",
- "rcgen",
+ "rcgen 0.13.3",
+ "rcgen 0.14.2",
  "ring",
  "rustc_version",
  "rustls",

@@ -71,7 +71,7 @@ itertools = "0.14"
 keyed_priority_queue = "0.4"
 libc = "0.2"
 log = "0.4"
-nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount"] }
+nix = { version = "0.29", features = ["socket", "sched", "uio", "fs", "ioctl", "user", "net", "mount", "resource" ] }
 once_cell = "1.21"
 num_cpus = "1.16"
 ppp = "2.3"

@@ -80,7 +80,7 @@ prometheus-parse = "0.2"
 prost = "0.13"
 prost-types = "0.13"
 rand = { version = "0.9" , features = ["small_rng"]}
-rcgen = { version = "0.13", optional = true, features = ["pem"] }
+rcgen = { version = "0.14", optional = true, features = ["pem"] }
 rustls = { version = "0.23", default-features = false }
 rustls-native-certs = "0.8"
 rustls-pemfile = "2.2"

@@ -458,10 +458,10 @@ fn hbone_connection_config() -> ztunnel::config::ConfigSource {
         workload: Workload {
             workload_ips: vec![hbone_connection_ip(i)],
             protocol: InboundProtocol::HBONE,
-            uid: strng::format!("cluster1//v1/Pod/default/remote{}", i),
-            name: strng::format!("workload-{}", i),
-            namespace: strng::format!("namespace-{}", i),
-            service_account: strng::format!("service-account-{}", i),
+            uid: strng::format!("cluster1//v1/Pod/default/remote{i}"),
+            name: strng::format!("workload-{i}"),
+            namespace: strng::format!("namespace-{i}"),
+            service_account: strng::format!("service-account-{i}"),
             ..test_helpers::test_default_workload()
         },
         services: Default::default(),
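Most of the mechanical changes in this compare, like the hunk above, swap positional format! arguments for Rust's inlined format args (identifier capture, stabilized in Rust 1.58 and enforced by clippy's uninlined_format_args lint). A minimal standalone sketch of the two equivalent spellings:

    fn main() {
        let i = 7;
        // Old style: positional argument.
        println!("workload-{}", i);
        // New style: the identifier is captured directly in the format string.
        println!("workload-{i}");
    }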
5 build.rs

@@ -94,9 +94,6 @@ fn main() -> Result<(), anyhow::Error> {
         "cargo:rustc-env=ZTUNNEL_BUILD_RUSTC_VERSION={}",
         rustc_version::version().unwrap()
     );
-    println!(
-        "cargo:rustc-env=ZTUNNEL_BUILD_PROFILE_NAME={}",
-        profile_name
-    );
+    println!("cargo:rustc-env=ZTUNNEL_BUILD_PROFILE_NAME={profile_name}");
     Ok(())
 }

@@ -1 +1 @@
-73315b91eb0bb247476b5b03911ec7f2cb6b5176
+d235bc9f4a20f3c78c5aacbfa3f24d08a884a82e

@@ -184,6 +184,10 @@ linters:
     - linters:
         - staticcheck
       text: 'S1007'
+    # TODO: remove once we have updated package names
+    - linters:
+        - revive
+      text: "var-naming: avoid meaningless package names"
   paths:
     - .*\.pb\.go
     - .*\.gen\.go

@@ -75,7 +75,7 @@ fi
 TOOLS_REGISTRY_PROVIDER=${TOOLS_REGISTRY_PROVIDER:-gcr.io}
 PROJECT_ID=${PROJECT_ID:-istio-testing}
 if [[ "${IMAGE_VERSION:-}" == "" ]]; then
-  IMAGE_VERSION=master-9a50b34135f1aa16b04d56dcb89c5b91101a28aa
+  IMAGE_VERSION=master-672e6089ff843019a2b28cf9e87754c7b74358ea
 fi
 if [[ "${IMAGE_NAME:-}" == "" ]]; then
   IMAGE_NAME=build-tools

@@ -2359,9 +2359,9 @@ dependencies = [

 [[package]]
 name = "rcgen"
-version = "0.13.2"
+version = "0.14.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2"
+checksum = "49bc8ffa8a832eb1d7c8000337f8b0d2f4f2f5ec3cf4ddc26f125e3ad2451824"
 dependencies = [
  "aws-lc-rs",
  "pem",

@@ -390,7 +390,7 @@ fn change_log_level(reset: bool, level: &str) -> Response<Full<Bytes>> {
         // Invalid level provided
         return plaintext_response(
             hyper::StatusCode::BAD_REQUEST,
-            format!("Invalid level provided: {}\n{}", level, HELP_STRING),
+            format!("Invalid level provided: {level}\n{HELP_STRING}"),
         );
     };
 }

@@ -398,7 +398,7 @@ fn change_log_level(reset: bool, level: &str) -> Response<Full<Bytes>> {
         Ok(_) => list_loggers(),
         Err(e) => plaintext_response(
             hyper::StatusCode::BAD_REQUEST,
-            format!("Failed to set new level: {}\n{}", e, HELP_STRING),
+            format!("Failed to set new level: {e}\n{HELP_STRING}"),
         ),
     }
 }

@@ -309,6 +309,8 @@ pub struct Config {
     pub ztunnel_identity: Option<identity::Identity>,

     pub ztunnel_workload: Option<state::WorkloadInfo>,
+
+    pub ipv6_enabled: bool,
 }

 #[derive(serde::Serialize, Clone, Copy, Debug)]

@@ -438,7 +440,7 @@ fn parse_worker_threads(default: usize) -> Result<usize, Error> {
             Error::EnvVar(
                 ZTUNNEL_WORKER_THREADS.to_string(),
                 value.clone(),
-                format!("invalid percentage: {}", e),
+                format!("invalid percentage: {e}"),
             )
         })?;

@@ -460,7 +462,7 @@ fn parse_worker_threads(default: usize) -> Result<usize, Error> {
             Error::EnvVar(
                 ZTUNNEL_WORKER_THREADS.to_string(),
                 value,
-                format!("invalid number: {}", e),
+                format!("invalid number: {e}"),
             )
         })
 }

@@ -597,7 +599,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
     // on a pod-by-pod basis.
     let dns_proxy_addr: Address = match pc.proxy_metadata.get(DNS_PROXY_ADDR_METADATA) {
         Some(dns_addr) => Address::new(ipv6_localhost_enabled, dns_addr)
-            .unwrap_or_else(|_| panic!("failed to parse DNS_PROXY_ADDR: {}", dns_addr)),
+            .unwrap_or_else(|_| panic!("failed to parse DNS_PROXY_ADDR: {dns_addr}")),
         None => Address::Localhost(ipv6_localhost_enabled, DEFAULT_DNS_PORT),
     };

@@ -862,6 +864,7 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
         localhost_app_tunnel: parse_default(LOCALHOST_APP_TUNNEL, true)?,
         ztunnel_identity,
         ztunnel_workload,
+        ipv6_enabled,
     })
 }

@@ -86,6 +86,7 @@ impl Server {
         socket_factory: &(dyn SocketFactory + Send + Sync),
         local_workload_information: Arc<LocalWorkloadFetcher>,
         prefered_service_namespace: Option<String>,
+        ipv6_enabled: bool,
     ) -> Result<Self, Error> {
         // if the address we got from config is supposed to be v6-enabled,
         // actually check if the local pod context our socketfactory operates in supports V6.

@@ -104,6 +105,7 @@ impl Server {
             metrics,
             local_workload_information,
             prefered_service_namespace,
+            ipv6_enabled,
         );
         let store = Arc::new(store);
         let handler = dns::handler::Handler::new(store.clone());

@@ -194,6 +196,7 @@ struct Store {
     metrics: Arc<Metrics>,
     local_workload: Arc<LocalWorkloadFetcher>,
     prefered_service_namespace: Option<String>,
+    ipv6_enabled: bool,
 }

 impl Store {

@@ -204,6 +207,7 @@ impl Store {
         metrics: Arc<Metrics>,
         local_workload_information: Arc<LocalWorkloadFetcher>,
         prefered_service_namespace: Option<String>,
+        ipv6_enabled: bool,
     ) -> Self {
         let domain = as_name(domain);
         let svc_domain = append_name(as_name("svc"), &domain);

@@ -216,6 +220,7 @@ impl Store {
             metrics,
             local_workload: local_workload_information,
             prefered_service_namespace,
+            ipv6_enabled,
         }
     }

@@ -422,6 +427,13 @@ impl Store {
         None
     }

+    fn record_type_enabled(&self, addr: &IpAddr) -> bool {
+        match addr {
+            IpAddr::V4(_) => true,              // IPv4 always
+            IpAddr::V6(_) => self.ipv6_enabled, // IPv6 must not be disabled in config
+        }
+    }
+
     /// Gets the list of addresses of the requested record type from the server.
     fn get_addresses(
         &self,
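The hunks above thread a new ipv6_enabled flag into the DNS Store and gate answers on record_type_enabled. A self-contained sketch of that rule (using only std types; record_type_enabled here is a free function rather than ztunnel's method): IPv4 addresses always pass, IPv6 addresses pass only when IPv6 is enabled.

    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    fn record_type_enabled(addr: &IpAddr, ipv6_enabled: bool) -> bool {
        match addr {
            IpAddr::V4(_) => true,         // A records are always answered
            IpAddr::V6(_) => ipv6_enabled, // AAAA only if IPv6 is not disabled
        }
    }

    fn main() {
        let addrs = [
            IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
            IpAddr::V6(Ipv6Addr::LOCALHOST),
        ];
        // With IPv6 disabled, only the IPv4 address survives the filter.
        let answered: Vec<&IpAddr> = addrs.iter().filter(|a| record_type_enabled(a, false)).collect();
        assert_eq!(answered.len(), 1);
        assert!(answered[0].is_ipv4());
    }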
@@ -434,7 +446,7 @@ impl Store {
             .workload_ips
             .iter()
             .filter_map(|addr| {
-                if is_record_type(addr, record_type) {
+                if is_record_type(addr, record_type) && self.record_type_enabled(addr) {
                     Some(*addr)
                 } else {
                     None

@@ -453,10 +465,9 @@ impl Store {
                         debug!("failed to fetch workload for {}", ep.workload_uid);
                         return None;
                     };
-                    wl.workload_ips
-                        .iter()
-                        .copied()
-                        .find(|addr| is_record_type(addr, record_type))
+                    wl.workload_ips.iter().copied().find(|addr| {
+                        is_record_type(addr, record_type) && self.record_type_enabled(addr)
+                    })
                 })
                 .collect()
         } else {

@@ -468,6 +479,7 @@ impl Store {
                 .filter_map(|vip| {
                     if is_record_type(&vip.address, record_type)
                         && client.network == vip.network
+                        && self.record_type_enabled(&vip.address)
                     {
                         Some(vip.address)
                     } else {

@@ -637,7 +649,7 @@ impl Resolver for Store {
         // From this point on, we are the authority for the response.
         let is_authoritative = true;

-        if !service_family_allowed(&service_match.server, record_type) {
+        if !service_family_allowed(&service_match.server, record_type, self.ipv6_enabled) {
             access_log(
                 request,
                 Some(&client),

@@ -706,7 +718,13 @@ impl Resolver for Store {
 /// anyway, so would naturally work.
 /// Headless services, however, do not have VIPs, and the Pods behind them can have dual stack IPs even with
 /// the Service being single-stack. In this case, we are NOT supposed to return both IPs.
-fn service_family_allowed(server: &Address, record_type: RecordType) -> bool {
+/// If IPv6 is globally disabled, AAAA records are not allowed.
+fn service_family_allowed(server: &Address, record_type: RecordType, ipv6_enabled: bool) -> bool {
+    // If IPv6 is globally disabled, don't allow AAAA records
+    if !ipv6_enabled && record_type == RecordType::AAAA {
+        return false;
+    }
+
     match server {
         Address::Service(service) => match service.ip_families {
             Some(IpFamily::IPv4) if record_type == RecordType::AAAA => false,

@@ -1087,6 +1105,7 @@ mod tests {
             metrics: test_metrics(),
             local_workload,
             prefered_service_namespace: None,
+            ipv6_enabled: true,
         };

         let namespaced_domain = n(format!("{}.svc.cluster.local", c.client_namespace));

@@ -1432,6 +1451,7 @@ mod tests {
             &factory,
             local_workload,
             Some(PREFERRED.to_string()),
+            true, // ipv6_enabled for tests
         )
         .await
         .unwrap();

@@ -1451,8 +1471,8 @@ mod tests {
             tasks.push(async move {
                 let name = format!("[{protocol}] {}", c.name);
                 let resp = send_request(&mut client, n(c.host), c.query_type).await;
-                assert_eq!(c.expect_authoritative, resp.authoritative(), "{}", name);
-                assert_eq!(c.expect_code, resp.response_code(), "{}", name);
+                assert_eq!(c.expect_authoritative, resp.authoritative(), "{name}");
+                assert_eq!(c.expect_code, resp.response_code(), "{name}");

                 if c.expect_code == ResponseCode::NoError {
                     let mut actual = resp.answers().to_vec();

@@ -1463,7 +1483,7 @@ mod tests {
                     if c.expect_authoritative {
                         sort_records(&mut actual);
                     }
-                    assert_eq!(c.expect_records, actual, "{}", name);
+                    assert_eq!(c.expect_records, actual, "{name}");
                 }
             });
         }

@@ -1519,6 +1539,7 @@ mod tests {
             &factory,
             local_workload,
             None,
+            true, // ipv6_enabled for tests
         )
         .await
         .unwrap();

@@ -1533,7 +1554,7 @@ mod tests {
             for (protocol, client) in [("tcp", &mut tcp_client), ("udp", &mut udp_client)] {
                 let name = format!("[{protocol}] {}", c.name);
                 let resp = send_request(client, n(c.host), RecordType::A).await;
-                assert_eq!(c.expect_code, resp.response_code(), "{}", name);
+                assert_eq!(c.expect_code, resp.response_code(), "{name}");
                 if c.expect_code == ResponseCode::NoError {
                     assert!(!resp.answers().is_empty());
                 }

@@ -1569,6 +1590,7 @@ mod tests {
                 state.clone(),
             ),
             prefered_service_namespace: None,
+            ipv6_enabled: true,
         };

         let ip4n6_client_ip = ip("::ffff:202:202");

@@ -1576,7 +1598,7 @@ mod tests {
         match store.lookup(&req).await {
             Ok(_) => {}
             Err(e) => {
-                panic!("IPv6 encoded IPv4 should work! Error was {:?}", e)
+                panic!("IPv6 encoded IPv4 should work! Error was {e:?}");
             }
         }
     }

@@ -1603,6 +1625,7 @@ mod tests {
             &factory,
             local_workload,
             None,
+            true, // ipv6_enabled for tests
         )
         .await
         .unwrap();

@@ -148,7 +148,7 @@ impl crate::proxy::SocketFactory for InPodSocketPortReuseFactory {
         })?;

         if let Err(e) = sock.set_reuseport(true) {
-            tracing::warn!("setting set_reuseport failed: {} addr: {}", e, addr);
+            tracing::warn!("setting set_reuseport failed: {e} addr: {addr}");
         }

         sock.bind(addr)?;

@@ -37,7 +37,7 @@ use std::os::fd::{AsRawFd, OwnedFd};
 use tracing::debug;

 pub fn uid(i: usize) -> crate::inpod::WorkloadUid {
-    crate::inpod::WorkloadUid::new(format!("uid{}", i))
+    crate::inpod::WorkloadUid::new(format!("uid{i}"))
 }

 pub struct Fixture {

@@ -138,7 +138,7 @@ pub async fn read_msg(s: &mut UnixStream) -> WorkloadResponse {
     debug!("read {} bytes", read_amount);

     let ret = WorkloadResponse::decode(&buf[..read_amount])
-        .unwrap_or_else(|_| panic!("failed to decode. read amount: {}", read_amount));
+        .unwrap_or_else(|_| panic!("failed to decode. read amount: {read_amount}"));

     debug!("decoded {:?}", ret);
     ret

@@ -401,7 +401,7 @@ pub(crate) mod tests {
                 assert!(e.contains("EOF"));
             }
             Ok(()) => {}
-            Err(e) => panic!("expected error due to EOF {:?}", e),
+            Err(e) => panic!("expected error due to EOF {e:?}"),
         }
     }

24 src/main.rs

@@ -14,8 +14,9 @@

 extern crate core;

+use nix::sys::resource::{Resource, getrlimit, setrlimit};
 use std::sync::Arc;
-use tracing::info;
+use tracing::{info, warn};
 use ztunnel::*;

 #[cfg(feature = "jemalloc")]

@@ -28,6 +29,26 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 #[unsafe(export_name = "malloc_conf")]
 pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";

+// We use this on Unix systems to increase the number of open file descriptors
+// if possible. This is useful for high-load scenarios where the default limit
+// is too low, which can lead to dropped connections and other issues:
+// see: https://github.com/istio/ztunnel/issues/1585
+fn increase_open_files_limit() {
+    #[cfg(unix)]
+    if let Ok((soft_limit, hard_limit)) = getrlimit(Resource::RLIMIT_NOFILE) {
+        if let Err(e) = setrlimit(Resource::RLIMIT_NOFILE, hard_limit, hard_limit) {
+            warn!("failed to set file descriptor limits: {e}");
+        } else {
+            info!(
+                "set file descriptor limits from {} to {}",
+                soft_limit, hard_limit
+            );
+        }
+    } else {
+        warn!("failed to get file descriptor limits");
+    }
+}
+
 fn main() -> anyhow::Result<()> {
     let _log_flush = telemetry::setup_logging();

@@ -74,6 +95,7 @@ fn version() -> anyhow::Result<()> {

 async fn proxy(cfg: Arc<config::Config>) -> anyhow::Result<()> {
     info!("version: {}", version::BuildInfo::new());
+    increase_open_files_limit();
     info!("running with config: {}", serde_yaml::to_string(&cfg)?);
     app::build(cfg).await?.wait_termination().await
 }
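The new increase_open_files_limit above raises the soft RLIMIT_NOFILE to the hard limit at proxy startup. A standalone sketch of the same call sequence (assuming the nix crate with its "resource" feature, as enabled in Cargo.toml earlier in this compare):

    use nix::sys::resource::{getrlimit, setrlimit, Resource};

    fn main() -> nix::Result<()> {
        let (soft, hard) = getrlimit(Resource::RLIMIT_NOFILE)?;
        if soft < hard {
            // Only the soft limit can be raised without privileges;
            // the hard limit is the ceiling.
            setrlimit(Resource::RLIMIT_NOFILE, hard, hard)?;
        }
        let (new_soft, _) = getrlimit(Resource::RLIMIT_NOFILE)?;
        println!("RLIMIT_NOFILE soft limit: {soft} -> {new_soft}");
        Ok(())
    }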
11 src/proxy.rs

@@ -306,7 +306,7 @@ impl Proxy {
             old_cfg.inbound_addr = inbound.address();
             let mut new_pi = (*pi).clone();
             new_pi.cfg = Arc::new(old_cfg);
-            std::mem::swap(&mut pi, &mut Arc::new(new_pi));
+            pi = Arc::new(new_pi);
             warn!("TEST FAKE: new address is {:?}", pi.cfg.inbound_addr);
         }
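The swap-to-assignment change above is behavior-preserving: swapping with a mutable reference to a temporary is just an obfuscated assignment (newer clippy versions flag the pattern). A minimal illustration that both spellings leave pi holding the new Arc:

    use std::sync::Arc;

    fn main() {
        let mut pi = Arc::new(1);
        // Old spelling: swap with a &mut to a temporary; the temporary then
        // drops the Arc that pi used to hold.
        std::mem::swap(&mut pi, &mut Arc::new(2));
        assert_eq!(*pi, 2);
        // New spelling: plain assignment does the same thing, more clearly.
        pi = Arc::new(3);
        assert_eq!(*pi, 3);
    }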
@@ -373,7 +373,7 @@ impl fmt::Display for AuthorizationRejectionError {
         match self {
             Self::NoWorkload => write!(fmt, "workload not found"),
             Self::WorkloadMismatch => write!(fmt, "workload mismatch"),
-            Self::ExplicitlyDenied(a, b) => write!(fmt, "explicitly denied by: {}/{}", a, b),
+            Self::ExplicitlyDenied(a, b) => write!(fmt, "explicitly denied by: {a}/{b}"),
             Self::NotAllowed => write!(fmt, "allow policies exist, but none allowed"),
         }
     }

@@ -484,6 +484,9 @@ pub enum Error {
     #[error("requested service {0} found, but has no IP addresses")]
     NoIPForService(String),

+    #[error("no service for target address: {0}")]
+    NoService(SocketAddr),
+
     #[error(
         "ip addresses were resolved for workload {0}, but valid dns response had no A/AAAA records"
     )]

@@ -844,8 +847,8 @@ impl HboneAddress {
 impl std::fmt::Display for HboneAddress {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            HboneAddress::SocketAddr(addr) => write!(f, "{}", addr),
-            HboneAddress::SvcHostname(host, port) => write!(f, "{}:{}", host, port),
+            HboneAddress::SocketAddr(addr) => write!(f, "{addr}"),
+            HboneAddress::SvcHostname(host, port) => write!(f, "{host}:{port}"),
         }
     }
 }

@@ -13,7 +13,7 @@
 // limitations under the License.

 use crate::copy;
-use bytes::{BufMut, Bytes};
+use bytes::Bytes;
 use futures_core::ready;
 use h2::Reason;
 use std::io::Error;

@@ -85,7 +85,10 @@ pub struct H2StreamWriteHalf {
     _dropped: Option<DropCounter>,
 }

-pub struct TokioH2Stream(H2Stream);
+pub struct TokioH2Stream {
+    stream: H2Stream,
+    buf: Bytes,
+}

 struct DropCounter {
     // Whether the other end of this shared counter has already dropped.

@@ -144,7 +147,10 @@ impl Drop for DropCounter {
 // then the specific implementation will conflict with the generic one.
 impl TokioH2Stream {
     pub fn new(stream: H2Stream) -> Self {
-        Self(stream)
+        Self {
+            stream,
+            buf: Bytes::new(),
+        }
     }
 }

@@ -154,21 +160,21 @@ impl tokio::io::AsyncRead for TokioH2Stream {
         cx: &mut Context<'_>,
         buf: &mut tokio::io::ReadBuf<'_>,
     ) -> Poll<std::io::Result<()>> {
-        let pinned = std::pin::Pin::new(&mut self.0.read);
-        copy::ResizeBufRead::poll_bytes(pinned, cx).map(|r| match r {
-            Ok(bytes) => {
-                if buf.remaining() < bytes.len() {
-                    Err(Error::other(format!(
-                        "kould overflow buffer of with {} remaining",
-                        buf.remaining()
-                    )))
-                } else {
-                    buf.put(bytes);
-                    Ok(())
-                }
-            }
-            Err(e) => Err(e),
-        })
+        // Just return the bytes we have left over and don't poll the stream, because
+        // it's unclear what to do if there are bytes left over from the previous read and, when we
+        // poll, we get an error.
+        if self.buf.is_empty() {
+            // If we have no unread bytes, we can poll the stream
+            // and fill self.buf with the bytes we read.
+            let pinned = std::pin::Pin::new(&mut self.stream.read);
+            let res = ready!(copy::ResizeBufRead::poll_bytes(pinned, cx))?;
+            self.buf = res;
+        }
+        // Copy as many bytes as we can from self.buf.
+        let cnt = Ord::min(buf.remaining(), self.buf.len());
+        buf.put_slice(&self.buf[..cnt]);
+        self.buf = self.buf.split_off(cnt);
+        Poll::Ready(Ok(()))
     }
 }
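The rewritten poll_read above turns TokioH2Stream into a buffering reader: whole Bytes chunks come off the H2 read half, and whatever the caller's buffer cannot take is kept in self.buf for the next read instead of erroring. A simplified synchronous sketch of that leftover-buffer pattern (illustrative names, not ztunnel's real types):

    use bytes::Bytes;

    struct BufferedReader<S> {
        source: S,       // yields whole Bytes chunks, like the H2 read half
        leftover: Bytes, // unread tail of the last chunk
    }

    impl<S: Iterator<Item = Bytes>> BufferedReader<S> {
        fn read(&mut self, out: &mut [u8]) -> usize {
            if self.leftover.is_empty() {
                // Only pull from the source when the buffer is drained.
                match self.source.next() {
                    Some(chunk) => self.leftover = chunk,
                    None => return 0, // EOF
                }
            }
            // Copy as many bytes as fit; keep the rest for the next call.
            let cnt = out.len().min(self.leftover.len());
            out[..cnt].copy_from_slice(&self.leftover[..cnt]);
            self.leftover = self.leftover.split_off(cnt);
            cnt
        }
    }

    fn main() {
        let chunks = vec![Bytes::from_static(b"abcde")].into_iter();
        let mut r = BufferedReader { source: chunks, leftover: Bytes::new() };
        let mut b = [0u8; 2];
        assert_eq!(r.read(&mut b), 2); // "ab"
        assert_eq!(r.read(&mut b), 2); // "cd"
        assert_eq!(r.read(&mut b), 1); // "e"
        assert_eq!(r.read(&mut b), 0); // EOF
    }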
@@ -178,7 +184,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
         cx: &mut Context<'_>,
         buf: &[u8],
     ) -> Poll<Result<usize, tokio::io::Error>> {
-        let pinned = std::pin::Pin::new(&mut self.0.write);
+        let pinned = std::pin::Pin::new(&mut self.stream.write);
         let buf = Bytes::copy_from_slice(buf);
         copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf)
     }

@@ -187,7 +193,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), std::io::Error>> {
-        let pinned = std::pin::Pin::new(&mut self.0.write);
+        let pinned = std::pin::Pin::new(&mut self.stream.write);
         copy::AsyncWriteBuf::poll_flush(pinned, cx)
     }

@@ -195,7 +201,7 @@ impl tokio::io::AsyncWrite for TokioH2Stream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), std::io::Error>> {
-        let pinned = std::pin::Pin::new(&mut self.0.write);
+        let pinned = std::pin::Pin::new(&mut self.stream.write);
         copy::AsyncWriteBuf::poll_shutdown(pinned, cx)
     }
 }

@@ -391,7 +391,7 @@ pub fn log_early_deny<E: std::error::Error>(
             "inbound"
         },

-        error = format!("{}", err),
+        error = format!("{err}"),

         "connection failed"
     );

@@ -35,10 +35,10 @@ use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics};
 use crate::drain::DrainWatcher;
 use crate::drain::run_with_drain;
 use crate::proxy::h2::{H2Stream, client::WorkloadKey};
-use crate::state::ServiceResolutionMode;
-use crate::state::service::ServiceDescription;
+use crate::state::service::{Service, ServiceDescription};
 use crate::state::workload::OutboundProtocol;
 use crate::state::workload::{InboundProtocol, NetworkAddress, Workload, address::Address};
+use crate::state::{ServiceResolutionMode, Upstream};
 use crate::{assertions, copy, proxy, socket};

 use super::h2::TokioH2Stream;

@@ -369,6 +369,83 @@ impl OutboundConnection {
         }
     }

+    // This function is called when the selected next hop is on a different network,
+    // so we expect the upstream workload to have a network gateway configured.
+    //
+    // When we use a gateway to reach a workload on a remote network we have to
+    // use double HBONE (HBONE encapsulated inside HBONE). The gateway will
+    // terminate the outer HBONE tunnel and forward the inner HBONE to the actual
+    // destination as an opaque stream of bytes, and the actual destination will
+    // interpret it as an HBONE connection.
+    //
+    // If the upstream workload does not have an E/W gateway this function returns
+    // an error indicating that it could not find a valid destination.
+    //
+    // A note about double HBONE: in double HBONE both inner and outer HBONE use
+    // the destination service name as the HBONE target URI.
+    //
+    // Having the target URI in the outer HBONE tunnel allows the E/W gateway to figure out
+    // where to route the data next without the need to terminate the inner HBONE tunnel.
+    // In other words, it can forward inner HBONE as if it's an opaque stream of
+    // bytes without trying to interpret it.
+    //
+    // NOTE: when connecting through an E/W gateway, regardless of whether there is
+    // a waypoint or not, we always use the service hostname and the service port. It's
+    // somewhat different from how regular HBONE works, so I'm calling it out here.
+    async fn build_request_through_gateway(
+        &self,
+        source: Arc<Workload>,
+        // Next hop on the remote network that we picked as our destination.
+        // It may be a local view of a Waypoint workload on the remote network or
+        // a local view of the service workload (when a waypoint is not
+        // configured).
+        upstream: Upstream,
+        // This is the target service we wanted to reach in the first place.
+        //
+        // NOTE: Crossing network boundaries is only supported for services
+        // at the moment, so we should always have a service we could use.
+        service: &Service,
+        target: SocketAddr,
+    ) -> Result<Request, Error> {
+        if let Some(gateway) = &upstream.workload.network_gateway {
+            let gateway_upstream = self
+                .pi
+                .state
+                .fetch_network_gateway(gateway, &source, target)
+                .await?;
+            let hbone_target_destination = Some(HboneAddress::SvcHostname(
+                service.hostname.clone(),
+                target.port(),
+            ));

+            debug!("built request to a destination on another network through an E/W gateway");
+            Ok(Request {
+                protocol: OutboundProtocol::DOUBLEHBONE,
+                source,
+                hbone_target_destination,
+                actual_destination_workload: Some(gateway_upstream.workload.clone()),
+                intended_destination_service: Some(ServiceDescription::from(service)),
+                actual_destination: gateway_upstream.workload_socket_addr().ok_or(
+                    Error::NoValidDestination(Box::new((*gateway_upstream.workload).clone())),
+                )?,
+                // The outer tunnel of double HBONE is terminated by the E/W
+                // gateway and so for the credentials of the next hop
+                // (upstream_sans) we use gateway credentials.
+                upstream_sans: gateway_upstream.workload_and_services_san(),
+                // The inner HBONE tunnel is terminated by either the server
+                // we want to reach or a Waypoint in front of it, depending on
+                // the configuration. So for the final destination credentials
+                // (final_sans) we use the upstream workload credentials.
+                final_sans: upstream.service_sans(),
+            })
+        } else {
+            // Do not try to send cross-network traffic without a network gateway.
+            Err(Error::NoValidDestination(Box::new(
+                (*upstream.workload).clone(),
+            )))
+        }
+    }
+
     // build_request computes all information about the request we should send
     // TODO: Do we want a single lock for source and upstream...?
     async fn build_request(
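To make the double-HBONE addressing described above concrete, here is a hedged, dependency-free sketch of the rule the comments state: the outer CONNECT goes to the E/W gateway's address, while both the outer and inner HBONE targets carry the destination service hostname and service port (types simplified; not ztunnel's real API):

    // Outer hop: connection to the gateway; both HBONE targets name the service.
    struct DoubleHboneAddressing {
        outer_connect_addr: String, // E/W gateway, e.g. "10.22.1.1:15009"
        outer_target: String,       // :authority of the outer CONNECT
        inner_target: String,       // :authority of the inner CONNECT
    }

    fn double_hbone(gateway_addr: &str, service_host: &str, service_port: u16) -> DoubleHboneAddressing {
        let target = format!("{service_host}:{service_port}");
        DoubleHboneAddressing {
            outer_connect_addr: gateway_addr.to_string(),
            // The gateway routes on this target without terminating the inner tunnel.
            outer_target: target.clone(),
            inner_target: target,
        }
    }

    fn main() {
        let a = double_hbone("10.22.1.1:15009", "example.com", 80);
        assert_eq!(a.outer_target, "example.com:80");
        assert_eq!(a.inner_target, "example.com:80");
        println!("CONNECT {} via {}", a.inner_target, a.outer_connect_addr);
    }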
@ -381,7 +458,7 @@ impl OutboundConnection {
|
||||||
|
|
||||||
// If this is to-service traffic check for a service waypoint
|
// If this is to-service traffic check for a service waypoint
|
||||||
// Capture result of whether this is svc addressed
|
// Capture result of whether this is svc addressed
|
||||||
let svc_addressed = if let Some(Address::Service(target_service)) = state
|
let service = if let Some(Address::Service(target_service)) = state
|
||||||
.fetch_address(&NetworkAddress {
|
.fetch_address(&NetworkAddress {
|
||||||
network: self.pi.cfg.network.clone(),
|
network: self.pi.cfg.network.clone(),
|
||||||
address: target.ip(),
|
address: target.ip(),
|
||||||
|
@ -393,6 +470,18 @@ impl OutboundConnection {
|
||||||
.fetch_service_waypoint(&target_service, &source_workload, target)
|
.fetch_service_waypoint(&target_service, &source_workload, target)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
|
if waypoint.workload.network != source_workload.network {
|
||||||
|
debug!("picked a waypoint on remote network");
|
||||||
|
return self
|
||||||
|
.build_request_through_gateway(
|
||||||
|
source_workload.clone(),
|
||||||
|
waypoint,
|
||||||
|
&target_service,
|
||||||
|
target,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
let upstream_sans = waypoint.workload_and_services_san();
|
let upstream_sans = waypoint.workload_and_services_san();
|
||||||
let actual_destination =
|
let actual_destination =
|
||||||
waypoint
|
waypoint
|
||||||
|
@ -413,10 +502,10 @@ impl OutboundConnection {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// this was service addressed but we did not find a waypoint
|
// this was service addressed but we did not find a waypoint
|
||||||
true
|
Some(target_service)
|
||||||
} else {
|
} else {
|
||||||
// this wasn't service addressed
|
// this wasn't service addressed
|
||||||
false
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(us) = state
|
let Some(us) = state
|
||||||
|
@ -428,7 +517,7 @@ impl OutboundConnection {
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
if svc_addressed {
|
if service.is_some() {
|
||||||
return Err(Error::NoHealthyUpstream(target));
|
return Err(Error::NoHealthyUpstream(target));
|
||||||
}
|
}
|
||||||
debug!("built request as passthrough; no upstream found");
|
debug!("built request as passthrough; no upstream found");
|
||||||
|
@ -446,37 +535,26 @@ impl OutboundConnection {
|
||||||
|
|
||||||
// Check whether we are using an E/W gateway and sending cross network traffic
|
// Check whether we are using an E/W gateway and sending cross network traffic
|
||||||
if us.workload.network != source_workload.network {
|
if us.workload.network != source_workload.network {
|
||||||
if let Some(ew_gtw) = &us.workload.network_gateway {
|
// Workloads on remote network must be service addressed, so if we got here
|
||||||
let gtw_us = {
|
// and we don't have a service for the original target address then it's a
|
||||||
self.pi
|
// bug either in ztunnel itself or in istiod.
|
||||||
.state
|
//
|
||||||
.fetch_network_gateway(ew_gtw, &source_workload, target)
|
// For a double HBONE protocol implementation we have to know the
|
||||||
.await?
|
// destination service and if there is no service for the target it's a bug.
|
||||||
};
|
//
|
||||||
|
// This situation "should never happen" because for workloads fetch_upstream
|
||||||
let svc = us
|
// above only checks the workloads on the same network as this ztunnel
|
||||||
.destination_service
|
// instance and therefore it should not be able to find a workload on a
|
||||||
.as_ref()
|
// different network.
|
||||||
.expect("Workloads with network gateways must be service addressed.");
|
debug_assert!(
|
||||||
let hbone_target_destination =
|
service.is_some(),
|
||||||
Some(HboneAddress::SvcHostname(svc.hostname.clone(), us.port));
|
"workload on remote network is not service addressed"
|
||||||
|
);
|
||||||
return Ok(Request {
|
debug!("picked a workload on remote network");
|
||||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
let service = service.as_ref().ok_or(Error::NoService(target))?;
|
||||||
source: source_workload,
|
return self
|
||||||
hbone_target_destination,
|
.build_request_through_gateway(source_workload.clone(), us, service, target)
|
||||||
actual_destination_workload: Some(gtw_us.workload.clone()),
|
.await;
|
||||||
intended_destination_service: us.destination_service.clone(),
|
|
||||||
actual_destination: gtw_us.workload_socket_addr().ok_or(
|
|
||||||
Error::NoValidDestination(Box::new((*gtw_us.workload).clone())),
|
|
||||||
)?,
|
|
||||||
upstream_sans: gtw_us.workload_and_services_san(),
|
|
||||||
final_sans: us.service_sans(),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// Do not try to send cross-network traffic without network gateway.
|
|
||||||
return Err(Error::NoValidDestination(Box::new((*us.workload).clone())));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We are not using a network gateway and there is no workload address.
|
// We are not using a network gateway and there is no workload address.
|
||||||
|
@ -491,7 +569,7 @@ impl OutboundConnection {
|
||||||
// Check if we need to go through a workload addressed waypoint.
|
// Check if we need to go through a workload addressed waypoint.
|
||||||
// Don't traverse waypoint twice if the source is sandwich-outbound.
|
// Don't traverse waypoint twice if the source is sandwich-outbound.
|
||||||
// Don't traverse waypoint if traffic was addressed to a service (handled before)
|
// Don't traverse waypoint if traffic was addressed to a service (handled before)
|
||||||
if !from_waypoint && !svc_addressed {
|
if !from_waypoint && service.is_none() {
|
||||||
// For case upstream server has enabled waypoint
|
// For case upstream server has enabled waypoint
|
||||||
let waypoint = state
|
let waypoint = state
|
||||||
.fetch_workload_waypoint(&us.workload, &source_workload, target)
|
.fetch_workload_waypoint(&us.workload, &source_workload, target)
|
||||||
|
@ -816,6 +894,8 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn build_request_double_hbone() {
|
async fn build_request_double_hbone() {
|
||||||
|
// example.com service has a workload on remote network.
|
||||||
|
// E/W gateway is addressed by an IP.
|
||||||
run_build_request_multi(
|
run_build_request_multi(
|
||||||
"127.0.0.1",
|
"127.0.0.1",
|
||||||
"127.0.0.3:80",
|
"127.0.0.3:80",
|
||||||
|
@ -867,11 +947,13 @@ mod tests {
|
||||||
],
|
],
|
||||||
Some(ExpectedRequest {
|
Some(ExpectedRequest {
|
||||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||||
hbone_destination: "example.com:8080",
|
hbone_destination: "example.com:80",
|
||||||
destination: "10.22.1.1:15009",
|
destination: "10.22.1.1:15009",
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
// example.com service has a workload on remote network.
|
||||||
|
// E/W gateway is addressed by a hostname.
|
||||||
run_build_request_multi(
|
run_build_request_multi(
|
||||||
"127.0.0.1",
|
"127.0.0.1",
|
||||||
"127.0.0.3:80",
|
"127.0.0.3:80",
|
||||||
|
@ -944,11 +1026,218 @@ mod tests {
|
||||||
],
|
],
|
||||||
Some(ExpectedRequest {
|
Some(ExpectedRequest {
|
||||||
protocol: OutboundProtocol::DOUBLEHBONE,
|
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||||
hbone_destination: "example.com:8080",
|
hbone_destination: "example.com:80",
|
||||||
destination: "127.0.0.5:15008",
|
destination: "127.0.0.5:15008",
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
// example.com service has a waypoint and waypoint workload is on remote network.
|
||||||
|
// E/W gateway is addressed by an IP.
|
||||||
|
run_build_request_multi(
|
||||||
|
"127.0.0.1",
|
||||||
|
"127.0.0.3:80",
|
||||||
|
vec![
|
||||||
|
XdsAddressType::Service(XdsService {
|
||||||
|
hostname: "example.com".to_string(),
|
||||||
|
addresses: vec![XdsNetworkAddress {
|
||||||
|
network: "".to_string(),
|
||||||
|
address: vec![127, 0, 0, 3],
|
||||||
|
}],
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 80,
|
||||||
|
target_port: 8080,
|
||||||
|
}],
|
||||||
|
waypoint: Some(xds::istio::workload::GatewayAddress {
|
||||||
|
destination: Some(
|
||||||
|
xds::istio::workload::gateway_address::Destination::Hostname(
|
||||||
|
XdsNamespacedHostname {
|
||||||
|
namespace: Default::default(),
|
||||||
|
hostname: "waypoint.com".into(),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
),
|
||||||
|
hbone_mtls_port: 15008,
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
XdsAddressType::Service(XdsService {
|
||||||
|
hostname: "waypoint.com".to_string(),
|
||||||
|
addresses: vec![XdsNetworkAddress {
|
||||||
|
network: "".to_string(),
|
||||||
|
address: vec![127, 0, 0, 4],
|
||||||
|
}],
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 15008,
|
||||||
|
target_port: 15008,
|
||||||
|
}],
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/remote-waypoint-pod".to_string(),
|
||||||
|
addresses: vec![],
|
||||||
|
network: "remote".to_string(),
|
||||||
|
network_gateway: Some(xds::istio::workload::GatewayAddress {
|
||||||
|
destination: Some(
|
||||||
|
xds::istio::workload::gateway_address::Destination::Address(
|
||||||
|
XdsNetworkAddress {
|
||||||
|
network: "remote".to_string(),
|
||||||
|
address: vec![10, 22, 1, 1],
|
||||||
|
},
|
||||||
|
),
|
||||||
|
),
|
||||||
|
hbone_mtls_port: 15009,
|
||||||
|
}),
|
||||||
|
services: std::collections::HashMap::from([(
|
||||||
|
"/waypoint.com".to_string(),
|
||||||
|
PortList {
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 15008,
|
||||||
|
target_port: 15008,
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
)]),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
|
||||||
|
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
|
||||||
|
network: "remote".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
Some(ExpectedRequest {
|
||||||
|
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||||
|
hbone_destination: "example.com:80",
|
||||||
|
destination: "10.22.1.1:15009",
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn build_request_failover_to_remote() {
|
||||||
|
// Similar to the double HBONE test that we already have, but it sets up a scenario when
|
||||||
|
// load balancing logic will pick a workload on a remote cluster when local workloads are
|
||||||
|
// unhealthy, thus showing the expected failover behavior.
|
||||||
|
let service = XdsAddressType::Service(XdsService {
|
||||||
|
hostname: "example.com".to_string(),
|
||||||
|
addresses: vec![XdsNetworkAddress {
|
||||||
|
network: "".to_string(),
|
||||||
|
address: vec![127, 0, 0, 3],
|
||||||
|
}],
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 80,
|
||||||
|
target_port: 8080,
|
||||||
|
}],
|
||||||
|
// Prefer routing to workloads on the same network, but when nothing is healthy locally
|
||||||
|
// allow failing over to remote networks.
|
||||||
|
load_balancing: Some(xds::istio::workload::LoadBalancing {
|
||||||
|
routing_preference: vec![
|
||||||
|
xds::istio::workload::load_balancing::Scope::Network.into(),
|
||||||
|
],
|
||||||
|
mode: xds::istio::workload::load_balancing::Mode::Failover.into(),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let ew_gateway = XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/remote-ew-gtw".to_string(),
|
||||||
|
addresses: vec![Bytes::copy_from_slice(&[10, 22, 1, 1])],
|
||||||
|
network: "remote".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let remote_workload = XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/remote-example.com-pod".to_string(),
|
||||||
|
addresses: vec![],
|
||||||
|
network: "remote".to_string(),
|
||||||
|
network_gateway: Some(xds::istio::workload::GatewayAddress {
|
||||||
|
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
|
||||||
|
XdsNetworkAddress {
|
||||||
|
network: "remote".to_string(),
|
||||||
|
address: vec![10, 22, 1, 1],
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
hbone_mtls_port: 15009,
|
||||||
|
}),
|
||||||
|
services: std::collections::HashMap::from([(
|
||||||
|
"/example.com".to_string(),
|
||||||
|
PortList {
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 80,
|
||||||
|
target_port: 8080,
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
)]),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let healthy_local_workload = XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
|
||||||
|
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
|
||||||
|
network: "".to_string(),
|
||||||
|
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
|
||||||
|
services: std::collections::HashMap::from([(
|
||||||
|
"/example.com".to_string(),
|
||||||
|
PortList {
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 80,
|
||||||
|
target_port: 8080,
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
)]),
|
||||||
|
status: xds::istio::workload::WorkloadStatus::Healthy.into(),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
let unhealthy_local_workload = XdsAddressType::Workload(XdsWorkload {
|
||||||
|
uid: "Kubernetes//Pod/default/local-example.com-pod".to_string(),
|
||||||
|
addresses: vec![Bytes::copy_from_slice(&[127, 0, 0, 2])],
|
||||||
|
network: "".to_string(),
|
||||||
|
tunnel_protocol: xds::istio::workload::TunnelProtocol::Hbone.into(),
|
||||||
|
services: std::collections::HashMap::from([(
|
||||||
|
"/example.com".to_string(),
|
||||||
|
PortList {
|
||||||
|
ports: vec![Port {
|
||||||
|
service_port: 80,
|
||||||
|
target_port: 8080,
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
)]),
|
||||||
|
status: xds::istio::workload::WorkloadStatus::Unhealthy.into(),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
|
||||||
|
run_build_request_multi(
|
||||||
|
"127.0.0.1",
|
||||||
|
"127.0.0.3:80",
|
||||||
|
vec![
|
||||||
|
service.clone(),
|
||||||
|
ew_gateway.clone(),
|
||||||
|
remote_workload.clone(),
|
||||||
|
healthy_local_workload.clone(),
|
||||||
|
],
|
||||||
|
Some(ExpectedRequest {
|
||||||
|
protocol: OutboundProtocol::HBONE,
|
||||||
|
hbone_destination: "127.0.0.2:8080",
|
||||||
|
destination: "127.0.0.2:15008",
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
run_build_request_multi(
|
||||||
|
"127.0.0.1",
|
||||||
|
"127.0.0.3:80",
|
||||||
|
vec![
|
||||||
|
service.clone(),
|
||||||
|
ew_gateway.clone(),
|
||||||
|
remote_workload.clone(),
|
||||||
|
unhealthy_local_workload.clone(),
|
||||||
|
],
|
||||||
|
Some(ExpectedRequest {
|
||||||
|
protocol: OutboundProtocol::DOUBLEHBONE,
|
||||||
|
hbone_destination: "example.com:80",
|
||||||
|
destination: "10.22.1.1:15009",
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -594,9 +594,10 @@ mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is really a test for TokioH2Stream, but its nicer here because we have access to
|
/// This is really a test for TokioH2Stream, but its nicer here because we have access to
|
||||||
/// streams
|
/// streams.
|
||||||
|
/// Most important, we make sure there are no panics.
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn small_reads() {
|
async fn read_buffering() {
|
||||||
let (mut pool, srv) = setup_test(3).await;
|
let (mut pool, srv) = setup_test(3).await;
|
||||||
|
|
||||||
let key = key(&srv, 2);
|
let key = key(&srv, 2);
|
||||||
|
@ -612,13 +613,28 @@ mod test {
|
||||||
let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
|
let c = pool.send_request_pooled(&key.clone(), req()).await.unwrap();
|
||||||
let mut c = TokioH2Stream::new(c);
|
let mut c = TokioH2Stream::new(c);
|
||||||
c.write_all(b"abcde").await.unwrap();
|
c.write_all(b"abcde").await.unwrap();
|
||||||
let mut b = [0u8; 0];
|
let mut b = [0u8; 100];
|
||||||
// Crucially, this should error rather than panic.
|
// Properly buffer reads and don't error
|
||||||
if let Err(e) = c.read(&mut b).await {
|
assert_eq!(c.read(&mut b).await.unwrap(), 8);
|
||||||
assert_eq!(e.kind(), io::ErrorKind::Other);
|
assert_eq!(&b[..8], b"poolsrv\n"); // this is added by itself
|
||||||
} else {
|
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||||
panic!("Should have errored");
|
assert_eq!(&b[..1], b"a");
|
||||||
}
|
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||||
|
assert_eq!(&b[..1], b"b");
|
||||||
|
assert_eq!(c.read(&mut b[..1]).await.unwrap(), 1);
|
||||||
|
assert_eq!(&b[..1], b"c");
|
||||||
|
assert_eq!(c.read(&mut b).await.unwrap(), 2); // there are only two bytes left
|
||||||
|
assert_eq!(&b[..2], b"de");
|
||||||
|
|
||||||
|
// Once we drop the pool, we should still retained the buffered data,
|
||||||
|
// but then we should error.
|
||||||
|
c.write_all(b"abcde").await.unwrap();
|
||||||
|
assert_eq!(c.read(&mut b[..3]).await.unwrap(), 3);
|
||||||
|
assert_eq!(&b[..3], b"abc");
|
||||||
|
drop(pool);
|
||||||
|
assert_eq!(c.read(&mut b[..2]).await.unwrap(), 2);
|
||||||
|
assert_eq!(&b[..2], b"de");
|
||||||
|
assert!(c.read(&mut b).await.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
|
|
@ -203,8 +203,7 @@ async fn negotiate_socks_connection(
|
||||||
|
|
||||||
if version != 0x05 {
|
if version != 0x05 {
|
||||||
return Err(SocksError::invalid_protocol(format!(
|
return Err(SocksError::invalid_protocol(format!(
|
||||||
"unsupported version {}",
|
"unsupported version {version}",
|
||||||
version
|
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -113,6 +113,7 @@ impl ProxyFactory {
|
||||||
socket_factory.as_ref(),
|
socket_factory.as_ref(),
|
||||||
local_workload_information.as_fetcher(),
|
local_workload_information.as_fetcher(),
|
||||||
self.config.prefered_service_namespace.clone(),
|
self.config.prefered_service_namespace.clone(),
|
||||||
|
self.config.ipv6_enabled,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
resolver = Some(server.resolver());
|
resolver = Some(server.resolver());
|
||||||
|
|
12
src/state.rs
12
src/state.rs
|
@@ -870,7 +870,7 @@ impl DemandProxyState {
         self.finalize_upstream(source_workload, target_address, res)
             .await?
             .ok_or_else(|| {
-                Error::UnknownNetworkGateway(format!("network gateway {:?} not found", gw_address))
+                Error::UnknownNetworkGateway(format!("network gateway {gw_address:?} not found"))
             })
     }

@@ -926,7 +926,7 @@ impl DemandProxyState {
         };
         self.finalize_upstream(source_workload, target_address, res)
             .await?
-            .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address)))
+            .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {gw_address:?} not found")))
     }

     pub async fn fetch_service_waypoint(
@@ -1378,17 +1378,17 @@ mod tests {
     fn create_workload(dest_uid: u8) -> Workload {
         Workload {
             name: "test".into(),
-            namespace: format!("ns{}", dest_uid).into(),
+            namespace: format!("ns{dest_uid}").into(),
             trust_domain: "cluster.local".into(),
             service_account: "defaultacct".into(),
             workload_ips: vec![IpAddr::V4(Ipv4Addr::new(192, 168, 0, dest_uid))],
-            uid: format!("{}", dest_uid).into(),
+            uid: format!("{dest_uid}").into(),
             ..test_helpers::test_default_workload()
         }
     }

     fn get_workload(state: &DemandProxyState, dest_uid: u8) -> Arc<Workload> {
-        let key: Strng = format!("{}", dest_uid).into();
+        let key: Strng = format!("{dest_uid}").into();
         state.read().workloads.by_uid[&key].clone()
     }

@@ -1397,7 +1397,7 @@ mod tests {
         dest_uid: u8,
         src_svc_acct: &str,
     ) -> crate::state::ProxyRbacContext {
-        let key: Strng = format!("{}", dest_uid).into();
+        let key: Strng = format!("{dest_uid}").into();
         let workload = &state.read().workloads.by_uid[&key];
         crate::state::ProxyRbacContext {
             conn: rbac::Connection {
@@ -1028,8 +1028,8 @@ mod tests {
             },
         )]);

-        let uid1 = format!("cluster1//v1/Pod/default/my-pod/{:?}", ip1);
-        let uid2 = format!("cluster1//v1/Pod/default/my-pod/{:?}", ip2);
+        let uid1 = format!("cluster1//v1/Pod/default/my-pod/{ip1:?}");
+        let uid2 = format!("cluster1//v1/Pod/default/my-pod/{ip2:?}");

         updater
             .insert_workload(
@@ -1734,7 +1734,7 @@ mod tests {

         let xds_ip1 = Bytes::copy_from_slice(&[127, 0, 0, 1]);
         let ip1 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
-        let uid1 = format!("cluster1//v1/Pod/default/my-pod/{:?}", ip1);
+        let uid1 = format!("cluster1//v1/Pod/default/my-pod/{ip1:?}");

         let services = HashMap::from([(
             "ns/svc1.ns.svc.cluster.local".to_string(),
@@ -170,7 +170,7 @@ impl Visitor<'_> {
         } else {
             " "
         };
-        write!(self.writer, "{}{:?}", padding, value)
+        write!(self.writer, "{padding}{value:?}")
     }
 }

@@ -188,9 +188,9 @@ impl field::Visit for Visitor<'_> {
             // Skip fields that are actually log metadata that have already been handled
             name if name.starts_with("log.") => Ok(()),
             // For the message, write out the message and a tab to separate the future fields
-            "message" => write!(self.writer, "{:?}\t", val),
+            "message" => write!(self.writer, "{val:?}\t"),
             // For the rest, k=v.
-            _ => self.write_padded(&format_args!("{}={:?}", field.name(), val)),
+            _ => self.write_padded(&format_args!("{}={val:?}", field.name())),
         }
     }
 }
@@ -234,7 +234,7 @@ where
         let target = meta.target();
         // No need to prefix everything
         let target = target.strip_prefix("ztunnel::").unwrap_or(target);
-        write!(writer, "{}", target)?;
+        write!(writer, "{target}")?;

         // Write out span fields. Istio logging outside of Rust doesn't really have this concept
         if let Some(scope) = ctx.event_scope() {

@@ -243,7 +243,7 @@ where
                 let ext = span.extensions();
                 if let Some(fields) = &ext.get::<FormattedFields<N>>() {
                     if !fields.is_empty() {
-                        write!(writer, "{{{}}}", fields)?;
+                        write!(writer, "{{{fields}}}")?;
                     }
                 }
             }
@@ -285,7 +285,7 @@ impl<S: SerializeMap> Visit for JsonVisitory<S> {
         if self.state.is_ok() {
             self.state = self
                 .serializer
-                .serialize_entry(field.name(), &format_args!("{:?}", value))
+                .serialize_entry(field.name(), &format_args!("{value:?}"))
         }
     }

@@ -505,7 +505,7 @@ pub mod testing {
         .map(|h| {
             h.iter()
                 .sorted_by_key(|(k, _)| *k)
-                .map(|(k, err)| format!("{}:{}", k, err))
+                .map(|(k, err)| format!("{k}:{err}"))
                 .join("\n")
         })
         .join("\n\n");
@@ -169,10 +169,9 @@ pub fn localhost_error_message() -> String {
         TEST_VIP,
     ];
     format!(
-        "These tests use the following loopback addresses: {:?}. \
+        "These tests use the following loopback addresses: {addrs:?}. \
     Your OS may require an explicit alias for each. If so, you'll need to manually \
     configure your system for each IP (e.g. `sudo ifconfig lo0 alias 127.0.0.2 up`).",
-        addrs
     )
 }

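One detail worth noting in the hunk above: the trailing backslashes keep the error message a single string literal across three source lines, because a backslash followed by a newline in a Rust string literal consumes the newline and any leading whitespace on the next line. A tiny standalone check:

    fn main() {
        let s = "These tests use loopback addresses. \
            Your OS may require an explicit alias.";
        // The backslash-newline swallows the newline and the indentation.
        assert_eq!(
            s,
            "These tests use loopback addresses. Your OS may require an explicit alias."
        );
    }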
@@ -239,7 +238,7 @@ fn test_custom_workload(
     hostname_only: bool,
 ) -> anyhow::Result<LocalWorkload> {
     let host = match hostname_only {
-        true => format!("{}.reflect.internal.", ip_str),
+        true => format!("{ip_str}.reflect.internal."),
         false => "".to_string(),
     };
     let wips = match hostname_only {

@@ -250,7 +249,7 @@ fn test_custom_workload(
         workload_ips: wips,
         hostname: host.into(),
         protocol,
-        uid: format!("cluster1//v1/Pod/default/{}", name).into(),
+        uid: format!("cluster1//v1/Pod/default/{name}").into(),
         name: name.into(),
         namespace: "default".into(),
         service_account: "default".into(),
|
||||||
}],
|
}],
|
||||||
ports: HashMap::from([(80u16, echo_port)]),
|
ports: HashMap::from([(80u16, echo_port)]),
|
||||||
endpoints: EndpointSet::from_list([Endpoint {
|
endpoints: EndpointSet::from_list([Endpoint {
|
||||||
workload_uid: format!("cluster1//v1/Pod/default/{}", workload_name).into(),
|
workload_uid: format!("cluster1//v1/Pod/default/{workload_name}").into(),
|
||||||
port: HashMap::from([(80u16, echo_port)]),
|
port: HashMap::from([(80u16, echo_port)]),
|
||||||
status: HealthStatus::Healthy,
|
status: HealthStatus::Healthy,
|
||||||
}]),
|
}]),
|
||||||
|
|
|
@@ -105,7 +105,7 @@ impl TestApp {
         let get_resp = move || async move {
             let req = Request::builder()
                 .method(Method::GET)
-                .uri(format!("http://localhost:{}/{path}", port))
+                .uri(format!("http://localhost:{port}/{path}"))
                 .header("content-type", "application/json")
                 .body(Empty::<Bytes>::new())
                 .unwrap();

@@ -130,7 +130,7 @@ impl TestApp {
         let get_resp = move || async move {
             let req = Request::builder()
                 .method(Method::GET)
-                .uri(format!("http://localhost:{}/{path}", port))
+                .uri(format!("http://localhost:{port}/{path}"))
                 .header("content-type", "application/json")
                 .body(Empty::<Bytes>::new())
                 .unwrap();
@@ -299,6 +299,7 @@ pub async fn run_dns(responses: HashMap<Name, Vec<IpAddr>>) -> anyhow::Result<Te
             state.clone(),
         ),
         Some("prefered-namespace".to_string()),
+        true, // ipv6_enabled for tests
     )
     .await?;

@@ -428,7 +428,6 @@ mod test {
         SystemTime::now() + Duration::from_secs(60),
         None,
         TEST_ROOT_KEY,
-        TEST_ROOT,
     );
     let cert1 =
         WorkloadCertificate::new(key.as_bytes(), cert.as_bytes(), vec![&joined]).unwrap();

@@ -440,7 +439,6 @@ mod test {
         SystemTime::now() + Duration::from_secs(60),
         None,
         TEST_ROOT2_KEY,
-        TEST_ROOT2,
     );
     let cert2 =
         WorkloadCertificate::new(key.as_bytes(), cert.as_bytes(), vec![&joined]).unwrap();
@@ -18,7 +18,6 @@ use std::fmt::{Display, Formatter};
 use rand::RngCore;
 use rand::SeedableRng;
 use rand::rngs::SmallRng;
-use rcgen::{Certificate, CertificateParams, KeyPair};
 use std::net::IpAddr;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -105,8 +104,7 @@ pub fn generate_test_certs_at(
     not_after: SystemTime,
     rng: Option<&mut dyn rand::RngCore>,
 ) -> WorkloadCertificate {
-    let (key, cert) =
-        generate_test_certs_with_root(id, not_before, not_after, rng, TEST_ROOT_KEY, TEST_ROOT);
+    let (key, cert) = generate_test_certs_with_root(id, not_before, not_after, rng, TEST_ROOT_KEY);
     let mut workload =
         WorkloadCertificate::new(key.as_bytes(), cert.as_bytes(), vec![TEST_ROOT]).unwrap();
     // Certificates do not allow sub-millisecond, but we need this for tests.
@@ -121,7 +119,6 @@ pub fn generate_test_certs_with_root(
     not_after: SystemTime,
     rng: Option<&mut dyn rand::RngCore>,
     ca_key: &[u8],
-    ca_cert: &[u8],
 ) -> (String, String) {
     use rcgen::*;
     let serial_number = {
|
@ -150,15 +147,17 @@ pub fn generate_test_certs_with_root(
|
||||||
ExtendedKeyUsagePurpose::ClientAuth,
|
ExtendedKeyUsagePurpose::ClientAuth,
|
||||||
];
|
];
|
||||||
p.subject_alt_names = vec![match id {
|
p.subject_alt_names = vec![match id {
|
||||||
TestIdentity::Identity(i) => SanType::URI(Ia5String::try_from(i.to_string()).unwrap()),
|
TestIdentity::Identity(i) => {
|
||||||
|
SanType::URI(string::Ia5String::try_from(i.to_string()).unwrap())
|
||||||
|
}
|
||||||
TestIdentity::Ip(i) => SanType::IpAddress(*i),
|
TestIdentity::Ip(i) => SanType::IpAddress(*i),
|
||||||
}];
|
}];
|
||||||
|
|
||||||
let kp = KeyPair::from_pem(std::str::from_utf8(TEST_PKEY).unwrap()).unwrap();
|
let kp = KeyPair::from_pem(std::str::from_utf8(TEST_PKEY).unwrap()).unwrap();
|
||||||
let ca_kp = KeyPair::from_pem(std::str::from_utf8(ca_key).unwrap()).unwrap();
|
let ca_kp = KeyPair::from_pem(std::str::from_utf8(ca_key).unwrap()).unwrap();
|
||||||
let key = kp.serialize_pem();
|
let key = kp.serialize_pem();
|
||||||
let ca = test_ca(ca_key, ca_cert);
|
let issuer = Issuer::from_params(&p, &ca_kp);
|
||||||
let cert = p.signed_by(&kp, &ca, &ca_kp).unwrap();
|
let cert = p.signed_by(&kp, &issuer).unwrap();
|
||||||
let cert = cert.pem();
|
let cert = cert.pem();
|
||||||
(key, cert)
|
(key, cert)
|
||||||
}
|
}
|
||||||
|
@@ -172,12 +171,6 @@ pub fn generate_test_certs(
     generate_test_certs_at(id, not_before, not_before + duration_until_expiry, None)
 }

-fn test_ca(key: &[u8], cert: &[u8]) -> Certificate {
-    let key = KeyPair::from_pem(std::str::from_utf8(key).unwrap()).unwrap();
-    let ca_param = CertificateParams::from_ca_cert_pem(std::str::from_utf8(cert).unwrap()).unwrap();
-    ca_param.self_signed(&key).unwrap()
-}
-
 #[derive(Debug, Clone)]
 pub struct MockServerCertProvider(Arc<WorkloadCertificate>);
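The hunks above capture the code side of the rcgen 0.13 → 0.14 migration: the self-signed CA `Certificate` built by the now-deleted `test_ca` helper is replaced by an `Issuer`, `signed_by` takes that issuer in place of a CA certificate plus CA key, and `Ia5String` is reached through the `string` module. A condensed sketch of the new signing flow, assembled from these hunks rather than from verified rcgen documentation (PEM inputs elided, the `sign_leaf` helper invented for illustration):

    use rcgen::{string, CertificateParams, Issuer, KeyPair, SanType};

    // Sketch: sign leaf params with a CA key using the rcgen 0.14 Issuer API.
    fn sign_leaf(ca_key_pem: &str, uri_san: &str) -> (String, String) {
        let mut p = CertificateParams::default();
        p.subject_alt_names =
            vec![SanType::URI(string::Ia5String::try_from(uri_san.to_string()).unwrap())];

        let kp = KeyPair::generate().unwrap(); // leaf key
        let ca_kp = KeyPair::from_pem(ca_key_pem).unwrap();

        // 0.14: build an Issuer and pass it to signed_by
        // (previously: p.signed_by(&kp, &ca_cert, &ca_kp)).
        let issuer = Issuer::from_params(&p, &ca_kp);
        let cert = p.signed_by(&kp, &issuer).unwrap();
        (kp.serialize_pem(), cert.pem())
    }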
@@ -876,7 +876,7 @@ mod tests {

     fn get_auth(i: usize) -> ProtoResource {
         let addr = XdsAuthorization {
-            name: format!("foo{}", i),
+            name: format!("foo{i}"),
             namespace: "default".to_string(),
             scope: crate::xds::istio::security::Scope::Global as i32,
             action: crate::xds::istio::security::Action::Deny as i32,

@@ -890,7 +890,7 @@ mod tests {
             }],
         };
         ProtoResource {
-            name: format!("foo{}", i),
+            name: format!("foo{i}"),
             aliases: vec![],
             version: "0.0.1".to_string(),
             resource: Some(Any {

@@ -908,8 +908,8 @@ mod tests {
         };
         let addr = XdsAddress {
             r#type: Some(XdsType::Workload(XdsWorkload {
-                name: format!("foo{}", i),
-                uid: format!("default/foo{}", i),
+                name: format!("foo{i}"),
+                uid: format!("default/foo{i}"),
                 namespace: "default".to_string(),
                 addresses: vec![octets.into()],
                 tunnel_protocol: 0,

@@ -924,7 +924,7 @@ mod tests {
         };

         ProtoResource {
-            name: format!("foo{}", i),
+            name: format!("foo{i}"),
             aliases: vec![],
             version: "0.0.1".to_string(),
             resource: Some(Any {
@@ -282,8 +282,7 @@ fn on_demand_dns_assertions(metrics: ParsedMetrics) {
         };
         assert!(
             value == expected,
-            "expected metric {metric} to be 1, was {:?}",
-            value
+            "expected metric {metric} to be 1, was {value:?}",
         );
     }
 }

@@ -361,9 +360,9 @@ async fn test_stats_exist() {
     {
         for (name, doc) in metric_info {
             if stable_metrics.contains(&*name) {
-                assert!(!doc.contains("unstable"), "{}: {}", name, doc);
+                assert!(!doc.contains("unstable"), "{name}: {doc}");
             } else {
-                assert!(doc.contains("unstable"), "{}: {}", name, doc);
+                assert!(doc.contains("unstable"), "{name}: {doc}");
             }
         }
     }
@@ -1430,7 +1430,7 @@ mod namespaced {
         // Use the actual metrics address ztunnel is listening on (e.g., [::]:15020)
         // but combine it with the node IP for the client to target.
         let target_metrics_addr = SocketAddr::new(ztunnel_node_ip, zt.metrics_address.port());
-        let target_metrics_url = format!("http://{}/metrics", target_metrics_addr);
+        let target_metrics_url = format!("http://{target_metrics_addr}/metrics");

         // Deploy a client workload (simulating Prometheus)
         let client = manager

@@ -1469,8 +1469,7 @@ mod namespaced {

         assert!(
             response_str.contains("# TYPE"),
-            "Expected Prometheus metrics (# TYPE) in response, got:\n{}",
-            response_str
+            "Expected Prometheus metrics (# TYPE) in response, got:\n{response_str}",
         );
         info!("Successfully verified metrics response body");

@@ -1487,8 +1486,8 @@ mod namespaced {
         verify_metrics(&zt, &metrics, &destination_labels()).await;

         // Verify INBOUND telemetry log for the metrics connection
-        let dst_addr_log = format!("{}:15008", ztunnel_node_ip);
-        let dst_hbone_addr_log = format!("{}", target_metrics_addr);
+        let dst_addr_log = format!("{ztunnel_node_ip}:15008");
+        let dst_hbone_addr_log = format!("{target_metrics_addr}");

         // We don't know exact byte counts, so omit them from the check for now
         let want = HashMap::from([