Provide options to set api server url and ca (#105)

- Allow to automatically infer local endpoint and certificate

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
This commit is contained in:
Danil Grigorev 2024-09-11 13:34:04 +02:00 committed by GitHub
parent 7efde875f1
commit 4bba311d5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 634 additions and 252 deletions

491
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -11,10 +11,10 @@ doc = false
name = "controller"
path = "src/main.rs"
# [[bin]]
# doc = false
# name = "crdgen"
# path = "src/crdgen.rs"
[[bin]]
doc = false
name = "crdgen"
path = "src/crdgen.rs"
[lib]
@ -34,9 +34,16 @@ actix-web = "4.4.0"
futures = "0.3.28"
tokio = { version = "1.39.2", features = ["macros", "rt-multi-thread"] }
k8s-openapi = { version = "0.22", features = ["latest", "schemars"] }
kube = { version = "0.93.1", features = ["runtime", "client", "derive", "unstable-runtime"]}
kube = { version = "0.94", features = [
"runtime",
"client",
"derive",
"unstable-runtime",
"unstable-client",
] }
schemars = { version = "0.8.21", features = ["chrono"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_with = "3.9"
serde_json = "1.0.121"
serde_yaml = "0.9.25"
prometheus = "0.13.4"
@ -51,14 +58,10 @@ opentelemetry-otlp = { version = "0.16.0", features = [
tonic = { version = "0.12", optional = true }
thiserror = "1.0.62"
anyhow = "1.0.86"
base64 = "0.22.1"
[dev-dependencies]
assert-json-diff = "2.0.2"
http = "1"
hyper = "1"
tower-test = "0.4.0"
[[bin]]
doc = false
name = "crdgen"
path = "src/crdgen.rs"

View File

@ -137,6 +137,61 @@ spec:
nullable: true
type: boolean
type: object
config:
nullable: true
properties:
server:
oneOf:
- required:
- inferLocal
- required:
- custom
properties:
custom:
properties:
apiServerCaConfigRef:
description: ObjectReference contains enough information to let you inspect or modify the referred object.
nullable: true
properties:
apiVersion:
description: API version of the referent.
type: string
fieldPath:
description: 'If referring to a piece of an object instead of an entire object, this string should contain a valid JSON/Go field access statement, such as desiredState.manifest.containers[2]. For example, if the object reference is to a container within a pod, this would take on a value like: "spec.containers{name}" (where "name" refers to the name of the container that triggered the event) or if no container name is specified "spec.containers[2]" (container with index 2 in this pod). This syntax is chosen only to have some well-defined way of referencing a part of an object.'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/'
type: string
resourceVersion:
description: 'Specific resourceVersion to which this reference is made, if any. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency'
type: string
uid:
description: 'UID of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids'
type: string
type: object
apiServerUrl:
nullable: true
type: string
type: object
inferLocal:
type: boolean
type: object
required:
- server
type: object
type: object
status:
nullable: true
properties:
installed_version:
nullable: true
type: string
type: object
required:
- spec
@ -144,4 +199,5 @@ spec:
type: object
served: true
storage: true
subresources: {}
subresources:
status: {}

View File

@ -32,6 +32,19 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- configmaps
verbs:
- get
- patch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- apiGroups:
- apiextensions.k8s.io
resources:

View File

@ -131,11 +131,8 @@ update-helm-repos:
install-fleet: _create-out-dir
#!/usr/bin/env bash
set -euxo pipefail
kubectl config view -o json --raw | jq -r '.clusters[].cluster["certificate-authority-data"]' | base64 -d > {{OUT_DIR}}/ca.pem
API_SERVER_URL=`kubectl get nodes -o json | jq -r '.items[0].status.addresses[] | select(.type=="InternalIP").address'`
API_SERVER_URL=https://${API_SERVER_URL}:6443
helm -n cattle-fleet-system install --version v0.10.1 --create-namespace --wait fleet-crd fleet/fleet-crd
helm install --version v0.10.1 --create-namespace -n cattle-fleet-system --set bootstrap.enabled=false --set apiServerURL=$API_SERVER_URL --set-file apiServerCA={{OUT_DIR}}/ca.pem fleet fleet/fleet --wait
helm install --version v0.10.1 --create-namespace -n cattle-fleet-system --set bootstrap.enabled=false fleet fleet/fleet --wait
# Install cluster api and any providers
install-capi: _download-clusterctl

View File

@ -1,4 +1,6 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use k8s_openapi::{
api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::LabelSelector,
};
use kube::{
core::{ParseExpressionError, Selector},
CustomResource,
@ -13,7 +15,8 @@ pub const AGENT_NAMESPACE: &str = "fleet-addon-agent";
#[kube(
kind = "FleetAddonConfig",
group = "addons.cluster.x-k8s.io",
version = "v1alpha1"
version = "v1alpha1",
status = "FleetAddonConfigStatus"
)]
#[serde(rename_all = "camelCase")]
pub struct FleetAddonConfigSpec {
@ -28,6 +31,9 @@ pub struct FleetAddonConfigSpec {
/// In case the cluster specifies topology.class, the name of the ClusterClass
/// will be added to the Fleet Cluster labels.
pub cluster: Option<ClusterConfig>,
// Fleet chart installation options
pub config: Option<FleetConfig>,
}
impl Default for FleetAddonConfig {
@ -37,11 +43,18 @@ impl Default for FleetAddonConfig {
spec: FleetAddonConfigSpec {
cluster_class: Some(ClusterClassConfig::default()),
cluster: Some(ClusterConfig::default()),
config: None,
},
status: Default::default(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct FleetAddonConfigStatus {
pub installed_version: Option<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterClassConfig {
@ -137,6 +150,26 @@ impl Default for ClusterConfig {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FleetConfig {
pub server: Server
}
#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum Server {
InferLocal(bool),
Custom(InstallOptions),
}
#[derive(Clone, Default, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct InstallOptions {
pub api_server_ca_config_ref: Option<ObjectReference>,
pub api_server_url: Option<String>,
}
impl NamingStrategy {
pub fn apply(&self, name: Option<String>) -> Option<String> {
name.map(|name| match &self.prefix {
@ -144,7 +177,7 @@ impl NamingStrategy {
None => name,
})
.map(|name| match &self.suffix {
Some(suffix) => name + &suffix,
Some(suffix) => name + suffix,
None => name,
})
}

View File

@ -1,5 +1,6 @@
use crate::api::capi_cluster::Cluster;
use crate::api::capi_clusterclass::ClusterClass;
use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::api::fleet_cluster;
use crate::api::fleet_clustergroup::ClusterGroup;
use crate::controllers::controller::{fetch_config, Context, FleetController};
@ -70,6 +71,22 @@ impl State {
}
}
pub async fn run_fleet_addon_config_controller(state: State) {
let client = Client::try_default()
.await
.expect("failed to create kube Client");
let api: Api<FleetAddonConfig> = Api::all(client.clone());
let fleet_addon_config_controller = Controller::new(api, watcher::Config::default())
.run(
FleetAddonConfig::reconcile,
error_policy,
state.to_context(client.clone()),
)
.for_each(|_| futures::future::ready(()));
tokio::join!(fleet_addon_config_controller);
}
/// Initialize the controller and shared state (given the crd is installed)
pub async fn run_cluster_controller(state: State) {
let client = Client::try_default()

View File

@ -0,0 +1,222 @@
use base64::prelude::*;
use std::{fmt::Display, str::FromStr, sync::Arc};
use k8s_openapi::api::core::v1::{ConfigMap, Endpoints};
use kube::{
api::{ObjectMeta, Patch, PatchParams, TypeMeta},
client::scope::Namespace,
core::object::HasSpec,
runtime::controller::Action,
Api, Resource, ResourceExt,
};
use serde::{ser, Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use tracing::instrument;
use crate::{
api::fleet_addon_config::{FleetAddonConfig, InstallOptions, Server},
telemetry,
};
use super::controller::Context;
#[derive(Resource, Serialize, Deserialize, Default, Clone, Debug)]
#[resource(inherit = ConfigMap)]
struct FleetConfig {
#[serde(flatten, default)]
types: Option<TypeMeta>,
metadata: ObjectMeta,
data: FleetConfigSpec,
}
#[serde_as]
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
struct FleetConfigSpec {
#[serde_as(as = "DisplayFromStr")]
config: FleetConfigData,
}
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
struct FleetConfigData {
#[serde(rename = "apiServerURL")]
api_server_url: String,
#[serde(rename = "apiServerCA")]
api_server_ca: String,
#[serde(flatten)]
other: Value,
}
impl FromStr for FleetConfigData {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
impl Display for FleetConfigData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&serde_json::to_string(self).map_err(ser::Error::custom)?)
}
}
#[derive(Resource, Deserialize, Serialize, Clone, Debug)]
#[resource(inherit = ConfigMap)]
struct CertConfigMap {
metadata: ObjectMeta,
data: CertData,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
struct CertData {
#[serde(rename = "ca.crt")]
ca_crt: String,
}
impl FleetAddonConfig {
#[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
pub async fn reconcile(self: Arc<Self>, ctx: Arc<Context>) -> crate::Result<Action> {
self.reconcile_config_sync(ctx).await.map_err(Into::into)
}
#[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
pub async fn reconcile_config_sync(
self: Arc<Self>,
ctx: Arc<Context>,
) -> AddonConfigSyncResult<Action> {
if self.name_any() != "fleet-addon-config" {
return Ok(Action::await_change());
}
let ns = Namespace::from("cattle-fleet-system");
let mut fleet_config: FleetConfig = ctx.client.get("fleet-controller", &ns).await?;
if let Some(config) = self.spec().config.as_ref() {
self.update_certificate(ctx.clone(), &mut fleet_config, &config.server)
.await?;
self.update_url(ctx.clone(), &mut fleet_config, &config.server)
.await?;
}
fleet_config.metadata.managed_fields = None;
fleet_config.types = Some(TypeMeta::resource::<FleetConfig>());
let api: Api<FleetConfig> = Api::namespaced(ctx.client.clone(), "cattle-fleet-system");
api.patch(
&fleet_config.name_any(),
&PatchParams::apply("addon-provider-fleet").force(),
&Patch::Apply(&fleet_config),
)
.await?;
Ok(Action::await_change())
}
fn default_endpoint_lookup(&self, endpoints: Endpoints) -> Option<String> {
let subsets = endpoints.subsets?;
let subnet = subsets.first()?.clone();
let addresses = subnet.addresses?;
let ports = subnet.ports?;
let address = addresses.first()?.clone();
let port = ports.first()?.clone();
let url = address.hostname.or(Some(address.ip))?;
let name = port.name;
let port = port.port;
match name {
Some(name) => Some(format!("{name}://{url}:{port}")),
None => Some(url),
}
}
async fn update_certificate(
&self,
ctx: Arc<Context>,
fleet_config: &mut FleetConfig,
fleet_install: &Server,
) -> AddonConfigSyncResult<()> {
let ns = Namespace::from("default");
let cert_config_map: CertConfigMap = match fleet_install {
Server::InferLocal(true) => ctx.client.get("kube-root-ca.crt", &ns).await?,
Server::Custom(InstallOptions {
api_server_ca_config_ref: Some(config_ref),
..
}) => ctx.client.fetch(config_ref).await?,
_ => return Ok(()),
};
fleet_config.data.config.api_server_ca =
BASE64_STANDARD.encode(cert_config_map.data.ca_crt);
Ok(())
}
async fn update_url(
&self,
ctx: Arc<Context>,
fleet_config: &mut FleetConfig,
fleet_install: &Server,
) -> AddonConfigSyncResult<()> {
let api_server_url = match fleet_install {
Server::InferLocal(true) => {
if let Some(api_server_url) = {
let ns = Namespace::from("default");
self.default_endpoint_lookup(ctx.client.get("kubernetes", &ns).await?)
} {
api_server_url
} else {
return Ok(());
}
}
Server::Custom(InstallOptions {
api_server_url: Some(api_server_url),
..
}) => api_server_url.clone(),
_ => return Ok(()),
};
fleet_config.data.config.api_server_url = api_server_url;
Ok(())
}
}
pub type AddonConfigSyncResult<T> = std::result::Result<T, AddonConfigSyncError>;
#[derive(Error, Debug)]
pub enum AddonConfigSyncError {
#[error("Certificate config map fetch error: {0}")]
CertificateConfigMapFetch(#[from] kube::Error),
}
mod tests {
#[test]
fn test() {
use crate::controllers::addon_config::FleetConfigData;
let data = r#"{
"systemDefaultRegistry": "",
"agentImage": "rancher/fleet-agent:v0.9.4",
"agentImagePullPolicy": "IfNotPresent",
"apiServerURL": "https://192.168.1.123:43473",
"apiServerCA": "test",
"agentCheckinInterval": "15m",
"ignoreClusterRegistrationLabels": false,
"bootstrap": {
"paths": "",
"repo": "",
"secret": "",
"branch": "master",
"namespace": "fleet-local",
"agentNamespace": ""
},
"webhookReceiverURL": "",
"githubURLPrefix": ""
}"#;
let _config: FleetConfigData = serde_json::from_str(data).unwrap();
}
}

View File

@ -92,6 +92,7 @@ pub enum ConfigFetchError {
Lookup(#[from] kube::Error),
}
pub mod addon_config;
pub mod cluster;
pub mod cluster_class;
pub mod cluster_group;

View File

@ -1,4 +1,4 @@
use controllers::{BundleError, SyncError};
use controllers::{addon_config::AddonConfigSyncError, BundleError, SyncError};
use futures::channel::mpsc::TrySendError;
use thiserror::Error;
@ -16,6 +16,9 @@ pub enum Error {
#[error("Fleet error: {0}")]
FleetError(#[from] SyncError),
#[error("Fleet config error: {0}")]
FleetConfigError(#[from] AddonConfigSyncError),
#[error("Namespace trigger error: {0}")]
TriggerError(#[from] TrySendError<()>),

View File

@ -30,6 +30,7 @@ async fn main() -> anyhow::Result<()> {
// Init k8s controller state
let state = State::new();
let fleet_config_controller = controller::run_fleet_addon_config_controller(state.clone());
let cluster_controller = controller::run_cluster_controller(state.clone());
let cluster_class_controller = controller::run_cluster_class_controller(state.clone());
@ -46,6 +47,12 @@ async fn main() -> anyhow::Result<()> {
.shutdown_timeout(5)
.run();
tokio::join!(cluster_controller, cluster_class_controller, server).2?;
tokio::join!(
cluster_controller,
cluster_class_controller,
fleet_config_controller,
server
)
.3?;
Ok(())
}

View File

@ -3,6 +3,9 @@ kind: FleetAddonConfig
metadata:
name: fleet-addon-config
spec:
config:
server:
inferLocal: true
clusterClass:
patchResource: true
setOwnerReferences: true