feat: add instrumentation to relevant functions

We already added all of the setup code for enabling OTEL tracing, so
this adds instrumentation attributes to actually relay that data.

Fixes #15.

Signed-off-by: Dan Norris <protochron@users.noreply.github.com>
This commit is contained in:
Dan Norris 2024-04-11 09:04:39 -04:00
parent 9a9a4a8f97
commit a87f48f411
6 changed files with 52 additions and 5 deletions

27
Cargo.lock generated
View File

@ -209,6 +209,28 @@ dependencies = [
"url",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.38",
]
[[package]]
name = "async-trait"
version = "0.1.74"
@ -3679,10 +3701,12 @@ version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.4",
"bytes",
"flate2",
"futures-core",
"futures-util",
"h2",
@ -3693,7 +3717,10 @@ dependencies = [
"percent-encoding",
"pin-project",
"prost",
"rustls-native-certs 0.6.3",
"rustls-pemfile 1.0.3",
"tokio",
"tokio-rustls 0.24.1",
"tokio-stream",
"tower",
"tower-layer",

View File

@ -68,7 +68,7 @@ json-patch = "1.2.0"
k8s-openapi = { version = "0.20", default-features = false}
kube = { version = "0.87", default-features = false}
opentelemetry = { version = "0.20", features = ["metrics", "trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.13", features = ["tokio"] }
opentelemetry-otlp = { version = "0.13", features = ["tokio", "tls-roots", "gzip-tonic"] }
rcgen = "0.11"
schemars = "0.8"
secrecy = "0.8"

View File

@ -29,7 +29,7 @@ use std::collections::{BTreeMap, HashMap};
use std::str::from_utf8;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use wasmcloud_operator_types::v1alpha1::{
AppStatus, WasmCloudHostConfig, WasmCloudHostConfigStatus,
};
@ -89,6 +89,7 @@ impl Secrets {
}
}
#[instrument(level = "debug", skip(cluster, ctx))]
pub async fn reconcile(cluster: Arc<WasmCloudHostConfig>, ctx: Arc<Context>) -> Result<Action> {
let cluster_configs: Api<WasmCloudHostConfig> =
Api::namespaced(ctx.client.clone(), &cluster.namespace().unwrap());
@ -113,6 +114,7 @@ pub async fn reconcile(cluster: Arc<WasmCloudHostConfig>, ctx: Arc<Context>) ->
.map_err(|e| Error::FinalizerError(Box::new(e)))
}
#[instrument(level = "debug", skip(config, ctx), fields(name = config.name_any(), namespace = config.namespace().unwrap_or_default()))]
async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<Action> {
let client = ctx.client.clone();
let ns = config.namespace().unwrap();
@ -228,6 +230,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
Ok(Action::requeue(Duration::from_secs(5 * 60)))
}
#[instrument(level = "debug", skip(config, ctx), fields(name = config.name_any(), namespace = config.namespace().unwrap_or_default()))]
async fn cleanup(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<Action> {
let client = ctx.client.clone();
let ns = config.namespace().unwrap();
@ -871,6 +874,7 @@ fn error_policy(_object: Arc<WasmCloudHostConfig>, _error: &Error, _ctx: Arc<Con
Action::requeue(Duration::from_secs(1))
}
#[instrument(level = "info", skip(state))]
pub async fn run(state: State) -> anyhow::Result<()> {
let client = Client::try_default().await?;

View File

@ -19,7 +19,7 @@ use secrecy::{ExposeSecret, SecretString};
use serde::Serialize;
use serde_json::json;
use tokio::sync::RwLock;
use tracing::error;
use tracing::{error, instrument};
use uuid::Uuid;
use wadm::{
model::Manifest,
@ -264,6 +264,7 @@ impl From<Vec<Manifest>> for ApplicationTable {
post,
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications"
)]
#[instrument(level = "debug", skip(state, body))]
pub async fn create_application(
Path(namespace): Path<String>,
AxumState(state): AxumState<State>,
@ -376,6 +377,7 @@ pub async fn create_application(
}
#[utoipa::path(get, path = "/apis/core.oam.dev/v1beta1/applications")]
#[instrument(level = "debug", skip(state))]
pub async fn list_all_applications(
TypedHeader(accept): TypedHeader<Accept>,
AxumState(state): AxumState<State>,
@ -434,6 +436,7 @@ pub async fn list_all_applications(
get,
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications"
)]
#[instrument(level = "debug", skip(state))]
pub async fn list_applications(
TypedHeader(accept): TypedHeader<Accept>,
Path(namespace): Path<String>,
@ -483,6 +486,7 @@ pub async fn list_applications(
}
}
#[instrument(level = "debug", skip(creds))]
pub async fn list_apps(
cluster_url: &str,
creds: Option<&SecretString>,
@ -501,6 +505,7 @@ pub async fn list_apps(
Ok(models)
}
#[instrument(level = "debug", skip(nats_creds))]
pub async fn get_client(
cluster_url: &str,
nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
@ -523,6 +528,7 @@ pub async fn get_client(
get,
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}"
)]
#[instrument(level = "debug", skip(state))]
pub async fn get_application(
TypedHeader(accept): TypedHeader<Accept>,
Path((namespace, name)): Path<(String, String)>,
@ -647,6 +653,7 @@ pub async fn get_application(
patch,
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}"
)]
#[instrument(level = "debug", skip(state, body))]
pub async fn patch_application(
Path((namespace, name)): Path<(String, String)>,
AxumState(state): AxumState<State>,
@ -776,6 +783,7 @@ pub async fn patch_application(
delete,
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications/{name}"
)]
#[instrument(level = "debug", skip(state))]
pub async fn delete_application(
Path((namespace, name)): Path<(String, String)>,
AxumState(state): AxumState<State>,

View File

@ -13,7 +13,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::{
};
use kube::core::{GroupVersionKind, ListMeta};
use serde::Serialize;
use tracing::error;
use tracing::{error, instrument};
use crate::{
discovery::{
@ -29,6 +29,7 @@ use crate::{
State,
};
#[instrument(level = "debug", skip_all)]
pub fn setup(state: State) -> Router {
let openapi_router = openapi::router();
Router::new()

View File

@ -27,7 +27,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use tracing::{debug, error, instrument, warn};
use wadm::{
events::{Event, ManifestPublished, ManifestUnpublished},
model::{self, Manifest, Trait, TraitProperty},
@ -271,6 +271,7 @@ impl ServiceWatcher {
/// Reconciles services for a set of apps in a lattice.
/// This intended to be called by the controller whenever it reconciles state.
#[instrument(level = "info", skip(self))]
pub async fn reconcile_services(&self, apps: Vec<ModelSummary>, lattice_id: String) {
if let Some(watcher) = self.watchers.read().await.get(lattice_id.as_str()) {
for app in apps {
@ -302,6 +303,7 @@ impl ServiceWatcher {
/// Create a new [`Watcher`] for a lattice.
/// It will return early if a [`Watcher`] already exists for the lattice.
#[instrument(level = "info", skip(self, client))]
pub async fn watch(&self, client: Client, namespace: String, lattice_id: String) -> Result<()> {
// If we're already watching this lattice then return early
// TODO is there an easy way to do this with a read lock?
@ -368,6 +370,7 @@ impl ServiceWatcher {
}
/// Stops watching a lattice by stopping the underlying [`Watcher`] if no namespaces require it.
#[instrument(level = "info", skip(self))]
pub async fn stop_watch(&self, lattice_id: String, namespace: String) -> Result<()> {
let mut watchers = self.watchers.write().await;
if let Some(watcher) = watchers.get_mut(lattice_id.as_str()) {
@ -387,6 +390,7 @@ impl ServiceWatcher {
}
/// Creates or updates a service in the cluster based on the provided parameters.
#[instrument(level = "info", skip(k8s_client))]
pub async fn create_or_update_service(
k8s_client: KubeClient,
params: &ServiceParams,
@ -556,6 +560,7 @@ pub struct HttpServerComponent {
}
/// Finds the httpserver component in a manifest and returns the details needed to create a service
#[instrument(level = "debug", skip_all)]
fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
let components: Vec<&model::Component> = manifest
.spec
@ -649,6 +654,7 @@ fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
}
/// Deletes a service in the cluster.
#[instrument(level = "info", skip(k8s_client))]
async fn delete_service(k8s_client: KubeClient, namespace: &str, name: &str) -> Result<()> {
debug!(namespace = namespace, name = name, "Deleting service");
let api = Api::<Service>::namespaced(k8s_client.clone(), namespace);
@ -671,6 +677,7 @@ async fn delete_service(k8s_client: KubeClient, namespace: &str, name: &str) ->
Ok(())
}
#[instrument(level = "info", skip(k8s_client))]
async fn delete_services(k8s_client: KubeClient, namespace: &str) -> Result<()> {
let api = Api::<Service>::namespaced(k8s_client.clone(), namespace);
let services = api