ref(*): Refactors some copied code and bumps version

This fixes a few Rust-y things and dedupes some code I found while
addressing clippy errors

Signed-off-by: Taylor Thomas <taylor@cosmonic.com>
This commit is contained in:
Taylor Thomas 2024-04-13 13:52:40 -06:00
parent 9a9a4a8f97
commit 9eb70ec23a
6 changed files with 168 additions and 238 deletions

68
Cargo.lock generated
View File

@ -2539,7 +2539,8 @@ dependencies = [
[[package]] [[package]]
name = "provider-archive" name = "provider-archive"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d917297b393db1e9c8362cd9809aaaa91dd60870b24aa1f139e52b9b191c25a"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"data-encoding", "data-encoding",
@ -2548,7 +2549,7 @@ dependencies = [
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-tar", "tokio-tar",
"wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", "wascap",
] ]
[[package]] [[package]]
@ -4045,9 +4046,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]] [[package]]
name = "wadm" name = "wadm"
version = "0.11.0-alpha.2" version = "0.11.0-alpha.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86a2fc81038e2d2a90c30a49237973224ee76ed0b11d0d714d027798ad5eb565" checksum = "8a31e7c622ddb533044d444bed6ac34a87d885e14a9f847563218cb2111c0512"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",
@ -4073,7 +4074,7 @@ dependencies = [
"tracing", "tracing",
"tracing-futures", "tracing-futures",
"uuid", "uuid",
"wasmcloud-control-interface 1.0.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)", "wasmcloud-control-interface",
] ]
[[package]] [[package]]
@ -4114,27 +4115,11 @@ dependencies = [
"wasmparser 0.121.2", "wasmparser 0.121.2",
] ]
[[package]]
name = "wascap"
version = "0.13.0"
source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7"
dependencies = [
"data-encoding",
"humantime",
"nkeys",
"nuid 0.4.1",
"ring 0.17.5",
"serde",
"serde_json",
"wasm-encoder 0.202.0",
"wasm-gen",
"wasmparser 0.202.0",
]
[[package]] [[package]]
name = "wash-lib" name = "wash-lib"
version = "0.20.0-alpha.1" version = "0.20.0-alpha.2"
source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd215ea07875aa4fdc8fa3a65131d3085535e154ac7bd531826c46ddc4c934f"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-compression", "async-compression",
@ -4174,10 +4159,10 @@ dependencies = [
"url", "url",
"wadm", "wadm",
"walkdir", "walkdir",
"wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", "wascap",
"wasm-encoder 0.202.0", "wasm-encoder 0.202.0",
"wasmcloud-component-adapters", "wasmcloud-component-adapters",
"wasmcloud-control-interface 1.0.0-alpha.3 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", "wasmcloud-control-interface",
"wasmcloud-core 0.5.0", "wasmcloud-core 0.5.0",
"wasmparser 0.202.0", "wasmparser 0.202.0",
"wat", "wat",
@ -4369,28 +4354,6 @@ dependencies = [
"wasmcloud-core 0.4.0", "wasmcloud-core 0.4.0",
] ]
[[package]]
name = "wasmcloud-control-interface"
version = "1.0.0-alpha.3"
source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7"
dependencies = [
"anyhow",
"async-nats",
"async-trait",
"bytes",
"cloudevents-sdk",
"futures",
"oci-distribution",
"opentelemetry 0.21.0",
"opentelemetry_sdk 0.21.2",
"serde",
"serde_json",
"tokio",
"tracing",
"tracing-opentelemetry 0.22.0",
"wasmcloud-core 0.5.0",
]
[[package]] [[package]]
name = "wasmcloud-core" name = "wasmcloud-core"
version = "0.4.0" version = "0.4.0"
@ -4414,7 +4377,7 @@ dependencies = [
"tracing", "tracing",
"ulid", "ulid",
"uuid", "uuid",
"wascap 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "wascap",
"wrpc-transport 0.22.0", "wrpc-transport 0.22.0",
"wrpc-transport-nats 0.19.0", "wrpc-transport-nats 0.19.0",
] ]
@ -4422,7 +4385,8 @@ dependencies = [
[[package]] [[package]]
name = "wasmcloud-core" name = "wasmcloud-core"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2#f6e5f0e804d4a7eced93778b739bf58c30ad75e7" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a646a51b5c4410c6bd93aecbcc98f24921c88712de4549ab64617d88fb4acb88"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",
@ -4444,14 +4408,14 @@ dependencies = [
"tracing", "tracing",
"ulid", "ulid",
"uuid", "uuid",
"wascap 0.13.0 (git+https://github.com/wasmcloud/wasmcloud.git?tag=wash-cli-v0.27.0-alpha.2)", "wascap",
"wrpc-transport 0.24.2", "wrpc-transport 0.24.2",
"wrpc-transport-nats 0.21.0", "wrpc-transport-nats 0.21.0",
] ]
[[package]] [[package]]
name = "wasmcloud-operator" name = "wasmcloud-operator"
version = "0.2.0" version = "0.2.0-alpha.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-nats", "async-nats",

View File

@ -1,6 +1,6 @@
[package] [package]
name = "wasmcloud-operator" name = "wasmcloud-operator"
version = "0.2.0" version = "0.2.0-alpha.1"
edition = "2021" edition = "2021"
[[bin]] [[bin]]
@ -21,53 +21,60 @@ path = "src/lib.rs"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
async-nats = {workspace = true} async-nats = { workspace = true }
axum = {workspace = true} axum = { workspace = true }
axum-server = {workspace = true} axum-server = { workspace = true }
anyhow = {workspace = true} anyhow = { workspace = true }
ctrlc = {workspace = true} ctrlc = { workspace = true }
cloudevents-sdk = {workspace = true} cloudevents-sdk = { workspace = true }
config = {workspace = true} config = { workspace = true }
futures = {workspace = true} futures = { workspace = true }
handlebars = {workspace = true} handlebars = { workspace = true }
json-patch = {workspace = true} json-patch = { workspace = true }
k8s-openapi = {workspace = true, features = ["v1_28", "schemars"]} k8s-openapi = { workspace = true, features = ["v1_28", "schemars"] }
kube = {workspace = true, features = ["runtime", "derive", "default"]} kube = { workspace = true, features = ["runtime", "derive", "default"] }
opentelemetry = {workspace = true} opentelemetry = { workspace = true }
opentelemetry-otlp = {workspace = true} opentelemetry-otlp = { workspace = true }
rcgen = {workspace = true} rcgen = { workspace = true }
schemars = {workspace = true} schemars = { workspace = true }
secrecy = {workspace = true} secrecy = { workspace = true }
serde = {workspace = true} serde = { workspace = true }
serde_json = {workspace = true} serde_json = { workspace = true }
serde_yaml = {workspace = true} serde_yaml = { workspace = true }
thiserror = {workspace = true} thiserror = { workspace = true }
time = {workspace = true} time = { workspace = true }
tokio = {workspace = true} tokio = { workspace = true }
tokio-util = {workspace = true} tokio-util = { workspace = true }
tracing = {workspace = true} tracing = { workspace = true }
tracing-opentelemetry = {workspace = true} tracing-opentelemetry = { workspace = true }
tracing-subscriber = {workspace = true} tracing-subscriber = { workspace = true }
utoipa = {workspace = true} utoipa = { workspace = true }
uuid = {workspace = true} uuid = { workspace = true }
wadm = {workspace = true} wadm = { workspace = true }
wash-lib = {workspace = true} wash-lib = { workspace = true }
wasmcloud-operator-types = {workspace = true} wasmcloud-operator-types = { workspace = true }
[workspace.dependencies] [workspace.dependencies]
async-nats = "0.33" async-nats = "0.33"
axum = { version = "0.6", features = ["headers"] } axum = { version = "0.6", features = ["headers"] }
axum-server = { version = "0.4", features = ["tls-rustls"] } axum-server = { version = "0.4", features = ["tls-rustls"] }
anyhow = "1" anyhow = "1"
config = {version = "0.14", default-features = false, features = ["convert-case", "async"]} config = { version = "0.14", default-features = false, features = [
"convert-case",
"async",
] }
cloudevents-sdk = "0.7" cloudevents-sdk = "0.7"
ctrlc = "3" ctrlc = "3"
futures = "0.3" futures = "0.3"
handlebars = "5.1" handlebars = "5.1"
json-patch = "1.2.0" json-patch = "1.2.0"
k8s-openapi = { version = "0.20", default-features = false} k8s-openapi = { version = "0.20", default-features = false }
kube = { version = "0.87", default-features = false} kube = { version = "0.87", default-features = false }
opentelemetry = { version = "0.20", features = ["metrics", "trace", "rt-tokio"] } opentelemetry = { version = "0.20", features = [
"metrics",
"trace",
"rt-tokio",
] }
opentelemetry-otlp = { version = "0.13", features = ["tokio"] } opentelemetry-otlp = { version = "0.13", features = ["tokio"] }
rcgen = "0.11" rcgen = "0.11"
schemars = "0.8" schemars = "0.8"
@ -84,14 +91,12 @@ tracing-opentelemetry = "0.20"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
utoipa = { version = "4.1", features = ["axum_extras"] } utoipa = { version = "4.1", features = ["axum_extras"] }
uuid = { version = "1", features = ["v5"] } uuid = { version = "1", features = ["v5"] }
wadm = "0.11.0-alpha.2" wadm = "0.11.0-alpha.3"
wash-lib = { git = "https://github.com/wasmcloud/wasmcloud.git",tag = "wash-cli-v0.27.0-alpha.2" } wash-lib = "0.20.0-alpha.2"
wasmcloud-operator-types = { version="*", path = "./crates/types" } wasmcloud-operator-types = { version = "*", path = "./crates/types" }
[workspace] [workspace]
members = [ members = ["crates/*"]
"crates/*"
]
resolver = "2" resolver = "2"
[profile.release] [profile.release]

View File

@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier same "printed page" as the copyright notice for easier
identification within third-party archives. identification within third-party archives.
Copyright [yyyy] [name of copyright owner] Copyright 2024 wasmCloud Maintainers
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -13,7 +13,7 @@ spec:
hostLabels: hostLabels:
test: value test: value
# Which wasmCloud version to use # Which wasmCloud version to use
version: 1.0.0-rc.1 version: 1.0.0-rc.2
# The name of a secret in the same namespace that provides the required secrets. # The name of a secret in the same namespace that provides the required secrets.
secretName: cluster-secrets secretName: cluster-secrets
logLevel: INFO logLevel: INFO
@ -22,19 +22,19 @@ spec:
# This includes setting resource requirements for the nats and wasmCloud host # This includes setting resource requirements for the nats and wasmCloud host
# containers along with any additional pot template settings. # containers along with any additional pot template settings.
#schedulingOptions: #schedulingOptions:
# Enable the following to run the wasmCloud hosts as a DaemonSet # Enable the following to run the wasmCloud hosts as a DaemonSet
#daemonset: true #daemonset: true
# Set the resource requirements for the nats and wasmCloud host containers. # Set the resource requirements for the nats and wasmCloud host containers.
#resources: #resources:
# nats: # nats:
# requests: # requests:
# cpu: 100m # cpu: 100m
# wasmCloudHost: # wasmCloudHost:
# requests: # requests:
# cpu: 100m # cpu: 100m
# Any additional pod template settings to apply to the wasmCloud host pods. # Any additional pod template settings to apply to the wasmCloud host pods.
# See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options. # See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options.
# Note that you *cannot* set the `containers` field here as it is managed by the controller. # Note that you *cannot* set the `containers` field here as it is managed by the controller.
#pod_template_additions: #pod_template_additions:
# nodeSelector: # nodeSelector:
# kubernetes.io/os: linux # kubernetes.io/os: linux

View File

@ -279,31 +279,12 @@ pub async fn create_application(
Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)), Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)),
}; };
let mut cfgs = cfgs.iter();
// TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts.
let mut lattice_id = String::new(); let (nats_client, lattice_id) =
let nats_client = loop { match get_lattice_connection(cfgs.into_iter(), state, namespace).await {
let Some(cfg) = cfgs.next() else { Ok(data) => data,
return internal_error(anyhow!( Err(resp) => return resp,
"unable to find Host Config to use for initializing nats client "
));
}; };
let cluster_url = cfg.spec.nats_address.clone();
lattice_id = cfg.spec.lattice.clone();
let lattice_name = cfg.metadata.name.clone().unwrap();
let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone());
match get_client(&cluster_url, state.nats_creds.clone(), nst).await {
Ok(c) => break Some(c),
Err(e) => {
error!("error connecting to nats: {}", e);
continue;
}
};
};
let Some(nats_client) = nats_client else {
return internal_error(anyhow!("unable to initialize nats client"));
};
let model: serde_json::Value = match serde_json::from_slice(&body) { let model: serde_json::Value = match serde_json::from_slice(&body) {
Ok(v) => v, Ok(v) => v,
@ -539,31 +520,12 @@ pub async fn get_application(
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
}; };
let mut cfgs = cfgs.iter();
// TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts.
let mut lattice_id = String::new(); let (nats_client, lattice_id) =
let nats_client = loop { match get_lattice_connection(cfgs.into_iter(), state, namespace.clone()).await {
let Some(cfg) = cfgs.next() else { Ok(data) => data,
return internal_error(anyhow!( Err(resp) => return resp,
"unable to find Host Config to use for initializing nats client "
));
}; };
let cluster_url = cfg.spec.nats_address.clone();
lattice_id = cfg.spec.lattice.clone();
let lattice_name = cfg.metadata.name.clone().unwrap();
let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone());
match get_client(&cluster_url, state.nats_creds.clone(), nst).await {
Ok(c) => break Some(c),
Err(e) => {
error!("error connecting to nats: {}", e);
continue;
}
};
};
let Some(nats_client) = nats_client else {
return internal_error(anyhow!("unable to initialize nats client"));
};
let get = let get =
match wash_lib::app::get_model_details(&nats_client, Some(lattice_id.clone()), &name, None) match wash_lib::app::get_model_details(&nats_client, Some(lattice_id.clone()), &name, None)
@ -662,31 +624,12 @@ pub async fn patch_application(
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
}; };
let mut cfgs = cfgs.iter();
// TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts.
let mut lattice_id = String::new(); let (nats_client, lattice_id) =
let nats_client = loop { match get_lattice_connection(cfgs.into_iter(), state, namespace).await {
let Some(cfg) = cfgs.next() else { Ok(data) => data,
return internal_error(anyhow!( Err(resp) => return resp,
"unable to find Host Config to use for initializing nats client "
));
}; };
let cluster_url = cfg.spec.nats_address.clone();
lattice_id = cfg.spec.lattice.clone();
let lattice_name = cfg.metadata.name.clone().unwrap();
let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone());
match get_client(&cluster_url, state.nats_creds.clone(), nst).await {
Ok(c) => break Some(c),
Err(e) => {
error!("error connecting to nats: {}", e);
continue;
}
};
};
let Some(nats_client) = nats_client else {
return internal_error(anyhow!("unable to initialize nats client"));
};
// Fist, check if the model exists. // Fist, check if the model exists.
// TODO(joonas): we should likely fetch the version of the manifest that's running in Kubernetes // TODO(joonas): we should likely fetch the version of the manifest that's running in Kubernetes
@ -791,31 +734,12 @@ pub async fn delete_application(
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)), Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
}; };
let mut cfgs = cfgs.iter();
// TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts. // TODO(joonas): Remove this once we move to pulling NATS creds+secrets from lattice instead of hosts.
let mut lattice_id = String::new(); let (nats_client, lattice_id) =
let nats_client = loop { match get_lattice_connection(cfgs.into_iter(), state, namespace).await {
let Some(cfg) = cfgs.next() else { Ok(data) => data,
return internal_error(anyhow!( Err(resp) => return resp,
"unable to find Host Config to use for initializing nats client "
));
}; };
let cluster_url = cfg.spec.nats_address.clone();
lattice_id = cfg.spec.lattice.clone();
let lattice_name = cfg.metadata.name.clone().unwrap();
let nst: NameNamespace = NameNamespace::new(lattice_name.clone(), namespace.clone());
match get_client(&cluster_url, state.nats_creds.clone(), nst).await {
Ok(c) => break Some(c),
Err(e) => {
error!("error connecting to nats: {}", e);
continue;
}
};
};
let Some(nats_client) = nats_client else {
return internal_error(anyhow!("unable to initialize nats client"));
};
// Fist, check if the model exists. // Fist, check if the model exists.
// TODO(joonas): Replace this with wash_lib::app::get_model_status once // TODO(joonas): Replace this with wash_lib::app::get_model_status once
@ -872,3 +796,32 @@ pub async fn delete_application(
)), )),
} }
} }
async fn get_lattice_connection(
cfgs: impl Iterator<Item = WasmCloudHostConfig>,
state: State,
namespace: String,
) -> Result<(async_nats::Client, String), Response> {
let connection_data =
cfgs.map(|cfg| (cfg, namespace.clone()))
.filter_map(|(cfg, namespace)| {
let cluster_url = cfg.spec.nats_address;
let lattice_id = cfg.spec.lattice;
let lattice_name = cfg.metadata.name?;
let nst: NameNamespace = NameNamespace::new(lattice_name, namespace);
Some((cluster_url, nst, lattice_id))
});
for (cluster_url, ns, lattice_id) in connection_data {
match get_client(&cluster_url, state.nats_creds.clone(), ns).await {
Ok(c) => return Ok((c, lattice_id)),
Err(e) => {
error!(err = %e, %lattice_id, "error connecting to nats");
continue;
}
};
}
// If we get here, we couldn't get a NATS client, so return an error
Err(internal_error(anyhow!("unable to initialize nats client")))
}

View File

@ -1,7 +1,7 @@
use crate::controller::{ use std::collections::{BTreeMap, HashMap, HashSet};
common_labels, CLUSTER_CONFIG_FINALIZER, SERVICE_FINALIZER, use std::net::SocketAddr;
WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX, WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT, use std::sync::Arc;
};
use anyhow::Result; use anyhow::Result;
use async_nats::{ use async_nats::{
jetstream, jetstream,
@ -22,9 +22,6 @@ use kube::{
client::Client as KubeClient, client::Client as KubeClient,
Resource, Resource,
}; };
use std::collections::{BTreeMap, HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
@ -36,6 +33,11 @@ use wadm::{
use wash_lib::app; use wash_lib::app;
use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig; use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig;
use crate::controller::{
common_labels, CLUSTER_CONFIG_FINALIZER, SERVICE_FINALIZER,
WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX, WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT,
};
const CONSUMER_PREFIX: &str = "wasmcloud_operator_service"; const CONSUMER_PREFIX: &str = "wasmcloud_operator_service";
// This should probably be exposed by wadm somewhere // This should probably be exposed by wadm somewhere
const WADM_EVT_SUBJECT: &str = "wadm.evt"; const WADM_EVT_SUBJECT: &str = "wadm.evt";
@ -105,10 +107,10 @@ impl Watcher {
tokio::spawn(async move { tokio::spawn(async move {
tokio::select! { tokio::select! {
_ = watcher_dup.shutdown.cancelled() => { _ = watcher_dup.shutdown.cancelled() => {
debug!("Service watcher shutting down for lattice {}", lattice_id); debug!(%lattice_id, "Service watcher shutting down for lattice");
} }
_ = watcher_dup.watch_events(&watcher_dup.consumer) => { _ = watcher_dup.watch_events(&watcher_dup.consumer) => {
error!("Service watcher for lattice {} has stopped", lattice_id); error!(%lattice_id, "Service watcher for lattice has stopped");
} }
} }
}); });
@ -126,7 +128,7 @@ impl Watcher {
.ack() .ack()
.await .await
.map_err(|e| { .map_err(|e| {
error!(error=%e, "Error acking message"); error!(err=%e, "Error acking message");
e e
}) })
.ok(), .ok(),
@ -134,7 +136,7 @@ impl Watcher {
.ack_with(AckKind::Nak(None)) .ack_with(AckKind::Nak(None))
.await .await
.map_err(|e| { .map_err(|e| {
error!(error=%e, "Error nacking message"); error!(err=%e, "Error nacking message");
e e
}) })
.ok(), .ok(),
@ -152,7 +154,7 @@ impl Watcher {
Ok(evt) => evt, Ok(evt) => evt,
Err(e) => { Err(e) => {
warn!( warn!(
error=%e, err=%e,
event_type=%event.ty(), event_type=%event.ty(),
"Error converting cloudevent to wadm event", "Error converting cloudevent to wadm event",
); );
@ -185,7 +187,7 @@ impl Watcher {
let manifest = mp.manifest; let manifest = mp.manifest;
if let Some(httpserver_service) = http_server_component(&manifest) { if let Some(httpserver_service) = http_server_component(&manifest) {
if let Ok(addr) = httpserver_service.address.parse::<SocketAddr>() { if let Ok(addr) = httpserver_service.address.parse::<SocketAddr>() {
debug!("Upserting service for manifest: {}", manifest.metadata.name); debug!(manifest = %manifest.metadata.name, "Upserting service for manifest");
self.tx self.tx
.send(WatcherCommand::UpsertService(ServiceParams { .send(WatcherCommand::UpsertService(ServiceParams {
name: manifest.metadata.name.clone(), name: manifest.metadata.name.clone(),
@ -239,23 +241,23 @@ impl ServiceWatcher {
WatcherCommand::UpsertService(params) => { WatcherCommand::UpsertService(params) => {
create_or_update_service(client.clone(), &params, None) create_or_update_service(client.clone(), &params, None)
.await .await
.map_err(|e| error!(error=%e, "Error creating/updating service")) .map_err(|e| error!(err=%e, "Error creating/updating service"))
.ok(); .ok();
} }
WatcherCommand::RemoveService { name, namespaces } => { WatcherCommand::RemoveService { name, namespaces } => {
for namespace in namespaces { for namespace in namespaces {
delete_service(client.clone(), &namespace, name.as_str()) delete_service(client.clone(), &namespace, name.as_str())
.await .await
.map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) .map_err(|e| error!(err=%e, %namespace, "Error deleting service"))
.ok(); .ok();
} }
} }
WatcherCommand::RemoveServices { namespaces } => { WatcherCommand::RemoveServices { namespaces } => {
for namespace in namespaces { for namespace in namespaces {
delete_services(client.clone(), namespace.as_str()) delete_services(client.clone(), namespace.as_str())
.await .await
.map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) .map_err(|e| error!(err=%e, %namespace, "Error deleting service"))
.ok(); .ok();
} }
} }
} }
@ -291,10 +293,10 @@ impl ServiceWatcher {
// all // all
let _ = watcher.handle_manifest_published(ManifestPublished { let _ = watcher.handle_manifest_published(ManifestPublished {
manifest: model.manifest.unwrap(), manifest: model.manifest.unwrap(),
}).map_err(|e| error!(error=%e, lattice_id=%lattice_id, app=app.name, "failed to trigger service reconciliation for app")); }).map_err(|e| error!(err = %e, %lattice_id, app = %app.name, "failed to trigger service reconciliation for app"));
} }
} }
Err(e) => warn!(error=%e, "Unable to retrieve model"), Err(e) => warn!(err=%e, "Unable to retrieve model"),
}; };
} }
}; };
@ -404,9 +406,11 @@ pub async fn create_or_update_service(
"app.kubernetes.io/name".to_string(), "app.kubernetes.io/name".to_string(),
"wasmcloud".to_string(), "wasmcloud".to_string(),
); );
host_labels.iter().for_each(|(k, v)| { selector.extend(
selector.insert(format_service_selector(k), v.clone()); host_labels
}) .iter()
.map(|(k, v)| (format_service_selector(k), v.clone())),
);
} else { } else {
create_endpoints = true; create_endpoints = true;
} }
@ -439,7 +443,7 @@ pub async fn create_or_update_service(
svc.metadata.owner_references = Some(vec![owner_ref.clone()]); svc.metadata.owner_references = Some(vec![owner_ref.clone()]);
} }
debug!(service =? svc, namespace=namespace, "Creating/updating service"); debug!(service =? svc, %namespace, "Creating/updating service");
let svc = api let svc = api
.patch( .patch(
@ -449,7 +453,7 @@ pub async fn create_or_update_service(
) )
.await .await
.map_err(|e| { .map_err(|e| {
error!("Error creating/updating service: {}", e); error!(err = %e, "Error creating/updating service");
e e
})?; })?;
@ -473,15 +477,19 @@ pub async fn create_or_update_service(
..Default::default() ..Default::default()
}) })
.await?; .await?;
for pod in pods { let pod_ips = pods
if let Some(status) = pod.status { .into_iter()
if status.phase == Some("Running".to_string()) { .filter_map(|pod| {
if let Some(pod_ips) = status.pod_ips { pod.status.and_then(|status| {
ips.extend(pod_ips); if status.phase == Some("Running".to_string()) {
status.pod_ips
} else {
None
} }
} })
} })
} .flatten();
ips.extend(pod_ips);
} }
} }