/*
Copyright 2022 SUSE.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rke2

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"strings"
	"time"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/rest"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util"
	"sigs.k8s.io/cluster-api/util/annotations"
	"sigs.k8s.io/cluster-api/util/collections"
	"sigs.k8s.io/cluster-api/util/conditions"
	"sigs.k8s.io/cluster-api/util/patch"

	controlplanev1 "github.com/rancher/cluster-api-provider-rke2/controlplane/api/v1beta1"
	"github.com/rancher/cluster-api-provider-rke2/pkg/etcd"
)

const (
	labelNodeRoleControlPlane = "node-role.kubernetes.io/master"
	remoteEtcdTimeout         = 30 * time.Second
	etcdDialTimeout           = 10 * time.Second
	etcdCallTimeout           = 15 * time.Second
	minimalNodeCount          = 2
	rke2ServingSecretKey      = "rke2-serving" //nolint: gosec
)

// WorkloadCluster defines all behaviors necessary to upgrade Kubernetes on a workload cluster.
type WorkloadCluster interface {
	// Basic health and status checks.
	InitWorkload(ctx context.Context, controlPlane *ControlPlane) error
	UpdateNodeMetadata(ctx context.Context, controlPlane *ControlPlane) error
	ClusterStatus(ctx context.Context) ClusterStatus
	UpdateAgentConditions(controlPlane *ControlPlane)
	UpdateEtcdConditions(controlPlane *ControlPlane)

	// Upgrade related tasks.
	// AllowBootstrapTokensToGetNodes(ctx context.Context) error

	// State recovery tasks.
	RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
	IsEtcdMemberSafelyRemovedForMachine(ctx context.Context, machine *clusterv1.Machine) (bool, error)
	ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error
	EtcdMembers(ctx context.Context) ([]string, error)

	// Common tasks.
	ApplyLabelOnNode(ctx context.Context, machine *clusterv1.Machine, label, value string) error
}

// Workload defines operations on workload clusters.
type Workload struct {
	ctrlclient.Client

	Nodes               map[string]*corev1.Node
	nodePatchHelpers    map[string]*patch.Helper
	etcdClientGenerator etcd.ClientFor
}
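
// The sketch below is illustrative only: it shows how a caller that already
// holds a *Management (m), a client and REST config for the workload cluster,
// and the current *ControlPlane would typically wire these types together.
// The names remoteClient, restConfig, clusterKey, and controlPlane are
// assumptions standing in for values owned by the calling reconciler.
//
//	w, err := m.NewWorkload(ctx, remoteClient, restConfig, clusterKey)
//	if err != nil {
//		return err
//	}
//	if err := w.InitWorkload(ctx, controlPlane); err != nil {
//		return err
//	}
//	w.UpdateAgentConditions(controlPlane)
//	w.UpdateEtcdConditions(controlPlane)
//	if err := w.UpdateNodeMetadata(ctx, controlPlane); err != nil {
//		return err
//	}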

// NewWorkload creates a new Workload instance for the given cluster.
func (m *Management) NewWorkload(
	ctx context.Context,
	cl ctrlclient.Client,
	restConfig *rest.Config,
	clusterKey ctrlclient.ObjectKey,
) (*Workload, error) {
	workload := &Workload{
		Client:           cl,
		Nodes:            map[string]*corev1.Node{},
		nodePatchHelpers: map[string]*patch.Helper{},
	}

	restConfig = rest.CopyConfig(restConfig)
	restConfig.Timeout = remoteEtcdTimeout

	// Retrieve the etcd CA key pair, preferring the cached secret client.
	etcdKeyPair, err := m.getEtcdCAKeyPair(ctx, m.SecretCachingClient, clusterKey)
	if ctrlclient.IgnoreNotFound(err) != nil {
		return nil, err
	} else if apierrors.IsNotFound(err) {
		etcdKeyPair, err = m.getEtcdCAKeyPair(ctx, m.Client, clusterKey)
		if ctrlclient.IgnoreNotFound(err) != nil {
			return nil, err
		}
	}

	if apierrors.IsNotFound(err) || etcdKeyPair == nil {
		log.FromContext(ctx).Info("Cluster does not provide etcd certificates for creating child etcd client.")

		return workload, nil
	}

	clientCert, err := tls.X509KeyPair(etcdKeyPair.Cert, etcdKeyPair.Key)
	if err != nil {
		return nil, err
	}

	if !strings.Contains(string(etcdKeyPair.Key), "EC PRIVATE KEY") {
		clientKey, err := m.ClusterCache.GetClientCertificatePrivateKey(ctx, clusterKey)
		if err != nil {
			return nil, err
		}

		if clientKey == nil {
			return nil, errors.New("client key is not populated yet, requeuing")
		}

		clientCert, err = generateClientCert(etcdKeyPair.Cert, etcdKeyPair.Key, clientKey)
		if err != nil {
			return nil, err
		}
	}

	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(etcdKeyPair.Cert)
	tlsConfig := &tls.Config{
		RootCAs:      caPool,
		Certificates: []tls.Certificate{clientCert},
		MinVersion:   tls.VersionTLS12,
	}
	tlsConfig.InsecureSkipVerify = true
	workload.etcdClientGenerator = etcd.NewClientGenerator(restConfig, tlsConfig, etcdDialTimeout, etcdCallTimeout)

	return workload, nil
}

// InitWorkload prepares the workload for evaluating status conditions.
func (w *Workload) InitWorkload(ctx context.Context, cp *ControlPlane) error {
	nodes, err := w.getControlPlaneNodes(ctx)
	if err != nil {
		conditions.MarkUnknown(
			cp.RCP,
			controlplanev1.ControlPlaneComponentsHealthyCondition,
			controlplanev1.ControlPlaneComponentsInspectionFailedReason,
			"Failed to list nodes which are hosting control plane components")

		return err
	}

	for _, node := range nodes.Items {
		nodeCopy := node
		w.Nodes[node.Name] = &nodeCopy
	}

	for _, node := range w.Nodes {
		patchHelper, err := patch.NewHelper(node, w.Client)
		if err != nil {
			conditions.MarkUnknown(
				cp.RCP,
				controlplanev1.ControlPlaneComponentsHealthyCondition,
				controlplanev1.ControlPlaneComponentsInspectionFailedReason,
				"Failed to create patch helpers for control plane nodes")

			return err
		}

		w.nodePatchHelpers[node.Name] = patchHelper
	}

	return nil
}

// ClusterStatus holds status information about the cluster.
type ClusterStatus struct {
	// Nodes is the total count of nodes.
	Nodes int32
	// ReadyNodes is the count of nodes that are reporting ready.
	ReadyNodes int32
	// HasRKE2ServingSecret is true if the rke2-serving secret has been uploaded, false otherwise.
	HasRKE2ServingSecret bool
}

// PatchNodes patches the nodes in the workload cluster.
func (w *Workload) PatchNodes(ctx context.Context, cp *ControlPlane) error {
	errList := []error{}

	for i := range w.Nodes {
		node := w.Nodes[i]

		machine, found := cp.Machines[node.Name]
		if !found {
			for _, m := range cp.Machines {
				if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name {
					machine = m

					break
				}
			}

			if machine == nil {
				continue
			}
		}

		if helper, ok := w.nodePatchHelpers[node.Name]; ok {
			if err := helper.Patch(ctx, node); err != nil {
				conditions.MarkUnknown(
					machine,
					controlplanev1.NodeMetadataUpToDate,
					controlplanev1.NodePatchFailedReason,
					"%s", err.Error())

				errList = append(errList, err)
			}

			continue
		}

		errList = append(errList, errors.Errorf("failed to get patch helper for node %s", node.Name))
	}

	return kerrors.NewAggregate(errList)
}

// ClusterStatus returns the status of the cluster.
func (w *Workload) ClusterStatus(ctx context.Context) ClusterStatus {
	status := ClusterStatus{}

	// Count the control plane nodes.
	for _, node := range w.Nodes {
		nodeCopy := node
		status.Nodes++

		if util.IsNodeReady(nodeCopy) {
			status.ReadyNodes++
		}
	}

	// Find the rke2-serving secret.
	key := ctrlclient.ObjectKey{
		Name:      rke2ServingSecretKey,
		Namespace: metav1.NamespaceSystem,
	}

	err := w.Get(ctx, key, &corev1.Secret{})
	// In case of error we assume the control plane is not initialized yet.
	if err != nil {
		logger := log.FromContext(ctx)
		logger.Info("Control Plane does not seem to be initialized yet.", "reason", err.Error())
	}

	status.HasRKE2ServingSecret = err == nil

	return status
}

func hasProvisioningMachine(machines collections.Machines) bool {
	for _, machine := range machines {
		if machine.Status.NodeRef == nil {
			return true
		}
	}

	return false
}

// nodeHasUnreachableTaint returns true if the node is unreachable from the node controller.
func nodeHasUnreachableTaint(node corev1.Node) bool {
	for _, taint := range node.Spec.Taints {
		if taint.Key == corev1.TaintNodeUnreachable && taint.Effect == corev1.TaintEffectNoExecute {
			return true
		}
	}

	return false
}

// UpdateAgentConditions is responsible for updating machine conditions reflecting the status of all the control plane
// components running in a static pod generated by RKE2. This operation is best effort, in the sense that in case
// of problems in retrieving the pod status, it sets the condition to Unknown state without returning any error.
func (w *Workload) UpdateAgentConditions(controlPlane *ControlPlane) {
	allMachinePodConditions := []clusterv1.ConditionType{
		controlplanev1.MachineAgentHealthyCondition,
	}

	// Update conditions for control plane components hosted as static pods on the nodes.
	var rcpErrors []string

	for k := range w.Nodes {
		node := w.Nodes[k]

		// Search for the machine corresponding to the node.
		machine, found := controlPlane.Machines[node.Name]
		// If there is no machine corresponding to a node, determine if this is an error or not.
		if !found {
			// If there are machines still provisioning there is the chance that a node might be linked to a machine soon,
			// otherwise report the error at RCP level given that there is no machine to report on.
			if hasProvisioningMachine(controlPlane.Machines) {
				continue
			}

			for _, m := range controlPlane.Machines {
				if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name {
					machine = m

					break
				}
			}

			if machine == nil {
				rcpErrors = append(rcpErrors, fmt.Sprintf("Control plane node %s does not have a corresponding machine", node.Name))

				continue
			}
		}

		// If the machine is deleting, report all the conditions as deleting.
		if !machine.DeletionTimestamp.IsZero() {
			for _, condition := range allMachinePodConditions {
				conditions.MarkFalse(machine, condition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")
			}

			continue
		}

		// If the node is Unreachable, information about static pods could be stale so set all conditions to unknown.
		if nodeHasUnreachableTaint(*node) {
			// NOTE: We are assuming unreachable as a temporary condition, leaving to MHC
			// the responsibility to determine if the node is unhealthy or not.
			for _, condition := range allMachinePodConditions {
				conditions.MarkUnknown(machine, condition, controlplanev1.PodInspectionFailedReason, "Node is unreachable")
			}

			continue
		}
	}

	// If there are provisioned machines without corresponding nodes, report this as a failing condition with SeverityError.
	for i := range controlPlane.Machines {
		machine := controlPlane.Machines[i]

		if machine.Status.NodeRef == nil {
			continue
		}

		node, found := w.Nodes[machine.Status.NodeRef.Name]
		if !found {
			for _, condition := range allMachinePodConditions {
				conditions.MarkFalse(machine, condition, controlplanev1.PodFailedReason, clusterv1.ConditionSeverityError, "Missing node")
			}

			continue
		}

		for _, condition := range node.Status.Conditions {
			if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
				conditions.MarkTrue(machine, controlplanev1.MachineAgentHealthyCondition)
			}
		}
	}

	// Aggregate components error from machines at RCP level.
	aggregateFromMachinesToRCP(aggregateFromMachinesToRCPInput{
		controlPlane:      controlPlane,
		machineConditions: allMachinePodConditions,
		rcpErrors:         rcpErrors,
		condition:         controlplanev1.ControlPlaneComponentsHealthyCondition,
		unhealthyReason:   controlplanev1.ControlPlaneComponentsUnhealthyReason,
		unknownReason:     controlplanev1.ControlPlaneComponentsUnknownReason,
		note:              "control plane",
	})
}

// UpdateNodeMetadata is responsible for populating node metadata after
// it is referenced from the machine object.
func (w *Workload) UpdateNodeMetadata(ctx context.Context, controlPlane *ControlPlane) error {
	for nodeName, machine := range controlPlane.Machines {
		if machine.Spec.Bootstrap.ConfigRef == nil {
			continue
		}

		if machine.Status.NodeRef != nil {
			nodeName = machine.Status.NodeRef.Name
		}

		conditions.MarkTrue(machine, controlplanev1.NodeMetadataUpToDate)

		node, nodeFound := w.Nodes[nodeName]
		if !nodeFound {
			conditions.MarkUnknown(
				machine,
				controlplanev1.NodeMetadataUpToDate,
				controlplanev1.NodePatchFailedReason, "associated node not found")

			continue
		} else if name, ok := node.Annotations[clusterv1.MachineAnnotation]; !ok || name != machine.Name {
			conditions.MarkUnknown(
				machine,
				controlplanev1.NodeMetadataUpToDate,
				controlplanev1.NodePatchFailedReason, "node object is missing %s annotation", clusterv1.MachineAnnotation)

			continue
		}

		rkeConfig, found := controlPlane.Rke2Configs[machine.Name]
		if !found {
			conditions.MarkUnknown(
				machine,
				controlplanev1.NodeMetadataUpToDate,
				controlplanev1.NodePatchFailedReason, "associated RKE2 config not found")

			continue
		}

		annotations.AddAnnotations(node, rkeConfig.Spec.AgentConfig.NodeAnnotations)
	}

	return w.PatchNodes(ctx, controlPlane)
}

// ApplyLabelOnNode applies a label key and value to the Node associated to the Machine, if any.
func (w *Workload) ApplyLabelOnNode(ctx context.Context, machine *clusterv1.Machine, key, value string) error {
	logger := log.FromContext(ctx)

	if machine == nil || machine.Status.NodeRef == nil {
		// Nothing to do, no node for Machine.
		logger.Info("Nothing to do, no node for Machine")

		return nil
	}

	controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
	if err != nil {
		return fmt.Errorf("getting control plane nodes: %w", err)
	}

	for _, node := range controlPlaneNodes.Items {
		if node.Name == machine.Status.NodeRef.Name {
			if node.Labels == nil {
				node.Labels = map[string]string{}
			}

			node.Labels[key] = value

			if helper, ok := w.nodePatchHelpers[node.Name]; ok {
				if err := helper.Patch(ctx, &node); err != nil {
					return fmt.Errorf("patching node %s with %s label: %w", node.Name, key, err)
				}
			} else {
				return fmt.Errorf("could not find patch helper for node %s", node.Name)
			}

			break
		}
	}

	logger.Info(fmt.Sprintf("Applied label %s on node %s", key, machine.Status.NodeRef.Name))

	return nil
}

// UpdateEtcdConditions is responsible for updating machine conditions reflecting the status of all the etcd members.
// This operation is best effort, in the sense that in case of problems in retrieving member status, it sets
// the condition to Unknown state without returning any error.
func (w *Workload) UpdateEtcdConditions(controlPlane *ControlPlane) {
	w.updateManagedEtcdConditions(controlPlane)
}

type aggregateFromMachinesToRCPInput struct {
	controlPlane      *ControlPlane
	machineConditions []clusterv1.ConditionType
	rcpErrors         []string
	condition         clusterv1.ConditionType
	unhealthyReason   string
	unknownReason     string
	note              string
}

func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList, error) {
	nodes := &corev1.NodeList{}
	labels := map[string]string{
		labelNodeRoleControlPlane: "true",
	}

	if err := w.List(ctx, nodes, ctrlclient.MatchingLabels(labels)); err != nil {
		return nil, err
	}

	return nodes, nil
}
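
// Aggregation of machine conditions into the RCP-level condition (done by the
// function below) follows the precedence error > warning > info > true > unknown.
// As an illustrative example: if machine-a reports MachineAgentHealthyCondition
// as True and machine-b reports it as False with SeverityWarning, the RCP
// condition is set to False with SeverityWarning and its message mentions
// machine-b only.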

// aggregateFromMachinesToRCP aggregates a group of conditions from machines to RCP.
// NOTE: this func follows the same aggregation rules used by conditions.Merge thus giving priority to
// errors, then warnings, info down to unknown.
func aggregateFromMachinesToRCP(input aggregateFromMachinesToRCPInput) {
	// Aggregates machines for condition status.
	// NB. A machine could be assigned to many groups, but only the group with the highest severity will be reported.
	rcpMachinesWithErrors := sets.NewString()
	rcpMachinesWithWarnings := sets.NewString()
	rcpMachinesWithInfo := sets.NewString()
	rcpMachinesWithTrue := sets.NewString()
	rcpMachinesWithUnknown := sets.NewString()

	for i := range input.controlPlane.Machines {
		machine := input.controlPlane.Machines[i]

		for _, condition := range input.machineConditions {
			if machineCondition := conditions.Get(machine, condition); machineCondition != nil {
				switch machineCondition.Status {
				case corev1.ConditionTrue:
					rcpMachinesWithTrue.Insert(machine.Name)
				case corev1.ConditionFalse:
					switch machineCondition.Severity {
					case clusterv1.ConditionSeverityInfo:
						rcpMachinesWithInfo.Insert(machine.Name)
					case clusterv1.ConditionSeverityWarning:
						rcpMachinesWithWarnings.Insert(machine.Name)
					case clusterv1.ConditionSeverityError:
						rcpMachinesWithErrors.Insert(machine.Name)
					}
				case corev1.ConditionUnknown:
					rcpMachinesWithUnknown.Insert(machine.Name)
				}
			}
		}
	}

	// In case of at least one machine with errors or RCP level errors (nodes without machines), report false, error.
	if len(rcpMachinesWithErrors) > 0 {
		input.rcpErrors = append(
			input.rcpErrors,
			fmt.Sprintf("Following machines are reporting %s errors: %s", input.note, strings.Join(rcpMachinesWithErrors.List(), ", ")))
	}

	if len(input.rcpErrors) > 0 {
		conditions.MarkFalse(
			input.controlPlane.RCP,
			input.condition,
			input.unhealthyReason,
			clusterv1.ConditionSeverityError,
			"%s",
			strings.Join(input.rcpErrors, "; "))

		return
	}

	// In case of no errors and at least one machine with warnings, report false, warning.
	if len(rcpMachinesWithWarnings) > 0 {
		conditions.MarkFalse(
			input.controlPlane.RCP,
			input.condition,
			input.unhealthyReason,
			clusterv1.ConditionSeverityWarning,
			"Following machines are reporting %s warnings: %s", input.note, strings.Join(rcpMachinesWithWarnings.List(), ", "))

		return
	}

	// In case of no errors, no warnings, and at least one machine with info, report false, info.
	if len(rcpMachinesWithInfo) > 0 {
		conditions.MarkFalse(
			input.controlPlane.RCP,
			input.condition,
			input.unhealthyReason,
			clusterv1.ConditionSeverityInfo,
			"Following machines are reporting %s info: %s", input.note, strings.Join(rcpMachinesWithInfo.List(), ", "))

		return
	}

	// In case of no errors, no warnings, no info, and at least one machine with true conditions, report true.
	if len(rcpMachinesWithTrue) > 0 {
		conditions.MarkTrue(input.controlPlane.RCP, input.condition)

		return
	}

	// Otherwise, if there is at least one machine with unknown, report unknown.
	if len(rcpMachinesWithUnknown) > 0 {
		conditions.MarkUnknown(
			input.controlPlane.RCP,
			input.condition,
			input.unknownReason,
			"Following machines are reporting unknown %s status: %s", input.note, strings.Join(rcpMachinesWithUnknown.List(), ", "))

		return
	}
}

func (w *Workload) updateManagedEtcdConditions(controlPlane *ControlPlane) {
	// NOTE: This method uses control plane nodes only to get in contact with etcd, but then it relies on etcd
	// as the ultimate source of truth for the list of members and for their health.
	for k := range w.Nodes {
		node := w.Nodes[k]

		machine, found := controlPlane.Machines[node.Name]
		if !found {
			// If there are machines still provisioning there is the chance that a node might be linked to a machine soon,
			// otherwise report the error at RCP level given that there is no machine to report on.
			if hasProvisioningMachine(controlPlane.Machines) {
				continue
			}

			for _, m := range controlPlane.Machines {
				if m.Status.NodeRef != nil && m.Status.NodeRef.Name == node.Name {
					machine = m
				}
			}

			if machine == nil {
				continue
			}
		}

		// If the machine is deleting, report all the conditions as deleting.
		if !machine.DeletionTimestamp.IsZero() {
			conditions.MarkFalse(machine, controlplanev1.MachineEtcdMemberHealthyCondition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")

			continue
		}

		conditions.MarkTrue(machine, controlplanev1.MachineEtcdMemberHealthyCondition)
	}
}
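
// Note: the remaining WorkloadCluster methods (RemoveEtcdMemberForMachine,
// IsEtcdMemberSafelyRemovedForMachine, ForwardEtcdLeadership, EtcdMembers)
// are declared in the interface above but not implemented in this file. As a
// rough, illustrative sketch of how a caller might use them when scaling down,
// etcd leadership is usually moved off a Machine before its member is removed
// (machineToDelete and leaderCandidate are assumed caller-provided values):
//
//	if err := w.ForwardEtcdLeadership(ctx, machineToDelete, leaderCandidate); err != nil {
//		return err
//	}
//	if err := w.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
//		return err
//	}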