diff --git a/Cargo.lock b/Cargo.lock index 1b84532..c73d314 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,6 +209,28 @@ dependencies = [ "url", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "async-trait" version = "0.1.74" @@ -3679,10 +3701,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ + "async-stream", "async-trait", "axum", "base64 0.21.4", "bytes", + "flate2", "futures-core", "futures-util", "h2", @@ -3693,7 +3717,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rustls-native-certs 0.6.3", + "rustls-pemfile 1.0.3", "tokio", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", diff --git a/Cargo.toml b/Cargo.toml index 780789d..ecee647 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ json-patch = "1.2.0" k8s-openapi = { version = "0.20", default-features = false} kube = { version = "0.87", default-features = false} opentelemetry = { version = "0.20", features = ["metrics", "trace", "rt-tokio"] } -opentelemetry-otlp = { version = "0.13", features = ["tokio"] } +opentelemetry-otlp = { version = "0.13", features = ["tokio", "tls-roots", "gzip-tonic"] } rcgen = "0.11" schemars = "0.8" secrecy = "0.8" diff --git a/src/controller.rs b/src/controller.rs index 550a6da..0de3067 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -29,7 +29,7 @@ use std::collections::{BTreeMap, HashMap}; use std::str::from_utf8; use std::sync::Arc; use tokio::{sync::RwLock, time::Duration}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use wasmcloud_operator_types::v1alpha1::{ AppStatus, WasmCloudHostConfig, WasmCloudHostConfigStatus, }; @@ -89,6 +89,7 @@ impl Secrets { } } +#[instrument(level = "debug", skip(cluster, ctx))] pub async fn reconcile(cluster: Arc, ctx: Arc) -> Result { let cluster_configs: Api = Api::namespaced(ctx.client.clone(), &cluster.namespace().unwrap()); @@ -113,6 +114,7 @@ pub async fn reconcile(cluster: Arc, ctx: Arc) -> .map_err(|e| Error::FinalizerError(Box::new(e))) } +#[instrument(level = "debug", skip(config, ctx), fields(name = config.name_any(), namespace = config.namespace().unwrap_or_default()))] async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc) -> Result { let client = ctx.client.clone(); let ns = config.namespace().unwrap(); @@ -228,6 +230,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc) -> Resul Ok(Action::requeue(Duration::from_secs(5 * 60))) } +#[instrument(level = "debug", skip(config, ctx), fields(name = config.name_any(), namespace = config.namespace().unwrap_or_default()))] async fn cleanup(config: &WasmCloudHostConfig, ctx: Arc) -> Result { let client = ctx.client.clone(); let ns = config.namespace().unwrap(); @@ -871,6 +874,7 @@ fn error_policy(_object: Arc, _error: &Error, _ctx: Arc anyhow::Result<()> { let client = Client::try_default().await?; diff --git a/src/resources/application.rs b/src/resources/application.rs index 59fab2e..d81277a 100644 --- a/src/resources/application.rs +++ b/src/resources/application.rs @@ -19,7 +19,7 @@ use secrecy::{ExposeSecret, SecretString}; use serde::Serialize; use serde_json::json; use tokio::sync::RwLock; -use tracing::error; +use tracing::{error, instrument}; use uuid::Uuid; use wadm::{ model::Manifest, @@ -264,6 +264,7 @@ impl From> for ApplicationTable { post, path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications" )] +#[instrument(level = "debug", skip(state, body))] pub async fn create_application( Path(namespace): Path, AxumState(state): AxumState, @@ -376,6 +377,7 @@ pub async fn create_application( } #[utoipa::path(get, path = "/apis/core.oam.dev/v1beta1/applications")] +#[instrument(level = "debug", skip(state))] pub async fn list_all_applications( TypedHeader(accept): TypedHeader, AxumState(state): AxumState, @@ -434,6 +436,7 @@ pub async fn list_all_applications( get, path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications" )] +#[instrument(level = "debug", skip(state))] pub async fn list_applications( TypedHeader(accept): TypedHeader, Path(namespace): Path, @@ -483,6 +486,7 @@ pub async fn list_applications( } } +#[instrument(level = "debug", skip(creds))] pub async fn list_apps( cluster_url: &str, creds: Option<&SecretString>, @@ -501,6 +505,7 @@ pub async fn list_apps( Ok(models) } +#[instrument(level = "debug", skip(nats_creds))] pub async fn get_client( cluster_url: &str, nats_creds: Arc>>, @@ -523,6 +528,7 @@ pub async fn get_client( get, path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}" )] +#[instrument(level = "debug", skip(state))] pub async fn get_application( TypedHeader(accept): TypedHeader, Path((namespace, name)): Path<(String, String)>, @@ -647,6 +653,7 @@ pub async fn get_application( patch, path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}" )] +#[instrument(level = "debug", skip(state, body))] pub async fn patch_application( Path((namespace, name)): Path<(String, String)>, AxumState(state): AxumState, @@ -776,6 +783,7 @@ pub async fn patch_application( delete, path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}" )] +#[instrument(level = "debug", skip(state))] pub async fn delete_application( Path((namespace, name)): Path<(String, String)>, AxumState(state): AxumState, diff --git a/src/router.rs b/src/router.rs index 4874014..a5638e2 100644 --- a/src/router.rs +++ b/src/router.rs @@ -13,7 +13,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ }; use kube::core::{GroupVersionKind, ListMeta}; use serde::Serialize; -use tracing::error; +use tracing::{error, instrument}; use crate::{ discovery::{ @@ -29,6 +29,7 @@ use crate::{ State, }; +#[instrument(level = "debug", skip_all)] pub fn setup(state: State) -> Router { let openapi_router = openapi::router(); Router::new() diff --git a/src/services.rs b/src/services.rs index ca8dd2d..65a7c0a 100644 --- a/src/services.rs +++ b/src/services.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, warn}; +use tracing::{debug, error, instrument, warn}; use wadm::{ events::{Event, ManifestPublished, ManifestUnpublished}, model::{self, Manifest, Trait, TraitProperty}, @@ -271,6 +271,7 @@ impl ServiceWatcher { /// Reconciles services for a set of apps in a lattice. /// This intended to be called by the controller whenever it reconciles state. + #[instrument(level = "info", skip(self))] pub async fn reconcile_services(&self, apps: Vec, lattice_id: String) { if let Some(watcher) = self.watchers.read().await.get(lattice_id.as_str()) { for app in apps { @@ -302,6 +303,7 @@ impl ServiceWatcher { /// Create a new [`Watcher`] for a lattice. /// It will return early if a [`Watcher`] already exists for the lattice. + #[instrument(level = "info", skip(self, client))] pub async fn watch(&self, client: Client, namespace: String, lattice_id: String) -> Result<()> { // If we're already watching this lattice then return early // TODO is there an easy way to do this with a read lock? @@ -368,6 +370,7 @@ impl ServiceWatcher { } /// Stops watching a lattice by stopping the underlying [`Watcher`] if no namespaces require it. + #[instrument(level = "info", skip(self))] pub async fn stop_watch(&self, lattice_id: String, namespace: String) -> Result<()> { let mut watchers = self.watchers.write().await; if let Some(watcher) = watchers.get_mut(lattice_id.as_str()) { @@ -387,6 +390,7 @@ impl ServiceWatcher { } /// Creates or updates a service in the cluster based on the provided parameters. +#[instrument(level = "info", skip(k8s_client))] pub async fn create_or_update_service( k8s_client: KubeClient, params: &ServiceParams, @@ -556,6 +560,7 @@ pub struct HttpServerComponent { } /// Finds the httpserver component in a manifest and returns the details needed to create a service +#[instrument(level = "debug", skip_all)] fn http_server_component(manifest: &Manifest) -> Option { let components: Vec<&model::Component> = manifest .spec @@ -649,6 +654,7 @@ fn http_server_component(manifest: &Manifest) -> Option { } /// Deletes a service in the cluster. +#[instrument(level = "info", skip(k8s_client))] async fn delete_service(k8s_client: KubeClient, namespace: &str, name: &str) -> Result<()> { debug!(namespace = namespace, name = name, "Deleting service"); let api = Api::::namespaced(k8s_client.clone(), namespace); @@ -671,6 +677,7 @@ async fn delete_service(k8s_client: KubeClient, namespace: &str, name: &str) -> Ok(()) } +#[instrument(level = "info", skip(k8s_client))] async fn delete_services(k8s_client: KubeClient, namespace: &str) -> Result<()> { let api = Api::::namespaced(k8s_client.clone(), namespace); let services = api