prepare for rebase on main

Signed-off-by: Mohamed Belgaied Hassine <belgaied2@hotmail.com>
This commit is contained in:
Mohamed Belgaied Hassine 2022-11-24 15:00:13 +00:00
parent 1cfab9f30a
commit 9a1a2cc718
No known key found for this signature in database
GPG Key ID: 391F62FCFA48CCD9
15 changed files with 889 additions and 28 deletions

View File

@@ -29,10 +29,14 @@ const (
)
const (
// DataSecretGenerationFailedReason (Severity=Warning) documents an RKE2Config controller detecting
// an error while generating a data secret; those kinds of errors are usually due to misconfigurations
// and require user intervention to be fixed.
DataSecretGenerationFailedReason string = "DataSecretGenerationFailed"
// WaitingForClusterInfrastructureReason (Severity=Info) documents a bootstrap secret generation process
// waiting for the cluster infrastructure to be ready.
//
// NOTE: Having the cluster infrastructure ready is a pre-condition for starting to create machines;
WaitingForClusterInfrastructureReason string = "WaitingForClusterInfrastructure"
)
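For context, these reasons are consumed through the Cluster API conditions helpers. A representative call, taken from the reconciler change later in this commit, marks the data secret unavailable while waiting for infrastructure:

conditions.MarkFalse(config, bootstrapv1.DataSecretAvailableCondition,
	bootstrapv1.WaitingForClusterInfrastructureReason,
	clusterv1.ConditionSeverityInfo, "")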

View File

@@ -22,7 +22,7 @@ spec:
- /manager
args:
- --leader-elect
- --metrics-bind-address=localhost:8080
- --metrics-bind-addr=localhost:8080
image: controller:latest
name: manager
securityContext:

View File

@@ -47,3 +47,12 @@ rules:
- get
- list
- watch
- apiGroups:
- controlplane.cluster.x-k8s.io
resources:
- rke2controlplanes
- rke2controlplanes/status
verbs:
- get
- list
- watch
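These generated rules correspond to the kubebuilder RBAC marker added to the bootstrap controller later in this commit; regenerating manifests from the marker below yields the rule above:

//+kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=rke2controlplanes;rke2controlplanes/status,verbs=get;list;watch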

View File

@@ -8,5 +8,5 @@ roleRef:
name: manager-role
subjects:
- kind: ServiceAccount
name: controller-manager
name: manager
namespace: system

View File

@@ -19,33 +19,43 @@ package controllers
import (
"context"
"fmt"
"time"
"github.com/go-logr/logr"
"github.com/pkg/errors"
bootstrapv1 "github.com/rancher-sandbox/cluster-api-provider-rke2/bootstrap/api/v1alpha1"
controlplanev1 "github.com/rancher-sandbox/cluster-api-provider-rke2/controlplane/api/v1alpha1"
"github.com/rancher-sandbox/cluster-api-provider-rke2/pkg/cloudinit"
"github.com/rancher-sandbox/cluster-api-provider-rke2/pkg/locking"
"github.com/rancher-sandbox/cluster-api-provider-rke2/pkg/rke2"
bsutil "github.com/rancher-sandbox/cluster-api-provider-rke2/pkg/util"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/utils/pointer"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
kubeyaml "sigs.k8s.io/yaml"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// Rke2ConfigReconciler reconciles a Rke2Config object
type Rke2ConfigReconciler struct {
// RKE2ConfigReconciler reconciles an RKE2Config object
type RKE2ConfigReconciler struct {
RKE2InitLock RKE2InitLock
client.Client
Scheme *runtime.Scheme
}
//+kubebuilder:rbac:groups=bootstrap.cluster.x-k8s.io,resources=rke2configs;rke2configs/status;rke2configs/finalizers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=controlplane.cluster.x-k8s.io,resources=rke2controlplanes;rke2controlplanes/status,verbs=get;list;watch
//+kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status;machinesets;machines;machines/status;machinepools;machinepools/status,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=secrets;events;configmaps,verbs=get;list;watch;create;update;patch;delete
@@ -58,7 +68,7 @@ type Rke2ConfigReconciler struct {
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile
func (r *Rke2ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, rerr error) {
func (r *RKE2ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, rerr error) {
logger := log.FromContext(ctx)
logger.Info("Reconcile RKE2Config")
@@ -82,7 +92,7 @@ func (r *Rke2ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}
if cp == nil {
logger.Info("This config is for a workerNode")
logger.Info("This config is for a worker node")
scope.HasControlPlaneOwner = false
} else {
logger.Info("This config is for a ControlPlane node")
@@ -155,7 +165,7 @@ func (r *Rke2ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}()
if !machine.Status.InfrastructureReady {
if !cluster.Status.InfrastructureReady {
logger.Info("Infrastructure machine not yet ready")
conditions.MarkFalse(config, bootstrapv1.DataSecretAvailableCondition, bootstrapv1.WaitingForClusterInfrastructureReason, clusterv1.ConditionSeverityInfo, "")
return ctrl.Result{Requeue: true}, nil
@@ -206,7 +216,7 @@ func (r *Rke2ConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Scope is a scoped struct used during reconciliation.
type Scope struct {
logr.Logger
Logger logr.Logger
Config *bootstrapv1.RKE2Config
Machine *clusterv1.Machine
Cluster *clusterv1.Cluster
@@ -215,7 +225,11 @@ type Scope struct {
}
// SetupWithManager sets up the controller with the Manager.
func (r *Rke2ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *RKE2ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
if r.RKE2InitLock == nil {
r.RKE2InitLock = locking.NewControlPlaneInitMutex(mgr.GetClient())
}
return ctrl.NewControllerManagedBy(mgr).
For(&bootstrapv1.RKE2Config{}).
Complete(r)
@@ -224,25 +238,191 @@ func (r *Rke2ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
// TODO: Implement these functions
// handleClusterNotInitialized handles the first control plane node
func (r *Rke2ConfigReconciler) handleClusterNotInitialized(ctx context.Context, scope *Scope) (res ctrl.Result, rerr error) {
return ctrl.Result{}, nil
}
func (r *RKE2ConfigReconciler) handleClusterNotInitialized(ctx context.Context, scope *Scope) (res ctrl.Result, reterr error) {
// Initialize the DataSecretAvailableCondition if missing.
// This is required to avoid the condition's LastTransitionTime flickering in case of errors
// surfacing using the DataSecretGenerationFailedReason.
if conditions.GetReason(scope.Config, bootstrapv1.DataSecretAvailableCondition) != bootstrapv1.DataSecretGenerationFailedReason {
conditions.MarkFalse(scope.Config, bootstrapv1.DataSecretAvailableCondition, clusterv1.WaitingForControlPlaneAvailableReason, clusterv1.ConditionSeverityInfo, "")
}
func Unlock(ctx context.Context, cluster *clusterv1.Cluster) bool {
return true
if !scope.HasControlPlaneOwner {
scope.Logger.Info("Requeuing because this machine is not a Control Plane machine")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
if !r.RKE2InitLock.Lock(ctx, scope.Cluster, scope.Machine) {
scope.Logger.Info("A control plane is already being initialized, requeuing until control plane is ready")
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
defer func() {
if reterr != nil {
if !r.RKE2InitLock.Unlock(ctx, scope.Cluster) {
reterr = kerrors.NewAggregate([]error{reterr, errors.New("failed to unlock the rke2 init lock")})
}
}
}()
token, err := r.generateAndStoreToken(ctx, scope)
if err != nil {
return ctrl.Result{}, err
}
configStruct, files, err := rke2.GenerateInitControlPlaneConfig(
rke2.RKE2ServerConfigOpts{
ControlPlaneEndpoint: scope.Cluster.Spec.ControlPlaneEndpoint.Host,
Token: token,
ServerURL: "https://" + scope.Cluster.Spec.ControlPlaneEndpoint.Host + ":9345",
ServerConfig: scope.ControlPlane.Spec.ServerConfig,
AgentConfig: scope.Config.Spec.AgentConfig,
Ctx: ctx,
Client: r.Client,
})
if err != nil {
return ctrl.Result{}, err
}
b, err := kubeyaml.Marshal(configStruct)
if err != nil {
return ctrl.Result{}, err
}
initConfigFile := bootstrapv1.File{
Path: rke2.DefaultRKE2ConfigLocation,
Content: string(b),
Owner: "root:root",
Permissions: "0640",
}
//files, err := r.resolveFiles(ctx, scope.Config)
//if err != nil {
//conditions.MarkFalse(scope.Config, bootstrapv1.DataSecretAvailableCondition, bootstrapv1.DataSecretGenerationFailedReason, clusterv1.ConditionSeverityWarning, err.Error())
//return ctrl.Result{}, err
//}
cpinput := &cloudinit.ControlPlaneInput{
BaseUserData: cloudinit.BaseUserData{
PreRKE2Commands: scope.Config.Spec.PreRKE2Commands,
PostRKE2Commands: scope.Config.Spec.PostRKE2Commands,
ConfigFile: initConfigFile,
RKE2Version: scope.Config.Spec.AgentConfig.Version,
WriteFiles: files,
},
}
cloudInitData, err := cloudinit.NewInitControlPlane(cpinput)
if err != nil {
return ctrl.Result{}, err
}
if err := r.storeBootstrapData(ctx, scope, cloudInitData); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
type RKE2InitLock interface {
Unlock(context.Context, *clusterv1.Cluster) bool
Lock(context.Context, *clusterv1.Cluster) bool
Unlock(ctx context.Context, cluster *clusterv1.Cluster) bool
Lock(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) bool
}
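Because RKE2InitLock is an interface, tests can swap the ConfigMap-backed ControlPlaneInitMutex for an in-memory lock. A minimal sketch (hypothetical, not part of this commit):

// fakeInitLock is a hypothetical in-memory RKE2InitLock for unit tests.
type fakeInitLock struct{ held bool }

// Lock acquires the fake lock; it fails if the lock is already held.
func (f *fakeInitLock) Lock(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) bool {
	if f.held {
		return false
	}
	f.held = true
	return true
}

// Unlock releases the fake lock and always succeeds.
func (f *fakeInitLock) Unlock(ctx context.Context, cluster *clusterv1.Cluster) bool {
	f.held = false
	return true
}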
// TODO: Implement these functions
func (r *Rke2ConfigReconciler) joinControlplane(ctx context.Context, scope *Scope) (res ctrl.Result, rerr error) {
func (r *RKE2ConfigReconciler) joinControlplane(ctx context.Context, scope *Scope) (res ctrl.Result, rerr error) {
return ctrl.Result{}, nil
}
// TODO: Implement these functions
func (r *Rke2ConfigReconciler) joinWorker(ctx context.Context, scope *Scope) (res ctrl.Result, rerr error) {
func (r *RKE2ConfigReconciler) joinWorker(ctx context.Context, scope *Scope) (res ctrl.Result, rerr error) {
return ctrl.Result{}, nil
}
func (r *RKE2ConfigReconciler) generateAndStoreToken(ctx context.Context, scope *Scope) (string, error) {
tokn, err := bsutil.Random(16)
if err != nil {
return "", err
}
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: bsutil.TokenName(scope.Cluster.Name),
Namespace: scope.Config.Namespace,
Labels: map[string]string{
clusterv1.ClusterLabelName: scope.Cluster.Name,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: scope.Cluster.APIVersion,
Kind: scope.Cluster.Kind,
Name: scope.Cluster.Name,
UID: scope.Cluster.UID,
Controller: pointer.BoolPtr(true),
},
},
},
Data: map[string][]byte{
"value": []byte(tokn),
},
Type: clusterv1.ClusterSecretType,
}
// As secret creation and the scope.Config status patch are not atomic operations,
// it is possible that the secret is created but the config.Status patches are not applied.
if err := r.Client.Create(ctx, secret); err != nil {
if !apierrors.IsAlreadyExists(err) {
return "", errors.Wrapf(err, "failed to create token for RKE2Config %s/%s", scope.Config.Namespace, scope.Config.Name)
}
// r.Log.Info("bootstrap data secret for RKE2Config already exists, updating", "secret", secret.Name, "RKE2Config", scope.Config.Name)
if err := r.Client.Update(ctx, secret); err != nil {
return "", errors.Wrapf(err, "failed to update bootstrap token secret for RKE2Config %s/%s", scope.Config.Namespace, scope.Config.Name)
}
}
return tokn, nil
}
// storeBootstrapData creates a new secret with the data passed in as input,
// sets the reference in the configuration status, and sets Ready to true.
func (r *RKE2ConfigReconciler) storeBootstrapData(ctx context.Context, scope *Scope, data []byte) error {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: scope.Config.Name,
Namespace: scope.Config.Namespace,
Labels: map[string]string{
clusterv1.ClusterLabelName: scope.Cluster.Name,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: bootstrapv1.GroupVersion.String(),
Kind: "RKE2Config",
Name: scope.Config.Name,
UID: scope.Config.UID,
Controller: pointer.BoolPtr(true),
},
},
},
Data: map[string][]byte{
"value": data,
},
Type: clusterv1.ClusterSecretType,
}
// As secret creation and the scope.Config status patch are not atomic operations,
// it is possible that the secret is created but the config.Status patches are not applied.
if err := r.Client.Create(ctx, secret); err != nil {
if !apierrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create bootstrap data secret for RKE2Config %s/%s", scope.Config.Namespace, scope.Config.Name)
}
scope.Logger.Info("bootstrap data secret for RKE2Config already exists, updating", "secret", secret.Name, "RKE2Config", scope.Config.Name)
if err := r.Client.Update(ctx, secret); err != nil {
return errors.Wrapf(err, "failed to update bootstrap data secret for RKE2Config %s/%s", scope.Config.Namespace, scope.Config.Name)
}
}
scope.Config.Status.DataSecretName = pointer.StringPtr(secret.Name)
scope.Config.Status.Ready = true
// conditions.MarkTrue(scope.Config, bootstrapv1.DataSecretAvailableCondition)
return nil
}
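Once Status.DataSecretName is set and Ready is true, the Cluster API machine controller picks the secret up and hands its contents to the infrastructure provider. A sketch of a consumer reading the same data back (names assumed, not part of this commit):

// Illustrative: fetch the bootstrap data referenced by the config status.
secret := &corev1.Secret{}
key := client.ObjectKey{Namespace: config.Namespace, Name: *config.Status.DataSecretName}
if err := r.Client.Get(ctx, key, secret); err != nil {
	return err
}
userData := secret.Data["value"] // the cloud-init document stored above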

View File

@@ -171,7 +171,7 @@ func setupChecks(mgr ctrl.Manager) {
}
func setupReconcilers(mgr ctrl.Manager) {
if err := (&controllers.Rke2ConfigReconciler{
if err := (&controllers.RKE2ConfigReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {

View File

@@ -22,7 +22,7 @@ spec:
- /manager
args:
- "--leader-elect"
- "--metrics-bind-address=localhost:8080"
- "--metrics-bind-addr=localhost:8080"
image: controller:latest
name: manager
env:

View File

@@ -417,8 +417,6 @@ func (r *RKE2ControlPlaneReconciler) upgradeControlPlane(
return ctrl.Result{}, nil
}
// TODO: handle reconciliation of etcd members and RKE2 config in case they get out of sync with cluster
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to get remote client for workload cluster", "cluster key", util.ObjectKey(cluster))

go.mod
View File

@@ -16,6 +16,7 @@ require (
k8s.io/utils v0.0.0-20220823124924-e9cbc92d1a73
sigs.k8s.io/cluster-api v1.3.1
sigs.k8s.io/controller-runtime v0.13.1
sigs.k8s.io/yaml v1.3.0
)
require (
@@ -87,5 +88,4 @@ require (
k8s.io/kube-openapi v0.0.0-20220803164354-a70c9af30aea // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

View File

@@ -0,0 +1,88 @@
package cloudinit
import (
"bytes"
"strings"
"text/template"
"github.com/pkg/errors"
bootstrapv1 "github.com/rancher-sandbox/cluster-api-provider-rke2/bootstrap/api/v1alpha1"
)
var (
defaultTemplateFuncMap = template.FuncMap{
"Indent": templateYAMLIndent,
}
)
func templateYAMLIndent(i int, input string) string {
split := strings.Split(input, "\n")
ident := "\n" + strings.Repeat(" ", i)
return strings.Repeat(" ", i) + strings.Join(split, ident)
}
const (
cloudConfigHeader = `## template: jinja
#cloud-config
`
filesTemplate = `{{ define "files" -}}
write_files:{{ range . }}
- path: {{.Path}}
{{ if ne .Encoding "" -}}
encoding: "{{.Encoding}}"
{{ end -}}
{{ if ne .Owner "" -}}
owner: {{.Owner}}
{{ end -}}
{{ if ne .Permissions "" -}}
permissions: '{{.Permissions}}'
{{ end -}}
content: |
{{.Content | Indent 6}}
{{- end -}}
{{- end -}}
`
commandsTemplate = `{{- define "commands" -}}
{{ range . }}
- {{printf "%q" .}}
{{- end -}}
{{- end -}}
`
)
// BaseUserData is shared across all the various types of files written to disk.
type BaseUserData struct {
Header string
PreRKE2Commands []string
DeployRKE2Commands []string
PostRKE2Commands []string
AdditionalFiles []bootstrapv1.File
WriteFiles []bootstrapv1.File
ConfigFile bootstrapv1.File
RKE2Version string
}
func generate(kind string, tpl string, data interface{}) ([]byte, error) {
tm := template.New(kind).Funcs(defaultTemplateFuncMap)
if _, err := tm.Parse(filesTemplate); err != nil {
return nil, errors.Wrap(err, "failed to parse files template")
}
if _, err := tm.Parse(commandsTemplate); err != nil {
return nil, errors.Wrap(err, "failed to parse commands template")
}
t, err := tm.Parse(tpl)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse %s template", kind)
}
var out bytes.Buffer
if err := t.Execute(&out, data); err != nil {
return nil, errors.Wrapf(err, "failed to generate %s template", kind)
}
return out.Bytes(), nil
}
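A minimal sketch of exercising the shared "files" sub-template through generate (values are illustrative):

// Renders a write_files: block; content lines are indented six spaces.
files := []bootstrapv1.File{{
	Path:        "/etc/rancher/rke2/config.yaml",
	Owner:       "root:root",
	Permissions: "0640",
	Content:     "token: example",
}}
out, err := generate("example", `{{template "files" .}}`, files)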

View File

@@ -0,0 +1,57 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudinit
import (
"fmt"
)
const (
controlPlaneCloudInit = `{{.Header}}
{{template "files" .WriteFiles}}
runcmd:
{{- template "commands" .PreRKE2Commands }}
- export INSTALL_RKE2_VERSION=%[1]s
{{- template "commands" .DeployRKE2Commands }}
{{else}}
- 'curl -sfL https://get.rke2.io | INSTALL_RKE2_VERSION=%[1]s sh -s - server'
- 'systemctl enable rke2-server.service'
- 'systemctl start rke2-server.service'
{{end}}
{{- template "commands" .PostRKE2Commands }}
`
)
// ControlPlaneInput defines the context to generate a controlplane instance user data.
type ControlPlaneInput struct {
BaseUserData
//secret.Certificates
}
// NewInitControlPlane returns the user data string to be used on a controlplane instance.
func NewInitControlPlane(input *ControlPlaneInput) ([]byte, error) {
input.Header = cloudConfigHeader
input.WriteFiles = append(input.WriteFiles, input.ConfigFile)
controlPlaneCloudJoinWithVersion := fmt.Sprintf(controlPlaneCloudInit, input.RKE2Version)
userData, err := generate("InitControlplane", controlPlaneCloudJoinWithVersion, input)
if err != nil {
return nil, err
}
return userData, nil
}
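A sketch of calling NewInitControlPlane directly; field values are illustrative assumptions, and the reconciler above builds the same input from the RKE2Config spec:

input := &ControlPlaneInput{
	BaseUserData: BaseUserData{
		PreRKE2Commands:  []string{"hostnamectl set-hostname cp-0"},
		PostRKE2Commands: []string{"echo rke2 ready"},
		ConfigFile: bootstrapv1.File{
			Path:        "/etc/rancher/rke2/config.yaml",
			Content:     "token: example",
			Owner:       "root:root",
			Permissions: "0640",
		},
		RKE2Version: "v1.24.6+rke2r1",
	},
}
// userData is a complete #cloud-config document, ready for the bootstrap secret.
userData, err := NewInitControlPlane(input)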

View File

@@ -0,0 +1,191 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package locking implements locking functionality.
package locking
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)
const semaphoreInformationKey = "lock-information"
// ControlPlaneInitMutex uses a ConfigMap to synchronize cluster initialization.
type ControlPlaneInitMutex struct {
client client.Client
}
// NewControlPlaneInitMutex returns a lock that can be held by a control plane node before init.
func NewControlPlaneInitMutex(client client.Client) *ControlPlaneInitMutex {
return &ControlPlaneInitMutex{
client: client,
}
}
// Lock allows a control plane node to be the first and only node to initialize an RKE2 cluster.
func (c *ControlPlaneInitMutex) Lock(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) bool {
sema := newSemaphore()
cmName := configMapName(cluster.Name)
log := ctrl.LoggerFrom(ctx, "ConfigMap", klog.KRef(cluster.Namespace, cmName))
err := c.client.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: cmName,
}, sema.ConfigMap)
switch {
case apierrors.IsNotFound(err):
break
case err != nil:
log.Error(err, "Failed to acquire init lock")
return false
default: // Successfully found an existing config map.
info, err := sema.information()
if err != nil {
log.Error(err, "Failed to get information about the existing init lock")
return false
}
// The machine requesting the lock is the machine that created the lock, therefore the lock is acquired.
if info.MachineName == machine.Name {
return true
}
// If the machine that created the lock cannot be found, unlock the mutex.
if err := c.client.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: info.MachineName,
}, &clusterv1.Machine{}); err != nil {
log.Error(err, "Failed to get machine holding init lock")
if apierrors.IsNotFound(err) {
c.Unlock(ctx, cluster)
}
}
log.Info(fmt.Sprintf("Waiting for Machine %s to initialize", info.MachineName))
return false
}
// Adds owner reference, namespace and name
sema.setMetadata(cluster)
// Adds the additional information
if err := sema.setInformation(&information{MachineName: machine.Name}); err != nil {
log.Error(err, "Failed to acquire init lock while setting semaphore information")
return false
}
log.Info("Attempting to acquire the lock")
err = c.client.Create(ctx, sema.ConfigMap)
switch {
case apierrors.IsAlreadyExists(err):
log.Info("Cannot acquire the init lock. The init lock has been acquired by someone else")
return false
case err != nil:
log.Error(err, "Error acquiring the init lock")
return false
default:
return true
}
}
// Unlock releases the lock.
func (c *ControlPlaneInitMutex) Unlock(ctx context.Context, cluster *clusterv1.Cluster) bool {
sema := newSemaphore()
cmName := configMapName(cluster.Name)
log := ctrl.LoggerFrom(ctx, "ConfigMap", klog.KRef(cluster.Namespace, cmName))
err := c.client.Get(ctx, client.ObjectKey{
Namespace: cluster.Namespace,
Name: cmName,
}, sema.ConfigMap)
switch {
case apierrors.IsNotFound(err):
log.Info("Control plane init lock not found, it may have been released already")
return true
case err != nil:
log.Error(err, "Error unlocking the control plane init lock")
return false
default:
// Delete the config map semaphore if there is no error fetching it
if err := c.client.Delete(ctx, sema.ConfigMap); err != nil {
if apierrors.IsNotFound(err) {
return true
}
log.Error(err, "Error deleting the config map underlying the control plane init lock")
return false
}
return true
}
}
type information struct {
MachineName string `json:"machineName"`
}
type semaphore struct {
*corev1.ConfigMap
}
func newSemaphore() *semaphore {
return &semaphore{&corev1.ConfigMap{}}
}
func configMapName(clusterName string) string {
return fmt.Sprintf("%s-lock", clusterName)
}
func (s semaphore) information() (*information, error) {
li := &information{}
if err := json.Unmarshal([]byte(s.Data[semaphoreInformationKey]), li); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal semaphore information")
}
return li, nil
}
func (s semaphore) setInformation(information *information) error {
b, err := json.Marshal(information)
if err != nil {
return errors.Wrap(err, "failed to marshal semaphore information")
}
s.Data = map[string]string{}
s.Data[semaphoreInformationKey] = string(b)
return nil
}
func (s *semaphore) setMetadata(cluster *clusterv1.Cluster) {
s.ObjectMeta = metav1.ObjectMeta{
Namespace: cluster.Namespace,
Name: configMapName(cluster.Name),
Labels: map[string]string{
clusterv1.ClusterLabelName: cluster.Name,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: cluster.APIVersion,
Kind: cluster.Kind,
Name: cluster.Name,
UID: cluster.UID,
},
},
}
}
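Condensed, the lock is used by the bootstrap controller as follows (a sketch of the flow already present in this commit's reconciler; the retErr naming is assumed):

lock := locking.NewControlPlaneInitMutex(mgr.GetClient())
if !lock.Lock(ctx, cluster, machine) {
	// Another machine is initializing the control plane; retry later.
	return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
defer func() {
	if retErr != nil {
		// Release the lock so another machine can retry initialization.
		lock.Unlock(ctx, cluster)
	}
}()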

View File

@@ -0,0 +1,309 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package locking
import (
"context"
"errors"
"fmt"
"testing"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)
const (
clusterName = "test-cluster"
clusterNamespace = "test-namespace"
)
var (
ctx = ctrl.SetupSignalHandler()
)
func TestControlPlaneInitMutex_Lock(t *testing.T) {
g := NewWithT(t)
scheme := runtime.NewScheme()
g.Expect(clusterv1.AddToScheme(scheme)).To(Succeed())
g.Expect(corev1.AddToScheme(scheme)).To(Succeed())
uid := types.UID("test-uid")
tests := []struct {
name string
client client.Client
shouldAcquire bool
}{
{
name: "should successfully acquire lock if the config cannot be found",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
getError: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, fmt.Sprintf("%s-controlplane", uid)),
},
shouldAcquire: true,
},
{
name: "should not acquire lock if already exits",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName(clusterName),
Namespace: clusterNamespace,
},
}).Build(),
},
shouldAcquire: false,
},
{
name: "should not acquire lock if cannot create config map",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
getError: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, configMapName(clusterName)),
createError: errors.New("create error"),
},
shouldAcquire: false,
},
{
name: "should not acquire lock if config map already exists while creating",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).Build(),
getError: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, fmt.Sprintf("%s-controlplane", uid)),
createError: apierrors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "configmaps"}, fmt.Sprintf("%s-controlplane", uid)),
},
shouldAcquire: false,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
gs := NewWithT(t)
l := &ControlPlaneInitMutex{
client: tc.client,
}
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: clusterName,
UID: uid,
},
}
machine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("machine-%s", cluster.Name),
},
}
gs.Expect(l.Lock(ctx, cluster, machine)).To(Equal(tc.shouldAcquire))
})
}
}
func TestControlPlaneInitMutex_LockWithMachineDeletion(t *testing.T) {
g := NewWithT(t)
scheme := runtime.NewScheme()
g.Expect(clusterv1.AddToScheme(scheme)).To(Succeed())
g.Expect(corev1.AddToScheme(scheme)).To(Succeed())
newMachineName := "new-machine"
tests := []struct {
name string
client client.Client
expectedMachineName string
}{
{
name: "should not give the lock to new machine if the machine that created it does exist",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName(clusterName),
Namespace: clusterNamespace},
Data: map[string]string{
"lock-information": "{\"machineName\":\"existent-machine\"}",
}},
&clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: "existent-machine",
Namespace: clusterNamespace,
},
},
).Build(),
},
expectedMachineName: "existent-machine",
},
{
name: "should give the lock to new machine if the machine that created it does not exist",
client: &fakeClient{
Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName(clusterName),
Namespace: clusterNamespace},
Data: map[string]string{
"lock-information": "{\"machineName\":\"non-existent-machine\"}",
}},
).Build(),
},
expectedMachineName: newMachineName,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
l := &ControlPlaneInitMutex{
client: tc.client,
}
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: clusterName,
},
}
machine := &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Name: newMachineName,
},
}
g.Eventually(func(g Gomega) error {
l.Lock(ctx, cluster, machine)
cm := &corev1.ConfigMap{}
g.Expect(tc.client.Get(ctx, client.ObjectKey{
Name: configMapName(clusterName),
Namespace: cluster.Namespace,
}, cm)).To(Succeed())
info, err := semaphore{cm}.information()
g.Expect(err).To(BeNil())
g.Expect(info.MachineName).To(Equal(tc.expectedMachineName))
return nil
}, "20s").Should(Succeed())
})
}
}
func TestControlPlaneInitMutex_UnLock(t *testing.T) {
uid := types.UID("test-uid")
configMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName(clusterName),
Namespace: clusterNamespace,
},
}
tests := []struct {
name string
client client.Client
shouldRelease bool
}{
{
name: "should release lock by deleting config map",
client: &fakeClient{
Client: fake.NewClientBuilder().Build(),
},
shouldRelease: true,
},
{
name: "should not release lock if cannot delete config map",
client: &fakeClient{
Client: fake.NewClientBuilder().WithObjects(configMap).Build(),
deleteError: errors.New("delete error"),
},
shouldRelease: false,
},
{
name: "should release lock if config map does not exist",
client: &fakeClient{
Client: fake.NewClientBuilder().Build(),
getError: apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "configmaps"}, fmt.Sprintf("%s-controlplane", uid)),
},
shouldRelease: true,
},
{
name: "should not release lock if error while getting config map",
client: &fakeClient{
Client: fake.NewClientBuilder().Build(),
getError: errors.New("get error"),
},
shouldRelease: false,
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
gs := NewWithT(t)
l := &ControlPlaneInitMutex{
client: tc.client,
}
cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: clusterNamespace,
Name: clusterName,
UID: uid,
},
}
gs.Expect(l.Unlock(ctx, cluster)).To(Equal(tc.shouldRelease))
})
}
}
type fakeClient struct {
client.Client
getError error
createError error
deleteError error
}
func (fc *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
if fc.getError != nil {
return fc.getError
}
return fc.Client.Get(ctx, key, obj, opts...)
}
func (fc *fakeClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if fc.createError != nil {
return fc.createError
}
return fc.Client.Create(ctx, obj, opts...)
}
func (fc *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
if fc.deleteError != nil {
return fc.deleteError
}
return fc.Client.Delete(ctx, obj, opts...)
}

View File

@@ -18,23 +18,30 @@ package util
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
controlplanev1 "github.com/rancher-sandbox/cluster-api-provider-rke2/controlplane/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
func GetOwnerControlPlane(ctx context.Context, c client.Client, obj metav1.ObjectMeta) (*controlplanev1.RKE2ControlPlane, error) {
logger := log.FromContext(ctx)
ownerRefs := obj.OwnerReferences
var cpOwnerRef metav1.OwnerReference
for _, ownerRef := range ownerRefs {
if ownerRef.APIVersion == controlplanev1.GroupVersion.Group && ownerRef.Kind == "RKE2ControlPlane" {
logger.Info("Inside GetOwnerControlPlane", "ownerRef.APIVersion", ownerRef.APIVersion, "ownerRef.Kind", ownerRef.Kind, "cpv1.GroupVersion.Group", controlplanev1.GroupVersion.Group)
if ownerRef.APIVersion == controlplanev1.GroupVersion.Group+"/"+controlplanev1.GroupVersion.Version && ownerRef.Kind == "RKE2ControlPlane" {
cpOwnerRef = ownerRef
}
}
logger.Info("GetOwnerControlPlane result:", "cpOwnerRef", cpOwnerRef)
if (cpOwnerRef != metav1.OwnerReference{}) {
return GetControlPlaneByName(ctx, c, obj.Namespace, cpOwnerRef.Name)
}
@@ -61,3 +68,18 @@ func GetClusterByName(ctx context.Context, c client.Client, namespace, name stri
}
return m, nil
}
// Random generates a hex-encoded random string from size random bytes
func Random(size int) (string, error) {
token := make([]byte, size)
_, err := rand.Read(token)
if err != nil {
return "", err
}
return hex.EncodeToString(token), nil
}
// TokenName returns a token name from the cluster name
func TokenName(clusterName string) string {
return fmt.Sprintf("%s-token", clusterName)
}
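For illustration (values assumed): a 16-byte token encodes to a 32-character hex string, and the token secret name is derived from the cluster name:

token, err := Random(16) // e.g. "9f86d081884c7d65..." (32 hex characters)
if err != nil {
	return err
}
name := TokenName("my-cluster") // "my-cluster-token"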

View File

@@ -12,7 +12,8 @@
"internal",
"pkg"
],
"label": "CAPRKE2"
"label": "CAPRKE2",
"kustomize_config": true
}
},
{
@@ -26,9 +27,11 @@
"go.sum",
"api",
"internal",
"pkg"
"pkg",
"../pkg"
],
"label": "CAPBPR"
"label": "CAPBPR",
"kustomize_config": true
}
}
]