Compare commits
62 Commits
chart-v0.1
...
main
Author | SHA1 | Date |
---|---|---|
|
a3f6886cb3 | |
|
599aec9cbc | |
|
4b8134a5f8 | |
|
9c3208ee3a | |
|
c6d4e59028 | |
|
cb051aac61 | |
|
52007e2927 | |
|
bc2d7bbea2 | |
|
d4758e98f1 | |
|
9cf6542828 | |
|
8d495ec202 | |
|
b7e4bf3761 | |
|
f9daa35480 | |
|
d74ad8bff7 | |
|
4ccfb20b30 | |
|
59552d2277 | |
|
1b129d69c9 | |
|
fff57db4fb | |
|
1c85b3937c | |
|
309b492dcd | |
|
0aaa277422 | |
|
6f63289a11 | |
|
788cbc089d | |
|
4ab407dc15 | |
|
d90e46055e | |
|
5988e643e7 | |
|
69dac08133 | |
|
2906b3f56e | |
|
ba909d170c | |
|
f7d3ddef5f | |
|
0c024e7718 | |
|
e6de0f9394 | |
|
38e77cd643 | |
|
226f9ab39e | |
|
ee120eb5d1 | |
|
26deb719ac | |
|
c30e1f8953 | |
|
97f0c12d03 | |
|
7c709465c0 | |
|
4f4bde877e | |
|
32fa6e40da | |
|
674ee31cb0 | |
|
27e83fe24c | |
|
d4a47a4d7c | |
|
0ebc5229bb | |
|
8853b526d4 | |
|
d0f810cd6c | |
|
bf8a74acfd | |
|
00bc95f293 | |
|
b3cfb7da54 | |
|
e7a5e9dc1f | |
|
845f78841a | |
|
f83dbf9215 | |
|
fa5594bab8 | |
|
7dcb409bde | |
|
b1d88813f4 | |
|
de5026f4e0 | |
|
43d087f1ad | |
|
280f2648e6 | |
|
105e2c54dc | |
|
0952050708 | |
|
5be39536f3 |
|
@ -0,0 +1,2 @@
|
|||
# wasmCloud operator maintainers
|
||||
* @wasmCloud/operator-maintainers
|
|
@ -0,0 +1,10 @@
|
|||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "cargo"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
- package-ecosystem: "github-actions"
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "daily"
|
|
@ -16,7 +16,7 @@ jobs:
|
|||
check:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
- name: Lint
|
||||
run: |
|
||||
cargo clippy -- --no-deps
|
||||
|
@ -71,7 +71,7 @@ jobs:
|
|||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Log in to the Container registry
|
||||
uses: docker/login-action@v2
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ${{ env.REGISTRY }}
|
||||
username: ${{ github.repository_owner }}
|
||||
|
@ -86,6 +86,15 @@ jobs:
|
|||
type=sha,prefix=
|
||||
type=semver,pattern={{version}}
|
||||
|
||||
- name: Extract metadata (tags, labels) for Docker
|
||||
id: meta_wolfi
|
||||
uses: docker/metadata-action@v5
|
||||
with:
|
||||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
|
||||
tags: |
|
||||
type=sha,prefix=,suffix=-wolfi
|
||||
type=semver,pattern={{version}},suffix=-wolfi
|
||||
|
||||
- name: Load artifacts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
|
@ -98,7 +107,7 @@ jobs:
|
|||
chmod +x artifacts/wasmcloud-operator*
|
||||
|
||||
- name: Build and push Docker image
|
||||
uses: docker/build-push-action@v4
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
push: true
|
||||
context: .
|
||||
|
@ -106,3 +115,14 @@ jobs:
|
|||
labels: ${{ steps.meta.outputs.labels }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: "BIN_PATH=artifacts/wasmcloud-operator"
|
||||
|
||||
- name: Build and push Docker image (wolfi)
|
||||
uses: docker/build-push-action@v6
|
||||
with:
|
||||
push: true
|
||||
context: .
|
||||
file: './Dockerfile.wolfi'
|
||||
tags: ${{ steps.meta_wolfi.outputs.tags }}
|
||||
labels: ${{ steps.meta_wolfi.outputs.labels }}
|
||||
platforms: linux/amd64,linux/arm64
|
||||
build-args: "BIN_PATH=artifacts/wasmcloud-operator"
|
||||
|
|
|
@ -32,12 +32,12 @@ jobs:
|
|||
|
||||
# Used by helm chart-testing below
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5.0.0
|
||||
uses: actions/setup-python@v5.6.0
|
||||
with:
|
||||
python-version: '3.12.2'
|
||||
|
||||
- name: Set up chart-testing
|
||||
uses: helm/chart-testing-action@v2.6.1
|
||||
uses: helm/chart-testing-action@v2.7.0
|
||||
with:
|
||||
version: v3.10.1
|
||||
yamllint_version: 1.35.1
|
||||
|
@ -48,7 +48,7 @@ jobs:
|
|||
ct lint --config charts/wasmcloud-operator/ct.yaml
|
||||
|
||||
- name: Create kind cluster
|
||||
uses: helm/kind-action@v1.9.0
|
||||
uses: helm/kind-action@v1.12.0
|
||||
with:
|
||||
version: "v0.22.0"
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
20
Cargo.toml
20
Cargo.toml
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "wasmcloud-operator"
|
||||
version = "0.2.3"
|
||||
version = "0.5.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
|
@ -34,6 +34,7 @@ json-patch = { workspace = true }
|
|||
k8s-openapi = { workspace = true, features = ["v1_28", "schemars"] }
|
||||
kube = { workspace = true, features = ["runtime", "derive", "default"] }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
opentelemetry-otlp = { workspace = true }
|
||||
rcgen = { workspace = true }
|
||||
schemars = { workspace = true }
|
||||
|
@ -51,7 +52,8 @@ tracing-subscriber = { workspace = true }
|
|||
utoipa = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
wadm = { workspace = true }
|
||||
wash-lib = { workspace = true }
|
||||
wadm-client = { workspace = true }
|
||||
wadm-types = { workspace = true }
|
||||
wasmcloud-operator-types = { workspace = true }
|
||||
|
||||
[workspace.dependencies]
|
||||
|
@ -67,15 +69,16 @@ cloudevents-sdk = "0.7"
|
|||
ctrlc = "3"
|
||||
futures = "0.3"
|
||||
handlebars = "5.1"
|
||||
json-patch = "1.2.0"
|
||||
json-patch = "1.4.0"
|
||||
k8s-openapi = { version = "0.20", default-features = false }
|
||||
kube = { version = "0.87", default-features = false }
|
||||
opentelemetry = { version = "0.20", features = [
|
||||
opentelemetry = { version = "0.21", default-features = false }
|
||||
opentelemetry_sdk = { version = "0.21", features = [
|
||||
"metrics",
|
||||
"trace",
|
||||
"rt-tokio",
|
||||
] }
|
||||
opentelemetry-otlp = { version = "0.13", features = ["tokio"] }
|
||||
opentelemetry-otlp = { version = "0.14", features = ["tokio"] }
|
||||
rcgen = "0.11"
|
||||
schemars = "0.8"
|
||||
secrecy = "0.8"
|
||||
|
@ -87,12 +90,13 @@ time = "0.3"
|
|||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = { version = "0.7", features = ["rt"] }
|
||||
tracing = "0.1"
|
||||
tracing-opentelemetry = "0.20"
|
||||
tracing-opentelemetry = "0.22"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
utoipa = { version = "4.1", features = ["axum_extras"] }
|
||||
uuid = { version = "1", features = ["v5"] }
|
||||
wadm = "0.11.0"
|
||||
wash-lib = "0.20.0"
|
||||
wadm = "0.13.0"
|
||||
wadm-client = "0.2.0"
|
||||
wadm-types = "0.2.0"
|
||||
wasmcloud-operator-types = { version = "*", path = "./crates/types" }
|
||||
|
||||
[workspace]
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
# syntax=docker/dockerfile:1
|
||||
FROM chainguard/wolfi-base:latest
|
||||
ARG BIN_PATH
|
||||
ARG TARGETARCH
|
||||
|
||||
COPY ${BIN_PATH}-${TARGETARCH} /usr/local/bin/wasmcloud-operator
|
||||
ENTRYPOINT ["/usr/local/bin/wasmcloud-operator"]
|
|
@ -0,0 +1,22 @@
|
|||
# MAINTAINERS
|
||||
|
||||
The following individuals are responsible for reviewing code, managing issues, and ensuring the overall quality of `wasmcloud-operator`.
|
||||
|
||||
## @wasmCloud/operator-maintainers
|
||||
|
||||
Name: Joonas Bergius
|
||||
GitHub: @joonas
|
||||
Organization: Cosmonic
|
||||
|
||||
Name: Dan Norris
|
||||
GitHub: @protochron
|
||||
Organization: Cosmonic
|
||||
|
||||
Name: Taylor Thomas
|
||||
GitHub: @thomastaylor312
|
||||
Organization: Cosmonic
|
||||
|
||||
Name: Lucas Fontes
|
||||
GitHub: @lxfontes
|
||||
Organization: Cosmonic
|
||||
|
54
README.md
54
README.md
|
@ -17,22 +17,15 @@ metadata:
|
|||
spec:
|
||||
# The number of wasmCloud host pods to run
|
||||
hostReplicas: 2
|
||||
# The cluster issuers to use for each host
|
||||
issuers:
|
||||
# This needs to be the public key of a cluster seed generated by
|
||||
# `wash keys gen cluster`
|
||||
- CDK...
|
||||
# The lattice to connect the hosts to
|
||||
lattice: 83a5b52e-17cf-4080-bac8-f844099f142e
|
||||
lattice: default
|
||||
# Additional labels to apply to the host other than the defaults set in the operator
|
||||
hostLabels:
|
||||
some-label: value
|
||||
# The address to connect to nats
|
||||
natsAddress: nats://nats-cluster.default.svc.cluster.local
|
||||
natsAddress: nats://nats.default.svc.cluster.local
|
||||
# Which wasmCloud version to use
|
||||
version: 1.0.2
|
||||
# The name of a secret in the same namespace that provides the required secrets.
|
||||
secretName: my-wasmcloud-cluster
|
||||
version: 1.0.4
|
||||
# Enable the following to run the wasmCloud hosts as a DaemonSet
|
||||
#daemonset: true
|
||||
# The name of the image pull secret to use with wasmCloud hosts so that they
|
||||
|
@ -47,18 +40,42 @@ apiVersion: v1
|
|||
kind: Secret
|
||||
metadata:
|
||||
name: my-wasmcloud-cluster
|
||||
stringData:
|
||||
# You can generate this with wash:
|
||||
# `wash keys gen cluster`
|
||||
WASMCLOUD_CLUSTER_SEED: <seed>
|
||||
#data:
|
||||
# Only required if using a NATS creds file
|
||||
# nats.creds: <creds file>
|
||||
# Only required if using a NATS creds file
|
||||
# nats.creds: <creds file>
|
||||
```
|
||||
|
||||
The operator will fail to provision the wasmCloud Deployment if any of these
|
||||
secrets are missing!
|
||||
|
||||
#### Customizing the images used for wasmCloud host and NATS leaf
|
||||
|
||||
If you would like to customize the registry or image that gets used to provision the wasmCloud hosts and the NATS leaf that runs alongside them, you can specify the following options in the above `WasmCloudHostConfig` CRD.
|
||||
|
||||
For wasmCloud Host, use the `image` field:
|
||||
|
||||
```yaml
|
||||
apiVersion: k8s.wasmcloud.dev/v1alpha1
|
||||
kind: WasmCloudHostConfig
|
||||
metadata:
|
||||
name: my-wasmcloud-cluster
|
||||
spec:
|
||||
# other config options omitted
|
||||
image: registry.example.com/wasmcloud:1.0.2
|
||||
```
|
||||
|
||||
For the NATS leaf, use the `natsImageLeaf` field:
|
||||
|
||||
```yaml
|
||||
apiVersion: k8s.wasmcloud.dev/v1alpha1
|
||||
kind: WasmCloudHostConfig
|
||||
metadata:
|
||||
name: my-wasmcloud-cluster
|
||||
spec:
|
||||
# other config options omitted
|
||||
natsLeafImage: registry.example.com/nats:2.10.16
|
||||
```
|
||||
|
||||
### Image Pull Secrets
|
||||
|
||||
You can also specify an image pull secret to use use with the wasmCloud hosts
|
||||
|
@ -104,7 +121,7 @@ config:
|
|||
|
||||
```sh
|
||||
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
|
||||
helm upgrade --install -f values.yaml nats-cluster nats/nats
|
||||
helm upgrade --install -f values.yaml nats nats/nats
|
||||
```
|
||||
|
||||
### Running Wadm
|
||||
|
@ -116,7 +133,7 @@ NATS server deployed above, all you need in your `values.yaml` file is:
|
|||
wadm:
|
||||
config:
|
||||
nats:
|
||||
server: "nats-cluster.default.svc.cluster.local:4222"
|
||||
server: "nats.default.svc.cluster.local:4222"
|
||||
```
|
||||
|
||||
You can deploy Wadm using your values file and Helm:
|
||||
|
@ -199,7 +216,6 @@ data:
|
|||
include [Kind](https://kind.sigs.k8s.io/) or Docker Desktop.
|
||||
- `RUST_LOG=info cargo run`
|
||||
|
||||
|
||||
## Types crate
|
||||
|
||||
This repo stores the types for any CRDs used by the operator in a separate
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# Reporting a security issue
|
||||
|
||||
Please refer to the [wasmCloud Security Process and Policy](https://github.com/wasmCloud/wasmCloud/blob/main/SECURITY.md) for details on how to report security issues and vulnerabilities.
|
|
@ -7,10 +7,10 @@ type: application
|
|||
# This is the chart version. This version number should be incremented each time you make changes
|
||||
# to the chart and its templates, including the app version.
|
||||
# Versions are expected to follow Semantic Versioning (https://semver.org/)
|
||||
version: 0.1.3
|
||||
version: 0.1.6
|
||||
|
||||
# This is the version number of the application being deployed. This version number should be
|
||||
# incremented each time you make changes to the application. Versions are not expected to
|
||||
# follow Semantic Versioning. They should reflect the version the application is using.
|
||||
# It is recommended to use it with quotes.
|
||||
appVersion: "0.2.3"
|
||||
appVersion: "0.4.0"
|
||||
|
|
|
@ -43,10 +43,15 @@ Common labels
|
|||
{{- define "wasmcloud-operator.labels" -}}
|
||||
helm.sh/chart: {{ include "wasmcloud-operator.chart" . }}
|
||||
{{ include "wasmcloud-operator.selectorLabels" . }}
|
||||
app.kubernetes.io/component: operator
|
||||
{{- if .Chart.AppVersion }}
|
||||
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
||||
{{- end }}
|
||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
app.kubernetes.io/part-of: wasmcloud-operator
|
||||
{{- with .Values.additionalLabels }}
|
||||
{{ . | toYaml }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
{{/*
|
||||
|
|
|
@ -12,6 +12,9 @@ imagePullSecrets: []
|
|||
nameOverride: ""
|
||||
fullnameOverride: ""
|
||||
|
||||
additionalLabels: {}
|
||||
# app: wasmcloud-operator
|
||||
|
||||
serviceAccount:
|
||||
# Specifies whether a service account should be created
|
||||
create: true
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
[package]
|
||||
name = "wasmcloud-operator-types"
|
||||
version = "0.1.4"
|
||||
version = "0.1.9"
|
||||
edition = "2021"
|
||||
|
||||
[package.metadata.cargo-machete]
|
||||
# NOTE: This exists because kube-derive needs it, and for reasons I don't
|
||||
# fully understand, it's not coming through kube-derive's own depedendencies.
|
||||
ignored = ["serde_json"]
|
||||
|
||||
[dependencies]
|
||||
k8s-openapi = {workspace = true}
|
||||
kube = {workspace = true, features = ["derive"]}
|
||||
schemars = {workspace = true}
|
||||
serde = {workspace = true}
|
||||
serde_json = {workspace = true}
|
||||
k8s-openapi = { workspace = true }
|
||||
kube = { workspace = true, features = ["derive"] }
|
||||
schemars = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
|
@ -1,4 +1,4 @@
|
|||
use k8s_openapi::api::core::v1::{PodSpec, ResourceRequirements};
|
||||
use k8s_openapi::api::core::v1::{Container, PodSpec, ResourceRequirements, Volume};
|
||||
use kube::CustomResource;
|
||||
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -20,9 +20,10 @@ pub struct WasmCloudHostConfigSpec {
|
|||
/// The number of replicas to use for the wasmCloud host Deployment.
|
||||
#[serde(default = "default_host_replicas")]
|
||||
pub host_replicas: u32,
|
||||
/// A list of cluster issuers to use when provisioning hosts. See
|
||||
/// DEPRECATED: A list of cluster issuers to use when provisioning hosts. See
|
||||
/// https://wasmcloud.com/docs/deployment/security/zero-trust-invocations for more information.
|
||||
pub issuers: Vec<String>,
|
||||
#[deprecated(since = "0.3.1", note = "Removed in wasmcloud 1.0.0")]
|
||||
pub issuers: Option<Vec<String>>,
|
||||
/// The lattice to use for these hosts.
|
||||
pub lattice: String,
|
||||
/// An optional set of labels to apply to these hosts.
|
||||
|
@ -33,9 +34,12 @@ pub struct WasmCloudHostConfigSpec {
|
|||
/// If not provided, the default image for the version will be used.
|
||||
/// Also if provided, the version field will be ignored.
|
||||
pub image: Option<String>,
|
||||
/// The name of a secret containing the primary cluster issuer key along with an optional set
|
||||
/// of NATS credentials.
|
||||
pub secret_name: String,
|
||||
/// The image to use for the NATS leaf that is deployed alongside the wasmCloud host.
|
||||
/// If not provided, the default upstream image will be used.
|
||||
/// If provided, it should be fully qualified by including the image tag.
|
||||
pub nats_leaf_image: Option<String>,
|
||||
/// Optional. The name of a secret containing a set of NATS credentials under 'nats.creds' key.
|
||||
pub secret_name: Option<String>,
|
||||
/// Enable structured logging for host logs.
|
||||
pub enable_structured_logging: Option<bool>,
|
||||
/// Name of a secret containing the registry credentials
|
||||
|
@ -72,6 +76,14 @@ pub struct WasmCloudHostConfigSpec {
|
|||
pub policy_service: Option<PolicyService>,
|
||||
/// Kubernetes scheduling options for the wasmCloud host.
|
||||
pub scheduling_options: Option<KubernetesSchedulingOptions>,
|
||||
/// Observability options for configuring the OpenTelemetry integration
|
||||
pub observability: Option<ObservabilityConfiguration>,
|
||||
/// Certificates: Authorities, client certificates
|
||||
pub certificates: Option<WasmCloudHostCertificates>,
|
||||
/// wasmCloud secrets topic prefix, must not be empty if set.
|
||||
pub secrets_topic_prefix: Option<String>,
|
||||
/// Maximum memory in bytes that components can use.
|
||||
pub max_linear_memory_bytes: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
|
@ -92,9 +104,62 @@ pub struct KubernetesSchedulingOptions {
|
|||
/// https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ for valid
|
||||
/// values to use here.
|
||||
pub resources: Option<WasmCloudHostConfigResources>,
|
||||
#[schemars(schema_with = "pod_schema")]
|
||||
/// Any other pod template spec options to set for the underlying wasmCloud host pods.
|
||||
#[schemars(schema_with = "pod_schema")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub pod_template_additions: Option<PodSpec>,
|
||||
/// Allow for customization of either the wasmcloud or nats leaf container inside of the wasmCloud host pod.
|
||||
pub container_template_additions: Option<ContainerTemplateAdditions>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ContainerTemplateAdditions {
|
||||
#[schemars(schema_with = "container_schema")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub nats: Option<Container>,
|
||||
#[schemars(schema_with = "container_schema")]
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub wasmcloud: Option<Container>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ObservabilityConfiguration {
|
||||
#[serde(default)]
|
||||
pub enable: bool,
|
||||
pub endpoint: String,
|
||||
pub protocol: Option<OtelProtocol>,
|
||||
pub logs: Option<OtelSignalConfiguration>,
|
||||
pub metrics: Option<OtelSignalConfiguration>,
|
||||
pub traces: Option<OtelSignalConfiguration>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum OtelProtocol {
|
||||
Grpc,
|
||||
Http,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for OtelProtocol {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match self {
|
||||
OtelProtocol::Grpc => "grpc",
|
||||
OtelProtocol::Http => "http",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct OtelSignalConfiguration {
|
||||
pub enable: Option<bool>,
|
||||
pub endpoint: Option<String>,
|
||||
}
|
||||
|
||||
/// This is a workaround for the fact that we can't override the PodSpec schema to make containers
|
||||
|
@ -114,6 +179,24 @@ fn pod_schema(_gen: &mut SchemaGenerator) -> Schema {
|
|||
val.schema.into()
|
||||
}
|
||||
|
||||
/// This is a workaround for the fact that we can't override the Container schema to make name
|
||||
/// an optional field. It generates the OpenAPI schema for the Container type the same way that
|
||||
/// kube.rs does while dropping any required fields.
|
||||
fn container_schema(_gen: &mut SchemaGenerator) -> Schema {
|
||||
let gen = schemars::gen::SchemaSettings::openapi3()
|
||||
.with(|s| {
|
||||
s.inline_subschemas = true;
|
||||
s.meta_schema = None;
|
||||
})
|
||||
.with_visitor(kube::core::schema::StructuralSchemaRewriter)
|
||||
.into_generator();
|
||||
let mut val = gen.into_root_schema_for::<Container>();
|
||||
// Drop `name` as a required field as it will be filled in from container
|
||||
// definition coming the controller that this configuration gets merged into.
|
||||
val.schema.object.as_mut().unwrap().required = BTreeSet::new();
|
||||
val.schema.into()
|
||||
}
|
||||
|
||||
fn default_host_replicas() -> u32 {
|
||||
1
|
||||
}
|
||||
|
@ -142,6 +225,11 @@ fn default_nats_leafnode_port() -> u16 {
|
|||
7422
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
pub struct WasmCloudHostCertificates {
|
||||
pub authorities: Option<Vec<Volume>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
pub struct WasmCloudHostConfigResources {
|
||||
pub nats: Option<ResourceRequirements>,
|
||||
|
|
|
@ -15,7 +15,7 @@ spec:
|
|||
spec:
|
||||
serviceAccountName: wasmcloud-operator
|
||||
containers:
|
||||
- image: ghcr.io/wasmcloud/wasmcloud-operator:0.2.3
|
||||
- image: ghcr.io/wasmcloud/wasmcloud-operator:0.5.0
|
||||
imagePullPolicy: Always
|
||||
name: wasmcloud-operator
|
||||
ports:
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
apiVersion: k8s.wasmcloud.dev/v1alpha1
|
||||
kind: WasmCloudHostConfig
|
||||
metadata:
|
||||
name: my-wasmcloud-cluster
|
||||
namespace: default
|
||||
spec:
|
||||
# Optional: Number of hosts (pods). Defaults to 1.
|
||||
hostReplicas: 1
|
||||
# Required: The lattice to connect the hosts to.
|
||||
lattice: default
|
||||
# Optional: Additional labels to apply to the host other than the defaults set in the controller.
|
||||
hostLabels:
|
||||
test: value
|
||||
cluster: kind
|
||||
# Required: Which wasmCloud version to use.
|
||||
version: "1.0.4"
|
||||
# Optional: The image to use for the wasmCloud host.
|
||||
# If provided, the 'version' field will be ignored.
|
||||
image: "registry/wasmcloud:tag"
|
||||
# Optional: The image to use for the NATS leaf that is deployed alongside the wasmCloud host.
|
||||
# If not provided, the default upstream image will be used.
|
||||
natsLeafImage: "registry/nats:tag"
|
||||
# Optional. The name of a secret containing a set of NATS credentials under 'nats.creds' key.
|
||||
secretName: "wasmcloud-host-nats-secret"
|
||||
# Optional: Enable structured logging for host logs. Defaults to "false".
|
||||
enableStructuredLogging: true
|
||||
# Optional: The name of a secret containing the registry credentials.
|
||||
# See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/#create-a-secret-by-providing-credentials-on-the-command-line
|
||||
registryCredentialsSecret: "wasmcloud-pull-secret"
|
||||
# Optional: The control topic prefix to use for the host. Defaults to "wasmbus.ctl"
|
||||
controlTopicPrefix: "wasmbus.custom-ctl"
|
||||
# Optional: The leaf node domain to use for the NATS sidecar. Defaults to "leaf".
|
||||
leafNodeDomain: "custom-leaf"
|
||||
# Optional: Enable the config service for this host. Defaults to "false".
|
||||
# Makes wasmCloud host issue requests to a config service on startup.
|
||||
configServiceEnabled: true
|
||||
# Optional: The log level to use for the host. Defaults to "INFO".
|
||||
logLevel: INFO
|
||||
# Optional: The address of the NATS server to connect to. Defaults to "nats://nats.default.svc.cluster.local".
|
||||
natsAddress: nats://nats.default.svc.cluster.local
|
||||
# Optional: Allow the host to deploy using the latest tag on OCI components or providers. Defaults to "false".
|
||||
allowLatest: true
|
||||
# Optional: Allow the host to pull artifacts from OCI registries insecurely.
|
||||
allowedInsecure:
|
||||
- "localhost:5001"
|
||||
- "kind-registry:5000"
|
||||
# Optional: Policy service configuration.
|
||||
policyService:
|
||||
# If provided, enables policy checks on start actions and component invocations.
|
||||
topic: "wasmcloud.policy"
|
||||
# If provided, allows the host to subscribe to updates on past policy decisions. Requires 'topic' above to be set.
|
||||
changesTopic: "wasmcloud.policy.changes"
|
||||
# If provided, allows setting a custom timeout for requesting policy decisions. Defaults to 1000. Requires 'topic' to be set.
|
||||
timeoutMs: 10000
|
||||
# Optional: Observability options for configuring the OpenTelemetry integration.
|
||||
observability:
|
||||
# NOTE: Enables all signals (logs/metrics/traces) at once. Set it to 'false' and enable each signal individually in case you don't need all of them.
|
||||
enable: true
|
||||
endpoint: "otel-collector.svc"
|
||||
# Either 'grpc' or 'http'
|
||||
protocol: "http"
|
||||
logs:
|
||||
enable: false
|
||||
endpoint: "logs-specific-otel-collector.svc"
|
||||
metrics:
|
||||
enable: false
|
||||
endpoint: "metrics-specific-otel-collector.svc"
|
||||
traces:
|
||||
enable: false
|
||||
endpoint: "traces-specific-otel-collector.svc"
|
||||
# Optional: Subject prefix that will be used by the host to query for wasmCloud Secrets.
|
||||
# See https://wasmcloud.com/docs/concepts/secrets for more context
|
||||
secretsTopicPrefix: "wasmcloud.secrets"
|
||||
# Optional: The maximum amount of memory bytes that a component can allocate.
|
||||
maxLinearMemoryBytes: 20000000
|
||||
# Optional: Additional options to control how the underlying wasmCloud hosts are scheduled in Kubernetes.
|
||||
# This includes setting resource requirements for the nats and wasmCloud host
|
||||
# containers along with any additional pot template settings.
|
||||
schedulingOptions:
|
||||
# Optional: Enable the following to run the wasmCloud hosts as a DaemonSet. Defaults to "false".
|
||||
daemonset: true
|
||||
# Optional: Set the resource requirements for the nats and wasmCloud host containers.
|
||||
# See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ for valid values
|
||||
resources:
|
||||
nats:
|
||||
requests:
|
||||
cpu: "1"
|
||||
wasmCloudHost:
|
||||
requests:
|
||||
cpu: "1"
|
||||
# Optional: Any additional pod template settings to apply to the wasmCloud host pods.
|
||||
# See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options.
|
||||
# Note that you *cannot* set the `containers` field here as it is managed by the controller.
|
||||
podTemplateAdditions:
|
||||
spec:
|
||||
nodeSelector:
|
||||
kubernetes.io/os: linux
|
|
@ -0,0 +1,87 @@
|
|||
# Example setup
|
||||
|
||||
This example shows the bare minimum requirements to deploy applications on wasmCloud.
|
||||
|
||||
It relies on the Kubernetes `default` namespace for simplicity.
|
||||
|
||||
## Install [NATS](https://github.com/nats-io/nats-server)
|
||||
|
||||
```bash
|
||||
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
|
||||
helm upgrade --install -f nats-values.yaml nats nats/nats
|
||||
```
|
||||
|
||||
Validate installation with:
|
||||
|
||||
```bash
|
||||
# make sure pods are ready
|
||||
kubectl rollout status deploy,sts -l app.kubernetes.io/instance=nats
|
||||
```
|
||||
|
||||
## Install wasmCloud Application Deployment Manager - [wadm](https://github.com/wasmCloud/wadm)
|
||||
|
||||
```sh
|
||||
helm install wadm -f wadm-values.yaml oci://ghcr.io/wasmcloud/charts/wadm
|
||||
```
|
||||
|
||||
Validate installation with:
|
||||
|
||||
```bash
|
||||
# make sure pods are ready
|
||||
kubectl rollout status deploy -l app.kubernetes.io/instance=wadm
|
||||
```
|
||||
|
||||
## Install the operator
|
||||
|
||||
```sh
|
||||
kubectl apply -k ../../deploy/base
|
||||
```
|
||||
|
||||
Validate installation with:
|
||||
|
||||
```bash
|
||||
# make sure pods are ready
|
||||
kubectl rollout status deploy -l app=wasmcloud-operator -n wasmcloud-operator
|
||||
# apiservice should be available
|
||||
kubectl get apiservices.apiregistration.k8s.io v1beta1.core.oam.dev
|
||||
```
|
||||
|
||||
## Create wasmcloud cluster
|
||||
|
||||
```bash
|
||||
kubectl apply -f wasmcloud-host.yaml
|
||||
```
|
||||
|
||||
Check wasmCloud host status with:
|
||||
|
||||
```bash
|
||||
kubectl describe wasmcloudhostconfig wasmcloud-host
|
||||
```
|
||||
|
||||
## Managing applications using kubectl
|
||||
|
||||
Install the rust hello world application:
|
||||
|
||||
```bash
|
||||
kubectl apply -f hello-world-application.yaml
|
||||
```
|
||||
|
||||
Check application status with:
|
||||
|
||||
```bash
|
||||
kubectl get applications
|
||||
```
|
||||
|
||||
## Managing applications with wash
|
||||
|
||||
Port forward into the NATS cluster. 4222 = NATS Service, 4223 = NATS Websockets
|
||||
|
||||
```bash
|
||||
kubectl port-forward svc/nats 4222:4222 4223:4223
|
||||
```
|
||||
|
||||
In another shell:
|
||||
|
||||
```bash
|
||||
wash app list
|
||||
```
|
|
@ -0,0 +1,53 @@
|
|||
apiVersion: core.oam.dev/v1beta1
|
||||
kind: Application
|
||||
metadata:
|
||||
name: hello-world
|
||||
annotations:
|
||||
version: v0.0.1
|
||||
description: "HTTP hello world demo in Rust, using the WebAssembly Component Model and WebAssembly Interfaces Types (WIT)"
|
||||
wasmcloud.dev/authors: wasmCloud team
|
||||
wasmcloud.dev/source-url: https://github.com/wasmCloud/wasmCloud/blob/main/examples/rusg/components/http-hello-world/wadm.yaml
|
||||
wasmcloud.dev/readme-md-url: https://github.com/wasmCloud/wasmCloud/blob/main/examples/rusg/components/http-hello-world/README.md
|
||||
wasmcloud.dev/homepage: https://github.com/wasmCloud/wasmCloud/tree/main/examples/rusg/components/http-hello-world
|
||||
wasmcloud.dev/categories: |
|
||||
http,http-server,rust,hello-world,example
|
||||
spec:
|
||||
components:
|
||||
- name: http-component
|
||||
type: component
|
||||
properties:
|
||||
image: ghcr.io/wasmcloud/components/http-hello-world-rust:0.1.0
|
||||
traits:
|
||||
# Govern the spread/scheduling of the component
|
||||
- type: daemonscaler
|
||||
properties:
|
||||
replicas: 100
|
||||
|
||||
# Add a capability provider that enables HTTP access
|
||||
- name: httpserver
|
||||
type: capability
|
||||
properties:
|
||||
image: ghcr.io/wasmcloud/http-server:0.21.0
|
||||
traits:
|
||||
# Establish a unidirectional link from this http server provider (the "source")
|
||||
# to the `http-component` component (the "target") so the component can handle incoming HTTP requests.
|
||||
#
|
||||
# The source (this provider) is configured such that the HTTP server listens on 0.0.0.0:8000.
|
||||
# When running the application on Kubernetes with the wasmCloud operator, you can change the
|
||||
# port but the address must be 0.0.0.0.
|
||||
- type: link
|
||||
properties:
|
||||
target: http-component
|
||||
namespace: wasi
|
||||
package: http
|
||||
interfaces: [incoming-handler]
|
||||
source_config:
|
||||
- name: default-http
|
||||
properties:
|
||||
address: 0.0.0.0:8000
|
||||
# When running the application on Kubernetes with the wasmCloud operator,
|
||||
# the operator automatically creates a Kubernetes service for applications that use
|
||||
# the httpserver provider with a daemonscaler.
|
||||
- type: daemonscaler
|
||||
properties:
|
||||
replicas: 1
|
|
@ -0,0 +1,16 @@
|
|||
config:
|
||||
cluster:
|
||||
enabled: true
|
||||
replicas: 3
|
||||
leafnodes:
|
||||
enabled: true
|
||||
websocket:
|
||||
enabled: true
|
||||
port: 4223
|
||||
jetstream:
|
||||
enabled: true
|
||||
fileStore:
|
||||
pvc:
|
||||
size: 10Gi
|
||||
merge:
|
||||
domain: default
|
|
@ -0,0 +1,6 @@
|
|||
wadm:
|
||||
image:
|
||||
tag: v0.12.2
|
||||
config:
|
||||
nats:
|
||||
server: "nats.default.svc.cluster.local:4222"
|
|
@ -0,0 +1,7 @@
|
|||
apiVersion: k8s.wasmcloud.dev/v1alpha1
|
||||
kind: WasmCloudHostConfig
|
||||
metadata:
|
||||
name: wasmcloud-host
|
||||
spec:
|
||||
lattice: default
|
||||
version: "1.0.4"
|
57
sample.yaml
57
sample.yaml
|
@ -1,57 +0,0 @@
|
|||
apiVersion: k8s.wasmcloud.dev/v1alpha1
|
||||
kind: WasmCloudHostConfig
|
||||
metadata:
|
||||
name: my-wasmcloud-cluster
|
||||
namespace: default
|
||||
spec:
|
||||
hostReplicas: 1
|
||||
issuers:
|
||||
- CDKF6OKPOBQKAX57UOXO7SCHURTOZWKWIVPC2HFJTGFXY5VJX44ECEHH
|
||||
# The lattice to connect the hosts to
|
||||
lattice: default
|
||||
# Additional labels to apply to the host other than the defaults set in the controller
|
||||
hostLabels:
|
||||
test: value
|
||||
cluster: kind
|
||||
# Which wasmCloud version to use
|
||||
version: "1.0.2"
|
||||
# The name of a secret in the same namespace that provides the required secrets.
|
||||
secretName: cluster-secrets
|
||||
logLevel: INFO
|
||||
natsAddress: nats://nats-cluster.default.svc.cluster.local
|
||||
################################################
|
||||
# Additional options that can be set for hosts:
|
||||
################################################
|
||||
# allowLatest: true
|
||||
# allowedInsecure:
|
||||
# - "localhost:5001"
|
||||
# - "kind-registry:5000"
|
||||
# Policy service configuration
|
||||
# policyService:
|
||||
# topic: "wasmcloud.policy"
|
||||
# changesTopic: "bar"
|
||||
# timeoutMs: 10000
|
||||
# Additional options to control how the underlying wasmCloud hosts are scheduled in Kubernetes.
|
||||
# This includes setting resource requirements for the nats and wasmCloud host
|
||||
# containers along with any additional pot template settings.
|
||||
# schedulingOptions:
|
||||
# Enable the following to run the wasmCloud hosts as a DaemonSet
|
||||
# daemonset: true
|
||||
# Set the resource requirements for the nats and wasmCloud host containers.
|
||||
# resources:
|
||||
# nats:
|
||||
# requests:
|
||||
# cpu: 100m
|
||||
# wasmCloudHost:
|
||||
# requests:
|
||||
# cpu: 100m
|
||||
# Any additional pod template settings to apply to the wasmCloud host pods.
|
||||
# See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.29/#podspec-v1-core for all valid options.
|
||||
# Note that you *cannot* set the `containers` field here as it is managed by the controller.
|
||||
# podTemplateAdditions:
|
||||
# spec:
|
||||
# env:
|
||||
# - name: HOST_IP
|
||||
# value: spec.hostIP
|
||||
# nodeSelector:
|
||||
# kubernetes.io/os: linux
|
|
@ -1,21 +1,22 @@
|
|||
use crate::{
|
||||
config::OperatorConfig, docker_secret::DockerConfigJson, resources::application::get_client,
|
||||
services::ServiceWatcher, Error, Result,
|
||||
config::OperatorConfig, docker_secret::DockerConfigJson,
|
||||
resources::application::get_nats_client, services::ServiceWatcher, Error, Result,
|
||||
};
|
||||
use anyhow::bail;
|
||||
use futures::StreamExt;
|
||||
use handlebars::Handlebars;
|
||||
use k8s_openapi::api::apps::v1::{DaemonSet, DaemonSetSpec, Deployment, DeploymentSpec};
|
||||
use k8s_openapi::api::core::v1::{
|
||||
ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar, EnvVarSource, ExecAction,
|
||||
Lifecycle, LifecycleHandler, Pod, PodSpec, PodTemplateSpec, Secret, SecretKeySelector,
|
||||
SecretVolumeSource, Service, ServiceAccount, ServicePort, ServiceSpec, Volume, VolumeMount,
|
||||
ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar, ExecAction, Lifecycle,
|
||||
LifecycleHandler, Pod, PodSpec, PodTemplateSpec, Secret, SecretVolumeSource, Service,
|
||||
ServiceAccount, ServicePort, ServiceSpec, Volume, VolumeMount,
|
||||
};
|
||||
use k8s_openapi::api::rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use k8s_openapi::DeepMerge;
|
||||
use kube::{
|
||||
api::{Api, ObjectMeta, Patch, PatchParams},
|
||||
client::Client,
|
||||
client::Client as KubeClient,
|
||||
runtime::{
|
||||
controller::{Action, Config, Controller},
|
||||
finalizer::{finalizer, Event as Finalizer},
|
||||
|
@ -31,7 +32,7 @@ use std::sync::Arc;
|
|||
use tokio::{sync::RwLock, time::Duration};
|
||||
use tracing::{debug, info, warn};
|
||||
use wasmcloud_operator_types::v1alpha1::{
|
||||
AppStatus, WasmCloudHostConfig, WasmCloudHostConfigStatus,
|
||||
AppStatus, WasmCloudHostConfig, WasmCloudHostConfigSpec, WasmCloudHostConfigStatus,
|
||||
};
|
||||
|
||||
pub static CLUSTER_CONFIG_FINALIZER: &str = "operator.k8s.wasmcloud.dev/wasmcloud-host-config";
|
||||
|
@ -48,7 +49,7 @@ pub const WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT: &str =
|
|||
"app.kubernetes.io/managed-by=wasmcloud-operator";
|
||||
|
||||
pub struct Context {
|
||||
pub client: Client,
|
||||
pub client: KubeClient,
|
||||
pub wasmcloud_config: OperatorConfig,
|
||||
pub nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
|
||||
service_watcher: ServiceWatcher,
|
||||
|
@ -56,7 +57,6 @@ pub struct Context {
|
|||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Secrets {
|
||||
pub wasmcloud_cluster_seed: String,
|
||||
pub nats_creds: Option<String>,
|
||||
}
|
||||
|
||||
|
@ -69,18 +69,9 @@ impl Secrets {
|
|||
);
|
||||
};
|
||||
let data = secret.data.as_ref().unwrap();
|
||||
let wasmcloud_cluster_seed = data.get("WASMCLOUD_CLUSTER_SEED");
|
||||
let nats_creds = data.get("nats.creds");
|
||||
|
||||
if wasmcloud_cluster_seed.is_none() {
|
||||
bail!(
|
||||
"Secret {} has no WASMCLOUD_CLUSTER_SEED",
|
||||
secret.metadata.name.as_ref().unwrap()
|
||||
);
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
wasmcloud_cluster_seed: from_utf8(&wasmcloud_cluster_seed.unwrap().0)?.to_string(),
|
||||
nats_creds: match &nats_creds {
|
||||
Some(c) => from_utf8(&c.0).ok().map(|s| s.to_string()),
|
||||
None => None,
|
||||
|
@ -114,38 +105,42 @@ pub async fn reconcile(cluster: Arc<WasmCloudHostConfig>, ctx: Arc<Context>) ->
|
|||
}
|
||||
|
||||
async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<Action> {
|
||||
let client = ctx.client.clone();
|
||||
let kube_client = ctx.client.clone();
|
||||
let ns = config.namespace().unwrap();
|
||||
let name = config.name_any();
|
||||
let config: Api<WasmCloudHostConfig> = Api::namespaced(client.clone(), &ns);
|
||||
let config: Api<WasmCloudHostConfig> = Api::namespaced(kube_client.clone(), &ns);
|
||||
let mut cfg = config.get(&name).await?;
|
||||
let secrets = Api::<Secret>::namespaced(client, &ns);
|
||||
let secrets_api = Api::<Secret>::namespaced(kube_client, &ns);
|
||||
|
||||
let secret = secrets.get(&cfg.spec.secret_name).await.map_err(|e| {
|
||||
warn!("Failed to read secrets: {}", e);
|
||||
e
|
||||
})?;
|
||||
let s = Secrets::from_k8s_secret(&secret).map_err(|e| {
|
||||
warn!("Failed to read secrets: {}", e);
|
||||
Error::SecretError(format!(
|
||||
"Failed to read all secrets from {}: {}",
|
||||
secret.metadata.name.unwrap(),
|
||||
let mut secrets = Secrets::default();
|
||||
|
||||
if let Some(secret_name) = &cfg.spec.secret_name {
|
||||
let kube_secrets = secrets_api.get(secret_name).await.map_err(|e| {
|
||||
warn!("Failed to read secrets: {}", e);
|
||||
e
|
||||
))
|
||||
})?;
|
||||
})?;
|
||||
secrets = Secrets::from_k8s_secret(&kube_secrets).map_err(|e| {
|
||||
warn!("Failed to read secrets: {}", e);
|
||||
Error::SecretError(format!(
|
||||
"Failed to read all secrets from {}: {}",
|
||||
kube_secrets.metadata.name.unwrap(),
|
||||
e
|
||||
))
|
||||
})?;
|
||||
|
||||
if let Err(e) = configmap(&cfg, ctx.clone(), s.nats_creds.is_some()).await {
|
||||
if let Some(nats_creds) = &secrets.nats_creds {
|
||||
if let Err(e) = store_nats_creds(&cfg, ctx.clone(), nats_creds.clone()).await {
|
||||
warn!("Failed to reconcile secret: {}", e);
|
||||
return Err(e);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = configmap(&cfg, ctx.clone(), secrets.nats_creds.is_some()).await {
|
||||
warn!("Failed to reconcile configmap: {}", e);
|
||||
return Err(e);
|
||||
};
|
||||
|
||||
if let Some(nats_creds) = &s.nats_creds {
|
||||
if let Err(e) = store_nats_creds(&cfg, ctx.clone(), nats_creds.clone()).await {
|
||||
warn!("Failed to reconcile secret: {}", e);
|
||||
return Err(e);
|
||||
};
|
||||
}
|
||||
|
||||
// TODO(protochron) these could be split out into separate functions.
|
||||
if let Err(e) = configure_auth(&cfg, ctx.clone()).await {
|
||||
warn!("Failed to configure auth: {}", e);
|
||||
|
@ -162,7 +157,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
|
|||
return Err(e);
|
||||
};
|
||||
|
||||
let nc = s.nats_creds.map(SecretString::new);
|
||||
let nc = secrets.nats_creds.map(SecretString::new);
|
||||
let apps = crate::resources::application::list_apps(
|
||||
&cfg.spec.nats_address,
|
||||
&cfg.spec.nats_client_port,
|
||||
|
@ -201,7 +196,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
|
|||
})?;
|
||||
|
||||
// Start the watcher so that services are automatically created in the cluster.
|
||||
let nats_client = get_client(
|
||||
let nats_client = get_nats_client(
|
||||
&cfg.spec.nats_address,
|
||||
&cfg.spec.nats_client_port,
|
||||
ctx.nats_creds.clone(),
|
||||
|
@ -253,22 +248,10 @@ async fn cleanup(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<Acti
|
|||
Ok(Action::await_change())
|
||||
}
|
||||
|
||||
fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplateSpec {
|
||||
async fn pod_template(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<PodTemplateSpec> {
|
||||
let labels = pod_labels(config);
|
||||
|
||||
let mut wasmcloud_env = vec![
|
||||
EnvVar {
|
||||
name: "WASMCLOUD_CLUSTER_SEED".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
secret_key_ref: Some(SecretKeySelector {
|
||||
name: Some(config.spec.secret_name.clone()),
|
||||
key: "WASMCLOUD_CLUSTER_SEED".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
EnvVar {
|
||||
name: "WASMCLOUD_STRUCTURED_LOGGING_ENABLED".to_string(),
|
||||
value: Some(
|
||||
|
@ -301,11 +284,6 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplate
|
|||
value: Some(config.spec.lattice.clone()),
|
||||
..Default::default()
|
||||
},
|
||||
EnvVar {
|
||||
name: "WASMCLOUD_CLUSTER_ISSUERS".to_string(),
|
||||
value: Some(config.spec.issuers.join(",")),
|
||||
..Default::default()
|
||||
},
|
||||
EnvVar {
|
||||
name: "WASMCLOUD_NATS_HOST".to_string(),
|
||||
value: Some("127.0.0.1".to_string()),
|
||||
|
@ -396,13 +374,34 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplate
|
|||
}
|
||||
}
|
||||
|
||||
if let Some(secrets_prefix) = &config.spec.secrets_topic_prefix {
|
||||
wasmcloud_env.push(EnvVar {
|
||||
name: "WASMCLOUD_SECRETS_TOPIC".to_string(),
|
||||
value: Some(secrets_prefix.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
if let Some(max_linear_memory_bytes) = &config.spec.max_linear_memory_bytes {
|
||||
wasmcloud_env.push(EnvVar {
|
||||
name: "WASMCLOUD_MAX_LINEAR_MEMORY".to_string(),
|
||||
value: Some(max_linear_memory_bytes.to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
let mut wasmcloud_args = configure_observability(&config.spec);
|
||||
|
||||
let mut nats_resources: Option<k8s_openapi::api::core::v1::ResourceRequirements> = None;
|
||||
let mut wasmcloud_resources: Option<k8s_openapi::api::core::v1::ResourceRequirements> = None;
|
||||
if let Some(scheduling_options) = &config.spec.scheduling_options {
|
||||
if let Some(resources) = &scheduling_options.resources {
|
||||
nats_resources = resources.nats.clone();
|
||||
wasmcloud_resources = resources.wasmcloud.clone();
|
||||
}
|
||||
if let Some(resources) = &config
|
||||
.spec
|
||||
.scheduling_options
|
||||
.as_ref()
|
||||
.and_then(|so| so.resources.clone())
|
||||
{
|
||||
nats_resources.clone_from(&resources.nats);
|
||||
wasmcloud_resources.clone_from(&resources.wasmcloud);
|
||||
}
|
||||
|
||||
let image = match &config.spec.image {
|
||||
|
@ -410,10 +409,148 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplate
|
|||
None => format!("ghcr.io/wasmcloud/wasmcloud:{}", config.spec.version),
|
||||
};
|
||||
|
||||
let containers = vec![
|
||||
let leaf_image = match &config.spec.nats_leaf_image {
|
||||
Some(img) => img.clone(),
|
||||
None => "nats:2.10-alpine".to_string(),
|
||||
};
|
||||
|
||||
let mut volumes = vec![Volume {
|
||||
name: "nats-config".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: Some(config.name_any()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
let mut nats_volume_mounts = vec![VolumeMount {
|
||||
name: "nats-config".to_string(),
|
||||
mount_path: "/nats/nats.conf".to_string(),
|
||||
read_only: Some(true),
|
||||
sub_path: Some("nats.conf".to_string()),
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
let mut wasm_volume_mounts: Vec<VolumeMount> = vec![];
|
||||
if let Some(authorities) = &config
|
||||
.spec
|
||||
.certificates
|
||||
.clone()
|
||||
.and_then(|certificates| certificates.authorities)
|
||||
{
|
||||
for authority in authorities.iter() {
|
||||
let authority_name = authority.name.clone();
|
||||
let volume_name = format!("ca-{authority_name}");
|
||||
let volume_path = format!("/wasmcloud/certificates/{volume_name}");
|
||||
let mut items: Vec<String> = vec![];
|
||||
|
||||
// The Volume interface is quite broad. Permit only known types.
|
||||
if authority.secret.is_none() && authority.config_map.is_none() {
|
||||
return Err(Error::CertificateError(format!(
|
||||
"'{authority_name}' has to be a Configmap or Secret"
|
||||
)));
|
||||
}
|
||||
|
||||
// secrets
|
||||
if let Some(secret_ref) = &authority.secret {
|
||||
let secret_name = match &secret_ref.secret_name {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
return Err(Error::CertificateError(format!(
|
||||
"Missing secret name for authority '{authority_name}'"
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
items = discover_secret_certificates(
|
||||
&config.namespace().unwrap_or_default(),
|
||||
secret_name,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if items.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
volumes.push(Volume {
|
||||
name: volume_name.clone(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(secret_name.to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
// configmaps
|
||||
if let Some(configmap_ref) = &authority.config_map {
|
||||
let configmap_name = match &configmap_ref.name {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
return Err(Error::CertificateError(format!(
|
||||
"Missing configmap name for authority '{authority_name}'"
|
||||
)))
|
||||
}
|
||||
};
|
||||
items = discover_configmap_certificates(
|
||||
&config.namespace().unwrap_or_default(),
|
||||
configmap_name,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if items.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
volumes.push(Volume {
|
||||
name: volume_name.clone(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: Some(configmap_name.to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
wasm_volume_mounts.push(VolumeMount {
|
||||
name: volume_name.clone(),
|
||||
read_only: Some(true),
|
||||
mount_path: volume_path.clone(),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
for item in items {
|
||||
wasmcloud_args.push("--tls-ca-path".to_string());
|
||||
wasmcloud_args.push(format!("{volume_path}/{item}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(secret_name) = &config.spec.secret_name {
|
||||
volumes.push(Volume {
|
||||
name: "nats-creds".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(secret_name.clone()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
nats_volume_mounts.push(VolumeMount {
|
||||
name: "nats-creds".to_string(),
|
||||
mount_path: "/nats/nats.creds".to_string(),
|
||||
sub_path: Some("nats.creds".to_string()),
|
||||
read_only: Some(true),
|
||||
..Default::default()
|
||||
});
|
||||
}
|
||||
|
||||
let mut containers = vec![
|
||||
Container {
|
||||
name: "nats-leaf".to_string(),
|
||||
image: Some("nats:2.10-alpine".to_string()),
|
||||
image: Some(leaf_image),
|
||||
args: Some(vec![
|
||||
"-js".to_string(),
|
||||
"--config".to_string(),
|
||||
|
@ -435,51 +572,37 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplate
|
|||
..Default::default()
|
||||
}),
|
||||
resources: nats_resources,
|
||||
volume_mounts: Some(vec![
|
||||
VolumeMount {
|
||||
name: "nats-config".to_string(),
|
||||
mount_path: "/nats/nats.conf".to_string(),
|
||||
read_only: Some(true),
|
||||
sub_path: Some("nats.conf".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "nats-creds".to_string(),
|
||||
mount_path: "/nats/nats.creds".to_string(),
|
||||
sub_path: Some("nats.creds".to_string()),
|
||||
read_only: Some(true),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
volume_mounts: Some(nats_volume_mounts),
|
||||
..Default::default()
|
||||
},
|
||||
Container {
|
||||
name: "wasmcloud-host".to_string(),
|
||||
image: Some(image),
|
||||
command: Some(vec!["wasmcloud".to_string()]),
|
||||
args: Some(wasmcloud_args),
|
||||
env: Some(wasmcloud_env),
|
||||
resources: wasmcloud_resources,
|
||||
volume_mounts: Some(wasm_volume_mounts),
|
||||
..Default::default()
|
||||
},
|
||||
];
|
||||
|
||||
let mut volumes = vec![
|
||||
Volume {
|
||||
name: "nats-config".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: Some(config.name_any()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "nats-creds".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(config.spec.secret_name.clone()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
];
|
||||
if let Some(cta) = &config
|
||||
.spec
|
||||
.scheduling_options
|
||||
.as_ref()
|
||||
.and_then(|so| so.container_template_additions.clone())
|
||||
{
|
||||
if let Some(mut nats_container) = cta.nats.clone() {
|
||||
nats_container.merge_from(containers[0].clone());
|
||||
containers[0] = nats_container;
|
||||
}
|
||||
if let Some(mut wasmcloud_container) = cta.wasmcloud.clone() {
|
||||
wasmcloud_container.merge_from(containers[1].clone());
|
||||
containers[1] = wasmcloud_container;
|
||||
}
|
||||
}
|
||||
|
||||
let service_account = config.name_any();
|
||||
let mut template = PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
|
@ -489,56 +612,134 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc<Context>) -> PodTemplate
|
|||
spec: Some(PodSpec {
|
||||
service_account: Some(config.name_any()),
|
||||
containers: containers.clone(),
|
||||
volumes: Some(vec![
|
||||
Volume {
|
||||
name: "nats-config".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: Some(config.name_any()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "nats-creds".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(config.spec.secret_name.clone()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
volumes: Some(volumes.clone()),
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
|
||||
if let Some(scheduling_options) = &config.spec.scheduling_options {
|
||||
if let Some(pod_overrides) = &scheduling_options.pod_template_additions {
|
||||
let mut overrides = pod_overrides.clone();
|
||||
overrides.service_account_name = Some(service_account);
|
||||
overrides.containers = containers.clone();
|
||||
if let Some(vols) = overrides.volumes {
|
||||
volumes.extend(vols);
|
||||
}
|
||||
overrides.volumes = Some(volumes);
|
||||
template.spec = Some(overrides);
|
||||
if let Some(pod_overrides) = &config
|
||||
.spec
|
||||
.scheduling_options
|
||||
.as_ref()
|
||||
.and_then(|so| so.pod_template_additions.clone())
|
||||
{
|
||||
let mut overrides = pod_overrides.clone();
|
||||
overrides.service_account_name = Some(service_account);
|
||||
overrides.containers.clone_from(&containers);
|
||||
if let Some(vols) = overrides.volumes {
|
||||
volumes.extend(vols);
|
||||
}
|
||||
overrides.volumes = Some(volumes);
|
||||
template.spec = Some(overrides);
|
||||
};
|
||||
template
|
||||
Ok(template)
|
||||
}
|
||||
|
||||
fn deployment_spec(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> DeploymentSpec {
|
||||
let pod_template = pod_template(config, ctx);
|
||||
async fn discover_configmap_certificates(
|
||||
namespace: &str,
|
||||
name: &str,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<String>> {
|
||||
let kube_client = ctx.client.clone();
|
||||
let api = Api::<ConfigMap>::namespaced(kube_client, namespace);
|
||||
let mut certs = Vec::new();
|
||||
|
||||
let raw_config_map = api.get(name).await?;
|
||||
|
||||
if let Some(raw_data) = raw_config_map.data {
|
||||
for (key, _) in raw_data {
|
||||
if key.ends_with(".crt") {
|
||||
certs.push(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(certs)
|
||||
}
|
||||
|
||||
async fn discover_secret_certificates(
|
||||
namespace: &str,
|
||||
name: &str,
|
||||
ctx: &Context,
|
||||
) -> Result<Vec<String>> {
|
||||
let kube_client = ctx.client.clone();
|
||||
let api = Api::<Secret>::namespaced(kube_client, namespace);
|
||||
let mut certs = Vec::new();
|
||||
|
||||
let raw_config_map = api.get(name).await?;
|
||||
|
||||
if let Some(raw_data) = raw_config_map.data {
|
||||
for (key, _) in raw_data {
|
||||
if key.ends_with(".crt") {
|
||||
certs.push(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(certs)
|
||||
}
|
||||
|
||||
fn configure_observability(spec: &WasmCloudHostConfigSpec) -> Vec<String> {
|
||||
let mut args = Vec::<String>::new();
|
||||
if let Some(observability) = &spec.observability {
|
||||
if observability.enable {
|
||||
args.push("--enable-observability".to_string());
|
||||
}
|
||||
if !observability.endpoint.is_empty() {
|
||||
args.push("--override-observability-endpoint".to_string());
|
||||
args.push(observability.endpoint.clone());
|
||||
}
|
||||
if let Some(protocol) = &observability.protocol {
|
||||
args.push("--observability-protocol".to_string());
|
||||
args.push(protocol.to_string());
|
||||
}
|
||||
if let Some(traces) = &observability.traces {
|
||||
if traces.enable.unwrap_or(false) {
|
||||
args.push("--enable-traces".to_string())
|
||||
}
|
||||
if let Some(traces_endpoint) = &traces.endpoint {
|
||||
args.push("--override-traces-endpoint".to_string());
|
||||
args.push(traces_endpoint.to_owned());
|
||||
}
|
||||
}
|
||||
if let Some(metrics) = &observability.metrics {
|
||||
if metrics.enable.unwrap_or(false) {
|
||||
args.push("--enable-metrics".to_string())
|
||||
}
|
||||
if let Some(metrics_endpoint) = &metrics.endpoint {
|
||||
args.push("--override-metrics-endpoint".to_string());
|
||||
args.push(metrics_endpoint.to_owned());
|
||||
}
|
||||
}
|
||||
if let Some(logs) = &observability.logs {
|
||||
if logs.enable.unwrap_or(false) {
|
||||
args.push("--enable-logs".to_string())
|
||||
}
|
||||
if let Some(logs_endpoint) = &logs.endpoint {
|
||||
args.push("--override-logs-endpoint".to_string());
|
||||
args.push(logs_endpoint.to_owned());
|
||||
}
|
||||
}
|
||||
}
|
||||
args
|
||||
}
|
||||
|
||||
async fn deployment_spec(
|
||||
config: &WasmCloudHostConfig,
|
||||
ctx: Arc<Context>,
|
||||
) -> Result<DeploymentSpec> {
|
||||
let pod_template = pod_template(config, ctx).await?;
|
||||
let ls = LabelSelector {
|
||||
match_labels: Some(selector_labels(config)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
DeploymentSpec {
|
||||
Ok(DeploymentSpec {
|
||||
replicas: Some(config.spec.host_replicas as i32),
|
||||
selector: ls,
|
||||
template: pod_template,
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn pod_labels(config: &WasmCloudHostConfig) -> BTreeMap<String, String> {
|
||||
|
@ -565,18 +766,18 @@ fn selector_labels(config: &WasmCloudHostConfig) -> BTreeMap<String, String> {
|
|||
labels
|
||||
}
|
||||
|
||||
fn daemonset_spec(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> DaemonSetSpec {
|
||||
let pod_template = pod_template(config, ctx);
|
||||
async fn daemonset_spec(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<DaemonSetSpec> {
|
||||
let pod_template = pod_template(config, ctx).await?;
|
||||
let ls = LabelSelector {
|
||||
match_labels: Some(selector_labels(config)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
DaemonSetSpec {
|
||||
Ok(DaemonSetSpec {
|
||||
selector: ls,
|
||||
template: pod_template,
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn configure_hosts(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Result<()> {
|
||||
|
@ -646,37 +847,39 @@ async fn configure_hosts(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Res
|
|||
];
|
||||
}
|
||||
|
||||
if let Some(scheduling_options) = &config.spec.scheduling_options {
|
||||
if scheduling_options.daemonset {
|
||||
let mut spec = daemonset_spec(config, ctx.clone());
|
||||
spec.template.spec.as_mut().unwrap().containers[1]
|
||||
.env
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.append(&mut env_vars);
|
||||
let ds = DaemonSet {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(config.name_any()),
|
||||
namespace: Some(config.namespace().unwrap()),
|
||||
owner_references: Some(vec![config.controller_owner_ref(&()).unwrap()]),
|
||||
labels: Some(common_labels()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(spec),
|
||||
if config
|
||||
.spec
|
||||
.scheduling_options
|
||||
.as_ref()
|
||||
.is_some_and(|so| so.daemonset)
|
||||
{
|
||||
let mut spec = daemonset_spec(config, ctx.clone()).await?;
|
||||
spec.template.spec.as_mut().unwrap().containers[1]
|
||||
.env
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.append(&mut env_vars);
|
||||
let ds = DaemonSet {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(config.name_any()),
|
||||
namespace: Some(config.namespace().unwrap()),
|
||||
owner_references: Some(vec![config.controller_owner_ref(&()).unwrap()]),
|
||||
labels: Some(common_labels()),
|
||||
..Default::default()
|
||||
};
|
||||
},
|
||||
spec: Some(spec),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let api =
|
||||
Api::<DaemonSet>::namespaced(ctx.client.clone(), &config.namespace().unwrap());
|
||||
api.patch(
|
||||
&config.name_any(),
|
||||
&PatchParams::apply(CLUSTER_CONFIG_FINALIZER),
|
||||
&Patch::Apply(ds),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
let api = Api::<DaemonSet>::namespaced(ctx.client.clone(), &config.namespace().unwrap());
|
||||
api.patch(
|
||||
&config.name_any(),
|
||||
&PatchParams::apply(CLUSTER_CONFIG_FINALIZER),
|
||||
&Patch::Apply(ds),
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let mut spec = deployment_spec(config, ctx.clone());
|
||||
let mut spec = deployment_spec(config, ctx.clone()).await?;
|
||||
spec.template.spec.as_mut().unwrap().containers[1]
|
||||
.env
|
||||
.as_mut()
|
||||
|
@ -916,19 +1119,19 @@ fn error_policy(_object: Arc<WasmCloudHostConfig>, _error: &Error, _ctx: Arc<Con
|
|||
}
|
||||
|
||||
pub async fn run(state: State) -> anyhow::Result<()> {
|
||||
let client = Client::try_default().await?;
|
||||
let kube_client = KubeClient::try_default().await?;
|
||||
|
||||
let configs = Api::<WasmCloudHostConfig>::all(client.clone());
|
||||
let cms = Api::<ConfigMap>::all(client.clone());
|
||||
let deployments = Api::<Deployment>::all(client.clone());
|
||||
let secrets = Api::<Secret>::all(client.clone());
|
||||
let services = Api::<Service>::all(client.clone());
|
||||
let pods = Api::<Pod>::all(client.clone());
|
||||
let configs = Api::<WasmCloudHostConfig>::all(kube_client.clone());
|
||||
let cms = Api::<ConfigMap>::all(kube_client.clone());
|
||||
let deployments = Api::<Deployment>::all(kube_client.clone());
|
||||
let secrets = Api::<Secret>::all(kube_client.clone());
|
||||
let services = Api::<Service>::all(kube_client.clone());
|
||||
let pods = Api::<Pod>::all(kube_client.clone());
|
||||
|
||||
let watcher = ServiceWatcher::new(client.clone(), state.config.stream_replicas);
|
||||
let watcher = ServiceWatcher::new(kube_client.clone(), state.config.stream_replicas);
|
||||
let config = Config::default();
|
||||
let ctx = Context {
|
||||
client,
|
||||
client: kube_client,
|
||||
wasmcloud_config: state.config.clone(),
|
||||
nats_creds: state.nats_creds.clone(),
|
||||
service_watcher: watcher,
|
||||
|
|
|
@ -31,6 +31,9 @@ pub enum Error {
|
|||
#[error("Error retrieving secrets: {0}")]
|
||||
SecretError(String),
|
||||
|
||||
#[error("Certificate error: {0}")]
|
||||
CertificateError(String),
|
||||
|
||||
#[error("Error rendering template: {0}")]
|
||||
RenderError(#[from] RenderError),
|
||||
}
|
||||
|
|
22
src/main.rs
22
src/main.rs
|
@ -13,9 +13,10 @@ use kube::{
|
|||
client::Client,
|
||||
CustomResourceExt,
|
||||
};
|
||||
use opentelemetry::sdk::{
|
||||
trace::{self, RandomIdGenerator, Sampler},
|
||||
Resource as OTELResource,
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry_sdk::{
|
||||
trace::{RandomIdGenerator, Sampler},
|
||||
Resource,
|
||||
};
|
||||
use std::io::IsTerminal;
|
||||
use std::net::SocketAddr;
|
||||
|
@ -27,6 +28,13 @@ use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig;
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let args = std::env::args().collect::<Vec<_>>();
|
||||
if args.iter().any(|arg| arg == "-V" || arg == "--version") {
|
||||
let version = version();
|
||||
println!("{} {version}", env!("CARGO_BIN_NAME"));
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
let tracing_enabled = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok();
|
||||
configure_tracing(tracing_enabled).map_err(|e| {
|
||||
error!("Failed to configure tracing: {}", e);
|
||||
|
@ -80,12 +88,12 @@ fn configure_tracing(enabled: bool) -> anyhow::Result<()> {
|
|||
.tracing()
|
||||
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
|
||||
.with_trace_config(
|
||||
trace::config()
|
||||
opentelemetry_sdk::trace::config()
|
||||
.with_sampler(Sampler::AlwaysOn)
|
||||
.with_id_generator(RandomIdGenerator::default())
|
||||
.with_max_attributes_per_span(32)
|
||||
.with_max_events_per_span(32)
|
||||
.with_resource(OTELResource::new(vec![opentelemetry::KeyValue::new(
|
||||
.with_resource(Resource::new(vec![KeyValue::new(
|
||||
"service.name",
|
||||
"wasmcloud-operator",
|
||||
)])),
|
||||
|
@ -191,3 +199,7 @@ async fn install_crd(client: &Client) -> anyhow::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn version() -> &'static str {
|
||||
option_env!("CARGO_VERSION_INFO").unwrap_or(env!("CARGO_PKG_VERSION"))
|
||||
}
|
||||
|
|
1109
src/openapi.rs
1109
src/openapi.rs
File diff suppressed because it is too large
Load Diff
|
@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
|
|||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Error};
|
||||
use async_nats::ConnectOptions;
|
||||
use async_nats::{Client as NatsClient, ConnectError, ConnectOptions};
|
||||
use axum::{
|
||||
body::Bytes,
|
||||
extract::{Path, State as AxumState},
|
||||
|
@ -12,21 +12,21 @@ use axum::{
|
|||
};
|
||||
use kube::{
|
||||
api::{Api, ListParams},
|
||||
client::Client,
|
||||
client::Client as KubeClient,
|
||||
core::{ListMeta, ObjectMeta},
|
||||
};
|
||||
use secrecy::{ExposeSecret, SecretString};
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use serde_json::{json, Value};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
use wadm::{
|
||||
model::Manifest,
|
||||
server::{
|
||||
DeleteResult, DeployResult, GetResult, ModelSummary, PutResult, StatusResult, StatusType,
|
||||
},
|
||||
use wadm_client::{error::ClientError, Client as WadmClient};
|
||||
use wadm_types::{
|
||||
api::{ModelSummary, Status, StatusType},
|
||||
Manifest,
|
||||
};
|
||||
|
||||
use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig;
|
||||
|
||||
use crate::{
|
||||
|
@ -46,6 +46,8 @@ use crate::{
|
|||
*/
|
||||
|
||||
const GROUP_VERSION: &str = "core.oam.dev/v1beta1";
|
||||
const KUBECTL_LAST_APPLIED_CONFIG_ANNOTATION: &str =
|
||||
"kubectl.kubernetes.io/last-applied-configuration";
|
||||
|
||||
pub struct AppError(Error);
|
||||
|
||||
|
@ -237,20 +239,17 @@ impl From<Vec<ModelSummary>> for ApplicationTable {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Manifest>> for ApplicationTable {
|
||||
fn from(manifests: Vec<Manifest>) -> Self {
|
||||
impl From<Vec<CombinedManifest>> for ApplicationTable {
|
||||
fn from(manifests: Vec<CombinedManifest>) -> Self {
|
||||
let mut table = Self::default();
|
||||
let rows = manifests
|
||||
.into_iter()
|
||||
.map(|m| TableRow {
|
||||
.map(|cm| TableRow {
|
||||
cells: vec![
|
||||
m.metadata.name,
|
||||
"N/A".to_string(),
|
||||
match m.metadata.annotations.get("version") {
|
||||
Some(v) => v.to_owned(),
|
||||
None => "N/A".to_string(),
|
||||
},
|
||||
"N/A".to_string(),
|
||||
cm.name(),
|
||||
cm.deployed_version(),
|
||||
cm.latest_version(),
|
||||
cm.status(),
|
||||
],
|
||||
})
|
||||
.collect();
|
||||
|
@ -260,6 +259,42 @@ impl From<Vec<Manifest>> for ApplicationTable {
|
|||
}
|
||||
}
|
||||
|
||||
struct CombinedManifest {
|
||||
manifest: Manifest,
|
||||
status: Status,
|
||||
}
|
||||
|
||||
impl CombinedManifest {
|
||||
pub(crate) fn new(manifest: Manifest, status: Status) -> Self {
|
||||
Self { manifest, status }
|
||||
}
|
||||
|
||||
pub(crate) fn name(&self) -> String {
|
||||
self.manifest.metadata.name.to_owned()
|
||||
}
|
||||
|
||||
pub(crate) fn deployed_version(&self) -> String {
|
||||
match self.manifest.metadata.annotations.get("version") {
|
||||
Some(v) => v.to_owned(),
|
||||
None => "N/A".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn latest_version(&self) -> String {
|
||||
self.status.version.to_owned()
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> String {
|
||||
match self.status.info.status_type {
|
||||
StatusType::Undeployed => "Undeployed",
|
||||
StatusType::Reconciling => "Reconciling",
|
||||
StatusType::Deployed => "Deployed",
|
||||
StatusType::Failed => "Failed",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/apis/core.oam.dev/v1beta1/namespaces/{namespace}/applications"
|
||||
|
@ -269,11 +304,11 @@ pub async fn create_application(
|
|||
AxumState(state): AxumState<State>,
|
||||
body: Bytes,
|
||||
) -> impl IntoResponse {
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(client, &namespace);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(kube_client, &namespace);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -286,74 +321,20 @@ pub async fn create_application(
|
|||
Err(resp) => return resp,
|
||||
};
|
||||
|
||||
let model: serde_json::Value = match serde_json::from_slice(&body) {
|
||||
let wadm_client = WadmClient::from_nats_client(&lattice_id, None, nats_client);
|
||||
|
||||
let manifest: Manifest = match serde_json::from_slice(&body) {
|
||||
Ok(v) => v,
|
||||
Err(e) => return internal_error(anyhow!("unable to decode the patch: {}", e)),
|
||||
};
|
||||
|
||||
let put =
|
||||
match wash_lib::app::put_model(&nats_client, Some(lattice_id.clone()), &model.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
let (application_name, _application_version) =
|
||||
match wadm_client.put_and_deploy_manifest(manifest).await {
|
||||
Ok(application_bits) => application_bits,
|
||||
Err(e) => return internal_error(anyhow!("could not deploy app: {}", e)),
|
||||
};
|
||||
|
||||
let model_name = match put.result {
|
||||
PutResult::Created | PutResult::NewVersion => put.name,
|
||||
_ => {
|
||||
// TODO(joonas): Add handling for the case where the model version
|
||||
// might already exist (from prior deploy or otherwise).
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
put.result,
|
||||
put.message
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let deploy = match wash_lib::app::deploy_model(
|
||||
&nats_client,
|
||||
Some(lattice_id.clone()),
|
||||
&model_name,
|
||||
Some(put.current_version.clone()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => return internal_error(anyhow!("could not deploy app: {}", e)),
|
||||
};
|
||||
|
||||
if deploy.result != DeployResult::Acknowledged {
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
deploy.result,
|
||||
deploy.message
|
||||
));
|
||||
}
|
||||
|
||||
// Get model from WADM for displaying in Kubernetes
|
||||
let get = match wash_lib::app::get_model_details(
|
||||
&nats_client,
|
||||
Some(lattice_id),
|
||||
&model_name,
|
||||
Some(put.current_version),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => return internal_error(anyhow!("error getting deployed app: {}", e)),
|
||||
};
|
||||
|
||||
match get.result {
|
||||
GetResult::Success => Json(Application::new(model_name)).into_response(),
|
||||
// Either we received an error or could not find the deployed application, so return an error:
|
||||
_ => internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
get.result,
|
||||
get.message
|
||||
)),
|
||||
}
|
||||
Json(Application::new(application_name)).into_response()
|
||||
}
|
||||
|
||||
#[utoipa::path(get, path = "/apis/core.oam.dev/v1beta1/applications")]
|
||||
|
@ -364,12 +345,12 @@ pub async fn list_all_applications(
|
|||
// TODO(joonas): Use lattices (or perhaps Controller specific/special creds) for instanciating NATS client.
|
||||
// TODO(joonas): Add watch support to stop Argo from spamming this endpoint every second.
|
||||
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
|
||||
let configs: Api<WasmCloudHostConfig> = Api::all(client);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::all(kube_client);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -427,11 +408,11 @@ pub async fn list_applications(
|
|||
Path(namespace): Path<String>,
|
||||
AxumState(state): AxumState<State>,
|
||||
) -> impl IntoResponse {
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(client, &namespace);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(kube_client, &namespace);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("Unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -485,7 +466,7 @@ pub async fn list_apps(
|
|||
lattice_id: String,
|
||||
) -> Result<Vec<ModelSummary>, Error> {
|
||||
let addr = format!("{}:{}", cluster_url, port);
|
||||
let client = match creds {
|
||||
let nats_client = match creds {
|
||||
Some(creds) => {
|
||||
ConnectOptions::with_credentials(creds.expose_secret())?
|
||||
.connect(addr)
|
||||
|
@ -493,17 +474,16 @@ pub async fn list_apps(
|
|||
}
|
||||
None => ConnectOptions::new().connect(addr).await?,
|
||||
};
|
||||
let models = wash_lib::app::get_models(&client, Some(lattice_id)).await?;
|
||||
|
||||
Ok(models)
|
||||
let wadm_client = WadmClient::from_nats_client(&lattice_id, None, nats_client);
|
||||
Ok(wadm_client.list_manifests().await?)
|
||||
}
|
||||
|
||||
pub async fn get_client(
|
||||
pub async fn get_nats_client(
|
||||
cluster_url: &str,
|
||||
port: &u16,
|
||||
nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
|
||||
namespace: NameNamespace,
|
||||
) -> Result<async_nats::Client, async_nats::ConnectError> {
|
||||
) -> Result<NatsClient, ConnectError> {
|
||||
let addr = format!("{}:{}", cluster_url, port);
|
||||
let creds = nats_creds.read().await;
|
||||
match creds.get(&namespace) {
|
||||
|
@ -527,12 +507,12 @@ pub async fn get_application(
|
|||
Path((namespace, name)): Path<(String, String)>,
|
||||
AxumState(state): AxumState<State>,
|
||||
) -> impl IntoResponse {
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(client, &namespace);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(kube_client, &namespace);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -544,83 +524,60 @@ pub async fn get_application(
|
|||
Ok(data) => data,
|
||||
Err(resp) => return resp,
|
||||
};
|
||||
let wadm_client = WadmClient::from_nats_client(&lattice_id, None, nats_client);
|
||||
|
||||
let get =
|
||||
match wash_lib::app::get_model_details(&nats_client, Some(lattice_id.clone()), &name, None)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return internal_error(anyhow!("unable to request app from wadm: {}", e));
|
||||
let manifest = match wadm_client.get_manifest(&name, None).await {
|
||||
Ok(m) => m,
|
||||
Err(e) => match e {
|
||||
ClientError::NotFound(_) => {
|
||||
return not_found_error(anyhow!("applications \"{}\" not found", name))
|
||||
}
|
||||
};
|
||||
|
||||
let status = match wash_lib::app::get_model_status(
|
||||
&nats_client,
|
||||
Some(lattice_id.clone()),
|
||||
&name,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return internal_error(anyhow!("unable to request app status from wadm: {}", e));
|
||||
}
|
||||
_ => return internal_error(anyhow!("unable to request app from wadm: {}", e)),
|
||||
},
|
||||
};
|
||||
let status = match wadm_client.get_manifest_status(&name).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => match e {
|
||||
ClientError::NotFound(_) => {
|
||||
return not_found_error(anyhow!("applications \"{}\" not found", name))
|
||||
}
|
||||
_ => return internal_error(anyhow!("unable to request app status from wadm: {}", e)),
|
||||
},
|
||||
};
|
||||
|
||||
match status.result {
|
||||
StatusResult::Ok => {}
|
||||
StatusResult::NotFound => {
|
||||
return not_found_error(anyhow!("applications \"{}\" not found", name));
|
||||
match accept.into() {
|
||||
As::Table => {
|
||||
let combined_manifest = CombinedManifest::new(manifest, status);
|
||||
Json(ApplicationTable::from(vec![combined_manifest])).into_response()
|
||||
}
|
||||
StatusResult::Error => {
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
status.result,
|
||||
status.message
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
if get.result == GetResult::Success {
|
||||
if let Some(manifest) = get.manifest {
|
||||
let response = match accept.into() {
|
||||
As::Table => Json(ApplicationTable::from(vec![manifest])).into_response(),
|
||||
As::NotSpecified => {
|
||||
// TODO(joonas): This is a terrible hack, but for now it's what we need to do to satisfy Argo/Kubernetes since WADM doesn't support this metadata.
|
||||
let mut manifest_value = serde_json::to_value(&manifest).unwrap();
|
||||
// TODO(joonas): We should add lattice id to this as well, but we need it in every place where the application is listed.
|
||||
let ns = format!("{}/{}", &name, &manifest.version());
|
||||
let uid = Uuid::new_v5(&Uuid::NAMESPACE_OID, ns.as_bytes());
|
||||
manifest_value["metadata"]["uid"] = json!(uid.to_string());
|
||||
manifest_value["metadata"]["resourceVersion"] = json!(uid.to_string());
|
||||
manifest_value["metadata"]["namespace"] = json!(namespace);
|
||||
manifest_value["metadata"]["labels"] = json!({
|
||||
"app.kubernetes.io/instance": &name
|
||||
});
|
||||
// TODO(joonas): refactor status and the metadata inputs into a struct we could just serialize
|
||||
// The custom health check we provide for Argo will handle the case where status is missing, so this is fine for now.
|
||||
if let Some(status) = status.status {
|
||||
let phase = match status.info.status_type {
|
||||
StatusType::Undeployed => "Undeployed",
|
||||
StatusType::Reconciling => "Reconciling",
|
||||
StatusType::Deployed => "Deployed",
|
||||
StatusType::Failed => "Failed",
|
||||
};
|
||||
manifest_value["status"] = json!({
|
||||
"phase": phase,
|
||||
});
|
||||
}
|
||||
Json(manifest_value).into_response()
|
||||
}
|
||||
// TODO(joonas): Add better error handling here
|
||||
t => return internal_error(anyhow!("unknown type: {}", t)),
|
||||
As::NotSpecified => {
|
||||
// TODO(joonas): This is a terrible hack, but for now it's what we need to do to satisfy Argo/Kubernetes since WADM doesn't support this metadata.
|
||||
let mut manifest_value = serde_json::to_value(&manifest).unwrap();
|
||||
// TODO(joonas): We should add lattice id to this as well, but we need it in every place where the application is listed.
|
||||
let ns = format!("{}/{}", &name, &manifest.version());
|
||||
let uid = Uuid::new_v5(&Uuid::NAMESPACE_OID, ns.as_bytes());
|
||||
manifest_value["metadata"]["uid"] = json!(uid.to_string());
|
||||
manifest_value["metadata"]["resourceVersion"] = json!(uid.to_string());
|
||||
manifest_value["metadata"]["namespace"] = json!(namespace);
|
||||
manifest_value["metadata"]["labels"] = json!({
|
||||
"app.kubernetes.io/instance": &name
|
||||
});
|
||||
// TODO(joonas): refactor status and the metadata inputs into a struct we could just serialize
|
||||
// The custom health check we provide for Argo will handle the case where status is missing, so this is fine for now.
|
||||
let phase = match status.info.status_type {
|
||||
StatusType::Undeployed => "Undeployed",
|
||||
StatusType::Reconciling => "Reconciling",
|
||||
StatusType::Deployed => "Deployed",
|
||||
StatusType::Failed => "Failed",
|
||||
};
|
||||
return response;
|
||||
manifest_value["status"] = json!({
|
||||
"phase": phase,
|
||||
});
|
||||
Json(manifest_value).into_response()
|
||||
}
|
||||
};
|
||||
|
||||
not_found_error(anyhow!("applications \"{}\" not found", name))
|
||||
// TODO(joonas): Add better error handling here
|
||||
t => internal_error(anyhow!("unknown type: {}", t)),
|
||||
}
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
|
@ -632,11 +589,11 @@ pub async fn patch_application(
|
|||
AxumState(state): AxumState<State>,
|
||||
body: Bytes,
|
||||
) -> impl IntoResponse {
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(client, &namespace);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(kube_client, &namespace);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -648,88 +605,86 @@ pub async fn patch_application(
|
|||
Ok(data) => data,
|
||||
Err(resp) => return resp,
|
||||
};
|
||||
|
||||
// Fist, check if the model exists.
|
||||
// TODO(joonas): we should likely fetch the version of the manifest that's running in Kubernetes
|
||||
// TODO(joonas): Should this use model.status instead of model.get?
|
||||
let get =
|
||||
match wash_lib::app::get_model_details(&nats_client, Some(lattice_id.clone()), &name, None)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return internal_error(anyhow!("unable to find app: {}", e));
|
||||
let wadm_client = WadmClient::from_nats_client(&lattice_id, None, nats_client);
|
||||
let current_manifest = match wadm_client.get_manifest(&name, None).await {
|
||||
Ok(m) => m,
|
||||
Err(e) => match e {
|
||||
ClientError::NotFound(_) => {
|
||||
return not_found_error(anyhow!("applications \"{}\" not found", name))
|
||||
}
|
||||
};
|
||||
|
||||
if get.result != GetResult::Success {
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, reason={}",
|
||||
get.result,
|
||||
get.message
|
||||
));
|
||||
}
|
||||
|
||||
// Prepare manifest for patching
|
||||
let Some(manifest) = get.manifest else {
|
||||
return internal_error(anyhow!("no manifest was found for app: {}", &name));
|
||||
_ => return internal_error(anyhow!("unable to request app from wadm: {}", e)),
|
||||
},
|
||||
};
|
||||
|
||||
let mut model = serde_json::to_value(manifest).unwrap();
|
||||
let mut current = serde_json::to_value(current_manifest).unwrap();
|
||||
// Parse the Kubernetes-provided RFC 7386 patch
|
||||
let patch: serde_json::Value = match serde_json::from_slice(&body) {
|
||||
let patch = match serde_json::from_slice::<Value>(&body) {
|
||||
Ok(p) => p,
|
||||
Err(e) => return internal_error(anyhow!("unable to decode the patch: {}", e)),
|
||||
};
|
||||
|
||||
// Attempt to patch the currently running version
|
||||
json_patch::merge(&mut model, &patch);
|
||||
// Remove kubectl.kubernetes.io/last-applied-configuration annotation before
|
||||
// we compare against the patch, otherwise we'll always end up creating a new version.
|
||||
let last_applied_configuration = current
|
||||
.get_mut("metadata")
|
||||
.and_then(|metadata| metadata.get_mut("annotations"))
|
||||
.and_then(|annotations| annotations.as_object_mut())
|
||||
.and_then(|annotations| annotations.remove(KUBECTL_LAST_APPLIED_CONFIG_ANNOTATION));
|
||||
|
||||
let put =
|
||||
match wash_lib::app::put_model(&nats_client, Some(lattice_id.clone()), &model.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return internal_error(anyhow!("could not update manifest for deploy: {}", e))
|
||||
}
|
||||
};
|
||||
// TODO(joonas): This doesn't quite work as intended at the moment,
|
||||
// there are some differences in terms like replicas vs. instances:
|
||||
// * Add(AddOperation { path: "/spec/components/0/traits/0/properties/replicas", value: Number(1) }),
|
||||
// * Remove(RemoveOperation { path: "/spec/components/0/traits/0/properties/instances" }),
|
||||
//
|
||||
// which cause the server to always patch. Also, top-level entries such
|
||||
// as apiVersion, kind and metadata are always removed.
|
||||
//
|
||||
// let diff = json_patch::diff(¤t, &patch);
|
||||
// if diff.is_empty() {
|
||||
// // If there's nothing to patch, return early.
|
||||
// return Json(()).into_response();
|
||||
// };
|
||||
|
||||
match put.result {
|
||||
PutResult::NewVersion => {}
|
||||
_ => {
|
||||
// For now we have to check the error message to see if we can continue,
|
||||
// despite getting an error.
|
||||
if !put.message.contains("already exists") {
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
put.result,
|
||||
put.message
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let deploy = match wash_lib::app::deploy_model(
|
||||
&nats_client,
|
||||
Some(lattice_id),
|
||||
&name,
|
||||
Some(put.current_version),
|
||||
)
|
||||
.await
|
||||
// Remove current version so that either a new version is generated,
|
||||
// or the one set in the incoming patch gets used.
|
||||
if let Some(annotations) = current
|
||||
.get_mut("metadata")
|
||||
.and_then(|metadata| metadata.get_mut("annotations"))
|
||||
.and_then(|annotations| annotations.as_object_mut())
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => return internal_error(anyhow!("could not deploy app: {}", e)),
|
||||
annotations.remove("version");
|
||||
}
|
||||
|
||||
// Attempt to patch the currently running version
|
||||
json_patch::merge(&mut current, &patch);
|
||||
|
||||
// Re-insert "kubectl.kubernetes.io/last-applied-configuration" if one was set
|
||||
if let Some(last_applied_config) = last_applied_configuration {
|
||||
if let Some(annotations) = current
|
||||
.get_mut("metadata")
|
||||
.and_then(|metadata| metadata.get_mut("annotations"))
|
||||
.and_then(|annotations| annotations.as_object_mut())
|
||||
{
|
||||
annotations.insert(
|
||||
KUBECTL_LAST_APPLIED_CONFIG_ANNOTATION.to_string(),
|
||||
last_applied_config,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let updated_manifest = match serde_json::from_value::<Manifest>(current) {
|
||||
Ok(m) => m,
|
||||
Err(e) => return internal_error(anyhow!("unable to patch the application: {}", e)),
|
||||
};
|
||||
|
||||
match deploy.result {
|
||||
DeployResult::Acknowledged => Json(Application::new(name)).into_response(),
|
||||
DeployResult::Error => internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
deploy.result,
|
||||
deploy.message
|
||||
)),
|
||||
DeployResult::NotFound => not_found_error(anyhow!("applications \"{}\" not found", &name)),
|
||||
match wadm_client.put_and_deploy_manifest(updated_manifest).await {
|
||||
Ok((app_name, _)) => Json(Application::new(app_name)).into_response(),
|
||||
Err(e) => match e {
|
||||
ClientError::NotFound(_) => {
|
||||
not_found_error(anyhow!("applications \"{}\" not found", &name))
|
||||
}
|
||||
_ => internal_error(anyhow!("could not update application: {}", e)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -741,12 +696,12 @@ pub async fn delete_application(
|
|||
Path((namespace, name)): Path<(String, String)>,
|
||||
AxumState(state): AxumState<State>,
|
||||
) -> impl IntoResponse {
|
||||
let client = match Client::try_default().await {
|
||||
let kube_client = match KubeClient::try_default().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => return internal_error(anyhow!("unable to initialize kubernetes client: {}", e)),
|
||||
};
|
||||
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(client, &namespace);
|
||||
let configs: Api<WasmCloudHostConfig> = Api::namespaced(kube_client, &namespace);
|
||||
let cfgs = match configs.list(&ListParams::default()).await {
|
||||
Ok(objs) => objs,
|
||||
Err(e) => return internal_error(anyhow!("unable to list cosmonic host configs: {}", e)),
|
||||
|
@ -759,59 +714,13 @@ pub async fn delete_application(
|
|||
Err(resp) => return resp,
|
||||
};
|
||||
|
||||
// Fist, check if the model exists.
|
||||
// TODO(joonas): Replace this with wash_lib::app::get_model_status once
|
||||
// https://github.com/wasmCloud/wasmCloud/pull/1151 ships.
|
||||
let status = match wash_lib::app::get_model_status(
|
||||
&nats_client,
|
||||
Some(lattice_id.clone()),
|
||||
&name,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
return internal_error(anyhow!("unable to request app status from wadm: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
match status.result {
|
||||
StatusResult::Ok => {
|
||||
// Proceed
|
||||
}
|
||||
StatusResult::NotFound => {
|
||||
return not_found_error(anyhow!("apps \"{}\" not found", name));
|
||||
}
|
||||
StatusResult::Error => {
|
||||
return internal_error(anyhow!(
|
||||
"unexpected response from status command: result={:?}, message={}",
|
||||
status.result,
|
||||
status.message
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let delete = match wash_lib::app::delete_model_version(
|
||||
&nats_client,
|
||||
Some(lattice_id),
|
||||
&name,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => return internal_error(anyhow!("error deleting app: {}", e)),
|
||||
};
|
||||
|
||||
match delete.result {
|
||||
// TODO(joonas): do we need to handle DeleteResult::Noop differently?
|
||||
DeleteResult::Deleted | DeleteResult::Noop => Json(Application::new(name)).into_response(),
|
||||
DeleteResult::Error => internal_error(anyhow!(
|
||||
"unexpected response from wadm: result={:?}, message={}",
|
||||
delete.result,
|
||||
delete.message
|
||||
)),
|
||||
let wadm_client = WadmClient::from_nats_client(&lattice_id, None, nats_client);
|
||||
match wadm_client.delete_manifest(&name, None).await {
|
||||
Ok(_) => Json(Application::new(name)).into_response(),
|
||||
Err(e) => match e {
|
||||
ClientError::NotFound(_) => not_found_error(anyhow!("apps \"{}\" not found", name)),
|
||||
_ => internal_error(anyhow!("could not delete app: {}", e)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -819,7 +728,7 @@ async fn get_lattice_connection(
|
|||
cfgs: impl Iterator<Item = WasmCloudHostConfig>,
|
||||
state: State,
|
||||
namespace: String,
|
||||
) -> Result<(async_nats::Client, String), Response> {
|
||||
) -> Result<(NatsClient, String), Response> {
|
||||
let connection_data =
|
||||
cfgs.map(|cfg| (cfg, namespace.clone()))
|
||||
.filter_map(|(cfg, namespace)| {
|
||||
|
@ -832,7 +741,7 @@ async fn get_lattice_connection(
|
|||
});
|
||||
|
||||
for (cluster_url, ns, lattice_id, port) in connection_data {
|
||||
match get_client(&cluster_url, &port, state.nats_creds.clone(), ns).await {
|
||||
match get_nats_client(&cluster_url, &port, state.nats_creds.clone(), ns).await {
|
||||
Ok(c) => return Ok((c, lattice_id)),
|
||||
Err(e) => {
|
||||
error!(err = %e, %lattice_id, "error connecting to nats");
|
||||
|
|
|
@ -25,12 +25,9 @@ use kube::{
|
|||
use tokio::sync::{mpsc, RwLock};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, warn};
|
||||
use wadm::{
|
||||
events::{Event, ManifestPublished, ManifestUnpublished},
|
||||
model::{self, Manifest, Trait, TraitProperty},
|
||||
server::{GetResult, ModelSummary},
|
||||
};
|
||||
use wash_lib::app;
|
||||
use wadm::events::{Event, ManifestPublished, ManifestUnpublished};
|
||||
use wadm_client::Client as WadmClient;
|
||||
use wadm_types::{api::ModelSummary, Component, Manifest, Properties, Trait, TraitProperty};
|
||||
use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig;
|
||||
|
||||
use crate::controller::{
|
||||
|
@ -40,8 +37,9 @@ use crate::controller::{
|
|||
|
||||
const CONSUMER_PREFIX: &str = "wasmcloud_operator_service";
|
||||
// This should probably be exposed by wadm somewhere
|
||||
const WADM_EVT_SUBJECT: &str = "wadm.evt";
|
||||
const WADM_EVENT_STREAM_NAME: &str = "wadm_events";
|
||||
const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events";
|
||||
const OPERATOR_STREAM_SUBJECT: &str = "wasmcloud_operator_events.*.>";
|
||||
|
||||
/// Commands that can be sent to the watcher to trigger an update or removal of a service.
|
||||
#[derive(Clone, Debug)]
|
||||
|
@ -275,26 +273,20 @@ impl ServiceWatcher {
|
|||
/// This intended to be called by the controller whenever it reconciles state.
|
||||
pub async fn reconcile_services(&self, apps: Vec<ModelSummary>, lattice_id: String) {
|
||||
if let Some(watcher) = self.watchers.read().await.get(lattice_id.as_str()) {
|
||||
let wadm_client =
|
||||
WadmClient::from_nats_client(&lattice_id, None, watcher.nats_client.clone());
|
||||
for app in apps {
|
||||
if app.deployed_version.is_none() {
|
||||
continue;
|
||||
}
|
||||
match app::get_model_details(
|
||||
&watcher.nats_client,
|
||||
Some(lattice_id.clone()),
|
||||
app.name.as_str(),
|
||||
app.deployed_version,
|
||||
)
|
||||
.await
|
||||
match wadm_client
|
||||
.get_manifest(app.name.as_str(), app.deployed_version.as_deref())
|
||||
.await
|
||||
{
|
||||
Ok(model) => {
|
||||
if model.result == GetResult::Success {
|
||||
// TODO handle this or decide on whether or not to return a result at
|
||||
// all
|
||||
let _ = watcher.handle_manifest_published(ManifestPublished {
|
||||
manifest: model.manifest.unwrap(),
|
||||
Ok(manifest) => {
|
||||
let _ = watcher.handle_manifest_published(ManifestPublished {
|
||||
manifest,
|
||||
}).map_err(|e| error!(err = %e, %lattice_id, app = %app.name, "failed to trigger service reconciliation for app"));
|
||||
}
|
||||
}
|
||||
Err(e) => warn!(err=%e, "Unable to retrieve model"),
|
||||
};
|
||||
|
@ -314,8 +306,6 @@ impl ServiceWatcher {
|
|||
}
|
||||
|
||||
let js = jetstream::new(client.clone());
|
||||
let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone());
|
||||
let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone());
|
||||
|
||||
// Should we also be doing this when we first create the ServiceWatcher?
|
||||
let stream = js
|
||||
|
@ -330,10 +320,10 @@ impl ServiceWatcher {
|
|||
allow_rollup: false,
|
||||
num_replicas: self.stream_replicas as usize,
|
||||
mirror: Some(Source {
|
||||
name: "wadm_events".to_string(),
|
||||
name: WADM_EVENT_STREAM_NAME.to_string(),
|
||||
subject_transforms: vec![SubjectTransform {
|
||||
source: source_subject,
|
||||
destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()),
|
||||
source: wadm::DEFAULT_WADM_EVENTS_TOPIC.to_string(),
|
||||
destination: OPERATOR_STREAM_SUBJECT.replacen('*', "{{wildcard(1)}}", 1),
|
||||
}],
|
||||
..Default::default()
|
||||
}),
|
||||
|
@ -352,7 +342,7 @@ impl ServiceWatcher {
|
|||
ack_wait: std::time::Duration::from_secs(2),
|
||||
max_deliver: 3,
|
||||
deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All,
|
||||
filter_subject: destination_subject.clone(),
|
||||
filter_subject: OPERATOR_STREAM_SUBJECT.replacen('*', &lattice_id, 1),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
|
@ -570,14 +560,12 @@ pub struct HttpServerComponent {
|
|||
|
||||
/// Finds the httpserver component in a manifest and returns the details needed to create a service
|
||||
fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
|
||||
let components: Vec<&model::Component> = manifest
|
||||
.spec
|
||||
.components
|
||||
.iter()
|
||||
let components: Vec<&Component> = manifest
|
||||
.components()
|
||||
// filter just for the wasmCloud httpserver for now. This should actually just filter for
|
||||
// the http capability
|
||||
.filter(|c| {
|
||||
if let wadm::model::Properties::Capability { properties } = &c.properties {
|
||||
if let Properties::Capability { properties } = &c.properties {
|
||||
if properties
|
||||
.image
|
||||
.starts_with("ghcr.io/wasmcloud/http-server")
|
||||
|
@ -630,13 +618,13 @@ fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
|
|||
if props.namespace == "wasi"
|
||||
&& props.package == "http"
|
||||
&& props.interfaces.contains(&"incoming-handler".to_string())
|
||||
&& props.source.is_some()
|
||||
{
|
||||
for p in props.source_config.iter() {
|
||||
if let Some(config_props) = &p.properties {
|
||||
if let Some(addr) = config_props.get("address") {
|
||||
details.address = addr.clone();
|
||||
should_create_service = true;
|
||||
};
|
||||
let source = props.source.as_ref().unwrap();
|
||||
for cp in source.config.iter() {
|
||||
if let Some(addr) = cp.properties.as_ref().and_then(|p| p.get("address")) {
|
||||
details.address.clone_from(addr);
|
||||
should_create_service = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue