cluster-api-provider-rke2/controlplane/internal/controllers/rke2controlplane_controller.go

1143 lines
41 KiB
Go

/*
Copyright 2022 SUSE.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/source"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/clustercache"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/certs"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
capikubeconfig "sigs.k8s.io/cluster-api/util/kubeconfig"
"sigs.k8s.io/cluster-api/util/patch"
controlplanev1 "github.com/rancher/cluster-api-provider-rke2/controlplane/api/v1beta1"
"github.com/rancher/cluster-api-provider-rke2/controlplane/internal/contract"
"github.com/rancher/cluster-api-provider-rke2/controlplane/internal/util/ssa"
"github.com/rancher/cluster-api-provider-rke2/pkg/kubeconfig"
"github.com/rancher/cluster-api-provider-rke2/pkg/registration"
"github.com/rancher/cluster-api-provider-rke2/pkg/rke2"
"github.com/rancher/cluster-api-provider-rke2/pkg/secret"
rke2util "github.com/rancher/cluster-api-provider-rke2/pkg/util"
)
const (
// rke2ManagerName is the name of the RKE2 manager deployment.
rke2ManagerName = "rke2controlplane"
// rke2ControlPlaneKind is the kind of the RKE2 control plane.
rke2ControlPlaneKind = "RKE2ControlPlane"
// dependentCertRequeueAfter is how long to wait before checking again to see if
// dependent certificates have been created.
dependentCertRequeueAfter = 30 * time.Second
// DefaultRequeueTime is the default requeue time for the controller.
DefaultRequeueTime = 20 * time.Second
)
// RKE2ControlPlaneReconciler reconciles a RKE2ControlPlane object.
type RKE2ControlPlaneReconciler struct {
Log logr.Logger
client.Client
Scheme *runtime.Scheme
SecretCachingClient client.Client
// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
managementClusterUncached rke2.ManagementCluster
managementCluster rke2.ManagementCluster
recorder record.EventRecorder
controller controller.Controller
ssaCache ssa.Cache
}
//nolint:lll
//+kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=rke2controlplanes,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=rke2controlplanes/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=rke2controlplanes/finalizers,verbs=update
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status;machinesets;machines;machines/status;machinepools;machinepools/status,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=secrets;events;configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="bootstrap.cluster.x-k8s.io",resources=rke2configs,verbs=get;list;watch;create;patch;delete
// +kubebuilder:rbac:groups="infrastructure.cluster.x-k8s.io",resources=*,verbs=get;list;watch;create;patch;delete
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *RKE2ControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, reterr error) {
logger := log.FromContext(ctx)
r.Log = logger
rcp := &controlplanev1.RKE2ControlPlane{}
if err := r.Get(ctx, req.NamespacedName, rcp); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}
// Fetch the Cluster.
cluster, err := util.GetOwnerCluster(ctx, r.Client, rcp.ObjectMeta)
if err != nil {
logger.Error(err, "Failed to retrieve owner Cluster from the API Server")
return ctrl.Result{}, err
}
if cluster == nil {
logger.Info("Cluster Controller has not yet set OwnerRef")
return ctrl.Result{Requeue: true}, nil
}
logger = logger.WithValues("cluster", cluster.Name)
if annotations.IsPaused(cluster, rcp) {
logger.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}
// Initialize the patch helper.
patchHelper, err := patch.NewHelper(rcp, r.Client)
if err != nil {
logger.Error(err, "Failed to configure the patch helper")
return ctrl.Result{Requeue: true}, nil
}
// Add finalizer first if not exist to avoid the race condition between init and delete
if !controllerutil.ContainsFinalizer(rcp, controlplanev1.RKE2ControlPlaneFinalizer) {
controllerutil.AddFinalizer(rcp, controlplanev1.RKE2ControlPlaneFinalizer)
// patch and return right away instead of reusing the main defer,
// because the main defer may take too much time to get cluster status
// Patch ObservedGeneration only if the reconciliation completed successfully
patchOpts := []patch.Option{patch.WithStatusObservedGeneration{}}
if err := patchHelper.Patch(ctx, rcp, patchOpts...); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to add finalizer")
}
return ctrl.Result{Requeue: true}, nil
}
defer func() {
// Always attempt to update status.
if err := r.updateStatus(ctx, rcp, cluster); err != nil {
var connFailure *rke2.RemoteClusterConnectionError
if errors.As(err, &connFailure) {
logger.Info("Could not connect to workload cluster to fetch status", "err", err.Error())
} else {
logger.Error(err, "Failed to update RKE2ControlPlane Status")
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}
// Always attempt to Patch the RKE2ControlPlane object and status after each reconciliation.
if err := patchRKE2ControlPlane(ctx, patchHelper, rcp); err != nil {
reterr = kerrors.NewAggregate([]error{reterr, err})
}
// Make rcp to requeue in case status is not ready, so we can check for node
// status without waiting for a full resync (by default 10 minutes).
// Only requeue if we are not going in exponential backoff due to error,
// or if we are not already re-queueing, or if the object has a deletion timestamp.
if reterr == nil && !res.Requeue && res.RequeueAfter <= 0 && rcp.DeletionTimestamp.IsZero() {
if !rcp.Status.Ready {
res = ctrl.Result{RequeueAfter: DefaultRequeueTime}
}
}
}()
if !rcp.DeletionTimestamp.IsZero() {
// Handle deletion reconciliation loop.
res, err = r.reconcileDelete(ctx, cluster, rcp)
return res, err
}
updated := false
// Backfill MachineTemplate.InfrastructureRef if missing but legacy Spec.InfrastructureRef exists
if rcp.Spec.InfrastructureRef.Name != "" && rcp.Spec.MachineTemplate.InfrastructureRef.Name == "" {
rcp.Spec.MachineTemplate.InfrastructureRef = rcp.Spec.InfrastructureRef
updated = true
}
// Ensure MachineTemplate.InfrastructureRef.Namespace is set
if rcp.Spec.MachineTemplate.InfrastructureRef.Name != "" && rcp.Spec.MachineTemplate.InfrastructureRef.Namespace == "" {
rcp.Spec.MachineTemplate.InfrastructureRef.Namespace = rcp.Namespace
updated = true
}
if updated {
if err := patchHelper.Patch(ctx, rcp); err != nil {
logger.Error(err, "Failed to patch RKE2ControlPlane during backfill")
// If patching fails, we return an error to avoid re-queuing the object.
return ctrl.Result{}, err
}
// Log the backfill operation.
logger.Info("Backfilled missing RKE2ControlPlane fields from legacy format", "rcp", klog.KObj(rcp))
// Requeue to ensure the controller reprocesses the object with the updated fields.
return ctrl.Result{Requeue: true}, nil
}
// Handle normal reconciliation loop.
res, err = r.reconcileNormal(ctx, cluster, rcp)
return res, err
}
func patchRKE2ControlPlane(ctx context.Context, patchHelper *patch.Helper, rcp *controlplanev1.RKE2ControlPlane) error {
// Always update the readyCondition by summarizing the state of other conditions.
conditions.SetSummary(rcp,
conditions.WithConditions(
controlplanev1.MachinesReadyCondition,
controlplanev1.MachinesSpecUpToDateCondition,
controlplanev1.ResizedCondition,
controlplanev1.MachinesReadyCondition,
controlplanev1.AvailableCondition,
// controlplanev1.CertificatesAvailableCondition,
),
)
// Patch the object, ignoring conflicts on the conditions owned by this controller.
return patchHelper.Patch(
ctx,
rcp,
patch.WithOwnedConditions{Conditions: []clusterv1.ConditionType{
clusterv1.ReadyCondition,
controlplanev1.MachinesSpecUpToDateCondition,
controlplanev1.ResizedCondition,
controlplanev1.MachinesReadyCondition,
controlplanev1.AvailableCondition,
}},
patch.WithStatusObservedGeneration{},
)
}
// SetupWithManager sets up the controller with the Manager.
func (r *RKE2ControlPlaneReconciler) SetupWithManager(
ctx context.Context, mgr ctrl.Manager, clientQPS float32,
clientBurst, clusterCacheConcurrency, concurrency int,
) error {
c, err := ctrl.NewControllerManagedBy(mgr).
For(&controlplanev1.RKE2ControlPlane{}).
Owns(&clusterv1.Machine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: concurrency,
}).
Build(r)
if err != nil {
return errors.Wrap(err, "failed setting up with a controller manager")
}
err = c.Watch(
source.Kind[client.Object](mgr.GetCache(), &clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc((r.ClusterToRKE2ControlPlane(ctx))),
),
)
if err != nil {
return errors.Wrap(err, "failed adding Watch for Clusters to controller manager")
}
r.controller = c
r.recorder = mgr.GetEventRecorderFor("rke2-control-plane-controller")
r.ssaCache = ssa.NewCache("rke2-control-plane")
// Set up a clusterCache to provide to controllers
// requiring a connection to a remote cluster
clusterCache, err := clustercache.SetupWithManager(ctx, mgr, clustercache.Options{
SecretClient: r.SecretCachingClient,
Cache: clustercache.CacheOptions{
Indexes: []clustercache.CacheOptionsIndex{clustercache.NodeProviderIDIndex},
},
Client: clustercache.ClientOptions{
QPS: clientQPS,
Burst: clientBurst,
UserAgent: remote.DefaultClusterAPIUserAgent("rke2-control-plane-controller"),
Cache: clustercache.ClientCacheOptions{
DisableFor: []client.Object{
// Don't cache ConfigMaps & Secrets.
&corev1.ConfigMap{},
&corev1.Secret{},
// Don't cache Pods & DaemonSets (we get/list them e.g. during drain).
&corev1.Pod{},
&appsv1.DaemonSet{},
// Don't cache PersistentVolumes and VolumeAttachments (we get/list them e.g. during wait for volumes to detach)
&storagev1.VolumeAttachment{},
&corev1.PersistentVolume{},
},
},
},
}, controller.Options{
MaxConcurrentReconciles: clusterCacheConcurrency,
})
if err != nil {
return errors.Wrap(err, "unable to create cluster cache tracker")
}
if r.managementCluster == nil {
r.managementCluster = &rke2.Management{
Client: r.Client,
SecretCachingClient: r.SecretCachingClient,
ClusterCache: clusterCache,
}
}
if r.managementClusterUncached == nil {
r.managementClusterUncached = &rke2.Management{Client: mgr.GetClient()}
}
return nil
}
// ClusterToRKE2ControlPlane is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
// for RKE2ControlPlane based on updates to a Cluster.
func (r *RKE2ControlPlaneReconciler) ClusterToRKE2ControlPlane(ctx context.Context) handler.MapFunc {
log := log.FromContext(ctx)
return func(_ context.Context, o client.Object) []ctrl.Request {
c, ok := o.(*clusterv1.Cluster)
if !ok {
log.Error(nil, fmt.Sprintf("Expected a Cluster but got a %T", o))
return nil
}
controlPlaneRef := c.Spec.ControlPlaneRef
if controlPlaneRef != nil && controlPlaneRef.Kind == "RKE2ControlPlane" {
return []ctrl.Request{{NamespacedName: client.ObjectKey{Namespace: controlPlaneRef.Namespace, Name: controlPlaneRef.Name}}}
}
return nil
}
}
// nolint:gocyclo
func (r *RKE2ControlPlaneReconciler) updateStatus(ctx context.Context, rcp *controlplanev1.RKE2ControlPlane, cluster *clusterv1.Cluster) error {
logger := log.FromContext(ctx)
if cluster == nil {
logger.Info("Cluster is nil, skipping status update")
return nil
}
if rcp.Spec.Replicas == nil {
logger.Info("RKE2ControlPlane.Spec.Replicas is nil, skipping status update")
return nil
}
ownedMachines, err := r.managementCluster.GetMachinesForCluster(
ctx,
util.ObjectKey(cluster),
collections.OwnedMachines(rcp))
if err != nil {
return errors.Wrap(err, "failed to get list of owned machines")
}
if ownedMachines == nil {
logger.Info("Owned machines list is nil, skipping status update")
return nil
}
readyMachines := ownedMachines.Filter(collections.IsReady())
if readyMachines == nil {
logger.Info("Ready machines list is nil, skipping status update")
return nil
}
for _, readyMachine := range readyMachines {
logger.V(3).Info("Ready Machine : " + readyMachine.Name)
}
controlPlane, err := rke2.NewControlPlane(ctx, r.managementCluster, r.Client, cluster, rcp, ownedMachines)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return err
}
rcp.Status.UpdatedReplicas = rke2util.SafeInt32(len(controlPlane.UpToDateMachines(ctx)))
replicas := rke2util.SafeInt32(len(ownedMachines))
desiredReplicas := *rcp.Spec.Replicas
// set basic data that does not require interacting with the workload cluster
// ReadyReplicas and UnavailableReplicas are set in case the function returns before updating them
rcp.Status.Replicas = replicas
rcp.Status.ReadyReplicas = 0
rcp.Status.UnavailableReplicas = replicas
// Return early if the deletion timestamp is set, because we don't want to try to connect to the workload cluster
// and we don't want to report resize condition (because it is set to deleting into reconcile delete).
if !rcp.DeletionTimestamp.IsZero() {
return nil
}
switch {
// We are scaling up
case replicas < desiredReplicas:
conditions.MarkFalse(
rcp,
controlplanev1.ResizedCondition,
controlplanev1.ScalingUpReason,
clusterv1.ConditionSeverityWarning,
"Scaling up control plane to %d replicas (actual %d)",
desiredReplicas,
replicas)
// We are scaling down
case replicas > desiredReplicas:
conditions.MarkFalse(
rcp,
controlplanev1.ResizedCondition,
controlplanev1.ScalingDownReason,
clusterv1.ConditionSeverityWarning,
"Scaling down control plane to %d replicas (actual %d)",
desiredReplicas,
replicas)
default:
// make sure last resize operation is marked as completed.
// NOTE: we are checking the number of machines ready so we report resize completed only when the machines
// are actually provisioned (vs reporting completed immediately after the last machine object is created).
if rke2util.SafeInt32(len(readyMachines)) == replicas {
conditions.MarkTrue(rcp, controlplanev1.ResizedCondition)
}
}
kubeconfigSecret := corev1.Secret{}
err = r.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: secret.Name(cluster.Name, secret.Kubeconfig),
}, &kubeconfigSecret)
if err != nil {
logger.Info("Kubeconfig secret does not yet exist")
return err
}
kubeConfig := kubeconfigSecret.Data[secret.KubeconfigDataName]
if kubeConfig == nil {
return errors.New("unable to find a value entry in the kubeconfig secret")
}
rcp.Status.ReadyReplicas = rke2util.SafeInt32(len(readyMachines))
rcp.Status.UnavailableReplicas = replicas - rcp.Status.ReadyReplicas
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
if err != nil {
logger.Error(err, "Failed to get remote client for workload cluster", "cluster key", util.ObjectKey(cluster))
return fmt.Errorf("getting workload cluster: %w", err)
}
if workloadCluster == nil {
logger.Info("Workload cluster is nil, skipping status update")
return nil
}
status := workloadCluster.ClusterStatus(ctx)
if status.HasRKE2ServingSecret {
rcp.Status.Initialized = true
}
if len(ownedMachines) == 0 || len(readyMachines) == 0 {
logger.Info(fmt.Sprintf("No Control Plane Machines exist or are ready for RKE2ControlPlane %s/%s", rcp.Namespace, rcp.Name))
return nil
}
availableCPMachines := readyMachines
registrationmethod, err := registration.NewRegistrationMethod(string(rcp.Spec.RegistrationMethod))
if err != nil {
logger.Error(err, "Failed to get node registration method")
return fmt.Errorf("getting node registration method: %w", err)
}
validIPAddresses, err := registrationmethod(cluster, rcp, availableCPMachines)
if err != nil {
logger.Error(err, "Failed to get registration addresses")
return fmt.Errorf("getting registration addresses: %w", err)
}
rcp.Status.AvailableServerIPs = validIPAddresses
if len(rcp.Status.AvailableServerIPs) == 0 {
return errors.New("some Control Plane machines exist and are ready but they have no IP Address available")
}
if len(readyMachines) == len(ownedMachines) {
rcp.Status.Ready = true
}
conditions.MarkTrue(rcp, controlplanev1.AvailableCondition)
lowestVersion := controlPlane.Machines.LowestVersion()
if lowestVersion != nil {
controlPlane.RCP.Status.Version = lowestVersion
}
// Surface lastRemediation data in status.
// LastRemediation is the remediation currently in progress, in any, or the
// most recent of the remediation we are keeping track on machines.
var lastRemediation *RemediationData
if v, ok := controlPlane.RCP.Annotations[controlplanev1.RemediationInProgressAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
}
lastRemediation = remediationData
} else {
for _, m := range controlPlane.Machines.UnsortedList() {
if v, ok := m.Annotations[controlplanev1.RemediationForAnnotation]; ok {
remediationData, err := RemediationDataFromAnnotation(v)
if err != nil {
return err
}
if lastRemediation == nil || lastRemediation.Timestamp.Time.Before(remediationData.Timestamp.Time) {
lastRemediation = remediationData
}
}
}
}
if lastRemediation != nil {
controlPlane.RCP.Status.LastRemediation = lastRemediation.ToStatus()
}
logger.Info("Successfully updated RKE2ControlPlane status", "namespace", rcp.Namespace, "name", rcp.Name)
return nil
}
func (r *RKE2ControlPlaneReconciler) reconcileNormal(
ctx context.Context,
cluster *clusterv1.Cluster,
rcp *controlplanev1.RKE2ControlPlane,
) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconcile RKE2 Control Plane")
// Wait for the cluster infrastructure to be ready before creating machines
if !cluster.Status.InfrastructureReady {
logger.Info("Cluster infrastructure is not ready yet")
return ctrl.Result{}, nil
}
certificates := secret.NewCertificatesForInitialControlPlane()
if _, found := rcp.Annotations[controlplanev1.LegacyRKE2ControlPlane]; found {
certificates = secret.NewCertificatesForLegacyControlPlane()
}
controllerRef := metav1.NewControllerRef(rcp, controlplanev1.GroupVersion.WithKind("RKE2ControlPlane"))
if err := certificates.LookupOrGenerate(ctx, r.Client, util.ObjectKey(cluster), *controllerRef); err != nil {
logger.Error(err, "unable to lookup or create cluster certificates")
conditions.MarkFalse(
rcp, controlplanev1.CertificatesAvailableCondition,
controlplanev1.CertificatesGenerationFailedReason,
clusterv1.ConditionSeverityWarning, "%s", err.Error())
return ctrl.Result{}, err
}
conditions.MarkTrue(rcp, controlplanev1.CertificatesAvailableCondition)
// If ControlPlaneEndpoint is not set, return early
if !cluster.Spec.ControlPlaneEndpoint.IsValid() {
logger.Info("Cluster does not yet have a ControlPlaneEndpoint defined")
return ctrl.Result{}, nil
}
// Generate Cluster Kubeconfig if needed
if result, err := r.reconcileKubeconfig(
ctx,
util.ObjectKey(cluster),
cluster.Spec.ControlPlaneEndpoint,
rcp); err != nil {
logger.Error(err, "failed to reconcile Kubeconfig")
return result, err
}
controlPlaneMachines, err := r.managementClusterUncached.GetMachinesForCluster(
ctx,
util.ObjectKey(cluster),
collections.ControlPlaneMachines(cluster.Name))
if err != nil {
logger.Error(err, "failed to retrieve control plane machines for cluster")
return ctrl.Result{}, err
}
ownedMachines := controlPlaneMachines.Filter(collections.OwnedMachines(rcp))
if len(ownedMachines) != len(controlPlaneMachines) {
logger.Info("Not all control plane machines are owned by this RKE2ControlPlane, refusing to operate in mixed management mode") //nolint:lll
return ctrl.Result{}, nil
}
controlPlane, err := rke2.NewControlPlane(ctx, r.managementCluster, r.Client, cluster, rcp, ownedMachines)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return ctrl.Result{}, err
}
if err := controlPlane.ReconcileExternalReference(ctx, r.Client); err != nil {
logger.Error(err, "Could not reconcile external reference")
return ctrl.Result{}, fmt.Errorf("reconciling external reference: %w", err)
}
if err := r.syncMachines(ctx, controlPlane); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to sync Machines")
}
// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.RCP, controlplanev1.MachinesReadyCondition,
ownedMachines.ConditionGetters(),
conditions.AddSourceRef(),
conditions.WithStepCounterIf(false))
// Updates conditions reporting the status of static pods and the status of the etcd cluster.
// NOTE: Conditions reporting RCP operation progress like e.g. Resized or SpecUpToDate are inlined with the rest of the execution.
if result, err := r.reconcileControlPlaneConditions(ctx, controlPlane); err != nil || !result.IsZero() {
logger.Error(err, "failed to reconcile Control Plane conditions")
return result, err
}
if result, err := r.reconcileLifecycleHooks(ctx, controlPlane); err != nil || !result.IsZero() {
return result, err
}
// Reconcile unhealthy machines by triggering deletion and requeue if it is considered safe to remediate,
// otherwise continue with the other RCP operations.
if result, err := r.reconcileUnhealthyMachines(ctx, controlPlane); err != nil || !result.IsZero() {
return result, err
}
// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout(ctx)
switch {
case len(needRollout) > 0:
logger.Info("Rolling out Control Plane machines", "needRollout", needRollout.Names())
conditions.MarkFalse(controlPlane.RCP,
controlplanev1.MachinesSpecUpToDateCondition,
controlplanev1.RollingUpdateInProgressReason,
clusterv1.ConditionSeverityWarning,
"Rolling %d replicas with outdated spec (%d replicas up to date)",
len(needRollout),
len(controlPlane.Machines)-len(needRollout))
return r.upgradeControlPlane(ctx, cluster, rcp, controlPlane, needRollout)
default:
// make sure last upgrade operation is marked as completed.
// NOTE: we are checking the condition already exists in order to avoid to set this condition at the first
// reconciliation/before a rolling upgrade actually starts.
if conditions.Has(controlPlane.RCP, controlplanev1.MachinesSpecUpToDateCondition) {
conditions.MarkTrue(controlPlane.RCP, controlplanev1.MachinesSpecUpToDateCondition)
}
}
// If we've made it this far, we can assume that all ownedMachines are up to date
numMachines := len(ownedMachines)
desiredReplicas := int(*rcp.Spec.Replicas)
switch {
// We are creating the first replica
case numMachines < desiredReplicas && numMachines == 0:
// Create new Machine w/ init
logger.Info("Initializing control plane", "Desired", desiredReplicas, "Existing", numMachines)
conditions.MarkFalse(controlPlane.RCP,
controlplanev1.AvailableCondition,
controlplanev1.WaitingForRKE2ServerReason,
clusterv1.ConditionSeverityInfo, "")
return r.initializeControlPlane(ctx, cluster, rcp, controlPlane)
// We are scaling up
case numMachines < desiredReplicas && numMachines > 0:
// Create a new Machine w/ join
logger.Info("Scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
return r.scaleUpControlPlane(ctx, cluster, rcp, controlPlane)
// We are scaling down
case numMachines > desiredReplicas:
logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
// The last parameter (i.e. machines needing to be rolled out) should always be empty here.
return r.scaleDownControlPlane(ctx, cluster, rcp, controlPlane, collections.Machines{})
}
return ctrl.Result{}, nil
}
func (r *RKE2ControlPlaneReconciler) reconcileDelete(ctx context.Context,
cluster *clusterv1.Cluster,
rcp *controlplanev1.RKE2ControlPlane,
) (res ctrl.Result, err error) {
logger := log.FromContext(ctx)
// Gets all machines, not just control plane machines.
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
}
ownedMachines := allMachines.Filter(collections.OwnedMachines(rcp))
// If no control plane machines remain, remove the finalizer
if len(ownedMachines) == 0 {
// If the legacy finalizer is present, remove it.
if controllerutil.ContainsFinalizer(rcp, controlplanev1.RKE2ControlPlaneLegacyFinalizer) {
controllerutil.RemoveFinalizer(rcp, controlplanev1.RKE2ControlPlaneLegacyFinalizer)
}
controllerutil.RemoveFinalizer(rcp, controlplanev1.RKE2ControlPlaneFinalizer)
return ctrl.Result{}, nil
}
controlPlane, err := rke2.NewControlPlane(ctx, r.managementCluster, r.Client, cluster, rcp, ownedMachines)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return ctrl.Result{}, err
}
// Updates conditions reporting the status of static pods and the status of the etcd cluster.
// NOTE: Ignoring failures given that we are deleting
if _, err := r.reconcileControlPlaneConditions(ctx, controlPlane); err != nil {
logger.Info("failed to reconcile conditions", "error", err.Error())
}
// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
// However, during delete we are hiding the counter (1 of x) because it does not make sense given that
// all the machines are deleted in parallel.
conditions.SetAggregate(rcp,
controlplanev1.MachinesReadyCondition,
ownedMachines.ConditionGetters(),
conditions.AddSourceRef(),
conditions.WithStepCounterIf(false))
// Verify that only control plane machines remain
if len(allMachines) != len(ownedMachines) {
logger.Info("Waiting for worker nodes to be deleted first")
conditions.MarkFalse(rcp,
controlplanev1.ResizedCondition,
clusterv1.DeletingReason,
clusterv1.ConditionSeverityInfo,
"Waiting for worker nodes to be deleted first")
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
// Delete control plane machines in parallel
machinesToDelete := ownedMachines
var errs []error
for i := range machinesToDelete {
m := machinesToDelete[i]
logger := logger.WithValues("machine", m)
// During RKE2CP deletion we don't care about forwarding etcd leadership or removing etcd members.
// So we are removing the pre-terminate hook.
// This is important because when deleting RKE2CP we will delete all members of etcd and it's not possible
// to forward etcd leadership without any member left after we went through the Machine deletion.
// Also in this case the reconcileDelete code of the Machine controller won't execute Node drain
// and wait for volume detach.
if err := r.removePreTerminateHookAnnotationFromMachine(ctx, m); err != nil {
errs = append(errs, err)
continue
}
if err := r.removeHookAnnotationFromMachine(ctx, m, controlplanev1.PreDrainLoadbalancerExclusionAnnotation); err != nil {
errs = append(errs, err)
continue
}
if !m.DeletionTimestamp.IsZero() {
// Nothing to do, Machine already has deletionTimestamp set.
continue
}
if err := r.Delete(ctx, machinesToDelete[i]); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to cleanup owned machine")
errs = append(errs, err)
}
}
if len(errs) > 0 {
err := kerrors.NewAggregate(errs)
r.recorder.Eventf(rcp, corev1.EventTypeWarning, "FailedDelete",
"Failed to delete control plane Machines for cluster %s/%s control plane: %v", cluster.Namespace, cluster.Name, err)
return ctrl.Result{}, err
}
logger.Info("Waiting for control plane Machines to not exist anymore")
conditions.MarkFalse(rcp, controlplanev1.ResizedCondition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
func (r *RKE2ControlPlaneReconciler) reconcileKubeconfig(
ctx context.Context,
clusterName client.ObjectKey,
endpoint clusterv1.APIEndpoint,
rcp *controlplanev1.RKE2ControlPlane,
) (ctrl.Result, error) {
logger := ctrl.LoggerFrom(ctx)
if endpoint.IsZero() {
logger.V(5).Info("API Endpoint not yet known")
return ctrl.Result{RequeueAfter: DefaultRequeueTime}, nil
}
controllerOwnerRef := *metav1.NewControllerRef(rcp, controlplanev1.GroupVersion.WithKind("RKE2ControlPlane"))
configSecret, err := secret.GetFromNamespacedName(ctx, r.Client, clusterName, secret.Kubeconfig)
switch {
case apierrors.IsNotFound(err):
logger.Info("Kubeconfig Secret not found, creating a new one")
createErr := kubeconfig.CreateSecretWithOwner(
ctx,
r.Client,
clusterName,
endpoint.String(),
controllerOwnerRef,
)
if errors.Is(createErr, kubeconfig.ErrDependentCertificateNotFound) {
logger.Error(createErr, "Could not find Secret CA to create Kubeconfig Secret, requeuing...")
return ctrl.Result{RequeueAfter: dependentCertRequeueAfter}, nil
}
// always return if we have just created in order to skip rotation checks
return ctrl.Result{}, createErr
case err != nil:
return ctrl.Result{}, errors.Wrap(err, "failed to retrieve kubeconfig Secret")
}
// only do rotation on owned secrets
if !util.IsControlledBy(configSecret, rcp) {
logger.Info("Kubeconfig Secret not controlled by RKE2ControlPlane, nothing to do")
return ctrl.Result{}, nil
}
needsRotation, err := capikubeconfig.NeedsClientCertRotation(configSecret, certs.ClientCertificateRenewalDuration)
if err != nil {
return ctrl.Result{}, err
}
if needsRotation {
logger.Info("Rotating kubeconfig secret")
if err := kubeconfig.UpdateSecret(ctx, r.Client, clusterName, endpoint.String(), configSecret); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to regenerate kubeconfig")
}
}
return ctrl.Result{}, nil
}
// reconcileControlPlaneConditions is responsible of reconciling conditions reporting the status of static pods and
// the status of the etcd cluster.
func (r *RKE2ControlPlaneReconciler) reconcileControlPlaneConditions(
ctx context.Context, controlPlane *rke2.ControlPlane,
) (res ctrl.Result, retErr error) {
logger := log.FromContext(ctx)
readyCPMachines := controlPlane.Machines.Filter(collections.IsReady())
if readyCPMachines.Len() == 0 {
controlPlane.RCP.Status.Initialized = false
controlPlane.RCP.Status.Ready = false
controlPlane.RCP.Status.ReadyReplicas = 0
controlPlane.RCP.Status.AvailableServerIPs = nil
conditions.MarkFalse(
controlPlane.RCP,
controlplanev1.AvailableCondition,
controlplanev1.WaitingForRKE2ServerReason,
clusterv1.ConditionSeverityInfo, "")
conditions.MarkFalse(
controlPlane.RCP,
controlplanev1.MachinesReadyCondition,
controlplanev1.WaitingForRKE2ServerReason,
clusterv1.ConditionSeverityInfo, "")
}
// If the cluster is not yet initialized, there is no way to connect to the workload cluster and fetch information
// for updating conditions. Return early.
if !controlPlane.RCP.Status.Initialized {
return ctrl.Result{}, nil
}
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
if err != nil {
logger.Error(err, "Failed to get remote client for workload cluster", "cluster key", util.ObjectKey(controlPlane.Cluster))
return ctrl.Result{}, fmt.Errorf("getting workload cluster: %w", err)
}
defer func() {
// Always attempt to Patch the Machine conditions after each reconcile.
if err := controlPlane.PatchMachines(ctx); err != nil {
retErr = kerrors.NewAggregate([]error{retErr, err})
}
}()
if err := workloadCluster.InitWorkload(ctx, controlPlane); err != nil {
logger.Error(err, "Unable to initialize workload cluster")
return ctrl.Result{}, err
}
// Update conditions status
workloadCluster.UpdateAgentConditions(controlPlane)
workloadCluster.UpdateEtcdConditions(controlPlane)
// Patch nodes metadata
if err := workloadCluster.UpdateNodeMetadata(ctx, controlPlane); err != nil {
logger.Error(err, "Unable to update node metadata")
return ctrl.Result{}, err
}
// RCP will be patched at the end of Reconcile to reflect updated conditions, so we can return now.
return ctrl.Result{}, nil
}
func (r *RKE2ControlPlaneReconciler) upgradeControlPlane(
ctx context.Context,
cluster *clusterv1.Cluster,
rcp *controlplanev1.RKE2ControlPlane,
controlPlane *rke2.ControlPlane,
machinesRequireUpgrade collections.Machines,
) (ctrl.Result, error) {
logger := controlPlane.Logger()
// If the cluster is not yet initialized, there is no way to connect to the workload cluster and fetch information
// for updating conditions. Return early.
if !rcp.Status.Initialized {
logger.Info("ControlPlane not yet initialized")
return ctrl.Result{}, nil
}
workloadCluster, err := controlPlane.GetWorkloadCluster(ctx)
if err != nil {
logger.Error(err, "Failed to get remote client for workload cluster", "cluster key", util.ObjectKey(cluster))
return ctrl.Result{}, fmt.Errorf("getting workload cluster: %w", err)
}
if err := workloadCluster.InitWorkload(ctx, controlPlane); err != nil {
return ctrl.Result{}, err
}
switch rcp.Spec.RolloutStrategy.Type {
case controlplanev1.RollingUpdateStrategyType:
// RolloutStrategy is currently defaulted and validated to be RollingUpdate.
// Defaulted to 1 if not specified
maxSurge := intstr.FromInt(1)
if rcp.Spec.RolloutStrategy.RollingUpdate != nil && rcp.Spec.RolloutStrategy.RollingUpdate.MaxSurge != nil {
maxSurge = *rcp.Spec.RolloutStrategy.RollingUpdate.MaxSurge
}
maxNodes := *rcp.Spec.Replicas + rke2util.SafeInt32(maxSurge.IntValue())
if rke2util.SafeInt32(controlPlane.Machines.Len()) < maxNodes {
// scaleUpControlPlane ensures that we don't continue scaling up while waiting for Machines to have NodeRefs
return r.scaleUpControlPlane(ctx, cluster, rcp, controlPlane)
}
return r.scaleDownControlPlane(ctx, cluster, rcp, controlPlane, machinesRequireUpgrade)
default:
err := fmt.Errorf("unknown rollout strategy type %q", rcp.Spec.RolloutStrategy.Type)
logger.Error(err, "RolloutStrategy type is not set to RollingUpdateStrategyType, unable to determine the strategy for rolling out machines")
return ctrl.Result{}, nil
}
}
// syncMachines updates Machines, InfrastructureMachines and Rke2Configs to propagate in-place mutable fields from RKE2ControlPlane.
// Note: For InfrastructureMachines and Rke2Configs it also drops ownership of "metadata.labels" and
// "metadata.annotations" from "manager" so that "rke2controlplane" can own these fields and can work with SSA.
// Otherwise, fields would be co-owned by our "old" "manager" and "rke2controlplane" and then we would not be
// able to e.g. drop labels and annotations.
func (r *RKE2ControlPlaneReconciler) syncMachines(ctx context.Context, controlPlane *rke2.ControlPlane) error {
patchHelpers := map[string]*patch.Helper{}
for machineName := range controlPlane.Machines {
m := controlPlane.Machines[machineName]
// If the Machine is already being deleted, we only need to sync
// the subset of fields that impact tearing down the Machine.
if !m.DeletionTimestamp.IsZero() {
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return err
}
// Set all other in-place mutable fields that impact the ability to tear down existing machines.
m.Spec.NodeDrainTimeout = controlPlane.RCP.Spec.MachineTemplate.NodeDrainTimeout
m.Spec.NodeDeletionTimeout = controlPlane.RCP.Spec.MachineTemplate.NodeDeletionTimeout
m.Spec.NodeVolumeDetachTimeout = controlPlane.RCP.Spec.MachineTemplate.NodeVolumeDetachTimeout
if err := patchHelper.Patch(ctx, m); err != nil {
return err
}
controlPlane.Machines[machineName] = m
patchHelper, err = patch.NewHelper(m, r.Client)
if err != nil { //nolint:wsl
return err
}
patchHelpers[machineName] = patchHelper
continue
}
// Cleanup managed fields of all Machines.
if err := ssa.CleanUpManagedFieldsForSSAAdoption(ctx, r.Client, m, rke2ManagerName); err != nil {
return errors.Wrapf(err, "failed to update Machine: failed to adjust the managedFields of the Machine %s", klog.KObj(m))
}
// Update Machine to propagate in-place mutable fields from RCP.
updatedMachine, err := r.UpdateMachine(ctx, m, controlPlane.RCP, controlPlane.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to update Machine: %s", klog.KObj(m))
}
controlPlane.Machines[machineName] = updatedMachine
// Since the machine is updated, re-create the patch helper so that any subsequent
// Patch calls use the correct base machine object to calculate the diffs.
// Example: reconcileControlPlaneConditions patches the machine objects in a subsequent call
// and, it should use the updated machine to calculate the diff.
// Note: If the patchHelpers are not re-computed based on the new updated machines, subsequent
// Patch calls will fail because the patch will be calculated based on an outdated machine and will error
// because of outdated resourceVersion.
patchHelper, err := patch.NewHelper(updatedMachine, r.Client)
if err != nil {
return errors.Wrapf(err, "failed to create patch helper for Machine %s", klog.KObj(updatedMachine))
}
patchHelpers[machineName] = patchHelper
labelsAndAnnotationsManagedFieldPaths := []contract.Path{
{"f:metadata", "f:annotations"},
{"f:metadata", "f:labels"},
}
infraMachine, infraMachineFound := controlPlane.InfraResources[machineName]
// Only update the InfraMachine if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if infraMachineFound {
// Cleanup managed fields of all InfrastructureMachines to drop ownership of labels and annotations
// from "manager". We do this so that InfrastructureMachines that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "rke2-controlplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, infraMachine, rke2ManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of InfrastructureMachine %s", klog.KObj(infraMachine))
}
// Update in-place mutating fields on InfrastructureMachine.
if err := r.UpdateExternalObject(ctx, infraMachine, controlPlane.RCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update InfrastructureMachine %s", klog.KObj(infraMachine))
}
}
rke2Config, rke2ConfigFound := controlPlane.Rke2Configs[machineName]
// Only update the RKE2Config if it is already found, otherwise just skip it.
// This could happen e.g. if the cache is not up-to-date yet.
if rke2ConfigFound {
// Note: Set the GroupVersionKind because updateExternalObject depends on it.
rke2Config.SetGroupVersionKind(m.Spec.Bootstrap.ConfigRef.GroupVersionKind())
// Cleanup managed fields of all RKE2Configs to drop ownership of labels and annotations
// from "manager". We do this so that RKE2Configs that are created using the Create method
// can also work with SSA. Otherwise, labels and annotations would be co-owned by our "old" "manager"
// and "rke2-controlplane" and then we would not be able to e.g. drop labels and annotations.
if err := ssa.DropManagedFields(ctx, r.Client, rke2Config, rke2ManagerName, labelsAndAnnotationsManagedFieldPaths); err != nil {
return errors.Wrapf(err, "failed to clean up managedFields of RKE2Config %s", klog.KObj(rke2Config))
}
// Update in-place mutating fields on BootstrapConfig.
if err := r.UpdateExternalObject(ctx, rke2Config, controlPlane.RCP, controlPlane.Cluster); err != nil {
return errors.Wrapf(err, "failed to update RKE2Config %s", klog.KObj(rke2Config))
}
}
}
// Update the patch helpers.
controlPlane.SetPatchHelpers(patchHelpers)
return nil
}