karmada/operator/pkg/controlplane/etcd/etcd.go

130 lines
5.0 KiB
Go

package etcd
import (
"fmt"
"strings"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
clientset "k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1"
"github.com/karmada-io/karmada/operator/pkg/constants"
"github.com/karmada-io/karmada/operator/pkg/util"
"github.com/karmada-io/karmada/operator/pkg/util/apiclient"
"github.com/karmada-io/karmada/operator/pkg/util/patcher"
)
// EnsureKarmadaEtcd installs the etcd StatefulSet for the Karmada instance
// identified by name and namespace, and then creates the etcd services.
//
// The StatefulSet is handled first; the services are only attempted when that
// step succeeds. The first error encountered is returned unchanged.
func EnsureKarmadaEtcd(client clientset.Interface, cfg *operatorv1alpha1.LocalEtcd, name, namespace string) error {
	err := installKarmadaEtcd(client, name, namespace, cfg)
	if err == nil {
		err = createEtcdService(client, name, namespace)
	}
	return err
}
// installKarmadaEtcd renders the etcd StatefulSet template for the Karmada
// instance identified by name and namespace, applies the annotations, labels
// and volume data carried by cfg, and creates or updates the StatefulSet
// through client.
//
// It returns an error when cfg is nil, when cfg.Replicas is nil or negative,
// or when rendering, decoding or applying the StatefulSet fails.
func installKarmadaEtcd(client clientset.Interface, name, namespace string, cfg *operatorv1alpha1.LocalEtcd) error {
	// Validate before dereferencing: a missing configuration or replica count
	// must surface as an error to the caller rather than as a panic.
	if cfg == nil {
		return fmt.Errorf("error when installing Etcd statefulset for %q: local etcd configuration is nil", name)
	}
	if cfg.Replicas == nil {
		return fmt.Errorf("error when installing Etcd statefulset for %q: replicas is not set", name)
	}
	replicas := *cfg.Replicas
	if replicas < 0 {
		// make() panics on a negative length, so reject it explicitly.
		return fmt.Errorf("error when installing Etcd statefulset for %q: replicas must not be negative, got %d", name, replicas)
	}

	// The StatefulSet and the peer service use the same name, so compute it once.
	etcdName := util.KarmadaEtcdName(name)

	// if the number of etcd is greater than one, we need to concatenate the peerURL for each member cluster.
	// memberName is podName generated by etcd statefulset: ${statefulsetName}-index
	// memberPeerURL uses the etcd peer headless service name: ${podName}.${serviceName}.${namespace}.svc.cluster.local:2380
	initialClusters := make([]string, replicas)
	for index := range initialClusters {
		memberName := fmt.Sprintf("%s-%d", etcdName, index)

		// build etcd member cluster peer url
		memberPeerURL := fmt.Sprintf("http://%s.%s.%s.svc.cluster.local:%v",
			memberName,
			etcdName,
			namespace,
			constants.EtcdListenPeerPort,
		)

		initialClusters[index] = fmt.Sprintf("%s=%s", memberName, memberPeerURL)
	}

	etcdStatefulSetBytes, err := util.ParseTemplate(KarmadaEtcdStatefulSet, struct {
		StatefulSetName, Namespace, Image, EtcdClientService string
		CertsSecretName, EtcdPeerServiceName                 string
		InitialCluster, EtcdDataVolumeName                   string
		Replicas, EtcdListenClientPort, EtcdListenPeerPort   int32
	}{
		StatefulSetName:      etcdName,
		Namespace:            namespace,
		Image:                cfg.Image.Name(),
		EtcdClientService:    util.KarmadaEtcdClientName(name),
		CertsSecretName:      util.EtcdCertSecretName(name),
		EtcdPeerServiceName:  etcdName,
		EtcdDataVolumeName:   constants.EtcdDataVolumeName,
		InitialCluster:       strings.Join(initialClusters, ","),
		Replicas:             replicas,
		EtcdListenClientPort: constants.EtcdListenClientPort,
		EtcdListenPeerPort:   constants.EtcdListenPeerPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd statefulset template: %w", err)
	}

	etcdStatefulSet := &appsv1.StatefulSet{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdStatefulSetBytes, etcdStatefulSet); err != nil {
		return fmt.Errorf("error when decoding Etcd StatefulSet: %w", err)
	}

	// Layer the user-provided metadata and volume settings on top of the
	// rendered template before it is sent to the API server.
	patcher.NewPatcher().WithAnnotations(cfg.Annotations).WithLabels(cfg.Labels).
		WithVolumeData(cfg.VolumeData).ForStatefulSet(etcdStatefulSet)

	if err := apiclient.CreateOrUpdateStatefulSet(client, etcdStatefulSet); err != nil {
		return fmt.Errorf("error when creating Etcd statefulset, err: %w", err)
	}

	return nil
}
// createEtcdService creates or updates the two services that front the etcd
// StatefulSet of the Karmada instance identified by name and namespace:
//
//   - the peer service, named util.KarmadaEtcdName(name), rendered from
//     KarmadaEtcdPeerService;
//   - the client service, named util.KarmadaEtcdClientName(name), rendered
//     from KarmadaEtcdClientService.
//
// The peer service is handled first. The function stops at the first failure
// and returns an error that names the service and the step (parsing, decoding
// or creating) that failed.
func createEtcdService(client clientset.Interface, name, namespace string) error {
	etcdServicePeerBytes, err := util.ParseTemplate(KarmadaEtcdPeerService, struct {
		ServiceName, Namespace                   string
		EtcdListenClientPort, EtcdListenPeerPort int32
	}{
		ServiceName:          util.KarmadaEtcdName(name),
		Namespace:            namespace,
		EtcdListenClientPort: constants.EtcdListenClientPort,
		EtcdListenPeerPort:   constants.EtcdListenPeerPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd peer service template: %w", err)
	}

	etcdPeerService := &corev1.Service{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdServicePeerBytes, etcdPeerService); err != nil {
		return fmt.Errorf("error when decoding Etcd peer service: %w", err)
	}

	if err := apiclient.CreateOrUpdateService(client, etcdPeerService); err != nil {
		return fmt.Errorf("error when creating etcd peer service, err: %w", err)
	}

	etcdClientServiceBytes, err := util.ParseTemplate(KarmadaEtcdClientService, struct {
		ServiceName, Namespace string
		EtcdListenClientPort   int32
	}{
		ServiceName:          util.KarmadaEtcdClientName(name),
		Namespace:            namespace,
		EtcdListenClientPort: constants.EtcdListenClientPort,
	})
	if err != nil {
		return fmt.Errorf("error when parsing Etcd client service template: %w", err)
	}

	etcdClientService := &corev1.Service{}
	if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdClientServiceBytes, etcdClientService); err != nil {
		return fmt.Errorf("err when decoding Etcd client service: %w", err)
	}

	if err := apiclient.CreateOrUpdateService(client, etcdClientService); err != nil {
		return fmt.Errorf("err when creating etcd client service, err: %w", err)
	}

	return nil
}