Merge diff with main (#205)

Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
This commit is contained in:
Danil Grigorev 2025-03-11 18:05:26 +01:00 committed by GitHub
parent 28ddaa5b1a
commit b46225cab7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1077 additions and 512 deletions

View File

@ -67,3 +67,25 @@ jobs:
with:
name: artifacts-import
path: _out/gather
test-e2e-import-rke2:
runs-on: ubuntu-latest
steps:
- name: Install just
uses: extractions/setup-just@v2
- name: Install kind
uses: helm/kind-action@v1
with:
install_only: true
version: v0.26.0
- uses: actions/checkout@v4
- name: Test
run: just test-import-rke2
- name: Collect artifacts
if: always()
run: just collect-test-import
- name: Store run artifacts
uses: actions/upload-artifact@v4
if: always()
with:
name: artifacts-import-rke2
path: _out/gather

478
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -34,7 +34,7 @@ actix-web = "4.9.0"
futures = "0.3.28"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
k8s-openapi = { version = "0.24", features = ["latest", "schemars"] }
kube = { version = "0.98", features = [
kube = { version = "0.98.0", features = [
"runtime",
"client",
"derive",
@ -58,9 +58,12 @@ tonic = { version = "0.12.3", optional = true }
thiserror = "2.0.11"
anyhow = "1.0.96"
base64 = "0.22.1"
clap = { version = "4.5.30", features = ["derive"] }
clap = { version = "4.5.31", features = ["derive"] }
cluster-api-rs = "1.9.5"
fleet-api-rs = "0.11.4"
fleet-api-rs = "0.11.5"
async-broadcast = "0.7.2"
pin-project = "1.1.10"
async-stream = "0.3.6"
[dev-dependencies]
assert-json-diff = "2.0.2"

View File

@ -27,11 +27,11 @@ Refer to the book [configuration](./docs/book/03_tutorials/02_configuration) sec
## Demo
<script src="https://asciinema.org/a/659626.js" id="asciicast-659626" async="true"></script>
[![asciicast](https://asciinema.org/a/659626.svg)](https://asciinema.org/a/659626)
## Calico CNI installation demo
<script src="https://asciinema.org/a/700924.js" id="asciicast-700924" async="true"></script>
[![asciicast](https://asciinema.org/a/700924.svg)](https://asciinema.org/a/700924)
## Get in contact

View File

@ -122,6 +122,35 @@ spec:
description: Namespace selection for the fleet agent
nullable: true
type: string
agentTolerations:
description: Agent taint toleration settings for every cluster
items:
description: The pod this Toleration is attached to tolerates any taint that matches the triple <key,value,effect> using the matching operator <operator>.
properties:
effect:
description: Effect indicates the taint effect to match. Empty means match all taint effects. When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute.
nullable: true
type: string
key:
description: Key is the taint key that the toleration applies to. Empty means match all taint keys. If the key is empty, operator must be Exists; this combination means to match all values and all keys.
nullable: true
type: string
operator:
description: Operator represents a key's relationship to the value. Valid operators are Exists and Equal. Defaults to Equal. Exists is equivalent to wildcard for value, so that a pod can tolerate all taints of a particular category.
nullable: true
type: string
tolerationSeconds:
description: TolerationSeconds represents the period of time the toleration (which must be of effect NoExecute, otherwise this field is ignored) tolerates the taint. By default, it is not set, which means tolerate the taint forever (do not evict). Zero and negative values will be treated as 0 (evict immediately) by the system.
format: int64
nullable: true
type: integer
value:
description: Value is the taint value the toleration matches to. If the operator is Exists, the value should be empty, otherwise just a regular string.
nullable: true
type: string
type: object
nullable: true
type: array
applyClassGroup:
description: Apply a ClusterGroup for a ClusterClass referenced from a different namespace.
nullable: true
@ -131,7 +160,7 @@ spec:
nullable: true
type: boolean
namespaceSelector:
description: 'Namespace label selector. If set, only clusters in the namespace matching label selector will be imported. WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.'
description: Namespace label selector. If set, only clusters in the namespace matching label selector will be imported.
properties:
matchExpressions:
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.
@ -178,7 +207,7 @@ spec:
nullable: true
type: boolean
selector:
description: 'Cluster label selector. If set, only clusters matching label selector will be imported. WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.'
description: Cluster label selector. If set, only clusters matching label selector will be imported.
properties:
matchExpressions:
description: matchExpressions is a list of label selector requirements. The requirements are ANDed.

View File

@ -29,7 +29,7 @@ spec:
server:
inferLocal: true # Uses default `kuberenetes` endpoint and secret for APIServerURL configuration
install:
version: v0.12.0-alpha.14 # We will install alpha for helmapp support
version: v0.12.0-beta.1 # We will install alpha for helmapp support
```
### Fleet Public URL and Certificate setup
@ -52,7 +52,7 @@ spec:
server:
inferLocal: true # Uses default `kuberenetes` endpoint and secret for APIServerURL configuration
install:
version: v0.12.0-alpha.14 # We will install alpha for helmapp support
version: v0.12.0-beta.1 # We will install alpha for helmapp support
```
This scenario works well in a test setup, while using CAPI docker provider and docker clusters.

View File

@ -27,8 +27,8 @@ kubectl config view -o json --raw | jq -r '.clusters[] | select(.name=="kind-dev
# Set the API server URL
API_SERVER_URL=`kubectl config view -o json --raw | jq -r '.clusters[] | select(.name=="kind-dev").cluster["server"]'`
# And proceed with the installation via helm
helm -n cattle-fleet-system install --version v0.12.0-alpha.14 --create-namespace --wait fleet-crd fleet/fleet-crd
helm install --create-namespace --version v0.12.0-alpha.14 -n cattle-fleet-system --set apiServerURL=$API_SERVER_URL --set-file apiServerCA=_out/ca.pem fleet fleet/fleet --wait
helm -n cattle-fleet-system install --version v0.12.0-beta.1 --create-namespace --wait fleet-crd fleet/fleet-crd
helm install --create-namespace --version v0.12.0-beta.1 -n cattle-fleet-system --set apiServerURL=$API_SERVER_URL --set-file apiServerCA=_out/ca.pem fleet fleet/fleet --wait
```
4. Install CAPI with the required experimental features enabled and initialized the Docker provider for testing.
```

View File

@ -47,3 +47,7 @@ This should result in a `kindnet` running on the matching cluster:
kube-system kindnet-dqzwh 1/1 Running 0 2m11s
kube-system kindnet-jbkjq 1/1 Running 0 2m11s
```
### Demo
<script src="https://asciinema.org/a/6x8WmsCXJQdDswAwfYHQlaVsj.js" id="asciicast-6x8WmsCXJQdDswAwfYHQlaVsj" async="true" data-start-at="327"></script>

View File

@ -3,7 +3,8 @@
<div class="warning">
Note: For this setup to work, you need to install Fleet and Fleet CRDs charts via
`FleetAddonConfig` resource. Both need to have version version >= v0.12.0-alpha.14, which provides support
`FleetAddonConfig` resource. Both need to have version >= v0.12.0-beta.1,
which provides support for `HelmApp` resource.
</div>
@ -13,7 +14,7 @@ In this tutorial we will deploy `Calico` CNI using `HelmApp` resource and `Fleet
Here's an example of how a `HelmApp` resource can be used in combination with templateValues to deploy application consistently on any matching cluster.
In this scenario we are matching cluster directly by name, using `clusterName` reference, but a `clusterGroup` or a label based selection can be used instead:
In this scenario we are matching cluster directly by name, using `clusterName` reference, but a `clusterGroup` or a label based selection can be used instead or together with `clusterName`:
```yaml
targets:
- clusterName: docker-demo
@ -22,33 +23,7 @@ In this scenario we are matching cluster directly by name, using `clusterName` r
We are deploying `HelmApp` resource in the `default` namespace. The namespace should be the same for the CAPI Cluster for fleet to locate it.
```yaml
apiVersion: fleet.cattle.io/v1alpha1
kind: HelmApp
metadata:
name: calico
spec:
helm:
releaseName: projectcalico
repo: https://docs.tigera.io/calico/charts
chart: tigera-operator
templateValues:
installation: |-
cni:
type: Calico
ipam:
type: HostLocal
calicoNetwork:
bgp: Disabled
mtu: 1350
ipPools:
${- range $cidr := .ClusterValues.Cluster.spec.clusterNetwork.pods.cidrBlocks }
- cidr: "${ $cidr }"
encapsulation: None
natOutgoing: Enabled
nodeSelector: all()${- end}
insecureSkipTLSVerify: true
targets:
- clusterName: docker-demo
{{#include ../../../testdata/helm.yaml}}
```
`HelmApp` supports fleet [templating][] options, otherwise available exclusively to the `fleet.yaml` configuration, stored in the [git repository contents][], and applied via the `GitRepo` resource.

View File

@ -0,0 +1,98 @@
# Installing Calico CNI using GitRepo
<div class="warning">
Note: For this setup to work, you need have Fleet and Fleet CRDs charts installed
with version >= `v0.12.0-alpha.14`.
</div>
In this tutorial we will deploy `Calico` CNI using `GitRepo` resource on `RKE2` based docker cluster.
## Deploying RKE2 docker cluster
We will first need to create a RKE2 based docker cluster from templates:
```bash
> kubectl apply -f testdata/cluster_docker_rke2.yaml
dockercluster.infrastructure.cluster.x-k8s.io/docker-demo created
cluster.cluster.x-k8s.io/docker-demo created
dockermachinetemplate.infrastructure.cluster.x-k8s.io/docker-demo-control-plane created
rke2controlplane.controlplane.cluster.x-k8s.io/docker-demo-control-plane created
dockermachinetemplate.infrastructure.cluster.x-k8s.io/docker-demo-md-0 created
rke2configtemplate.bootstrap.cluster.x-k8s.io/docker-demo-md-0 created
machinedeployment.cluster.x-k8s.io/docker-demo-md-0 created
configmap/docker-demo-lb-config created
```
In this scenario cluster is located in the `default` namespace, where the rest of fleet objects will go.
Cluster is labeled with `cni: calico` in order for the `GitRepo` to match on it.
```yaml
apiVersion: cluster.x-k8s.io/v1beta1
kind: Cluster
metadata:
name: docker-demo
labels:
cni: calico
```
Now that cluster is created, `GitRepo` can be applied which will be evaluated asynchroniously.
## Deploying Calico CNI via `GitRepo`
We will first review the content of our `fleet.yaml` file:
```yaml
{{#include ../../../fleet/applications/calico/fleet.yaml}}
```
In this scenario we are using `helm` definition which is consistent with the `HelmApp` spec from the [previous][] guide, and defines same templating rules.
We also need to [resolve conflicts][], which happen due to in-place modification of some resources by the `calico` controllers. For that, the `diff` section is used, where we remove blocking fields from comparison.
[previous]: ./03_installing_calico.md
[resolve conflicts]: https://fleet.rancher.io/bundle-diffs
Then we are specifying `targets.yaml` file, which will declare selection rules for this `fleet.yaml` configuration. In our case, we will match on clusters labeled with `cni: calico` label:
```yaml
{{#include ../../../fleet/applications/calico/targets.yaml}}
```
Once everything is ready, we need to apply our `GitRepo` in the `default` namespace:
```yaml
{{#include ../../../testdata/gitrepo-calico.yaml}}
```
```bash
> kubectl apply -f testdata/gitrepo-calico.yaml
gitrepo.fleet.cattle.io/calico created
# After some time
> kubectl get gitrepo
NAME REPO COMMIT BUNDLEDEPLOYMENTS-READY STATUS
calico https://github.com/rancher-sandbox/cluster-api-addon-provider-fleet.git 62b4fe6944687e02afb331b9e1839e33c539f0c7 1/1
```
Now our cluster have `calico` installed, and all nodes are marked as `Ready`:
```bash
# exec into one of the CP node containers
> docker exec -it fef3427009f6 /bin/bash
root@docker-demo-control-plane-krtnt:/#
root@docker-demo-control-plane-krtnt:/# kubectl get pods -n calico-system --kubeconfig /var/lib/rancher/rke2/server/cred/api-server.kubeconfig
NAME READY STATUS RESTARTS AGE
calico-kube-controllers-55cbcc7467-j5bbd 1/1 Running 0 3m30s
calico-node-mbrqg 1/1 Running 0 3m30s
calico-node-wlbwn 1/1 Running 0 3m30s
calico-typha-f48c7ddf7-kbq6d 1/1 Running 0 3m30s
csi-node-driver-87tlx 2/2 Running 0 3m30s
csi-node-driver-99pqw 2/2 Running 0 3m30s
```
## Demo
You can follow along with the demo to verify that your deployment is matching expected result:
<script src="https://asciinema.org/a/706570.js" id="asciicast-706570" async="true"></script>

View File

@ -19,8 +19,6 @@ Fleet mainly relies on `Cluster` labels, `Cluster` names and `ClusterGroups` whe
`FleetAddonConfig` provides several configuration options to define clusters to import.
**Note: Please be aware that chaning selection configuration requires restart of the `CAAPF` instance, as these selection options directly translate into watch configurations for controllers established on the `API` server.**
### Namespace Label Selection
This section defines how to select namespaces based on specific labels. The `namespaceSelector` field ensures that the import strategy applies only to namespaces that have the label `import: "true"`. This is useful for scoping automatic import to specific namespaces rather than applying it cluster-wide.

View File

@ -0,0 +1,27 @@
helm:
releaseName: projectcalico
repo: https://docs.tigera.io/calico/charts
chart: tigera-operator
templateValues:
installation: |-
cni:
type: Calico
ipam:
type: HostLocal
calicoNetwork:
bgp: Disabled
mtu: 1350
ipPools:
${- range $cidr := .ClusterValues.Cluster.spec.clusterNetwork.pods.cidrBlocks }
- cidr: "${ $cidr }"
encapsulation: None
natOutgoing: Enabled
nodeSelector: all()${- end}
diff:
comparePatches:
- apiVersion: operator.tigera.io/v1
kind: Installation
name: default
operations:
- {"op":"remove", "path":"/spec/kubernetesProvider"}

View File

@ -0,0 +1,4 @@
targets:
- clusterSelector:
matchLabels:
cni: calico

View File

@ -57,7 +57,6 @@ compile features="": _create-out-dir
[private]
_build features="":
just compile {{features}}
docker buildx build -t {{ORG}}/{{NAME}}:{{TAG}} .
# docker build base
@ -77,6 +76,10 @@ docker-build:
docker-push:
docker push {{ORG}}/{{NAME}}:{{TAG}}
build-and-load:
docker build . -t {{ORG}}/{{NAME}}:{{TAG}}
kind load docker-image {{ORG}}/{{NAME}}:{{TAG}} --name dev
load-base features="":
just _build {{features}}
kind load docker-image {{ORG}}/{{NAME}}:{{TAG}} --name dev
@ -100,14 +103,30 @@ deploy-kindnet:
deploy-calico:
kubectl --context kind-dev apply -f testdata/helm.yaml
deploy-calico-gitrepo: _download-yq
#!/usr/bin/env bash
set -euxo pipefail
repo=`git remote get-url origin`
branch=`git branch --show-current`
cp testdata/gitrepo-calico.yaml {{OUT_DIR}}/gitrepo-calico.yaml
yq -i ".spec.repo = \"${repo}\"" {{OUT_DIR}}/gitrepo-calico.yaml
yq -i ".spec.branch = \"${branch}\"" {{OUT_DIR}}/gitrepo-calico.yaml
kubectl apply -f {{OUT_DIR}}/gitrepo-calico.yaml
# Deploy an example app bundle to the cluster
deploy-app:
kubectl --context kind-dev apply -f testdata/bundle.yaml
# Deploy child cluster using docker & kubeadm
deploy-child-cluster:
kind delete cluster --name docker-demo || true
kubectl --context kind-dev apply -f testdata/cluster_docker_kcp.yaml
# Deploy child cluster using docker & rke2
deploy-child-rke2-cluster:
kind delete cluster --name docker-demo || true
kubectl --context kind-dev apply -f testdata/cluster_docker_rke2.yaml
# Deploy child cluster-call based cluster using docker & kubeadm
deploy-child-cluster-class:
kind delete cluster --name capi-quickstart || true
@ -115,11 +134,7 @@ deploy-child-cluster-class:
# Add and update helm repos used
update-helm-repos:
#helm repo add gitea-charts https://dl.gitea.com/charts/
helm repo add fleet https://rancher.github.io/fleet-helm-charts/
#helm repo add jetstack https://charts.jetstack.io
#helm repo add traefik https://traefik.github.io/charts
#helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
# Install fleet into the k8s cluster
@ -136,7 +151,7 @@ install-capi: _download-clusterctl
# Deploy will deploy the operator
deploy features="": _download-kustomize
just generate {{features}}
just load-base {{features}}
just build-and-load
kustomize build config/default | kubectl apply -f -
kubectl --context kind-dev apply -f testdata/config.yaml
kubectl wait fleetaddonconfigs fleet-addon-config --for=jsonpath='{.status.installedVersion}' --timeout=150s
@ -153,6 +168,12 @@ test-import: start-dev deploy deploy-child-cluster deploy-kindnet deploy-app &&
kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo
kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo
# Full e2e test of importing cluster in fleet
test-import-rke2: start-dev deploy deploy-child-rke2-cluster deploy-calico-gitrepo deploy-app
kubectl wait pods --for=condition=Ready --timeout=150s --all --all-namespaces
kubectl wait cluster --timeout=500s --for=condition=ControlPlaneReady=true docker-demo
kubectl wait clusters.fleet.cattle.io --timeout=300s --for=condition=Ready=true docker-demo
collect-test-import:
-just collect-artifacts dev
-just collect-artifacts docker-demo

View File

@ -1,5 +1,8 @@
use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus};
use kube::{api::{ObjectMeta, TypeMeta}, Resource};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]

View File

@ -1,5 +1,8 @@
use cluster_api_rs::capi_clusterclass::{ClusterClassSpec, ClusterClassStatus};
use kube::{api::{ObjectMeta, TypeMeta}, Resource};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]

View File

@ -1,4 +1,4 @@
use fleet_api_rs::fleet_cluster::ClusterAgentEnvVars;
use fleet_api_rs::fleet_cluster::{ClusterAgentEnvVars, ClusterAgentTolerations};
use k8s_openapi::{
api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::LabelSelector,
};
@ -107,6 +107,10 @@ pub struct ClusterConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_namespace: Option<String>,
/// Agent taint toleration settings for every cluster
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_tolerations: Option<Vec<ClusterAgentTolerations>>,
/// Host network allows to deploy agent configuration using hostNetwork: true setting
/// which eludes dependency on the CNI configuration for the cluster.
#[serde(skip_serializing_if = "Option::is_none")]
@ -134,6 +138,32 @@ impl ClusterConfig {
.unwrap_or(AGENT_NAMESPACE.to_string())
}
pub(crate) fn agent_tolerations(&self) -> Vec<ClusterAgentTolerations> {
let agent_tolerations = vec![
ClusterAgentTolerations {
effect: Some("NoSchedule".into()),
operator: Some("Exists".into()),
key: Some("node.kubernetes.io/not-ready".into()),
..Default::default()
},
ClusterAgentTolerations {
effect: Some("NoSchedule".into()),
operator: Some("Exists".into()),
key: Some("node.cluster.x-k8s.io/uninitialized".into()),
..Default::default()
},
ClusterAgentTolerations {
effect: Some("NoSchedule".into()),
operator: Some("Equal".into()),
key: Some("node.cloudprovider.kubernetes.io/uninitialized".into()),
value: Some("true".into()),
..Default::default()
},
];
self.agent_tolerations.clone().unwrap_or(agent_tolerations)
}
#[cfg(feature = "agent-initiated")]
pub(crate) fn agent_initiated_connection(&self) -> bool {
self.agent_initiated.filter(|&set| set).is_some()
@ -167,6 +197,7 @@ impl Default for ClusterConfig {
selectors: Default::default(),
patch_resource: Some(true),
agent_env_vars: None,
agent_tolerations: None,
}
}
}
@ -233,11 +264,9 @@ impl NamingStrategy {
#[serde(rename_all = "camelCase")]
pub struct Selectors {
/// Namespace label selector. If set, only clusters in the namespace matching label selector will be imported.
/// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
pub namespace_selector: LabelSelector,
/// Cluster label selector. If set, only clusters matching label selector will be imported.
/// WARN: this field controls the state of opened watches to the cluster. If changed, requires controller to be reloaded.
pub selector: LabelSelector,
}

View File

@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_cluster::{ClusterSpec, ClusterStatus};
use kube::{api::{ObjectMeta, TypeMeta}, Resource};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_cluster::Cluster)]

View File

@ -1,8 +1,11 @@
use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_cluster_registration_token::{
ClusterRegistrationTokenSpec, ClusterRegistrationTokenStatus,
};
use kube::{api::{ObjectMeta, TypeMeta}, Resource};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_cluster_registration_token::ClusterRegistrationToken)]

View File

@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSpec, ClusterGroupStatus};
use kube::{api::{ObjectMeta, TypeMeta}, Resource};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource,
};
use serde::{Deserialize, Serialize};
#[derive(Resource, Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
#[resource(inherit = fleet_api_rs::fleet_clustergroup::ClusterGroup)]

View File

@ -6,13 +6,16 @@ use crate::api::fleet_clustergroup::ClusterGroup;
use crate::controllers::addon_config::FleetConfig;
use crate::controllers::controller::{fetch_config, Context, FleetController};
use crate::metrics::Diagnostics;
use crate::multi_dispatcher::{broadcaster, BroadcastStream, MultiDispatcher};
use crate::{Error, Metrics};
use clap::Parser;
use futures::channel::mpsc;
use futures::StreamExt;
use futures::stream::SelectAll;
use futures::{Stream, StreamExt};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::{DynamicObject, ListParams};
use kube::core::DeserializeGuard;
use kube::runtime::reflector::ObjectRef;
use kube::runtime::{metadata_watcher, predicates, reflector, watcher, WatchStreamExt};
@ -26,16 +29,22 @@ use kube::{
},
};
use tokio::sync::Mutex;
use tokio::time::sleep;
use std::future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::{self, warn};
use tracing::{self, info, warn};
type DynamicStream = SelectAll<
Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>,
>;
/// State shared between the controller and the web server
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct State {
/// Diagnostics populated by the reconciler
diagnostics: Arc<RwLock<Diagnostics>>,
@ -45,6 +54,11 @@ pub struct State {
/// Additional flags for controller
pub flags: Flags,
// dispatcher
dispatcher: MultiDispatcher,
// shared stream of dynamic events
stream: BroadcastStream<DynamicStream>,
}
#[derive(Parser, Debug, Clone, Default)]
@ -55,6 +69,12 @@ pub struct Flags {
}
/// State wrapper around the controller outputs for the web server
impl Default for State {
fn default() -> Self {
Self::new()
}
}
impl State {
pub fn new() -> Self {
let registry = Default::default();
@ -62,7 +82,9 @@ impl State {
metrics: Metrics::default().register(&registry).unwrap(),
registry,
flags: Flags::parse(),
..Default::default()
dispatcher: MultiDispatcher::new(128),
diagnostics: Default::default(),
stream: BroadcastStream::new(Default::default()),
}
}
@ -82,6 +104,8 @@ impl State {
client,
metrics: self.metrics.clone(),
diagnostics: self.diagnostics.clone(),
dispatcher: self.dispatcher.clone(),
stream: self.stream.clone(),
})
}
}
@ -90,20 +114,53 @@ pub async fn run_fleet_addon_config_controller(state: State) {
let client = Client::try_default()
.await
.expect("failed to create kube Client");
let api: Api<FleetAddonConfig> = Api::all(client.clone());
let fleet_addon_config_controller = Controller::new(api, watcher::Config::default())
.watches(
Api::<DeserializeGuard<FleetConfig>>::all(client.clone()),
Config::default().fields("metadata.name=fleet-controller"),
|config| config.0.ok().map(|_| ObjectRef::new("fleet-addon-config")),
)
.run(
FleetAddonConfig::reconcile_config_sync,
error_policy,
state.to_context(client.clone()),
)
let config_controller = Controller::new(
Api::<FleetAddonConfig>::all(client.clone()),
Config::default().any_semantic(),
)
.watches(
Api::<DeserializeGuard<FleetConfig>>::all(client.clone()),
Config::default().fields("metadata.name=fleet-controller"),
|config| config.0.ok().map(|_| ObjectRef::new("fleet-addon-config")),
)
.shutdown_on_signal()
.run(
FleetAddonConfig::reconcile_config_sync,
error_policy,
state.to_context(client.clone()),
)
.for_each(|_| futures::future::ready(()));
let dynamic_watches_controller = Controller::new(
Api::<FleetAddonConfig>::all(client.clone()),
Config::default().any_semantic(),
)
.shutdown_on_signal()
.run(
FleetAddonConfig::reconcile_dynamic_watches,
error_policy,
state.to_context(client.clone()),
)
.for_each(|_| futures::future::ready(()));
let watcher = broadcaster(state.dispatcher.clone(), state.stream.clone())
.for_each(|_| futures::future::ready(()));
tokio::join!(fleet_addon_config_controller);
// Reconcile initial state of watches
Arc::new(
fetch_config(client.clone())
.await
.expect("failed to get FleetAddonConfig resource"),
)
.update_watches(state.to_context(client.clone()))
.await
.expect("Initial dynamic watches setup to succeed");
tokio::select! {
_ = watcher => {panic!("This should not happen before controllers exit")},
_ = futures::future::join(dynamic_watches_controller, config_controller) => {}
};
}
pub async fn run_fleet_helm_controller(state: State) {
@ -112,6 +169,7 @@ pub async fn run_fleet_helm_controller(state: State) {
.expect("failed to create kube Client");
let api: Api<FleetAddonConfig> = Api::all(client.clone());
let fleet_addon_config_controller = Controller::new(api, watcher::Config::default())
.shutdown_on_signal()
.run(
FleetAddonConfig::reconcile_helm,
error_policy,
@ -127,29 +185,21 @@ pub async fn run_cluster_controller(state: State) {
.await
.expect("failed to create kube Client");
loop {
let clusters = Api::<fleet_cluster::Cluster>::all(client.clone());
if let Err(e) = clusters.list(&ListParams::default().limit(1)).await {
info!("Fleet Clusters are not queryable; {e:?}. Is the CRD installed?");
sleep(Duration::new(5, 0)).await;
continue;
}
break;
}
let config = fetch_config(client.clone())
.await
.expect("failed to get FleetAddonConfig resource");
let (reader, writer) = reflector::store();
let clusters = watcher(
Api::<Cluster>::all(client.clone()),
Config::default()
.labels_from(
&config
.cluster_watch()
.expect("valid cluster label selector"),
)
.any_semantic(),
)
.default_backoff()
.modify(|c| {
c.managed_fields_mut().clear();
})
.reflect(writer)
.touched_objects()
.predicate_filter(predicates::resource_version);
let fleet = metadata_watcher(
Api::<fleet_cluster::Cluster>::all(client.clone()),
Config::default().any_semantic(),
@ -159,7 +209,8 @@ pub async fn run_cluster_controller(state: State) {
.predicate_filter(predicates::resource_version);
let (invoke_reconcile, namespace_trigger) = mpsc::channel(0);
let clusters = Controller::for_stream(clusters, reader)
let (sub, reader) = state.dispatcher.subscribe();
let clusters = Controller::for_shared_stream(sub, reader)
.owns_stream(fleet)
.reconcile_all_on(namespace_trigger)
.shutdown_on_signal()
@ -178,28 +229,8 @@ pub async fn run_cluster_controller(state: State) {
return clusters.await;
}
let (reader, writer) = reflector::store();
let namespaces = metadata_watcher(
Api::<Namespace>::all(client.clone()),
Config::default()
.labels_from(
&config
.namespace_selector()
.expect("valid namespace selector"),
)
.any_semantic(),
)
.default_backoff()
.modify(|ns| {
ns.managed_fields_mut().clear();
ns.annotations_mut().clear();
ns.labels_mut().clear();
})
.reflect(writer)
.touched_objects()
.predicate_filter(predicates::resource_version);
let ns_controller = Controller::for_stream(namespaces, reader)
let (sub, reader) = state.dispatcher.subscribe::<Namespace>();
let ns_controller = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(
Cluster::reconcile_ns,

View File

@ -1,19 +1,24 @@
use base64::prelude::*;
use cluster_api_rs::capi_cluster::Cluster;
use futures::StreamExt as _;
use std::{fmt::Display, io, str::FromStr, sync::Arc, time::Duration};
use k8s_openapi::api::core::v1::{ConfigMap, Endpoints};
use k8s_openapi::api::core::v1::{self, ConfigMap, Endpoints};
use kube::{
api::{ObjectMeta, Patch, PatchParams, TypeMeta},
api::{ApiResource, ObjectMeta, Patch, PatchParams, TypeMeta},
client::scope::Namespace,
core::object::HasSpec,
runtime::controller::Action,
runtime::{
controller::Action,
watcher::{self, Config},
},
Api, Resource, ResourceExt,
};
use serde::{ser, Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror::Error;
use tracing::instrument;
use tracing::{info, instrument};
use crate::{
api::fleet_addon_config::{FleetAddonConfig, Install, InstallOptions, Server},
@ -132,6 +137,52 @@ impl FleetAddonConfig {
)
.await?;
info!("Updated fleet config map");
Ok(Action::await_change())
}
#[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
pub async fn update_watches(self: Arc<Self>, ctx: Arc<Context>) -> DynamiWatcherResult<Action> {
info!("Reconciling dynamic watches");
let cluster_selector = self.cluster_watch()?;
let ns_selector = self.namespace_selector()?;
let mut stream = ctx.stream.stream.lock().await;
stream.clear();
stream.push(
watcher::watcher(
Api::all_with(ctx.client.clone(), &ApiResource::erase::<Cluster>(&())),
Config::default()
.labels_from(&cluster_selector)
.any_semantic(),
)
.boxed(),
);
stream.push(
watcher::watcher(
Api::all_with(
ctx.client.clone(),
&ApiResource::erase::<v1::Namespace>(&()),
),
Config::default().labels_from(&ns_selector).any_semantic(),
)
.boxed(),
);
info!("Reconciled dynamic watches to match selectors: namespace={ns_selector}, cluster={cluster_selector}");
Ok(Action::await_change())
}
#[instrument(skip_all, fields(trace_id = display(telemetry::get_trace_id()), name = self.name_any(), namespace = self.namespace()))]
pub async fn reconcile_dynamic_watches(
self: Arc<Self>,
ctx: Arc<Context>,
) -> crate::Result<Action> {
self.update_watches(ctx).await?;
Ok(Action::await_change())
}
@ -314,6 +365,14 @@ pub enum AddonConfigSyncError {
CommandError(#[from] io::Error),
}
pub type DynamiWatcherResult<T> = std::result::Result<T, DynamicWatcherError>;
#[derive(Error, Debug)]
pub enum DynamicWatcherError {
#[error("Invalid selector encountered: {0}")]
SelectorParseError(#[from] kube::core::ParseExpressionError),
}
mod tests {
#[test]
fn test() {

View File

@ -8,7 +8,7 @@ use crate::api::fleet_cluster_registration_token::ClusterRegistrationToken;
use crate::api::fleet_clustergroup::ClusterGroup;
use crate::Error;
use cluster_api_rs::capi_cluster::ClusterTopology;
use fleet_api_rs::fleet_cluster::{ClusterAgentTolerations, ClusterSpec};
use fleet_api_rs::fleet_cluster::ClusterSpec;
use fleet_api_rs::fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec};
use futures::channel::mpsc::Sender;
use k8s_openapi::api::core::v1::Namespace;
@ -184,22 +184,6 @@ impl Cluster {
None | Some(ClusterTopology { .. }) => self.labels().clone(),
};
let agent_tolerations = Some(vec![
ClusterAgentTolerations {
effect: Some("NoSchedule".into()),
operator: Some("Equal".into()),
key: Some("node.kubernetes.io/not-ready".into()),
..Default::default()
},
ClusterAgentTolerations {
effect: Some("NoSchedule".into()),
operator: Some("Equal".into()),
key: Some("node.cloudprovider.kubernetes.io/uninitialized".into()),
value: Some("true".into()),
..Default::default()
},
]);
fleet_cluster::Cluster {
types: Some(TypeMeta::resource::<fleet_cluster::Cluster>()),
metadata: ObjectMeta {
@ -216,18 +200,18 @@ impl Cluster {
true => ClusterSpec {
client_id: Some(Alphanumeric.sample_string(&mut rand::rng(), 64)),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
agent_tolerations,
..Default::default()
}
.into(),
false => ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
agent_tolerations,
..Default::default()
}
.into(),
@ -236,9 +220,9 @@ impl Cluster {
spec: ClusterSpec {
kube_config_secret: Some(format!("{}-kubeconfig", self.name_any())),
agent_namespace: config.agent_install_namespace().into(),
agent_tolerations: config.agent_tolerations().into(),
host_network: config.host_network,
agent_env_vars: config.agent_env_vars,
agent_tolerations,
..Default::default()
},
..Default::default()

View File

@ -1,21 +1,25 @@
use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::controllers::PatchError;
use crate::metrics::Diagnostics;
use crate::multi_dispatcher::{BroadcastStream, MultiDispatcher};
use crate::{telemetry, Error, Metrics};
use chrono::Utc;
use futures::stream::SelectAll;
use futures::Stream;
use k8s_openapi::NamespaceResourceScope;
use kube::api::{Patch, PatchParams, PostParams};
use kube::api::{DynamicObject, Patch, PatchParams, PostParams};
use kube::runtime::events::{Event, EventType};
use kube::runtime::finalizer;
use kube::runtime::{finalizer, watcher};
use kube::{api::Api, client::Client, runtime::controller::Action};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{self, debug, info, instrument};
@ -26,6 +30,10 @@ use super::{
pub static FLEET_FINALIZER: &str = "fleet.addons.cluster.x-k8s.io";
type DynamicStream = SelectAll<
Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>,
>;
// Context for the reconciler
#[derive(Clone)]
pub struct Context {
@ -35,6 +43,10 @@ pub struct Context {
pub diagnostics: Arc<RwLock<Diagnostics>>,
/// Prom metrics
pub metrics: Metrics,
// Dispatcher for dynamic resource controllers
pub dispatcher: MultiDispatcher,
// shared stream of dynamic events
pub stream: BroadcastStream<DynamicStream>,
}
pub(crate) async fn get_or_create<R>(ctx: Arc<Context>, res: R) -> GetOrCreateResult<Action>

View File

@ -1,4 +1,7 @@
use controllers::{addon_config::AddonConfigSyncError, BundleError, SyncError};
use controllers::{
addon_config::{AddonConfigSyncError, DynamicWatcherError},
BundleError, SyncError,
};
use futures::channel::mpsc::TrySendError;
use thiserror::Error;
@ -19,6 +22,9 @@ pub enum Error {
#[error("Fleet config error: {0}")]
FleetConfigError(#[from] AddonConfigSyncError),
#[error("Dynamic watcher error: {0}")]
DynamicWatcherError(#[from] DynamicWatcherError),
#[error("Namespace trigger error: {0}")]
TriggerError(#[from] TrySendError<()>),
@ -44,6 +50,7 @@ pub mod controller;
pub use crate::controller::*;
pub mod api;
pub mod controllers;
mod multi_dispatcher;
pub mod predicates;
/// Log and trace integrations

236
src/multi_dispatcher.rs Normal file
View File

@ -0,0 +1,236 @@
use std::{
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use async_stream::stream;
use futures::{lock::Mutex, ready, Stream, StreamExt as _};
use kube::{
api::{DynamicObject, GroupVersionKind},
runtime::{
reflector::{store::Writer, Lookup, Store},
watcher::{Event, Result},
},
Resource,
};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
#[derive(Clone)]
pub struct MultiDispatcher {
dispatch_tx: Sender<Event<DynamicObject>>,
// An inactive reader that prevents the channel from closing until the
// writer is dropped.
_dispatch_rx: InactiveReceiver<Event<DynamicObject>>,
}
impl MultiDispatcher {
#[must_use]
pub fn new(buf_size: usize) -> Self {
// Create a broadcast (tx, rx) pair
let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
// The tx half will not wait for any receivers to be active before
// broadcasting events. If no receivers are active, events will be
// buffered.
dispatch_tx.set_await_active(false);
Self {
dispatch_tx,
_dispatch_rx: dispatch_rx.deactivate(),
}
}
/// Return a handle to a typed subscriber
///
/// Multiple subscribe handles may be obtained, by either calling
/// `subscribe` multiple times, or by calling `clone()`
///
/// This function returns a `Some` when the [`Writer`] is constructed through
/// [`Writer::new_shared`] or [`store_shared`], and a `None` otherwise.
#[must_use]
pub fn subscribe<K>(&self) -> (TypedReflectHandle<K>, Store<K>)
where
K: Resource + Clone + DeserializeOwned,
K::DynamicType: Eq + Clone + Hash + Default,
{
let sub = TypedReflectHandle::new(self.dispatch_tx.new_receiver());
let reader = sub.reader();
(sub, reader)
}
/// Broadcast an event to any downstream listeners subscribed on the store
pub(crate) async fn broadcast_event(&mut self, event: &Event<DynamicObject>) {
match event {
// Broadcast stores are pre-initialized
Event::InitDone => {}
ev => {
let _ = self.dispatch_tx.broadcast_direct(ev.clone()).await;
}
}
}
}
/// `BroadcastStream` allows to stream shared list of dynamic objects,
/// sources of which can be changed at any moment.
pub struct BroadcastStream<W> {
pub stream: Arc<Mutex<W>>,
}
impl<W> Clone for BroadcastStream<W> {
fn clone(&self) -> Self {
Self {
stream: self.stream.clone(),
}
}
}
impl<W> BroadcastStream<W>
where
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
{
pub fn new(stream: Arc<Mutex<W>>) -> Self {
Self { stream }
}
}
impl<W> Stream for BroadcastStream<W>
where
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
{
type Item = W::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(mut stream) = self.stream.try_lock() {
return stream.poll_next_unpin(cx);
}
Poll::Pending
}
}
/// A handle to a shared dynamic object stream
///
/// [`TypedReflectHandle`]s are created by calling [`subscribe()`] on a [`TypedDispatcher`],
/// Each shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`TypedDispatcher`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`TypedReflectHandle`]s
/// subscribed to the shared stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`TypedReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
#[pin_project]
pub struct TypedReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
K: DeserializeOwned,
{
#[pin]
rx: Receiver<Event<DynamicObject>>,
store: Writer<K>,
}
impl<K> TypedReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
K: DeserializeOwned,
{
pub(super) fn new(rx: Receiver<Event<DynamicObject>>) -> TypedReflectHandle<K> {
Self {
rx,
// Initialize a ready store by default
store: {
let mut store: Writer<K> = Default::default();
store.apply_watcher_event(&Event::InitDone);
store
},
}
}
pub fn reader(&self) -> Store<K> {
self.store.as_reader()
}
}
pub fn gvk(obj: &DynamicObject) -> Option<GroupVersionKind> {
let gvk = obj.types.clone()?;
gvk.try_into().ok()
}
pub fn typed_gvk<K: Resource>(dt: K::DynamicType) -> GroupVersionKind {
GroupVersionKind::gvk(&K::group(&dt), &K::version(&dt), &K::kind(&dt))
}
impl<K> Stream for TypedReflectHandle<K>
where
K: Resource + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
K: DeserializeOwned,
{
type Item = Arc<K>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
return match ready!(this.rx.as_mut().poll_next(cx)) {
Some(event) => {
let obj = match event {
Event::InitApply(obj) | Event::Apply(obj)
if gvk(&obj) == Some(typed_gvk::<K>(Default::default())) =>
{
obj.try_parse::<K>()
.ok()
.inspect(|o| {
this.store.apply_watcher_event(&Event::Apply(o.clone()));
})
.map(Arc::new)
}
Event::Delete(obj)
if gvk(&obj) == Some(typed_gvk::<K>(Default::default())) =>
{
obj.try_parse::<K>()
.ok()
.inspect(|o| {
this.store.apply_watcher_event(&Event::Delete(o.clone()));
})
.map(Arc::new)
}
_ => None,
};
// Skip propagating all objects which do not belong to the cache
if obj.is_none() {
continue;
}
Poll::Ready(obj)
}
None => Poll::Ready(None),
};
}
}
}
pub fn broadcaster<W>(
mut writer: MultiDispatcher,
mut broadcast: BroadcastStream<W>,
) -> impl Stream<Item = W::Item>
where
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
{
stream! {
while let Some(event) = broadcast.next().await {
match event {
Ok(ev) => {
writer.broadcast_event(&ev).await;
yield Ok(ev);
},
Err(ev) => yield Err(ev)
}
}
}
}

View File

@ -1,3 +1,121 @@
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerCluster
metadata:
name: docker-demo
spec:
loadBalancer:
customHAProxyConfigTemplateRef:
name: docker-demo-lb-config
---
apiVersion: cluster.x-k8s.io/v1beta1
kind: Cluster
metadata:
name: docker-demo
labels:
cni: calico
import: ""
spec:
clusterNetwork:
pods:
cidrBlocks:
- 10.1.0.0/16
services:
cidrBlocks:
- 10.10.0.0/16
serviceDomain: cluster.local
controlPlaneRef:
apiVersion: controlplane.cluster.x-k8s.io/v1beta1
kind: RKE2ControlPlane
name: docker-demo-control-plane
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerCluster
name: docker-demo
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
metadata:
name: docker-demo-control-plane
spec:
template:
spec:
customImage: kindest/node:v1.31.0
bootstrapTimeout: 15m
---
apiVersion: controlplane.cluster.x-k8s.io/v1beta1
kind: RKE2ControlPlane
metadata:
name: docker-demo-control-plane
spec:
replicas: 1
version: v1.31.0+rke2r1
rolloutStrategy:
rollingUpdate:
maxSurge: 1
type: RollingUpdate
serverConfig:
cloudProviderName: external
cni: none
kubeAPIServer:
extraArgs:
- --anonymous-auth=true
disableComponents:
pluginComponents:
- rke2-ingress-nginx
kubernetesComponents:
- cloudController
machineTemplate:
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
name: docker-demo-control-plane
nodeDrainTimeout: 30s
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
name: docker-demo-control-plane
nodeDrainTimeout: 30s
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
metadata:
name: docker-demo-md-0
spec:
template:
spec:
customImage: kindest/node:v1.31.0
bootstrapTimeout: 15m
---
apiVersion: bootstrap.cluster.x-k8s.io/v1beta1
kind: RKE2ConfigTemplate
metadata:
name: docker-demo-md-0
spec: {}
---
apiVersion: cluster.x-k8s.io/v1beta1
kind: MachineDeployment
metadata:
name: docker-demo-md-0
spec:
clusterName: docker-demo
replicas: 1
selector:
matchLabels:
cluster.x-k8s.io/cluster-name: docker-demo
template:
spec:
version: v1.31.0+rke2r1
clusterName: docker-demo
bootstrap:
configRef:
apiVersion: bootstrap.cluster.x-k8s.io/v1beta1
kind: RKE2ConfigTemplate
name: docker-demo-md-0
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
name: docker-demo-md-0
---
apiVersion: v1
data:
value: |-
@ -25,10 +143,12 @@ data:
default-server init-addr none
frontend stats
mode http
bind *:8404
stats enable
stats uri /
stats refresh 10s
stats uri /stats
stats refresh 1s
stats admin if TRUE
frontend control-plane
bind *:{{ .FrontendControlPlanePort }}
@ -39,10 +159,9 @@ data:
backend kube-apiservers
option httpchk GET /healthz
http-check expect status 401
# TODO: we should be verifying (!)
{{range $server, $address := .BackendServers}}
server {{ $server }} {{ JoinHostPort $address $.BackendControlPlanePort }} check check-ssl verify none resolvers docker resolve-prefer {{ if $.IPv6 -}} ipv6 {{- else -}} ipv4 {{- end }}
{{range $server, $backend := .BackendServers }}
server {{ $server }} {{ JoinHostPort $backend.Address $.BackendControlPlanePort }} check check-ssl verify none resolvers docker resolve-prefer {{ if $.IPv6 -}} ipv6 {{- else -}} ipv4 {{- end }}
{{- end}}
frontend rke2-join
@ -55,120 +174,11 @@ data:
backend rke2-servers
option httpchk GET /v1-rke2/readyz
http-check expect status 403
{{range $server, $address := .BackendServers}}
server {{ $server }} {{ $address }}:9345 check check-ssl verify none
{{range $server, $backend := .BackendServers }}
server {{ $server }} {{ $backend.Address }}:9345 check check-ssl verify none
{{- end}}
kind: ConfigMap
metadata:
name: test-lb-config
namespace: default
---
apiVersion: cluster.x-k8s.io/v1beta1
kind: Cluster
metadata:
name: test1
namespace: default
spec:
clusterNetwork:
pods:
cidrBlocks:
- 10.45.0.0/16
serviceDomain: cluster.local
services:
cidrBlocks:
- 10.46.0.0/16
controlPlaneRef:
apiVersion: controlplane.cluster.x-k8s.io/v1alpha1
kind: RKE2ControlPlane
name: test1-control-plane
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerCluster
name: test1
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerCluster
metadata:
name: test1
namespace: default
spec:
loadBalancer:
customHAProxyConfigTemplateRef:
name: test-lb-config
---
apiVersion: controlplane.cluster.x-k8s.io/v1alpha1
kind: RKE2ControlPlane
metadata:
name: test1-control-plane
namespace: default
spec:
agentConfig:
version: v1.31.0+rke2r1
# nodeAnnotations:
# richtest: "true"
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
name: controlplane
nodeDrainTimeout: 2m
replicas: 1
serverConfig:
cni: calico
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
metadata:
name: controlplane
namespace: default
spec:
template:
spec: {}
---
apiVersion: cluster.x-k8s.io/v1beta1
kind: MachineDeployment
metadata:
name: worker-md-0
namespace: default
spec:
clusterName: test1
replicas: 1
selector:
matchLabels:
cluster.x-k8s.io/cluster-name: test1
template:
spec:
bootstrap:
configRef:
apiVersion: bootstrap.cluster.x-k8s.io/v1alpha1
kind: RKE2ConfigTemplate
name: test1-agent
clusterName: test1
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
name: worker
version: v1.31.0
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: DockerMachineTemplate
metadata:
name: worker
namespace: default
spec:
template:
spec: {}
---
apiVersion: bootstrap.cluster.x-k8s.io/v1alpha1
kind: RKE2ConfigTemplate
metadata:
name: test1-agent
namespace: default
spec:
template:
spec:
agentConfig:
version: v1.31.0+rke2r1
#nodeAnnotations:
# richtest: "true"
# postRKE2Commands:
# - kubectl --kubeconfig /var/lib/rancher/rke2/agent/kubelet.kubeconfig annotate node $(hostname) richtest=true
name: docker-demo-lb-config
annotations:
"helm.sh/resource-policy": keep

View File

@ -21,5 +21,5 @@ spec:
matchLabels:
import: ""
install:
version: v0.12.0-alpha.14 # We will install alpha for helmapp support
version: v0.12.0-beta.1 # We will install alpha for helmapp support

13
testdata/gitrepo-calico.yaml vendored Normal file
View File

@ -0,0 +1,13 @@
apiVersion: fleet.cattle.io/v1alpha1
kind: GitRepo
metadata:
name: calico
spec:
branch: main
paths:
- /fleet/applications/calico
repo: https://github.com/rancher-sandbox/cluster-api-addon-provider-fleet.git
targets:
- clusterSelector:
matchLabels:
cni: calico

2
testdata/helm.yaml vendored
View File

@ -25,4 +25,4 @@ spec:
insecureSkipTLSVerify: true
targets:
- clusterName: docker-demo
- clusterGroup: quick-start
- clusterGroup: quick-start.clusterclass