package controller

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	ecv1alpha1 "go.etcd.io/etcd-operator/api/v1alpha1"
	"go.etcd.io/etcd-operator/internal/etcdutils"
	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	etcdDataDir = "/var/lib/etcd"
	volumeName  = "etcd-data"
)

type etcdClusterState string

const (
	etcdClusterStateNew      etcdClusterState = "new"
	etcdClusterStateExisting etcdClusterState = "existing"
)
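
// prepareOwnerReference builds the OwnerReference list that marks the given
// EtcdCluster as the controlling owner of the resources created for it.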
func prepareOwnerReference(ec *ecv1alpha1.EtcdCluster, scheme *runtime.Scheme) ([]metav1.OwnerReference, error) {
	gvk, err := apiutil.GVKForObject(ec, scheme)
	if err != nil {
		return []metav1.OwnerReference{}, err
	}
	ref := metav1.OwnerReference{
		APIVersion:         gvk.GroupVersion().String(),
		Kind:               gvk.Kind,
		Name:               ec.GetName(),
		UID:                ec.GetUID(),
		BlockOwnerDeletion: ptr.To(true),
		Controller:         ptr.To(true),
	}

	var owners []metav1.OwnerReference
	owners = append(owners, ref)
	return owners, nil
}
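
// reconcileStatefulSet applies the desired cluster-state ConfigMap, creates or
// patches the StatefulSet for the EtcdCluster, waits for it to become ready,
// and returns the latest StatefulSet.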
func reconcileStatefulSet(ctx context.Context, logger logr.Logger, ec *ecv1alpha1.EtcdCluster, c client.Client, replicas int32, scheme *runtime.Scheme) (*appsv1.StatefulSet, error) {
	// Prepare/update the ConfigMap for the StatefulSet.
	err := applyEtcdClusterState(ctx, ec, int(replicas), c, scheme, logger)
	if err != nil {
		return nil, err
	}

	// Create or update the StatefulSet.
	err = createOrPatchStatefulSet(ctx, logger, ec, c, replicas, scheme)
	if err != nil {
		return nil, err
	}

	// Wait for the StatefulSet to become ready.
	err = waitForStatefulSetReady(ctx, logger, c, ec.Name, ec.Namespace)
	if err != nil {
		return nil, err
	}

	// Return the latest StatefulSet so that the next operation acts on up-to-date state.
	return getStatefulSet(ctx, c, ec.Name, ec.Namespace)
}
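
// defaultArgs returns the default etcd command-line flags for a member of the
// named cluster; pod-specific values are injected via the POD_NAME and
// POD_NAMESPACE environment variables.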
func defaultArgs(name string) []string {
	return []string{
		"--name=$(POD_NAME)",
		"--listen-peer-urls=http://0.0.0.0:2380",   // TODO: only listen on 127.0.0.1 and host IP
		"--listen-client-urls=http://0.0.0.0:2379", // TODO: only listen on 127.0.0.1 and host IP
		fmt.Sprintf("--initial-advertise-peer-urls=http://$(POD_NAME).%s.$(POD_NAMESPACE).svc.cluster.local:2380", name),
		fmt.Sprintf("--advertise-client-urls=http://$(POD_NAME).%s.$(POD_NAMESPACE).svc.cluster.local:2379", name),
	}
}
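
// RemoveStringFromSlice removes the first argument in s whose flag name
// matches str and returns the resulting slice.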
func RemoveStringFromSlice(s []string, str string) []string {
	for i := range s {
		defaultArg := getArgName(s[i])
		if defaultArg == str {
			s = slices.Delete(s, i, i+1)
			break
		}
	}
	return s
}
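
// getArgName extracts the flag name from an argument of the form
// "--flag=value", "--flag value", or a bare boolean switch such as "--flag".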
func getArgName(s string) string {
	idx := strings.Index(s, "=")

	if idx != -1 {
		return s[:idx]
	}

	idx = strings.Index(s, " ")
	if idx != -1 {
		return s[:idx]
	}

	// Assume the argument is a boolean switch if no separator was found.
	return strings.TrimSpace(s)
}
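
// createArgs builds the etcd container argument list by starting from
// defaultArgs and letting user-supplied etcdOptions override any default flag
// with the same name. For example, passing
// []string{"--listen-client-urls=https://0.0.0.0:2379"} replaces the default
// --listen-client-urls flag instead of duplicating it.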
func createArgs(name string, etcdOptions []string) []string {
	defaultArgs := defaultArgs(name)
	if len(etcdOptions) > 0 {
		var argName string
		// Remove default arguments that conflict with user-supplied options.
		for i := range etcdOptions {
			argName = getArgName(etcdOptions[i])
			defaultArgs = RemoveStringFromSlice(defaultArgs, argName)
		}
	}
	defaultArgs = append(defaultArgs, etcdOptions...)
	return defaultArgs
}
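
// createOrPatchStatefulSet creates or patches the StatefulSet for the
// EtcdCluster, wiring in the etcd container, the cluster-state ConfigMap, and,
// when StorageSpec is set, either a volume claim template (ReadWriteOnce) or a
// reference to an existing PVC (ReadWriteMany).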
func createOrPatchStatefulSet(ctx context.Context, logger logr.Logger, ec *ecv1alpha1.EtcdCluster, c client.Client, replicas int32, scheme *runtime.Scheme) error {
	sts := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      ec.Name,
			Namespace: ec.Namespace,
		},
	}

	labels := map[string]string{
		"app":        ec.Name,
		"controller": ec.Name,
	}

	// Create a new controller ref.
	owners, err := prepareOwnerReference(ec, scheme)
	if err != nil {
		return err
	}
	podSpec := corev1.PodSpec{
		Containers: []corev1.Container{
			{
				Name:    "etcd",
				Command: []string{"/usr/local/bin/etcd"},
				Args:    createArgs(ec.Name, ec.Spec.EtcdOptions),
				Image:   fmt.Sprintf("gcr.io/etcd-development/etcd:%s", ec.Spec.Version),
				Env: []corev1.EnvVar{
					{
						Name: "POD_NAME",
						ValueFrom: &corev1.EnvVarSource{
							FieldRef: &corev1.ObjectFieldSelector{
								FieldPath: "metadata.name",
							},
						},
					},
					{
						Name: "POD_NAMESPACE",
						ValueFrom: &corev1.EnvVarSource{
							FieldRef: &corev1.ObjectFieldSelector{
								FieldPath: "metadata.namespace",
							},
						},
					},
				},
				EnvFrom: []corev1.EnvFromSource{
					{
						ConfigMapRef: &corev1.ConfigMapEnvSource{
							LocalObjectReference: corev1.LocalObjectReference{
								Name: configMapNameForEtcdCluster(ec),
							},
						},
					},
				},
				Ports: []corev1.ContainerPort{
					{
						Name:          "client",
						ContainerPort: 2379,
					},
					{
						Name:          "peer",
						ContainerPort: 2380,
					},
				},
			},
		},
	}

	stsSpec := appsv1.StatefulSetSpec{
		Replicas:    &replicas,
		ServiceName: ec.Name,
		Selector: &metav1.LabelSelector{
			MatchLabels: labels,
		},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				Labels: labels,
			},
			Spec: podSpec,
		},
	}

	if ec.Spec.StorageSpec != nil {
		stsSpec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{{
			Name:        volumeName,
			MountPath:   etcdDataDir,
			SubPathExpr: "$(POD_NAME)",
		}}
		// Create a new volume claim template.
		if ec.Spec.StorageSpec.VolumeSizeRequest.Cmp(resource.MustParse("1Mi")) < 0 {
			return fmt.Errorf("VolumeSizeRequest must be at least 1Mi")
		}

		if ec.Spec.StorageSpec.VolumeSizeLimit.IsZero() {
			logger.Info("VolumeSizeLimit is not set. Setting it to VolumeSizeRequest")
			ec.Spec.StorageSpec.VolumeSizeLimit = ec.Spec.StorageSpec.VolumeSizeRequest
		}

		pvcObjectMeta := metav1.ObjectMeta{
			Name:            volumeName,
			OwnerReferences: owners,
		}

		pvcResources := corev1.VolumeResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceStorage: ec.Spec.StorageSpec.VolumeSizeRequest,
			},
			Limits: corev1.ResourceList{
				corev1.ResourceStorage: ec.Spec.StorageSpec.VolumeSizeLimit,
			},
		}

		switch ec.Spec.StorageSpec.AccessModes {
		case corev1.ReadWriteOnce, "":
			stsSpec.VolumeClaimTemplates = []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: pvcObjectMeta,
					Spec: corev1.PersistentVolumeClaimSpec{
						AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
						Resources:   pvcResources,
					},
				},
			}

			if ec.Spec.StorageSpec.StorageClassName != "" {
				stsSpec.VolumeClaimTemplates[0].Spec.StorageClassName = &ec.Spec.StorageSpec.StorageClassName
			}
		case corev1.ReadWriteMany:
			if ec.Spec.StorageSpec.PVCName == "" {
				return fmt.Errorf("PVCName must be set when AccessModes is ReadWriteMany")
			}
			stsSpec.Template.Spec.Volumes = append(stsSpec.Template.Spec.Volumes, corev1.Volume{
				Name: volumeName,
				VolumeSource: corev1.VolumeSource{
					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
						ClaimName: ec.Spec.StorageSpec.PVCName,
					},
				},
			})
		default:
			return fmt.Errorf("AccessMode %s is not supported", ec.Spec.StorageSpec.AccessModes)
		}
	}

	logger.Info("Now creating/updating statefulset", "name", ec.Name, "namespace", ec.Namespace, "replicas", replicas)
	_, err = controllerutil.CreateOrPatch(ctx, c, sts, func() error {
		// Define or update the desired spec.
		sts.ObjectMeta = metav1.ObjectMeta{
			Name:            ec.Name,
			Namespace:       ec.Namespace,
			OwnerReferences: owners,
		}
		sts.Spec = stsSpec
		return nil
	})
	if err != nil {
		return err
	}

	logger.Info("StatefulSet created/updated", "name", ec.Name, "namespace", ec.Namespace, "replicas", replicas)
	return nil
}
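
// waitForStatefulSetReady polls the StatefulSet with exponential backoff until
// all desired replicas report ready, or until the backoff steps (or the
// context) are exhausted.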
func waitForStatefulSetReady(ctx context.Context, logger logr.Logger, r client.Client, name, namespace string) error {
	logger.Info("Now checking the readiness of statefulset", "name", name, "namespace", namespace)

	backoff := wait.Backoff{
		Duration: 3 * time.Second,
		Factor:   2.0,
		Steps:    5,
	}

	err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (bool, error) {
		// Fetch the StatefulSet
		sts, err := getStatefulSet(ctx, r, name, namespace)
		if err != nil {
			return false, err
		}

		// Check if the StatefulSet is ready
		if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
			// StatefulSet is ready
			logger.Info("StatefulSet is ready", "name", name, "namespace", namespace)
			return true, nil
		}

		// Log the current status
		logger.Info("StatefulSet is not ready", "ReadyReplicas", strconv.Itoa(int(sts.Status.ReadyReplicas)), "DesiredReplicas", strconv.Itoa(int(*sts.Spec.Replicas)))
		return false, nil
	})

	if err != nil {
		return fmt.Errorf("StatefulSet %s/%s did not become ready: %w", namespace, name, err)
	}

	return nil
}
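
// createHeadlessServiceIfNotExist ensures that the headless Service backing
// the StatefulSet's per-pod DNS records exists, creating it if it is missing.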
func createHeadlessServiceIfNotExist(ctx context.Context, logger logr.Logger, c client.Client, ec *ecv1alpha1.EtcdCluster, scheme *runtime.Scheme) error {
	service := &corev1.Service{}
	err := c.Get(ctx, client.ObjectKey{Name: ec.Name, Namespace: ec.Namespace}, service)

	if err != nil {
		if k8serrors.IsNotFound(err) {
			logger.Info("Headless service does not exist. Creating headless service")
			owners, err1 := prepareOwnerReference(ec, scheme)
			if err1 != nil {
				return err1
			}
			labels := map[string]string{
				"app":        ec.Name,
				"controller": ec.Name,
			}
			// Create the headless service
			headlessSvc := &corev1.Service{
				ObjectMeta: metav1.ObjectMeta{
					Name:            ec.Name,
					Namespace:       ec.Namespace,
					Labels:          labels,
					OwnerReferences: owners,
				},
				Spec: corev1.ServiceSpec{
					ClusterIP: "None", // Key for headless service
					Selector:  labels,
				},
			}
			if createErr := c.Create(ctx, headlessSvc); createErr != nil {
				return fmt.Errorf("failed to create headless service: %w", createErr)
			}
			logger.Info("Headless service created successfully")
			return nil
		}
		return fmt.Errorf("failed to get headless service: %w", err)
	}
	return nil
}
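
// checkStatefulSetControlledByEtcdOperator returns an error if the StatefulSet
// is not controlled by the given EtcdCluster.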
func checkStatefulSetControlledByEtcdOperator(ec *ecv1alpha1.EtcdCluster, sts *appsv1.StatefulSet) error {
	if !metav1.IsControlledBy(sts, ec) {
		return fmt.Errorf("StatefulSet %s/%s is not controlled by EtcdCluster %s/%s", sts.Namespace, sts.Name, ec.Namespace, ec.Name)
	}
	return nil
}
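
// configMapNameForEtcdCluster returns the name of the ConfigMap that stores
// the cluster state for the given EtcdCluster.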
func configMapNameForEtcdCluster(ec *ecv1alpha1.EtcdCluster) string {
	return fmt.Sprintf("%s-state", ec.Name)
}
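
// peerEndpointForOrdinalIndex returns the member name and peer URL of the pod
// with the given ordinal index.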
func peerEndpointForOrdinalIndex(ec *ecv1alpha1.EtcdCluster, index int) (string, string) {
	name := fmt.Sprintf("%s-%d", ec.Name, index)
	return name, fmt.Sprintf("http://%s-%d.%s.%s.svc.cluster.local:2380",
		ec.Name, index, ec.Name, ec.Namespace)
}
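
// newEtcdClusterState builds the ConfigMap that carries
// ETCD_INITIAL_CLUSTER_STATE, ETCD_INITIAL_CLUSTER, and ETCD_DATA_DIR for the
// given replica count.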
func newEtcdClusterState(ec *ecv1alpha1.EtcdCluster, replica int) *corev1.ConfigMap {
	// We always add members one by one, so the state is always
	// "existing" if replica > 1.
	state := etcdClusterStateNew
	if replica > 1 {
		state = etcdClusterStateExisting
	}

	var initialCluster []string
	for i := 0; i < replica; i++ {
		name, peerURL := peerEndpointForOrdinalIndex(ec, i)
		initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", name, peerURL))
	}

	return &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      configMapNameForEtcdCluster(ec),
			Namespace: ec.Namespace,
		},
		Data: map[string]string{
			"ETCD_INITIAL_CLUSTER_STATE": string(state),
			"ETCD_INITIAL_CLUSTER":       strings.Join(initialCluster, ","),
			"ETCD_DATA_DIR":              etcdDataDir,
		},
	}
}
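
// applyEtcdClusterState creates or updates the cluster-state ConfigMap owned
// by the EtcdCluster.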
func applyEtcdClusterState(ctx context.Context, ec *ecv1alpha1.EtcdCluster, replica int, c client.Client, scheme *runtime.Scheme, logger logr.Logger) error {
	cm := newEtcdClusterState(ec, replica)

	// Create a new controller ref.
	owners, err := prepareOwnerReference(ec, scheme)
	if err != nil {
		return err
	}

	cm.OwnerReferences = owners

	logger.Info("Now updating configmap", "name", configMapNameForEtcdCluster(ec), "namespace", ec.Namespace)
	err = c.Get(ctx, types.NamespacedName{Name: configMapNameForEtcdCluster(ec), Namespace: ec.Namespace}, &corev1.ConfigMap{})
	if err != nil {
		if k8serrors.IsNotFound(err) {
			createErr := c.Create(ctx, cm)
			return createErr
		}
		return err
	}

	updateErr := c.Update(ctx, cm)
	return updateErr
}
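
// clientEndpointForOrdinalIndex returns the client URL of the pod with the
// given ordinal index.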
func clientEndpointForOrdinalIndex(sts *appsv1.StatefulSet, index int) string {
	return fmt.Sprintf("http://%s-%d.%s.%s.svc.cluster.local:2379",
		sts.Name, index, sts.Name, sts.Namespace)
}
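
// getStatefulSet fetches the StatefulSet with the given name and namespace.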
func getStatefulSet(ctx context.Context, c client.Client, name, namespace string) (*appsv1.StatefulSet, error) {
	sts := &appsv1.StatefulSet{}
	err := c.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, sts)
	if err != nil {
		return nil, err
	}
	return sts, nil
}
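
// clientEndpointsFromStatefulsets returns the client endpoints of all members
// implied by the StatefulSet's replica count.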
func clientEndpointsFromStatefulsets(sts *appsv1.StatefulSet) []string {
	var endpoints []string
	replica := int(*sts.Spec.Replicas)
	if replica > 0 {
		for i := 0; i < replica; i++ {
			endpoints = append(endpoints, clientEndpointForOrdinalIndex(sts, i))
		}
	}
	return endpoints
}
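
// areAllMembersHealthy reports whether every checked etcd member is healthy.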
func areAllMembersHealthy(sts *appsv1.StatefulSet, logger logr.Logger) (bool, error) {
	_, health, err := healthCheck(sts, logger)
	if err != nil {
		return false, err
	}

	for _, h := range health {
		if !h.Health {
			return false, nil
		}
	}
	return true, nil
}

// healthCheck returns the member list, the per-endpoint health results, and an
// error. If any member (excluding members that have not yet started or have
// already been removed) is unhealthy, the error is non-nil.
func healthCheck(sts *appsv1.StatefulSet, lg klog.Logger) (*clientv3.MemberListResponse, []etcdutils.EpHealth, error) {
	replica := int(*sts.Spec.Replicas)
	if replica == 0 {
		return nil, nil, nil
	}

	endpoints := clientEndpointsFromStatefulsets(sts)

	memberlistResp, err := etcdutils.MemberList(endpoints)
	if err != nil {
		return nil, nil, err
	}
	memberCnt := len(memberlistResp.Members)

	// Usually replica should be equal to memberCnt. If it isn't, then the
	// previous reconcile loop was interrupted right after adding
	// (replica < memberCnt) or removing (replica > memberCnt) a member from
	// the cluster. In that case, we shouldn't run the health check on the
	// not yet started or already removed member.
	cnt := min(replica, memberCnt)

	lg.Info("health checking", "replica", replica, "len(members)", memberCnt)
	endpoints = endpoints[:cnt]

	healthInfos, err := etcdutils.ClusterHealth(endpoints)
	if err != nil {
		return memberlistResp, nil, err
	}

	for _, healthInfo := range healthInfos {
		if !healthInfo.Health {
			// TODO: also update metrics?
			return memberlistResp, healthInfos, errors.New(healthInfo.String())
		}
		lg.Info(healthInfo.String())
	}

	return memberlistResp, healthInfos, nil
}