diff --git a/Cargo.lock b/Cargo.lock index 1b84532..496c2e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2539,7 +2539,8 @@ dependencies = [ [[package]] name = "provider-archive" version = "0.9.0" -source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d917297b393db1e9c8362cd9809aaaa91dd60870b24aa1f139e52b9b191c25a" dependencies = [ "async-compression", "data-encoding", @@ -2548,7 +2549,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-tar", - "wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", + "wascap", ] [[package]] @@ -4045,9 +4046,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "wadm" -version = "0.11.0-alpha.2" +version = "0.11.0-alpha.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86a2fc81038e2d2a90c30a49237973224ee76ed0b11d0d714d027798ad5eb565" +checksum = "8a31e7c622ddb533044d444bed6ac34a87d885e14a9f847563218cb2111c0512" dependencies = [ "anyhow", "async-nats", @@ -4073,7 +4074,7 @@ dependencies = [ "tracing", "tracing-futures", "uuid", - "wasmcloud-control-interface 1.0.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", + "wasmcloud-control-interface", ] [[package]] @@ -4114,27 +4115,11 @@ dependencies = [ "wasmparser 0.121.2", ] -[[package]] -name = "wascap" -version = "0.13.0" -source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" -dependencies = [ - "data-encoding", - "humantime", - "nkeys", - "nuid 0.4.1", - "ring 0.17.5", - "serde", - "serde_json", - "wasm-encoder 0.202.0", - "wasm-gen", - "wasmparser 0.202.0", -] - [[package]] name = "wash-lib" -version = "0.20.0-alpha.1" -source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" +version = "0.20.0-alpha.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecd215ea07875aa4fdc8fa3a65131d3085535e154ac7bd531826c46ddc4c934f" dependencies = [ "anyhow", "async-compression", @@ -4174,10 +4159,10 @@ dependencies = [ "url", "wadm", "walkdir", - "wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", + "wascap", "wasm-encoder 0.202.0", "wasmcloud-component-adapters", - "wasmcloud-control-interface 1.0.0-alpha.3 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", + "wasmcloud-control-interface", "wasmcloud-core 0.5.0", "wasmparser 0.202.0", "wat", @@ -4369,28 +4354,6 @@ dependencies = [ "wasmcloud-core 0.4.0", ] -[[package]] -name = "wasmcloud-control-interface" -version = "1.0.0-alpha.3" -source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" -dependencies = [ - "anyhow", - "async-nats", - "async-trait", - "bytes", - "cloudevents-sdk", - "futures", - "oci-distribution", - "opentelemetry 0.21.0", - "opentelemetry_sdk 0.21.2", - "serde", - "serde_json", - "tokio", - "tracing", - "tracing-opentelemetry 0.22.0", - "wasmcloud-core 0.5.0", -] - [[package]] name = "wasmcloud-core" version = "0.4.0" @@ -4414,7 +4377,7 @@ dependencies = [ "tracing", "ulid", "uuid", - "wascap 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", + "wascap", "wrpc-transport 0.22.0", "wrpc-transport-nats 0.19.0", ] @@ -4422,7 +4385,8 @@ dependencies = [ [[package]] name = "wasmcloud-core" version = "0.5.0" -source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a646a51b5c4410c6bd93aecbcc98f24921c88712de4549ab64617d88fb4acb88" dependencies = [ "anyhow", "async-nats", @@ -4444,14 +4408,14 @@ dependencies = [ "tracing", "ulid", "uuid", - "wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", + "wascap", "wrpc-transport 0.24.2", "wrpc-transport-nats 0.21.0", ] [[package]] name = "wasmcloud-operator" -version = "0.2.0" +version = "0.2.0-alpha.1" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 780789d..adc4d5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wasmcloud-operator" -version = "0.2.0" +version = "0.2.0-alpha.1" edition = "2021" [[bin]] @@ -21,53 +21,60 @@ path = "src/lib.rs" edition = "2021" [dependencies] -async-nats = {workspace = true} -axum = {workspace = true} -axum-server = {workspace = true} -anyhow = {workspace = true} -ctrlc = {workspace = true} -cloudevents-sdk = {workspace = true} -config = {workspace = true} -futures = {workspace = true} -handlebars = {workspace = true} -json-patch = {workspace = true} -k8s-openapi = {workspace = true, features = ["v1_28", "schemars"]} -kube = {workspace = true, features = ["runtime", "derive", "default"]} -opentelemetry = {workspace = true} -opentelemetry-otlp = {workspace = true} -rcgen = {workspace = true} -schemars = {workspace = true} -secrecy = {workspace = true} -serde = {workspace = true} -serde_json = {workspace = true} -serde_yaml = {workspace = true} -thiserror = {workspace = true} -time = {workspace = true} -tokio = {workspace = true} -tokio-util = {workspace = true} -tracing = {workspace = true} -tracing-opentelemetry = {workspace = true} -tracing-subscriber = {workspace = true} -utoipa = {workspace = true} -uuid = {workspace = true} -wadm = {workspace = true} -wash-lib = {workspace = true} -wasmcloud-operator-types = {workspace = true} +async-nats = { workspace = true } +axum = { workspace = true } +axum-server = { workspace = true } +anyhow = { workspace = true } +ctrlc = { workspace = true } +cloudevents-sdk = { workspace = true } +config = { workspace = true } +futures = { workspace = true } +handlebars = { workspace = true } +json-patch = { workspace = true } +k8s-openapi = { workspace = true, features = ["v1_28", "schemars"] } +kube = { workspace = true, features = ["runtime", "derive", "default"] } +opentelemetry = { workspace = true } +opentelemetry-otlp = { workspace = true } +rcgen = { workspace = true } +schemars = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +time = { workspace = true } +tokio = { workspace = true } +tokio-util = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +tracing-subscriber = { workspace = true } +utoipa = { workspace = true } +uuid = { workspace = true } +wadm = { workspace = true } +wash-lib = { workspace = true } +wasmcloud-operator-types = { workspace = true } [workspace.dependencies] async-nats = "0.33" axum = { version = "0.6", features = ["headers"] } axum-server = { version = "0.4", features = ["tls-rustls"] } anyhow = "1" -config = {version = "0.14", default-features = false, features = ["convert-case", "async"]} +config = { version = "0.14", default-features = false, features = [ + "convert-case", + "async", +] } cloudevents-sdk = "0.7" ctrlc = "3" futures = "0.3" handlebars = "5.1" json-patch = "1.2.0" -k8s-openapi = { version = "0.20", default-features = false} -kube = { version = "0.87", default-features = false} -opentelemetry = { version = "0.20", features = ["metrics", "trace", "rt-tokio"] } +k8s-openapi = { version = "0.20", default-features = false } +kube = { version = "0.87", default-features = false } +opentelemetry = { version = "0.20", features = [ + "metrics", + "trace", + "rt-tokio", +] } opentelemetry-otlp = { version = "0.13", features = ["tokio"] } rcgen = "0.11" schemars = "0.8" @@ -84,14 +91,12 @@ tracing-opentelemetry = "0.20" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } utoipa = { version = "4.1", features = ["axum_extras"] } uuid = { version = "1", features = ["v5"] } -wadm = "0.11.0-alpha.2" -wash-lib = { git = "https://github.com/wasmcloud/wasmcloud.git",tag = "wash-cli-v0.27.0-alpha.2" } -wasmcloud-operator-types = { version="*", path = "./crates/types" } +wadm = "0.11.0-alpha.3" +wash-lib = "0.20.0-alpha.2" +wasmcloud-operator-types = { version = "*", path = "./crates/types" } [workspace] -members = [ - "crates/*" -] +members = ["crates/*"] resolver = "2" [profile.release] diff --git a/LICENSE b/LICENSE index d645695..083dacd 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2024 wasmCloud Maintainers Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/sample.yaml b/sample.yaml index 514a805..2628b74 100644 --- a/sample.yaml +++ b/sample.yaml @@ -13,7 +13,7 @@ spec: hostLabels: test: value # Which wasmCloud version to use - version: 1.0.0-rc.1 + version: 1.0.0-rc.2 # The name of a secret in the same namespace that provides the required secrets. secretName: cluster-secrets logLevel: INFO @@ -22,19 +22,19 @@ spec: # This includes setting resource requirements for the nats and wasmCloud host # containers along with any additional pot template settings. #schedulingOptions: - # Enable the following to run the wasmCloud hosts as a DaemonSet - #daemonset: true - # Set the resource requirements for the nats and wasmCloud host containers. - #resources: - # nats: - # requests: - # cpu: 100m - # wasmCloudHost: - # requests: - # cpu: 100m - # Any additional pod template settings to apply to the wasmCloud host pods. - # See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options. - # Note that you *cannot* set the `containers` field here as it is managed by the controller. - #pod_template_additions: - # nodeSelector: - # kubernetes.io/os: linux + # Enable the following to run the wasmCloud hosts as a DaemonSet + #daemonset: true + # Set the resource requirements for the nats and wasmCloud host containers. + #resources: + # nats: + # requests: + # cpu: 100m + # wasmCloudHost: + # requests: + # cpu: 100m + # Any additional pod template settings to apply to the wasmCloud host pods. + # See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options. + # Note that you *cannot* set the `containers` field here as it is managed by the controller. + #pod_template_additions: + # nodeSelector: + # kubernetes.io/os: linux diff --git a/src/resources/application.rs b/src/resources/application.rs index 59fab2e..1d9cc46 100644 --- a/src/resources/application.rs +++ b/src/resources/application.rs @@ -279,31 +279,12 @@ pub async fn create_application( Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)), }; - let mut cfgs = cfgs.iter(); // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. - let mut lattice_id = String::new(); - let nats_client = loop { - let Some(cfg) = cfgs.next() else { - return internal_error(anyhow!( - "unable to find Host Config to use for initializing nats client " - )); + let (nats_client, lattice_id) = + match get_lattice_connection(cfgs.into_iter(), state, namespace).await { + Ok(data) => data, + Err(resp) => return resp, }; - let cluster_url = cfg.spec.nats_address.clone(); - lattice_id = cfg.spec.lattice.clone(); - let lattice_name = cfg.metadata.name.clone().unwrap(); - let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone()); - match get_client(&cluster_url, state.nats_creds.clone(), nst).await { - Ok(c) => break Some(c), - Err(e) => { - error!("error connecting to nats: {}", e); - continue; - } - }; - }; - - let Some(nats_client) = nats_client else { - return internal_error(anyhow!("unable to initialize nats client")); - }; let model: serde_json::Value = match serde_json::from_slice(&body) { Ok(v) => v, @@ -539,31 +520,12 @@ pub async fn get_application( Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), }; - let mut cfgs = cfgs.iter(); // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. - let mut lattice_id = String::new(); - let nats_client = loop { - let Some(cfg) = cfgs.next() else { - return internal_error(anyhow!( - "unable to find Host Config to use for initializing nats client " - )); + let (nats_client, lattice_id) = + match get_lattice_connection(cfgs.into_iter(), state, namespace.clone()).await { + Ok(data) => data, + Err(resp) => return resp, }; - let cluster_url = cfg.spec.nats_address.clone(); - lattice_id = cfg.spec.lattice.clone(); - let lattice_name = cfg.metadata.name.clone().unwrap(); - let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone()); - match get_client(&cluster_url, state.nats_creds.clone(), nst).await { - Ok(c) => break Some(c), - Err(e) => { - error!("error connecting to nats: {}", e); - continue; - } - }; - }; - - let Some(nats_client) = nats_client else { - return internal_error(anyhow!("unable to initialize nats client")); - }; let get = match wash_lib::app::get_model_details(&nats_client, Some(lattice_id.clone()), &name, None) @@ -662,31 +624,12 @@ pub async fn patch_application( Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), }; - let mut cfgs = cfgs.iter(); // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. - let mut lattice_id = String::new(); - let nats_client = loop { - let Some(cfg) = cfgs.next() else { - return internal_error(anyhow!( - "unable to find Host Config to use for initializing nats client " - )); + let (nats_client, lattice_id) = + match get_lattice_connection(cfgs.into_iter(), state, namespace).await { + Ok(data) => data, + Err(resp) => return resp, }; - let cluster_url = cfg.spec.nats_address.clone(); - lattice_id = cfg.spec.lattice.clone(); - let lattice_name = cfg.metadata.name.clone().unwrap(); - let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone()); - match get_client(&cluster_url, state.nats_creds.clone(), nst).await { - Ok(c) => break Some(c), - Err(e) => { - error!("error connecting to nats: {}", e); - continue; - } - }; - }; - - let Some(nats_client) = nats_client else { - return internal_error(anyhow!("unable to initialize nats client")); - }; // Fist, check if the model exists. // TODO(joonas): we should likely fetch the version of the manifest that's running in Kubernetes @@ -791,31 +734,12 @@ pub async fn delete_application( Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), }; - let mut cfgs = cfgs.iter(); // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. - let mut lattice_id = String::new(); - let nats_client = loop { - let Some(cfg) = cfgs.next() else { - return internal_error(anyhow!( - "unable to find Host Config to use for initializing nats client " - )); + let (nats_client, lattice_id) = + match get_lattice_connection(cfgs.into_iter(), state, namespace).await { + Ok(data) => data, + Err(resp) => return resp, }; - let cluster_url = cfg.spec.nats_address.clone(); - lattice_id = cfg.spec.lattice.clone(); - let lattice_name = cfg.metadata.name.clone().unwrap(); - let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone()); - match get_client(&cluster_url, state.nats_creds.clone(), nst).await { - Ok(c) => break Some(c), - Err(e) => { - error!("error connecting to nats: {}", e); - continue; - } - }; - }; - - let Some(nats_client) = nats_client else { - return internal_error(anyhow!("unable to initialize nats client")); - }; // Fist, check if the model exists. // TODO(joonas): Replace this with wash_lib::app::get_model_status once @@ -872,3 +796,32 @@ pub async fn delete_application( )), } } + +async fn get_lattice_connection( + cfgs: impl Iterator, + state: State, + namespace: String, +) -> Result<(async_nats::Client, String), Response> { + let connection_data = + cfgs.map(|cfg| (cfg, namespace.clone())) + .filter_map(|(cfg, namespace)| { + let cluster_url = cfg.spec.nats_address; + let lattice_id = cfg.spec.lattice; + let lattice_name = cfg.metadata.name?; + let nst: NameNamespace = NameNamespace::new(lattice_name, namespace); + Some((cluster_url, nst, lattice_id)) + }); + + for (cluster_url, ns, lattice_id) in connection_data { + match get_client(&cluster_url, state.nats_creds.clone(), ns).await { + Ok(c) => return Ok((c, lattice_id)), + Err(e) => { + error!(err = %e, %lattice_id, "error connecting to nats"); + continue; + } + }; + } + + // If we get here, we couldn't get a NATS client, so return an error + Err(internal_error(anyhow!("unable to initialize nats client"))) +} diff --git a/src/services.rs b/src/services.rs index ca8dd2d..441e41a 100644 --- a/src/services.rs +++ b/src/services.rs @@ -1,7 +1,7 @@ -use crate::controller::{ - common_labels, CLUSTER_CONFIG_FINALIZER, SERVICE_FINALIZER, - WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX, WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT, -}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::net::SocketAddr; +use std::sync::Arc; + use anyhow::Result; use async_nats::{ jetstream, @@ -22,9 +22,6 @@ use kube::{ client::Client as KubeClient, Resource, }; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::net::SocketAddr; -use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; @@ -36,6 +33,11 @@ use wadm::{ use wash_lib::app; use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig; +use crate::controller::{ + common_labels, CLUSTER_CONFIG_FINALIZER, SERVICE_FINALIZER, + WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX, WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT, +}; + const CONSUMER_PREFIX: &str = "wasmcloud_operator_service"; // This should probably be exposed by wadm somewhere const WADM_EVT_SUBJECT: &str = "wadm.evt"; @@ -105,10 +107,10 @@ impl Watcher { tokio::spawn(async move { tokio::select! { _ = watcher_dup.shutdown.cancelled() => { - debug!("Service watcher shutting down for lattice {}", lattice_id); + debug!(%lattice_id, "Service watcher shutting down for lattice"); } _ = watcher_dup.watch_events(&watcher_dup.consumer) => { - error!("Service watcher for lattice {} has stopped", lattice_id); + error!(%lattice_id, "Service watcher for lattice has stopped"); } } }); @@ -126,7 +128,7 @@ impl Watcher { .ack() .await .map_err(|e| { - error!(error=%e, "Error acking message"); + error!(err=%e, "Error acking message"); e }) .ok(), @@ -134,7 +136,7 @@ impl Watcher { .ack_with(AckKind::Nak(None)) .await .map_err(|e| { - error!(error=%e, "Error nacking message"); + error!(err=%e, "Error nacking message"); e }) .ok(), @@ -152,7 +154,7 @@ impl Watcher { Ok(evt) => evt, Err(e) => { warn!( - error=%e, + err=%e, event_type=%event.ty(), "Error converting cloudevent to wadm event", ); @@ -185,7 +187,7 @@ impl Watcher { let manifest = mp.manifest; if let Some(httpserver_service) = http_server_component(&manifest) { if let Ok(addr) = httpserver_service.address.parse::() { - debug!("Upserting service for manifest: {}", manifest.metadata.name); + debug!(manifest = %manifest.metadata.name, "Upserting service for manifest"); self.tx .send(WatcherCommand::UpsertService(ServiceParams { name: manifest.metadata.name.clone(), @@ -239,23 +241,23 @@ impl ServiceWatcher { WatcherCommand::UpsertService(params) => { create_or_update_service(client.clone(), ¶ms, None) .await - .map_err(|e| error!(error=%e, "Error creating/updating service")) + .map_err(|e| error!(err=%e, "Error creating/updating service")) .ok(); } WatcherCommand::RemoveService { name, namespaces } => { for namespace in namespaces { delete_service(client.clone(), &namespace, name.as_str()) .await - .map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) + .map_err(|e| error!(err=%e, %namespace, "Error deleting service")) .ok(); } } WatcherCommand::RemoveServices { namespaces } => { for namespace in namespaces { delete_services(client.clone(), namespace.as_str()) - .await - .map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) - .ok(); + .await + .map_err(|e| error!(err=%e, %namespace, "Error deleting service")) + .ok(); } } } @@ -291,10 +293,10 @@ impl ServiceWatcher { // all let _ = watcher.handle_manifest_published(ManifestPublished { manifest: model.manifest.unwrap(), - }).map_err(|e| error!(error=%e, lattice_id=%lattice_id, app=app.name, "failed to trigger service reconciliation for app")); + }).map_err(|e| error!(err = %e, %lattice_id, app = %app.name, "failed to trigger service reconciliation for app")); } } - Err(e) => warn!(error=%e, "Unable to retrieve model"), + Err(e) => warn!(err=%e, "Unable to retrieve model"), }; } }; @@ -404,9 +406,11 @@ pub async fn create_or_update_service( "app.kubernetes.io/name".to_string(), "wasmcloud".to_string(), ); - host_labels.iter().for_each(|(k, v)| { - selector.insert(format_service_selector(k), v.clone()); - }) + selector.extend( + host_labels + .iter() + .map(|(k, v)| (format_service_selector(k), v.clone())), + ); } else { create_endpoints = true; } @@ -439,7 +443,7 @@ pub async fn create_or_update_service( svc.metadata.owner_references = Some(vec![owner_ref.clone()]); } - debug!(service =? svc, namespace=namespace, "Creating/updating service"); + debug!(service =? svc, %namespace, "Creating/updating service"); let svc = api .patch( @@ -449,7 +453,7 @@ pub async fn create_or_update_service( ) .await .map_err(|e| { - error!("Error creating/updating service: {}", e); + error!(err = %e, "Error creating/updating service"); e })?; @@ -473,15 +477,19 @@ pub async fn create_or_update_service( ..Default::default() }) .await?; - for pod in pods { - if let Some(status) = pod.status { - if status.phase == Some("Running".to_string()) { - if let Some(pod_ips) = status.pod_ips { - ips.extend(pod_ips); + let pod_ips = pods + .into_iter() + .filter_map(|pod| { + pod.status.and_then(|status| { + if status.phase == Some("Running".to_string()) { + status.pod_ips + } else { + None } - } - } - } + }) + }) + .flatten(); + ips.extend(pod_ips); } }