package etcd import ( "fmt" "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" kuberuntime "k8s.io/apimachinery/pkg/runtime" clientset "k8s.io/client-go/kubernetes" clientsetscheme "k8s.io/client-go/kubernetes/scheme" operatorv1alpha1 "github.com/karmada-io/karmada/operator/pkg/apis/operator/v1alpha1" "github.com/karmada-io/karmada/operator/pkg/constants" "github.com/karmada-io/karmada/operator/pkg/util" "github.com/karmada-io/karmada/operator/pkg/util/apiclient" "github.com/karmada-io/karmada/operator/pkg/util/patcher" ) // EnsureKarmadaEtcd creates etcd StatefulSet and service resource. func EnsureKarmadaEtcd(client clientset.Interface, cfg *operatorv1alpha1.LocalEtcd, name, namespace string) error { if err := installKarmadaEtcd(client, name, namespace, cfg); err != nil { return err } return createEtcdService(client, name, namespace) } func installKarmadaEtcd(client clientset.Interface, name, namespace string, cfg *operatorv1alpha1.LocalEtcd) error { // if the number of etcd is greater than one, we need to concatenate the peerURL for each member cluster. // memberName is podName generated by etcd statefuleset: ${statefulsetName}-index // memberPeerURL uses the etcd peer headless service name: ${podName}.${serviceName}.${namespace}.svc.cluster.local:2380 initialClusters := make([]string, *cfg.Replicas) for index := range initialClusters { memberName := fmt.Sprintf("%s-%d", util.KarmadaEtcdName(name), index) // build etcd member cluster peer url memberPeerURL := fmt.Sprintf("http://%s.%s.%s.svc.cluster.local:%v", memberName, util.KarmadaEtcdName(name), namespace, constants.EtcdListenPeerPort, ) initialClusters[index] = fmt.Sprintf("%s=%s", memberName, memberPeerURL) } etcdStatefuleSetBytes, err := util.ParseTemplate(KarmadaEtcdStatefulSet, struct { StatefulSetName, Namespace, Image string EtcdClientService, CertsSecretName string EtcdPeerServiceName, InitialCluster string Replicas, EtcdListenClientPort, EtcdListenPeerPort int32 }{ StatefulSetName: util.KarmadaEtcdName(name), Namespace: namespace, Image: cfg.Image.Name(), EtcdClientService: util.KarmadaEtcdClientName(name), CertsSecretName: util.EtcdCertSecretName(name), EtcdPeerServiceName: util.KarmadaEtcdName(name), InitialCluster: strings.Join(initialClusters, ","), Replicas: *cfg.Replicas, EtcdListenClientPort: constants.EtcdListenClientPort, EtcdListenPeerPort: constants.EtcdListenPeerPort, }) if err != nil { return fmt.Errorf("error when parsing Etcd statefuelset template: %w", err) } etcdStatefulSet := &appsv1.StatefulSet{} if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdStatefuleSetBytes, etcdStatefulSet); err != nil { return fmt.Errorf("error when decoding Etcd StatefulSet: %w", err) } patcher.NewPatcher().WithAnnotations(cfg.Annotations).WithLabels(cfg.Labels).ForStatefulSet(etcdStatefulSet) if err := apiclient.CreateOrUpdateStatefulSet(client, etcdStatefulSet); err != nil { return fmt.Errorf("error when creating Etcd statefulset, err: %w", err) } return nil } func createEtcdService(client clientset.Interface, name, namespace string) error { etcdServicePeerBytes, err := util.ParseTemplate(KarmadaEtcdPeerService, struct { ServiceName, Namespace string EtcdListenClientPort, EtcdListenPeerPort int32 }{ ServiceName: util.KarmadaEtcdName(name), Namespace: namespace, EtcdListenClientPort: constants.EtcdListenClientPort, EtcdListenPeerPort: constants.EtcdListenPeerPort, }) if err != nil { return fmt.Errorf("error when parsing Etcd client serive template: %w", err) } etcdPeerService := &corev1.Service{} if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdServicePeerBytes, etcdPeerService); err != nil { return fmt.Errorf("error when decoding Etcd client service: %w", err) } if err := apiclient.CreateOrUpdateService(client, etcdPeerService); err != nil { return fmt.Errorf("error when creating etcd client service, err: %w", err) } etcdClientServiceBytes, err := util.ParseTemplate(KarmadaEtcdClientService, struct { ServiceName, Namespace string EtcdListenClientPort int32 }{ ServiceName: util.KarmadaEtcdClientName(name), Namespace: namespace, EtcdListenClientPort: constants.EtcdListenClientPort, }) if err != nil { return fmt.Errorf("error when parsing Etcd client serive template: %w", err) } etcdClientService := &corev1.Service{} if err := kuberuntime.DecodeInto(clientsetscheme.Codecs.UniversalDecoder(), etcdClientServiceBytes, etcdClientService); err != nil { return fmt.Errorf("err when decoding Etcd client service: %w", err) } if err := apiclient.CreateOrUpdateService(client, etcdClientService); err != nil { return fmt.Errorf("err when creating etcd client service, err: %w", err) } return nil }